Compare commits

...

18 commits

Author SHA1 Message Date
Simon Cruanes
4de33f0121
prepare for 0.10
Some checks are pending
github pages / Deploy doc (push) Waiting to run
Build and Test / build (push) Waiting to run
Build and Test / build-compat (push) Waiting to run
Build and Test / format (push) Waiting to run
2025-11-13 20:27:01 -05:00
Simon Cruanes
58a0f891f7
Merge pull request #36 from c-cube/simon/fix-35
fix domain pool: block signals in background threads
2025-11-13 19:58:54 -05:00
Simon Cruanes
b1688f71e7
more signal handling 2025-11-13 19:53:02 -05:00
Simon Cruanes
794b263d36
improve lock 2025-11-13 19:50:40 -05:00
Simon Cruanes
a40ea8b41b
avoid recursion in dpool 2025-11-13 19:46:56 -05:00
Simon Cruanes
40e97d969a
fix domain pool: block signals in background threads
close #35
2025-11-13 19:46:56 -05:00
Simon Cruanes
c3f235f7e9
Merge pull request #40 from c-cube/simon/reduce-scope-round2
reduce scope again: remove structured concurrency-based fibers
2025-11-13 19:40:37 -05:00
Simon Cruanes
0b28898586
rename 2025-11-13 19:39:57 -05:00
Simon Cruanes
997d996c13
fix test 2025-11-12 09:10:52 -05:00
Simon Cruanes
ee7972910f
breaking: remove around_task from schedulers 2025-11-12 00:25:02 -05:00
Simon Cruanes
2ce3fa7d3e
docs 2025-11-12 00:25:02 -05:00
Simon Cruanes
8770d4fb9c
repro for #41 2025-11-12 00:25:02 -05:00
Simon Cruanes
95de0e7e27
test: update readme and the mdx test 2025-10-25 21:50:47 -04:00
Simon Cruanes
4924b5f52b
test: update tests, removing the fibers and cancellation tests 2025-10-25 21:50:47 -04:00
Simon Cruanes
db9cddf999
feat core: add Main, salvaged from moonpool.fib 2025-10-25 21:50:46 -04:00
Simon Cruanes
f9ab951c36
remove moonpool.fib
it's complicated and hard to use in practice, because it's not obvious
if a piece of code is running under another fiber or not, so
`Fiber.spawn` might fail because it has no parent.

So in practice we've been using `Fiber.spawn_top`… which has no
interest over just using `Fut.spawn`.
2025-10-25 21:50:46 -04:00
Simon Cruanes
2aa2612963
doc for Fut 2025-10-25 21:50:46 -04:00
Simon Cruanes
f92efa562d
doc 2025-10-25 21:50:46 -04:00
51 changed files with 6103 additions and 6654 deletions

View file

@ -1,4 +1,16 @@
# 0.10
- breaking: remove `around_task` from schedulers
- breaking: remove `moonpool.fib` entirely. Please use `picos_std.structured`
if you really need structured concurrency.
- remove deprecated moonpool-io and moonpool.sync
- feat core: add `Main`, salvaged from moonpool.fib
- block signals in background threads
- refactor `chan`; fix bug in `Chan.try_push`
- fix: make `Moonpool_lwt.fut_of_lwt` idempotent
# 0.9
- breaking: require OCaml 5

View file

@ -67,6 +67,14 @@ bench-pi:
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 16 -mode forkjoin -kind=pool' \
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 20 -mode forkjoin -kind=pool'
bench-repro-41:
dune build $(DUNE_OPTS_BENCH) examples/repro_41/run.exe
hyperfine --warmup=1 \
"./_build/default/examples/repro_41/run.exe 4 domainslib" \
"./_build/default/examples/repro_41/run.exe 4 moonpool" \
"./_build/default/examples/repro_41/run.exe 5 moonpool" \
"./_build/default/examples/repro_41/run.exe 5 seq"
.PHONY: test clean bench-fib bench-pi
VERSION=$(shell awk '/^version:/ {print $$2}' moonpool.opam)

View file

@ -173,49 +173,9 @@ val expected_sum : int = 5050
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
an exception and the backtrace associated with the place the exception was caught.
### Fibers
### Local storage
On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
which provides _lightweight fibers_
that can run on any Moonpool runner.
These fibers are a sort of lightweight thread, dispatched on the runner's
background thread(s).
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
is done.
```ocaml
# #require "moonpool.fib";;
...
# (* convenient alias *)
module F = Moonpool_fib;;
module F = Moonpool_fib
# F.main (fun _runner ->
let f1 = F.spawn (fun () -> fib 10) in
let f2 = F.spawn (fun () -> fib 15) in
F.await f1 + F.await f2);;
- : int = 1076
```
Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
the sub-fiber's _parent_.
When a parent fails, all its children are cancelled (forced to fail).
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).
Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
as a regular future, too.
However, this resolution is only done after all the children of the fiber have
resolved — the lifetime of fibers forms a well-nested tree in that sense.
When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
thread on which it was running becomes available again and can go on process another task.
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.
In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
put persistent data in storage to avoid confusing aliasing).
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
The storage is convenient for carrying around context for cross-cutting concerns such
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
scope).

2
dune
View file

@ -3,7 +3,7 @@
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
(mdx
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
(libraries moonpool moonpool.forkjoin threads)
(package moonpool)
(enabled_if
(>= %{ocaml_version} 5.0)))

View file

@ -1,13 +1,20 @@
(lang dune 3.0)
(using mdx 0.2)
(name moonpool)
(version 0.9)
(version 0.10)
(generate_opam_files true)
(source
(github c-cube/moonpool))
(authors "Simon Cruanes")
(maintainers "Simon Cruanes")
(license MIT)
;(documentation https://url/to/documentation)
@ -16,41 +23,60 @@
(name moonpool)
(synopsis "Pools of threads supported by a pool of domains")
(depends
(ocaml (>= 5.0))
(ocaml
(>= 5.0))
dune
(either (>= 1.0))
(either
(>= 1.0))
(trace :with-test)
(trace-tef :with-test)
(qcheck-core (and :with-test (>= 0.19)))
(thread-local-storage (and (>= 0.2) (< 0.3)))
(qcheck-core
(and
:with-test
(>= 0.19)))
(thread-local-storage
(and
(>= 0.2)
(< 0.3)))
(odoc :with-doc)
(hmap :with-test)
(picos (and (>= 0.5) (< 0.7)))
(picos_std (and (>= 0.5) (< 0.7)))
(picos
(and
(>= 0.5)
(< 0.7)))
(picos_std
(and
(>= 0.5)
(< 0.7)))
(mdx
(and
(>= 1.9.0)
:with-test)))
(depopts
hmap
(trace (>= 0.6)))
hmap
(trace
(>= 0.6)))
(tags
(thread pool domain futures fork-join)))
(package
(name moonpool-lwt)
(synopsis "Event loop for moonpool based on Lwt-engine (experimental)")
(allow_empty) ; on < 5.0
(depends
(moonpool (= :version))
(ocaml (>= 5.0))
(qcheck-core (and :with-test (>= 0.19)))
(hmap :with-test)
lwt
base-unix
(trace :with-test)
(trace-tef :with-test)
(odoc :with-doc)))
(name moonpool-lwt)
(synopsis "Event loop for moonpool based on Lwt-engine (experimental)")
(allow_empty) ; on < 5.0
(depends
(moonpool
(= :version))
(ocaml
(>= 5.0))
(qcheck-core
(and
:with-test
(>= 0.19)))
(hmap :with-test)
lwt
base-unix
(trace :with-test)
(trace-tef :with-test)
(odoc :with-doc)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

View file

@ -1,11 +1,12 @@
(** Example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
(** NOTE: this was an example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
there is no cancelation anymore :) *)
let ( let@ ) = ( @@ )
let () =
let@ () = Trace_tef.with_setup () in
let@ _ = Moonpool_fib.main in
let@ _ = Moonpool.main in
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
let@ runner = Moonpool.Background_thread.with_ () in
@ -13,15 +14,13 @@ let () =
(* Pretend this is some long-running read loop *)
for i = 1 to 10 do
Printf.printf "MAIN LOOP %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
let _ : _ Moonpool_fib.t =
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
let _ : _ Moonpool.Fut.t =
Moonpool.Fut.spawn ~on:runner (fun () ->
Printf.printf "RUN FIBER %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
Format.printf "FIBER %d NOT CANCELLED YET@." i;
failwith "BOOM")
in
Moonpool_fib.yield ();
Moonpool.Fut.yield ();
(* Thread.delay 0.2; *)
(* Thread.delay 0.0001; *)
()

View file

@ -5,7 +5,6 @@
;(package moonpool)
(libraries
moonpool
moonpool.fib
trace
trace-tef
;tracy-client.trace

5
examples/repro_41/dune Normal file
View file

@ -0,0 +1,5 @@
(executables
(names run)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool trace trace-tef domainslib))

54
examples/repro_41/run.ml Normal file
View file

@ -0,0 +1,54 @@
(* fibo.ml *)
let cutoff = 25
let input = 40
let rec fibo_seq n =
if n <= 1 then
n
else
fibo_seq (n - 1) + fibo_seq (n - 2)
let rec fibo_domainslib ctx n =
if n <= cutoff then
fibo_seq n
else
let open Domainslib in
let fut1 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 1)) in
let fut2 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 2)) in
Task.await ctx fut1 + Task.await ctx fut2
let rec fibo_moonpool ctx n =
if n <= cutoff then
fibo_seq n
else
let open Moonpool in
let fut1 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 1)) in
let fut2 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 2)) in
Fut.await fut1 + Fut.await fut2
let usage =
"fibo.exe <num_domains> [ domainslib | moonpool | moonpool_fifo | seq ]"
let num_domains = try int_of_string Sys.argv.(1) with _ -> failwith usage
let implem = try Sys.argv.(2) with _ -> failwith usage
let () =
let output =
match implem with
| "moonpool" ->
let open Moonpool in
let ctx = Ws_pool.create ~num_threads:num_domains () in
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
| "moonpool_fifo" ->
let open Moonpool in
let ctx = Fifo_pool.create ~num_threads:num_domains () in
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
| "domainslib" ->
let open Domainslib in
let pool = Task.setup_pool ~num_domains () in
Task.run pool (fun () -> fibo_domainslib pool input)
| "seq" -> fibo_seq input
| _ -> failwith usage
in
print_int output;
print_newline ()

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.9"
version: "0.10"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.9"
version: "0.10"
synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]

View file

@ -6,18 +6,15 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?name:string ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)
let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () : t =
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name
~num_threads:1 ()
let create ?on_init_thread ?on_exit_thread ?on_exn ?name () : t =
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?name ~num_threads:1
()
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () f =
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name ()
in
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?name () f =
let pool = create ?on_init_thread ?on_exit_thread ?on_exn ?name () in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool

View file

@ -13,7 +13,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?name:string ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)

View file

@ -10,7 +10,6 @@ let ( let@ ) = ( @@ )
type state = {
threads: Thread.t array;
q: task_full Bb_queue.t; (** Queue for tasks. *)
around_task: WL.around_task;
mutable as_runner: t;
(* init options *)
name: string option;
@ -43,13 +42,10 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
(** Run [task] as is, on the pool. *)
let schedule_ (self : state) (task : task_full) : unit =
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
@ -88,7 +84,6 @@ let cleanup (self : worker_state) : unit =
let worker_ops : worker_state WL.ops =
let runner (st : worker_state) = st.st.as_runner in
let around_task st = st.st.around_task in
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in
@ -96,7 +91,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_w;
runner;
get_next_task;
around_task;
on_exn;
before_start;
cleanup;
@ -104,19 +98,11 @@ let worker_ops : worker_state WL.ops =
let create_ ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ~threads ?name () : state =
(* wrapper *)
let around_task =
match around_task with
| Some (f, g) -> WL.AT_pair (f, g)
| None -> default_around_task_
in
~threads ?name () : state =
let self =
{
threads;
q = Bb_queue.create ();
around_task;
as_runner = Runner.dummy;
name;
on_init_thread;
@ -127,8 +113,7 @@ let create_ ?(on_init_thread = default_thread_init_exit_)
self.as_runner <- runner_of_state self;
self
let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name () : t =
let create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () : t =
let num_domains = Domain_pool_.max_number_of_domains () in
(* number of threads to run *)
@ -140,8 +125,7 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
let pool =
let dummy_thread = Thread.self () in
let threads = Array.make num_threads dummy_thread in
create_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ~threads ?name
()
create_ ?on_init_thread ?on_exit_thread ?on_exn ~threads ?name ()
in
let runner = runner_of_state pool in
@ -181,11 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
runner
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name () f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () f =
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name ()
create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool

View file

@ -20,7 +20,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a
@ -35,9 +34,6 @@ val create : (unit -> t, _) create_args
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each worker thread in the pool.
@param around_task
a pair of [before, after] functions ran around each task. See
{!Pool.create_args}.
@param name name for the pool, used in tracing (since 0.6) *)
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

View file

@ -436,6 +436,8 @@ let await (self : 'a t) : 'a =
(* un-suspended: we should have a result! *)
get_or_fail_exn self
let yield = Picos.Fiber.yield
module Infix = struct
let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x

View file

@ -8,12 +8,16 @@
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
an exception and the corresponding backtrace).
Using {!spawn}, it's possible to start a bunch of tasks, obtaining futures,
and then use {!await} to get their result in the desired order.
Combinators such as {!map} and {!join_array} can be used to produce futures
from other futures (in a monadic way). Some combinators take a [on] argument
to specify a runner on which the intermediate computation takes place; for
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
applicatively; the call to [f] happens on the runner [pool] (once [fut]
resolves successfully with a value). *)
resolves successfully with a value). Be aware that these combinators do not
preserve local storage. *)
type 'a or_error = ('a, Exn_bt.t) result
@ -30,7 +34,8 @@ val make : unit -> 'a t * 'a promise
val make_promise : unit -> 'a promise
(** Same as {!make} but returns a single promise (which can be upcast to a
future). This is useful mostly to preserve memory.
future). This is useful mostly to preserve memory, you probably don't need
it.
How to upcast to a future in the worst case:
{[
@ -40,8 +45,11 @@ val make_promise : unit -> 'a promise
@since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
; or calls [f] immediately if [fut] is already set. *)
(** [on_result fut f] registers [f] to be called in the future when [fut] is
set; or calls [f] immediately if [fut] is already set.
{b NOTE:} it's ill advised to do meaningful work inside the callback [f].
Instead, try to spawn another task on the runner, or use {!await}. *)
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
@ -52,13 +60,14 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
exception Already_fulfilled
val try_cancel : _ promise -> Exn_bt.t -> bool
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
returns [false] if the promise is already resolved.
@since NEXT_RELEASE *)
(** [try_cancel promise ebt] tries to cancel the promise using the given
exception, returning [true]. It returns [false] if the promise is already
resolved.
@since 0.9 *)
val cancel : _ promise -> Exn_bt.t -> unit
(** Silent version of {!try_cancel}, ignoring the result.
@since NEXT_RELEASE *)
@since 0.9 *)
val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
@ -79,6 +88,7 @@ val fail_exn_bt : Exn_bt.t -> _ t
@since 0.6 *)
val of_result : 'a or_error -> 'a t
(** Already resolved future from a result. *)
val is_resolved : _ t -> bool
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
@ -136,7 +146,7 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
val reify_error : 'a t -> 'a or_error t
(** [reify_error fut] turns a failing future into a non-failing one that contain
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x].
@since 0.4 *)
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
@ -149,12 +159,18 @@ val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
or [f x] raises [e].
This does not preserve local storage of [fut] inside [f].
@param on if provided, [f] runs on the given runner *)
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
This does not preserve local storage of [fut] inside [f].
@param on if provided, [f] runs on the given runner
@since 0.4 *)
@ -182,6 +198,7 @@ val join_array : 'a t array -> 'a array t
val join_list : 'a t list -> 'a list t
(** Wait for all the futures in the list. Fails if any future fails. *)
(** Advanced primitives for synchronization *)
module Advanced : sig
val barrier_on_abstract_container_of_futures :
iter:(('a t -> unit) -> 'cont -> unit) ->
@ -234,7 +251,9 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
(** {2 Await}
{b NOTE} This is only available on OCaml 5. *)
This suspends the current task using an OCaml 5 algebraic effect, and makes
preparations for the task to be resumed once the future has been resolved.
*)
val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
@ -244,7 +263,11 @@ val await : 'a t -> 'a
@since 0.3
This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
{!Suspend_}. *)
val yield : unit -> unit
(** Like {!Moonpool.yield}.
@since 0.10 *)
(** {2 Blocking} *)
@ -252,7 +275,7 @@ val wait_block : 'a t -> 'a or_error
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
returns its value.
{b NOTE}: A word of warning: this will monopolize the calling thread until
{b NOTE:} A word of warning: this will monopolize the calling thread until
the future resolves. This can also easily cause deadlocks, if enough threads
in a pool call [wait_block] on futures running on the same pool or a pool
depending on it.
@ -265,7 +288,10 @@ val wait_block : 'a t -> 'a or_error
the deadlock. *)
val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
(** Same as {!wait_block} but re-raises the exception if the future failed.
{b NOTE:} do check the cautionary note in {!wait_block} concerning
deadlocks. *)
(** {2 Infix operators}
@ -297,9 +323,10 @@ module Infix_local = Infix
module Private_ : sig
val unsafe_promise_of_fut : 'a t -> 'a promise
(** please do not use *)
(** Do not use unless you know exactly what you are doing. *)
val as_computation : 'a t -> 'a Picos.Computation.t
(** Picos compat *)
end
(**/**)

View file

@ -8,15 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
in
let runner = Fifo_pool.Private_.runner_of_state worker_st in
try
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
let fut = Fut.spawn ~on:runner (fun () -> f runner) in
Fut.on_result fut (fun _ -> Runner.shutdown_without_waiting runner);
(* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st
Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with
match Fut.peek fut with
| Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false

View file

@ -13,16 +13,18 @@
to {!Background_thread} in that there's a single worker to process
tasks/fibers.
This handles effects, including the ones in {!Fiber}.
This handles the concurency effects used in moonpool, including [await] and
[yield].
@since 0.6 *)
This module was migrated from the late [Moonpool_fib].
val main : (Moonpool.Runner.t -> 'a) -> 'a
@since 0.10 *)
val main : (Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)
val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments. *)

View file

@ -23,6 +23,7 @@ module Exn_bt = Exn_bt
module Fifo_pool = Fifo_pool
module Fut = Fut
module Lock = Lock
module Main = Main
module Immediate_runner = struct end
module Runner = Runner
module Task_local_storage = Task_local_storage
@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage
module Trigger = Trigger
module Ws_pool = Ws_pool
(* re-export main *)
include Main
module Private = struct
module Ws_deque_ = Ws_deque_
module Worker_loop_ = Worker_loop_

View file

@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool
module Background_thread = Background_thread
module Runner = Runner
module Trigger = Trigger
module Main = Main
module Immediate_runner : sig end
[@@deprecated "use Moonpool_fib.Main"]
@ -80,7 +81,7 @@ val await : 'a Fut.t -> 'a
val yield : unit -> unit
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
>= 5.0.
@since NEXT_RELEASE *)
@since 0.9 *)
module Lock = Lock
module Fut = Fut
@ -205,6 +206,10 @@ module Atomic = Atomic
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
module on OCaml 5. *)
include module type of struct
include Main
end
(**/**)
(** Private internals, with no stability guarantees *)

View file

@ -13,15 +13,11 @@ type task_full =
}
-> task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full; (** @raise No_more_tasks *)
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;
@ -117,7 +113,6 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let state = ref New
let run_task (task : task_full) : unit =
let (AT_pair (before_task, after_task)) = ops.around_task st in
let fiber =
match task with
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
@ -125,7 +120,8 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
cur_fiber := fiber;
TLS.set k_cur_fiber fiber;
let _ctx = before_task runner in
(* let _ctx = before_task runner in *)
(* run the task now, catching errors, handling effects *)
assert (task != _dummy_task);
@ -140,8 +136,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let ebt = Exn_bt.make e bt in
ops.on_exn st ebt);
after_task runner _ctx;
(* after_task runner _ctx; *)
cur_fiber := _dummy_fiber;
TLS.set k_cur_fiber _dummy_fiber
@ -149,22 +144,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
state := Ready;
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
if block_signals then Signals_.ignore_signals_ ();
TLS.set Runner.For_runner_implementors.k_cur_runner runner;

View file

@ -18,15 +18,11 @@ type task_full =
val _dummy_task : task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;

View file

@ -28,7 +28,6 @@ type state = {
cond: Condition.t;
mutable as_runner: t;
(* init options *)
around_task: WL.around_task;
name: string option;
on_init_thread: dom_id:int -> t_id:int -> unit -> unit;
on_exit_thread: dom_id:int -> t_id:int -> unit -> unit;
@ -198,7 +197,6 @@ let cleanup (self : worker_state) : unit =
let worker_ops : worker_state WL.ops =
let runner (st : worker_state) = st.st.as_runner in
let around_task st = st.st.around_task in
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in
@ -206,7 +204,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_from_w;
runner;
get_next_task;
around_task;
on_exn;
before_start;
cleanup;
@ -235,7 +232,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a
@ -243,15 +239,8 @@ type ('a, 'b) create_args =
let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?num_threads ?name () : t =
?num_threads ?name () : t =
let pool_id_ = Id.create () in
(* wrapper *)
let around_task =
match around_task with
| Some (f, g) -> WL.AT_pair (f, g)
| None -> WL.AT_pair (ignore, fun _ _ -> ())
in
let num_domains = Domain_pool_.max_number_of_domains () in
let num_threads = Util_pool_.num_threads ?num_threads () in
@ -268,7 +257,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
n_waiting_nonzero = true;
mutex = Mutex.create ();
cond = Condition.create ();
around_task;
on_exn;
on_init_thread;
on_exit_thread;
@ -324,11 +312,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
pool.as_runner
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name () f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () f =
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name ()
create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool

View file

@ -24,7 +24,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a
@ -40,11 +39,6 @@ val create : (unit -> t, _) create_args
[Domain.recommended_domain_count()], ie one worker thread per CPU core. On
OCaml 4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each thread in the pool
@param around_task
a pair of [before, after], where [before pool] is called before a task is
processed, on the worker thread about to run it, and returns [x]; and
[after pool x] is called by the same thread after the task is over. (since
0.2)
@param name
a name for this thread pool, used if tracing is enabled (since 0.6) *)

View file

@ -15,19 +15,23 @@ module Bb_queue = struct
if was_empty then Condition.broadcast self.cond;
Mutex.unlock self.mutex
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
let pop (type a) (self : a t) : a =
let module M = struct
exception Found of a
end in
try
Mutex.lock self.mutex;
while true do
if Queue.is_empty self.q then
Condition.wait self.cond self.mutex
else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
raise (M.Found x)
)
done;
assert false
with M.Found x -> x
end
module Lock = struct
@ -38,13 +42,13 @@ module Lock = struct
let create content : _ t = { mutex = Mutex.create (); content }
let with_ (self : _ t) f =
let[@inline never] with_ (self : _ t) f =
Mutex.lock self.mutex;
try
let x = f self.content in
match f self.content with
| x ->
Mutex.unlock self.mutex;
x
with e ->
| exception e ->
Mutex.unlock self.mutex;
raise e
@ -95,6 +99,7 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and
if nothing happens it tries to stop to free resources. *)
let work_ idx (st : worker_state) : unit =
Signals_.ignore_signals_ ();
let main_loop () =
let continue = ref true in
while !continue do
@ -146,7 +151,7 @@ let work_ idx (st : worker_state) : unit =
let () =
assert (Domain_.is_main_domain ());
let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
(* thread that stays alive *)
(* thread that stays alive since [th_count>0] will always hold *)
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
domains_.(0) <- Lock.create (Some w, None)
@ -154,7 +159,8 @@ let[@inline] max_number_of_domains () : int = Array.length domains_
let run_on (i : int) (f : unit -> unit) : unit =
assert (i < Array.length domains_);
let w =
let w : worker_state =
Lock.update_map domains_.(i) (function
| (Some w, _) as st ->
Atomic.incr w.th_count;

View file

@ -1,6 +0,0 @@
(library
(name moonpool_fib)
(public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos)
(flags :standard -open Moonpool_private -open Moonpool))

View file

@ -1,334 +0,0 @@
open Moonpool.Private.Types_
module A = Atomic
module FM = Handle.Map
module Int_map = Map.Make (Int)
module PF = Picos.Fiber
module FLS = Picos.Fiber.FLS
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
type cancel_callback = Exn_bt.t -> unit
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
Fut.Private_.unsafe_promise_of_fut
(* TODO: replace with picos structured at some point? *)
module Private_ = struct
type pfiber = PF.t
type 'a t = {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber; (** Associated picos fiber *)
}
and 'a state =
| Alive of {
children: children;
on_cancel: cancel_callback Int_map.t;
cancel_id: int;
}
| Terminating_or_done of 'a Exn_bt.result A.t
and children = any FM.t
and any = Any : _ t -> any [@@unboxed]
(** Key to access the current moonpool.fiber. *)
let k_current_fiber : any FLS.t = FLS.create ()
exception Not_set = FLS.Not_set
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
FLS.get_exn pfiber k_current_fiber
let[@inline] get_cur_exn () : any =
get_cur_from_exn @@ get_current_fiber_exn ()
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
let[@inline] is_closed (self : _ t) =
match A.get self.state with
| Alive _ -> false
| Terminating_or_done _ -> true
end
include Private_
let create_ ~pfiber ~runner ~res () : 'a t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner;
pfiber;
}
let create_done_ ~res () : _ t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner = Runner.dummy;
pfiber = Moonpool.Private.Types_._dummy_fiber;
}
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
let[@inline] res self = self.res
let[@inline] peek self = Fut.peek self.res
let[@inline] is_done self = Fut.is_done self.res
let[@inline] is_success self = Fut.is_success self.res
let[@inline] is_cancelled self = Fut.is_failed self.res
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
(** Resolve [promise] once [children] are all done *)
let resolve_once_children_are_done_ ~children ~promise
(res : 'a Exn_bt.result A.t) : unit =
let n_children = FM.cardinal children in
if n_children > 0 then (
(* wait for all children to be done *)
let n_waiting = A.make (FM.cardinal children) in
let on_child_finish (r : _ result) =
(* make sure the parent fails if any child fails *)
(match r with
| Ok _ -> ()
| Error ebt -> A.set res (Error ebt));
(* if we're the last to finish, resolve the parent fiber's [res] *)
if A.fetch_and_add n_waiting (-1) = 1 then (
let res = A.get res in
Fut.fulfill promise res
)
in
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
) else
Fut.fulfill promise @@ A.get res
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
fun self ebt ->
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; cancel_id = _; on_cancel } as old ->
let new_st = Terminating_or_done (A.make @@ Error ebt) in
if A.compare_and_set self.state old new_st then (
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
cancel_children_ ~children ebt;
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
false
) else
true
| Terminating_or_done _ -> false
do
()
done
(** Cancel eagerly all children *)
and cancel_children_ ebt ~children : unit =
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
type cancel_handle = int
let add_on_cancel (self : _ t) cb : cancel_handle =
let h = ref 0 in
while
match A.get self.state with
| Alive { children; cancel_id; on_cancel } as old ->
let new_st =
Alive
{
children;
cancel_id = cancel_id + 1;
on_cancel = Int_map.add cancel_id cb on_cancel;
}
in
if A.compare_and_set self.state old new_st then (
h := cancel_id;
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt -> cb ebt
| Ok _ -> ());
false
do
()
done;
!h
let remove_on_cancel (self : _ t) h =
while
match A.get self.state with
| Alive ({ on_cancel; _ } as alive) as old ->
let new_st =
Alive { alive with on_cancel = Int_map.remove h on_cancel }
in
not (A.compare_and_set self.state old new_st)
| Terminating_or_done _ -> false
do
()
done
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if some children
failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; _ } as old ->
let new_st = Terminating_or_done r in
if A.compare_and_set self.state old new_st then (
resolve_once_children_are_done_ ~children ~promise r;
false
) else
true
| Terminating_or_done _ -> false
do
()
done
let remove_child_ (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.remove child.id children }
in
not (A.compare_and_set self.state old new_st)
| _ -> false
do
()
done
(** Add a child to [self].
@param protected if true, the child's failure will not affect [self]. *)
let add_child_ ~protect (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.add child.id (Any child) children }
in
if A.compare_and_set self.state old new_st then (
(* make sure to remove [child] from [self.children] once it's done;
fail [self] is [child] failed and [protect=false] *)
Fut.on_result child.res (function
| Ok _ -> remove_child_ self child
| Error ebt ->
(* child failed, we must fail too *)
remove_child_ self child;
if not protect then resolve_as_failed_ self ebt);
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt ->
(* cancel child immediately *)
resolve_as_failed_ child ebt
| Ok _ -> ());
false
do
()
done
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
let res, _ = Fut.make () in
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
(* copy local hmap from parent, if present *)
Option.iter
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
parent;
(match parent with
| Some p when is_closed p -> failwith "spawn: nursery is closed"
| _ -> ());
let fib = create_ ~pfiber ~runner ~res () in
let run () =
(* make sure the fiber is accessible from inside itself *)
FLS.set pfiber k_current_fiber (Any fib);
try
let res = f () in
resolve_ok_ fib res
with exn ->
let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make exn bt in
resolve_as_failed_ fib ebt
in
Runner.run_async ~fiber:pfiber runner run;
fib
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
let spawn ?on ?(protect = true) f : _ t =
(* spawn [f()] with a copy of our local storage *)
let (Any p) =
try get_cur_exn ()
with Not_set ->
failwith "Fiber.spawn: must be run from within another fiber."
in
let runner =
match on with
| Some r -> r
| None -> p.runner
in
let child = spawn_ ~parent:(Some p) ~runner f in
add_child_ ~protect p child;
child
let[@inline] spawn_ignore ?on ?protect f : unit =
ignore (spawn ?on ?protect f : _ t)
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
let[@inline] self () : any =
match get_cur_exn () with
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
| f -> f
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
let (Any self) = self () in
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
let check_if_cancelled () =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
| Any self -> check_if_cancelled_ self
let yield () : unit =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.yield: must be run from inside a fiber."
| Any self ->
check_if_cancelled_ self;
PF.yield ();
check_if_cancelled_ self

View file

@ -1,150 +0,0 @@
(** Fibers.
A fiber is a lightweight computation that runs cooperatively alongside other
fibers. In the context of moonpool, fibers have additional properties:
- they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form of structured
concurrency *)
type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *)
(**/**)
(** Do not rely on this, it is internal implementation details. *)
module Private_ : sig
type 'a state
type pfiber
type 'a t = private {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber;
}
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
on that. *)
type any = Any : _ t -> any [@@unboxed]
exception Not_set
val get_cur_exn : unit -> any
(** [get_cur_exn ()] either returns the current fiber, or
@raise Not_set if run outside a fiber. *)
val get_cur_opt : unit -> any option
end
(**/**)
type 'a t = 'a Private_.t
(** A fiber returning a value of type ['a]. *)
val res : 'a t -> 'a Fut.t
(** Future result of the fiber. *)
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
(** Type erased fiber *)
type any = Private_.any = Any : _ t -> any [@@unboxed]
val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t
val self : unit -> any
(** [self ()] is the current fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *)
val is_done : _ t -> bool
(** Has the fiber completed? *)
val is_cancelled : _ t -> bool
(** Has the fiber completed with a failure? *)
val is_success : _ t -> bool
(** Has the fiber completed with a value? *)
val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises. Must be
run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *)
val yield : unit -> unit
(** Yield control to the scheduler from the current fiber.
@raise Failure if not run from inside a fiber. *)
type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
[h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
the fiber being cancelled, this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
from the list. *)
val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback with the result. If the
fiber is done already then the callback is invoked immediately with its
result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
fiber is not the child of any other fiber: its lifetime is only determined
by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
@param on
if provided, start the fiber on the given runner. If not provided, use the
parent's runner.
@param protect
if true, when [f_child] fails, it does not affect [parent]. If false,
[f_child] failing also causes [parent] to fail (and therefore all other
children of [parent]). Default is [true].
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
termination of the parent, ie. the parent will exit only after this new
fiber exits.
@param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result.
@since 0.7 *)

View file

@ -1 +0,0 @@
include Task_local_storage

View file

@ -1,17 +0,0 @@
(** Fiber-local storage.
This storage is associated to the current fiber, just like thread-local
storage is associated with the current thread.
See {!Moonpool.Task_local_storage} for more general information, as this is
based on it.
{b NOTE}: it's important to note that, while each fiber has its own storage,
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
the storage. Values inside [f1]'s storage will be physically shared with
[f2]. It is thus recommended to store only persistent values in the local
storage. *)
include module type of struct
include Task_local_storage
end

View file

@ -1,14 +0,0 @@
module A = Atomic
type t = int
let counter_ = A.make 0
let equal : t -> t -> bool = ( = )
let compare : t -> t -> int = Stdlib.compare
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
(* TODO: better hash *)
let[@inline] hash x = x land max_int
module Set = Set.Make (Int)
module Map = Map.Make (Int)

View file

@ -1,17 +0,0 @@
(** The unique name of a fiber.
Each fiber has a unique handle that can be used to refer to it in maps or
sets. *)
type t = private int
(** Unique, opaque identifier for a fiber. *)
val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t
(** Generate a fresh, unique identifier *)
module Set : Set.S with type elt = t
module Map : Map.S with type key = t

View file

@ -1,12 +0,0 @@
(** Fibers for moonpool.
See {!Fiber} for the most important explanations.
@since 0.6. *)
module Fiber = Fiber
module Fls = Fls
module Handle = Handle
module Main = Main
include Fiber
include Main

View file

@ -5,7 +5,6 @@
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -7,8 +7,6 @@ end
module Fut = Moonpool.Fut
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
ref (fun ebt ->
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
@ -90,8 +88,6 @@ end
module Ops = struct
type st = Scheduler_state.st
let around_task _ = default_around_task_
let schedule (self : st) t =
if Atomic.get self.closed then
failwith "moonpool-lwt.schedule: scheduler is closed";
@ -122,15 +118,7 @@ module Ops = struct
()
let ops : st WL.ops =
{
schedule;
around_task;
get_next_task;
on_exn;
runner;
before_start;
cleanup;
}
{ schedule; get_next_task; on_exn; runner; before_start; cleanup }
let setup st =
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
@ -289,7 +277,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
M.run_async st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x

15
src/private/signals_.ml Normal file
View file

@ -0,0 +1,15 @@
let ignore_signals_ () =
try
Thread.sigmask SIG_BLOCK
[
Sys.sigpipe;
Sys.sigbus;
Sys.sigterm;
Sys.sigchld;
Sys.sigalrm;
Sys.sigint;
Sys.sigusr1;
Sys.sigusr2;
]
|> ignore
with _ -> ()

View file

@ -6,7 +6,6 @@
(libraries
t_fibers
moonpool
moonpool.fib
trace
trace-tef
qcheck-core

View file

@ -2,4 +2,4 @@
(name t_fibers)
(package moonpool)
(optional)
(libraries moonpool moonpool.fib trace qcheck-core hmap))
(libraries moonpool trace qcheck-core hmap))

View file

@ -1,6 +1,7 @@
open! Moonpool
module Chan = Moonpool.Chan
module Exn_bt = Moonpool.Exn_bt
module A = Atomic
module F = Moonpool_fib.Fiber
module Fut = Moonpool.Fut
let ( let@ ) = ( @@ )
@ -49,24 +50,23 @@ let logf = Log_.logf
let run1 ~runner () =
Printf.printf "============\nstart\n%!";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let fut =
Fut.spawn ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Fut.spawn ~on:runner @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
Moonpool.run_async runner (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
@ -78,19 +78,15 @@ let run1 ~runner () =
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
let res = Fut.await f in
Fut.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.await @@ F.res fib;
Fut.await fut;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
@ -99,15 +95,11 @@ let run2 ~runner () =
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let to_await = ref [] in
let clock = ref TS.init in
let fut =
Fut.spawn ~on:runner @@ fun () ->
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
@ -116,11 +108,7 @@ let run2 ~runner () =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Fut.spawn ~on:runner @@ fun _n ->
Thread.delay 0.002;
(* sync for determinism *)
@ -132,46 +120,51 @@ let run2 ~runner () =
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
Fut.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
let sender =
Fut.spawn ~on:runner (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done)
in
to_await := sender :: !to_await;
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
let res = Fut.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
Fut.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
Fut.on_result fut (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.await @@ F.res fib
(try Fut.await fut
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
List.iter Fut.await !to_await;
Log_.print_and_clear ();
()

View file

@ -1,7 +1,7 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
module F = Moonpool.Fut
module FLS = Moonpool.Task_local_storage
(* ### dummy little tracing system with local storage *)
@ -122,7 +122,7 @@ let run ~pool ~pool_name () =
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
F.spawn ~on:pool (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
@ -133,8 +133,7 @@ let run ~pool ~pool_name () =
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
F.spawn ~on:pool @@ fun () -> sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
@ -149,16 +148,14 @@ let run ~pool ~pool_name () =
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
F.spawn ~on:pool @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
let fibs = List.init 8 (fun idx -> F.spawn ~on:pool (fun () -> top idx)) in
List.iter F.await fibs;
Printf.printf "tracing complete\n";

View file

@ -2,33 +2,21 @@
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.0.1: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.1.1: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.2.1: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.3.1: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
1.4.1: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
@ -37,8 +25,8 @@ start
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
2.8: fiber 8 resolved as ok
2.9: fiber 9 resolved as ok
3: wait for subs
4: await fiber 0
5: res 0 = 0
@ -55,7 +43,6 @@ start
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited
19: main fiber result: error Failure("oh no!")
20: main fib failed with "oh no!"
21: main fiber exited

View file

@ -1,6 +1,6 @@
let ( let@ ) = ( @@ )
let () =
let@ runner = Moonpool_fib.main in
let@ runner = Moonpool.main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()

File diff suppressed because it is too large Load diff

View file

@ -3,7 +3,7 @@ open! Moonpool
let ( let@ ) = ( @@ )
let () =
let@ _ = Moonpool_fib.main in
let@ _ = Moonpool.main in
(let@ pool = Ws_pool.with_ () in
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());

View file

@ -1,14 +1,14 @@
open Moonpool
module F = Moonpool_fib
module F = Moonpool.Fut
let ( let@ ) = ( @@ )
let () =
let r =
F.main @@ fun runner ->
let f1 = F.spawn (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn (fun () -> F.await f1 + 10) in
Moonpool.main @@ fun runner ->
let f1 = F.spawn ~on:runner (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -19,10 +19,10 @@ let () =
(* run fibers in the background, await them in the main thread *)
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
F.main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:bg (fun () -> F.await f1 + 10) in
Moonpool.main @@ fun runner ->
let f1 = F.spawn ~on:bg (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:bg (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -32,8 +32,8 @@ let () =
let () =
try
let _r =
F.main @@ fun _r ->
let fib = F.spawn (fun () -> failwith "oops") in
Moonpool.main @@ fun runner ->
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
F.await fib
in

View file

@ -6,7 +6,6 @@
(libraries
t_fibers
moonpool
moonpool.fib
moonpool-lwt
hmap
trace

View file

@ -2,33 +2,21 @@
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.0.1: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.1.1: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.2.1: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.3.1: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
1.4.1: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
@ -37,8 +25,8 @@ start
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
2.8: fiber 8 resolved as ok
2.9: fiber 9 resolved as ok
3: wait for subs
4: await fiber 0
5: res 0 = 0
@ -55,7 +43,6 @@ start
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited
19: main fiber result: error Failure("oh no!")
20: main fib failed with "oh no!"
21: main fiber exited

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
open Moonpool
module M_lwt = Moonpool_lwt
module F = Moonpool_fib
module F = Moonpool.Fut
let ( let@ ) = ( @@ )
@ -9,9 +9,9 @@ let () =
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
M_lwt.lwt_main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
let f1 = F.spawn ~on:bg (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -24,7 +24,7 @@ let () =
try
let _r =
M_lwt.lwt_main @@ fun runner ->
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
F.await fib
in

View file

@ -14,14 +14,11 @@ let run ~kind () =
let pool =
let on_init_thread ~dom_id:_ ~t_id () =
Trace.set_thread_name (Printf.sprintf "pool worker %d" t_id)
and around_task =
( (fun self -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self)),
fun self () -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self) )
in
match kind with
| `Simple -> Fifo_pool.create ~num_threads:3 ~on_init_thread ~around_task ()
| `Ws_pool -> Ws_pool.create ~num_threads:3 ~on_init_thread ~around_task ()
| `Simple -> Fifo_pool.create ~num_threads:3 ~on_init_thread ()
| `Ws_pool -> Ws_pool.create ~num_threads:3 ~on_init_thread ()
in
(* make all threads busy *)