Compare commits

..

No commits in common. "213d9bdd194cc3cf772e718d45823d9e4ab5785c" and "389f237993f2c63a51102ae11054685815d6355e" have entirely different histories.

41 changed files with 670 additions and 725 deletions

View file

@ -78,7 +78,7 @@ jobs:
strategy: strategy:
matrix: matrix:
ocaml-compiler: ocaml-compiler:
- '5.3' - '5.2'
runs-on: 'ubuntu-latest' runs-on: 'ubuntu-latest'
steps: steps:
- uses: actions/checkout@main - uses: actions/checkout@main
@ -89,6 +89,6 @@ jobs:
dune-cache: true dune-cache: true
allow-prerelease-opam: true allow-prerelease-opam: true
- run: opam install ocamlformat.0.27.0 - run: opam install ocamlformat.0.26.2
- run: opam exec -- make format-check - run: opam exec -- make format-check

View file

@ -1,4 +1,4 @@
version = 0.27.0 version = 0.26.2
profile=conventional profile=conventional
margin=80 margin=80
if-then-else=k-r if-then-else=k-r

View file

@ -1,17 +1,4 @@
# 0.8
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
- feat(exn_bt): in show/pp, do print the backtrace when present
- feat: block signals in workers if asked to
- relax bound on picos to 0.5-0.6
- feat fib: `spawn_ignore` now has `?on` optional param
- change Moonpool.Chan so it's bounded (stil experimental)
- fix task local storage: type was too specific
- fix fiber: use a single fut/computation in fibers
# 0.7 # 0.7
- add `Moonpool_fiber.spawn_top_ignore` - add `Moonpool_fiber.spawn_top_ignore`

View file

@ -2,7 +2,7 @@
(using mdx 0.2) (using mdx 0.2)
(name moonpool) (name moonpool)
(version 0.8) (version 0.7)
(generate_opam_files true) (generate_opam_files true)
(source (source
(github c-cube/moonpool)) (github c-cube/moonpool))

View file

@ -1,5 +1,4 @@
(** Example from (** Example from https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.8" version: "0.7"
synopsis: "Async IO for moonpool, relying on picos (experimental)" synopsis: "Async IO for moonpool, relying on picos (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.8" version: "0.7"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)" synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.8" version: "0.7"
synopsis: "Pools of threads supported by a pool of domains" synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,11 +1,13 @@
(** A simple runner with a single background thread. (** A simple runner with a single background thread.
Because this is guaranteed to have a single worker thread, tasks scheduled Because this is guaranteed to have a single worker thread,
in this runner always run asynchronously but in a sequential fashion. tasks scheduled in this runner always run asynchronously but
in a sequential fashion.
This is similar to {!Fifo_pool} with exactly one thread. This is similar to {!Fifo_pool} with exactly one thread.
@since 0.6 *) @since 0.6
*)
include module type of Runner include module type of Runner

View file

@ -16,45 +16,48 @@ val size : _ t -> int
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes. (** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *) @raise Closed if the queue was closed before a new element was available. *)
val try_pop : force_lock:bool -> 'a t -> 'a option val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any, or returns (** [try_pop q] immediately pops the first element of [q], if any,
[None] without blocking. or returns [None] without blocking.
@param force_lock @param force_lock if true, use {!Mutex.lock} (which can block under contention);
if true, use {!Mutex.lock} (which can block under contention); if false, if false, use {!Mutex.try_lock}, which might return [None] even in
use {!Mutex.try_lock}, which might return [None] even in presence of an presence of an element if there's contention *)
element if there's contention *)
val try_push : 'a t -> 'a -> bool val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case it returns [true]; or (** [try_push q x] tries to push into [q], in which case
it fails to push and returns [false] without blocking. it returns [true]; or it fails to push and returns [false]
@raise Closed if the locking succeeded but the queue is closed. *) without blocking.
@raise Closed if the locking succeeded but the queue is closed.
*)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently in [bq] into [q2] in one (** [transfer bq q2] transfers all items presently
atomic section, and clears [bq]. It blocks if no element is in [bq]. in [bq] into [q2] in one atomic section, and clears [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch.
Create a [Queue.t] locally:
This is useful to consume elements from the queue in batch. Create a
[Queue.t] locally:
{[ {[
let dowork (work_queue : job Bb_queue.t) = let dowork (work_queue: job Bb_queue.t) =
(* local queue, not thread safe *) (* local queue, not thread safe *)
let local_q = Queue.create () in let local_q = Queue.create() in
try try
while true do while true do
(* work on local events, already on this thread *) (* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in let job = Queue.pop local_q in
process_job job process_job job
done; done;
(* get all the events in the incoming blocking queue, in (* get all the events in the incoming blocking queue, in
one single critical section. *) one single critical section. *)
Bb_queue.transfer work_queue local_q Bb_queue.transfer work_queue local_q
done done
with Bb_queue.Closed -> () with Bb_queue.Closed -> ()
]} ]}
@since 0.4 *) @since 0.4 *)
@ -66,8 +69,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might not (** [to_iter q] returns an iterator over all items in the queue.
terminate if [q] is never closed. This might not terminate if [q] is never closed.
@since 0.4 *) @since 0.4 *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen

View file

@ -1,13 +1,15 @@
(** A blocking queue of finite size. (** A blocking queue of finite size.
This queue, while still using locks underneath (like the regular blocking This queue, while still using locks underneath
queue) should be enough for usage under reasonable contention. (like the regular blocking queue) should be enough for
usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is desirable: The bounded size is helpful whenever some form of backpressure is
if the queue is used to communicate between producer(s) and consumer(s), the desirable: if the queue is used to communicate between producer(s)
consumer(s) can limit the rate at which producer(s) send new work down their and consumer(s), the consumer(s) can limit the rate at which
way. Whenever the queue is full, means that producer(s) will have to wait producer(s) send new work down their way.
before pushing new work. Whenever the queue is full, means that producer(s) will have to
wait before pushing new work.
@since 0.4 *) @since 0.4 *)
@ -17,41 +19,42 @@ type 'a t
val create : max_size:int -> unit -> 'a t val create : max_size:int -> unit -> 'a t
val close : _ t -> unit val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q], and after all (** [close q] closes [q]. No new elements can be pushed into [q],
the elements still in [q] currently are [pop]'d, {!pop} will also raise and after all the elements still in [q] currently are [pop]'d,
{!Closed}. *) {!pop} will also raise {!Closed}. *)
exception Closed exception Closed
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will (** [push q x] pushes [x] at the end of the queue.
block until there is room for [x]. If [q] is full, this will block until there is
room for [x].
@raise Closed if [q] is closed. *) @raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot (** [try_push q x] attempts to push [x] into [q], but abandons
acquire [q] or if [q] is full. if it cannot acquire [q] or if [q] is full.
@param force_lock @param force_lock if true, use {!Mutex.lock} (which can block
if true, use {!Mutex.lock} (which can block under contention); if false, under contention);
use {!Mutex.try_lock}, which might return [false] even if there's room in if false, use {!Mutex.try_lock}, which might return [false] even
the queue. if there's room in the queue.
@raise Closed if [q] is closed. *) @raise Closed if [q] is closed. *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until (** [pop q] pops the first element off [q]. It blocks if [q]
some element becomes available. is empty, until some element becomes available.
@raise Closed if [q] is empty and closed. *) @raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if (** [try_pop ~force_lock q] tries to pop the first element, or returns [None]
no element is available or if it failed to acquire [q]. if no element is available or if it failed to acquire [q].
@param force_lock @param force_lock if true, use {!Mutex.lock} (which can block
if true, use {!Mutex.lock} (which can block under contention); if false, under contention);
use {!Mutex.try_lock}, which might return [None] even in presence of an if false, use {!Mutex.try_lock}, which might return [None] even in
element if there's contention. presence of an element if there's contention.
@raise Closed if [q] is empty and closed. *) @raise Closed if [q] is empty and closed. *)
@ -62,8 +65,9 @@ val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *) (** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available in [bq] into (** [transfer bq q2] transfers all elements currently available
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty. in [bq] into local queue [q2], and clears [bq], atomically.
It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details. See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *) @raise Closed if [bq] is empty and closed. *)
@ -72,8 +76,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might not (** [to_iter q] returns an iterator over all items in the queue.
terminate if [q] is never closed. *) This might not terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *) (** [to_gen q] returns a generator from the queue. *)

View file

@ -3,7 +3,7 @@
The channels have bounded size. Push/pop return futures or can use effects The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version. to provide an [await]-friendly version.
The channels became bounded since @0.7 . The channels became bounded since @NEXT_RELEASE .
*) *)
type 'a t type 'a t
@ -15,32 +15,33 @@ val create : max_size:int -> unit -> 'a t
exception Closed exception Closed
val try_push : 'a t -> 'a -> bool val try_push : 'a t -> 'a -> bool
(** [try_push chan x] pushes [x] into [chan]. This does not block. Returns (** [try_push chan x] pushes [x] into [chan]. This does not block.
[true] if it succeeded in pushing. Returns [true] if it succeeded in pushing.
@raise Closed if the channel is closed. *) @raise Closed if the channel is closed. *)
val try_pop : 'a t -> 'a option val try_pop : 'a t -> 'a option
(** [try_pop chan] pops and return an element if one is available immediately. (** [try_pop chan] pops and return an element if one is available
Otherwise it returns [None]. immediately. Otherwise it returns [None].
@raise Closed if the channel is closed and empty. *) @raise Closed if the channel is closed and empty.
*)
val close : _ t -> unit val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. This is idempotent. (** Close the channel. Further push and pop calls will fail.
*) This is idempotent. *)
[@@@ifge 5.0] [@@@ifge 5.0]
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task if the channel (** Push the value into the channel, suspending the current task
is currently full. if the channel is currently full.
@raise Closed if the channel is closed @raise Closed if the channel is closed
@since 0.7 *) @since NEXT_RELEASE *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the channel is (** Pop an element. This might suspend the current task if the
currently empty. channel is currently empty.
@raise Closed if the channel is empty and closed. @raise Closed if the channel is empty and closed.
@since 0.7 *) @since NEXT_RELEASE *)
(* (*
val pop_block_exn : 'a t -> 'a val pop_block_exn : 'a t -> 'a

View file

@ -3,15 +3,7 @@ type t = exn * Printexc.raw_backtrace
let[@inline] make exn bt : t = exn, bt let[@inline] make exn bt : t = exn, bt
let[@inline] exn (e, _) = e let[@inline] exn (e, _) = e
let[@inline] bt (_, bt) = bt let[@inline] bt (_, bt) = bt
let show self = Printexc.to_string (exn self)
let show self =
let bt = Printexc.raw_backtrace_to_string (bt self) in
let exn = Printexc.to_string (exn self) in
if bt = "" then
exn
else
Printf.sprintf "%s\n%s" exn bt
let pp out self = Format.pp_print_string out (show self) let pp out self = Format.pp_print_string out (show self)
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt

View file

@ -165,9 +165,7 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let st = { idx = i; dom_idx; st = pool } in let st = { idx = i; dom_idx; st = pool } in
let thread = let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (i, thread) Bb_queue.push receive_threads (i, thread)
in in

View file

@ -1,16 +1,16 @@
(** A simple thread pool in FIFO order. (** A simple thread pool in FIFO order.
FIFO: first-in, first-out. Basically tasks are put into a queue, and worker FIFO: first-in, first-out. Basically tasks are put into a queue,
threads pull them out of the queue at the other end. and worker threads pull them out of the queue at the other end.
Since this uses a single blocking queue to manage tasks, it's very simple Since this uses a single blocking queue to manage tasks, it's very
and reliable. The number of worker threads is fixed, but they are spread simple and reliable. The number of worker threads is fixed, but
over several domains to enable parallelism. they are spread over several domains to enable parallelism.
This can be useful for latency-sensitive applications (e.g. as a pool of This can be useful for latency-sensitive applications (e.g. as a
workers for network servers). Work-stealing pools might have higher pool of workers for network servers). Work-stealing pools might
throughput but they're very unfair to some tasks; by contrast, here, older have higher throughput but they're very unfair to some tasks; by
tasks have priority over younger tasks. contrast, here, older tasks have priority over younger tasks.
@since 0.5 *) @since 0.5 *)
@ -28,22 +28,22 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread @param on_init_thread called at the beginning of each new thread in the pool.
called at the beginning of each new thread in the pool. @param min minimum size of the pool. See {!Pool.create_args}.
@param min The default is [Domain.recommended_domain_count()], ie one worker per
minimum size of the pool. See {!Pool.create_args}. The default is CPU core.
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml On OCaml 4 the default is [4] (since there is only one domain).
4 the default is [4] (since there is only one domain). @param on_exit_thread called at the end of each worker thread in the pool.
@param on_exit_thread called at the end of each worker thread in the pool. @param around_task a pair of [before, after] functions
@param around_task ran around each task. See {!Pool.create_args}.
a pair of [before, after] functions ran around each task. See @param name name for the pool, used in tracing (since 0.6)
{!Pool.create_args}. *)
@param name name for the pool, used in tracing (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When (** [with_ () f] calls [f pool], where [pool] is obtained via {!create}.
[f pool] returns or fails, [pool] is shutdown and its resources are When [f pool] returns or fails, [pool] is shutdown and its resources
released. Most parameters are the same as in {!create}. *) are released.
Most parameters are the same as in {!create}. *)
(**/**) (**/**)

View file

@ -3,23 +3,23 @@ module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a waiter = 'a or_error -> unit type 'a waiter = 'a or_error -> unit
type 'a t = 'a C.t type 'a t = { st: 'a C.t } [@@unboxed]
type 'a promise = 'a t type 'a promise = 'a t
let[@inline] make_promise () : _ t = let[@inline] make_promise () : _ t =
let fut = C.create ~mode:`LIFO () in let fut = { st = C.create ~mode:`LIFO () } in
fut fut
let make () = let make () =
let fut = make_promise () in let fut = make_promise () in
fut, fut fut, fut
let[@inline] return x : _ t = C.returned x let[@inline] return x : _ t = { st = C.returned x }
let[@inline] fail exn bt : _ t = let[@inline] fail exn bt : _ t =
let fut = C.create () in let st = C.create () in
C.cancel fut exn bt; C.cancel st exn bt;
fut { st }
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt) let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
@ -27,32 +27,32 @@ let[@inline] of_result = function
| Ok x -> return x | Ok x -> return x
| Error ebt -> fail_exn_bt ebt | Error ebt -> fail_exn_bt ebt
let[@inline] is_resolved self : bool = not (C.is_running self) let[@inline] is_resolved self : bool = not (C.is_running self.st)
let is_done = is_resolved let is_done = is_resolved
let peek : 'a t -> _ option = C.peek let[@inline] peek self : _ option = C.peek self.st
let raise_if_failed : _ t -> unit = C.check let[@inline] raise_if_failed self : unit = C.check self.st
let[@inline] is_success self = let[@inline] is_success self =
match C.peek_exn self with match C.peek_exn self.st with
| _ -> true | _ -> true
| exception _ -> false | exception _ -> false
let is_failed : _ t -> bool = C.is_canceled let[@inline] is_failed self = C.is_canceled self.st
exception Not_ready exception Not_ready
let[@inline] get_or_fail self = let[@inline] get_or_fail self =
match C.peek self with match C.peek self.st with
| Some x -> x | Some x -> x
| None -> raise Not_ready | None -> raise Not_ready
let[@inline] get_or_fail_exn self = let[@inline] get_or_fail_exn self =
match C.peek_exn self with match C.peek_exn self.st with
| x -> x | x -> x
| exception C.Running -> raise Not_ready | exception C.Running -> raise Not_ready
let[@inline] peek_or_assert_ (self : 'a t) : 'a = let[@inline] peek_or_assert_ (self : 'a t) : 'a =
match C.peek_exn self with match C.peek_exn self.st with
| x -> x | x -> x
| exception C.Running -> assert false | exception C.Running -> assert false
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
let trigger = let trigger =
(Trigger.from_action f self on_result_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_cb_ [@alert "-handler"])
in in
if not (C.try_attach self trigger) then on_result_cb_ () f self if not (C.try_attach self.st trigger) then on_result_cb_ () f self
let on_result_ignore_cb_ _tr f (self : _ t) = let on_result_ignore_cb_ _tr f (self : _ t) =
f (Picos.Computation.canceled self) f (Picos.Computation.canceled self.st)
let on_result_ignore (self : _ t) f : unit = let on_result_ignore (self : _ t) f : unit =
if Picos.Computation.is_running self then ( if Picos.Computation.is_running self.st then (
let trigger = let trigger =
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
in in
if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self
) else ) else
on_result_ignore_cb_ () f self on_result_ignore_cb_ () f self
let[@inline] fulfill_idempotent self r = let[@inline] fulfill_idempotent self r =
match r with match r with
| Ok x -> C.return self x | Ok x -> C.return self.st x
| Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let ok = let ok =
match r with match r with
| Ok x -> C.try_return self x | Ok x -> C.try_return self.st x
| Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in in
if not ok then raise Already_fulfilled if not ok then raise Already_fulfilled
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
let task () = let task () =
try try
let res = f () in let res = f () in
C.return fut res C.return fut.st res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
C.cancel fut exn bt C.cancel fut.st exn bt
in in
Runner.run_async on task; Runner.run_async on task;
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
let push_queue_ _tr q () = Bb_queue.push q () let push_queue_ _tr q () = Bb_queue.push q ()
let wait_block_exn (self : 'a t) : 'a = let wait_block_exn (self : 'a t) : 'a =
match C.peek_exn self with match C.peek_exn self.st with
| x -> x (* fast path *) | x -> x (* fast path *)
| exception C.Running -> | exception C.Running ->
let real_block () = let real_block () =
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
assert attached; assert attached;
(* blockingly wait for trigger if computation didn't complete in the mean time *) (* blockingly wait for trigger if computation didn't complete in the mean time *)
if C.try_attach self trigger then Bb_queue.pop q; if C.try_attach self.st trigger then Bb_queue.pop q;
(* trigger was signaled! computation must be done*) (* trigger was signaled! computation must be done*)
peek_or_assert_ self peek_or_assert_ self
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
if i = 0 then if i = 0 then
real_block () real_block ()
else ( else (
match C.peek_exn self with match C.peek_exn self.st with
| x -> x | x -> x
| exception C.Running -> | exception C.Running ->
Domain_.relax (); Domain_.relax ();
@ -426,12 +426,12 @@ let wait_block self =
let await (self : 'a t) : 'a = let await (self : 'a t) : 'a =
(* fast path: peek *) (* fast path: peek *)
match C.peek_exn self with match C.peek_exn self.st with
| res -> res | res -> res
| exception C.Running -> | exception C.Running ->
let trigger = Trigger.create () in let trigger = Trigger.create () in
(* suspend until the future is resolved *) (* suspend until the future is resolved *)
if C.try_attach self trigger then if C.try_attach self.st trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger; Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
module Private_ = struct module Private_ = struct
let[@inline] unsafe_promise_of_fut x = x let[@inline] unsafe_promise_of_fut x = x
let[@inline] as_computation self = self let[@inline] as_computation self = self.st
end end

View file

@ -1,29 +1,31 @@
(** Futures. (** Futures.
A future of type ['a t] represents the result of a computation that will A future of type ['a t] represents the result of a computation
yield a value of type ['a]. that will yield a value of type ['a].
Typically, the computation is running on a thread pool {!Runner.t} and will Typically, the computation is running on a thread pool {!Runner.t}
proceed on some worker. Once set, a future cannot change. It either succeeds and will proceed on some worker. Once set, a future cannot change.
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with It either succeeds (storing a [Ok x] with [x: 'a]), or fail
an exception and the corresponding backtrace). (storing a [Error (exn, bt)] with an exception and the corresponding
backtrace).
Combinators such as {!map} and {!join_array} can be used to produce futures Combinators such as {!map} and {!join_array} can be used to produce
from other futures (in a monadic way). Some combinators take a [on] argument futures from other futures (in a monadic way). Some combinators take
to specify a runner on which the intermediate computation takes place; for a [on] argument to specify a runner on which the intermediate computation takes
example [map ~on:pool ~f fut] maps the value in [fut] using function [f], place; for example [map ~on:pool ~f fut] maps the value in [fut]
applicatively; the call to [f] happens on the runner [pool] (once [fut] using function [f], applicatively; the call to [f] happens on
resolves successfully with a value). *) the runner [pool] (once [fut] resolves successfully with a value).
*)
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a t = 'a Picos.Computation.t type 'a t
(** A future with a result of type ['a]. *) (** A future with a result of type ['a]. *)
type 'a promise = private 'a t type 'a promise = private 'a t
(** A promise, which can be fulfilled exactly once to set the corresponding (** A promise, which can be fulfilled exactly once to set
future. This is a private alias of ['a t] since 0.7, previously it was the corresponding future.
opaque. *) This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *)
val make : unit -> 'a t * 'a promise val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *) (** Make a new future with the associated promise. *)
@ -32,32 +34,33 @@ val make_promise : unit -> 'a promise
(** Same as {!make} but returns a single promise (which can be upcast to a (** Same as {!make} but returns a single promise (which can be upcast to a
future). This is useful mostly to preserve memory. future). This is useful mostly to preserve memory.
How to upcast to a future in the worst case: How to upcast to a future in the worst case:
{[ {[let prom = Fut.make_promise();;
let prom = Fut.make_promise () let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
let fut = (prom : _ Fut.promise :> _ Fut.t) ]}
]} @since NEXT_RELEASE *)
@since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future when [fut] is set (** [on_result fut f] registers [f] to be called in the future
; or calls [f] immediately if [fut] is already set. *) when [fut] is set ;
or calls [f] immediately if [fut] is already set. *)
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut] (** [on_result_ignore fut f] registers [f] to be called in the future
is set; or calls [f] immediately if [fut] is already set. It does not pass when [fut] is set;
the result, only a success/error signal. or calls [f] immediately if [fut] is already set.
It does not pass the result, only a success/error signal.
@since 0.7 *) @since 0.7 *)
exception Already_fulfilled exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time. (** Fullfill the promise, setting the future at the same time.
@raise Already_fulfilled if the promise is already fulfilled. *) @raise Already_fulfilled if the promise is already fulfilled. *)
val fulfill_idempotent : 'a promise -> 'a or_error -> unit val fulfill_idempotent : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time. Does nothing if (** Fullfill the promise, setting the future at the same time.
the promise is already fulfilled. *) Does nothing if the promise is already fulfilled. *)
val return : 'a -> 'a t val return : 'a -> 'a t
(** Already settled future, with a result *) (** Already settled future, with a result *)
@ -75,22 +78,22 @@ val is_resolved : _ t -> bool
(** [is_resolved fut] is [true] iff [fut] is resolved. *) (** [is_resolved fut] is [true] iff [fut] is resolved. *)
val peek : 'a t -> 'a or_error option val peek : 'a t -> 'a or_error option
(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], and (** [peek fut] returns [Some r] if [fut] is currently resolved with [r],
[None] if [fut] is not resolved yet. *) and [None] if [fut] is not resolved yet. *)
exception Not_ready exception Not_ready
(** @since 0.2 *) (** @since 0.2 *)
val get_or_fail : 'a t -> 'a or_error val get_or_fail : 'a t -> 'a or_error
(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled (i.e. if (** [get_or_fail fut] obtains the result from [fut] if it's fulfilled
[peek fut] returns [Some res], [get_or_fail fut] returns [res]). (i.e. if [peek fut] returns [Some res], [get_or_fail fut] returns [res]).
@raise Not_ready if the future is not ready. @raise Not_ready if the future is not ready.
@since 0.2 *) @since 0.2 *)
val get_or_fail_exn : 'a t -> 'a val get_or_fail_exn : 'a t -> 'a
(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, like (** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled,
{!get_or_fail}. If the result is an [Error _], the exception inside is like {!get_or_fail}. If the result is an [Error _], the exception inside
re-raised. is re-raised.
@raise Not_ready if the future is not ready. @raise Not_ready if the future is not ready.
@since 0.2 *) @since 0.2 *)
@ -113,12 +116,12 @@ val raise_if_failed : _ t -> unit
(** {2 Combinators} *) (** {2 Combinators} *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a t val spawn : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
will hold its result. *) hold its result. *)
val spawn_on_current_runner : (unit -> 'a) -> 'a t val spawn_on_current_runner : (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules the new task on it as (** This must be run from inside a runner, and schedules
well. the new task on it as well.
See {!Runner.get_current_runner} to see how the runner is found. See {!Runner.get_current_runner} to see how the runner is found.
@ -126,26 +129,28 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
@raise Failure if run from outside a runner. *) @raise Failure if run from outside a runner. *)
val reify_error : 'a t -> 'a or_error t val reify_error : 'a t -> 'a or_error t
(** [reify_error fut] turns a failing future into a non-failing one that contain (** [reify_error fut] turns a failing future into a non-failing
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x] one that contain [Error (exn, bt)]. A non-failing future
returning [x] is turned into [Ok x]
@since 0.4 *) @since 0.4 *)
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
(** [map ?on ~f fut] returns a new future [fut2] that resolves with [f x] if (** [map ?on ~f fut] returns a new future [fut2] that resolves
[fut] resolved with [x]; and fails with [e] if [fut] fails with [e] or [f x] with [f x] if [fut] resolved with [x];
raises [e]. and fails with [e] if [fut] fails with [e] or [f x] raises [e].
@param on if provided, [f] runs on the given runner *) @param on if provided, [f] runs on the given runner *)
val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future (** [bind ?on ~f fut] returns a new future [fut2] that resolves
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e] like the future [f x] if [fut] resolved with [x];
or [f x] raises [e]. and fails with [e] if [fut] fails with [e] or [f x] raises [e].
@param on if provided, [f] runs on the given runner *) @param on if provided, [f] runs on the given runner *)
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like (** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the like the future [f (Ok x)] if [fut] resolved with [x];
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt]. and resolves like the future [f (Error (exn, bt))]
if [fut] fails with [exn] and backtrace [bt].
@param on if provided, [f] runs on the given runner @param on if provided, [f] runs on the given runner
@since 0.4 *) @since 0.4 *)
@ -154,18 +159,18 @@ val join : 'a t t -> 'a t
@since 0.2 *) @since 0.2 *)
val both : 'a t -> 'b t -> ('a * 'b) t val both : 'a t -> 'b t -> ('a * 'b) t
(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and [b] succeeds (** [both a b] succeeds with [x, y] if [a] succeeds with [x] and
with [y], or fails if any of them fails. *) [b] succeeds with [y], or fails if any of them fails. *)
val choose : 'a t -> 'b t -> ('a, 'b) Either.t t val choose : 'a t -> 'b t -> ('a, 'b) Either.t t
(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or [b] (** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or
succeeds with [y], or fails if both of them fails. If they both succeed, it [b] succeeds with [y], or fails if both of them fails.
is not specified which result is used. *) If they both succeed, it is not specified which result is used. *)
val choose_same : 'a t -> 'a t -> 'a t val choose_same : 'a t -> 'a t -> 'a t
(** [choose_same a b] succeeds with the value of one of [a] or [b] if they (** [choose_same a b] succeeds with the value of one of [a] or [b] if
succeed, or fails if both fail. If they both succeed, it is not specified they succeed, or fails if both fail.
which result is used. *) If they both succeed, it is not specified which result is used. *)
val join_array : 'a t array -> 'a array t val join_array : 'a t array -> 'a array t
(** Wait for all the futures in the array. Fails if any future fails. *) (** Wait for all the futures in the array. Fails if any future fails. *)
@ -180,20 +185,20 @@ module Advanced : sig
aggregate_results:(('a t -> 'a) -> 'cont -> 'res) -> aggregate_results:(('a t -> 'a) -> 'cont -> 'res) ->
'cont -> 'cont ->
'res t 'res t
(** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len (** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a
cont] takes a container of futures ([cont]), with [len] elements, and container of futures ([cont]), with [len] elements,
returns a future result of type [res] (possibly another type of and returns a future result of type [res]
container). (possibly another type of container).
This waits for all futures in [cont: 'cont] to be done (futures obtained This waits for all futures in [cont: 'cont] to be done
via [iter <some function> cont]). If they all succeed, their results are (futures obtained via [iter <some function> cont]). If they
aggregated into a new result of type ['res] via all succeed, their results are aggregated into a new
[aggregate_results <some function> cont]. result of type ['res] via [aggregate_results <some function> cont].
{b NOTE}: the behavior is not specified if [iter f cont] (for a function {b NOTE}: the behavior is not specified if [iter f cont] (for a function f)
f) doesn't call [f] on exactly [len cont] elements. doesn't call [f] on exactly [len cont] elements.
@since 0.5.1 *) @since 0.5.1 *)
end end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
@ -201,22 +206,23 @@ val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
@since 0.5.1 *) @since 0.5.1 *)
val wait_array : _ t array -> unit t val wait_array : _ t array -> unit t
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards the (** [wait_array arr] waits for all futures in [arr] to resolve. It discards
individual results of futures in [arr]. It fails if any future fails. *) the individual results of futures in [arr]. It fails if any future fails. *)
val wait_list : _ t list -> unit t val wait_list : _ t list -> unit t
(** [wait_list l] waits for all futures in [l] to resolve. It discards the (** [wait_list l] waits for all futures in [l] to resolve. It discards
individual results of futures in [l]. It fails if any future fails. *) the individual results of futures in [l]. It fails if any future fails. *)
val for_ : on:Runner.t -> int -> (int -> unit) -> unit t val for_ : on:Runner.t -> int -> (int -> unit) -> unit t
(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns a (** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns
future that resolves when all the tasks have resolved, or fails as soon as a future that resolves when all the tasks have resolved, or fails
one task has failed. *) as soon as one task has failed. *)
val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t
(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in the (** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in
runner (where [n = Array.length arr]), and returns a future that resolves the runner (where [n = Array.length arr]), and returns a future
when all the tasks are done, or fails if any of them fails. that resolves when all the tasks are done,
or fails if any of them fails.
@since 0.2 *) @since 0.2 *)
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
@ -236,39 +242,43 @@ val await : 'a t -> 'a
@since 0.3 @since 0.3
This must only be run from inside the runner itself. The runner must support This must only be run from inside the runner itself. The runner must
{!Suspend_}. {b NOTE}: only on OCaml 5.x *) support {!Suspend_}.
{b NOTE}: only on OCaml 5.x
*)
[@@@endif] [@@@endif]
(** {2 Blocking} *) (** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error val wait_block : 'a t -> 'a or_error
(** [wait_block fut] blocks the current thread until [fut] is resolved, and (** [wait_block fut] blocks the current thread until [fut] is resolved,
returns its value. and returns its value.
{b NOTE}: A word of warning: this will monopolize the calling thread until {b NOTE}: A word of warning: this will monopolize the calling thread until the future
the future resolves. This can also easily cause deadlocks, if enough threads resolves. This can also easily cause deadlocks, if enough threads in a pool
in a pool call [wait_block] on futures running on the same pool or a pool call [wait_block] on futures running on the same pool or a pool depending on it.
depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, or A good rule to avoid deadlocks is to run this from outside of any pool,
to have an acyclic order between pools where [wait_block] is only called or to have an acyclic order between pools where [wait_block]
from a pool on futures evaluated in a pool that comes lower in the is only called from a pool on futures evaluated in a pool that comes lower
hierarchy. If this rule is broken, it is possible for all threads in a pool in the hierarchy.
to wait for futures that can only make progress on these same threads, hence If this rule is broken, it is possible for all threads in a pool to wait
the deadlock. *) for futures that can only make progress on these same threads,
hence the deadlock.
*)
val wait_block_exn : 'a t -> 'a val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *) (** Same as {!wait_block} but re-raises the exception if the future failed. *)
(** {2 Infix operators} (** {2 Infix operators}
These combinators run on either the current pool (if present), or on the These combinators run on either the current pool (if present),
same thread that just fulfilled the previous future if not. or on the same thread that just fulfilled the previous future
if not.
They were previously present as [module Infix_local] and [val infix], but They were previously present as [module Infix_local] and [val infix],
are now simplified. but are now simplified.
@since 0.5 *) @since 0.5 *)

View file

@ -42,8 +42,9 @@ let[@inline] remove_in_local_hmap (k : _ Hmap.key) : unit =
let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit = let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit =
update_local_hmap (Hmap.add k v) update_local_hmap (Hmap.add k v)
(** [with_in_local_hmap k v f] calls [f()] in a context where [k] is bound to (** [with_in_local_hmap k v f] calls [f()] in a context
[v] in the local hmap. Then it restores the previous binding for [k]. *) where [k] is bound to [v] in the local hmap. Then it restores the
previous binding for [k]. *)
let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f = let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f =
let h = get_local_hmap () in let h = get_local_hmap () in
match Hmap.find k h with match Hmap.find k h with

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource. (** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper around This lock is a synchronous concurrency primitive, as a thin wrapper
{!Mutex} that encourages proper management of the critical section in RAII around {!Mutex} that encourages proper management of the critical
style: section in RAII style:
{[ {[
let (let@) = (@@) let (let@) = (@@)
@ -19,8 +19,8 @@
]} ]}
This lock does not work well with {!Fut.await}. A critical section that This lock does not work well with {!Fut.await}. A critical section
contains a call to [await] might cause deadlocks, or lock starvation, that contains a call to [await] might cause deadlocks, or lock starvation,
because it will hold onto the lock while it goes to sleep. because it will hold onto the lock while it goes to sleep.
@since 0.3 *) @since 0.3 *)
@ -32,27 +32,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *) (** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l], (** [with_ l f] runs [f x] where [x] is the value protected with
in a critical section. If [f x] fails, [with_lock l f] fails too but the the lock [l], in a critical section. If [f x] fails, [with_lock l f]
lock is released. *) fails too but the lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected by (** [update l f] replaces the content [x] of [l] with [f x], while protected
the mutex. *) by the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and (** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
returns [y], while protected by the mutex. *) and returns [y], while protected by the mutex. *)
val mutex : _ t -> Mutex.t val mutex : _ t -> Mutex.t
(** Underlying mutex. *) (** Underlying mutex. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned isn't (** Atomically get the value in the lock. The value that is returned
protected! *) isn't protected! *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Atomically set the value. (** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an {b NOTE} caution: using {!get} and {!set} as if this were a {!ref}
anti pattern and will not protect data against some race conditions. *) is an anti pattern and will not protect data against some race conditions. *)

View file

@ -16,7 +16,6 @@ let spawn_on_current_runner = Fut.spawn_on_current_runner
[@@@ifge 5.0] [@@@ifge 5.0]
let await = Fut.await let await = Fut.await
let yield = Picos.Fiber.yield
[@@@endif] [@@@endif]

View file

@ -1,12 +1,13 @@
(** Moonpool (** Moonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about pools A pool within a bigger pool (ie the ocean). Here, we're talking about
of [Thread.t] that are dispatched over several [Domain.t] to enable pools of [Thread.t] that are dispatched over several [Domain.t] to
parallelism. enable parallelism.
We provide several implementations of pools with distinct scheduling We provide several implementations of pools
strategies, alongside some concurrency primitives such as guarding locks with distinct scheduling strategies, alongside some concurrency
({!Lock.t}) and futures ({!Fut.t}). *) primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}).
*)
module Ws_pool = Ws_pool module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool
@ -23,45 +24,45 @@ module Immediate_runner : sig end
module Exn_bt = Exn_bt module Exn_bt = Exn_bt
exception Shutdown exception Shutdown
(** Exception raised when trying to run tasks on runners that have been shut (** Exception raised when trying to run tasks on
down. runners that have been shut down.
@since 0.6 *) @since 0.6 *)
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random to (** Similar to {!Thread.create}, but it picks a background domain at random
run the thread. This ensures that we don't always pick the same domain to to run the thread. This ensures that we don't always pick the same domain
run all the various threads needed in an application (timers, event loops, to run all the various threads needed in an application (timers, event loops, etc.) *)
etc.) *)
val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run on the given runner. This (** [run_async runner task] schedules the task to run
means [task()] will be executed at some point in the future, possibly in on the given runner. This means [task()] will be executed
another thread. at some point in the future, possibly in another thread.
@param fiber optional initial (picos) fiber state @param fiber optional initial (picos) fiber state
@since 0.5 *) @since 0.5 *)
val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a
(** [run_wait_block runner f] schedules [f] for later execution on the runner, (** [run_wait_block runner f] schedules [f] for later execution
like {!run_async}. It then blocks the current thread until [f()] is done on the runner, like {!run_async}.
executing, and returns its result. If [f()] raises an exception, then It then blocks the current thread until [f()] is done executing,
[run_wait_block pool f] will raise it as well. and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
See {!run_async} for more details. See {!run_async} for more details.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
required discipline to avoid deadlocks). about the required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down @raise Shutdown if the runner was already shut down
@since 0.6 *) @since 0.6 *)
val recommended_thread_count : unit -> int val recommended_thread_count : unit -> int
(** Number of threads recommended to saturate the CPU. For IO pools this makes (** Number of threads recommended to saturate the CPU.
little sense (you might want more threads than this because many of them For IO pools this makes little sense (you might want more threads than
will be blocked most of the time). this because many of them will be blocked most of the time).
@since 0.5 *) @since 0.5 *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns (** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
a future result for it. See {!Fut.spawn}. and returns a future result for it. See {!Fut.spawn}.
@since 0.5 *) @since 0.5 *)
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
@ -70,20 +71,15 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val get_current_runner : unit -> Runner.t option val get_current_runner : unit -> Runner.t option
(** See {!Runner.get_current_runner} (** See {!Runner.get_current_runner}
@since 0.7 *) @since 0.7 *)
[@@@ifge 5.0] [@@@ifge 5.0]
val await : 'a Fut.t -> 'a val await : 'a Fut.t -> 'a
(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on (** Await a future. See {!Fut.await}.
OCaml >= 5.0. Only on OCaml >= 5.0.
@since 0.5 *) @since 0.5 *)
val yield : unit -> unit
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
>= 5.0.
@since NEXT_RELEASE *)
[@@@endif] [@@@endif]
module Lock = Lock module Lock = Lock
@ -94,33 +90,35 @@ module Thread_local_storage = Thread_local_storage
(** A simple blocking queue. (** A simple blocking queue.
This queue is quite basic and will not behave well under heavy contention. This queue is quite basic and will not behave well under heavy
However, it can be sufficient for many practical use cases. contention. However, it can be sufficient for many practical use cases.
{b NOTE}: this queue will typically block the caller thread in case the {b NOTE}: this queue will typically block the caller thread
operation (push/pop) cannot proceed. Be wary of deadlocks when using the in case the operation (push/pop) cannot proceed.
queue {i from} a pool when you expect the other end to also be Be wary of deadlocks when using the queue {i from} a pool
produced/consumed from the same pool. when you expect the other end to also be produced/consumed from
the same pool.
See discussion on {!Fut.wait_block} for more details on deadlocks and how to See discussion on {!Fut.wait_block} for more details on deadlocks
mitigate the risk of running into them. and how to mitigate the risk of running into them.
More scalable queues can be found in Lockfree More scalable queues can be found in
(https://github.com/ocaml-multicore/lockfree/) *) Lockfree (https://github.com/ocaml-multicore/lockfree/)
*)
module Blocking_queue : sig module Blocking_queue : sig
type 'a t type 'a t
(** Unbounded blocking queue. (** Unbounded blocking queue.
This queue is thread-safe and will block when calling {!pop} on it when This queue is thread-safe and will block when calling {!pop}
it's empty. *) on it when it's empty. *)
val create : unit -> _ t val create : unit -> _ t
(** Create a new unbounded queue. *) (** Create a new unbounded queue. *)
val size : _ t -> int val size : _ t -> int
(** Number of items currently in the queue. Note that [pop] might still block (** Number of items currently in the queue. Note that [pop]
if this returns a non-zero number, since another thread might have might still block if this returns a non-zero number, since another
consumed the items in the mean time. thread might have consumed the items in the mean time.
@since 0.2 *) @since 0.2 *)
exception Closed exception Closed
@ -128,70 +126,73 @@ module Blocking_queue : sig
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()]. (** [push q x] pushes [x] into [q], and returns [()].
In the current implementation, [push q] will never block for a long time, In the current implementation, [push q] will never block for
it will only block while waiting for a lock so it can push the element. a long time, it will only block while waiting for a lock
so it can push the element.
@raise Closed if the queue is closed (by a previous call to [close q]) *) @raise Closed if the queue is closed (by a previous call to [close q]) *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element (** [pop q] pops the next element in [q]. It might block until an element comes.
comes. @raise Closed if the queue was closed before a new element was available. *)
@raise Closed if the queue was closed before a new element was available.
*)
val close : _ t -> unit val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed, ie [push] (** Close the queue, meaning there won't be any more [push] allowed,
will raise {!Closed}. ie [push] will raise {!Closed}.
[pop] will keep working and will return the elements present in the queue, [pop] will keep working and will return the elements present in the
until it's entirely drained; then [pop] will also raise {!Closed}. *) queue, until it's entirely drained; then [pop] will
also raise {!Closed}. *)
val try_pop : force_lock:bool -> 'a t -> 'a option val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any, or returns (** [try_pop q] immediately pops the first element of [q], if any,
[None] without blocking. or returns [None] without blocking.
@param force_lock @param force_lock if true, use {!Mutex.lock} (which can block under contention);
if true, use {!Mutex.lock} (which can block under contention); if false, if false, use {!Mutex.try_lock}, which might return [None] even in
use {!Mutex.try_lock}, which might return [None] even in presence of an presence of an element if there's contention *)
element if there's contention *)
val try_push : 'a t -> 'a -> bool val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case it returns [true]; or (** [try_push q x] tries to push into [q], in which case
it fails to push and returns [false] without blocking. it returns [true]; or it fails to push and returns [false]
@raise Closed if the locking succeeded but the queue is closed. *) without blocking.
@raise Closed if the locking succeeded but the queue is closed.
*)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently in [bq] into [q2] in one (** [transfer bq q2] transfers all items presently
atomic section, and clears [bq]. It blocks if no element is in [bq]. in [bq] into [q2] in one atomic section, and clears [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch. Create a This is useful to consume elements from the queue in batch.
[Queue.t] locally: Create a [Queue.t] locally:
{[
let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create () in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in {[
one single critical section. *) let dowork (work_queue: job Bb_queue.t) =
Bb_queue.transfer work_queue local_q (* local queue, not thread safe *)
done let local_q = Queue.create() in
with Bb_queue.Closed -> () try
]} while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
@since 0.4 *) (* get all the events in the incoming blocking queue, in
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -> ()
]}
@since 0.4 *)
type 'a gen = unit -> 'a option type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might (** [to_iter q] returns an iterator over all items in the queue.
not terminate if [q] is never closed. This might not terminate if [q] is never closed.
@since 0.4 *) @since 0.4 *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen
@ -208,8 +209,8 @@ module Bounded_queue = Bounded_queue
module Atomic = Atomic_ module Atomic = Atomic_
(** Atomic values. (** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] This is either a shim using [ref], on pre-OCaml 5, or the
module on OCaml 5. *) standard [Atomic] module on OCaml 5. *)
(**/**) (**/**)
@ -219,9 +220,9 @@ module Private : sig
(** A deque for work stealing, fixed size. *) (** A deque for work stealing, fixed size. *)
module Worker_loop_ = Worker_loop_ module Worker_loop_ = Worker_loop_
(** Worker loop. This is useful to implement custom runners, it should run on (** Worker loop. This is useful to implement custom runners, it
each thread of the runner. should run on each thread of the runner.
@since 0.7 *) @since 0.7 *)
module Domain_ = Domain_ module Domain_ = Domain_
(** Utils for domains *) (** Utils for domains *)

View file

@ -1,8 +1,9 @@
(** Interface for runners. (** Interface for runners.
This provides an abstraction for running tasks in the background, which is This provides an abstraction for running tasks in the background,
implemented by various thread pools. which is implemented by various thread pools.
@since 0.3 *) @since 0.3
*)
type fiber = Picos.Fiber.t type fiber = Picos.Fiber.t
type task = unit -> unit type task = unit -> unit
@ -11,19 +12,19 @@ type t
(** A runner. (** A runner.
If a runner is no longer needed, {!shutdown} can be used to signal all If a runner is no longer needed, {!shutdown} can be used to signal all
worker threads in it to stop (after they finish their work), and wait for worker threads
them to stop. in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool (whose size is The threads are distributed across a fixed domain pool
determined by {!Domain.recommended_domain_count} on OCaml 5, and simple the (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and
single runtime on OCaml 4). *) simple the single runtime on OCaml 4). *)
val size : t -> int val size : t -> int
(** Number of threads/workers. *) (** Number of threads/workers. *)
val num_tasks : t -> int val num_tasks : t -> int
(** Current number of tasks. This is at best a snapshot, useful for metrics and (** Current number of tasks. This is at best a snapshot, useful for metrics
debugging. *) and debugging. *)
val shutdown : t -> unit val shutdown : t -> unit
(** Shutdown the runner and wait for it to terminate. Idempotent. *) (** Shutdown the runner and wait for it to terminate. Idempotent. *)
@ -34,31 +35,32 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : ?fiber:fiber -> t -> task -> unit val run_async : ?fiber:fiber -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner in one of (** [run_async pool f] schedules [f] for later execution on the runner
the threads. [f()] will run on one of the runner's worker threads/domains. in one of the threads. [f()] will run on one of the runner's
worker threads/domains.
@param fiber if provided, run the task with this initial fiber data @param fiber if provided, run the task with this initial fiber data
@raise Shutdown if the runner was shut down before [run_async] was called. @raise Shutdown if the runner was shut down before [run_async] was called. *)
*)
val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution on the pool, like (** [run_wait_block pool f] schedules [f] for later execution
{!run_async}. It then blocks the current thread until [f()] is done on the pool, like {!run_async}.
executing, and returns its result. If [f()] raises an exception, then It then blocks the current thread until [f()] is done executing,
[run_wait_block pool f] will raise it as well. and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
required discipline to avoid deadlocks). about the required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down *) @raise Shutdown if the runner was already shut down *)
val dummy : t val dummy : t
(** Runner that fails when scheduling tasks on it. Calling {!run_async} on it (** Runner that fails when scheduling tasks on it.
will raise Failure. Calling {!run_async} on it will raise Failure.
@since 0.6 *) @since 0.6 *)
(** {2 Implementing runners} *) (** {2 Implementing runners} *)
(** This module is specifically intended for users who implement their own (** This module is specifically intended for users who implement their
runners. Regular users of Moonpool should not need to look at it. *) own runners. Regular users of Moonpool should not need to look at it. *)
module For_runner_implementors : sig module For_runner_implementors : sig
val create : val create :
size:(unit -> int) -> size:(unit -> int) ->
@ -69,20 +71,21 @@ module For_runner_implementors : sig
t t
(** Create a new runner. (** Create a new runner.
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so {b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x,
that {!Fork_join} and other 5.x features work properly. *) so that {!Fork_join} and other 5.x features work properly. *)
val k_cur_runner : t Thread_local_storage.t val k_cur_runner : t Thread_local_storage.t
(** Key that should be used by each runner to store itself in TLS on every (** Key that should be used by each runner to store itself in TLS
thread it controls, so that tasks running on these threads can access the on every thread it controls, so that tasks running on these threads
runner. This is necessary for {!get_current_runner} to work. *) can access the runner. This is necessary for {!get_current_runner}
to work. *)
end end
val get_current_runner : unit -> t option val get_current_runner : unit -> t option
(** Access the current runner. This returns [Some r] if the call happens on a (** Access the current runner. This returns [Some r] if the call
thread that belongs in a runner. happens on a thread that belongs in a runner.
@since 0.5 *) @since 0.5 *)
val get_current_fiber : unit -> fiber option val get_current_fiber : unit -> fiber option
(** [get_current_storage runner] gets the local storage for the currently (** [get_current_storage runner] gets the local storage
running task. *) for the currently running task. *)

View file

@ -1,38 +1,41 @@
(** Task-local storage. (** Task-local storage.
This storage is associated to the current task, just like thread-local This storage is associated to the current task,
storage is associated with the current thread. The storage is carried along just like thread-local storage is associated with
in case the current task is suspended. the current thread. The storage is carried along in case
the current task is suspended.
@since 0.6 *) @since 0.6
*)
type 'a t = 'a Picos.Fiber.FLS.t type 'a t = 'a Picos.Fiber.FLS.t
val create : unit -> 'a t val create : unit -> 'a t
(** [create ()] makes a new key. Keys are expensive and should never be (** [create ()] makes a new key. Keys are expensive and
allocated dynamically or in a loop. *) should never be allocated dynamically or in a loop. *)
exception Not_set exception Not_set
val get_exn : 'a t -> 'a val get_exn : 'a t -> 'a
(** [get k] gets the value for the current task for key [k]. Must be run from (** [get k] gets the value for the current task for key [k].
inside a task running on a runner. Must be run from inside a task running on a runner.
@raise Not_set otherwise *) @raise Not_set otherwise *)
val get_opt : 'a t -> 'a option val get_opt : 'a t -> 'a option
(** [get_opt k] gets the current task's value for key [k], or [None] if not run (** [get_opt k] gets the current task's value for key [k],
from inside the task. *) or [None] if not run from inside the task. *)
val get : 'a t -> default:'a -> 'a val get : 'a t -> default:'a -> 'a
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** [set k v] sets the storage for [k] to [v]. Must be run from inside a task (** [set k v] sets the storage for [k] to [v].
running on a runner. Must be run from inside a task running on a runner.
@raise Failure otherwise *) @raise Failure otherwise *)
val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b
(** [with_value k v f] sets [k] to [v] for the duration of the call to [f()]. (** [with_value k v f] sets [k] to [v] for the duration of the call
When [f()] returns (or fails), [k] is restored to its old value. *) to [f()]. When [f()] returns (or fails), [k] is restored
to its old value. *)
(** {2 Local [Hmap.t]} (** {2 Local [Hmap.t]}

View file

@ -102,24 +102,7 @@ let with_handler ~ops:_ self f = f ()
[@@@endif] [@@@endif]
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = let worker_loop (type st) ~(ops : st ops) (self : st) : unit =
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let cur_fiber : fiber ref = ref _dummy_fiber in let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner; TLS.set Runner.For_runner_implementors.k_cur_runner runner;

View file

@ -1,37 +0,0 @@
(** Internal module that is used for workers.
A thread pool should use this [worker_loop] to run tasks, handle effects,
etc. *)
open Types_
type task_full =
| T_start of {
fiber: fiber;
f: unit -> unit;
}
| T_resume : {
fiber: fiber;
k: unit -> unit;
}
-> task_full
val _dummy_task : task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;
cleanup: 'st -> unit;
}
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -16,8 +16,7 @@ end
type state = { type state = {
id_: Id.t; id_: Id.t;
(** Unique to this pool. Used to make sure tasks stay within the same (** Unique to this pool. Used to make sure tasks stay within the same pool. *)
pool. *)
active: bool A.t; (** Becomes [false] when the pool is shutdown. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
mutable workers: worker_state array; (** Fixed set of workers. *) mutable workers: worker_state array; (** Fixed set of workers. *)
main_q: WL.task_full Queue.t; main_q: WL.task_full Queue.t;
@ -44,8 +43,9 @@ and worker_state = {
q: WL.task_full WSQ.t; (** Work stealing queue *) q: WL.task_full WSQ.t; (** Work stealing queue *)
rng: Random.State.t; rng: Random.State.t;
} }
(** State for a given worker. Only this worker is allowed to push into the (** State for a given worker. Only this worker is
queue, but other workers can come and steal from it if they're idle. *) allowed to push into the queue, but other workers
can come and steal from it if they're idle. *)
let[@inline] size_ (self : state) = Array.length self.workers let[@inline] size_ (self : state) = Array.length self.workers
@ -55,8 +55,9 @@ let num_tasks_ (self : state) : int =
Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers; Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers;
!n !n
(** TLS, used by worker to store their specific state and be able to retrieve it (** TLS, used by worker to store their specific state
from tasks when we schedule new sub-tasks. *) and be able to retrieve it from tasks when we schedule new
sub-tasks. *)
let k_worker_state : worker_state TLS.t = TLS.create () let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option = let[@inline] get_current_worker_ () : worker_state option =
@ -76,8 +77,8 @@ let[@inline] try_wake_someone_ (self : state) : unit =
Mutex.unlock self.mutex Mutex.unlock self.mutex
) )
(** Push into worker's local queue, open to work stealing. precondition: this (** Push into worker's local queue, open to work stealing.
runs on the worker thread whose state is [self] *) precondition: this runs on the worker thread whose state is [self] *)
let schedule_on_current_worker (self : worker_state) task : unit = let schedule_on_current_worker (self : worker_state) task : unit =
(* we're on this same pool, schedule in the worker's state. Otherwise (* we're on this same pool, schedule in the worker's state. Otherwise
we might also be on pool A but asking to schedule on pool B, we might also be on pool A but asking to schedule on pool B,
@ -309,9 +310,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* function called in domain with index [i], to (* function called in domain with index [i], to
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let thread = let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (idx, thread) Bb_queue.push receive_threads (idx, thread)
in in

View file

@ -1,22 +1,23 @@
(** Work-stealing thread pool. (** Work-stealing thread pool.
A pool of threads with a worker-stealing scheduler. The pool contains a A pool of threads with a worker-stealing scheduler.
fixed number of threads that wait for work items to come, process these, and The pool contains a fixed number of threads that wait for work
loop. items to come, process these, and loop.
This is good for CPU-intensive tasks that feature a lot of small tasks. Note This is good for CPU-intensive tasks that feature a lot of small tasks.
that tasks will not always be processed in the order they are scheduled, so Note that tasks will not always be processed in the order they are
this is not great for workloads where the latency of individual tasks matter scheduled, so this is not great for workloads where the latency
(for that see {!Fifo_pool}). of individual tasks matter (for that see {!Fifo_pool}).
This implements {!Runner.t} since 0.3. This implements {!Runner.t} since 0.3.
If a pool is no longer needed, {!shutdown} can be used to signal all threads If a pool is no longer needed, {!shutdown} can be used to signal all threads
in it to stop (after they finish their work), and wait for them to stop. in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool (whose size is The threads are distributed across a fixed domain pool
determined by {!Domain.recommended_domain_count} on OCaml 5, and simply the (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5,
single runtime on OCaml 4). *) and simply the single runtime on OCaml 4).
*)
include module type of Runner include module type of Runner
@ -32,26 +33,25 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread @param on_init_thread called at the beginning of each new thread
called at the beginning of each new thread in the pool. in the pool.
@param num_threads @param num_threads size of the pool, ie. number of worker threads.
size of the pool, ie. number of worker threads. It will be at least [1] It will be at least [1] internally, so [0] or negative values make no sense.
internally, so [0] or negative values make no sense. The default is The default is [Domain.recommended_domain_count()], ie one worker
[Domain.recommended_domain_count()], ie one worker thread per CPU core. On thread per CPU core.
OCaml 4 the default is [4] (since there is only one domain). On OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each thread in the pool @param on_exit_thread called at the end of each thread in the pool
@param around_task @param around_task a pair of [before, after], where [before pool] is called
a pair of [before, after], where [before pool] is called before a task is before a task is processed,
processed, on the worker thread about to run it, and returns [x]; and on the worker thread about to run it, and returns [x]; and [after pool x] is called by
[after pool x] is called by the same thread after the task is over. (since the same thread after the task is over. (since 0.2)
0.2) @param name a name for this thread pool, used if tracing is enabled (since 0.6)
@param name *)
a name for this thread pool, used if tracing is enabled (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When (** [with_ () f] calls [f pool], where [pool] is obtained via {!create}.
[f pool] returns or fails, [pool] is shutdown and its resources are When [f pool] returns or fails, [pool] is shutdown and its resources
released. are released.
Most parameters are the same as in {!create}. Most parameters are the same as in {!create}.
@since 0.3 *) @since 0.3 *)

View file

@ -76,24 +76,25 @@ type worker_state = {
(** Array of (optional) workers. (** Array of (optional) workers.
Workers are started/stop on demand. For each index we have the (currently Workers are started/stop on demand. For each index we have
active) domain's state including a work queue and a thread refcount; and the the (currently active) domain's state
domain itself, if any, in a separate option because it might outlive its own including a work queue and a thread refcount; and the domain itself,
state. *) if any, in a separate option because it might outlive its own state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array = let domains_ : (worker_state option * Domain_.t option) Lock.t array =
let n = max 1 (Domain_.recommended_number ()) in let n = max 1 (Domain_.recommended_number ()) in
Array.init n (fun _ -> Lock.create (None, None)) Array.init n (fun _ -> Lock.create (None, None))
(** main work loop for a domain worker. (** main work loop for a domain worker.
A domain worker does two things: A domain worker does two things:
- run functions it's asked to (mainly, to start new threads inside it) - run functions it's asked to (mainly, to start new threads inside it)
- decrease the refcount when one of these threads stops. The thread will - decrease the refcount when one of these threads stops. The thread
notify the domain that it's exiting, so the domain can know how many will notify the domain that it's exiting, so the domain can know
threads are still using it. If all threads exit, the domain polls a bit how many threads are still using it. If all threads exit, the domain
(in case new threads are created really shortly after, which happens with polls a bit (in case new threads are created really shortly after,
a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and which happens with a [Pool.with_] or [Pool.create() Pool.shutdown()]
if nothing happens it tries to stop to free resources. *) in a tight loop), and if nothing happens it tries to stop to free resources.
*)
let work_ idx (st : worker_state) : unit = let work_ idx (st : worker_state) : unit =
let main_loop () = let main_loop () =
let continue = ref true in let continue = ref true in

View file

@ -1,17 +1,18 @@
(** Static pool of domains. (** Static pool of domains.
These domains are shared between {b all} the pools in moonpool. The These domains are shared between {b all} the pools in moonpool.
rationale is that we should not have more domains than cores, so it's easier The rationale is that we should not have more domains than cores, so
to reserve exactly that many domain slots, and run more flexible thread it's easier to reserve exactly that many domain slots, and run more flexible
pools on top (each domain being shared by potentially multiple threads from thread pools on top (each domain being shared by potentially multiple threads
multiple pools). from multiple pools).
The pool should not contain actual domains if it's not in use, ie if no The pool should not contain actual domains if it's not in use, ie if no
runner is presently actively using one or more of the domain slots. runner is presently actively using one or more of the domain slots.
{b NOTE}: Interface is still experimental. {b NOTE}: Interface is still experimental.
@since 0.6 *) @since 0.6
*)
type domain = Domain_.t type domain = Domain_.t
@ -23,13 +24,13 @@ val max_number_of_domains : unit -> int
Be very cautious with this interface, or resource leaks might occur. *) Be very cautious with this interface, or resource leaks might occur. *)
val run_on : int -> (unit -> unit) -> unit val run_on : int -> (unit -> unit) -> unit
(** [run_on i f] runs [f()] on the domain with index [i]. Precondition: (** [run_on i f] runs [f()] on the domain with index [i].
[0 <= i < n_domains()]. The thread must call {!decr_on} with [i] once it's Precondition: [0 <= i < n_domains()]. The thread must call {!decr_on}
done. *) with [i] once it's done. *)
val decr_on : int -> unit val decr_on : int -> unit
(** Signal that a thread is stopping on the domain with index [i]. *) (** Signal that a thread is stopping on the domain with index [i]. *)
val run_on_and_wait : int -> (unit -> 'a) -> 'a val run_on_and_wait : int -> (unit -> 'a) -> 'a
(** [run_on_and_wait i f] runs [f()] on the domain with index [i], and blocks (** [run_on_and_wait i f] runs [f()] on the domain with index [i],
until the result of [f()] is returned back. *) and blocks until the result of [f()] is returned back. *)

View file

@ -187,8 +187,8 @@ let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h) Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if some children (** Successfully resolve the fiber. This might still fail if
failed. *) some children failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit = let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in let promise = prom_of_fut self.res in

View file

@ -1,11 +1,13 @@
(** Fibers. (** Fibers.
A fiber is a lightweight computation that runs cooperatively alongside other A fiber is a lightweight computation that runs cooperatively
fibers. In the context of moonpool, fibers have additional properties: alongside other fibers. In the context of moonpool, fibers
have additional properties:
- they run in a moonpool runner - they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form of structured - they form a simple supervision tree, enabling a limited form
concurrency *) of structured concurrency
*)
type cancel_callback = Exn_bt.t -> unit type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *) (** A callback used in case of cancellation *)
@ -24,8 +26,8 @@ module Private_ : sig
runner: Runner.t; runner: Runner.t;
pfiber: pfiber; pfiber: pfiber;
} }
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely (** Type definition, exposed so that {!any} can be unboxed.
on that. *) Please do not rely on that. *)
type any = Any : _ t -> any [@@unboxed] type any = Any : _ t -> any [@@unboxed]
@ -56,7 +58,8 @@ val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t val fail : Exn_bt.t -> _ t
val self : unit -> any val self : unit -> any
(** [self ()] is the current fiber. Must be run from inside a fiber. (** [self ()] is the current fiber.
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option val peek : 'a t -> 'a Fut.or_error option
@ -75,16 +78,16 @@ val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *) (** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See (** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)].
{!Fut.wait_block} for warnings about deadlocks. *) {b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See (** [wait_block fib] is [Fut.wait_block (res fib)].
{!Fut.wait_block} for warnings about deadlocks. *) {b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises. Must be (** Check if the current fiber is cancelled, in which case this raises.
run from inside a fiber. Must be run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e] @raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *) @raise Failure if not run from a fiber. *)
@ -96,54 +99,55 @@ type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *) (** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib]. (** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks
If [fib] is already cancelled, [cb] is called immediately. *) for [fib]. If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback associated with handle (** [remove_on_cancel fib h] removes the cancel callback
[h]. *) associated with handle [h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which, (** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e]
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without in a scope in which, if the fiber [fib] is cancelled,
the fiber being cancelled, this callback is removed. *) [cb()] is called. If [e] returns without the fiber being cancelled,
this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the (** [with_on_self_cancel cb f] calls [f()] in a scope where
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed [cb] is added to the cancel callbacks of the current fiber;
from the list. *) and [f()] terminates, [cb] is removed from the list. *)
val on_result : 'a t -> 'a callback -> unit val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback with the result. If the (** Wait for fiber to be done and call the callback
fiber is done already then the callback is invoked immediately with its with the result. If the fiber is done already then the
result. *) callback is invoked immediately with its result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner.
fiber is not the child of any other fiber: its lifetime is only determined This fiber is not the child of any other fiber: its lifetime
by the lifetime of [f()]. *) is only determined by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber (** [spawn ~protect f] spawns a sub-fiber [f_child]
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails. if the current fiber [parent] fails.
@param on @param on if provided, start the fiber on the given runner. If not
if provided, start the fiber on the given runner. If not provided, use the provided, use the parent's runner.
parent's runner. @param protect if true, when [f_child] fails, it does not
@param protect affect [parent]. If false, [f_child] failing also
if true, when [f_child] fails, it does not affect [parent]. If false, causes [parent] to fail (and therefore all other children
[f_child] failing also causes [parent] to fail (and therefore all other of [parent]). Default is [true].
children of [parent]). Default is [true].
Must be run from inside a fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect (** [spawn_ignore f] is [ignore (spawn f)].
termination of the parent, ie. the parent will exit only after this new The fiber will still affect termination of the parent, ie. the
fiber exits. parent will exit only after this new fiber exits.
@param on the optional runner to use, added since 0.7 *) @param on the optional runner to use, added since NEXT_RELEASE *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result. (** Like {!spawn_top} but ignores the result.

View file

@ -1,16 +1,18 @@
(** Fiber-local storage. (** Fiber-local storage.
This storage is associated to the current fiber, just like thread-local This storage is associated to the current fiber,
storage is associated with the current thread. just like thread-local storage is associated with
the current thread.
See {!Moonpool.Task_local_storage} for more general information, as this is See {!Moonpool.Task_local_storage} for more general information, as
based on it. this is based on it.
{b NOTE}: it's important to note that, while each fiber has its own storage, {b NOTE}: it's important to note that, while each fiber
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of has its own storage, spawning a sub-fiber [f2] from a fiber [f1]
the storage. Values inside [f1]'s storage will be physically shared with will only do a shallow copy of the storage.
[f2]. It is thus recommended to store only persistent values in the local Values inside [f1]'s storage will be physically shared with [f2].
storage. *) It is thus recommended to store only persistent values in the local storage.
*)
include module type of struct include module type of struct
include Task_local_storage include Task_local_storage

View file

@ -1,7 +1,7 @@
(** The unique name of a fiber. (** The unique name of a fiber.
Each fiber has a unique handle that can be used to refer to it in maps or Each fiber has a unique handle that can be used to
sets. *) refer to it in maps or sets. *)
type t = private int type t = private int
(** Unique, opaque identifier for a fiber. *) (** Unique, opaque identifier for a fiber. *)

View file

@ -1,6 +1,6 @@
exception Oh_no of Exn_bt.t exception Oh_no of Exn_bt.t
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a = let main (f : Runner.t -> 'a) : 'a =
let worker_st = let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -13,7 +13,6 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
(* run the main thread *) (* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops; ~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with match Fiber.peek fiber with
@ -21,6 +20,3 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
| Some (Error ebt) -> Exn_bt.raise ebt | Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false | None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -1,28 +1,25 @@
(** Main thread. (** Main thread.
This is evolved from [Moonpool.Immediate_runner], but unlike it, this API This is evolved from [Moonpool.Immediate_runner], but unlike it,
assumes you run it in a thread (possibly the main thread) which will block this API assumes you run it in a thread (possibly
until the initial computation is done. the main thread) which will block until the initial computation is done.
This means it's reasonable to use [Main.main (fun () -> do_everything)] at This means it's reasonable to use [Main.main (fun () -> do_everything)]
the beginning of the program. Other Moonpool pools can be created for at the beginning of the program.
background tasks, etc. to do the heavy lifting, and the main thread (inside Other Moonpool pools can be created for background tasks, etc. to do the
this immediate runner) can coordinate tasks via [Fiber.await]. heavy lifting, and the main thread (inside this immediate runner) can coordinate
tasks via [Fiber.await].
Aside from the fact that this blocks the caller thread, it is fairly similar Aside from the fact that this blocks the caller thread, it is fairly similar to
to {!Background_thread} in that there's a single worker to process {!Background_thread} in that there's a single worker to process
tasks/fibers. tasks/fibers.
This handles effects, including the ones in {!Fiber}. This handles effects, including the ones in {!Fiber}.
@since 0.6 *) @since 0.6
*)
val main : (Moonpool.Runner.t -> 'a) -> 'a val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including (** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *) This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)

View file

@ -5,22 +5,20 @@
@since 0.3 *) @since 0.3 *)
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [both f g] runs [f()] and [g()], potentially in parallel, and returns their (** [both f g] runs [f()] and [g()], potentially in parallel,
result when both are done. If any of [f()] and [g()] fails, then the whole and returns their result when both are done.
computation fails. If any of [f()] and [g()] fails, then the whole computation fails.
This must be run from within the pool: for example, inside {!Pool.run} or This must be run from within the pool: for example, inside {!Pool.run}
inside a {!Fut.spawn} computation. This is because it relies on an effect or inside a {!Fut.spawn} computation.
handler to be installed. This is because it relies on an effect handler to be installed.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val both_ignore : (unit -> _) -> (unit -> _) -> unit val both_ignore : (unit -> _) -> (unit -> _) -> unit
(** Same as [both f g |> ignore]. (** Same as [both f g |> ignore].
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
@ -65,49 +63,43 @@ val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array
(** [all_array fs] runs all functions in [fs] in tasks, and waits for all the (** [all_array fs] runs all functions in [fs] in tasks, and waits for
results. all the results.
@param chunk_size @param chunk_size if equal to [n], groups items by [n] to be run in
if equal to [n], groups items by [n] to be run in a single task. Default a single task. Default is [1].
is [1].
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list
(** [all_list fs] runs all functions in [fs] in tasks, and waits for all the (** [all_list fs] runs all functions in [fs] in tasks, and waits for
results. all the results.
@param chunk_size @param chunk_size if equal to [n], groups items by [n] to be run in
if equal to [n], groups items by [n] to be run in a single task. Default a single task. Default is not specified.
is not specified. This parameter is available since 0.3. This parameter is available since 0.3.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list
(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits (** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for
for all the results. all the results.
@param chunk_size @param chunk_size if equal to [n], groups items by [n] to be run in
if equal to [n], groups items by [n] to be run in a single task. Default a single task. Default is not specified.
is not specified. This parameter is available since 0.3. This parameter is available since 0.3.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array
(** [map_array f arr] is like [Array.map f arr], but runs in parallel. (** [map_array f arr] is like [Array.map f arr], but runs in parallel.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list
(** [map_list f l] is like [List.map f l], but runs in parallel. (** [map_list f l] is like [List.map f l], but runs in parallel.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)

View file

@ -2,7 +2,8 @@ open Common_
class type t = object class type t = object
method input : bytes -> int -> int -> int method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the stream is closed. *) (** Read into the slice. Returns [0] only if the
stream is closed. *)
method close : unit -> unit method close : unit -> unit
(** Close the input. Must be idempotent. *) (** Close the input. Must be idempotent. *)
@ -46,7 +47,7 @@ let of_bytes ?(off = 0) ?len (b : bytes) : t =
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice. (** Read into the given slice.
@return the number of bytes read, [0] means end of input. *) @return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *) (** Close the channel. *)

View file

@ -67,7 +67,7 @@ module Perform_action_in_lwt = struct
let actions_ : Action_queue.t = Action_queue.create () let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the (** Gets the current set of notifications and perform them from inside the
Lwt thread *) Lwt thread *)
let perform_pending_actions () : unit = let perform_pending_actions () : unit =
let@ _sp = let@ _sp =
Moonpool.Private.Tracing_.with_span Moonpool.Private.Tracing_.with_span

View file

@ -1,7 +1,8 @@
(** Lwt_engine-based event loop for Moonpool. (** Lwt_engine-based event loop for Moonpool.
In what follows, we mean by "lwt thread" the thread running [Lwt_main.run] In what follows, we mean by "lwt thread" the thread
(so, the thread where the Lwt event loop and all Lwt callbacks execute). running [Lwt_main.run] (so, the thread where the Lwt event
loop and all Lwt callbacks execute).
{b NOTE}: this is experimental and might change in future versions. {b NOTE}: this is experimental and might change in future versions.
@ -13,50 +14,53 @@ module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *) (** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that completes when (** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that
[lwt_fut] does. This must be run from within the Lwt thread. *) completes when [lwt_fut] does. This must be run from within
the Lwt thread. *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This (** [lwt_of_fut fut] makes a lwt future that completes when
must be called from the Lwt thread, and the result must always be used only [fut] does. This must be called from the Lwt thread, and the result
from inside the Lwt thread. *) must always be used only from inside the Lwt thread. *)
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool (** [await_lwt fut] awaits a Lwt future from inside a task running on
runner. This must be run from within a Moonpool runner so that the await-ing a moonpool runner. This must be run from within a Moonpool runner
effect is handled. *) so that the await-ing effect is handled. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a (** [run_in_lwt f] runs [f()] from within the Lwt thread
thread-safe future. This can be run from anywhere. *) and returns a thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result. (** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and
Must be run from inside a moonpool runner so that the await-in effect is awaits its result. Must be run from inside a moonpool runner
handled. so that the await-in effect is handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *) This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called. Must be run from within (** Returns the runner from within which this is called.
a fiber. Must be run from within a fiber.
@raise Failure if not run within a fiber *) @raise Failure if not run within a fiber *)
(** {2 IO} *) (** {2 IO} *)
(** IO using the Lwt event loop. (** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors and rely on a These IO operations work on non-blocking file descriptors
[Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently and rely on a [Lwt_engine] event loop being active (meaning,
running in some thread). [Lwt_main.run] is currently running in some thread).
Calling these functions must be done from a moonpool runner. A function like Calling these functions must be done from a moonpool runner.
[read] will first try to perform the IO action directly (here, call A function like [read] will first try to perform the IO action
{!Unix.read}); if the action fails because the FD is not ready, then directly (here, call {!Unix.read}); if the action fails because
[await_readable] is called: it suspends the fiber and subscribes it to Lwt the FD is not ready, then [await_readable] is called:
to be awakened when the FD becomes ready. *) it suspends the fiber and subscribes it to Lwt to be awakened
when the FD becomes ready.
*)
module IO : sig module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *) (** Read from the file descriptor *)
@ -87,29 +91,27 @@ module TCP_server : sig
type t = Lwt_io.server type t = Lwt_io.server
val establish_lwt : val establish_lwt :
?backlog: ?backlog:(* ?server_fd:Unix.file_descr -> *)
(* ?server_fd:Unix.file_descr -> *) int ->
int ->
?no_close:bool -> ?no_close:bool ->
runner:Moonpool.Runner.t -> runner:Moonpool.Runner.t ->
Unix.sockaddr -> Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t t
(** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When (** [establish ~runner addr handler] runs a TCP server in the Lwt
a client connects, a moonpool fiber is started on [runner] to handle it. thread. When a client connects, a moonpool fiber is started on [runner]
*) to handle it. *)
val establish : val establish :
?backlog: ?backlog:(* ?server_fd:Unix.file_descr -> *)
(* ?server_fd:Unix.file_descr -> *) int ->
int ->
?no_close:bool -> ?no_close:bool ->
runner:Moonpool.Runner.t -> runner:Moonpool.Runner.t ->
Unix.sockaddr -> Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t t
(** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes (** Like {!establish_lwt} but uses {!IO} to directly handle
on client sockets. *) reads and writes on client sockets. *)
val shutdown : t -> unit val shutdown : t -> unit
(** Shutdown the server *) (** Shutdown the server *)
@ -119,8 +121,8 @@ module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from the socket in a (** Open a connection, and use {!IO} to read and write from
non blocking way. *) the socket in a non blocking way. *)
val with_connect_lwt : val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
@ -130,15 +132,15 @@ end
(** {2 Helpers on the lwt side} *) (** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and (** [detach_in_runner ~runner f] runs [f] in the given moonpool runner,
returns a lwt future. This must be run from within the thread running and returns a lwt future. This must be run from within the thread
[Lwt_main]. *) running [Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
inside a fiber in [runner]. *) a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *) (** Like {!main_with_runner} but with a default choice of runner. *)

View file

@ -1,10 +1,11 @@
(** Work-stealing deque. (** Work-stealing deque.
Adapted from "Dynamic circular work stealing deque", Chase & Lev. Adapted from "Dynamic circular work stealing deque", Chase & Lev.
However note that this one is not dynamic in the sense that there is no However note that this one is not dynamic in the sense that there
resizing. Instead we return [false] when [push] fails, which keeps the is no resizing. Instead we return [false] when [push] fails, which
implementation fairly lightweight. *) keeps the implementation fairly lightweight.
*)
type 'a t type 'a t
(** Deque containing values of type ['a] *) (** Deque containing values of type ['a] *)
@ -13,12 +14,12 @@ val create : dummy:'a -> unit -> 'a t
(** Create a new deque. *) (** Create a new deque. *)
val push : 'a t -> 'a -> bool val push : 'a t -> 'a -> bool
(** Push value at the bottom of deque. returns [true] if it succeeds. This must (** Push value at the bottom of deque. returns [true] if it succeeds.
be called only by the owner thread. *) This must be called only by the owner thread. *)
val pop : 'a t -> 'a option val pop : 'a t -> 'a option
(** Pop value from the bottom of deque. This must be called only by the owner (** Pop value from the bottom of deque.
thread. *) This must be called only by the owner thread. *)
exception Empty exception Empty

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource. (** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper around This lock is a synchronous concurrency primitive, as a thin wrapper
{!Mutex} that encourages proper management of the critical section in RAII around {!Mutex} that encourages proper management of the critical
style: section in RAII style:
{[ {[
let (let@) = (@@) let (let@) = (@@)
@ -30,27 +30,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *) (** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l], (** [with_ l f] runs [f x] where [x] is the value protected with
in a critical section. If [f x] fails, [with_lock l f] fails too but the the lock [l], in a critical section. If [f x] fails, [with_lock l f]
lock is released. *) fails too but the lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected by (** [update l f] replaces the content [x] of [l] with [f x], while protected
the mutex. *) by the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and (** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
returns [y], while protected by the mutex. *) and returns [y], while protected by the mutex. *)
val mutex : _ t -> Picos_std_sync.Mutex.t val mutex : _ t -> Picos_std_sync.Mutex.t
(** Underlying mutex. *) (** Underlying mutex. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned isn't (** Atomically get the value in the lock. The value that is returned
protected! *) isn't protected! *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Atomically set the value. (** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an {b NOTE} caution: using {!get} and {!set} as if this were a {!ref}
anti pattern and will not protect data against some race conditions. *) is an anti pattern and will not protect data against some race conditions. *)