diff --git a/moonpool/Moonpool/Blocking_queue/index.html b/moonpool/Moonpool/Blocking_queue/index.html index 2b7c2c44..2ef85651 100644 --- a/moonpool/Moonpool/Blocking_queue/index.html +++ b/moonpool/Moonpool/Blocking_queue/index.html @@ -1,17 +1,17 @@ -
Moonpool.Blocking_queueA simple blocking queue.
This queue is quite basic and will not behave well under heavy contention. However, it can be sufficient for many practical use cases.
NOTE: this queue will typically block the caller thread in case the operation (push/pop) cannot proceed. Be wary of deadlocks when using the queue from a pool when you expect the other end to also be produced/consumed from the same pool.
See discussion on Fut.wait_block for more details on deadlocks and how to mitigate the risk of running into them.
More scalable queues can be found in Lockfree (https://github.com/ocaml-multicore/lockfree/)
Unbounded blocking queue.
This queue is thread-safe and will block when calling pop on it when it's empty.
val create : unit -> _ tCreate a new unbounded queue.
val size : _ t -> intNumber of items currently in the queue. Note that pop might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.
val push : 'a t -> 'a -> unitpush q x pushes x into q, and returns ().
In the current implementation, push q will never block for a long time, it will only block while waiting for a lock so it can push the element.
val pop : 'a t -> 'apop q pops the next element in q. It might block until an element comes.
val close : _ t -> unitval try_pop : force_lock:bool -> 'a t -> 'a optiontry_pop q immediately pops the first element of q, if any, or returns None without blocking.
val try_push : 'a t -> 'a -> booltry_push q x tries to push into q, in which case it returns true; or it fails to push and returns false without blocking.
val transfer : 'a t -> 'a Stdlib.Queue.t -> unittransfer bq q2 transfers all items presently in bq into q2 in one atomic section, and clears bq. It blocks if no element is in bq.
This is useful to consume elements from the queue in batch. Create a Queue.t locally:
let dowork (work_queue: job Bb_queue.t) =
- (* local queue, not thread safe *)
- let local_q = Queue.create() in
- try
- while true do
- (* work on local events, already on this thread *)
- while not (Queue.is_empty local_q) do
- let job = Queue.pop local_q in
- process_job job
- done;
+Blocking_queue (moonpool.Moonpool.Blocking_queue) Module Moonpool.Blocking_queue
A simple blocking queue.
This queue is quite basic and will not behave well under heavy contention. However, it can be sufficient for many practical use cases.
NOTE: this queue will typically block the caller thread in case the operation (push/pop) cannot proceed. Be wary of deadlocks when using the queue from a pool when you expect the other end to also be produced/consumed from the same pool.
See discussion on Fut.wait_block for more details on deadlocks and how to mitigate the risk of running into them.
More scalable queues can be found in Lockfree (https://github.com/ocaml-multicore/lockfree/)
Unbounded blocking queue.
This queue is thread-safe and will block when calling pop on it when it's empty.
val create : unit -> _ tCreate a new unbounded queue.
val size : _ t -> intNumber of items currently in the queue. Note that pop might still block if this returns a non-zero number, since another thread might have consumed the items in the mean time.
val push : 'a t -> 'a -> unitpush q x pushes x into q, and returns ().
In the current implementation, push q will never block for a long time, it will only block while waiting for a lock so it can push the element.
val pop : 'a t -> 'apop q pops the next element in q. It might block until an element comes.
val close : _ t -> unitval try_pop : force_lock:bool -> 'a t -> 'a optiontry_pop q immediately pops the first element of q, if any, or returns None without blocking.
val try_push : 'a t -> 'a -> booltry_push q x tries to push into q, in which case it returns true; or it fails to push and returns false without blocking.
val transfer : 'a t -> 'a Stdlib.Queue.t -> unittransfer bq q2 transfers all items presently in bq into q2 in one atomic section, and clears bq. It blocks if no element is in bq.
This is useful to consume elements from the queue in batch. Create a Queue.t locally:
let dowork (work_queue : job Bb_queue.t) =
+ (* local queue, not thread safe *)
+ let local_q = Queue.create () in
+ try
+ while true do
+ (* work on local events, already on this thread *)
+ while not (Queue.is_empty local_q) do
+ let job = Queue.pop local_q in
+ process_job job
+ done;
- (* get all the events in the incoming blocking queue, in
- one single critical section. *)
- Bb_queue.transfer work_queue local_q
- done
- with Bb_queue.Closed -> ()
to_iter q returns an iterator over all items in the queue. This might not terminate if q is never closed.
val to_seq : 'a t -> 'a Stdlib.Seq.tto_gen q returns a (transient) sequence from the queue.
+ (* get all the events in the incoming blocking queue, in
+ one single critical section. *)
+ Bb_queue.transfer work_queue local_q
+ done
+ with Bb_queue.Closed -> ()to_iter q returns an iterator over all items in the queue. This might not terminate if q is never closed.
val to_seq : 'a t -> 'a Stdlib.Seq.tto_gen q returns a (transient) sequence from the queue.
Moonpool.FutFutures.
A future of type 'a t represents the result of a computation that will yield a value of type 'a.
Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).
Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).
type 'a t = 'a Picos.Computation.tA future with a result of type 'a.
type 'a promise = private 'a tA promise, which can be fulfilled exactly once to set the corresponding future. This is a private alias of 'a t since 0.7, previously it was opaque.
val make_promise : unit -> 'a promiseSame as make but returns a single promise (which can be upcast to a future). This is useful mostly to preserve memory.
How to upcast to a future in the worst case:
let prom = Fut.make_promise();;
- let fut = (prom : _ Fut.promise :> _ Fut.t) ;;on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.
on_result_ignore fut f registers f to be called in the future when fut is set; or calls f immediately if fut is already set. It does not pass the result, only a success/error signal.
Fullfill the promise, setting the future at the same time.
Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.
val return : 'a -> 'a tAlready settled future, with a result
val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ tAlready settled future, with a failure
val is_resolved : _ t -> boolis_resolved fut is true iff fut is resolved.
peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.
get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).
val get_or_fail_exn : 'a t -> 'aget_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.
val is_done : _ t -> boolIs the future resolved? This is the same as peek fut |> Option.is_some.
val is_success : _ t -> boolChecks if the future is resolved with Ok _ as a result.
val is_failed : _ t -> boolChecks if the future is resolved with Error _ as a result.
val raise_if_failed : _ t -> unitraise_if_failed fut raises e if fut failed with e.
spaw ~on f runs f() on the given runner on, and return a future that will hold its result.
val spawn_on_current_runner : (unit -> 'a) -> 'a tThis must be run from inside a runner, and schedules the new task on it as well.
See Runner.get_current_runner to see how the runner is found.
reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x
map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.
both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.
choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.
choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.
Wait for all the futures in the array. Fails if any future fails.
Wait for all the futures in the list. Fails if any future fails.
module Advanced : sig ... endmap_list ~f l is like join_list @@ List.map f l.
wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.
wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.
for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.
for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.
for_list ~on l f is like for_array ~on (Array.of_list l) f.
NOTE This is only available on OCaml 5.
val await : 'a t -> 'aawait fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).
This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x
wait_block fut blocks the current thread until fut is resolved, and returns its value.
NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.
val wait_block_exn : 'a t -> 'aSame as wait_block but re-raises the exception if the future failed.
These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.
They were previously present as module Infix_local and val infix, but are now simplified.
module Infix : sig ... endmodule Infix_local = InfixMoonpool.FutFutures.
A future of type 'a t represents the result of a computation that will yield a value of type 'a.
Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).
Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).
type 'a t = 'a Picos.Computation.tA future with a result of type 'a.
type 'a promise = private 'a tA promise, which can be fulfilled exactly once to set the corresponding future. This is a private alias of 'a t since 0.7, previously it was opaque.
val make_promise : unit -> 'a promiseSame as make but returns a single promise (which can be upcast to a future). This is useful mostly to preserve memory.
How to upcast to a future in the worst case:
let prom = Fut.make_promise ()
+ let fut = (prom : _ Fut.promise :> _ Fut.t)on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.
on_result_ignore fut f registers f to be called in the future when fut is set; or calls f immediately if fut is already set. It does not pass the result, only a success/error signal.
Fullfill the promise, setting the future at the same time.
Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.
val return : 'a -> 'a tAlready settled future, with a result
val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ tAlready settled future, with a failure
val is_resolved : _ t -> boolis_resolved fut is true iff fut is resolved.
peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.
get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).
val get_or_fail_exn : 'a t -> 'aget_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.
val is_done : _ t -> boolIs the future resolved? This is the same as peek fut |> Option.is_some.
val is_success : _ t -> boolChecks if the future is resolved with Ok _ as a result.
val is_failed : _ t -> boolChecks if the future is resolved with Error _ as a result.
val raise_if_failed : _ t -> unitraise_if_failed fut raises e if fut failed with e.
spaw ~on f runs f() on the given runner on, and return a future that will hold its result.
val spawn_on_current_runner : (unit -> 'a) -> 'a tThis must be run from inside a runner, and schedules the new task on it as well.
See Runner.get_current_runner to see how the runner is found.
reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x
map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.
both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.
choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.
choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.
Wait for all the futures in the array. Fails if any future fails.
Wait for all the futures in the list. Fails if any future fails.
module Advanced : sig ... endmap_list ~f l is like join_list @@ List.map f l.
wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.
wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.
for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.
for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.
for_list ~on l f is like for_array ~on (Array.of_list l) f.
NOTE This is only available on OCaml 5.
val await : 'a t -> 'aawait fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).
This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x
wait_block fut blocks the current thread until fut is resolved, and returns its value.
NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.
val wait_block_exn : 'a t -> 'aSame as wait_block but re-raises the exception if the future failed.
These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.
They were previously present as module Infix_local and val infix, but are now simplified.
module Infix : sig ... endmodule Infix_local = Infix