format with 0.27

This commit is contained in:
Simon Cruanes 2025-05-02 10:58:50 -04:00
parent d50c227578
commit bb9418d86a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
31 changed files with 601 additions and 640 deletions

View file

@ -78,7 +78,7 @@ jobs:
strategy:
matrix:
ocaml-compiler:
- '5.2'
- '5.3'
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@main
@ -89,6 +89,6 @@ jobs:
dune-cache: true
allow-prerelease-opam: true
- run: opam install ocamlformat.0.26.2
- run: opam install ocamlformat.0.27.0
- run: opam exec -- make format-check

View file

@ -1,4 +1,4 @@
version = 0.26.2
version = 0.27.0
profile=conventional
margin=80
if-then-else=k-r

View file

@ -1,4 +1,5 @@
(** Example from https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
(** Example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
let ( let@ ) = ( @@ )

View file

@ -1,13 +1,11 @@
(** A simple runner with a single background thread.
Because this is guaranteed to have a single worker thread,
tasks scheduled in this runner always run asynchronously but
in a sequential fashion.
Because this is guaranteed to have a single worker thread, tasks scheduled
in this runner always run asynchronously but in a sequential fashion.
This is similar to {!Fifo_pool} with exactly one thread.
@since 0.6
*)
@since 0.6 *)
include module type of Runner

View file

@ -19,32 +19,29 @@ val pop : 'a t -> 'a
@raise Closed if the queue was closed before a new element was available. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any,
or returns [None] without blocking.
@param force_lock if true, use {!Mutex.lock} (which can block under contention);
if false, use {!Mutex.try_lock}, which might return [None] even in
presence of an element if there's contention *)
(** [try_pop q] immediately pops the first element of [q], if any, or returns
[None] without blocking.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case
it returns [true]; or it fails to push and returns [false]
without blocking.
@raise Closed if the locking succeeded but the queue is closed.
*)
(** [try_push q x] tries to push into [q], in which case it returns [true]; or
it fails to push and returns [false] without blocking.
@raise Closed if the locking succeeded but the queue is closed. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently
in [bq] into [q2] in one atomic section, and clears [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch.
Create a [Queue.t] locally:
(** [transfer bq q2] transfers all items presently in [bq] into [q2] in one
atomic section, and clears [bq]. It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch. Create a
[Queue.t] locally:
{[
let dowork (work_queue: job Bb_queue.t) =
let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create() in
let local_q = Queue.create () in
try
while true do
(* work on local events, already on this thread *)
@ -69,8 +66,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue.
This might not terminate if [q] is never closed.
(** [to_iter q] returns an iterator over all items in the queue. This might not
terminate if [q] is never closed.
@since 0.4 *)
val to_gen : 'a t -> 'a gen

View file

@ -1,15 +1,13 @@
(** A blocking queue of finite size.
This queue, while still using locks underneath
(like the regular blocking queue) should be enough for
usage under reasonable contention.
This queue, while still using locks underneath (like the regular blocking
queue) should be enough for usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is
desirable: if the queue is used to communicate between producer(s)
and consumer(s), the consumer(s) can limit the rate at which
producer(s) send new work down their way.
Whenever the queue is full, means that producer(s) will have to
wait before pushing new work.
The bounded size is helpful whenever some form of backpressure is desirable:
if the queue is used to communicate between producer(s) and consumer(s), the
consumer(s) can limit the rate at which producer(s) send new work down their
way. Whenever the queue is full, means that producer(s) will have to wait
before pushing new work.
@since 0.4 *)
@ -19,42 +17,41 @@ type 'a t
val create : max_size:int -> unit -> 'a t
val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q],
and after all the elements still in [q] currently are [pop]'d,
{!pop} will also raise {!Closed}. *)
(** [close q] closes [q]. No new elements can be pushed into [q], and after all
the elements still in [q] currently are [pop]'d, {!pop} will also raise
{!Closed}. *)
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue.
If [q] is full, this will block until there is
room for [x].
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
block until there is room for [x].
@raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons
if it cannot acquire [q] or if [q] is full.
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
acquire [q] or if [q] is full.
@param force_lock if true, use {!Mutex.lock} (which can block
under contention);
if false, use {!Mutex.try_lock}, which might return [false] even
if there's room in the queue.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [false] even if there's room in
the queue.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q]
is empty, until some element becomes available.
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
some element becomes available.
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None]
if no element is available or if it failed to acquire [q].
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
no element is available or if it failed to acquire [q].
@param force_lock if true, use {!Mutex.lock} (which can block
under contention);
if false, use {!Mutex.try_lock}, which might return [None] even in
presence of an element if there's contention.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention.
@raise Closed if [q] is empty and closed. *)
@ -65,9 +62,8 @@ val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available
in [bq] into local queue [q2], and clears [bq], atomically.
It blocks if [bq] is empty.
(** [transfer bq q2] transfers all elements currently available in [bq] into
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *)
@ -76,8 +72,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue.
This might not terminate if [q] is never closed. *)
(** [to_iter q] returns an iterator over all items in the queue. This might not
terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *)

View file

@ -15,31 +15,30 @@ val create : max_size:int -> unit -> 'a t
exception Closed
val try_push : 'a t -> 'a -> bool
(** [try_push chan x] pushes [x] into [chan]. This does not block.
Returns [true] if it succeeded in pushing.
(** [try_push chan x] pushes [x] into [chan]. This does not block. Returns
[true] if it succeeded in pushing.
@raise Closed if the channel is closed. *)
val try_pop : 'a t -> 'a option
(** [try_pop chan] pops and return an element if one is available
immediately. Otherwise it returns [None].
@raise Closed if the channel is closed and empty.
*)
(** [try_pop chan] pops and return an element if one is available immediately.
Otherwise it returns [None].
@raise Closed if the channel is closed and empty. *)
val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail.
This is idempotent. *)
(** Close the channel. Further push and pop calls will fail. This is idempotent.
*)
[@@@ifge 5.0]
val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task
if the channel is currently full.
(** Push the value into the channel, suspending the current task if the channel
is currently full.
@raise Closed if the channel is closed
@since 0.7 *)
val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the
channel is currently empty.
(** Pop an element. This might suspend the current task if the channel is
currently empty.
@raise Closed if the channel is empty and closed.
@since 0.7 *)

View file

@ -1,16 +1,16 @@
(** A simple thread pool in FIFO order.
FIFO: first-in, first-out. Basically tasks are put into a queue,
and worker threads pull them out of the queue at the other end.
FIFO: first-in, first-out. Basically tasks are put into a queue, and worker
threads pull them out of the queue at the other end.
Since this uses a single blocking queue to manage tasks, it's very
simple and reliable. The number of worker threads is fixed, but
they are spread over several domains to enable parallelism.
Since this uses a single blocking queue to manage tasks, it's very simple
and reliable. The number of worker threads is fixed, but they are spread
over several domains to enable parallelism.
This can be useful for latency-sensitive applications (e.g. as a
pool of workers for network servers). Work-stealing pools might
have higher throughput but they're very unfair to some tasks; by
contrast, here, older tasks have priority over younger tasks.
This can be useful for latency-sensitive applications (e.g. as a pool of
workers for network servers). Work-stealing pools might have higher
throughput but they're very unfair to some tasks; by contrast, here, older
tasks have priority over younger tasks.
@since 0.5 *)
@ -28,22 +28,22 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread in the pool.
@param min minimum size of the pool. See {!Pool.create_args}.
The default is [Domain.recommended_domain_count()], ie one worker per
CPU core.
On OCaml 4 the default is [4] (since there is only one domain).
@param on_init_thread
called at the beginning of each new thread in the pool.
@param min
minimum size of the pool. See {!Pool.create_args}. The default is
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each worker thread in the pool.
@param around_task a pair of [before, after] functions
ran around each task. See {!Pool.create_args}.
@param name name for the pool, used in tracing (since 0.6)
*)
@param around_task
a pair of [before, after] functions ran around each task. See
{!Pool.create_args}.
@param name name for the pool, used in tracing (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}.
When [f pool] returns or fails, [pool] is shutdown and its resources
are released.
Most parameters are the same as in {!create}. *)
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When
[f pool] returns or fails, [pool] is shutdown and its resources are
released. Most parameters are the same as in {!create}. *)
(**/**)

View file

@ -1,21 +1,19 @@
(** Futures.
A future of type ['a t] represents the result of a computation
that will yield a value of type ['a].
A future of type ['a t] represents the result of a computation that will
yield a value of type ['a].
Typically, the computation is running on a thread pool {!Runner.t}
and will proceed on some worker. Once set, a future cannot change.
It either succeeds (storing a [Ok x] with [x: 'a]), or fail
(storing a [Error (exn, bt)] with an exception and the corresponding
backtrace).
Typically, the computation is running on a thread pool {!Runner.t} and will
proceed on some worker. Once set, a future cannot change. It either succeeds
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
an exception and the corresponding backtrace).
Combinators such as {!map} and {!join_array} can be used to produce
futures from other futures (in a monadic way). Some combinators take
a [on] argument to specify a runner on which the intermediate computation takes
place; for example [map ~on:pool ~f fut] maps the value in [fut]
using function [f], applicatively; the call to [f] happens on
the runner [pool] (once [fut] resolves successfully with a value).
*)
Combinators such as {!map} and {!join_array} can be used to produce futures
from other futures (in a monadic way). Some combinators take a [on] argument
to specify a runner on which the intermediate computation takes place; for
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
applicatively; the call to [f] happens on the runner [pool] (once [fut]
resolves successfully with a value). *)
type 'a or_error = ('a, Exn_bt.t) result
@ -23,9 +21,9 @@ type 'a t = 'a Picos.Computation.t
(** A future with a result of type ['a]. *)
type 'a promise = private 'a t
(** A promise, which can be fulfilled exactly once to set
the corresponding future.
This is a private alias of ['a t] since 0.7, previously it was opaque. *)
(** A promise, which can be fulfilled exactly once to set the corresponding
future. This is a private alias of ['a t] since 0.7, previously it was
opaque. *)
val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *)
@ -35,21 +33,20 @@ val make_promise : unit -> 'a promise
future). This is useful mostly to preserve memory.
How to upcast to a future in the worst case:
{[let prom = Fut.make_promise();;
let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
{[
let prom = Fut.make_promise ()
let fut = (prom : _ Fut.promise :> _ Fut.t)
]}
@since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future
when [fut] is set ;
or calls [f] immediately if [fut] is already set. *)
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
; or calls [f] immediately if [fut] is already set. *)
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
(** [on_result_ignore fut f] registers [f] to be called in the future
when [fut] is set;
or calls [f] immediately if [fut] is already set.
It does not pass the result, only a success/error signal.
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
is set; or calls [f] immediately if [fut] is already set. It does not pass
the result, only a success/error signal.
@since 0.7 *)
exception Already_fulfilled
@ -59,8 +56,8 @@ val fulfill : 'a promise -> 'a or_error -> unit
@raise Already_fulfilled if the promise is already fulfilled. *)
val fulfill_idempotent : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
Does nothing if the promise is already fulfilled. *)
(** Fullfill the promise, setting the future at the same time. Does nothing if
the promise is already fulfilled. *)
val return : 'a -> 'a t
(** Already settled future, with a result *)
@ -78,22 +75,22 @@ val is_resolved : _ t -> bool
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
val peek : 'a t -> 'a or_error option
(** [peek fut] returns [Some r] if [fut] is currently resolved with [r],
and [None] if [fut] is not resolved yet. *)
(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], and
[None] if [fut] is not resolved yet. *)
exception Not_ready
(** @since 0.2 *)
val get_or_fail : 'a t -> 'a or_error
(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled
(i.e. if [peek fut] returns [Some res], [get_or_fail fut] returns [res]).
(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled (i.e. if
[peek fut] returns [Some res], [get_or_fail fut] returns [res]).
@raise Not_ready if the future is not ready.
@since 0.2 *)
val get_or_fail_exn : 'a t -> 'a
(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled,
like {!get_or_fail}. If the result is an [Error _], the exception inside
is re-raised.
(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, like
{!get_or_fail}. If the result is an [Error _], the exception inside is
re-raised.
@raise Not_ready if the future is not ready.
@since 0.2 *)
@ -116,12 +113,12 @@ val raise_if_failed : _ t -> unit
(** {2 Combinators} *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
hold its result. *)
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that
will hold its result. *)
val spawn_on_current_runner : (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules
the new task on it as well.
(** This must be run from inside a runner, and schedules the new task on it as
well.
See {!Runner.get_current_runner} to see how the runner is found.
@ -129,28 +126,26 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
@raise Failure if run from outside a runner. *)
val reify_error : 'a t -> 'a or_error t
(** [reify_error fut] turns a failing future into a non-failing
one that contain [Error (exn, bt)]. A non-failing future
returning [x] is turned into [Ok x]
(** [reify_error fut] turns a failing future into a non-failing one that contain
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
@since 0.4 *)
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
(** [map ?on ~f fut] returns a new future [fut2] that resolves
with [f x] if [fut] resolved with [x];
and fails with [e] if [fut] fails with [e] or [f x] raises [e].
(** [map ?on ~f fut] returns a new future [fut2] that resolves with [f x] if
[fut] resolved with [x]; and fails with [e] if [fut] fails with [e] or [f x]
raises [e].
@param on if provided, [f] runs on the given runner *)
val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
(** [bind ?on ~f fut] returns a new future [fut2] that resolves
like the future [f x] if [fut] resolved with [x];
and fails with [e] if [fut] fails with [e] or [f x] raises [e].
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
or [f x] raises [e].
@param on if provided, [f] runs on the given runner *)
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves
like the future [f (Ok x)] if [fut] resolved with [x];
and resolves like the future [f (Error (exn, bt))]
if [fut] fails with [exn] and backtrace [bt].
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
@param on if provided, [f] runs on the given runner
@since 0.4 *)
@ -159,18 +154,18 @@ val join : 'a t t -> 'a t
@since 0.2 *)
val both : 'a t -> 'b t -> ('a * 'b) t
(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and
[b] succeeds with [y], or fails if any of them fails. *)
(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and [b] succeeds
with [y], or fails if any of them fails. *)
val choose : 'a t -> 'b t -> ('a, 'b) Either.t t
(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or
[b] succeeds with [y], or fails if both of them fails.
If they both succeed, it is not specified which result is used. *)
(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or [b]
succeeds with [y], or fails if both of them fails. If they both succeed, it
is not specified which result is used. *)
val choose_same : 'a t -> 'a t -> 'a t
(** [choose_same a b] succeeds with the value of one of [a] or [b] if
they succeed, or fails if both fail.
If they both succeed, it is not specified which result is used. *)
(** [choose_same a b] succeeds with the value of one of [a] or [b] if they
succeed, or fails if both fail. If they both succeed, it is not specified
which result is used. *)
val join_array : 'a t array -> 'a array t
(** Wait for all the futures in the array. Fails if any future fails. *)
@ -185,18 +180,18 @@ module Advanced : sig
aggregate_results:(('a t -> 'a) -> 'cont -> 'res) ->
'cont ->
'res t
(** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a
container of futures ([cont]), with [len] elements,
and returns a future result of type [res]
(possibly another type of container).
(** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len
cont] takes a container of futures ([cont]), with [len] elements, and
returns a future result of type [res] (possibly another type of
container).
This waits for all futures in [cont: 'cont] to be done
(futures obtained via [iter <some function> cont]). If they
all succeed, their results are aggregated into a new
result of type ['res] via [aggregate_results <some function> cont].
This waits for all futures in [cont: 'cont] to be done (futures obtained
via [iter <some function> cont]). If they all succeed, their results are
aggregated into a new result of type ['res] via
[aggregate_results <some function> cont].
{b NOTE}: the behavior is not specified if [iter f cont] (for a function f)
doesn't call [f] on exactly [len cont] elements.
{b NOTE}: the behavior is not specified if [iter f cont] (for a function
f) doesn't call [f] on exactly [len cont] elements.
@since 0.5.1 *)
end
@ -206,23 +201,22 @@ val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
@since 0.5.1 *)
val wait_array : _ t array -> unit t
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards
the individual results of futures in [arr]. It fails if any future fails. *)
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards the
individual results of futures in [arr]. It fails if any future fails. *)
val wait_list : _ t list -> unit t
(** [wait_list l] waits for all futures in [l] to resolve. It discards
the individual results of futures in [l]. It fails if any future fails. *)
(** [wait_list l] waits for all futures in [l] to resolve. It discards the
individual results of futures in [l]. It fails if any future fails. *)
val for_ : on:Runner.t -> int -> (int -> unit) -> unit t
(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns
a future that resolves when all the tasks have resolved, or fails
as soon as one task has failed. *)
(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns a
future that resolves when all the tasks have resolved, or fails as soon as
one task has failed. *)
val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t
(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in
the runner (where [n = Array.length arr]), and returns a future
that resolves when all the tasks are done,
or fails if any of them fails.
(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in the
runner (where [n = Array.length arr]), and returns a future that resolves
when all the tasks are done, or fails if any of them fails.
@since 0.2 *)
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
@ -242,43 +236,39 @@ val await : 'a t -> 'a
@since 0.3
This must only be run from inside the runner itself. The runner must
support {!Suspend_}.
{b NOTE}: only on OCaml 5.x
*)
This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
[@@@endif]
(** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error
(** [wait_block fut] blocks the current thread until [fut] is resolved,
and returns its value.
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
returns its value.
{b NOTE}: A word of warning: this will monopolize the calling thread until the future
resolves. This can also easily cause deadlocks, if enough threads in a pool
call [wait_block] on futures running on the same pool or a pool depending on it.
{b NOTE}: A word of warning: this will monopolize the calling thread until
the future resolves. This can also easily cause deadlocks, if enough threads
in a pool call [wait_block] on futures running on the same pool or a pool
depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool,
or to have an acyclic order between pools where [wait_block]
is only called from a pool on futures evaluated in a pool that comes lower
in the hierarchy.
If this rule is broken, it is possible for all threads in a pool to wait
for futures that can only make progress on these same threads,
hence the deadlock.
*)
A good rule to avoid deadlocks is to run this from outside of any pool, or
to have an acyclic order between pools where [wait_block] is only called
from a pool on futures evaluated in a pool that comes lower in the
hierarchy. If this rule is broken, it is possible for all threads in a pool
to wait for futures that can only make progress on these same threads, hence
the deadlock. *)
val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
(** {2 Infix operators}
These combinators run on either the current pool (if present),
or on the same thread that just fulfilled the previous future
if not.
These combinators run on either the current pool (if present), or on the
same thread that just fulfilled the previous future if not.
They were previously present as [module Infix_local] and [val infix],
but are now simplified.
They were previously present as [module Infix_local] and [val infix], but
are now simplified.
@since 0.5 *)

View file

@ -42,9 +42,8 @@ let[@inline] remove_in_local_hmap (k : _ Hmap.key) : unit =
let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit =
update_local_hmap (Hmap.add k v)
(** [with_in_local_hmap k v f] calls [f()] in a context
where [k] is bound to [v] in the local hmap. Then it restores the
previous binding for [k]. *)
(** [with_in_local_hmap k v f] calls [f()] in a context where [k] is bound to
[v] in the local hmap. Then it restores the previous binding for [k]. *)
let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f =
let h = get_local_hmap () in
match Hmap.find k h with

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper
around {!Mutex} that encourages proper management of the critical
section in RAII style:
This lock is a synchronous concurrency primitive, as a thin wrapper around
{!Mutex} that encourages proper management of the critical section in RAII
style:
{[
let (let@) = (@@)
@ -19,8 +19,8 @@
]}
This lock does not work well with {!Fut.await}. A critical section
that contains a call to [await] might cause deadlocks, or lock starvation,
This lock does not work well with {!Fut.await}. A critical section that
contains a call to [await] might cause deadlocks, or lock starvation,
because it will hold onto the lock while it goes to sleep.
@since 0.3 *)
@ -32,27 +32,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with
the lock [l], in a critical section. If [f x] fails, [with_lock l f]
fails too but the lock is released. *)
(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l],
in a critical section. If [f x] fails, [with_lock l f] fails too but the
lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected
by the mutex. *)
(** [update l f] replaces the content [x] of [l] with [f x], while protected by
the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
and returns [y], while protected by the mutex. *)
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and
returns [y], while protected by the mutex. *)
val mutex : _ t -> Mutex.t
(** Underlying mutex. *)
val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned
isn't protected! *)
(** Atomically get the value in the lock. The value that is returned isn't
protected! *)
val set : 'a t -> 'a -> unit
(** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref}
is an anti pattern and will not protect data against some race conditions. *)
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an
anti pattern and will not protect data against some race conditions. *)

View file

@ -1,13 +1,12 @@
(** Moonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about
pools of [Thread.t] that are dispatched over several [Domain.t] to
enable parallelism.
A pool within a bigger pool (ie the ocean). Here, we're talking about pools
of [Thread.t] that are dispatched over several [Domain.t] to enable
parallelism.
We provide several implementations of pools
with distinct scheduling strategies, alongside some concurrency
primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}).
*)
We provide several implementations of pools with distinct scheduling
strategies, alongside some concurrency primitives such as guarding locks
({!Lock.t}) and futures ({!Fut.t}). *)
module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool
@ -24,45 +23,45 @@ module Immediate_runner : sig end
module Exn_bt = Exn_bt
exception Shutdown
(** Exception raised when trying to run tasks on
runners that have been shut down.
(** Exception raised when trying to run tasks on runners that have been shut
down.
@since 0.6 *)
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random
to run the thread. This ensures that we don't always pick the same domain
to run all the various threads needed in an application (timers, event loops, etc.) *)
(** Similar to {!Thread.create}, but it picks a background domain at random to
run the thread. This ensures that we don't always pick the same domain to
run all the various threads needed in an application (timers, event loops,
etc.) *)
val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread.
(** [run_async runner task] schedules the task to run on the given runner. This
means [task()] will be executed at some point in the future, possibly in
another thread.
@param fiber optional initial (picos) fiber state
@since 0.5 *)
val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a
(** [run_wait_block runner f] schedules [f] for later execution
on the runner, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
(** [run_wait_block runner f] schedules [f] for later execution on the runner,
like {!run_async}. It then blocks the current thread until [f()] is done
executing, and returns its result. If [f()] raises an exception, then
[run_wait_block pool f] will raise it as well.
See {!run_async} for more details.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
about the required discipline to avoid deadlocks).
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the
required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down
@since 0.6 *)
val recommended_thread_count : unit -> int
(** Number of threads recommended to saturate the CPU.
For IO pools this makes little sense (you might want more threads than
this because many of them will be blocked most of the time).
(** Number of threads recommended to saturate the CPU. For IO pools this makes
little sense (you might want more threads than this because many of them
will be blocked most of the time).
@since 0.5 *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}.
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns
a future result for it. See {!Fut.spawn}.
@since 0.5 *)
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
@ -76,14 +75,13 @@ val get_current_runner : unit -> Runner.t option
[@@@ifge 5.0]
val await : 'a Fut.t -> 'a
(** Await a future, must be run on a moonpool runner. See {!Fut.await}.
Only on OCaml >= 5.0.
(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on
OCaml >= 5.0.
@since 0.5 *)
val yield : unit -> unit
(** Yield from the current task, must be run on a moonpool runner.
Only on OCaml >= 5.0.
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
>= 5.0.
@since NEXT_RELEASE *)
[@@@endif]
@ -96,35 +94,33 @@ module Thread_local_storage = Thread_local_storage
(** A simple blocking queue.
This queue is quite basic and will not behave well under heavy
contention. However, it can be sufficient for many practical use cases.
This queue is quite basic and will not behave well under heavy contention.
However, it can be sufficient for many practical use cases.
{b NOTE}: this queue will typically block the caller thread
in case the operation (push/pop) cannot proceed.
Be wary of deadlocks when using the queue {i from} a pool
when you expect the other end to also be produced/consumed from
the same pool.
{b NOTE}: this queue will typically block the caller thread in case the
operation (push/pop) cannot proceed. Be wary of deadlocks when using the
queue {i from} a pool when you expect the other end to also be
produced/consumed from the same pool.
See discussion on {!Fut.wait_block} for more details on deadlocks
and how to mitigate the risk of running into them.
See discussion on {!Fut.wait_block} for more details on deadlocks and how to
mitigate the risk of running into them.
More scalable queues can be found in
Lockfree (https://github.com/ocaml-multicore/lockfree/)
*)
More scalable queues can be found in Lockfree
(https://github.com/ocaml-multicore/lockfree/) *)
module Blocking_queue : sig
type 'a t
(** Unbounded blocking queue.
This queue is thread-safe and will block when calling {!pop}
on it when it's empty. *)
This queue is thread-safe and will block when calling {!pop} on it when
it's empty. *)
val create : unit -> _ t
(** Create a new unbounded queue. *)
val size : _ t -> int
(** Number of items currently in the queue. Note that [pop]
might still block if this returns a non-zero number, since another
thread might have consumed the items in the mean time.
(** Number of items currently in the queue. Note that [pop] might still block
if this returns a non-zero number, since another thread might have
consumed the items in the mean time.
@since 0.2 *)
exception Closed
@ -132,50 +128,47 @@ module Blocking_queue : sig
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
In the current implementation, [push q] will never block for
a long time, it will only block while waiting for a lock
so it can push the element.
In the current implementation, [push q] will never block for a long time,
it will only block while waiting for a lock so it can push the element.
@raise Closed if the queue is closed (by a previous call to [close q]) *)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed,
ie [push] will raise {!Closed}.
[pop] will keep working and will return the elements present in the
queue, until it's entirely drained; then [pop] will
also raise {!Closed}. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any,
or returns [None] without blocking.
@param force_lock if true, use {!Mutex.lock} (which can block under contention);
if false, use {!Mutex.try_lock}, which might return [None] even in
presence of an element if there's contention *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case
it returns [true]; or it fails to push and returns [false]
without blocking.
@raise Closed if the locking succeeded but the queue is closed.
(** [pop q] pops the next element in [q]. It might block until an element
comes.
@raise Closed if the queue was closed before a new element was available.
*)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed, ie [push]
will raise {!Closed}.
[pop] will keep working and will return the elements present in the queue,
until it's entirely drained; then [pop] will also raise {!Closed}. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any, or returns
[None] without blocking.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case it returns [true]; or
it fails to push and returns [false] without blocking.
@raise Closed if the locking succeeded but the queue is closed. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently
in [bq] into [q2] in one atomic section, and clears [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch.
Create a [Queue.t] locally:
(** [transfer bq q2] transfers all items presently in [bq] into [q2] in one
atomic section, and clears [bq]. It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch. Create a
[Queue.t] locally:
{[
let dowork (work_queue: job Bb_queue.t) =
let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create() in
let local_q = Queue.create () in
try
while true do
(* work on local events, already on this thread *)
@ -197,8 +190,8 @@ module Blocking_queue : sig
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue.
This might not terminate if [q] is never closed.
(** [to_iter q] returns an iterator over all items in the queue. This might
not terminate if [q] is never closed.
@since 0.4 *)
val to_gen : 'a t -> 'a gen
@ -215,8 +208,8 @@ module Bounded_queue = Bounded_queue
module Atomic = Atomic_
(** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the
standard [Atomic] module on OCaml 5. *)
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
module on OCaml 5. *)
(**/**)
@ -226,8 +219,8 @@ module Private : sig
(** A deque for work stealing, fixed size. *)
module Worker_loop_ = Worker_loop_
(** Worker loop. This is useful to implement custom runners, it
should run on each thread of the runner.
(** Worker loop. This is useful to implement custom runners, it should run on
each thread of the runner.
@since 0.7 *)
module Domain_ = Domain_

View file

@ -1,9 +1,8 @@
(** Interface for runners.
This provides an abstraction for running tasks in the background,
which is implemented by various thread pools.
@since 0.3
*)
This provides an abstraction for running tasks in the background, which is
implemented by various thread pools.
@since 0.3 *)
type fiber = Picos.Fiber.t
type task = unit -> unit
@ -12,19 +11,19 @@ type t
(** A runner.
If a runner is no longer needed, {!shutdown} can be used to signal all
worker threads
in it to stop (after they finish their work), and wait for them to stop.
worker threads in it to stop (after they finish their work), and wait for
them to stop.
The threads are distributed across a fixed domain pool
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and
simple the single runtime on OCaml 4). *)
The threads are distributed across a fixed domain pool (whose size is
determined by {!Domain.recommended_domain_count} on OCaml 5, and simple the
single runtime on OCaml 4). *)
val size : t -> int
(** Number of threads/workers. *)
val num_tasks : t -> int
(** Current number of tasks. This is at best a snapshot, useful for metrics
and debugging. *)
(** Current number of tasks. This is at best a snapshot, useful for metrics and
debugging. *)
val shutdown : t -> unit
(** Shutdown the runner and wait for it to terminate. Idempotent. *)
@ -35,32 +34,31 @@ val shutdown_without_waiting : t -> unit
exception Shutdown
val run_async : ?fiber:fiber -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's
worker threads/domains.
(** [run_async pool f] schedules [f] for later execution on the runner in one of
the threads. [f()] will run on one of the runner's worker threads/domains.
@param fiber if provided, run the task with this initial fiber data
@raise Shutdown if the runner was shut down before [run_async] was called. *)
@raise Shutdown if the runner was shut down before [run_async] was called.
*)
val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
(** [run_wait_block pool f] schedules [f] for later execution on the pool, like
{!run_async}. It then blocks the current thread until [f()] is done
executing, and returns its result. If [f()] raises an exception, then
[run_wait_block pool f] will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
about the required discipline to avoid deadlocks).
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the
required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down *)
val dummy : t
(** Runner that fails when scheduling tasks on it.
Calling {!run_async} on it will raise Failure.
(** Runner that fails when scheduling tasks on it. Calling {!run_async} on it
will raise Failure.
@since 0.6 *)
(** {2 Implementing runners} *)
(** This module is specifically intended for users who implement their
own runners. Regular users of Moonpool should not need to look at it. *)
(** This module is specifically intended for users who implement their own
runners. Regular users of Moonpool should not need to look at it. *)
module For_runner_implementors : sig
val create :
size:(unit -> int) ->
@ -71,21 +69,20 @@ module For_runner_implementors : sig
t
(** Create a new runner.
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x,
so that {!Fork_join} and other 5.x features work properly. *)
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so
that {!Fork_join} and other 5.x features work properly. *)
val k_cur_runner : t Thread_local_storage.t
(** Key that should be used by each runner to store itself in TLS
on every thread it controls, so that tasks running on these threads
can access the runner. This is necessary for {!get_current_runner}
to work. *)
(** Key that should be used by each runner to store itself in TLS on every
thread it controls, so that tasks running on these threads can access the
runner. This is necessary for {!get_current_runner} to work. *)
end
val get_current_runner : unit -> t option
(** Access the current runner. This returns [Some r] if the call
happens on a thread that belongs in a runner.
(** Access the current runner. This returns [Some r] if the call happens on a
thread that belongs in a runner.
@since 0.5 *)
val get_current_fiber : unit -> fiber option
(** [get_current_storage runner] gets the local storage
for the currently running task. *)
(** [get_current_storage runner] gets the local storage for the currently
running task. *)

View file

@ -1,41 +1,38 @@
(** Task-local storage.
This storage is associated to the current task,
just like thread-local storage is associated with
the current thread. The storage is carried along in case
the current task is suspended.
This storage is associated to the current task, just like thread-local
storage is associated with the current thread. The storage is carried along
in case the current task is suspended.
@since 0.6
*)
@since 0.6 *)
type 'a t = 'a Picos.Fiber.FLS.t
val create : unit -> 'a t
(** [create ()] makes a new key. Keys are expensive and
should never be allocated dynamically or in a loop. *)
(** [create ()] makes a new key. Keys are expensive and should never be
allocated dynamically or in a loop. *)
exception Not_set
val get_exn : 'a t -> 'a
(** [get k] gets the value for the current task for key [k].
Must be run from inside a task running on a runner.
(** [get k] gets the value for the current task for key [k]. Must be run from
inside a task running on a runner.
@raise Not_set otherwise *)
val get_opt : 'a t -> 'a option
(** [get_opt k] gets the current task's value for key [k],
or [None] if not run from inside the task. *)
(** [get_opt k] gets the current task's value for key [k], or [None] if not run
from inside the task. *)
val get : 'a t -> default:'a -> 'a
val set : 'a t -> 'a -> unit
(** [set k v] sets the storage for [k] to [v].
Must be run from inside a task running on a runner.
(** [set k v] sets the storage for [k] to [v]. Must be run from inside a task
running on a runner.
@raise Failure otherwise *)
val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b
(** [with_value k v f] sets [k] to [v] for the duration of the call
to [f()]. When [f()] returns (or fails), [k] is restored
to its old value. *)
(** [with_value k v f] sets [k] to [v] for the duration of the call to [f()].
When [f()] returns (or fails), [k] is restored to its old value. *)
(** {2 Local [Hmap.t]}

View file

@ -1,7 +1,7 @@
(** Internal module that is used for workers.
A thread pool should use this [worker_loop] to run tasks,
handle effects, etc. *)
A thread pool should use this [worker_loop] to run tasks, handle effects,
etc. *)
open Types_

View file

@ -16,7 +16,8 @@ end
type state = {
id_: Id.t;
(** Unique to this pool. Used to make sure tasks stay within the same pool. *)
(** Unique to this pool. Used to make sure tasks stay within the same
pool. *)
active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
mutable workers: worker_state array; (** Fixed set of workers. *)
main_q: WL.task_full Queue.t;
@ -43,9 +44,8 @@ and worker_state = {
q: WL.task_full WSQ.t; (** Work stealing queue *)
rng: Random.State.t;
}
(** State for a given worker. Only this worker is
allowed to push into the queue, but other workers
can come and steal from it if they're idle. *)
(** State for a given worker. Only this worker is allowed to push into the
queue, but other workers can come and steal from it if they're idle. *)
let[@inline] size_ (self : state) = Array.length self.workers
@ -55,9 +55,8 @@ let num_tasks_ (self : state) : int =
Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers;
!n
(** TLS, used by worker to store their specific state
and be able to retrieve it from tasks when we schedule new
sub-tasks. *)
(** TLS, used by worker to store their specific state and be able to retrieve it
from tasks when we schedule new sub-tasks. *)
let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option =
@ -77,8 +76,8 @@ let[@inline] try_wake_someone_ (self : state) : unit =
Mutex.unlock self.mutex
)
(** Push into worker's local queue, open to work stealing.
precondition: this runs on the worker thread whose state is [self] *)
(** Push into worker's local queue, open to work stealing. precondition: this
runs on the worker thread whose state is [self] *)
let schedule_on_current_worker (self : worker_state) task : unit =
(* we're on this same pool, schedule in the worker's state. Otherwise
we might also be on pool A but asking to schedule on pool B,

View file

@ -1,23 +1,22 @@
(** Work-stealing thread pool.
A pool of threads with a worker-stealing scheduler.
The pool contains a fixed number of threads that wait for work
items to come, process these, and loop.
A pool of threads with a worker-stealing scheduler. The pool contains a
fixed number of threads that wait for work items to come, process these, and
loop.
This is good for CPU-intensive tasks that feature a lot of small tasks.
Note that tasks will not always be processed in the order they are
scheduled, so this is not great for workloads where the latency
of individual tasks matter (for that see {!Fifo_pool}).
This is good for CPU-intensive tasks that feature a lot of small tasks. Note
that tasks will not always be processed in the order they are scheduled, so
this is not great for workloads where the latency of individual tasks matter
(for that see {!Fifo_pool}).
This implements {!Runner.t} since 0.3.
If a pool is no longer needed, {!shutdown} can be used to signal all threads
in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5,
and simply the single runtime on OCaml 4).
*)
The threads are distributed across a fixed domain pool (whose size is
determined by {!Domain.recommended_domain_count} on OCaml 5, and simply the
single runtime on OCaml 4). *)
include module type of Runner
@ -33,25 +32,26 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread
in the pool.
@param num_threads size of the pool, ie. number of worker threads.
It will be at least [1] internally, so [0] or negative values make no sense.
The default is [Domain.recommended_domain_count()], ie one worker
thread per CPU core.
On OCaml 4 the default is [4] (since there is only one domain).
@param on_init_thread
called at the beginning of each new thread in the pool.
@param num_threads
size of the pool, ie. number of worker threads. It will be at least [1]
internally, so [0] or negative values make no sense. The default is
[Domain.recommended_domain_count()], ie one worker thread per CPU core. On
OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each thread in the pool
@param around_task a pair of [before, after], where [before pool] is called
before a task is processed,
on the worker thread about to run it, and returns [x]; and [after pool x] is called by
the same thread after the task is over. (since 0.2)
@param name a name for this thread pool, used if tracing is enabled (since 0.6)
*)
@param around_task
a pair of [before, after], where [before pool] is called before a task is
processed, on the worker thread about to run it, and returns [x]; and
[after pool x] is called by the same thread after the task is over. (since
0.2)
@param name
a name for this thread pool, used if tracing is enabled (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}.
When [f pool] returns or fails, [pool] is shutdown and its resources
are released.
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When
[f pool] returns or fails, [pool] is shutdown and its resources are
released.
Most parameters are the same as in {!create}.
@since 0.3 *)

View file

@ -76,10 +76,10 @@ type worker_state = {
(** Array of (optional) workers.
Workers are started/stop on demand. For each index we have
the (currently active) domain's state
including a work queue and a thread refcount; and the domain itself,
if any, in a separate option because it might outlive its own state. *)
Workers are started/stop on demand. For each index we have the (currently
active) domain's state including a work queue and a thread refcount; and the
domain itself, if any, in a separate option because it might outlive its own
state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array =
let n = max 1 (Domain_.recommended_number ()) in
Array.init n (fun _ -> Lock.create (None, None))
@ -88,13 +88,12 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
A domain worker does two things:
- run functions it's asked to (mainly, to start new threads inside it)
- decrease the refcount when one of these threads stops. The thread
will notify the domain that it's exiting, so the domain can know
how many threads are still using it. If all threads exit, the domain
polls a bit (in case new threads are created really shortly after,
which happens with a [Pool.with_] or [Pool.create() Pool.shutdown()]
in a tight loop), and if nothing happens it tries to stop to free resources.
*)
- decrease the refcount when one of these threads stops. The thread will
notify the domain that it's exiting, so the domain can know how many
threads are still using it. If all threads exit, the domain polls a bit
(in case new threads are created really shortly after, which happens with
a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and
if nothing happens it tries to stop to free resources. *)
let work_ idx (st : worker_state) : unit =
let main_loop () =
let continue = ref true in

View file

@ -1,18 +1,17 @@
(** Static pool of domains.
These domains are shared between {b all} the pools in moonpool.
The rationale is that we should not have more domains than cores, so
it's easier to reserve exactly that many domain slots, and run more flexible
thread pools on top (each domain being shared by potentially multiple threads
from multiple pools).
These domains are shared between {b all} the pools in moonpool. The
rationale is that we should not have more domains than cores, so it's easier
to reserve exactly that many domain slots, and run more flexible thread
pools on top (each domain being shared by potentially multiple threads from
multiple pools).
The pool should not contain actual domains if it's not in use, ie if no
runner is presently actively using one or more of the domain slots.
{b NOTE}: Interface is still experimental.
@since 0.6
*)
@since 0.6 *)
type domain = Domain_.t
@ -24,13 +23,13 @@ val max_number_of_domains : unit -> int
Be very cautious with this interface, or resource leaks might occur. *)
val run_on : int -> (unit -> unit) -> unit
(** [run_on i f] runs [f()] on the domain with index [i].
Precondition: [0 <= i < n_domains()]. The thread must call {!decr_on}
with [i] once it's done. *)
(** [run_on i f] runs [f()] on the domain with index [i]. Precondition:
[0 <= i < n_domains()]. The thread must call {!decr_on} with [i] once it's
done. *)
val decr_on : int -> unit
(** Signal that a thread is stopping on the domain with index [i]. *)
val run_on_and_wait : int -> (unit -> 'a) -> 'a
(** [run_on_and_wait i f] runs [f()] on the domain with index [i],
and blocks until the result of [f()] is returned back. *)
(** [run_on_and_wait i f] runs [f()] on the domain with index [i], and blocks
until the result of [f()] is returned back. *)

View file

@ -187,8 +187,8 @@ let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if
some children failed. *)
(** Successfully resolve the fiber. This might still fail if some children
failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in

View file

@ -1,13 +1,11 @@
(** Fibers.
A fiber is a lightweight computation that runs cooperatively
alongside other fibers. In the context of moonpool, fibers
have additional properties:
A fiber is a lightweight computation that runs cooperatively alongside other
fibers. In the context of moonpool, fibers have additional properties:
- they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form
of structured concurrency
*)
- they form a simple supervision tree, enabling a limited form of structured
concurrency *)
type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *)
@ -26,8 +24,8 @@ module Private_ : sig
runner: Runner.t;
pfiber: pfiber;
}
(** Type definition, exposed so that {!any} can be unboxed.
Please do not rely on that. *)
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
on that. *)
type any = Any : _ t -> any [@@unboxed]
@ -58,8 +56,7 @@ val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t
val self : unit -> any
(** [self ()] is the current fiber.
Must be run from inside a fiber.
(** [self ()] is the current fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option
@ -78,16 +75,16 @@ val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)].
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)].
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises.
Must be run from inside a fiber.
(** Check if the current fiber is cancelled, in which case this raises. Must be
run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *)
@ -99,54 +96,53 @@ type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks
for [fib]. If [fib] is already cancelled, [cb] is called immediately. *)
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback
associated with handle [h]. *)
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
[h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e]
in a scope in which, if the fiber [fib] is cancelled,
[cb()] is called. If [e] returns without the fiber being cancelled,
this callback is removed. *)
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
the fiber being cancelled, this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where
[cb] is added to the cancel callbacks of the current fiber;
and [f()] terminates, [cb] is removed from the list. *)
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
from the list. *)
val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback
with the result. If the fiber is done already then the
callback is invoked immediately with its result. *)
(** Wait for fiber to be done and call the callback with the result. If the
fiber is done already then the callback is invoked immediately with its
result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner.
This fiber is not the child of any other fiber: its lifetime
is only determined by the lifetime of [f()]. *)
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
fiber is not the child of any other fiber: its lifetime is only determined
by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child]
from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
@param on if provided, start the fiber on the given runner. If not
provided, use the parent's runner.
@param protect if true, when [f_child] fails, it does not
affect [parent]. If false, [f_child] failing also
causes [parent] to fail (and therefore all other children
of [parent]). Default is [true].
@param on
if provided, start the fiber on the given runner. If not provided, use the
parent's runner.
@param protect
if true, when [f_child] fails, it does not affect [parent]. If false,
[f_child] failing also causes [parent] to fail (and therefore all other
children of [parent]). Default is [true].
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)].
The fiber will still affect termination of the parent, ie. the
parent will exit only after this new fiber exits.
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
termination of the parent, ie. the parent will exit only after this new
fiber exits.
@param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit

View file

@ -1,18 +1,16 @@
(** Fiber-local storage.
This storage is associated to the current fiber,
just like thread-local storage is associated with
the current thread.
This storage is associated to the current fiber, just like thread-local
storage is associated with the current thread.
See {!Moonpool.Task_local_storage} for more general information, as
this is based on it.
See {!Moonpool.Task_local_storage} for more general information, as this is
based on it.
{b NOTE}: it's important to note that, while each fiber
has its own storage, spawning a sub-fiber [f2] from a fiber [f1]
will only do a shallow copy of the storage.
Values inside [f1]'s storage will be physically shared with [f2].
It is thus recommended to store only persistent values in the local storage.
*)
{b NOTE}: it's important to note that, while each fiber has its own storage,
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
the storage. Values inside [f1]'s storage will be physically shared with
[f2]. It is thus recommended to store only persistent values in the local
storage. *)
include module type of struct
include Task_local_storage

View file

@ -1,7 +1,7 @@
(** The unique name of a fiber.
Each fiber has a unique handle that can be used to
refer to it in maps or sets. *)
Each fiber has a unique handle that can be used to refer to it in maps or
sets. *)
type t = private int
(** Unique, opaque identifier for a fiber. *)

View file

@ -1,26 +1,25 @@
(** Main thread.
This is evolved from [Moonpool.Immediate_runner], but unlike it,
this API assumes you run it in a thread (possibly
the main thread) which will block until the initial computation is done.
This is evolved from [Moonpool.Immediate_runner], but unlike it, this API
assumes you run it in a thread (possibly the main thread) which will block
until the initial computation is done.
This means it's reasonable to use [Main.main (fun () -> do_everything)]
at the beginning of the program.
Other Moonpool pools can be created for background tasks, etc. to do the
heavy lifting, and the main thread (inside this immediate runner) can coordinate
tasks via [Fiber.await].
This means it's reasonable to use [Main.main (fun () -> do_everything)] at
the beginning of the program. Other Moonpool pools can be created for
background tasks, etc. to do the heavy lifting, and the main thread (inside
this immediate runner) can coordinate tasks via [Fiber.await].
Aside from the fact that this blocks the caller thread, it is fairly similar to
{!Background_thread} in that there's a single worker to process
Aside from the fact that this blocks the caller thread, it is fairly similar
to {!Background_thread} in that there's a single worker to process
tasks/fibers.
This handles effects, including the ones in {!Fiber}.
@since 0.6
*)
@since 0.6 *)
val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
(** [main f] runs [f()] in a scope that handles effects, including
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *)

View file

@ -5,20 +5,22 @@
@since 0.3 *)
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [both f g] runs [f()] and [g()], potentially in parallel,
and returns their result when both are done.
If any of [f()] and [g()] fails, then the whole computation fails.
(** [both f g] runs [f()] and [g()], potentially in parallel, and returns their
result when both are done. If any of [f()] and [g()] fails, then the whole
computation fails.
This must be run from within the pool: for example, inside {!Pool.run}
or inside a {!Fut.spawn} computation.
This is because it relies on an effect handler to be installed.
This must be run from within the pool: for example, inside {!Pool.run} or
inside a {!Fut.spawn} computation. This is because it relies on an effect
handler to be installed.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val both_ignore : (unit -> _) -> (unit -> _) -> unit
(** Same as [both f g |> ignore].
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
@ -63,43 +65,49 @@ val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
{b NOTE} this is only available on OCaml 5. *)
val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array
(** [all_array fs] runs all functions in [fs] in tasks, and waits for
all the results.
(** [all_array fs] runs all functions in [fs] in tasks, and waits for all the
results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is [1].
@param chunk_size
if equal to [n], groups items by [n] to be run in a single task. Default
is [1].
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list
(** [all_list fs] runs all functions in [fs] in tasks, and waits for
all the results.
(** [all_list fs] runs all functions in [fs] in tasks, and waits for all the
results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is not specified.
This parameter is available since 0.3.
@param chunk_size
if equal to [n], groups items by [n] to be run in a single task. Default
is not specified. This parameter is available since 0.3.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list
(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for
all the results.
(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits
for all the results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is not specified.
This parameter is available since 0.3.
@param chunk_size
if equal to [n], groups items by [n] to be run in a single task. Default
is not specified. This parameter is available since 0.3.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array
(** [map_array f arr] is like [Array.map f arr], but runs in parallel.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list
(** [map_list f l] is like [List.map f l], but runs in parallel.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)

View file

@ -2,8 +2,7 @@ open Common_
class type t = object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the
stream is closed. *)
(** Read into the slice. Returns [0] only if the stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)

View file

@ -1,8 +1,7 @@
(** Lwt_engine-based event loop for Moonpool.
In what follows, we mean by "lwt thread" the thread
running [Lwt_main.run] (so, the thread where the Lwt event
loop and all Lwt callbacks execute).
In what follows, we mean by "lwt thread" the thread running [Lwt_main.run]
(so, the thread where the Lwt event loop and all Lwt callbacks execute).
{b NOTE}: this is experimental and might change in future versions.
@ -14,53 +13,50 @@ module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that
completes when [lwt_fut] does. This must be run from within
the Lwt thread. *)
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that completes when
[lwt_fut] does. This must be run from within the Lwt thread. *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when
[fut] does. This must be called from the Lwt thread, and the result
must always be used only from inside the Lwt thread. *)
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
must be called from the Lwt thread, and the result must always be used only
from inside the Lwt thread. *)
(** {2 Helpers on the moonpool side} *)
val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on
a moonpool runner. This must be run from within a Moonpool runner
so that the await-ing effect is handled. *)
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing
effect is handled. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread
and returns a thread-safe future. This can be run from anywhere. *)
(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a
thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and
awaits its result. Must be run from inside a moonpool runner
so that the await-in effect is handled.
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result.
Must be run from inside a moonpool runner so that the await-in effect is
handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called.
Must be run from within a fiber.
(** Returns the runner from within which this is called. Must be run from within
a fiber.
@raise Failure if not run within a fiber *)
(** {2 IO} *)
(** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors
and rely on a [Lwt_engine] event loop being active (meaning,
[Lwt_main.run] is currently running in some thread).
These IO operations work on non-blocking file descriptors and rely on a
[Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently
running in some thread).
Calling these functions must be done from a moonpool runner.
A function like [read] will first try to perform the IO action
directly (here, call {!Unix.read}); if the action fails because
the FD is not ready, then [await_readable] is called:
it suspends the fiber and subscribes it to Lwt to be awakened
when the FD becomes ready.
*)
Calling these functions must be done from a moonpool runner. A function like
[read] will first try to perform the IO action directly (here, call
{!Unix.read}); if the action fails because the FD is not ready, then
[await_readable] is called: it suspends the fiber and subscribes it to Lwt
to be awakened when the FD becomes ready. *)
module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *)
@ -91,27 +87,29 @@ module TCP_server : sig
type t = Lwt_io.server
val establish_lwt :
?backlog:(* ?server_fd:Unix.file_descr -> *)
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t
(** [establish ~runner addr handler] runs a TCP server in the Lwt
thread. When a client connects, a moonpool fiber is started on [runner]
to handle it. *)
(** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When
a client connects, a moonpool fiber is started on [runner] to handle it.
*)
val establish :
?backlog:(* ?server_fd:Unix.file_descr -> *)
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t
(** Like {!establish_lwt} but uses {!IO} to directly handle
reads and writes on client sockets. *)
(** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes
on client sockets. *)
val shutdown : t -> unit
(** Shutdown the server *)
@ -121,8 +119,8 @@ module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from
the socket in a non blocking way. *)
(** Open a connection, and use {!IO} to read and write from the socket in a
non blocking way. *)
val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
@ -132,15 +130,15 @@ end
(** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner,
and returns a lwt future. This must be run from within the thread
running [Lwt_main]. *)
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and
returns a lwt future. This must be run from within the thread running
[Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
a fiber in [runner]. *)
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()]
inside a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *)

View file

@ -2,10 +2,9 @@
Adapted from "Dynamic circular work stealing deque", Chase & Lev.
However note that this one is not dynamic in the sense that there
is no resizing. Instead we return [false] when [push] fails, which
keeps the implementation fairly lightweight.
*)
However note that this one is not dynamic in the sense that there is no
resizing. Instead we return [false] when [push] fails, which keeps the
implementation fairly lightweight. *)
type 'a t
(** Deque containing values of type ['a] *)
@ -14,12 +13,12 @@ val create : dummy:'a -> unit -> 'a t
(** Create a new deque. *)
val push : 'a t -> 'a -> bool
(** Push value at the bottom of deque. returns [true] if it succeeds.
This must be called only by the owner thread. *)
(** Push value at the bottom of deque. returns [true] if it succeeds. This must
be called only by the owner thread. *)
val pop : 'a t -> 'a option
(** Pop value from the bottom of deque.
This must be called only by the owner thread. *)
(** Pop value from the bottom of deque. This must be called only by the owner
thread. *)
exception Empty

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper
around {!Mutex} that encourages proper management of the critical
section in RAII style:
This lock is a synchronous concurrency primitive, as a thin wrapper around
{!Mutex} that encourages proper management of the critical section in RAII
style:
{[
let (let@) = (@@)
@ -30,27 +30,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with
the lock [l], in a critical section. If [f x] fails, [with_lock l f]
fails too but the lock is released. *)
(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l],
in a critical section. If [f x] fails, [with_lock l f] fails too but the
lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected
by the mutex. *)
(** [update l f] replaces the content [x] of [l] with [f x], while protected by
the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
and returns [y], while protected by the mutex. *)
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and
returns [y], while protected by the mutex. *)
val mutex : _ t -> Picos_std_sync.Mutex.t
(** Underlying mutex. *)
val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned
isn't protected! *)
(** Atomically get the value in the lock. The value that is returned isn't
protected! *)
val set : 'a t -> 'a -> unit
(** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref}
is an anti pattern and will not protect data against some race conditions. *)
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an
anti pattern and will not protect data against some race conditions. *)