moonpool/moonpool/Moonpool/Blocking_queue/index.html
2025-05-02 15:01:11 +00:00

17 lines
11 KiB
HTML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml"><head><title>Blocking_queue (moonpool.Moonpool.Blocking_queue)</title><meta charset="utf-8"/><link rel="stylesheet" href="../../../_odoc-theme/odoc.css"/><meta name="generator" content="odoc 3.0.0"/><meta name="viewport" content="width=device-width,initial-scale=1.0"/><script src="../../../highlight.pack.js"></script><script>hljs.initHighlightingOnLoad();</script></head><body class="odoc"><nav class="odoc-nav"><a href="../index.html">Up</a> <a href="../../../index.html">Index</a> &#x00BB; <a href="../../index.html">moonpool</a> &#x00BB; <a href="../index.html">Moonpool</a> &#x00BB; Blocking_queue</nav><header class="odoc-preamble"><h1>Module <code><span>Moonpool.Blocking_queue</span></code></h1><p>A simple blocking queue.</p><p>This queue is quite basic and will not behave well under heavy contention. However, it can be sufficient for many practical use cases.</p><p><b>NOTE</b>: this queue will typically block the caller thread in case the operation (push/pop) cannot proceed. Be wary of deadlocks when using the queue <i>from</i> a pool when you expect the other end to also be produced/consumed from the same pool.</p><p>See discussion on <a href="../Fut/index.html#val-wait_block"><code>Fut.wait_block</code></a> for more details on deadlocks and how to mitigate the risk of running into them.</p><p>More scalable queues can be found in Lockfree (https://github.com/ocaml-multicore/lockfree/)</p></header><div class="odoc-content"><div class="odoc-spec"><div class="spec type anchored" id="type-t"><a href="#type-t" class="anchor"></a><code><span><span class="keyword">type</span> <span>'a t</span></span></code></div><div class="spec-doc"><p>Unbounded blocking queue.</p><p>This queue is thread-safe and will block when calling <a href="#val-pop"><code>pop</code></a> on it when it's empty.</p></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-create"><a href="#val-create" class="anchor"></a><code><span><span class="keyword">val</span> create : <span>unit <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">_</span> <a href="#type-t">t</a></span></span></code></div><div class="spec-doc"><p>Create a new unbounded queue.</p></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-size"><a href="#val-size" class="anchor"></a><code><span><span class="keyword">val</span> size : <span><span><span class="type-var">_</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> int</span></code></div><div class="spec-doc"><p>Number of items currently in the queue. Note that <code>pop</code> might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.</p><ul class="at-tags"><li class="since"><span class="at-tag">since</span> 0.2</li></ul></div></div><div class="odoc-spec"><div class="spec exception anchored" id="exception-Closed"><a href="#exception-Closed" class="anchor"></a><code><span><span class="keyword">exception</span> </span><span><span class="exception">Closed</span></span></code></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-push"><a href="#val-push" class="anchor"></a><code><span><span class="keyword">val</span> push : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> <span class="arrow">&#45;&gt;</span></span> unit</span></code></div><div class="spec-doc"><p><code>push q x</code> pushes <code>x</code> into <code>q</code>, and returns <code>()</code>.</p><p>In the current implementation, <code>push q</code> will never block for a long time, it will only block while waiting for a lock so it can push the element.</p><ul class="at-tags"><li class="raises"><span class="at-tag">raises</span> <a href="#exception-Closed"><code>Closed</code></a> <p>if the queue is closed (by a previous call to <code>close q</code>)</p></li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-pop"><a href="#val-pop" class="anchor"></a><code><span><span class="keyword">val</span> pop : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span class="type-var">'a</span></span></code></div><div class="spec-doc"><p><code>pop q</code> pops the next element in <code>q</code>. It might block until an element comes.</p><ul class="at-tags"><li class="raises"><span class="at-tag">raises</span> <a href="#exception-Closed"><code>Closed</code></a> <p>if the queue was closed before a new element was available.</p></li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-close"><a href="#val-close" class="anchor"></a><code><span><span class="keyword">val</span> close : <span><span><span class="type-var">_</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> unit</span></code></div><div class="spec-doc"><p>Close the queue, meaning there won't be any more <code>push</code> allowed, ie <code>push</code> will raise <a href="#exception-Closed"><code>Closed</code></a>.</p><p><code>pop</code> will keep working and will return the elements present in the queue, until it's entirely drained; then <code>pop</code> will also raise <a href="#exception-Closed"><code>Closed</code></a>.</p></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-try_pop"><a href="#val-try_pop" class="anchor"></a><code><span><span class="keyword">val</span> try_pop : <span><span class="label">force_lock</span>:bool <span class="arrow">&#45;&gt;</span></span> <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> option</span></span></code></div><div class="spec-doc"><p><code>try_pop q</code> immediately pops the first element of <code>q</code>, if any, or returns <code>None</code> without blocking.</p><ul class="at-tags"><li class="parameter"><span class="at-tag">parameter</span> <span class="value">force_lock</span> <p>if true, use <code>Mutex.lock</code> (which can block under contention); if false, use <code>Mutex.try_lock</code>, which might return <code>None</code> even in presence of an element if there's contention</p></li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-try_push"><a href="#val-try_push" class="anchor"></a><code><span><span class="keyword">val</span> try_push : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> <span class="arrow">&#45;&gt;</span></span> bool</span></code></div><div class="spec-doc"><p><code>try_push q x</code> tries to push into <code>q</code>, in which case it returns <code>true</code>; or it fails to push and returns <code>false</code> without blocking.</p><ul class="at-tags"><li class="raises"><span class="at-tag">raises</span> <a href="#exception-Closed"><code>Closed</code></a> <p>if the locking succeeded but the queue is closed.</p></li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-transfer"><a href="#val-transfer" class="anchor"></a><code><span><span class="keyword">val</span> transfer : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span><span class="type-var">'a</span> <a href="../../../ocaml/Stdlib/Queue/index.html#type-t">Stdlib.Queue.t</a></span> <span class="arrow">&#45;&gt;</span></span> unit</span></code></div><div class="spec-doc"><p><code>transfer bq q2</code> transfers all items presently in <code>bq</code> into <code>q2</code> in one atomic section, and clears <code>bq</code>. It blocks if no element is in <code>bq</code>.</p><p>This is useful to consume elements from the queue in batch. Create a <code>Queue.t</code> locally:</p><pre class="language-ocaml"><code> let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create () in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -&gt; ()</code></pre><ul class="at-tags"><li class="since"><span class="at-tag">since</span> 0.4</li></ul></div></div><div class="odoc-spec"><div class="spec type anchored" id="type-gen"><a href="#type-gen" class="anchor"></a><code><span><span class="keyword">type</span> <span>'a gen</span></span><span> = <span>unit <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> option</span></span></code></div></div><div class="odoc-spec"><div class="spec type anchored" id="type-iter"><a href="#type-iter" class="anchor"></a><code><span><span class="keyword">type</span> <span>'a iter</span></span><span> = <span><span>(<span><span class="type-var">'a</span> <span class="arrow">&#45;&gt;</span></span> unit)</span> <span class="arrow">&#45;&gt;</span></span> unit</span></code></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-to_iter"><a href="#val-to_iter" class="anchor"></a><code><span><span class="keyword">val</span> to_iter : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> <a href="#type-iter">iter</a></span></span></code></div><div class="spec-doc"><p><code>to_iter q</code> returns an iterator over all items in the queue. This might not terminate if <code>q</code> is never closed.</p><ul class="at-tags"><li class="since"><span class="at-tag">since</span> 0.4</li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-to_gen"><a href="#val-to_gen" class="anchor"></a><code><span><span class="keyword">val</span> to_gen : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> <a href="#type-gen">gen</a></span></span></code></div><div class="spec-doc"><p><code>to_gen q</code> returns a generator from the queue.</p><ul class="at-tags"><li class="since"><span class="at-tag">since</span> 0.4</li></ul></div></div><div class="odoc-spec"><div class="spec value anchored" id="val-to_seq"><a href="#val-to_seq" class="anchor"></a><code><span><span class="keyword">val</span> to_seq : <span><span><span class="type-var">'a</span> <a href="#type-t">t</a></span> <span class="arrow">&#45;&gt;</span></span> <span><span class="type-var">'a</span> <a href="../../../ocaml/Stdlib/Seq/index.html#type-t">Stdlib.Seq.t</a></span></span></code></div><div class="spec-doc"><p><code>to_gen q</code> returns a (transient) sequence from the queue.</p><ul class="at-tags"><li class="since"><span class="at-tag">since</span> 0.4</li></ul></div></div></div></body></html>