diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index fd46a7c8..894b13ba 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -78,7 +78,7 @@ jobs: strategy: matrix: ocaml-compiler: - - '5.2' + - '5.3' runs-on: 'ubuntu-latest' steps: - uses: actions/checkout@main @@ -89,6 +89,6 @@ jobs: dune-cache: true allow-prerelease-opam: true - - run: opam install ocamlformat.0.26.2 + - run: opam install ocamlformat.0.27.0 - run: opam exec -- make format-check diff --git a/.ocamlformat b/.ocamlformat index 78183459..f33f722c 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,4 +1,4 @@ -version = 0.26.2 +version = 0.27.0 profile=conventional margin=80 if-then-else=k-r diff --git a/examples/discuss1.ml b/examples/discuss1.ml index 7554335e..3eaf47b4 100644 --- a/examples/discuss1.ml +++ b/examples/discuss1.ml @@ -1,4 +1,5 @@ -(** Example from https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *) +(** Example from + https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *) let ( let@ ) = ( @@ ) diff --git a/src/core/background_thread.mli b/src/core/background_thread.mli index ea23fabd..33ef42f0 100644 --- a/src/core/background_thread.mli +++ b/src/core/background_thread.mli @@ -1,13 +1,11 @@ (** A simple runner with a single background thread. - Because this is guaranteed to have a single worker thread, - tasks scheduled in this runner always run asynchronously but - in a sequential fashion. + Because this is guaranteed to have a single worker thread, tasks scheduled + in this runner always run asynchronously but in a sequential fashion. This is similar to {!Fifo_pool} with exactly one thread. - @since 0.6 -*) + @since 0.6 *) include module type of Runner diff --git a/src/core/bb_queue.mli b/src/core/bb_queue.mli index ba4d0fc5..de765918 100644 --- a/src/core/bb_queue.mli +++ b/src/core/bb_queue.mli @@ -16,48 +16,45 @@ val size : _ t -> int val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) + @raise Closed if the queue was closed before a new element was available. *) val try_pop : force_lock:bool -> 'a t -> 'a option -(** [try_pop q] immediately pops the first element of [q], if any, - or returns [None] without blocking. - @param force_lock if true, use {!Mutex.lock} (which can block under contention); - if false, use {!Mutex.try_lock}, which might return [None] even in - presence of an element if there's contention *) +(** [try_pop q] immediately pops the first element of [q], if any, or returns + [None] without blocking. + @param force_lock + if true, use {!Mutex.lock} (which can block under contention); if false, + use {!Mutex.try_lock}, which might return [None] even in presence of an + element if there's contention *) val try_push : 'a t -> 'a -> bool -(** [try_push q x] tries to push into [q], in which case - it returns [true]; or it fails to push and returns [false] - without blocking. - @raise Closed if the locking succeeded but the queue is closed. -*) +(** [try_push q x] tries to push into [q], in which case it returns [true]; or + it fails to push and returns [false] without blocking. + @raise Closed if the locking succeeded but the queue is closed. *) val transfer : 'a t -> 'a Queue.t -> unit -(** [transfer bq q2] transfers all items presently - in [bq] into [q2] in one atomic section, and clears [bq]. - It blocks if no element is in [bq]. 
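The [transfer] documentation above shows the consumer side of a batch-processing loop. As a complementary sketch only, the producer side could look like the following; [Bb_queue.push], [Bb_queue.close] and the [job] type are assumed from context rather than shown in this hunk:

{[
  (* Producer side for the consumer loop in the [transfer] example above.
     [push] and [close] are assumed to exist in this interface. *)
  let feed_jobs (q : job Bb_queue.t) (jobs : job list) : unit =
    List.iter (fun j -> Bb_queue.push q j) jobs;
    (* closing lets the consumer's [transfer] loop exit via [Closed] *)
    Bb_queue.close q
]}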
- - This is useful to consume elements from the queue in batch. - Create a [Queue.t] locally: +(** [transfer bq q2] transfers all items presently in [bq] into [q2] in one + atomic section, and clears [bq]. It blocks if no element is in [bq]. + This is useful to consume elements from the queue in batch. Create a + [Queue.t] locally: {[ - let dowork (work_queue: job Bb_queue.t) = - (* local queue, not thread safe *) - let local_q = Queue.create() in - try - while true do - (* work on local events, already on this thread *) - while not (Queue.is_empty local_q) do - let job = Queue.pop local_q in - process_job job - done; + let dowork (work_queue : job Bb_queue.t) = + (* local queue, not thread safe *) + let local_q = Queue.create () in + try + while true do + (* work on local events, already on this thread *) + while not (Queue.is_empty local_q) do + let job = Queue.pop local_q in + process_job job + done; - (* get all the events in the incoming blocking queue, in - one single critical section. *) - Bb_queue.transfer work_queue local_q - done - with Bb_queue.Closed -> () + (* get all the events in the incoming blocking queue, in + one single critical section. *) + Bb_queue.transfer work_queue local_q + done + with Bb_queue.Closed -> () ]} @since 0.4 *) @@ -69,8 +66,8 @@ type 'a gen = unit -> 'a option type 'a iter = ('a -> unit) -> unit val to_iter : 'a t -> 'a iter -(** [to_iter q] returns an iterator over all items in the queue. - This might not terminate if [q] is never closed. +(** [to_iter q] returns an iterator over all items in the queue. This might not + terminate if [q] is never closed. @since 0.4 *) val to_gen : 'a t -> 'a gen diff --git a/src/core/bounded_queue.mli b/src/core/bounded_queue.mli index c5d46df6..165f7681 100644 --- a/src/core/bounded_queue.mli +++ b/src/core/bounded_queue.mli @@ -1,15 +1,13 @@ (** A blocking queue of finite size. - This queue, while still using locks underneath - (like the regular blocking queue) should be enough for - usage under reasonable contention. + This queue, while still using locks underneath (like the regular blocking + queue) should be enough for usage under reasonable contention. - The bounded size is helpful whenever some form of backpressure is - desirable: if the queue is used to communicate between producer(s) - and consumer(s), the consumer(s) can limit the rate at which - producer(s) send new work down their way. - Whenever the queue is full, means that producer(s) will have to - wait before pushing new work. + The bounded size is helpful whenever some form of backpressure is desirable: + if the queue is used to communicate between producer(s) and consumer(s), the + consumer(s) can limit the rate at which producer(s) send new work down their + way. Whenever the queue is full, means that producer(s) will have to wait + before pushing new work. @since 0.4 *) @@ -19,42 +17,41 @@ type 'a t val create : max_size:int -> unit -> 'a t val close : _ t -> unit -(** [close q] closes [q]. No new elements can be pushed into [q], - and after all the elements still in [q] currently are [pop]'d, - {!pop} will also raise {!Closed}. *) +(** [close q] closes [q]. No new elements can be pushed into [q], and after all + the elements still in [q] currently are [pop]'d, {!pop} will also raise + {!Closed}. *) exception Closed val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] at the end of the queue. - If [q] is full, this will block until there is - room for [x]. +(** [push q x] pushes [x] at the end of the queue. 
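A minimal sketch of the backpressure behaviour described above, using only [create], [push], [pop], [close] and the [Closed] exception from this interface; the [Thread] plumbing is illustrative:

{[
  let demo_backpressure () =
    let q : int Bounded_queue.t = Bounded_queue.create ~max_size:4 () in
    let producer =
      Thread.create
        (fun () ->
          for i = 1 to 100 do
            (* blocks whenever 4 items are already waiting *)
            Bounded_queue.push q i
          done;
          Bounded_queue.close q)
        ()
    in
    (try
       while true do
         ignore (Bounded_queue.pop q : int)
       done
     with Bounded_queue.Closed -> ());
    Thread.join producer
]}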
If [q] is full, this will + block until there is room for [x]. @raise Closed if [q] is closed. *) val try_push : force_lock:bool -> 'a t -> 'a -> bool -(** [try_push q x] attempts to push [x] into [q], but abandons - if it cannot acquire [q] or if [q] is full. +(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot + acquire [q] or if [q] is full. - @param force_lock if true, use {!Mutex.lock} (which can block - under contention); - if false, use {!Mutex.try_lock}, which might return [false] even - if there's room in the queue. + @param force_lock + if true, use {!Mutex.lock} (which can block under contention); if false, + use {!Mutex.try_lock}, which might return [false] even if there's room in + the queue. @raise Closed if [q] is closed. *) val pop : 'a t -> 'a -(** [pop q] pops the first element off [q]. It blocks if [q] - is empty, until some element becomes available. +(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until + some element becomes available. @raise Closed if [q] is empty and closed. *) val try_pop : force_lock:bool -> 'a t -> 'a option -(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] - if no element is available or if it failed to acquire [q]. +(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if + no element is available or if it failed to acquire [q]. - @param force_lock if true, use {!Mutex.lock} (which can block - under contention); - if false, use {!Mutex.try_lock}, which might return [None] even in - presence of an element if there's contention. + @param force_lock + if true, use {!Mutex.lock} (which can block under contention); if false, + use {!Mutex.try_lock}, which might return [None] even in presence of an + element if there's contention. @raise Closed if [q] is empty and closed. *) @@ -65,9 +62,8 @@ val max_size : _ t -> int (** Maximum size of the queue. See {!create}. *) val transfer : 'a t -> 'a Queue.t -> unit -(** [transfer bq q2] transfers all elements currently available - in [bq] into local queue [q2], and clears [bq], atomically. - It blocks if [bq] is empty. +(** [transfer bq q2] transfers all elements currently available in [bq] into + local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty. See {!Bb_queue.transfer} for more details. @raise Closed if [bq] is empty and closed. *) @@ -76,8 +72,8 @@ type 'a gen = unit -> 'a option type 'a iter = ('a -> unit) -> unit val to_iter : 'a t -> 'a iter -(** [to_iter q] returns an iterator over all items in the queue. - This might not terminate if [q] is never closed. *) +(** [to_iter q] returns an iterator over all items in the queue. This might not + terminate if [q] is never closed. *) val to_gen : 'a t -> 'a gen (** [to_gen q] returns a generator from the queue. *) diff --git a/src/core/chan.mli b/src/core/chan.mli index 3f3444ef..a4898069 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -15,31 +15,30 @@ val create : max_size:int -> unit -> 'a t exception Closed val try_push : 'a t -> 'a -> bool -(** [try_push chan x] pushes [x] into [chan]. This does not block. - Returns [true] if it succeeded in pushing. +(** [try_push chan x] pushes [x] into [chan]. This does not block. Returns + [true] if it succeeded in pushing. @raise Closed if the channel is closed. *) val try_pop : 'a t -> 'a option -(** [try_pop chan] pops and return an element if one is available - immediately. Otherwise it returns [None]. - @raise Closed if the channel is closed and empty. 
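A small sketch of non-blocking use of this channel API, using only [try_pop] and the [Closed] exception from above; the bare module path [Chan] is an assumption:

{[
  (* Drain whatever is immediately available, without suspending. *)
  let drain_now (c : int Chan.t) : int list =
    let rec loop acc =
      match Chan.try_pop c with
      | Some x -> loop (x :: acc)
      | None -> List.rev acc
      | exception Chan.Closed -> List.rev acc
    in
    loop []
]}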
- *) +(** [try_pop chan] pops and return an element if one is available immediately. + Otherwise it returns [None]. + @raise Closed if the channel is closed and empty. *) val close : _ t -> unit -(** Close the channel. Further push and pop calls will fail. - This is idempotent. *) +(** Close the channel. Further push and pop calls will fail. This is idempotent. +*) [@@@ifge 5.0] val push : 'a t -> 'a -> unit -(** Push the value into the channel, suspending the current task - if the channel is currently full. +(** Push the value into the channel, suspending the current task if the channel + is currently full. @raise Closed if the channel is closed @since 0.7 *) val pop : 'a t -> 'a -(** Pop an element. This might suspend the current task if the - channel is currently empty. +(** Pop an element. This might suspend the current task if the channel is + currently empty. @raise Closed if the channel is empty and closed. @since 0.7 *) diff --git a/src/core/fifo_pool.mli b/src/core/fifo_pool.mli index 11ba4ed5..018b512e 100644 --- a/src/core/fifo_pool.mli +++ b/src/core/fifo_pool.mli @@ -1,16 +1,16 @@ (** A simple thread pool in FIFO order. - FIFO: first-in, first-out. Basically tasks are put into a queue, - and worker threads pull them out of the queue at the other end. + FIFO: first-in, first-out. Basically tasks are put into a queue, and worker + threads pull them out of the queue at the other end. - Since this uses a single blocking queue to manage tasks, it's very - simple and reliable. The number of worker threads is fixed, but - they are spread over several domains to enable parallelism. + Since this uses a single blocking queue to manage tasks, it's very simple + and reliable. The number of worker threads is fixed, but they are spread + over several domains to enable parallelism. - This can be useful for latency-sensitive applications (e.g. as a - pool of workers for network servers). Work-stealing pools might - have higher throughput but they're very unfair to some tasks; by - contrast, here, older tasks have priority over younger tasks. + This can be useful for latency-sensitive applications (e.g. as a pool of + workers for network servers). Work-stealing pools might have higher + throughput but they're very unfair to some tasks; by contrast, here, older + tasks have priority over younger tasks. @since 0.5 *) @@ -28,22 +28,22 @@ type ('a, 'b) create_args = val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. - @param on_init_thread called at the beginning of each new thread in the pool. - @param min minimum size of the pool. See {!Pool.create_args}. - The default is [Domain.recommended_domain_count()], ie one worker per - CPU core. - On OCaml 4 the default is [4] (since there is only one domain). - @param on_exit_thread called at the end of each worker thread in the pool. - @param around_task a pair of [before, after] functions - ran around each task. See {!Pool.create_args}. - @param name name for the pool, used in tracing (since 0.6) - *) + @param on_init_thread + called at the beginning of each new thread in the pool. + @param min + minimum size of the pool. See {!Pool.create_args}. The default is + [Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml + 4 the default is [4] (since there is only one domain). + @param on_exit_thread called at the end of each worker thread in the pool. + @param around_task + a pair of [before, after] functions ran around each task. See + {!Pool.create_args}. 
+ @param name name for the pool, used in tracing (since 0.6) *) val with_ : (unit -> (t -> 'a) -> 'a, _) create_args -(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. - When [f pool] returns or fails, [pool] is shutdown and its resources - are released. - Most parameters are the same as in {!create}. *) +(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When + [f pool] returns or fails, [pool] is shutdown and its resources are + released. Most parameters are the same as in {!create}. *) (**/**) diff --git a/src/core/fut.mli b/src/core/fut.mli index c56af470..6d667376 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -1,21 +1,19 @@ (** Futures. - A future of type ['a t] represents the result of a computation - that will yield a value of type ['a]. + A future of type ['a t] represents the result of a computation that will + yield a value of type ['a]. - Typically, the computation is running on a thread pool {!Runner.t} - and will proceed on some worker. Once set, a future cannot change. - It either succeeds (storing a [Ok x] with [x: 'a]), or fail - (storing a [Error (exn, bt)] with an exception and the corresponding - backtrace). + Typically, the computation is running on a thread pool {!Runner.t} and will + proceed on some worker. Once set, a future cannot change. It either succeeds + (storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with + an exception and the corresponding backtrace). - Combinators such as {!map} and {!join_array} can be used to produce - futures from other futures (in a monadic way). Some combinators take - a [on] argument to specify a runner on which the intermediate computation takes - place; for example [map ~on:pool ~f fut] maps the value in [fut] - using function [f], applicatively; the call to [f] happens on - the runner [pool] (once [fut] resolves successfully with a value). -*) + Combinators such as {!map} and {!join_array} can be used to produce futures + from other futures (in a monadic way). Some combinators take a [on] argument + to specify a runner on which the intermediate computation takes place; for + example [map ~on:pool ~f fut] maps the value in [fut] using function [f], + applicatively; the call to [f] happens on the runner [pool] (once [fut] + resolves successfully with a value). *) type 'a or_error = ('a, Exn_bt.t) result @@ -23,9 +21,9 @@ type 'a t = 'a Picos.Computation.t (** A future with a result of type ['a]. *) type 'a promise = private 'a t -(** A promise, which can be fulfilled exactly once to set - the corresponding future. - This is a private alias of ['a t] since 0.7, previously it was opaque. *) +(** A promise, which can be fulfilled exactly once to set the corresponding + future. This is a private alias of ['a t] since 0.7, previously it was + opaque. *) val make : unit -> 'a t * 'a promise (** Make a new future with the associated promise. *) @@ -34,33 +32,32 @@ val make_promise : unit -> 'a promise (** Same as {!make} but returns a single promise (which can be upcast to a future). This is useful mostly to preserve memory. 
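A minimal sketch of the future/promise pair described above: one thread fulfills the promise while another blocks on the future. [fulfill] and [wait_block_exn] come from further down in this interface, and [Ok] is the success case of [or_error]:

{[
  let demo_promise () =
    let fut, promise = Fut.make () in
    let t = Thread.create (fun () -> Fut.fulfill promise (Ok 42)) () in
    assert (Fut.wait_block_exn fut = 42);
    Thread.join t
]}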
- How to upcast to a future in the worst case: - {[let prom = Fut.make_promise();; - let fut = (prom : _ Fut.promise :> _ Fut.t) ;; - ]} - @since 0.7 *) + How to upcast to a future in the worst case: + {[ + let prom = Fut.make_promise () + let fut = (prom : _ Fut.promise :> _ Fut.t) + ]} + @since 0.7 *) val on_result : 'a t -> ('a or_error -> unit) -> unit -(** [on_result fut f] registers [f] to be called in the future - when [fut] is set ; - or calls [f] immediately if [fut] is already set. *) +(** [on_result fut f] registers [f] to be called in the future when [fut] is set + ; or calls [f] immediately if [fut] is already set. *) val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit -(** [on_result_ignore fut f] registers [f] to be called in the future - when [fut] is set; - or calls [f] immediately if [fut] is already set. - It does not pass the result, only a success/error signal. +(** [on_result_ignore fut f] registers [f] to be called in the future when [fut] + is set; or calls [f] immediately if [fut] is already set. It does not pass + the result, only a success/error signal. @since 0.7 *) exception Already_fulfilled val fulfill : 'a promise -> 'a or_error -> unit (** Fullfill the promise, setting the future at the same time. - @raise Already_fulfilled if the promise is already fulfilled. *) + @raise Already_fulfilled if the promise is already fulfilled. *) val fulfill_idempotent : 'a promise -> 'a or_error -> unit -(** Fullfill the promise, setting the future at the same time. - Does nothing if the promise is already fulfilled. *) +(** Fullfill the promise, setting the future at the same time. Does nothing if + the promise is already fulfilled. *) val return : 'a -> 'a t (** Already settled future, with a result *) @@ -78,22 +75,22 @@ val is_resolved : _ t -> bool (** [is_resolved fut] is [true] iff [fut] is resolved. *) val peek : 'a t -> 'a or_error option -(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], - and [None] if [fut] is not resolved yet. *) +(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], and + [None] if [fut] is not resolved yet. *) exception Not_ready (** @since 0.2 *) val get_or_fail : 'a t -> 'a or_error -(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled - (i.e. if [peek fut] returns [Some res], [get_or_fail fut] returns [res]). +(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled (i.e. if + [peek fut] returns [Some res], [get_or_fail fut] returns [res]). @raise Not_ready if the future is not ready. @since 0.2 *) val get_or_fail_exn : 'a t -> 'a -(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, - like {!get_or_fail}. If the result is an [Error _], the exception inside - is re-raised. +(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, like + {!get_or_fail}. If the result is an [Error _], the exception inside is + re-raised. @raise Not_ready if the future is not ready. @since 0.2 *) @@ -116,12 +113,12 @@ val raise_if_failed : _ t -> unit (** {2 Combinators} *) val spawn : on:Runner.t -> (unit -> 'a) -> 'a t -(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will - hold its result. *) +(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that + will hold its result. *) val spawn_on_current_runner : (unit -> 'a) -> 'a t -(** This must be run from inside a runner, and schedules - the new task on it as well. 
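A sketch of [spawn_on_current_runner] used from inside a task, so the child task is scheduled on the same runner as its parent; the final [await] is only available on OCaml 5, as noted later in this interface:

{[
  let parent_and_child (pool : Runner.t) : int Fut.t =
    Fut.spawn ~on:pool (fun () ->
        let child = Fut.spawn_on_current_runner (fun () -> 40 + 2) in
        Fut.await child)
]}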
+(** This must be run from inside a runner, and schedules the new task on it as + well. See {!Runner.get_current_runner} to see how the runner is found. @@ -129,28 +126,26 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t @raise Failure if run from outside a runner. *) val reify_error : 'a t -> 'a or_error t -(** [reify_error fut] turns a failing future into a non-failing - one that contain [Error (exn, bt)]. A non-failing future - returning [x] is turned into [Ok x] +(** [reify_error fut] turns a failing future into a non-failing one that contain + [Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x] @since 0.4 *) val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t -(** [map ?on ~f fut] returns a new future [fut2] that resolves - with [f x] if [fut] resolved with [x]; - and fails with [e] if [fut] fails with [e] or [f x] raises [e]. - @param on if provided, [f] runs on the given runner *) +(** [map ?on ~f fut] returns a new future [fut2] that resolves with [f x] if + [fut] resolved with [x]; and fails with [e] if [fut] fails with [e] or [f x] + raises [e]. + @param on if provided, [f] runs on the given runner *) val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t -(** [bind ?on ~f fut] returns a new future [fut2] that resolves - like the future [f x] if [fut] resolved with [x]; - and fails with [e] if [fut] fails with [e] or [f x] raises [e]. - @param on if provided, [f] runs on the given runner *) +(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future + [f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e] + or [f x] raises [e]. + @param on if provided, [f] runs on the given runner *) val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t -(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves - like the future [f (Ok x)] if [fut] resolved with [x]; - and resolves like the future [f (Error (exn, bt))] - if [fut] fails with [exn] and backtrace [bt]. +(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like + the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the + future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt]. @param on if provided, [f] runs on the given runner @since 0.4 *) @@ -159,18 +154,18 @@ val join : 'a t t -> 'a t @since 0.2 *) val both : 'a t -> 'b t -> ('a * 'b) t -(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and - [b] succeeds with [y], or fails if any of them fails. *) +(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and [b] succeeds + with [y], or fails if any of them fails. *) val choose : 'a t -> 'b t -> ('a, 'b) Either.t t -(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or - [b] succeeds with [y], or fails if both of them fails. - If they both succeed, it is not specified which result is used. *) +(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or [b] + succeeds with [y], or fails if both of them fails. If they both succeed, it + is not specified which result is used. *) val choose_same : 'a t -> 'a t -> 'a t -(** [choose_same a b] succeeds with the value of one of [a] or [b] if - they succeed, or fails if both fail. - If they both succeed, it is not specified which result is used. *) +(** [choose_same a b] succeeds with the value of one of [a] or [b] if they + succeed, or fails if both fail. If they both succeed, it is not specified + which result is used. 
*) val join_array : 'a t array -> 'a array t (** Wait for all the futures in the array. Fails if any future fails. *) @@ -185,20 +180,20 @@ module Advanced : sig aggregate_results:(('a t -> 'a) -> 'cont -> 'res) -> 'cont -> 'res t - (** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a - container of futures ([cont]), with [len] elements, - and returns a future result of type [res] - (possibly another type of container). + (** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len + cont] takes a container of futures ([cont]), with [len] elements, and + returns a future result of type [res] (possibly another type of + container). - This waits for all futures in [cont: 'cont] to be done - (futures obtained via [iter cont]). If they - all succeed, their results are aggregated into a new - result of type ['res] via [aggregate_results cont]. + This waits for all futures in [cont: 'cont] to be done (futures obtained + via [iter cont]). If they all succeed, their results are + aggregated into a new result of type ['res] via + [aggregate_results cont]. - {b NOTE}: the behavior is not specified if [iter f cont] (for a function f) - doesn't call [f] on exactly [len cont] elements. + {b NOTE}: the behavior is not specified if [iter f cont] (for a function + f) doesn't call [f] on exactly [len cont] elements. - @since 0.5.1 *) + @since 0.5.1 *) end val map_list : f:('a -> 'b t) -> 'a list -> 'b list t @@ -206,23 +201,22 @@ val map_list : f:('a -> 'b t) -> 'a list -> 'b list t @since 0.5.1 *) val wait_array : _ t array -> unit t -(** [wait_array arr] waits for all futures in [arr] to resolve. It discards - the individual results of futures in [arr]. It fails if any future fails. *) +(** [wait_array arr] waits for all futures in [arr] to resolve. It discards the + individual results of futures in [arr]. It fails if any future fails. *) val wait_list : _ t list -> unit t -(** [wait_list l] waits for all futures in [l] to resolve. It discards - the individual results of futures in [l]. It fails if any future fails. *) +(** [wait_list l] waits for all futures in [l] to resolve. It discards the + individual results of futures in [l]. It fails if any future fails. *) val for_ : on:Runner.t -> int -> (int -> unit) -> unit t -(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns - a future that resolves when all the tasks have resolved, or fails - as soon as one task has failed. *) +(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns a + future that resolves when all the tasks have resolved, or fails as soon as + one task has failed. *) val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t -(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in - the runner (where [n = Array.length arr]), and returns a future - that resolves when all the tasks are done, - or fails if any of them fails. +(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in the + runner (where [n = Array.length arr]), and returns a future that resolves + when all the tasks are done, or fails if any of them fails. @since 0.2 *) val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t @@ -242,43 +236,39 @@ val await : 'a t -> 'a @since 0.3 - This must only be run from inside the runner itself. The runner must - support {!Suspend_}. - {b NOTE}: only on OCaml 5.x -*) + This must only be run from inside the runner itself. The runner must support + {!Suspend_}. 
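For the iteration combinators above, a short sketch using [for_], with [wait_block_exn] (from the blocking section further down) called from outside the pool:

{[
  (* Fill [arr.(i)] with [i * i] in parallel, then wait for completion. *)
  let fill_squares (pool : Runner.t) (arr : int array) : unit =
    Fut.for_ ~on:pool (Array.length arr) (fun i -> arr.(i) <- i * i)
    |> Fut.wait_block_exn
]}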
{b NOTE}: only on OCaml 5.x *) [@@@endif] (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error -(** [wait_block fut] blocks the current thread until [fut] is resolved, - and returns its value. +(** [wait_block fut] blocks the current thread until [fut] is resolved, and + returns its value. - {b NOTE}: A word of warning: this will monopolize the calling thread until the future - resolves. This can also easily cause deadlocks, if enough threads in a pool - call [wait_block] on futures running on the same pool or a pool depending on it. + {b NOTE}: A word of warning: this will monopolize the calling thread until + the future resolves. This can also easily cause deadlocks, if enough threads + in a pool call [wait_block] on futures running on the same pool or a pool + depending on it. - A good rule to avoid deadlocks is to run this from outside of any pool, - or to have an acyclic order between pools where [wait_block] - is only called from a pool on futures evaluated in a pool that comes lower - in the hierarchy. - If this rule is broken, it is possible for all threads in a pool to wait - for futures that can only make progress on these same threads, - hence the deadlock. - *) + A good rule to avoid deadlocks is to run this from outside of any pool, or + to have an acyclic order between pools where [wait_block] is only called + from a pool on futures evaluated in a pool that comes lower in the + hierarchy. If this rule is broken, it is possible for all threads in a pool + to wait for futures that can only make progress on these same threads, hence + the deadlock. *) val wait_block_exn : 'a t -> 'a (** Same as {!wait_block} but re-raises the exception if the future failed. *) (** {2 Infix operators} - These combinators run on either the current pool (if present), - or on the same thread that just fulfilled the previous future - if not. + These combinators run on either the current pool (if present), or on the + same thread that just fulfilled the previous future if not. - They were previously present as [module Infix_local] and [val infix], - but are now simplified. + They were previously present as [module Infix_local] and [val infix], but + are now simplified. @since 0.5 *) diff --git a/src/core/hmap_ls_.real.ml b/src/core/hmap_ls_.real.ml index c8a3e1a1..8b7950a5 100644 --- a/src/core/hmap_ls_.real.ml +++ b/src/core/hmap_ls_.real.ml @@ -42,9 +42,8 @@ let[@inline] remove_in_local_hmap (k : _ Hmap.key) : unit = let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit = update_local_hmap (Hmap.add k v) -(** [with_in_local_hmap k v f] calls [f()] in a context - where [k] is bound to [v] in the local hmap. Then it restores the - previous binding for [k]. *) +(** [with_in_local_hmap k v f] calls [f()] in a context where [k] is bound to + [v] in the local hmap. Then it restores the previous binding for [k]. *) let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f = let h = get_local_hmap () in match Hmap.find k h with diff --git a/src/core/lock.mli b/src/core/lock.mli index f85f3d49..2b63a890 100644 --- a/src/core/lock.mli +++ b/src/core/lock.mli @@ -1,8 +1,8 @@ (** Mutex-protected resource. 
- This lock is a synchronous concurrency primitive, as a thin wrapper - around {!Mutex} that encourages proper management of the critical - section in RAII style: + This lock is a synchronous concurrency primitive, as a thin wrapper around + {!Mutex} that encourages proper management of the critical section in RAII + style: {[ let (let@) = (@@) @@ -19,8 +19,8 @@ … ]} - This lock does not work well with {!Fut.await}. A critical section - that contains a call to [await] might cause deadlocks, or lock starvation, + This lock does not work well with {!Fut.await}. A critical section that + contains a call to [await] might cause deadlocks, or lock starvation, because it will hold onto the lock while it goes to sleep. @since 0.3 *) @@ -32,27 +32,27 @@ val create : 'a -> 'a t (** Create a new protected value. *) val with_ : 'a t -> ('a -> 'b) -> 'b -(** [with_ l f] runs [f x] where [x] is the value protected with - the lock [l], in a critical section. If [f x] fails, [with_lock l f] - fails too but the lock is released. *) +(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l], + in a critical section. If [f x] fails, [with_lock l f] fails too but the + lock is released. *) val update : 'a t -> ('a -> 'a) -> unit -(** [update l f] replaces the content [x] of [l] with [f x], while protected - by the mutex. *) +(** [update l f] replaces the content [x] of [l] with [f x], while protected by + the mutex. *) val update_map : 'a t -> ('a -> 'a * 'b) -> 'b -(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] - and returns [y], while protected by the mutex. *) +(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and + returns [y], while protected by the mutex. *) val mutex : _ t -> Mutex.t (** Underlying mutex. *) val get : 'a t -> 'a -(** Atomically get the value in the lock. The value that is returned - isn't protected! *) +(** Atomically get the value in the lock. The value that is returned isn't + protected! *) val set : 'a t -> 'a -> unit (** Atomically set the value. - {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} - is an anti pattern and will not protect data against some race conditions. *) + {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an + anti pattern and will not protect data against some race conditions. *) diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 8f706fdd..aa4548a5 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -1,13 +1,12 @@ (** Moonpool - A pool within a bigger pool (ie the ocean). Here, we're talking about - pools of [Thread.t] that are dispatched over several [Domain.t] to - enable parallelism. + A pool within a bigger pool (ie the ocean). Here, we're talking about pools + of [Thread.t] that are dispatched over several [Domain.t] to enable + parallelism. - We provide several implementations of pools - with distinct scheduling strategies, alongside some concurrency - primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}). -*) + We provide several implementations of pools with distinct scheduling + strategies, alongside some concurrency primitives such as guarding locks + ({!Lock.t}) and futures ({!Fut.t}). *) module Ws_pool = Ws_pool module Fifo_pool = Fifo_pool @@ -24,45 +23,45 @@ module Immediate_runner : sig end module Exn_bt = Exn_bt exception Shutdown -(** Exception raised when trying to run tasks on - runners that have been shut down. 
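Returning to the {!Lock} interface above, a small sketch contrasting [update] with bare [get]/[set]: [update] performs the read-modify-write inside the critical section, which is what the NOTE on [set] recommends.

{[
  let counter : int Lock.t = Lock.create 0

  (* safe: the increment happens while holding the mutex *)
  let incr_counter () : unit = Lock.update counter (fun x -> x + 1)

  (* a snapshot only; do not use it to build a read-modify-write *)
  let read_counter () : int = Lock.get counter
]}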
+(** Exception raised when trying to run tasks on runners that have been shut + down. @since 0.6 *) val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t -(** Similar to {!Thread.create}, but it picks a background domain at random - to run the thread. This ensures that we don't always pick the same domain - to run all the various threads needed in an application (timers, event loops, etc.) *) +(** Similar to {!Thread.create}, but it picks a background domain at random to + run the thread. This ensures that we don't always pick the same domain to + run all the various threads needed in an application (timers, event loops, + etc.) *) val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit -(** [run_async runner task] schedules the task to run - on the given runner. This means [task()] will be executed - at some point in the future, possibly in another thread. - @param fiber optional initial (picos) fiber state - @since 0.5 *) +(** [run_async runner task] schedules the task to run on the given runner. This + means [task()] will be executed at some point in the future, possibly in + another thread. + @param fiber optional initial (picos) fiber state + @since 0.5 *) val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a -(** [run_wait_block runner f] schedules [f] for later execution - on the runner, like {!run_async}. - It then blocks the current thread until [f()] is done executing, - and returns its result. If [f()] raises an exception, then [run_wait_block pool f] - will raise it as well. +(** [run_wait_block runner f] schedules [f] for later execution on the runner, + like {!run_async}. It then blocks the current thread until [f()] is done + executing, and returns its result. If [f()] raises an exception, then + [run_wait_block pool f] will raise it as well. See {!run_async} for more details. - {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} - about the required discipline to avoid deadlocks). + {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the + required discipline to avoid deadlocks). @raise Shutdown if the runner was already shut down @since 0.6 *) val recommended_thread_count : unit -> int -(** Number of threads recommended to saturate the CPU. - For IO pools this makes little sense (you might want more threads than - this because many of them will be blocked most of the time). - @since 0.5 *) +(** Number of threads recommended to saturate the CPU. For IO pools this makes + little sense (you might want more threads than this because many of them + will be blocked most of the time). + @since 0.5 *) val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t -(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) - and returns a future result for it. See {!Fut.spawn}. +(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns + a future result for it. See {!Fut.spawn}. @since 0.5 *) val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t @@ -71,19 +70,18 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t val get_current_runner : unit -> Runner.t option (** See {!Runner.get_current_runner} - @since 0.7 *) + @since 0.7 *) [@@@ifge 5.0] val await : 'a Fut.t -> 'a -(** Await a future, must be run on a moonpool runner. See {!Fut.await}. - Only on OCaml >= 5.0. +(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on + OCaml >= 5.0. @since 0.5 *) - val yield : unit -> unit -(** Yield from the current task, must be run on a moonpool runner. 
- Only on OCaml >= 5.0. +(** Yield from the current task, must be run on a moonpool runner. Only on OCaml + >= 5.0. @since NEXT_RELEASE *) [@@@endif] @@ -96,35 +94,33 @@ module Thread_local_storage = Thread_local_storage (** A simple blocking queue. - This queue is quite basic and will not behave well under heavy - contention. However, it can be sufficient for many practical use cases. + This queue is quite basic and will not behave well under heavy contention. + However, it can be sufficient for many practical use cases. - {b NOTE}: this queue will typically block the caller thread - in case the operation (push/pop) cannot proceed. - Be wary of deadlocks when using the queue {i from} a pool - when you expect the other end to also be produced/consumed from - the same pool. + {b NOTE}: this queue will typically block the caller thread in case the + operation (push/pop) cannot proceed. Be wary of deadlocks when using the + queue {i from} a pool when you expect the other end to also be + produced/consumed from the same pool. - See discussion on {!Fut.wait_block} for more details on deadlocks - and how to mitigate the risk of running into them. + See discussion on {!Fut.wait_block} for more details on deadlocks and how to + mitigate the risk of running into them. - More scalable queues can be found in - Lockfree (https://github.com/ocaml-multicore/lockfree/) -*) + More scalable queues can be found in Lockfree + (https://github.com/ocaml-multicore/lockfree/) *) module Blocking_queue : sig type 'a t (** Unbounded blocking queue. - This queue is thread-safe and will block when calling {!pop} - on it when it's empty. *) + This queue is thread-safe and will block when calling {!pop} on it when + it's empty. *) val create : unit -> _ t (** Create a new unbounded queue. *) val size : _ t -> int - (** Number of items currently in the queue. Note that [pop] - might still block if this returns a non-zero number, since another - thread might have consumed the items in the mean time. + (** Number of items currently in the queue. Note that [pop] might still block + if this returns a non-zero number, since another thread might have + consumed the items in the mean time. @since 0.2 *) exception Closed @@ -132,73 +128,70 @@ module Blocking_queue : sig val push : 'a t -> 'a -> unit (** [push q x] pushes [x] into [q], and returns [()]. - In the current implementation, [push q] will never block for - a long time, it will only block while waiting for a lock - so it can push the element. + In the current implementation, [push q] will never block for a long time, + it will only block while waiting for a lock so it can push the element. @raise Closed if the queue is closed (by a previous call to [close q]) *) val pop : 'a t -> 'a - (** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) - - val close : _ t -> unit - (** Close the queue, meaning there won't be any more [push] allowed, - ie [push] will raise {!Closed}. - - [pop] will keep working and will return the elements present in the - queue, until it's entirely drained; then [pop] will - also raise {!Closed}. *) - - val try_pop : force_lock:bool -> 'a t -> 'a option - (** [try_pop q] immediately pops the first element of [q], if any, - or returns [None] without blocking. 
- @param force_lock if true, use {!Mutex.lock} (which can block under contention); - if false, use {!Mutex.try_lock}, which might return [None] even in - presence of an element if there's contention *) - - val try_push : 'a t -> 'a -> bool - (** [try_push q x] tries to push into [q], in which case - it returns [true]; or it fails to push and returns [false] - without blocking. - @raise Closed if the locking succeeded but the queue is closed. + (** [pop q] pops the next element in [q]. It might block until an element + comes. + @raise Closed if the queue was closed before a new element was available. *) + val close : _ t -> unit + (** Close the queue, meaning there won't be any more [push] allowed, ie [push] + will raise {!Closed}. + + [pop] will keep working and will return the elements present in the queue, + until it's entirely drained; then [pop] will also raise {!Closed}. *) + + val try_pop : force_lock:bool -> 'a t -> 'a option + (** [try_pop q] immediately pops the first element of [q], if any, or returns + [None] without blocking. + @param force_lock + if true, use {!Mutex.lock} (which can block under contention); if false, + use {!Mutex.try_lock}, which might return [None] even in presence of an + element if there's contention *) + + val try_push : 'a t -> 'a -> bool + (** [try_push q x] tries to push into [q], in which case it returns [true]; or + it fails to push and returns [false] without blocking. + @raise Closed if the locking succeeded but the queue is closed. *) + val transfer : 'a t -> 'a Queue.t -> unit - (** [transfer bq q2] transfers all items presently - in [bq] into [q2] in one atomic section, and clears [bq]. - It blocks if no element is in [bq]. + (** [transfer bq q2] transfers all items presently in [bq] into [q2] in one + atomic section, and clears [bq]. It blocks if no element is in [bq]. - This is useful to consume elements from the queue in batch. - Create a [Queue.t] locally: + This is useful to consume elements from the queue in batch. Create a + [Queue.t] locally: + {[ + let dowork (work_queue : job Bb_queue.t) = + (* local queue, not thread safe *) + let local_q = Queue.create () in + try + while true do + (* work on local events, already on this thread *) + while not (Queue.is_empty local_q) do + let job = Queue.pop local_q in + process_job job + done; - {[ - let dowork (work_queue: job Bb_queue.t) = - (* local queue, not thread safe *) - let local_q = Queue.create() in - try - while true do - (* work on local events, already on this thread *) - while not (Queue.is_empty local_q) do - let job = Queue.pop local_q in - process_job job - done; + (* get all the events in the incoming blocking queue, in + one single critical section. *) + Bb_queue.transfer work_queue local_q + done + with Bb_queue.Closed -> () + ]} - (* get all the events in the incoming blocking queue, in - one single critical section. *) - Bb_queue.transfer work_queue local_q - done - with Bb_queue.Closed -> () - ]} - - @since 0.4 *) + @since 0.4 *) type 'a gen = unit -> 'a option type 'a iter = ('a -> unit) -> unit val to_iter : 'a t -> 'a iter - (** [to_iter q] returns an iterator over all items in the queue. - This might not terminate if [q] is never closed. + (** [to_iter q] returns an iterator over all items in the queue. This might + not terminate if [q] is never closed. @since 0.4 *) val to_gen : 'a t -> 'a gen @@ -215,8 +208,8 @@ module Bounded_queue = Bounded_queue module Atomic = Atomic_ (** Atomic values. 
- This is either a shim using [ref], on pre-OCaml 5, or the - standard [Atomic] module on OCaml 5. *) + This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] + module on OCaml 5. *) (**/**) @@ -226,9 +219,9 @@ module Private : sig (** A deque for work stealing, fixed size. *) module Worker_loop_ = Worker_loop_ - (** Worker loop. This is useful to implement custom runners, it - should run on each thread of the runner. - @since 0.7 *) + (** Worker loop. This is useful to implement custom runners, it should run on + each thread of the runner. + @since 0.7 *) module Domain_ = Domain_ (** Utils for domains *) diff --git a/src/core/runner.mli b/src/core/runner.mli index 958c8598..37db1ce4 100644 --- a/src/core/runner.mli +++ b/src/core/runner.mli @@ -1,9 +1,8 @@ (** Interface for runners. - This provides an abstraction for running tasks in the background, - which is implemented by various thread pools. - @since 0.3 -*) + This provides an abstraction for running tasks in the background, which is + implemented by various thread pools. + @since 0.3 *) type fiber = Picos.Fiber.t type task = unit -> unit @@ -12,19 +11,19 @@ type t (** A runner. If a runner is no longer needed, {!shutdown} can be used to signal all - worker threads - in it to stop (after they finish their work), and wait for them to stop. + worker threads in it to stop (after they finish their work), and wait for + them to stop. - The threads are distributed across a fixed domain pool - (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and - simple the single runtime on OCaml 4). *) + The threads are distributed across a fixed domain pool (whose size is + determined by {!Domain.recommended_domain_count} on OCaml 5, and simple the + single runtime on OCaml 4). *) val size : t -> int (** Number of threads/workers. *) val num_tasks : t -> int -(** Current number of tasks. This is at best a snapshot, useful for metrics - and debugging. *) +(** Current number of tasks. This is at best a snapshot, useful for metrics and + debugging. *) val shutdown : t -> unit (** Shutdown the runner and wait for it to terminate. Idempotent. *) @@ -35,32 +34,31 @@ val shutdown_without_waiting : t -> unit exception Shutdown val run_async : ?fiber:fiber -> t -> task -> unit -(** [run_async pool f] schedules [f] for later execution on the runner - in one of the threads. [f()] will run on one of the runner's - worker threads/domains. +(** [run_async pool f] schedules [f] for later execution on the runner in one of + the threads. [f()] will run on one of the runner's worker threads/domains. @param fiber if provided, run the task with this initial fiber data - @raise Shutdown if the runner was shut down before [run_async] was called. *) + @raise Shutdown if the runner was shut down before [run_async] was called. +*) val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a -(** [run_wait_block pool f] schedules [f] for later execution - on the pool, like {!run_async}. - It then blocks the current thread until [f()] is done executing, - and returns its result. If [f()] raises an exception, then [run_wait_block pool f] - will raise it as well. +(** [run_wait_block pool f] schedules [f] for later execution on the pool, like + {!run_async}. It then blocks the current thread until [f()] is done + executing, and returns its result. If [f()] raises an exception, then + [run_wait_block pool f] will raise it as well. 
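A one-line sketch of [run_wait_block], called from a thread outside the runner, which is the deadlock-safe direction described in the NOTE that follows:

{[
  let compute_sync (r : Runner.t) : int =
    Runner.run_wait_block r (fun () -> 21 * 2)
]}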
- {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} - about the required discipline to avoid deadlocks). + {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the + required discipline to avoid deadlocks). @raise Shutdown if the runner was already shut down *) val dummy : t -(** Runner that fails when scheduling tasks on it. - Calling {!run_async} on it will raise Failure. +(** Runner that fails when scheduling tasks on it. Calling {!run_async} on it + will raise Failure. @since 0.6 *) (** {2 Implementing runners} *) -(** This module is specifically intended for users who implement their - own runners. Regular users of Moonpool should not need to look at it. *) +(** This module is specifically intended for users who implement their own + runners. Regular users of Moonpool should not need to look at it. *) module For_runner_implementors : sig val create : size:(unit -> int) -> @@ -71,21 +69,20 @@ module For_runner_implementors : sig t (** Create a new runner. - {b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, - so that {!Fork_join} and other 5.x features work properly. *) + {b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so + that {!Fork_join} and other 5.x features work properly. *) val k_cur_runner : t Thread_local_storage.t - (** Key that should be used by each runner to store itself in TLS - on every thread it controls, so that tasks running on these threads - can access the runner. This is necessary for {!get_current_runner} - to work. *) + (** Key that should be used by each runner to store itself in TLS on every + thread it controls, so that tasks running on these threads can access the + runner. This is necessary for {!get_current_runner} to work. *) end val get_current_runner : unit -> t option -(** Access the current runner. This returns [Some r] if the call - happens on a thread that belongs in a runner. +(** Access the current runner. This returns [Some r] if the call happens on a + thread that belongs in a runner. @since 0.5 *) val get_current_fiber : unit -> fiber option -(** [get_current_storage runner] gets the local storage - for the currently running task. *) +(** [get_current_storage runner] gets the local storage for the currently + running task. *) diff --git a/src/core/task_local_storage.mli b/src/core/task_local_storage.mli index 71c7ffe6..ebdf1e2a 100644 --- a/src/core/task_local_storage.mli +++ b/src/core/task_local_storage.mli @@ -1,41 +1,38 @@ (** Task-local storage. - This storage is associated to the current task, - just like thread-local storage is associated with - the current thread. The storage is carried along in case - the current task is suspended. + This storage is associated to the current task, just like thread-local + storage is associated with the current thread. The storage is carried along + in case the current task is suspended. - @since 0.6 -*) + @since 0.6 *) type 'a t = 'a Picos.Fiber.FLS.t val create : unit -> 'a t -(** [create ()] makes a new key. Keys are expensive and - should never be allocated dynamically or in a loop. *) +(** [create ()] makes a new key. Keys are expensive and should never be + allocated dynamically or in a loop. *) exception Not_set val get_exn : 'a t -> 'a -(** [get k] gets the value for the current task for key [k]. - Must be run from inside a task running on a runner. +(** [get k] gets the value for the current task for key [k]. Must be run from + inside a task running on a runner. 
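A sketch of typical task-local storage use, combining [create] and [get_opt] from this interface with [with_value] defined just below; the key and helper names are illustrative:

{[
  let k_request_id : string Task_local_storage.t = Task_local_storage.create ()

  (* run [f] with the request id bound for the current task *)
  let with_request_id (id : string) (f : unit -> 'a) : 'a =
    Task_local_storage.with_value k_request_id id f

  (* [None] when called outside [with_request_id] (or outside a task) *)
  let current_request_id () : string option =
    Task_local_storage.get_opt k_request_id
]}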
@raise Not_set otherwise *) val get_opt : 'a t -> 'a option -(** [get_opt k] gets the current task's value for key [k], - or [None] if not run from inside the task. *) +(** [get_opt k] gets the current task's value for key [k], or [None] if not run + from inside the task. *) val get : 'a t -> default:'a -> 'a val set : 'a t -> 'a -> unit -(** [set k v] sets the storage for [k] to [v]. - Must be run from inside a task running on a runner. +(** [set k v] sets the storage for [k] to [v]. Must be run from inside a task + running on a runner. @raise Failure otherwise *) val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b -(** [with_value k v f] sets [k] to [v] for the duration of the call - to [f()]. When [f()] returns (or fails), [k] is restored - to its old value. *) +(** [with_value k v f] sets [k] to [v] for the duration of the call to [f()]. + When [f()] returns (or fails), [k] is restored to its old value. *) (** {2 Local [Hmap.t]} diff --git a/src/core/worker_loop_.ml b/src/core/worker_loop_.ml index 41fe34ba..1dfc415a 100644 --- a/src/core/worker_loop_.ml +++ b/src/core/worker_loop_.ml @@ -83,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) : let fiber = get_current_fiber_exn () in (* when triggers is signaled, reschedule task *) if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then - (* trigger was already signaled, run task now *) - reschedule trigger fiber k) + (* trigger was already signaled, run task now *) + reschedule trigger fiber k) | Picos.Computation.Cancel_after _r -> Some (fun k -> diff --git a/src/core/worker_loop_.mli b/src/core/worker_loop_.mli index e5b0c969..7098deb8 100644 --- a/src/core/worker_loop_.mli +++ b/src/core/worker_loop_.mli @@ -1,7 +1,7 @@ (** Internal module that is used for workers. - A thread pool should use this [worker_loop] to run tasks, - handle effects, etc. *) + A thread pool should use this [worker_loop] to run tasks, handle effects, + etc. *) open Types_ diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index fefdf8d9..137e76cd 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -16,7 +16,8 @@ end type state = { id_: Id.t; - (** Unique to this pool. Used to make sure tasks stay within the same pool. *) + (** Unique to this pool. Used to make sure tasks stay within the same + pool. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *) mutable workers: worker_state array; (** Fixed set of workers. *) main_q: WL.task_full Queue.t; @@ -43,9 +44,8 @@ and worker_state = { q: WL.task_full WSQ.t; (** Work stealing queue *) rng: Random.State.t; } -(** State for a given worker. Only this worker is - allowed to push into the queue, but other workers - can come and steal from it if they're idle. *) +(** State for a given worker. Only this worker is allowed to push into the + queue, but other workers can come and steal from it if they're idle. *) let[@inline] size_ (self : state) = Array.length self.workers @@ -55,9 +55,8 @@ let num_tasks_ (self : state) : int = Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers; !n -(** TLS, used by worker to store their specific state - and be able to retrieve it from tasks when we schedule new - sub-tasks. *) +(** TLS, used by worker to store their specific state and be able to retrieve it + from tasks when we schedule new sub-tasks. 
*) let k_worker_state : worker_state TLS.t = TLS.create () let[@inline] get_current_worker_ () : worker_state option = @@ -77,8 +76,8 @@ let[@inline] try_wake_someone_ (self : state) : unit = Mutex.unlock self.mutex ) -(** Push into worker's local queue, open to work stealing. - precondition: this runs on the worker thread whose state is [self] *) +(** Push into worker's local queue, open to work stealing. precondition: this + runs on the worker thread whose state is [self] *) let schedule_on_current_worker (self : worker_state) task : unit = (* we're on this same pool, schedule in the worker's state. Otherwise we might also be on pool A but asking to schedule on pool B, diff --git a/src/core/ws_pool.mli b/src/core/ws_pool.mli index 3329adba..78ff3173 100644 --- a/src/core/ws_pool.mli +++ b/src/core/ws_pool.mli @@ -1,23 +1,22 @@ (** Work-stealing thread pool. - A pool of threads with a worker-stealing scheduler. - The pool contains a fixed number of threads that wait for work - items to come, process these, and loop. + A pool of threads with a worker-stealing scheduler. The pool contains a + fixed number of threads that wait for work items to come, process these, and + loop. - This is good for CPU-intensive tasks that feature a lot of small tasks. - Note that tasks will not always be processed in the order they are - scheduled, so this is not great for workloads where the latency - of individual tasks matter (for that see {!Fifo_pool}). + This is good for CPU-intensive tasks that feature a lot of small tasks. Note + that tasks will not always be processed in the order they are scheduled, so + this is not great for workloads where the latency of individual tasks matter + (for that see {!Fifo_pool}). This implements {!Runner.t} since 0.3. If a pool is no longer needed, {!shutdown} can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop. - The threads are distributed across a fixed domain pool - (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, - and simply the single runtime on OCaml 4). - *) + The threads are distributed across a fixed domain pool (whose size is + determined by {!Domain.recommended_domain_count} on OCaml 5, and simply the + single runtime on OCaml 4). *) include module type of Runner @@ -33,25 +32,26 @@ type ('a, 'b) create_args = val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. - @param on_init_thread called at the beginning of each new thread - in the pool. - @param num_threads size of the pool, ie. number of worker threads. - It will be at least [1] internally, so [0] or negative values make no sense. - The default is [Domain.recommended_domain_count()], ie one worker - thread per CPU core. - On OCaml 4 the default is [4] (since there is only one domain). - @param on_exit_thread called at the end of each thread in the pool - @param around_task a pair of [before, after], where [before pool] is called - before a task is processed, - on the worker thread about to run it, and returns [x]; and [after pool x] is called by - the same thread after the task is over. (since 0.2) - @param name a name for this thread pool, used if tracing is enabled (since 0.6) - *) + @param on_init_thread + called at the beginning of each new thread in the pool. + @param num_threads + size of the pool, ie. number of worker threads. It will be at least [1] + internally, so [0] or negative values make no sense. 
The default is + [Domain.recommended_domain_count()], ie one worker thread per CPU core. On + OCaml 4 the default is [4] (since there is only one domain). + @param on_exit_thread called at the end of each thread in the pool + @param around_task + a pair of [before, after], where [before pool] is called before a task is + processed, on the worker thread about to run it, and returns [x]; and + [after pool x] is called by the same thread after the task is over. (since + 0.2) + @param name + a name for this thread pool, used if tracing is enabled (since 0.6) *) val with_ : (unit -> (t -> 'a) -> 'a, _) create_args -(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. - When [f pool] returns or fails, [pool] is shutdown and its resources - are released. +(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When + [f pool] returns or fails, [pool] is shutdown and its resources are + released. Most parameters are the same as in {!create}. @since 0.3 *) diff --git a/src/dpool/moonpool_dpool.ml b/src/dpool/moonpool_dpool.ml index e396d716..5f177362 100644 --- a/src/dpool/moonpool_dpool.ml +++ b/src/dpool/moonpool_dpool.ml @@ -76,25 +76,24 @@ type worker_state = { (** Array of (optional) workers. - Workers are started/stop on demand. For each index we have - the (currently active) domain's state - including a work queue and a thread refcount; and the domain itself, - if any, in a separate option because it might outlive its own state. *) + Workers are started/stop on demand. For each index we have the (currently + active) domain's state including a work queue and a thread refcount; and the + domain itself, if any, in a separate option because it might outlive its own + state. *) let domains_ : (worker_state option * Domain_.t option) Lock.t array = let n = max 1 (Domain_.recommended_number ()) in Array.init n (fun _ -> Lock.create (None, None)) (** main work loop for a domain worker. - A domain worker does two things: - - run functions it's asked to (mainly, to start new threads inside it) - - decrease the refcount when one of these threads stops. The thread - will notify the domain that it's exiting, so the domain can know - how many threads are still using it. If all threads exit, the domain - polls a bit (in case new threads are created really shortly after, - which happens with a [Pool.with_] or [Pool.create() … Pool.shutdown()] - in a tight loop), and if nothing happens it tries to stop to free resources. -*) + A domain worker does two things: + - run functions it's asked to (mainly, to start new threads inside it) + - decrease the refcount when one of these threads stops. The thread will + notify the domain that it's exiting, so the domain can know how many + threads are still using it. If all threads exit, the domain polls a bit + (in case new threads are created really shortly after, which happens with + a [Pool.with_] or [Pool.create() … Pool.shutdown()] in a tight loop), and + if nothing happens it tries to stop to free resources. *) let work_ idx (st : worker_state) : unit = let main_loop () = let continue = ref true in diff --git a/src/dpool/moonpool_dpool.mli b/src/dpool/moonpool_dpool.mli index 35ef918c..e579f3ad 100644 --- a/src/dpool/moonpool_dpool.mli +++ b/src/dpool/moonpool_dpool.mli @@ -1,18 +1,17 @@ (** Static pool of domains. - These domains are shared between {b all} the pools in moonpool. 
- The rationale is that we should not have more domains than cores, so - it's easier to reserve exactly that many domain slots, and run more flexible - thread pools on top (each domain being shared by potentially multiple threads - from multiple pools). + These domains are shared between {b all} the pools in moonpool. The + rationale is that we should not have more domains than cores, so it's easier + to reserve exactly that many domain slots, and run more flexible thread + pools on top (each domain being shared by potentially multiple threads from + multiple pools). - The pool should not contain actual domains if it's not in use, ie if no - runner is presently actively using one or more of the domain slots. + The pool should not contain actual domains if it's not in use, ie if no + runner is presently actively using one or more of the domain slots. - {b NOTE}: Interface is still experimental. + {b NOTE}: Interface is still experimental. - @since 0.6 -*) + @since 0.6 *) type domain = Domain_.t @@ -24,13 +23,13 @@ val max_number_of_domains : unit -> int Be very cautious with this interface, or resource leaks might occur. *) val run_on : int -> (unit -> unit) -> unit -(** [run_on i f] runs [f()] on the domain with index [i]. - Precondition: [0 <= i < n_domains()]. The thread must call {!decr_on} - with [i] once it's done. *) +(** [run_on i f] runs [f()] on the domain with index [i]. Precondition: + [0 <= i < n_domains()]. The thread must call {!decr_on} with [i] once it's + done. *) val decr_on : int -> unit (** Signal that a thread is stopping on the domain with index [i]. *) val run_on_and_wait : int -> (unit -> 'a) -> 'a -(** [run_on_and_wait i f] runs [f()] on the domain with index [i], - and blocks until the result of [f()] is returned back. *) +(** [run_on_and_wait i f] runs [f()] on the domain with index [i], and blocks + until the result of [f()] is returned back. *) diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index 15589581..255f1ab9 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -187,8 +187,8 @@ let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a = let h = add_on_cancel self cb in Fun.protect k ~finally:(fun () -> remove_on_cancel self h) -(** Successfully resolve the fiber. This might still fail if - some children failed. *) +(** Successfully resolve the fiber. This might still fail if some children + failed. *) let resolve_ok_ (self : 'a t) (r : 'a) : unit = let r = A.make @@ Ok r in let promise = prom_of_fut self.res in diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index d423b9d1..a6458015 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -1,13 +1,11 @@ (** Fibers. - A fiber is a lightweight computation that runs cooperatively - alongside other fibers. In the context of moonpool, fibers - have additional properties: + A fiber is a lightweight computation that runs cooperatively alongside other + fibers. In the context of moonpool, fibers have additional properties: - they run in a moonpool runner - - they form a simple supervision tree, enabling a limited form - of structured concurrency -*) + - they form a simple supervision tree, enabling a limited form of structured + concurrency *) type cancel_callback = Exn_bt.t -> unit (** A callback used in case of cancellation *) @@ -26,8 +24,8 @@ module Private_ : sig runner: Runner.t; pfiber: pfiber; } - (** Type definition, exposed so that {!any} can be unboxed. - Please do not rely on that. *) + (** Type definition, exposed so that {!any} can be unboxed. Please do not rely + on that. 
*) type any = Any : _ t -> any [@@unboxed] @@ -58,8 +56,7 @@ val return : 'a -> 'a t val fail : Exn_bt.t -> _ t val self : unit -> any -(** [self ()] is the current fiber. - Must be run from inside a fiber. +(** [self ()] is the current fiber. Must be run from inside a fiber. @raise Failure if not run from inside a fiber. *) val peek : 'a t -> 'a Fut.or_error option @@ -78,16 +75,16 @@ val await : 'a t -> 'a (** [await fib] is like [Fut.await (res fib)] *) val wait_block_exn : 'a t -> 'a -(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. - {b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *) +(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See + {!Fut.wait_block} for warnings about deadlocks. *) val wait_block : 'a t -> 'a Fut.or_error -(** [wait_block fib] is [Fut.wait_block (res fib)]. - {b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *) +(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See + {!Fut.wait_block} for warnings about deadlocks. *) val check_if_cancelled : unit -> unit -(** Check if the current fiber is cancelled, in which case this raises. - Must be run from inside a fiber. +(** Check if the current fiber is cancelled, in which case this raises. Must be + run from inside a fiber. @raise e if the current fiber is cancelled with exception [e] @raise Failure if not run from a fiber. *) @@ -99,55 +96,54 @@ type cancel_handle (** An opaque handle for a single cancel callback in a fiber *) val add_on_cancel : _ t -> cancel_callback -> cancel_handle -(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks - for [fib]. If [fib] is already cancelled, [cb] is called immediately. *) +(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib]. + If [fib] is already cancelled, [cb] is called immediately. *) val remove_on_cancel : _ t -> cancel_handle -> unit -(** [remove_on_cancel fib h] removes the cancel callback - associated with handle [h]. *) +(** [remove_on_cancel fib h] removes the cancel callback associated with handle + [h]. *) val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a -(** [with_on_cancel fib cb (fun () -> )] evaluates [e] - in a scope in which, if the fiber [fib] is cancelled, - [cb()] is called. If [e] returns without the fiber being cancelled, - this callback is removed. *) +(** [with_on_cancel fib cb (fun () -> )] evaluates [e] in a scope in which, + if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without + the fiber being cancelled, this callback is removed. *) val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a -(** [with_on_self_cancel cb f] calls [f()] in a scope where - [cb] is added to the cancel callbacks of the current fiber; - and [f()] terminates, [cb] is removed from the list. *) +(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the + cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed + from the list. *) val on_result : 'a t -> 'a callback -> unit -(** Wait for fiber to be done and call the callback - with the result. If the fiber is done already then the - callback is invoked immediately with its result. *) +(** Wait for fiber to be done and call the callback with the result. If the + fiber is done already then the callback is invoked immediately with its + result. *) val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t -(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. 
- This fiber is not the child of any other fiber: its lifetime - is only determined by the lifetime of [f()]. *) +(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This + fiber is not the child of any other fiber: its lifetime is only determined + by the lifetime of [f()]. *) val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t -(** [spawn ~protect f] spawns a sub-fiber [f_child] - from a running fiber [parent]. - The sub-fiber [f_child] is attached to the current fiber and fails +(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber + [parent]. The sub-fiber [f_child] is attached to the current fiber and fails if the current fiber [parent] fails. - @param on if provided, start the fiber on the given runner. If not - provided, use the parent's runner. - @param protect if true, when [f_child] fails, it does not - affect [parent]. If false, [f_child] failing also - causes [parent] to fail (and therefore all other children - of [parent]). Default is [true]. + @param on + if provided, start the fiber on the given runner. If not provided, use the + parent's runner. + @param protect + if true, when [f_child] fails, it does not affect [parent]. If false, + [f_child] failing also causes [parent] to fail (and therefore all other + children of [parent]). Default is [true]. Must be run from inside a fiber. - @raise Failure if not run from inside a fiber. *) + @raise Failure if not run from inside a fiber. *) val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit -(** [spawn_ignore f] is [ignore (spawn f)]. - The fiber will still affect termination of the parent, ie. the - parent will exit only after this new fiber exits. - @param on the optional runner to use, added since 0.7 *) +(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect + termination of the parent, ie. the parent will exit only after this new + fiber exits. + @param on the optional runner to use, added since 0.7 *) val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit (** Like {!spawn_top} but ignores the result. diff --git a/src/fib/fls.mli b/src/fib/fls.mli index 35210a8d..15ae4f2f 100644 --- a/src/fib/fls.mli +++ b/src/fib/fls.mli @@ -1,18 +1,16 @@ (** Fiber-local storage. - This storage is associated to the current fiber, - just like thread-local storage is associated with - the current thread. + This storage is associated to the current fiber, just like thread-local + storage is associated with the current thread. - See {!Moonpool.Task_local_storage} for more general information, as - this is based on it. + See {!Moonpool.Task_local_storage} for more general information, as this is + based on it. - {b NOTE}: it's important to note that, while each fiber - has its own storage, spawning a sub-fiber [f2] from a fiber [f1] - will only do a shallow copy of the storage. - Values inside [f1]'s storage will be physically shared with [f2]. - It is thus recommended to store only persistent values in the local storage. -*) + {b NOTE}: it's important to note that, while each fiber has its own storage, + spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of + the storage. Values inside [f1]'s storage will be physically shared with + [f2]. It is thus recommended to store only persistent values in the local + storage. 
*) include module type of struct include Task_local_storage diff --git a/src/fib/handle.mli b/src/fib/handle.mli index b31e61e4..6e1c13a9 100644 --- a/src/fib/handle.mli +++ b/src/fib/handle.mli @@ -1,7 +1,7 @@ (** The unique name of a fiber. - Each fiber has a unique handle that can be used to - refer to it in maps or sets. *) + Each fiber has a unique handle that can be used to refer to it in maps or + sets. *) type t = private int (** Unique, opaque identifier for a fiber. *) diff --git a/src/fib/main.mli b/src/fib/main.mli index 609d4f2c..85cad3d9 100644 --- a/src/fib/main.mli +++ b/src/fib/main.mli @@ -1,26 +1,25 @@ (** Main thread. - This is evolved from [Moonpool.Immediate_runner], but unlike it, - this API assumes you run it in a thread (possibly - the main thread) which will block until the initial computation is done. + This is evolved from [Moonpool.Immediate_runner], but unlike it, this API + assumes you run it in a thread (possibly the main thread) which will block + until the initial computation is done. - This means it's reasonable to use [Main.main (fun () -> do_everything)] - at the beginning of the program. - Other Moonpool pools can be created for background tasks, etc. to do the - heavy lifting, and the main thread (inside this immediate runner) can coordinate - tasks via [Fiber.await]. + This means it's reasonable to use [Main.main (fun () -> do_everything)] at + the beginning of the program. Other Moonpool pools can be created for + background tasks, etc. to do the heavy lifting, and the main thread (inside + this immediate runner) can coordinate tasks via [Fiber.await]. - Aside from the fact that this blocks the caller thread, it is fairly similar to - {!Background_thread} in that there's a single worker to process + Aside from the fact that this blocks the caller thread, it is fairly similar + to {!Background_thread} in that there's a single worker to process tasks/fibers. This handles effects, including the ones in {!Fiber}. - @since 0.6 -*) + @since 0.6 *) val main : (Moonpool.Runner.t -> 'a) -> 'a -(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}. +(** [main f] runs [f()] in a scope that handles effects, including + {!Fiber.await}. This scope can run background tasks as well, in a cooperative fashion. *) diff --git a/src/forkjoin/moonpool_forkjoin.mli b/src/forkjoin/moonpool_forkjoin.mli index ba3b80f0..2e7149c8 100644 --- a/src/forkjoin/moonpool_forkjoin.mli +++ b/src/forkjoin/moonpool_forkjoin.mli @@ -5,20 +5,22 @@ @since 0.3 *) val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b -(** [both f g] runs [f()] and [g()], potentially in parallel, - and returns their result when both are done. - If any of [f()] and [g()] fails, then the whole computation fails. +(** [both f g] runs [f()] and [g()], potentially in parallel, and returns their + result when both are done. If any of [f()] and [g()] fails, then the whole + computation fails. - This must be run from within the pool: for example, inside {!Pool.run} - or inside a {!Fut.spawn} computation. - This is because it relies on an effect handler to be installed. + This must be run from within the pool: for example, inside {!Pool.run} or + inside a {!Fut.spawn} computation. This is because it relies on an effect + handler to be installed. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) val both_ignore : (unit -> _) -> (unit -> _) -> unit (** Same as [both f g |> ignore]. @since 0.3 + {b NOTE} this is only available on OCaml 5. 
*) val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit @@ -63,43 +65,49 @@ val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit {b NOTE} this is only available on OCaml 5. *) val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array -(** [all_array fs] runs all functions in [fs] in tasks, and waits for - all the results. +(** [all_array fs] runs all functions in [fs] in tasks, and waits for all the + results. - @param chunk_size if equal to [n], groups items by [n] to be run in - a single task. Default is [1]. + @param chunk_size + if equal to [n], groups items by [n] to be run in a single task. Default + is [1]. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list -(** [all_list fs] runs all functions in [fs] in tasks, and waits for - all the results. +(** [all_list fs] runs all functions in [fs] in tasks, and waits for all the + results. - @param chunk_size if equal to [n], groups items by [n] to be run in - a single task. Default is not specified. - This parameter is available since 0.3. + @param chunk_size + if equal to [n], groups items by [n] to be run in a single task. Default + is not specified. This parameter is available since 0.3. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list -(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for - all the results. +(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits + for all the results. - @param chunk_size if equal to [n], groups items by [n] to be run in - a single task. Default is not specified. - This parameter is available since 0.3. + @param chunk_size + if equal to [n], groups items by [n] to be run in a single task. Default + is not specified. This parameter is available since 0.3. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array (** [map_array f arr] is like [Array.map f arr], but runs in parallel. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list (** [map_list f l] is like [List.map f l], but runs in parallel. @since 0.3 + {b NOTE} this is only available on OCaml 5. *) diff --git a/src/lwt/IO_in.ml b/src/lwt/IO_in.ml index 0a7e33bc..b5ad110b 100644 --- a/src/lwt/IO_in.ml +++ b/src/lwt/IO_in.ml @@ -2,8 +2,7 @@ open Common_ class type t = object method input : bytes -> int -> int -> int - (** Read into the slice. Returns [0] only if the - stream is closed. *) + (** Read into the slice. Returns [0] only if the stream is closed. *) method close : unit -> unit (** Close the input. Must be idempotent. *) @@ -47,7 +46,7 @@ let of_bytes ?(off = 0) ?len (b : bytes) : t = let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) (** Read into the given slice. - @return the number of bytes read, [0] means end of input. *) + @return the number of bytes read, [0] means end of input. *) let[@inline] input (self : #t) buf i len = self#input buf i len (** Close the channel. 
*) diff --git a/src/lwt/base.ml b/src/lwt/base.ml index e859f06e..16124b8f 100644 --- a/src/lwt/base.ml +++ b/src/lwt/base.ml @@ -67,7 +67,7 @@ module Perform_action_in_lwt = struct let actions_ : Action_queue.t = Action_queue.create () (** Gets the current set of notifications and perform them from inside the - Lwt thread *) + Lwt thread *) let perform_pending_actions () : unit = let@ _sp = Moonpool.Private.Tracing_.with_span diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 0d171504..d933a73f 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -1,8 +1,7 @@ (** Lwt_engine-based event loop for Moonpool. - In what follows, we mean by "lwt thread" the thread - running [Lwt_main.run] (so, the thread where the Lwt event - loop and all Lwt callbacks execute). + In what follows, we mean by "lwt thread" the thread running [Lwt_main.run] + (so, the thread where the Lwt event loop and all Lwt callbacks execute). {b NOTE}: this is experimental and might change in future versions. @@ -14,53 +13,50 @@ module FLS = Moonpool_fib.Fls (** {2 Basic conversions} *) val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t -(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that - completes when [lwt_fut] does. This must be run from within - the Lwt thread. *) +(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that completes when + [lwt_fut] does. This must be run from within the Lwt thread. *) val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t -(** [lwt_of_fut fut] makes a lwt future that completes when - [fut] does. This must be called from the Lwt thread, and the result - must always be used only from inside the Lwt thread. *) +(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This + must be called from the Lwt thread, and the result must always be used only + from inside the Lwt thread. *) (** {2 Helpers on the moonpool side} *) val await_lwt : 'a Lwt.t -> 'a -(** [await_lwt fut] awaits a Lwt future from inside a task running on - a moonpool runner. This must be run from within a Moonpool runner - so that the await-ing effect is handled. *) +(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool + runner. This must be run from within a Moonpool runner so that the await-ing + effect is handled. *) val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t -(** [run_in_lwt f] runs [f()] from within the Lwt thread - and returns a thread-safe future. This can be run from anywhere. *) +(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a + thread-safe future. This can be run from anywhere. *) val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a -(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and - awaits its result. Must be run from inside a moonpool runner - so that the await-in effect is handled. +(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result. + Must be run from inside a moonpool runner so that the await-in effect is + handled. This is similar to [Moonpool.await @@ run_in_lwt f]. *) val get_runner : unit -> Moonpool.Runner.t -(** Returns the runner from within which this is called. - Must be run from within a fiber. +(** Returns the runner from within which this is called. Must be run from within + a fiber. @raise Failure if not run within a fiber *) (** {2 IO} *) (** IO using the Lwt event loop. 
- These IO operations work on non-blocking file descriptors - and rely on a [Lwt_engine] event loop being active (meaning, - [Lwt_main.run] is currently running in some thread). + These IO operations work on non-blocking file descriptors and rely on a + [Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently + running in some thread). - Calling these functions must be done from a moonpool runner. - A function like [read] will first try to perform the IO action - directly (here, call {!Unix.read}); if the action fails because - the FD is not ready, then [await_readable] is called: - it suspends the fiber and subscribes it to Lwt to be awakened - when the FD becomes ready. -*) + Calling these functions must be done from a moonpool runner. A function like + [read] will first try to perform the IO action directly (here, call + {!Unix.read}); if the action fails because the FD is not ready, then + [await_readable] is called: it suspends the fiber and subscribes it to Lwt + to be awakened when the FD becomes ready. *) module IO : sig val read : Unix.file_descr -> bytes -> int -> int -> int (** Read from the file descriptor *) @@ -91,27 +87,29 @@ module TCP_server : sig type t = Lwt_io.server val establish_lwt : - ?backlog:(* ?server_fd:Unix.file_descr -> *) - int -> + ?backlog: + (* ?server_fd:Unix.file_descr -> *) + int -> ?no_close:bool -> runner:Moonpool.Runner.t -> Unix.sockaddr -> (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> t - (** [establish ~runner addr handler] runs a TCP server in the Lwt - thread. When a client connects, a moonpool fiber is started on [runner] - to handle it. *) + (** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When + a client connects, a moonpool fiber is started on [runner] to handle it. + *) val establish : - ?backlog:(* ?server_fd:Unix.file_descr -> *) - int -> + ?backlog: + (* ?server_fd:Unix.file_descr -> *) + int -> ?no_close:bool -> runner:Moonpool.Runner.t -> Unix.sockaddr -> (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> t - (** Like {!establish_lwt} but uses {!IO} to directly handle - reads and writes on client sockets. *) + (** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes + on client sockets. *) val shutdown : t -> unit (** Shutdown the server *) @@ -121,8 +119,8 @@ module TCP_client : sig val connect : Unix.sockaddr -> Unix.file_descr val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a - (** Open a connection, and use {!IO} to read and write from - the socket in a non blocking way. *) + (** Open a connection, and use {!IO} to read and write from the socket in a + non blocking way. *) val with_connect_lwt : Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a @@ -132,15 +130,15 @@ end (** {2 Helpers on the lwt side} *) val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t -(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, - and returns a lwt future. This must be run from within the thread - running [Lwt_main]. *) +(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and + returns a lwt future. This must be run from within the thread running + [Lwt_main]. *) (** {2 Wrappers around Lwt_main} *) val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a -(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside - a fiber in [runner]. 
*) +(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] + inside a fiber in [runner]. *) val main : (unit -> 'a) -> 'a (** Like {!main_with_runner} but with a default choice of runner. *) diff --git a/src/private/ws_deque_.mli b/src/private/ws_deque_.mli index 0b9fd84a..b7df21a9 100644 --- a/src/private/ws_deque_.mli +++ b/src/private/ws_deque_.mli @@ -1,11 +1,10 @@ (** Work-stealing deque. - Adapted from "Dynamic circular work stealing deque", Chase & Lev. + Adapted from "Dynamic circular work stealing deque", Chase & Lev. - However note that this one is not dynamic in the sense that there - is no resizing. Instead we return [false] when [push] fails, which - keeps the implementation fairly lightweight. - *) + However note that this one is not dynamic in the sense that there is no + resizing. Instead we return [false] when [push] fails, which keeps the + implementation fairly lightweight. *) type 'a t (** Deque containing values of type ['a] *) @@ -14,12 +13,12 @@ val create : dummy:'a -> unit -> 'a t (** Create a new deque. *) val push : 'a t -> 'a -> bool -(** Push value at the bottom of deque. returns [true] if it succeeds. - This must be called only by the owner thread. *) +(** Push value at the bottom of deque. returns [true] if it succeeds. This must + be called only by the owner thread. *) val pop : 'a t -> 'a option -(** Pop value from the bottom of deque. - This must be called only by the owner thread. *) +(** Pop value from the bottom of deque. This must be called only by the owner + thread. *) exception Empty diff --git a/src/sync/lock.mli b/src/sync/lock.mli index 2240b41d..2e8a8994 100644 --- a/src/sync/lock.mli +++ b/src/sync/lock.mli @@ -1,8 +1,8 @@ (** Mutex-protected resource. - This lock is a synchronous concurrency primitive, as a thin wrapper - around {!Mutex} that encourages proper management of the critical - section in RAII style: + This lock is a synchronous concurrency primitive, as a thin wrapper around + {!Mutex} that encourages proper management of the critical section in RAII + style: {[ let (let@) = (@@) @@ -30,27 +30,27 @@ val create : 'a -> 'a t (** Create a new protected value. *) val with_ : 'a t -> ('a -> 'b) -> 'b -(** [with_ l f] runs [f x] where [x] is the value protected with - the lock [l], in a critical section. If [f x] fails, [with_lock l f] - fails too but the lock is released. *) +(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l], + in a critical section. If [f x] fails, [with_lock l f] fails too but the + lock is released. *) val update : 'a t -> ('a -> 'a) -> unit -(** [update l f] replaces the content [x] of [l] with [f x], while protected - by the mutex. *) +(** [update l f] replaces the content [x] of [l] with [f x], while protected by + the mutex. *) val update_map : 'a t -> ('a -> 'a * 'b) -> 'b -(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] - and returns [y], while protected by the mutex. *) +(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and + returns [y], while protected by the mutex. *) val mutex : _ t -> Picos_std_sync.Mutex.t (** Underlying mutex. *) val get : 'a t -> 'a -(** Atomically get the value in the lock. The value that is returned - isn't protected! *) +(** Atomically get the value in the lock. The value that is returned isn't + protected! *) val set : 'a t -> 'a -> unit (** Atomically set the value. 
- {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} - is an anti pattern and will not protect data against some race conditions. *) + {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an + anti pattern and will not protect data against some race conditions. *)
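
To make the caution about {!get} and {!set} concrete, here is a minimal sketch, not part of the patch above, of the read-modify-write pattern that [update] is designed for. It relies only on the [Lock] signature shown in this hunk together with [Ws_pool.with_] and [Fut.spawn] as documented earlier in the diff; the [Moonpool_sync.Lock] module path and the exact [Fut.spawn ~on] signature are assumptions and may differ across Moonpool versions.

{[
  (* Sketch only: the module path below is an assumption, adjust it to
     wherever [Lock] is exposed in your Moonpool version. *)
  module Lock = Moonpool_sync.Lock

  let () =
    Moonpool.Ws_pool.with_ () (fun pool ->
        let counter = Lock.create 0 in
        let jobs =
          List.init 8 (fun _ ->
              Moonpool.Fut.spawn ~on:pool (fun () ->
                  for _ = 1 to 10_000 do
                    (* Racy variant (the anti-pattern the note warns about):
                       [Lock.set counter (Lock.get counter + 1)] lets another
                       task run between the read and the write. *)
                    (* Atomic read-modify-write in a single critical section: *)
                    Lock.update counter (fun n -> n + 1)
                  done))
        in
        List.iter Moonpool.Fut.wait_block_exn jobs;
        (* all 8 * 10_000 increments are accounted for *)
        assert (Lock.get counter = 80_000))
]}

Swapping the [update] call for the commented [get]/[set] pair typically drops increments under contention, which is exactly the race condition the note above warns about.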