diff --git a/moonpool-lwt/_doc-dir/CHANGES.md b/moonpool-lwt/_doc-dir/CHANGES.md index a295fa77..e5dc0608 100644 --- a/moonpool-lwt/_doc-dir/CHANGES.md +++ b/moonpool-lwt/_doc-dir/CHANGES.md @@ -1,4 +1,11 @@ +# 0.7 + +- add `Moonpool_fiber.spawn_top_ignore` +- add `moonpool-io`, based on `picos_io` (still very experimental) +- move to picos as the foundation layer for concurrency primitives (#30) +- move to `thread-local-storage` 0.2 with get/set API + # 0.6 - breaking: remove `Immediate_runner` (bug prone and didn't diff --git a/moonpool/Moonpool/Exn_bt/index.html b/moonpool/Moonpool/Exn_bt/index.html index d7489959..5b2c2d24 100644 --- a/moonpool/Moonpool/Exn_bt/index.html +++ b/moonpool/Moonpool/Exn_bt/index.html @@ -1,2 +1,2 @@ -
Moonpool.Exn_btException with backtrace.
Type changed
An exception bundled with a backtrace
type t = exn * Stdlib.Printexc.raw_backtraceval exn : t -> exnval bt : t -> Stdlib.Printexc.raw_backtraceval raise : t -> 'aval get : exn -> tval get_callstack : int -> exn -> tval make : exn -> Stdlib.Printexc.raw_backtrace -> tTrivial builder
val show : t -> stringSimple printing
val pp : Stdlib.Format.formatter -> t -> unitval unwrap : 'a result -> 'aunwrap (Ok x) is x, unwrap (Error ebt) re-raises ebt.
Moonpool.Exn_btException with backtrace.
Type changed
An exception bundled with a backtrace
type t = exn * Stdlib.Printexc.raw_backtraceval exn : t -> exnval bt : t -> Stdlib.Printexc.raw_backtraceval raise : t -> 'aval get : exn -> tval get_callstack : int -> exn -> tval make : exn -> Stdlib.Printexc.raw_backtrace -> tTrivial builder
val show : t -> stringSimple printing
val pp : Stdlib.Format.formatter -> t -> unitval unwrap : 'a result -> 'aunwrap (Ok x) is x, unwrap (Error ebt) re-raises ebt.
Moonpool.FutFutures.
A future of type 'a t represents the result of a computation that will yield a value of type 'a.
Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).
Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).
on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.
on_result_ignore fut f registers f to be called in the future when fut is set; or calls f immediately if fut is already set. It does not pass the result, only a success/error signal.
Fullfill the promise, setting the future at the same time.
Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.
val return : 'a -> 'a tAlready settled future, with a result
val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ tAlready settled future, with a failure
val is_resolved : _ t -> boolis_resolved fut is true iff fut is resolved.
peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.
get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).
val get_or_fail_exn : 'a t -> 'aget_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.
val is_done : _ t -> boolIs the future resolved? This is the same as peek fut |> Option.is_some.
val is_success : _ t -> boolChecks if the future is resolved with Ok _ as a result.
val is_failed : _ t -> boolChecks if the future is resolved with Error _ as a result.
val raise_if_failed : _ t -> unitraise_if_failed fut raises e if fut failed with e.
spaw ~on f runs f() on the given runner on, and return a future that will hold its result.
val spawn_on_current_runner : (unit -> 'a) -> 'a tThis must be run from inside a runner, and schedules the new task on it as well.
See Runner.get_current_runner to see how the runner is found.
reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x
map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.
both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.
choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.
choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.
Wait for all the futures in the array. Fails if any future fails.
Wait for all the futures in the list. Fails if any future fails.
module Advanced : sig ... endmap_list ~f l is like join_list @@ List.map f l.
wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.
wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.
for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.
for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.
for_list ~on l f is like for_array ~on (Array.of_list l) f.
NOTE This is only available on OCaml 5.
val await : 'a t -> 'aawait fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).
This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x
wait_block fut blocks the current thread until fut is resolved, and returns its value.
NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.
val wait_block_exn : 'a t -> 'aSame as wait_block but re-raises the exception if the future failed.
These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.
They were previously present as module Infix_local and val infix, but are now simplified.
module Infix : sig ... endmodule Infix_local = InfixMoonpool.FutFutures.
A future of type 'a t represents the result of a computation that will yield a value of type 'a.
Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).
Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).
on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.
on_result_ignore fut f registers f to be called in the future when fut is set; or calls f immediately if fut is already set. It does not pass the result, only a success/error signal.
Fullfill the promise, setting the future at the same time.
Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.
val return : 'a -> 'a tAlready settled future, with a result
val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ tAlready settled future, with a failure
val is_resolved : _ t -> boolis_resolved fut is true iff fut is resolved.
peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.
get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).
val get_or_fail_exn : 'a t -> 'aget_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.
val is_done : _ t -> boolIs the future resolved? This is the same as peek fut |> Option.is_some.
val is_success : _ t -> boolChecks if the future is resolved with Ok _ as a result.
val is_failed : _ t -> boolChecks if the future is resolved with Error _ as a result.
val raise_if_failed : _ t -> unitraise_if_failed fut raises e if fut failed with e.
spaw ~on f runs f() on the given runner on, and return a future that will hold its result.
val spawn_on_current_runner : (unit -> 'a) -> 'a tThis must be run from inside a runner, and schedules the new task on it as well.
See Runner.get_current_runner to see how the runner is found.
reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x
map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.
bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.
both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.
choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.
choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.
Wait for all the futures in the array. Fails if any future fails.
Wait for all the futures in the list. Fails if any future fails.
module Advanced : sig ... endmap_list ~f l is like join_list @@ List.map f l.
wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.
wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.
for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.
for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.
for_list ~on l f is like for_array ~on (Array.of_list l) f.
NOTE This is only available on OCaml 5.
val await : 'a t -> 'aawait fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).
This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x
wait_block fut blocks the current thread until fut is resolved, and returns its value.
NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.
val wait_block_exn : 'a t -> 'aSame as wait_block but re-raises the exception if the future failed.
These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.
They were previously present as module Infix_local and val infix, but are now simplified.
module Infix : sig ... endmodule Infix_local = InfixMoonpool.TriggerTriggers from picos
include module type of struct include Picos.Trigger endtype t = Picos.Trigger.tRepresents a trigger. A trigger can be in one of three states: initial, awaiting, or signaled.
ℹ️ Once a trigger becomes signaled it no longer changes state.
🏎️ A trigger in the initial and signaled states is a tiny object that does not hold onto any other objects.
val create : unit -> tcreate () allocates a new trigger in the initial state.
val is_signaled : t -> boolis_signaled trigger determines whether the trigger is in the signaled state.
This can be useful, for example, when a trigger is being inserted to multiple locations and might be signaled concurrently while doing so. In such a case one can periodically check with is_signaled trigger whether it makes sense to continue.
ℹ️ Computation.try_attach already checks that the trigger being inserted has not been signaled so when attaching a trigger to multiple computations there is no need to separately check with is_signaled.
val await : t -> (exn * Stdlib.Printexc.raw_backtrace) optionawait trigger waits for the trigger to be signaled.
The return value is None in case the trigger has been signaled and the fiber was resumed normally. Otherwise the return value is Some (exn, bt), which indicates that the fiber has been canceled and the caller should raise the exception. In either case the caller is responsible for cleaning up. Usually this means making sure that no references to the trigger remain to avoid space leaks.
⚠️ As a rule of thumb, if you inserted the trigger to some data structure or attached it to some computation, then you are responsible for removing and detaching the trigger after await.
ℹ️ A trigger in the signaled state only takes a small constant amount of memory. Make sure that it is not possible for a program to accumulate unbounded numbers of signaled triggers under any circumstance.
⚠️ Only the owner or creator of a trigger may call await. It is considered an error to make multiple calls to await.
ℹ️ The behavior is that, unless await can return immediately,
await will perform the Await effect, andawait will call the await operation of the current handler.val signal : t -> unitsignal trigger puts the trigger into the signaled state and calls the resume action, if any, attached using on_signal.
The intention is that calling signal trigger guarantees that any fiber awaiting the trigger will be resumed. However, when and whether a fiber having called await will be resumed normally or as canceled is determined by the scheduler that handles the Await effect.
ℹ️ Note that under normal circumstances, signal should never raise an exception. If an exception is raised by signal, it means that the handler of Await has a bug or some catastrophic failure has occurred.
⚠️ Do not call signal from an effect handler in a scheduler.
val is_initial : t -> boolis_initial trigger determines whether the trigger is in the initial or in the signaled state.
ℹ️ Consider using is_signaled instead of is_initial as in some contexts a trigger might reasonably be either in the initial or the awaiting state depending on the order in which things are being done.
on_signal trigger x y resume attempts to attach the resume action to the trigger and transition the trigger to the awaiting state.
The return value is true in case the action was attached successfully. Otherwise the return value is false, which means that the trigger was already in the signaled state.
⚠️ The action that you attach to a trigger must be safe to call from any context that might end up signaling the trigger directly or indirectly through propagation. Unless you know, then you should assume that the resume action might be called from a different domain running in parallel with neither effect nor exception handlers and that if the attached action doesn't return the system may deadlock or if the action doesn't return quickly it may cause performance issues.
⚠️ It is considered an error to make multiple calls to on_signal with a specific trigger.
from_action x y resume is equivalent to let t = create () in assert (on_signal t x y resume); t.
⚠️ The action that you attach to a trigger must be safe to call from any context that might end up signaling the trigger directly or indirectly through propagation. Unless you know, then you should assume that the resume action might be called from a different domain running in parallel with neither effect nor exception handlers and that if the attached action doesn't return the system may deadlock or if the action doesn't return quickly it may cause performance issues.
⚠️ The returned trigger will be in the awaiting state, which means that it is an error to call await, on_signal, or dispose on it.
val dispose : t -> unittype Stdlib.Effect.t += private | Await : t -> (exn * Stdlib.Printexc.raw_backtrace) option Stdlib.Effect.tSchedulers must handle the Await effect to implement the behavior of await.
In case the fiber permits propagation of cancelation, the trigger must be attached to the computation of the fiber for the duration of suspending the fiber by the scheduler.
Typically the scheduler calls try_suspend, which in turn calls on_signal, to attach a scheduler specific resume action to the trigger. The scheduler must guarantee that the fiber will be resumed after signal has been called on the trigger.
Whether being resumed due to cancelation or not, the trigger must be either signaled outside of the effect handler, or disposed by the effect handler, before resuming the fiber.
In case the fiber permits propagation of cancelation and the computation associated with the fiber has been canceled the scheduler is free to continue the fiber immediately with the cancelation exception after disposing the trigger.
⚠️ A scheduler must not discontinue, i.e. raise an exception to, the fiber as a response to Await.
The scheduler is free to choose which ready fiber to resume next.
A key idea behind this design is that the handler for Await does not need to run arbitrary user defined code while suspending a fiber: the handler calls on_signal by itself. This should make it easier to get both the handler and the user code correct.
Another key idea is that the signal operation provides no feedback as to the outcome regarding cancelation. Calling signal merely guarantees that the caller of await will return. This means that the point at which cancelation must be determined can be as late as possible. A scheduler can check the cancelation status just before calling continue and it is, of course, possible to check the cancelation status earlier. This allows maximal flexibility for the handler of Await.
The consequence of this is that the only place to handle cancelation is at the point of await. This makes the design simpler and should make it easier for the user to get the handling of cancelation right. A minor detail is that await returns an option instead of raising an exception. The reason for this is that matching against an option is slightly faster than setting up an exception handler. Returning an option also clearly communicates the two different cases to handle.
On the other hand, the trigger mechanism does not have a way to specify a user-defined callback to perform cancelation immediately before the fiber is resumed. Such an immediately called callback could be useful for e.g. canceling an underlying IO request. One justification for not having such a callback is that cancelation is allowed to take place from outside of the scheduler, i.e. from another system level thread, and, in such a case, the callback could not be called immediately. Instead, the scheduler is free to choose how to schedule canceled and continued fibers and, assuming that fibers can be trusted, a scheduler may give priority to canceled fibers.
This design also separates the allocation of the atomic state for the trigger, or create, from await, and allows the state to be polled using is_signaled before calling await. This is particularly useful when the trigger might need to be inserted to multiple places and be signaled in parallel before the call of await.
No mechanism is provided to communicate any result with the signal. That can be done outside of the mechanism and is often not needed. This simplifies the design.
Once signal has been called, a trigger no longer refers to any other object and takes just two words of memory. This e.g. allows lazy removal of triggers, assuming the number of attached triggers can be bounded, because nothing except the trigger itself would be leaked.
To further understand the problem domain, in this design, in a suspend-resume scenario, there are three distinct pieces of state:
The trigger and cancelation status are both updated independently and atomically through code in this interface. The key requirement left for the user is to make sure that the state of the shared data structure is updated correctly independently of what await returns. So, for example, a mutex implementation must check, after getting Some (exn, bt), what the state of the mutex is and how it should be updated.
val await_exn : t -> unitMoonpool.TriggerTriggers from picos
include module type of struct include Picos.Trigger endtype t = Picos.Trigger.tRepresents a trigger. A trigger can be in one of three states: initial, awaiting, or signaled.
ℹ️ Once a trigger becomes signaled it no longer changes state.
🏎️ A trigger in the initial and signaled states is a tiny object that does not hold onto any other objects.
val create : unit -> tcreate () allocates a new trigger in the initial state.
val is_signaled : t -> boolis_signaled trigger determines whether the trigger is in the signaled state.
This can be useful, for example, when a trigger is being inserted to multiple locations and might be signaled concurrently while doing so. In such a case one can periodically check with is_signaled trigger whether it makes sense to continue.
ℹ️ Computation.try_attach already checks that the trigger being inserted has not been signaled so when attaching a trigger to multiple computations there is no need to separately check with is_signaled.
val await : t -> (exn * Stdlib.Printexc.raw_backtrace) optionawait trigger waits for the trigger to be signaled.
The return value is None in case the trigger has been signaled and the fiber was resumed normally. Otherwise the return value is Some (exn, bt), which indicates that the fiber has been canceled and the caller should raise the exception. In either case the caller is responsible for cleaning up. Usually this means making sure that no references to the trigger remain to avoid space leaks.
⚠️ As a rule of thumb, if you inserted the trigger to some data structure or attached it to some computation, then you are responsible for removing and detaching the trigger after await.
ℹ️ A trigger in the signaled state only takes a small constant amount of memory. Make sure that it is not possible for a program to accumulate unbounded numbers of signaled triggers under any circumstance.
⚠️ Only the owner or creator of a trigger may call await. It is considered an error to make multiple calls to await.
ℹ️ The behavior is that, unless await can return immediately,
await will perform the Await effect, andawait will call the await operation of the current handler.val signal : t -> unitsignal trigger puts the trigger into the signaled state and calls the resume action, if any, attached using on_signal.
The intention is that calling signal trigger guarantees that any fiber awaiting the trigger will be resumed. However, when and whether a fiber having called await will be resumed normally or as canceled is determined by the scheduler that handles the Await effect.
ℹ️ Note that under normal circumstances, signal should never raise an exception. If an exception is raised by signal, it means that the handler of Await has a bug or some catastrophic failure has occurred.
⚠️ Do not call signal from an effect handler in a scheduler.
val is_initial : t -> boolis_initial trigger determines whether the trigger is in the initial or in the signaled state.
ℹ️ Consider using is_signaled instead of is_initial as in some contexts a trigger might reasonably be either in the initial or the awaiting state depending on the order in which things are being done.
on_signal trigger x y resume attempts to attach the resume action to the trigger and transition the trigger to the awaiting state.
The return value is true in case the action was attached successfully. Otherwise the return value is false, which means that the trigger was already in the signaled state.
⚠️ The action that you attach to a trigger must be safe to call from any context that might end up signaling the trigger directly or indirectly through propagation. Unless you know, then you should assume that the resume action might be called from a different domain running in parallel with neither effect nor exception handlers and that if the attached action doesn't return the system may deadlock or if the action doesn't return quickly it may cause performance issues.
⚠️ It is considered an error to make multiple calls to on_signal with a specific trigger.
from_action x y resume is equivalent to let t = create () in assert (on_signal t x y resume); t.
⚠️ The action that you attach to a trigger must be safe to call from any context that might end up signaling the trigger directly or indirectly through propagation. Unless you know, then you should assume that the resume action might be called from a different domain running in parallel with neither effect nor exception handlers and that if the attached action doesn't return the system may deadlock or if the action doesn't return quickly it may cause performance issues.
⚠️ The returned trigger will be in the awaiting state, which means that it is an error to call await, on_signal, or dispose on it.
val dispose : t -> unittype Stdlib.Effect.t += private | Await : t -> (exn * Stdlib.Printexc.raw_backtrace) option Stdlib.Effect.tSchedulers must handle the Await effect to implement the behavior of await.
In case the fiber permits propagation of cancelation, the trigger must be attached to the computation of the fiber for the duration of suspending the fiber by the scheduler.
Typically the scheduler calls try_suspend, which in turn calls on_signal, to attach a scheduler specific resume action to the trigger. The scheduler must guarantee that the fiber will be resumed after signal has been called on the trigger.
Whether being resumed due to cancelation or not, the trigger must be either signaled outside of the effect handler, or disposed by the effect handler, before resuming the fiber.
In case the fiber permits propagation of cancelation and the computation associated with the fiber has been canceled the scheduler is free to continue the fiber immediately with the cancelation exception after disposing the trigger.
⚠️ A scheduler must not discontinue, i.e. raise an exception to, the fiber as a response to Await.
The scheduler is free to choose which ready fiber to resume next.
A key idea behind this design is that the handler for Await does not need to run arbitrary user defined code while suspending a fiber: the handler calls on_signal by itself. This should make it easier to get both the handler and the user code correct.
Another key idea is that the signal operation provides no feedback as to the outcome regarding cancelation. Calling signal merely guarantees that the caller of await will return. This means that the point at which cancelation must be determined can be as late as possible. A scheduler can check the cancelation status just before calling continue and it is, of course, possible to check the cancelation status earlier. This allows maximal flexibility for the handler of Await.
The consequence of this is that the only place to handle cancelation is at the point of await. This makes the design simpler and should make it easier for the user to get the handling of cancelation right. A minor detail is that await returns an option instead of raising an exception. The reason for this is that matching against an option is slightly faster than setting up an exception handler. Returning an option also clearly communicates the two different cases to handle.
On the other hand, the trigger mechanism does not have a way to specify a user-defined callback to perform cancelation immediately before the fiber is resumed. Such an immediately called callback could be useful for e.g. canceling an underlying IO request. One justification for not having such a callback is that cancelation is allowed to take place from outside of the scheduler, i.e. from another system level thread, and, in such a case, the callback could not be called immediately. Instead, the scheduler is free to choose how to schedule canceled and continued fibers and, assuming that fibers can be trusted, a scheduler may give priority to canceled fibers.
This design also separates the allocation of the atomic state for the trigger, or create, from await, and allows the state to be polled using is_signaled before calling await. This is particularly useful when the trigger might need to be inserted to multiple places and be signaled in parallel before the call of await.
No mechanism is provided to communicate any result with the signal. That can be done outside of the mechanism and is often not needed. This simplifies the design.
Once signal has been called, a trigger no longer refers to any other object and takes just two words of memory. This e.g. allows lazy removal of triggers, assuming the number of attached triggers can be bounded, because nothing except the trigger itself would be leaked.
To further understand the problem domain, in this design, in a suspend-resume scenario, there are three distinct pieces of state:
The trigger and cancelation status are both updated independently and atomically through code in this interface. The key requirement left for the user is to make sure that the state of the shared data structure is updated correctly independently of what await returns. So, for example, a mutex implementation must check, after getting Some (exn, bt), what the state of the mutex is and how it should be updated.
val await_exn : t -> unitMoonpoolMoonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.
We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).
module Ws_pool : sig ... endWork-stealing thread pool.
module Fifo_pool : sig ... endA simple thread pool in FIFO order.
module Background_thread : sig ... endA simple runner with a single background thread.
module Runner : sig ... endInterface for runners.
module Trigger : sig ... endTriggers from picos
module Immediate_runner : sig ... endRunner that runs tasks in the caller thread.
module Exn_bt : sig ... endException with backtrace.
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.tSimilar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)
val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unitrun_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.
val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'arun_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.
See run_async for more details.
NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).
Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).
spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.tval get_current_runner : unit -> Runner.t optionmodule Lock : sig ... endMutex-protected resource.
module Fut : sig ... endFutures.
module Chan : sig ... endChannels.
module Task_local_storage : sig ... endTask-local storage.
module Thread_local_storage = Thread_local_storagemodule Blocking_queue : sig ... endA simple blocking queue.
module Bounded_queue : sig ... endA blocking queue of finite size.
module Atomic = Moonpool_private.Atomic_Atomic values.
MoonpoolMoonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.
We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).
module Ws_pool : sig ... endWork-stealing thread pool.
module Fifo_pool : sig ... endA simple thread pool in FIFO order.
module Background_thread : sig ... endA simple runner with a single background thread.
module Runner : sig ... endInterface for runners.
module Trigger : sig ... endTriggers from picos
module Immediate_runner : sig ... endRunner that runs tasks in the caller thread.
module Exn_bt : sig ... endException with backtrace.
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.tSimilar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)
val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unitrun_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.
val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'arun_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.
See run_async for more details.
NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).
Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).
spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.tval get_current_runner : unit -> Runner.t optionmodule Lock : sig ... endMutex-protected resource.
module Fut : sig ... endFutures.
module Chan : sig ... endChannels.
module Task_local_storage : sig ... endTask-local storage.
module Thread_local_storage = Thread_local_storagemodule Blocking_queue : sig ... endA simple blocking queue.
module Bounded_queue : sig ... endA blocking queue of finite size.
module Atomic = Moonpool_private.Atomic_Atomic values.
Moonpool_fib.FiberFibers.
A fiber is a lightweight computation that runs cooperatively alongside other fibers. In the context of moonpool, fibers have additional properties:
type cancel_callback = Moonpool.Exn_bt.t -> unitA callback used in case of cancellation
val res : 'a t -> 'a Moonpool.Fut.tFuture result of the fiber.
type 'a callback = 'a Moonpool.Exn_bt.result -> unitCallbacks that are called when a fiber is done.
val return : 'a -> 'a tval fail : Moonpool.Exn_bt.t -> _ tval self : unit -> anyself () is the current fiber. Must be run from inside a fiber.
val peek : 'a t -> 'a Moonpool.Fut.or_error optionPeek inside the future result
val is_done : _ t -> boolHas the fiber completed?
val is_cancelled : _ t -> boolHas the fiber completed with a failure?
val is_success : _ t -> boolHas the fiber completed with a value?
val await : 'a t -> 'aawait fib is like Fut.await (res fib)
val wait_block_exn : 'a t -> 'await_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
val wait_block : 'a t -> 'a Moonpool.Fut.or_errorwait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.
val add_on_cancel : _ t -> cancel_callback -> cancel_handleadd_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.
val remove_on_cancel : _ t -> cancel_handle -> unitremove_on_cancel fib h removes the cancel callback associated with handle h.
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'awith_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'awith_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.
Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.
val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a tspawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().
val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a tspawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.
spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.
val spawn_top_ignore : on:Moonpool.Runner.t -> (unit -> _) -> unitLike spawn_top but ignores the result.
Moonpool_fib.FiberFibers.
A fiber is a lightweight computation that runs cooperatively alongside other fibers. In the context of moonpool, fibers have additional properties:
type cancel_callback = Moonpool.Exn_bt.t -> unitA callback used in case of cancellation
val res : 'a t -> 'a Moonpool.Fut.tFuture result of the fiber.
type 'a callback = 'a Moonpool.Exn_bt.result -> unitCallbacks that are called when a fiber is done.
val return : 'a -> 'a tval fail : Moonpool.Exn_bt.t -> _ tval self : unit -> anyself () is the current fiber. Must be run from inside a fiber.
val peek : 'a t -> 'a Moonpool.Fut.or_error optionPeek inside the future result
val is_done : _ t -> boolHas the fiber completed?
val is_cancelled : _ t -> boolHas the fiber completed with a failure?
val is_success : _ t -> boolHas the fiber completed with a value?
val await : 'a t -> 'aawait fib is like Fut.await (res fib)
val wait_block_exn : 'a t -> 'await_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
val wait_block : 'a t -> 'a Moonpool.Fut.or_errorwait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.
val add_on_cancel : _ t -> cancel_callback -> cancel_handleadd_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.
val remove_on_cancel : _ t -> cancel_handle -> unitremove_on_cancel fib h removes the cancel callback associated with handle h.
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'awith_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'awith_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.
Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.
val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a tspawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().
val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a tspawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.
spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.
val spawn_top_ignore : on:Moonpool.Runner.t -> (unit -> _) -> unitLike spawn_top but ignores the result.
Moonpool_fibFibers for moonpool.
See Fiber for the most important explanations.
module Fiber : sig ... endFibers.
module Fls : sig ... endFiber-local storage.
module Handle : sig ... endThe unique name of a fiber.
module Main : sig ... endMain thread.
include module type of struct include Fiber endtype cancel_callback = Moonpool.Exn_bt.t -> unitA callback used in case of cancellation
type 'a t = 'a Fiber.{Private_}1.tA fiber returning a value of type 'a.
val res : 'a t -> 'a Moonpool.Fut.tFuture result of the fiber.
type 'a callback = 'a Moonpool.Exn_bt.result -> unitCallbacks that are called when a fiber is done.
Type erased fiber
val return : 'a -> 'a tval fail : Moonpool.Exn_bt.t -> _ tval self : unit -> anyself () is the current fiber. Must be run from inside a fiber.
val peek : 'a t -> 'a Moonpool.Fut.or_error optionPeek inside the future result
val is_done : _ t -> boolHas the fiber completed?
val is_cancelled : _ t -> boolHas the fiber completed with a failure?
val is_success : _ t -> boolHas the fiber completed with a value?
val await : 'a t -> 'aawait fib is like Fut.await (res fib)
val wait_block_exn : 'a t -> 'await_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
val wait_block : 'a t -> 'a Moonpool.Fut.or_errorwait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.
type cancel_handle = Fiber.cancel_handleAn opaque handle for a single cancel callback in a fiber
val add_on_cancel : _ t -> cancel_callback -> cancel_handleadd_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.
val remove_on_cancel : _ t -> cancel_handle -> unitremove_on_cancel fib h removes the cancel callback associated with handle h.
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'awith_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'awith_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.
Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.
val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a tspawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().
val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a tspawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.
spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.
val spawn_top_ignore : on:Moonpool.Runner.t -> (unit -> _) -> unitLike spawn_top but ignores the result.
include module type of struct include Main endval main : (Moonpool.Runner.t -> 'a) -> 'amain f runs f() in a scope that handles effects, including Fiber.await.
This scope can run background tasks as well, in a cooperative fashion.
Moonpool_fibFibers for moonpool.
See Fiber for the most important explanations.
module Fiber : sig ... endFibers.
module Fls : sig ... endFiber-local storage.
module Handle : sig ... endThe unique name of a fiber.
module Main : sig ... endMain thread.
include module type of struct include Fiber endtype cancel_callback = Moonpool.Exn_bt.t -> unitA callback used in case of cancellation
type 'a t = 'a Fiber.{Private_}1.tA fiber returning a value of type 'a.
val res : 'a t -> 'a Moonpool.Fut.tFuture result of the fiber.
type 'a callback = 'a Moonpool.Exn_bt.result -> unitCallbacks that are called when a fiber is done.
Type erased fiber
val return : 'a -> 'a tval fail : Moonpool.Exn_bt.t -> _ tval self : unit -> anyself () is the current fiber. Must be run from inside a fiber.
val peek : 'a t -> 'a Moonpool.Fut.or_error optionPeek inside the future result
val is_done : _ t -> boolHas the fiber completed?
val is_cancelled : _ t -> boolHas the fiber completed with a failure?
val is_success : _ t -> boolHas the fiber completed with a value?
val await : 'a t -> 'aawait fib is like Fut.await (res fib)
val wait_block_exn : 'a t -> 'await_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
val wait_block : 'a t -> 'a Moonpool.Fut.or_errorwait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.
Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.
type cancel_handle = Fiber.cancel_handleAn opaque handle for a single cancel callback in a fiber
val add_on_cancel : _ t -> cancel_callback -> cancel_handleadd_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.
val remove_on_cancel : _ t -> cancel_handle -> unitremove_on_cancel fib h removes the cancel callback associated with handle h.
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'awith_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'awith_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.
Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.
val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a tspawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().
val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a tspawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.
spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.
val spawn_top_ignore : on:Moonpool.Runner.t -> (unit -> _) -> unitLike spawn_top but ignores the result.
include module type of struct include Main endval main : (Moonpool.Runner.t -> 'a) -> 'amain f runs f() in a scope that handles effects, including Fiber.await.
This scope can run background tasks as well, in a cooperative fashion.
This lock is based on Picos_sync.Mutex so it is await-safe.
val create : 'a -> 'a tCreate a new protected value.
val with_ : 'a t -> ('a -> 'b) -> 'bwith_ l f runs f x where x is the value protected with the lock l, in a critical section. If f x fails, with_lock l f fails too but the lock is released.
val update : 'a t -> ('a -> 'a) -> unitupdate l f replaces the content x of l with f x, while protected by the mutex.
val update_map : 'a t -> ('a -> 'a * 'b) -> 'bupdate_map l f computes x', y = f (get l), then puts x' in l and returns y, while protected by the mutex.
val mutex : _ t -> Picos_std_sync.Mutex.tUnderlying mutex.
val get : 'a t -> 'aAtomically get the value in the lock. The value that is returned isn't protected!