Compare commits

...

12 commits

Author SHA1 Message Date
Simon Cruanes
213d9bdd19
revert previous delayed await
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-05-02 13:04:04 -04:00
Simon Cruanes
bb9418d86a
format with 0.27 2025-05-02 10:58:50 -04:00
Simon Cruanes
d50c227578
perf: await on immediately ready timer queues its task 2025-05-02 10:51:46 -04:00
Simon Cruanes
b46a048401
feat: add Moonpool.yield on ocaml 5
a mere alias to Picos.Fiber.yield
2025-05-02 10:33:30 -04:00
Simon Cruanes
ed0eda226c
prepare for 0.8 2025-04-17 16:35:19 -04:00
Simon Cruanes
2b00a0cea1
feat(exn_bt): in show/pp, do print the backtrace when present 2025-04-15 10:10:02 -04:00
Simon Cruanes
3a5eaaa44d
api(fut): public alias 'a Fut.t = 'a Picos.Computation.t 2025-03-19 17:40:17 -04:00
Simon Cruanes
f0ea8c294d
single system call for signal blocking 2025-03-13 15:42:04 -04:00
Simon Cruanes
dd88008a0a
fix: do not die if we fail to block a signal 2025-03-13 10:45:21 -04:00
Simon Cruanes
c51a0a6bd4
don't try to block sigstop 2025-03-13 10:45:01 -04:00
Simon Cruanes
deb96302e1
mli for worker loop 2025-03-13 10:07:39 -04:00
Simon Cruanes
a20208ec37
feat: block signals in workers if asked to 2025-03-13 10:07:20 -04:00
41 changed files with 731 additions and 676 deletions

View file

@ -78,7 +78,7 @@ jobs:
strategy: strategy:
matrix: matrix:
ocaml-compiler: ocaml-compiler:
- '5.2' - '5.3'
runs-on: 'ubuntu-latest' runs-on: 'ubuntu-latest'
steps: steps:
- uses: actions/checkout@main - uses: actions/checkout@main
@ -89,6 +89,6 @@ jobs:
dune-cache: true dune-cache: true
allow-prerelease-opam: true allow-prerelease-opam: true
- run: opam install ocamlformat.0.26.2 - run: opam install ocamlformat.0.27.0
- run: opam exec -- make format-check - run: opam exec -- make format-check

View file

@ -1,4 +1,4 @@
version = 0.26.2 version = 0.27.0
profile=conventional profile=conventional
margin=80 margin=80
if-then-else=k-r if-then-else=k-r

View file

@ -1,4 +1,17 @@
# 0.8
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
- feat(exn_bt): in show/pp, do print the backtrace when present
- feat: block signals in workers if asked to
- relax bound on picos to 0.5-0.6
- feat fib: `spawn_ignore` now has `?on` optional param
- change Moonpool.Chan so it's bounded (stil experimental)
- fix task local storage: type was too specific
- fix fiber: use a single fut/computation in fibers
# 0.7 # 0.7
- add `Moonpool_fiber.spawn_top_ignore` - add `Moonpool_fiber.spawn_top_ignore`

View file

@ -2,7 +2,7 @@
(using mdx 0.2) (using mdx 0.2)
(name moonpool) (name moonpool)
(version 0.7) (version 0.8)
(generate_opam_files true) (generate_opam_files true)
(source (source
(github c-cube/moonpool)) (github c-cube/moonpool))

View file

@ -1,4 +1,5 @@
(** Example from https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *) (** Example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Async IO for moonpool, relying on picos (experimental)" synopsis: "Async IO for moonpool, relying on picos (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)" synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Pools of threads supported by a pool of domains" synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,13 +1,11 @@
(** A simple runner with a single background thread. (** A simple runner with a single background thread.
Because this is guaranteed to have a single worker thread, Because this is guaranteed to have a single worker thread, tasks scheduled
tasks scheduled in this runner always run asynchronously but in this runner always run asynchronously but in a sequential fashion.
in a sequential fashion.
This is similar to {!Fifo_pool} with exactly one thread. This is similar to {!Fifo_pool} with exactly one thread.
@since 0.6 @since 0.6 *)
*)
include module type of Runner include module type of Runner

View file

@ -16,48 +16,45 @@ val size : _ t -> int
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes. (** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *) @raise Closed if the queue was closed before a new element was available. *)
val try_pop : force_lock:bool -> 'a t -> 'a option val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any, (** [try_pop q] immediately pops the first element of [q], if any, or returns
or returns [None] without blocking. [None] without blocking.
@param force_lock if true, use {!Mutex.lock} (which can block under contention); @param force_lock
if false, use {!Mutex.try_lock}, which might return [None] even in if true, use {!Mutex.lock} (which can block under contention); if false,
presence of an element if there's contention *) use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention *)
val try_push : 'a t -> 'a -> bool val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case (** [try_push q x] tries to push into [q], in which case it returns [true]; or
it returns [true]; or it fails to push and returns [false] it fails to push and returns [false] without blocking.
without blocking. @raise Closed if the locking succeeded but the queue is closed. *)
@raise Closed if the locking succeeded but the queue is closed.
*)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently (** [transfer bq q2] transfers all items presently in [bq] into [q2] in one
in [bq] into [q2] in one atomic section, and clears [bq]. atomic section, and clears [bq]. It blocks if no element is in [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch.
Create a [Queue.t] locally:
This is useful to consume elements from the queue in batch. Create a
[Queue.t] locally:
{[ {[
let dowork (work_queue: job Bb_queue.t) = let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *) (* local queue, not thread safe *)
let local_q = Queue.create() in let local_q = Queue.create () in
try try
while true do while true do
(* work on local events, already on this thread *) (* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in let job = Queue.pop local_q in
process_job job process_job job
done; done;
(* get all the events in the incoming blocking queue, in (* get all the events in the incoming blocking queue, in
one single critical section. *) one single critical section. *)
Bb_queue.transfer work_queue local_q Bb_queue.transfer work_queue local_q
done done
with Bb_queue.Closed -> () with Bb_queue.Closed -> ()
]} ]}
@since 0.4 *) @since 0.4 *)
@ -69,8 +66,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. (** [to_iter q] returns an iterator over all items in the queue. This might not
This might not terminate if [q] is never closed. terminate if [q] is never closed.
@since 0.4 *) @since 0.4 *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen

View file

@ -1,15 +1,13 @@
(** A blocking queue of finite size. (** A blocking queue of finite size.
This queue, while still using locks underneath This queue, while still using locks underneath (like the regular blocking
(like the regular blocking queue) should be enough for queue) should be enough for usage under reasonable contention.
usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is The bounded size is helpful whenever some form of backpressure is desirable:
desirable: if the queue is used to communicate between producer(s) if the queue is used to communicate between producer(s) and consumer(s), the
and consumer(s), the consumer(s) can limit the rate at which consumer(s) can limit the rate at which producer(s) send new work down their
producer(s) send new work down their way. way. Whenever the queue is full, means that producer(s) will have to wait
Whenever the queue is full, means that producer(s) will have to before pushing new work.
wait before pushing new work.
@since 0.4 *) @since 0.4 *)
@ -19,42 +17,41 @@ type 'a t
val create : max_size:int -> unit -> 'a t val create : max_size:int -> unit -> 'a t
val close : _ t -> unit val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q], (** [close q] closes [q]. No new elements can be pushed into [q], and after all
and after all the elements still in [q] currently are [pop]'d, the elements still in [q] currently are [pop]'d, {!pop} will also raise
{!pop} will also raise {!Closed}. *) {!Closed}. *)
exception Closed exception Closed
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue. (** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
If [q] is full, this will block until there is block until there is room for [x].
room for [x].
@raise Closed if [q] is closed. *) @raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons (** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
if it cannot acquire [q] or if [q] is full. acquire [q] or if [q] is full.
@param force_lock if true, use {!Mutex.lock} (which can block @param force_lock
under contention); if true, use {!Mutex.lock} (which can block under contention); if false,
if false, use {!Mutex.try_lock}, which might return [false] even use {!Mutex.try_lock}, which might return [false] even if there's room in
if there's room in the queue. the queue.
@raise Closed if [q] is closed. *) @raise Closed if [q] is closed. *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q] (** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
is empty, until some element becomes available. some element becomes available.
@raise Closed if [q] is empty and closed. *) @raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] (** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
if no element is available or if it failed to acquire [q]. no element is available or if it failed to acquire [q].
@param force_lock if true, use {!Mutex.lock} (which can block @param force_lock
under contention); if true, use {!Mutex.lock} (which can block under contention); if false,
if false, use {!Mutex.try_lock}, which might return [None] even in use {!Mutex.try_lock}, which might return [None] even in presence of an
presence of an element if there's contention. element if there's contention.
@raise Closed if [q] is empty and closed. *) @raise Closed if [q] is empty and closed. *)
@ -65,9 +62,8 @@ val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *) (** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available (** [transfer bq q2] transfers all elements currently available in [bq] into
in [bq] into local queue [q2], and clears [bq], atomically. local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details. See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *) @raise Closed if [bq] is empty and closed. *)
@ -76,8 +72,8 @@ type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. (** [to_iter q] returns an iterator over all items in the queue. This might not
This might not terminate if [q] is never closed. *) terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *) (** [to_gen q] returns a generator from the queue. *)

View file

@ -3,7 +3,7 @@
The channels have bounded size. Push/pop return futures or can use effects The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version. to provide an [await]-friendly version.
The channels became bounded since @NEXT_RELEASE . The channels became bounded since @0.7 .
*) *)
type 'a t type 'a t
@ -15,33 +15,32 @@ val create : max_size:int -> unit -> 'a t
exception Closed exception Closed
val try_push : 'a t -> 'a -> bool val try_push : 'a t -> 'a -> bool
(** [try_push chan x] pushes [x] into [chan]. This does not block. (** [try_push chan x] pushes [x] into [chan]. This does not block. Returns
Returns [true] if it succeeded in pushing. [true] if it succeeded in pushing.
@raise Closed if the channel is closed. *) @raise Closed if the channel is closed. *)
val try_pop : 'a t -> 'a option val try_pop : 'a t -> 'a option
(** [try_pop chan] pops and return an element if one is available (** [try_pop chan] pops and return an element if one is available immediately.
immediately. Otherwise it returns [None]. Otherwise it returns [None].
@raise Closed if the channel is closed and empty. @raise Closed if the channel is closed and empty. *)
*)
val close : _ t -> unit val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. (** Close the channel. Further push and pop calls will fail. This is idempotent.
This is idempotent. *) *)
[@@@ifge 5.0] [@@@ifge 5.0]
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task (** Push the value into the channel, suspending the current task if the channel
if the channel is currently full. is currently full.
@raise Closed if the channel is closed @raise Closed if the channel is closed
@since NEXT_RELEASE *) @since 0.7 *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the (** Pop an element. This might suspend the current task if the channel is
channel is currently empty. currently empty.
@raise Closed if the channel is empty and closed. @raise Closed if the channel is empty and closed.
@since NEXT_RELEASE *) @since 0.7 *)
(* (*
val pop_block_exn : 'a t -> 'a val pop_block_exn : 'a t -> 'a

View file

@ -3,7 +3,15 @@ type t = exn * Printexc.raw_backtrace
let[@inline] make exn bt : t = exn, bt let[@inline] make exn bt : t = exn, bt
let[@inline] exn (e, _) = e let[@inline] exn (e, _) = e
let[@inline] bt (_, bt) = bt let[@inline] bt (_, bt) = bt
let show self = Printexc.to_string (exn self)
let show self =
let bt = Printexc.raw_backtrace_to_string (bt self) in
let exn = Printexc.to_string (exn self) in
if bt = "" then
exn
else
Printf.sprintf "%s\n%s" exn bt
let pp out self = Format.pp_print_string out (show self) let pp out self = Format.pp_print_string out (show self)
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt

View file

@ -165,7 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let st = { idx = i; dom_idx; st = pool } in let st = { idx = i; dom_idx; st = pool } in
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (i, thread) Bb_queue.push receive_threads (i, thread)
in in

View file

@ -1,16 +1,16 @@
(** A simple thread pool in FIFO order. (** A simple thread pool in FIFO order.
FIFO: first-in, first-out. Basically tasks are put into a queue, FIFO: first-in, first-out. Basically tasks are put into a queue, and worker
and worker threads pull them out of the queue at the other end. threads pull them out of the queue at the other end.
Since this uses a single blocking queue to manage tasks, it's very Since this uses a single blocking queue to manage tasks, it's very simple
simple and reliable. The number of worker threads is fixed, but and reliable. The number of worker threads is fixed, but they are spread
they are spread over several domains to enable parallelism. over several domains to enable parallelism.
This can be useful for latency-sensitive applications (e.g. as a This can be useful for latency-sensitive applications (e.g. as a pool of
pool of workers for network servers). Work-stealing pools might workers for network servers). Work-stealing pools might have higher
have higher throughput but they're very unfair to some tasks; by throughput but they're very unfair to some tasks; by contrast, here, older
contrast, here, older tasks have priority over younger tasks. tasks have priority over younger tasks.
@since 0.5 *) @since 0.5 *)
@ -28,22 +28,22 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread in the pool. @param on_init_thread
@param min minimum size of the pool. See {!Pool.create_args}. called at the beginning of each new thread in the pool.
The default is [Domain.recommended_domain_count()], ie one worker per @param min
CPU core. minimum size of the pool. See {!Pool.create_args}. The default is
On OCaml 4 the default is [4] (since there is only one domain). [Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
@param on_exit_thread called at the end of each worker thread in the pool. 4 the default is [4] (since there is only one domain).
@param around_task a pair of [before, after] functions @param on_exit_thread called at the end of each worker thread in the pool.
ran around each task. See {!Pool.create_args}. @param around_task
@param name name for the pool, used in tracing (since 0.6) a pair of [before, after] functions ran around each task. See
*) {!Pool.create_args}.
@param name name for the pool, used in tracing (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. (** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When
When [f pool] returns or fails, [pool] is shutdown and its resources [f pool] returns or fails, [pool] is shutdown and its resources are
are released. released. Most parameters are the same as in {!create}. *)
Most parameters are the same as in {!create}. *)
(**/**) (**/**)

View file

@ -3,23 +3,23 @@ module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a waiter = 'a or_error -> unit type 'a waiter = 'a or_error -> unit
type 'a t = { st: 'a C.t } [@@unboxed] type 'a t = 'a C.t
type 'a promise = 'a t type 'a promise = 'a t
let[@inline] make_promise () : _ t = let[@inline] make_promise () : _ t =
let fut = { st = C.create ~mode:`LIFO () } in let fut = C.create ~mode:`LIFO () in
fut fut
let make () = let make () =
let fut = make_promise () in let fut = make_promise () in
fut, fut fut, fut
let[@inline] return x : _ t = { st = C.returned x } let[@inline] return x : _ t = C.returned x
let[@inline] fail exn bt : _ t = let[@inline] fail exn bt : _ t =
let st = C.create () in let fut = C.create () in
C.cancel st exn bt; C.cancel fut exn bt;
{ st } fut
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt) let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
@ -27,32 +27,32 @@ let[@inline] of_result = function
| Ok x -> return x | Ok x -> return x
| Error ebt -> fail_exn_bt ebt | Error ebt -> fail_exn_bt ebt
let[@inline] is_resolved self : bool = not (C.is_running self.st) let[@inline] is_resolved self : bool = not (C.is_running self)
let is_done = is_resolved let is_done = is_resolved
let[@inline] peek self : _ option = C.peek self.st let peek : 'a t -> _ option = C.peek
let[@inline] raise_if_failed self : unit = C.check self.st let raise_if_failed : _ t -> unit = C.check
let[@inline] is_success self = let[@inline] is_success self =
match C.peek_exn self.st with match C.peek_exn self with
| _ -> true | _ -> true
| exception _ -> false | exception _ -> false
let[@inline] is_failed self = C.is_canceled self.st let is_failed : _ t -> bool = C.is_canceled
exception Not_ready exception Not_ready
let[@inline] get_or_fail self = let[@inline] get_or_fail self =
match C.peek self.st with match C.peek self with
| Some x -> x | Some x -> x
| None -> raise Not_ready | None -> raise Not_ready
let[@inline] get_or_fail_exn self = let[@inline] get_or_fail_exn self =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> raise Not_ready | exception C.Running -> raise Not_ready
let[@inline] peek_or_assert_ (self : 'a t) : 'a = let[@inline] peek_or_assert_ (self : 'a t) : 'a =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> assert false | exception C.Running -> assert false
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
let trigger = let trigger =
(Trigger.from_action f self on_result_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_cb_ [@alert "-handler"])
in in
if not (C.try_attach self.st trigger) then on_result_cb_ () f self if not (C.try_attach self trigger) then on_result_cb_ () f self
let on_result_ignore_cb_ _tr f (self : _ t) = let on_result_ignore_cb_ _tr f (self : _ t) =
f (Picos.Computation.canceled self.st) f (Picos.Computation.canceled self)
let on_result_ignore (self : _ t) f : unit = let on_result_ignore (self : _ t) f : unit =
if Picos.Computation.is_running self.st then ( if Picos.Computation.is_running self then (
let trigger = let trigger =
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
in in
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
) else ) else
on_result_ignore_cb_ () f self on_result_ignore_cb_ () f self
let[@inline] fulfill_idempotent self r = let[@inline] fulfill_idempotent self r =
match r with match r with
| Ok x -> C.return self.st x | Ok x -> C.return self x
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let ok = let ok =
match r with match r with
| Ok x -> C.try_return self.st x | Ok x -> C.try_return self x
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in in
if not ok then raise Already_fulfilled if not ok then raise Already_fulfilled
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
let task () = let task () =
try try
let res = f () in let res = f () in
C.return fut.st res C.return fut res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
C.cancel fut.st exn bt C.cancel fut exn bt
in in
Runner.run_async on task; Runner.run_async on task;
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
let push_queue_ _tr q () = Bb_queue.push q () let push_queue_ _tr q () = Bb_queue.push q ()
let wait_block_exn (self : 'a t) : 'a = let wait_block_exn (self : 'a t) : 'a =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x (* fast path *) | x -> x (* fast path *)
| exception C.Running -> | exception C.Running ->
let real_block () = let real_block () =
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
assert attached; assert attached;
(* blockingly wait for trigger if computation didn't complete in the mean time *) (* blockingly wait for trigger if computation didn't complete in the mean time *)
if C.try_attach self.st trigger then Bb_queue.pop q; if C.try_attach self trigger then Bb_queue.pop q;
(* trigger was signaled! computation must be done*) (* trigger was signaled! computation must be done*)
peek_or_assert_ self peek_or_assert_ self
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
if i = 0 then if i = 0 then
real_block () real_block ()
else ( else (
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> | exception C.Running ->
Domain_.relax (); Domain_.relax ();
@ -426,12 +426,12 @@ let wait_block self =
let await (self : 'a t) : 'a = let await (self : 'a t) : 'a =
(* fast path: peek *) (* fast path: peek *)
match C.peek_exn self.st with match C.peek_exn self with
| res -> res | res -> res
| exception C.Running -> | exception C.Running ->
let trigger = Trigger.create () in let trigger = Trigger.create () in
(* suspend until the future is resolved *) (* suspend until the future is resolved *)
if C.try_attach self.st trigger then if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger; Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
module Private_ = struct module Private_ = struct
let[@inline] unsafe_promise_of_fut x = x let[@inline] unsafe_promise_of_fut x = x
let[@inline] as_computation self = self.st let[@inline] as_computation self = self
end end

View file

@ -1,31 +1,29 @@
(** Futures. (** Futures.
A future of type ['a t] represents the result of a computation A future of type ['a t] represents the result of a computation that will
that will yield a value of type ['a]. yield a value of type ['a].
Typically, the computation is running on a thread pool {!Runner.t} Typically, the computation is running on a thread pool {!Runner.t} and will
and will proceed on some worker. Once set, a future cannot change. proceed on some worker. Once set, a future cannot change. It either succeeds
It either succeeds (storing a [Ok x] with [x: 'a]), or fail (storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
(storing a [Error (exn, bt)] with an exception and the corresponding an exception and the corresponding backtrace).
backtrace).
Combinators such as {!map} and {!join_array} can be used to produce Combinators such as {!map} and {!join_array} can be used to produce futures
futures from other futures (in a monadic way). Some combinators take from other futures (in a monadic way). Some combinators take a [on] argument
a [on] argument to specify a runner on which the intermediate computation takes to specify a runner on which the intermediate computation takes place; for
place; for example [map ~on:pool ~f fut] maps the value in [fut] example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
using function [f], applicatively; the call to [f] happens on applicatively; the call to [f] happens on the runner [pool] (once [fut]
the runner [pool] (once [fut] resolves successfully with a value). resolves successfully with a value). *)
*)
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a t type 'a t = 'a Picos.Computation.t
(** A future with a result of type ['a]. *) (** A future with a result of type ['a]. *)
type 'a promise = private 'a t type 'a promise = private 'a t
(** A promise, which can be fulfilled exactly once to set (** A promise, which can be fulfilled exactly once to set the corresponding
the corresponding future. future. This is a private alias of ['a t] since 0.7, previously it was
This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *) opaque. *)
val make : unit -> 'a t * 'a promise val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *) (** Make a new future with the associated promise. *)
@ -34,33 +32,32 @@ val make_promise : unit -> 'a promise
(** Same as {!make} but returns a single promise (which can be upcast to a (** Same as {!make} but returns a single promise (which can be upcast to a
future). This is useful mostly to preserve memory. future). This is useful mostly to preserve memory.
How to upcast to a future in the worst case: How to upcast to a future in the worst case:
{[let prom = Fut.make_promise();; {[
let fut = (prom : _ Fut.promise :> _ Fut.t) ;; let prom = Fut.make_promise ()
]} let fut = (prom : _ Fut.promise :> _ Fut.t)
@since NEXT_RELEASE *) ]}
@since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future (** [on_result fut f] registers [f] to be called in the future when [fut] is set
when [fut] is set ; ; or calls [f] immediately if [fut] is already set. *)
or calls [f] immediately if [fut] is already set. *)
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
(** [on_result_ignore fut f] registers [f] to be called in the future (** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
when [fut] is set; is set; or calls [f] immediately if [fut] is already set. It does not pass
or calls [f] immediately if [fut] is already set. the result, only a success/error signal.
It does not pass the result, only a success/error signal.
@since 0.7 *) @since 0.7 *)
exception Already_fulfilled exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time. (** Fullfill the promise, setting the future at the same time.
@raise Already_fulfilled if the promise is already fulfilled. *) @raise Already_fulfilled if the promise is already fulfilled. *)
val fulfill_idempotent : 'a promise -> 'a or_error -> unit val fulfill_idempotent : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time. (** Fullfill the promise, setting the future at the same time. Does nothing if
Does nothing if the promise is already fulfilled. *) the promise is already fulfilled. *)
val return : 'a -> 'a t val return : 'a -> 'a t
(** Already settled future, with a result *) (** Already settled future, with a result *)
@ -78,22 +75,22 @@ val is_resolved : _ t -> bool
(** [is_resolved fut] is [true] iff [fut] is resolved. *) (** [is_resolved fut] is [true] iff [fut] is resolved. *)
val peek : 'a t -> 'a or_error option val peek : 'a t -> 'a or_error option
(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], (** [peek fut] returns [Some r] if [fut] is currently resolved with [r], and
and [None] if [fut] is not resolved yet. *) [None] if [fut] is not resolved yet. *)
exception Not_ready exception Not_ready
(** @since 0.2 *) (** @since 0.2 *)
val get_or_fail : 'a t -> 'a or_error val get_or_fail : 'a t -> 'a or_error
(** [get_or_fail fut] obtains the result from [fut] if it's fulfilled (** [get_or_fail fut] obtains the result from [fut] if it's fulfilled (i.e. if
(i.e. if [peek fut] returns [Some res], [get_or_fail fut] returns [res]). [peek fut] returns [Some res], [get_or_fail fut] returns [res]).
@raise Not_ready if the future is not ready. @raise Not_ready if the future is not ready.
@since 0.2 *) @since 0.2 *)
val get_or_fail_exn : 'a t -> 'a val get_or_fail_exn : 'a t -> 'a
(** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, (** [get_or_fail_exn fut] obtains the result from [fut] if it's fulfilled, like
like {!get_or_fail}. If the result is an [Error _], the exception inside {!get_or_fail}. If the result is an [Error _], the exception inside is
is re-raised. re-raised.
@raise Not_ready if the future is not ready. @raise Not_ready if the future is not ready.
@since 0.2 *) @since 0.2 *)
@ -116,12 +113,12 @@ val raise_if_failed : _ t -> unit
(** {2 Combinators} *) (** {2 Combinators} *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a t val spawn : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that
hold its result. *) will hold its result. *)
val spawn_on_current_runner : (unit -> 'a) -> 'a t val spawn_on_current_runner : (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules (** This must be run from inside a runner, and schedules the new task on it as
the new task on it as well. well.
See {!Runner.get_current_runner} to see how the runner is found. See {!Runner.get_current_runner} to see how the runner is found.
@ -129,28 +126,26 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
@raise Failure if run from outside a runner. *) @raise Failure if run from outside a runner. *)
val reify_error : 'a t -> 'a or_error t val reify_error : 'a t -> 'a or_error t
(** [reify_error fut] turns a failing future into a non-failing (** [reify_error fut] turns a failing future into a non-failing one that contain
one that contain [Error (exn, bt)]. A non-failing future [Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
returning [x] is turned into [Ok x]
@since 0.4 *) @since 0.4 *)
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
(** [map ?on ~f fut] returns a new future [fut2] that resolves (** [map ?on ~f fut] returns a new future [fut2] that resolves with [f x] if
with [f x] if [fut] resolved with [x]; [fut] resolved with [x]; and fails with [e] if [fut] fails with [e] or [f x]
and fails with [e] if [fut] fails with [e] or [f x] raises [e]. raises [e].
@param on if provided, [f] runs on the given runner *) @param on if provided, [f] runs on the given runner *)
val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
(** [bind ?on ~f fut] returns a new future [fut2] that resolves (** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
like the future [f x] if [fut] resolved with [x]; [f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
and fails with [e] if [fut] fails with [e] or [f x] raises [e]. or [f x] raises [e].
@param on if provided, [f] runs on the given runner *) @param on if provided, [f] runs on the given runner *)
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves (** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
like the future [f (Ok x)] if [fut] resolved with [x]; the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
and resolves like the future [f (Error (exn, bt))] future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
if [fut] fails with [exn] and backtrace [bt].
@param on if provided, [f] runs on the given runner @param on if provided, [f] runs on the given runner
@since 0.4 *) @since 0.4 *)
@ -159,18 +154,18 @@ val join : 'a t t -> 'a t
@since 0.2 *) @since 0.2 *)
val both : 'a t -> 'b t -> ('a * 'b) t val both : 'a t -> 'b t -> ('a * 'b) t
(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and (** [both a b] succeeds with [x, y] if [a] succeeds with [x] and [b] succeeds
[b] succeeds with [y], or fails if any of them fails. *) with [y], or fails if any of them fails. *)
val choose : 'a t -> 'b t -> ('a, 'b) Either.t t val choose : 'a t -> 'b t -> ('a, 'b) Either.t t
(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or (** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or [b]
[b] succeeds with [y], or fails if both of them fails. succeeds with [y], or fails if both of them fails. If they both succeed, it
If they both succeed, it is not specified which result is used. *) is not specified which result is used. *)
val choose_same : 'a t -> 'a t -> 'a t val choose_same : 'a t -> 'a t -> 'a t
(** [choose_same a b] succeeds with the value of one of [a] or [b] if (** [choose_same a b] succeeds with the value of one of [a] or [b] if they
they succeed, or fails if both fail. succeed, or fails if both fail. If they both succeed, it is not specified
If they both succeed, it is not specified which result is used. *) which result is used. *)
val join_array : 'a t array -> 'a array t val join_array : 'a t array -> 'a array t
(** Wait for all the futures in the array. Fails if any future fails. *) (** Wait for all the futures in the array. Fails if any future fails. *)
@ -185,20 +180,20 @@ module Advanced : sig
aggregate_results:(('a t -> 'a) -> 'cont -> 'res) -> aggregate_results:(('a t -> 'a) -> 'cont -> 'res) ->
'cont -> 'cont ->
'res t 'res t
(** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a (** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len
container of futures ([cont]), with [len] elements, cont] takes a container of futures ([cont]), with [len] elements, and
and returns a future result of type [res] returns a future result of type [res] (possibly another type of
(possibly another type of container). container).
This waits for all futures in [cont: 'cont] to be done This waits for all futures in [cont: 'cont] to be done (futures obtained
(futures obtained via [iter <some function> cont]). If they via [iter <some function> cont]). If they all succeed, their results are
all succeed, their results are aggregated into a new aggregated into a new result of type ['res] via
result of type ['res] via [aggregate_results <some function> cont]. [aggregate_results <some function> cont].
{b NOTE}: the behavior is not specified if [iter f cont] (for a function f) {b NOTE}: the behavior is not specified if [iter f cont] (for a function
doesn't call [f] on exactly [len cont] elements. f) doesn't call [f] on exactly [len cont] elements.
@since 0.5.1 *) @since 0.5.1 *)
end end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
@ -206,23 +201,22 @@ val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
@since 0.5.1 *) @since 0.5.1 *)
val wait_array : _ t array -> unit t val wait_array : _ t array -> unit t
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards (** [wait_array arr] waits for all futures in [arr] to resolve. It discards the
the individual results of futures in [arr]. It fails if any future fails. *) individual results of futures in [arr]. It fails if any future fails. *)
val wait_list : _ t list -> unit t val wait_list : _ t list -> unit t
(** [wait_list l] waits for all futures in [l] to resolve. It discards (** [wait_list l] waits for all futures in [l] to resolve. It discards the
the individual results of futures in [l]. It fails if any future fails. *) individual results of futures in [l]. It fails if any future fails. *)
val for_ : on:Runner.t -> int -> (int -> unit) -> unit t val for_ : on:Runner.t -> int -> (int -> unit) -> unit t
(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns (** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the runner, and returns a
a future that resolves when all the tasks have resolved, or fails future that resolves when all the tasks have resolved, or fails as soon as
as soon as one task has failed. *) one task has failed. *)
val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t
(** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in (** [for_array ~on arr f] runs [f 0 arr.(0)], …, [f (n-1) arr.(n-1)] in the
the runner (where [n = Array.length arr]), and returns a future runner (where [n = Array.length arr]), and returns a future that resolves
that resolves when all the tasks are done, when all the tasks are done, or fails if any of them fails.
or fails if any of them fails.
@since 0.2 *) @since 0.2 *)
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
@ -242,43 +236,39 @@ val await : 'a t -> 'a
@since 0.3 @since 0.3
This must only be run from inside the runner itself. The runner must This must only be run from inside the runner itself. The runner must support
support {!Suspend_}. {!Suspend_}. {b NOTE}: only on OCaml 5.x *)
{b NOTE}: only on OCaml 5.x
*)
[@@@endif] [@@@endif]
(** {2 Blocking} *) (** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error val wait_block : 'a t -> 'a or_error
(** [wait_block fut] blocks the current thread until [fut] is resolved, (** [wait_block fut] blocks the current thread until [fut] is resolved, and
and returns its value. returns its value.
{b NOTE}: A word of warning: this will monopolize the calling thread until the future {b NOTE}: A word of warning: this will monopolize the calling thread until
resolves. This can also easily cause deadlocks, if enough threads in a pool the future resolves. This can also easily cause deadlocks, if enough threads
call [wait_block] on futures running on the same pool or a pool depending on it. in a pool call [wait_block] on futures running on the same pool or a pool
depending on it.
A good rule to avoid deadlocks is to run this from outside of any pool, A good rule to avoid deadlocks is to run this from outside of any pool, or
or to have an acyclic order between pools where [wait_block] to have an acyclic order between pools where [wait_block] is only called
is only called from a pool on futures evaluated in a pool that comes lower from a pool on futures evaluated in a pool that comes lower in the
in the hierarchy. hierarchy. If this rule is broken, it is possible for all threads in a pool
If this rule is broken, it is possible for all threads in a pool to wait to wait for futures that can only make progress on these same threads, hence
for futures that can only make progress on these same threads, the deadlock. *)
hence the deadlock.
*)
val wait_block_exn : 'a t -> 'a val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *) (** Same as {!wait_block} but re-raises the exception if the future failed. *)
(** {2 Infix operators} (** {2 Infix operators}
These combinators run on either the current pool (if present), These combinators run on either the current pool (if present), or on the
or on the same thread that just fulfilled the previous future same thread that just fulfilled the previous future if not.
if not.
They were previously present as [module Infix_local] and [val infix], They were previously present as [module Infix_local] and [val infix], but
but are now simplified. are now simplified.
@since 0.5 *) @since 0.5 *)

View file

@ -42,9 +42,8 @@ let[@inline] remove_in_local_hmap (k : _ Hmap.key) : unit =
let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit = let[@inline] set_in_local_hmap (k : 'a Hmap.key) (v : 'a) : unit =
update_local_hmap (Hmap.add k v) update_local_hmap (Hmap.add k v)
(** [with_in_local_hmap k v f] calls [f()] in a context (** [with_in_local_hmap k v f] calls [f()] in a context where [k] is bound to
where [k] is bound to [v] in the local hmap. Then it restores the [v] in the local hmap. Then it restores the previous binding for [k]. *)
previous binding for [k]. *)
let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f = let with_in_local_hmap (k : 'a Hmap.key) (v : 'a) f =
let h = get_local_hmap () in let h = get_local_hmap () in
match Hmap.find k h with match Hmap.find k h with

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource. (** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper This lock is a synchronous concurrency primitive, as a thin wrapper around
around {!Mutex} that encourages proper management of the critical {!Mutex} that encourages proper management of the critical section in RAII
section in RAII style: style:
{[ {[
let (let@) = (@@) let (let@) = (@@)
@ -19,8 +19,8 @@
]} ]}
This lock does not work well with {!Fut.await}. A critical section This lock does not work well with {!Fut.await}. A critical section that
that contains a call to [await] might cause deadlocks, or lock starvation, contains a call to [await] might cause deadlocks, or lock starvation,
because it will hold onto the lock while it goes to sleep. because it will hold onto the lock while it goes to sleep.
@since 0.3 *) @since 0.3 *)
@ -32,27 +32,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *) (** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with (** [with_ l f] runs [f x] where [x] is the value protected with the lock [l],
the lock [l], in a critical section. If [f x] fails, [with_lock l f] in a critical section. If [f x] fails, [with_lock l f] fails too but the
fails too but the lock is released. *) lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected (** [update l f] replaces the content [x] of [l] with [f x], while protected by
by the mutex. *) the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] (** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and
and returns [y], while protected by the mutex. *) returns [y], while protected by the mutex. *)
val mutex : _ t -> Mutex.t val mutex : _ t -> Mutex.t
(** Underlying mutex. *) (** Underlying mutex. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned (** Atomically get the value in the lock. The value that is returned isn't
isn't protected! *) protected! *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Atomically set the value. (** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an
is an anti pattern and will not protect data against some race conditions. *) anti pattern and will not protect data against some race conditions. *)

View file

@ -16,6 +16,7 @@ let spawn_on_current_runner = Fut.spawn_on_current_runner
[@@@ifge 5.0] [@@@ifge 5.0]
let await = Fut.await let await = Fut.await
let yield = Picos.Fiber.yield
[@@@endif] [@@@endif]

View file

@ -1,13 +1,12 @@
(** Moonpool (** Moonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about A pool within a bigger pool (ie the ocean). Here, we're talking about pools
pools of [Thread.t] that are dispatched over several [Domain.t] to of [Thread.t] that are dispatched over several [Domain.t] to enable
enable parallelism. parallelism.
We provide several implementations of pools We provide several implementations of pools with distinct scheduling
with distinct scheduling strategies, alongside some concurrency strategies, alongside some concurrency primitives such as guarding locks
primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}). ({!Lock.t}) and futures ({!Fut.t}). *)
*)
module Ws_pool = Ws_pool module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool
@ -24,45 +23,45 @@ module Immediate_runner : sig end
module Exn_bt = Exn_bt module Exn_bt = Exn_bt
exception Shutdown exception Shutdown
(** Exception raised when trying to run tasks on (** Exception raised when trying to run tasks on runners that have been shut
runners that have been shut down. down.
@since 0.6 *) @since 0.6 *)
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random (** Similar to {!Thread.create}, but it picks a background domain at random to
to run the thread. This ensures that we don't always pick the same domain run the thread. This ensures that we don't always pick the same domain to
to run all the various threads needed in an application (timers, event loops, etc.) *) run all the various threads needed in an application (timers, event loops,
etc.) *)
val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit val run_async : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run (** [run_async runner task] schedules the task to run on the given runner. This
on the given runner. This means [task()] will be executed means [task()] will be executed at some point in the future, possibly in
at some point in the future, possibly in another thread. another thread.
@param fiber optional initial (picos) fiber state @param fiber optional initial (picos) fiber state
@since 0.5 *) @since 0.5 *)
val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a val run_wait_block : ?fiber:Picos.Fiber.t -> Runner.t -> (unit -> 'a) -> 'a
(** [run_wait_block runner f] schedules [f] for later execution (** [run_wait_block runner f] schedules [f] for later execution on the runner,
on the runner, like {!run_async}. like {!run_async}. It then blocks the current thread until [f()] is done
It then blocks the current thread until [f()] is done executing, executing, and returns its result. If [f()] raises an exception, then
and returns its result. If [f()] raises an exception, then [run_wait_block pool f] [run_wait_block pool f] will raise it as well.
will raise it as well.
See {!run_async} for more details. See {!run_async} for more details.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the
about the required discipline to avoid deadlocks). required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down @raise Shutdown if the runner was already shut down
@since 0.6 *) @since 0.6 *)
val recommended_thread_count : unit -> int val recommended_thread_count : unit -> int
(** Number of threads recommended to saturate the CPU. (** Number of threads recommended to saturate the CPU. For IO pools this makes
For IO pools this makes little sense (you might want more threads than little sense (you might want more threads than this because many of them
this because many of them will be blocked most of the time). will be blocked most of the time).
@since 0.5 *) @since 0.5 *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) (** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns
and returns a future result for it. See {!Fut.spawn}. a future result for it. See {!Fut.spawn}.
@since 0.5 *) @since 0.5 *)
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
@ -71,15 +70,20 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val get_current_runner : unit -> Runner.t option val get_current_runner : unit -> Runner.t option
(** See {!Runner.get_current_runner} (** See {!Runner.get_current_runner}
@since 0.7 *) @since 0.7 *)
[@@@ifge 5.0] [@@@ifge 5.0]
val await : 'a Fut.t -> 'a val await : 'a Fut.t -> 'a
(** Await a future. See {!Fut.await}. (** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on
Only on OCaml >= 5.0. OCaml >= 5.0.
@since 0.5 *) @since 0.5 *)
val yield : unit -> unit
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
>= 5.0.
@since NEXT_RELEASE *)
[@@@endif] [@@@endif]
module Lock = Lock module Lock = Lock
@ -90,35 +94,33 @@ module Thread_local_storage = Thread_local_storage
(** A simple blocking queue. (** A simple blocking queue.
This queue is quite basic and will not behave well under heavy This queue is quite basic and will not behave well under heavy contention.
contention. However, it can be sufficient for many practical use cases. However, it can be sufficient for many practical use cases.
{b NOTE}: this queue will typically block the caller thread {b NOTE}: this queue will typically block the caller thread in case the
in case the operation (push/pop) cannot proceed. operation (push/pop) cannot proceed. Be wary of deadlocks when using the
Be wary of deadlocks when using the queue {i from} a pool queue {i from} a pool when you expect the other end to also be
when you expect the other end to also be produced/consumed from produced/consumed from the same pool.
the same pool.
See discussion on {!Fut.wait_block} for more details on deadlocks See discussion on {!Fut.wait_block} for more details on deadlocks and how to
and how to mitigate the risk of running into them. mitigate the risk of running into them.
More scalable queues can be found in More scalable queues can be found in Lockfree
Lockfree (https://github.com/ocaml-multicore/lockfree/) (https://github.com/ocaml-multicore/lockfree/) *)
*)
module Blocking_queue : sig module Blocking_queue : sig
type 'a t type 'a t
(** Unbounded blocking queue. (** Unbounded blocking queue.
This queue is thread-safe and will block when calling {!pop} This queue is thread-safe and will block when calling {!pop} on it when
on it when it's empty. *) it's empty. *)
val create : unit -> _ t val create : unit -> _ t
(** Create a new unbounded queue. *) (** Create a new unbounded queue. *)
val size : _ t -> int val size : _ t -> int
(** Number of items currently in the queue. Note that [pop] (** Number of items currently in the queue. Note that [pop] might still block
might still block if this returns a non-zero number, since another if this returns a non-zero number, since another thread might have
thread might have consumed the items in the mean time. consumed the items in the mean time.
@since 0.2 *) @since 0.2 *)
exception Closed exception Closed
@ -126,73 +128,70 @@ module Blocking_queue : sig
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()]. (** [push q x] pushes [x] into [q], and returns [()].
In the current implementation, [push q] will never block for In the current implementation, [push q] will never block for a long time,
a long time, it will only block while waiting for a lock it will only block while waiting for a lock so it can push the element.
so it can push the element.
@raise Closed if the queue is closed (by a previous call to [close q]) *) @raise Closed if the queue is closed (by a previous call to [close q]) *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes. (** [pop q] pops the next element in [q]. It might block until an element
@raise Closed if the queue was closed before a new element was available. *) comes.
@raise Closed if the queue was closed before a new element was available.
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed,
ie [push] will raise {!Closed}.
[pop] will keep working and will return the elements present in the
queue, until it's entirely drained; then [pop] will
also raise {!Closed}. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any,
or returns [None] without blocking.
@param force_lock if true, use {!Mutex.lock} (which can block under contention);
if false, use {!Mutex.try_lock}, which might return [None] even in
presence of an element if there's contention *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case
it returns [true]; or it fails to push and returns [false]
without blocking.
@raise Closed if the locking succeeded but the queue is closed.
*) *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed, ie [push]
will raise {!Closed}.
[pop] will keep working and will return the elements present in the queue,
until it's entirely drained; then [pop] will also raise {!Closed}. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any, or returns
[None] without blocking.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] tries to push into [q], in which case it returns [true]; or
it fails to push and returns [false] without blocking.
@raise Closed if the locking succeeded but the queue is closed. *)
val transfer : 'a t -> 'a Queue.t -> unit val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently (** [transfer bq q2] transfers all items presently in [bq] into [q2] in one
in [bq] into [q2] in one atomic section, and clears [bq]. atomic section, and clears [bq]. It blocks if no element is in [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch. This is useful to consume elements from the queue in batch. Create a
Create a [Queue.t] locally: [Queue.t] locally:
{[
let dowork (work_queue : job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create () in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
{[ (* get all the events in the incoming blocking queue, in
let dowork (work_queue: job Bb_queue.t) = one single critical section. *)
(* local queue, not thread safe *) Bb_queue.transfer work_queue local_q
let local_q = Queue.create() in done
try with Bb_queue.Closed -> ()
while true do ]}
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in @since 0.4 *)
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -> ()
]}
@since 0.4 *)
type 'a gen = unit -> 'a option type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. (** [to_iter q] returns an iterator over all items in the queue. This might
This might not terminate if [q] is never closed. not terminate if [q] is never closed.
@since 0.4 *) @since 0.4 *)
val to_gen : 'a t -> 'a gen val to_gen : 'a t -> 'a gen
@ -209,8 +208,8 @@ module Bounded_queue = Bounded_queue
module Atomic = Atomic_ module Atomic = Atomic_
(** Atomic values. (** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
standard [Atomic] module on OCaml 5. *) module on OCaml 5. *)
(**/**) (**/**)
@ -220,9 +219,9 @@ module Private : sig
(** A deque for work stealing, fixed size. *) (** A deque for work stealing, fixed size. *)
module Worker_loop_ = Worker_loop_ module Worker_loop_ = Worker_loop_
(** Worker loop. This is useful to implement custom runners, it (** Worker loop. This is useful to implement custom runners, it should run on
should run on each thread of the runner. each thread of the runner.
@since 0.7 *) @since 0.7 *)
module Domain_ = Domain_ module Domain_ = Domain_
(** Utils for domains *) (** Utils for domains *)

View file

@ -1,9 +1,8 @@
(** Interface for runners. (** Interface for runners.
This provides an abstraction for running tasks in the background, This provides an abstraction for running tasks in the background, which is
which is implemented by various thread pools. implemented by various thread pools.
@since 0.3 @since 0.3 *)
*)
type fiber = Picos.Fiber.t type fiber = Picos.Fiber.t
type task = unit -> unit type task = unit -> unit
@ -12,19 +11,19 @@ type t
(** A runner. (** A runner.
If a runner is no longer needed, {!shutdown} can be used to signal all If a runner is no longer needed, {!shutdown} can be used to signal all
worker threads worker threads in it to stop (after they finish their work), and wait for
in it to stop (after they finish their work), and wait for them to stop. them to stop.
The threads are distributed across a fixed domain pool The threads are distributed across a fixed domain pool (whose size is
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and determined by {!Domain.recommended_domain_count} on OCaml 5, and simple the
simple the single runtime on OCaml 4). *) single runtime on OCaml 4). *)
val size : t -> int val size : t -> int
(** Number of threads/workers. *) (** Number of threads/workers. *)
val num_tasks : t -> int val num_tasks : t -> int
(** Current number of tasks. This is at best a snapshot, useful for metrics (** Current number of tasks. This is at best a snapshot, useful for metrics and
and debugging. *) debugging. *)
val shutdown : t -> unit val shutdown : t -> unit
(** Shutdown the runner and wait for it to terminate. Idempotent. *) (** Shutdown the runner and wait for it to terminate. Idempotent. *)
@ -35,32 +34,31 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : ?fiber:fiber -> t -> task -> unit val run_async : ?fiber:fiber -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner (** [run_async pool f] schedules [f] for later execution on the runner in one of
in one of the threads. [f()] will run on one of the runner's the threads. [f()] will run on one of the runner's worker threads/domains.
worker threads/domains.
@param fiber if provided, run the task with this initial fiber data @param fiber if provided, run the task with this initial fiber data
@raise Shutdown if the runner was shut down before [run_async] was called. *) @raise Shutdown if the runner was shut down before [run_async] was called.
*)
val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a val run_wait_block : ?fiber:fiber -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution (** [run_wait_block pool f] schedules [f] for later execution on the pool, like
on the pool, like {!run_async}. {!run_async}. It then blocks the current thread until [f()] is done
It then blocks the current thread until [f()] is done executing, executing, and returns its result. If [f()] raises an exception, then
and returns its result. If [f()] raises an exception, then [run_wait_block pool f] [run_wait_block pool f] will raise it as well.
will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} about the
about the required discipline to avoid deadlocks). required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down *) @raise Shutdown if the runner was already shut down *)
val dummy : t val dummy : t
(** Runner that fails when scheduling tasks on it. (** Runner that fails when scheduling tasks on it. Calling {!run_async} on it
Calling {!run_async} on it will raise Failure. will raise Failure.
@since 0.6 *) @since 0.6 *)
(** {2 Implementing runners} *) (** {2 Implementing runners} *)
(** This module is specifically intended for users who implement their (** This module is specifically intended for users who implement their own
own runners. Regular users of Moonpool should not need to look at it. *) runners. Regular users of Moonpool should not need to look at it. *)
module For_runner_implementors : sig module For_runner_implementors : sig
val create : val create :
size:(unit -> int) -> size:(unit -> int) ->
@ -71,21 +69,20 @@ module For_runner_implementors : sig
t t
(** Create a new runner. (** Create a new runner.
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, {b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so
so that {!Fork_join} and other 5.x features work properly. *) that {!Fork_join} and other 5.x features work properly. *)
val k_cur_runner : t Thread_local_storage.t val k_cur_runner : t Thread_local_storage.t
(** Key that should be used by each runner to store itself in TLS (** Key that should be used by each runner to store itself in TLS on every
on every thread it controls, so that tasks running on these threads thread it controls, so that tasks running on these threads can access the
can access the runner. This is necessary for {!get_current_runner} runner. This is necessary for {!get_current_runner} to work. *)
to work. *)
end end
val get_current_runner : unit -> t option val get_current_runner : unit -> t option
(** Access the current runner. This returns [Some r] if the call (** Access the current runner. This returns [Some r] if the call happens on a
happens on a thread that belongs in a runner. thread that belongs in a runner.
@since 0.5 *) @since 0.5 *)
val get_current_fiber : unit -> fiber option val get_current_fiber : unit -> fiber option
(** [get_current_storage runner] gets the local storage (** [get_current_storage runner] gets the local storage for the currently
for the currently running task. *) running task. *)

View file

@ -1,41 +1,38 @@
(** Task-local storage. (** Task-local storage.
This storage is associated to the current task, This storage is associated to the current task, just like thread-local
just like thread-local storage is associated with storage is associated with the current thread. The storage is carried along
the current thread. The storage is carried along in case in case the current task is suspended.
the current task is suspended.
@since 0.6 @since 0.6 *)
*)
type 'a t = 'a Picos.Fiber.FLS.t type 'a t = 'a Picos.Fiber.FLS.t
val create : unit -> 'a t val create : unit -> 'a t
(** [create ()] makes a new key. Keys are expensive and (** [create ()] makes a new key. Keys are expensive and should never be
should never be allocated dynamically or in a loop. *) allocated dynamically or in a loop. *)
exception Not_set exception Not_set
val get_exn : 'a t -> 'a val get_exn : 'a t -> 'a
(** [get k] gets the value for the current task for key [k]. (** [get k] gets the value for the current task for key [k]. Must be run from
Must be run from inside a task running on a runner. inside a task running on a runner.
@raise Not_set otherwise *) @raise Not_set otherwise *)
val get_opt : 'a t -> 'a option val get_opt : 'a t -> 'a option
(** [get_opt k] gets the current task's value for key [k], (** [get_opt k] gets the current task's value for key [k], or [None] if not run
or [None] if not run from inside the task. *) from inside the task. *)
val get : 'a t -> default:'a -> 'a val get : 'a t -> default:'a -> 'a
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** [set k v] sets the storage for [k] to [v]. (** [set k v] sets the storage for [k] to [v]. Must be run from inside a task
Must be run from inside a task running on a runner. running on a runner.
@raise Failure otherwise *) @raise Failure otherwise *)
val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b val with_value : 'a t -> 'a -> (unit -> 'b) -> 'b
(** [with_value k v f] sets [k] to [v] for the duration of the call (** [with_value k v f] sets [k] to [v] for the duration of the call to [f()].
to [f()]. When [f()] returns (or fails), [k] is restored When [f()] returns (or fails), [k] is restored to its old value. *)
to its old value. *)
(** {2 Local [Hmap.t]} (** {2 Local [Hmap.t]}

View file

@ -102,7 +102,24 @@ let with_handler ~ops:_ self f = f ()
[@@@endif] [@@@endif]
let worker_loop (type st) ~(ops : st ops) (self : st) : unit = let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let cur_fiber : fiber ref = ref _dummy_fiber in let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner; TLS.set Runner.For_runner_implementors.k_cur_runner runner;

37
src/core/worker_loop_.mli Normal file
View file

@ -0,0 +1,37 @@
(** Internal module that is used for workers.
A thread pool should use this [worker_loop] to run tasks, handle effects,
etc. *)
open Types_
type task_full =
| T_start of {
fiber: fiber;
f: unit -> unit;
}
| T_resume : {
fiber: fiber;
k: unit -> unit;
}
-> task_full
val _dummy_task : task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;
cleanup: 'st -> unit;
}
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -16,7 +16,8 @@ end
type state = { type state = {
id_: Id.t; id_: Id.t;
(** Unique to this pool. Used to make sure tasks stay within the same pool. *) (** Unique to this pool. Used to make sure tasks stay within the same
pool. *)
active: bool A.t; (** Becomes [false] when the pool is shutdown. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
mutable workers: worker_state array; (** Fixed set of workers. *) mutable workers: worker_state array; (** Fixed set of workers. *)
main_q: WL.task_full Queue.t; main_q: WL.task_full Queue.t;
@ -43,9 +44,8 @@ and worker_state = {
q: WL.task_full WSQ.t; (** Work stealing queue *) q: WL.task_full WSQ.t; (** Work stealing queue *)
rng: Random.State.t; rng: Random.State.t;
} }
(** State for a given worker. Only this worker is (** State for a given worker. Only this worker is allowed to push into the
allowed to push into the queue, but other workers queue, but other workers can come and steal from it if they're idle. *)
can come and steal from it if they're idle. *)
let[@inline] size_ (self : state) = Array.length self.workers let[@inline] size_ (self : state) = Array.length self.workers
@ -55,9 +55,8 @@ let num_tasks_ (self : state) : int =
Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers; Array.iter (fun w -> n := !n + WSQ.size w.q) self.workers;
!n !n
(** TLS, used by worker to store their specific state (** TLS, used by worker to store their specific state and be able to retrieve it
and be able to retrieve it from tasks when we schedule new from tasks when we schedule new sub-tasks. *)
sub-tasks. *)
let k_worker_state : worker_state TLS.t = TLS.create () let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option = let[@inline] get_current_worker_ () : worker_state option =
@ -77,8 +76,8 @@ let[@inline] try_wake_someone_ (self : state) : unit =
Mutex.unlock self.mutex Mutex.unlock self.mutex
) )
(** Push into worker's local queue, open to work stealing. (** Push into worker's local queue, open to work stealing. precondition: this
precondition: this runs on the worker thread whose state is [self] *) runs on the worker thread whose state is [self] *)
let schedule_on_current_worker (self : worker_state) task : unit = let schedule_on_current_worker (self : worker_state) task : unit =
(* we're on this same pool, schedule in the worker's state. Otherwise (* we're on this same pool, schedule in the worker's state. Otherwise
we might also be on pool A but asking to schedule on pool B, we might also be on pool A but asking to schedule on pool B,
@ -310,7 +309,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* function called in domain with index [i], to (* function called in domain with index [i], to
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (idx, thread) Bb_queue.push receive_threads (idx, thread)
in in

View file

@ -1,23 +1,22 @@
(** Work-stealing thread pool. (** Work-stealing thread pool.
A pool of threads with a worker-stealing scheduler. A pool of threads with a worker-stealing scheduler. The pool contains a
The pool contains a fixed number of threads that wait for work fixed number of threads that wait for work items to come, process these, and
items to come, process these, and loop. loop.
This is good for CPU-intensive tasks that feature a lot of small tasks. This is good for CPU-intensive tasks that feature a lot of small tasks. Note
Note that tasks will not always be processed in the order they are that tasks will not always be processed in the order they are scheduled, so
scheduled, so this is not great for workloads where the latency this is not great for workloads where the latency of individual tasks matter
of individual tasks matter (for that see {!Fifo_pool}). (for that see {!Fifo_pool}).
This implements {!Runner.t} since 0.3. This implements {!Runner.t} since 0.3.
If a pool is no longer needed, {!shutdown} can be used to signal all threads If a pool is no longer needed, {!shutdown} can be used to signal all threads
in it to stop (after they finish their work), and wait for them to stop. in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool The threads are distributed across a fixed domain pool (whose size is
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, determined by {!Domain.recommended_domain_count} on OCaml 5, and simply the
and simply the single runtime on OCaml 4). single runtime on OCaml 4). *)
*)
include module type of Runner include module type of Runner
@ -33,25 +32,26 @@ type ('a, 'b) create_args =
val create : (unit -> t, _) create_args val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread @param on_init_thread
in the pool. called at the beginning of each new thread in the pool.
@param num_threads size of the pool, ie. number of worker threads. @param num_threads
It will be at least [1] internally, so [0] or negative values make no sense. size of the pool, ie. number of worker threads. It will be at least [1]
The default is [Domain.recommended_domain_count()], ie one worker internally, so [0] or negative values make no sense. The default is
thread per CPU core. [Domain.recommended_domain_count()], ie one worker thread per CPU core. On
On OCaml 4 the default is [4] (since there is only one domain). OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each thread in the pool @param on_exit_thread called at the end of each thread in the pool
@param around_task a pair of [before, after], where [before pool] is called @param around_task
before a task is processed, a pair of [before, after], where [before pool] is called before a task is
on the worker thread about to run it, and returns [x]; and [after pool x] is called by processed, on the worker thread about to run it, and returns [x]; and
the same thread after the task is over. (since 0.2) [after pool x] is called by the same thread after the task is over. (since
@param name a name for this thread pool, used if tracing is enabled (since 0.6) 0.2)
*) @param name
a name for this thread pool, used if tracing is enabled (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. (** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. When
When [f pool] returns or fails, [pool] is shutdown and its resources [f pool] returns or fails, [pool] is shutdown and its resources are
are released. released.
Most parameters are the same as in {!create}. Most parameters are the same as in {!create}.
@since 0.3 *) @since 0.3 *)

View file

@ -76,25 +76,24 @@ type worker_state = {
(** Array of (optional) workers. (** Array of (optional) workers.
Workers are started/stop on demand. For each index we have Workers are started/stop on demand. For each index we have the (currently
the (currently active) domain's state active) domain's state including a work queue and a thread refcount; and the
including a work queue and a thread refcount; and the domain itself, domain itself, if any, in a separate option because it might outlive its own
if any, in a separate option because it might outlive its own state. *) state. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array = let domains_ : (worker_state option * Domain_.t option) Lock.t array =
let n = max 1 (Domain_.recommended_number ()) in let n = max 1 (Domain_.recommended_number ()) in
Array.init n (fun _ -> Lock.create (None, None)) Array.init n (fun _ -> Lock.create (None, None))
(** main work loop for a domain worker. (** main work loop for a domain worker.
A domain worker does two things: A domain worker does two things:
- run functions it's asked to (mainly, to start new threads inside it) - run functions it's asked to (mainly, to start new threads inside it)
- decrease the refcount when one of these threads stops. The thread - decrease the refcount when one of these threads stops. The thread will
will notify the domain that it's exiting, so the domain can know notify the domain that it's exiting, so the domain can know how many
how many threads are still using it. If all threads exit, the domain threads are still using it. If all threads exit, the domain polls a bit
polls a bit (in case new threads are created really shortly after, (in case new threads are created really shortly after, which happens with
which happens with a [Pool.with_] or [Pool.create() Pool.shutdown()] a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and
in a tight loop), and if nothing happens it tries to stop to free resources. if nothing happens it tries to stop to free resources. *)
*)
let work_ idx (st : worker_state) : unit = let work_ idx (st : worker_state) : unit =
let main_loop () = let main_loop () =
let continue = ref true in let continue = ref true in

View file

@ -1,18 +1,17 @@
(** Static pool of domains. (** Static pool of domains.
These domains are shared between {b all} the pools in moonpool. These domains are shared between {b all} the pools in moonpool. The
The rationale is that we should not have more domains than cores, so rationale is that we should not have more domains than cores, so it's easier
it's easier to reserve exactly that many domain slots, and run more flexible to reserve exactly that many domain slots, and run more flexible thread
thread pools on top (each domain being shared by potentially multiple threads pools on top (each domain being shared by potentially multiple threads from
from multiple pools). multiple pools).
The pool should not contain actual domains if it's not in use, ie if no The pool should not contain actual domains if it's not in use, ie if no
runner is presently actively using one or more of the domain slots. runner is presently actively using one or more of the domain slots.
{b NOTE}: Interface is still experimental. {b NOTE}: Interface is still experimental.
@since 0.6 @since 0.6 *)
*)
type domain = Domain_.t type domain = Domain_.t
@ -24,13 +23,13 @@ val max_number_of_domains : unit -> int
Be very cautious with this interface, or resource leaks might occur. *) Be very cautious with this interface, or resource leaks might occur. *)
val run_on : int -> (unit -> unit) -> unit val run_on : int -> (unit -> unit) -> unit
(** [run_on i f] runs [f()] on the domain with index [i]. (** [run_on i f] runs [f()] on the domain with index [i]. Precondition:
Precondition: [0 <= i < n_domains()]. The thread must call {!decr_on} [0 <= i < n_domains()]. The thread must call {!decr_on} with [i] once it's
with [i] once it's done. *) done. *)
val decr_on : int -> unit val decr_on : int -> unit
(** Signal that a thread is stopping on the domain with index [i]. *) (** Signal that a thread is stopping on the domain with index [i]. *)
val run_on_and_wait : int -> (unit -> 'a) -> 'a val run_on_and_wait : int -> (unit -> 'a) -> 'a
(** [run_on_and_wait i f] runs [f()] on the domain with index [i], (** [run_on_and_wait i f] runs [f()] on the domain with index [i], and blocks
and blocks until the result of [f()] is returned back. *) until the result of [f()] is returned back. *)

View file

@ -187,8 +187,8 @@ let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h) Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if (** Successfully resolve the fiber. This might still fail if some children
some children failed. *) failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit = let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in let promise = prom_of_fut self.res in

View file

@ -1,13 +1,11 @@
(** Fibers. (** Fibers.
A fiber is a lightweight computation that runs cooperatively A fiber is a lightweight computation that runs cooperatively alongside other
alongside other fibers. In the context of moonpool, fibers fibers. In the context of moonpool, fibers have additional properties:
have additional properties:
- they run in a moonpool runner - they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form - they form a simple supervision tree, enabling a limited form of structured
of structured concurrency concurrency *)
*)
type cancel_callback = Exn_bt.t -> unit type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *) (** A callback used in case of cancellation *)
@ -26,8 +24,8 @@ module Private_ : sig
runner: Runner.t; runner: Runner.t;
pfiber: pfiber; pfiber: pfiber;
} }
(** Type definition, exposed so that {!any} can be unboxed. (** Type definition, exposed so that {!any} can be unboxed. Please do not rely
Please do not rely on that. *) on that. *)
type any = Any : _ t -> any [@@unboxed] type any = Any : _ t -> any [@@unboxed]
@ -58,8 +56,7 @@ val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t val fail : Exn_bt.t -> _ t
val self : unit -> any val self : unit -> any
(** [self ()] is the current fiber. (** [self ()] is the current fiber. Must be run from inside a fiber.
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option val peek : 'a t -> 'a Fut.or_error option
@ -78,16 +75,16 @@ val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *) (** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. (** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *) {!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)]. (** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *) {!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises. (** Check if the current fiber is cancelled, in which case this raises. Must be
Must be run from inside a fiber. run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e] @raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *) @raise Failure if not run from a fiber. *)
@ -99,55 +96,54 @@ type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *) (** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks (** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
for [fib]. If [fib] is already cancelled, [cb] is called immediately. *) If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback (** [remove_on_cancel fib h] removes the cancel callback associated with handle
associated with handle [h]. *) [h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] (** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
in a scope in which, if the fiber [fib] is cancelled, if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
[cb()] is called. If [e] returns without the fiber being cancelled, the fiber being cancelled, this callback is removed. *)
this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where (** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
[cb] is added to the cancel callbacks of the current fiber; cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
and [f()] terminates, [cb] is removed from the list. *) from the list. *)
val on_result : 'a t -> 'a callback -> unit val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback (** Wait for fiber to be done and call the callback with the result. If the
with the result. If the fiber is done already then the fiber is done already then the callback is invoked immediately with its
callback is invoked immediately with its result. *) result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
This fiber is not the child of any other fiber: its lifetime fiber is not the child of any other fiber: its lifetime is only determined
is only determined by the lifetime of [f()]. *) by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child] (** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
from a running fiber [parent]. [parent]. The sub-fiber [f_child] is attached to the current fiber and fails
The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails. if the current fiber [parent] fails.
@param on if provided, start the fiber on the given runner. If not @param on
provided, use the parent's runner. if provided, start the fiber on the given runner. If not provided, use the
@param protect if true, when [f_child] fails, it does not parent's runner.
affect [parent]. If false, [f_child] failing also @param protect
causes [parent] to fail (and therefore all other children if true, when [f_child] fails, it does not affect [parent]. If false,
of [parent]). Default is [true]. [f_child] failing also causes [parent] to fail (and therefore all other
children of [parent]). Default is [true].
Must be run from inside a fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. (** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
The fiber will still affect termination of the parent, ie. the termination of the parent, ie. the parent will exit only after this new
parent will exit only after this new fiber exits. fiber exits.
@param on the optional runner to use, added since NEXT_RELEASE *) @param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result. (** Like {!spawn_top} but ignores the result.

View file

@ -1,18 +1,16 @@
(** Fiber-local storage. (** Fiber-local storage.
This storage is associated to the current fiber, This storage is associated to the current fiber, just like thread-local
just like thread-local storage is associated with storage is associated with the current thread.
the current thread.
See {!Moonpool.Task_local_storage} for more general information, as See {!Moonpool.Task_local_storage} for more general information, as this is
this is based on it. based on it.
{b NOTE}: it's important to note that, while each fiber {b NOTE}: it's important to note that, while each fiber has its own storage,
has its own storage, spawning a sub-fiber [f2] from a fiber [f1] spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
will only do a shallow copy of the storage. the storage. Values inside [f1]'s storage will be physically shared with
Values inside [f1]'s storage will be physically shared with [f2]. [f2]. It is thus recommended to store only persistent values in the local
It is thus recommended to store only persistent values in the local storage. storage. *)
*)
include module type of struct include module type of struct
include Task_local_storage include Task_local_storage

View file

@ -1,7 +1,7 @@
(** The unique name of a fiber. (** The unique name of a fiber.
Each fiber has a unique handle that can be used to Each fiber has a unique handle that can be used to refer to it in maps or
refer to it in maps or sets. *) sets. *)
type t = private int type t = private int
(** Unique, opaque identifier for a fiber. *) (** Unique, opaque identifier for a fiber. *)

View file

@ -1,6 +1,6 @@
exception Oh_no of Exn_bt.t exception Oh_no of Exn_bt.t
let main (f : Runner.t -> 'a) : 'a = let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let worker_st = let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -13,6 +13,7 @@ let main (f : Runner.t -> 'a) : 'a =
(* run the main thread *) (* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops; ~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with match Fiber.peek fiber with
@ -20,3 +21,6 @@ let main (f : Runner.t -> 'a) : 'a =
| Some (Error ebt) -> Exn_bt.raise ebt | Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false | None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -1,25 +1,28 @@
(** Main thread. (** Main thread.
This is evolved from [Moonpool.Immediate_runner], but unlike it, This is evolved from [Moonpool.Immediate_runner], but unlike it, this API
this API assumes you run it in a thread (possibly assumes you run it in a thread (possibly the main thread) which will block
the main thread) which will block until the initial computation is done. until the initial computation is done.
This means it's reasonable to use [Main.main (fun () -> do_everything)] This means it's reasonable to use [Main.main (fun () -> do_everything)] at
at the beginning of the program. the beginning of the program. Other Moonpool pools can be created for
Other Moonpool pools can be created for background tasks, etc. to do the background tasks, etc. to do the heavy lifting, and the main thread (inside
heavy lifting, and the main thread (inside this immediate runner) can coordinate this immediate runner) can coordinate tasks via [Fiber.await].
tasks via [Fiber.await].
Aside from the fact that this blocks the caller thread, it is fairly similar to Aside from the fact that this blocks the caller thread, it is fairly similar
{!Background_thread} in that there's a single worker to process to {!Background_thread} in that there's a single worker to process
tasks/fibers. tasks/fibers.
This handles effects, including the ones in {!Fiber}. This handles effects, including the ones in {!Fiber}.
@since 0.6 @since 0.6 *)
*)
val main : (Moonpool.Runner.t -> 'a) -> 'a val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}. (** [main f] runs [f()] in a scope that handles effects, including
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *) This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)

View file

@ -5,20 +5,22 @@
@since 0.3 *) @since 0.3 *)
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [both f g] runs [f()] and [g()], potentially in parallel, (** [both f g] runs [f()] and [g()], potentially in parallel, and returns their
and returns their result when both are done. result when both are done. If any of [f()] and [g()] fails, then the whole
If any of [f()] and [g()] fails, then the whole computation fails. computation fails.
This must be run from within the pool: for example, inside {!Pool.run} This must be run from within the pool: for example, inside {!Pool.run} or
or inside a {!Fut.spawn} computation. inside a {!Fut.spawn} computation. This is because it relies on an effect
This is because it relies on an effect handler to be installed. handler to be installed.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val both_ignore : (unit -> _) -> (unit -> _) -> unit val both_ignore : (unit -> _) -> (unit -> _) -> unit
(** Same as [both f g |> ignore]. (** Same as [both f g |> ignore].
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
@ -63,43 +65,49 @@ val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array
(** [all_array fs] runs all functions in [fs] in tasks, and waits for (** [all_array fs] runs all functions in [fs] in tasks, and waits for all the
all the results. results.
@param chunk_size if equal to [n], groups items by [n] to be run in @param chunk_size
a single task. Default is [1]. if equal to [n], groups items by [n] to be run in a single task. Default
is [1].
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list
(** [all_list fs] runs all functions in [fs] in tasks, and waits for (** [all_list fs] runs all functions in [fs] in tasks, and waits for all the
all the results. results.
@param chunk_size if equal to [n], groups items by [n] to be run in @param chunk_size
a single task. Default is not specified. if equal to [n], groups items by [n] to be run in a single task. Default
This parameter is available since 0.3. is not specified. This parameter is available since 0.3.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list
(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for (** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits
all the results. for all the results.
@param chunk_size if equal to [n], groups items by [n] to be run in @param chunk_size
a single task. Default is not specified. if equal to [n], groups items by [n] to be run in a single task. Default
This parameter is available since 0.3. is not specified. This parameter is available since 0.3.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array val map_array : ?chunk_size:int -> ('a -> 'b) -> 'a array -> 'b array
(** [map_array f arr] is like [Array.map f arr], but runs in parallel. (** [map_array f arr] is like [Array.map f arr], but runs in parallel.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)
val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list val map_list : ?chunk_size:int -> ('a -> 'b) -> 'a list -> 'b list
(** [map_list f l] is like [List.map f l], but runs in parallel. (** [map_list f l] is like [List.map f l], but runs in parallel.
@since 0.3 @since 0.3
{b NOTE} this is only available on OCaml 5. *) {b NOTE} this is only available on OCaml 5. *)

View file

@ -2,8 +2,7 @@ open Common_
class type t = object class type t = object
method input : bytes -> int -> int -> int method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the (** Read into the slice. Returns [0] only if the stream is closed. *)
stream is closed. *)
method close : unit -> unit method close : unit -> unit
(** Close the input. Must be idempotent. *) (** Close the input. Must be idempotent. *)
@ -47,7 +46,7 @@ let of_bytes ?(off = 0) ?len (b : bytes) : t =
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice. (** Read into the given slice.
@return the number of bytes read, [0] means end of input. *) @return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *) (** Close the channel. *)

View file

@ -67,7 +67,7 @@ module Perform_action_in_lwt = struct
let actions_ : Action_queue.t = Action_queue.create () let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the (** Gets the current set of notifications and perform them from inside the
Lwt thread *) Lwt thread *)
let perform_pending_actions () : unit = let perform_pending_actions () : unit =
let@ _sp = let@ _sp =
Moonpool.Private.Tracing_.with_span Moonpool.Private.Tracing_.with_span

View file

@ -1,8 +1,7 @@
(** Lwt_engine-based event loop for Moonpool. (** Lwt_engine-based event loop for Moonpool.
In what follows, we mean by "lwt thread" the thread In what follows, we mean by "lwt thread" the thread running [Lwt_main.run]
running [Lwt_main.run] (so, the thread where the Lwt event (so, the thread where the Lwt event loop and all Lwt callbacks execute).
loop and all Lwt callbacks execute).
{b NOTE}: this is experimental and might change in future versions. {b NOTE}: this is experimental and might change in future versions.
@ -14,53 +13,50 @@ module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *) (** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that (** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that completes when
completes when [lwt_fut] does. This must be run from within [lwt_fut] does. This must be run from within the Lwt thread. *)
the Lwt thread. *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when (** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
[fut] does. This must be called from the Lwt thread, and the result must be called from the Lwt thread, and the result must always be used only
must always be used only from inside the Lwt thread. *) from inside the Lwt thread. *)
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
a moonpool runner. This must be run from within a Moonpool runner runner. This must be run from within a Moonpool runner so that the await-ing
so that the await-ing effect is handled. *) effect is handled. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread (** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a
and returns a thread-safe future. This can be run from anywhere. *) thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and (** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result.
awaits its result. Must be run from inside a moonpool runner Must be run from inside a moonpool runner so that the await-in effect is
so that the await-in effect is handled. handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *) This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called. (** Returns the runner from within which this is called. Must be run from within
Must be run from within a fiber. a fiber.
@raise Failure if not run within a fiber *) @raise Failure if not run within a fiber *)
(** {2 IO} *) (** {2 IO} *)
(** IO using the Lwt event loop. (** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors These IO operations work on non-blocking file descriptors and rely on a
and rely on a [Lwt_engine] event loop being active (meaning, [Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently
[Lwt_main.run] is currently running in some thread). running in some thread).
Calling these functions must be done from a moonpool runner. Calling these functions must be done from a moonpool runner. A function like
A function like [read] will first try to perform the IO action [read] will first try to perform the IO action directly (here, call
directly (here, call {!Unix.read}); if the action fails because {!Unix.read}); if the action fails because the FD is not ready, then
the FD is not ready, then [await_readable] is called: [await_readable] is called: it suspends the fiber and subscribes it to Lwt
it suspends the fiber and subscribes it to Lwt to be awakened to be awakened when the FD becomes ready. *)
when the FD becomes ready.
*)
module IO : sig module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *) (** Read from the file descriptor *)
@ -91,27 +87,29 @@ module TCP_server : sig
type t = Lwt_io.server type t = Lwt_io.server
val establish_lwt : val establish_lwt :
?backlog:(* ?server_fd:Unix.file_descr -> *) ?backlog:
int -> (* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool -> ?no_close:bool ->
runner:Moonpool.Runner.t -> runner:Moonpool.Runner.t ->
Unix.sockaddr -> Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t t
(** [establish ~runner addr handler] runs a TCP server in the Lwt (** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When
thread. When a client connects, a moonpool fiber is started on [runner] a client connects, a moonpool fiber is started on [runner] to handle it.
to handle it. *) *)
val establish : val establish :
?backlog:(* ?server_fd:Unix.file_descr -> *) ?backlog:
int -> (* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool -> ?no_close:bool ->
runner:Moonpool.Runner.t -> runner:Moonpool.Runner.t ->
Unix.sockaddr -> Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t t
(** Like {!establish_lwt} but uses {!IO} to directly handle (** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes
reads and writes on client sockets. *) on client sockets. *)
val shutdown : t -> unit val shutdown : t -> unit
(** Shutdown the server *) (** Shutdown the server *)
@ -121,8 +119,8 @@ module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from (** Open a connection, and use {!IO} to read and write from the socket in a
the socket in a non blocking way. *) non blocking way. *)
val with_connect_lwt : val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
@ -132,15 +130,15 @@ end
(** {2 Helpers on the lwt side} *) (** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, (** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and
and returns a lwt future. This must be run from within the thread returns a lwt future. This must be run from within the thread running
running [Lwt_main]. *) [Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()]
a fiber in [runner]. *) inside a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *) (** Like {!main_with_runner} but with a default choice of runner. *)

View file

@ -1,11 +1,10 @@
(** Work-stealing deque. (** Work-stealing deque.
Adapted from "Dynamic circular work stealing deque", Chase & Lev. Adapted from "Dynamic circular work stealing deque", Chase & Lev.
However note that this one is not dynamic in the sense that there However note that this one is not dynamic in the sense that there is no
is no resizing. Instead we return [false] when [push] fails, which resizing. Instead we return [false] when [push] fails, which keeps the
keeps the implementation fairly lightweight. implementation fairly lightweight. *)
*)
type 'a t type 'a t
(** Deque containing values of type ['a] *) (** Deque containing values of type ['a] *)
@ -14,12 +13,12 @@ val create : dummy:'a -> unit -> 'a t
(** Create a new deque. *) (** Create a new deque. *)
val push : 'a t -> 'a -> bool val push : 'a t -> 'a -> bool
(** Push value at the bottom of deque. returns [true] if it succeeds. (** Push value at the bottom of deque. returns [true] if it succeeds. This must
This must be called only by the owner thread. *) be called only by the owner thread. *)
val pop : 'a t -> 'a option val pop : 'a t -> 'a option
(** Pop value from the bottom of deque. (** Pop value from the bottom of deque. This must be called only by the owner
This must be called only by the owner thread. *) thread. *)
exception Empty exception Empty

View file

@ -1,8 +1,8 @@
(** Mutex-protected resource. (** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper This lock is a synchronous concurrency primitive, as a thin wrapper around
around {!Mutex} that encourages proper management of the critical {!Mutex} that encourages proper management of the critical section in RAII
section in RAII style: style:
{[ {[
let (let@) = (@@) let (let@) = (@@)
@ -30,27 +30,27 @@ val create : 'a -> 'a t
(** Create a new protected value. *) (** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with (** [with_ l f] runs [f x] where [x] is the value protected with the lock [l],
the lock [l], in a critical section. If [f x] fails, [with_lock l f] in a critical section. If [f x] fails, [with_lock l f] fails too but the
fails too but the lock is released. *) lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected (** [update l f] replaces the content [x] of [l] with [f x], while protected by
by the mutex. *) the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] (** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and
and returns [y], while protected by the mutex. *) returns [y], while protected by the mutex. *)
val mutex : _ t -> Picos_std_sync.Mutex.t val mutex : _ t -> Picos_std_sync.Mutex.t
(** Underlying mutex. *) (** Underlying mutex. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned (** Atomically get the value in the lock. The value that is returned isn't
isn't protected! *) protected! *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Atomically set the value. (** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} {b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an
is an anti pattern and will not protect data against some race conditions. *) anti pattern and will not protect data against some race conditions. *)