mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
Compare commits
18 commits
d957f7b54e
...
4de33f0121
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4de33f0121 | ||
|
|
58a0f891f7 | ||
|
|
b1688f71e7 | ||
|
|
794b263d36 | ||
|
|
a40ea8b41b | ||
|
|
40e97d969a | ||
|
|
c3f235f7e9 | ||
|
|
0b28898586 | ||
|
|
997d996c13 | ||
|
|
ee7972910f | ||
|
|
2ce3fa7d3e | ||
|
|
8770d4fb9c | ||
|
|
95de0e7e27 | ||
|
|
4924b5f52b | ||
|
|
db9cddf999 | ||
|
|
f9ab951c36 | ||
|
|
2aa2612963 | ||
|
|
f92efa562d |
51 changed files with 6103 additions and 6654 deletions
12
CHANGES.md
12
CHANGES.md
|
|
@ -1,4 +1,16 @@
|
|||
|
||||
# 0.10
|
||||
|
||||
- breaking: remove `around_task` from schedulers
|
||||
- breaking: remove `moonpool.fib` entirely. Please use `picos_std.structured`
|
||||
if you really need structured concurrency.
|
||||
- remove deprecated moonpool-io and moonpool.sync
|
||||
|
||||
- feat core: add `Main`, salvaged from moonpool.fib
|
||||
- block signals in background threads
|
||||
- refactor `chan`; fix bug in `Chan.try_push`
|
||||
- fix: make `Moonpool_lwt.fut_of_lwt` idempotent
|
||||
|
||||
# 0.9
|
||||
|
||||
- breaking: require OCaml 5
|
||||
|
|
|
|||
8
Makefile
8
Makefile
|
|
@ -67,6 +67,14 @@ bench-pi:
|
|||
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 16 -mode forkjoin -kind=pool' \
|
||||
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 20 -mode forkjoin -kind=pool'
|
||||
|
||||
bench-repro-41:
|
||||
dune build $(DUNE_OPTS_BENCH) examples/repro_41/run.exe
|
||||
hyperfine --warmup=1 \
|
||||
"./_build/default/examples/repro_41/run.exe 4 domainslib" \
|
||||
"./_build/default/examples/repro_41/run.exe 4 moonpool" \
|
||||
"./_build/default/examples/repro_41/run.exe 5 moonpool" \
|
||||
"./_build/default/examples/repro_41/run.exe 5 seq"
|
||||
|
||||
.PHONY: test clean bench-fib bench-pi
|
||||
|
||||
VERSION=$(shell awk '/^version:/ {print $$2}' moonpool.opam)
|
||||
|
|
|
|||
44
README.md
44
README.md
|
|
@ -173,49 +173,9 @@ val expected_sum : int = 5050
|
|||
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
|
||||
an exception and the backtrace associated with the place the exception was caught.
|
||||
|
||||
### Fibers
|
||||
### Local storage
|
||||
|
||||
On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
|
||||
which provides _lightweight fibers_
|
||||
that can run on any Moonpool runner.
|
||||
These fibers are a sort of lightweight thread, dispatched on the runner's
|
||||
background thread(s).
|
||||
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
|
||||
is done.
|
||||
|
||||
```ocaml
|
||||
# #require "moonpool.fib";;
|
||||
...
|
||||
|
||||
# (* convenient alias *)
|
||||
module F = Moonpool_fib;;
|
||||
module F = Moonpool_fib
|
||||
# F.main (fun _runner ->
|
||||
let f1 = F.spawn (fun () -> fib 10) in
|
||||
let f2 = F.spawn (fun () -> fib 15) in
|
||||
F.await f1 + F.await f2);;
|
||||
- : int = 1076
|
||||
```
|
||||
|
||||
Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
|
||||
the sub-fiber's _parent_.
|
||||
When a parent fails, all its children are cancelled (forced to fail).
|
||||
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).
|
||||
|
||||
Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
|
||||
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
|
||||
as a regular future, too.
|
||||
However, this resolution is only done after all the children of the fiber have
|
||||
resolved — the lifetime of fibers forms a well-nested tree in that sense.
|
||||
|
||||
When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
|
||||
thread on which it was running becomes available again and can go on process another task.
|
||||
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
|
||||
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.
|
||||
|
||||
In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
|
||||
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
|
||||
put persistent data in storage to avoid confusing aliasing).
|
||||
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
|
||||
The storage is convenient for carrying around context for cross-cutting concerns such
|
||||
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
|
||||
scope).
|
||||
|
|
|
|||
2
dune
2
dune
|
|
@ -3,7 +3,7 @@
|
|||
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
|
||||
|
||||
(mdx
|
||||
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
|
||||
(libraries moonpool moonpool.forkjoin threads)
|
||||
(package moonpool)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0)))
|
||||
|
|
|
|||
72
dune-project
72
dune-project
|
|
@ -1,13 +1,20 @@
|
|||
(lang dune 3.0)
|
||||
|
||||
(using mdx 0.2)
|
||||
|
||||
(name moonpool)
|
||||
(version 0.9)
|
||||
|
||||
(version 0.10)
|
||||
|
||||
(generate_opam_files true)
|
||||
|
||||
(source
|
||||
(github c-cube/moonpool))
|
||||
|
||||
(authors "Simon Cruanes")
|
||||
|
||||
(maintainers "Simon Cruanes")
|
||||
|
||||
(license MIT)
|
||||
|
||||
;(documentation https://url/to/documentation)
|
||||
|
|
@ -16,41 +23,60 @@
|
|||
(name moonpool)
|
||||
(synopsis "Pools of threads supported by a pool of domains")
|
||||
(depends
|
||||
(ocaml (>= 5.0))
|
||||
(ocaml
|
||||
(>= 5.0))
|
||||
dune
|
||||
(either (>= 1.0))
|
||||
(either
|
||||
(>= 1.0))
|
||||
(trace :with-test)
|
||||
(trace-tef :with-test)
|
||||
(qcheck-core (and :with-test (>= 0.19)))
|
||||
(thread-local-storage (and (>= 0.2) (< 0.3)))
|
||||
(qcheck-core
|
||||
(and
|
||||
:with-test
|
||||
(>= 0.19)))
|
||||
(thread-local-storage
|
||||
(and
|
||||
(>= 0.2)
|
||||
(< 0.3)))
|
||||
(odoc :with-doc)
|
||||
(hmap :with-test)
|
||||
(picos (and (>= 0.5) (< 0.7)))
|
||||
(picos_std (and (>= 0.5) (< 0.7)))
|
||||
(picos
|
||||
(and
|
||||
(>= 0.5)
|
||||
(< 0.7)))
|
||||
(picos_std
|
||||
(and
|
||||
(>= 0.5)
|
||||
(< 0.7)))
|
||||
(mdx
|
||||
(and
|
||||
(>= 1.9.0)
|
||||
:with-test)))
|
||||
(depopts
|
||||
hmap
|
||||
(trace (>= 0.6)))
|
||||
hmap
|
||||
(trace
|
||||
(>= 0.6)))
|
||||
(tags
|
||||
(thread pool domain futures fork-join)))
|
||||
|
||||
(package
|
||||
(name moonpool-lwt)
|
||||
(synopsis "Event loop for moonpool based on Lwt-engine (experimental)")
|
||||
(allow_empty) ; on < 5.0
|
||||
(depends
|
||||
(moonpool (= :version))
|
||||
(ocaml (>= 5.0))
|
||||
(qcheck-core (and :with-test (>= 0.19)))
|
||||
(hmap :with-test)
|
||||
lwt
|
||||
base-unix
|
||||
(trace :with-test)
|
||||
(trace-tef :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(name moonpool-lwt)
|
||||
(synopsis "Event loop for moonpool based on Lwt-engine (experimental)")
|
||||
(allow_empty) ; on < 5.0
|
||||
(depends
|
||||
(moonpool
|
||||
(= :version))
|
||||
(ocaml
|
||||
(>= 5.0))
|
||||
(qcheck-core
|
||||
(and
|
||||
:with-test
|
||||
(>= 0.19)))
|
||||
(hmap :with-test)
|
||||
lwt
|
||||
base-unix
|
||||
(trace :with-test)
|
||||
(trace-tef :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
(** Example from
|
||||
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
|
||||
(** NOTE: this was an example from
|
||||
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
|
||||
there is no cancelation anymore :) *)
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
let@ _ = Moonpool_fib.main in
|
||||
let@ _ = Moonpool.main in
|
||||
|
||||
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
|
||||
let@ runner = Moonpool.Background_thread.with_ () in
|
||||
|
|
@ -13,15 +14,13 @@ let () =
|
|||
(* Pretend this is some long-running read loop *)
|
||||
for i = 1 to 10 do
|
||||
Printf.printf "MAIN LOOP %d\n%!" i;
|
||||
Moonpool_fib.check_if_cancelled ();
|
||||
let _ : _ Moonpool_fib.t =
|
||||
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
|
||||
let _ : _ Moonpool.Fut.t =
|
||||
Moonpool.Fut.spawn ~on:runner (fun () ->
|
||||
Printf.printf "RUN FIBER %d\n%!" i;
|
||||
Moonpool_fib.check_if_cancelled ();
|
||||
Format.printf "FIBER %d NOT CANCELLED YET@." i;
|
||||
failwith "BOOM")
|
||||
in
|
||||
Moonpool_fib.yield ();
|
||||
Moonpool.Fut.yield ();
|
||||
(* Thread.delay 0.2; *)
|
||||
(* Thread.delay 0.0001; *)
|
||||
()
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
;(package moonpool)
|
||||
(libraries
|
||||
moonpool
|
||||
moonpool.fib
|
||||
trace
|
||||
trace-tef
|
||||
;tracy-client.trace
|
||||
|
|
|
|||
5
examples/repro_41/dune
Normal file
5
examples/repro_41/dune
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
(executables
|
||||
(names run)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0))
|
||||
(libraries moonpool trace trace-tef domainslib))
|
||||
54
examples/repro_41/run.ml
Normal file
54
examples/repro_41/run.ml
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
(* fibo.ml *)
|
||||
let cutoff = 25
|
||||
let input = 40
|
||||
|
||||
let rec fibo_seq n =
|
||||
if n <= 1 then
|
||||
n
|
||||
else
|
||||
fibo_seq (n - 1) + fibo_seq (n - 2)
|
||||
|
||||
let rec fibo_domainslib ctx n =
|
||||
if n <= cutoff then
|
||||
fibo_seq n
|
||||
else
|
||||
let open Domainslib in
|
||||
let fut1 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 1)) in
|
||||
let fut2 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 2)) in
|
||||
Task.await ctx fut1 + Task.await ctx fut2
|
||||
|
||||
let rec fibo_moonpool ctx n =
|
||||
if n <= cutoff then
|
||||
fibo_seq n
|
||||
else
|
||||
let open Moonpool in
|
||||
let fut1 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 1)) in
|
||||
let fut2 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 2)) in
|
||||
Fut.await fut1 + Fut.await fut2
|
||||
|
||||
let usage =
|
||||
"fibo.exe <num_domains> [ domainslib | moonpool | moonpool_fifo | seq ]"
|
||||
|
||||
let num_domains = try int_of_string Sys.argv.(1) with _ -> failwith usage
|
||||
let implem = try Sys.argv.(2) with _ -> failwith usage
|
||||
|
||||
let () =
|
||||
let output =
|
||||
match implem with
|
||||
| "moonpool" ->
|
||||
let open Moonpool in
|
||||
let ctx = Ws_pool.create ~num_threads:num_domains () in
|
||||
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
|
||||
| "moonpool_fifo" ->
|
||||
let open Moonpool in
|
||||
let ctx = Fifo_pool.create ~num_threads:num_domains () in
|
||||
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
|
||||
| "domainslib" ->
|
||||
let open Domainslib in
|
||||
let pool = Task.setup_pool ~num_domains () in
|
||||
Task.run pool (fun () -> fibo_domainslib pool input)
|
||||
| "seq" -> fibo_seq input
|
||||
| _ -> failwith usage
|
||||
in
|
||||
print_int output;
|
||||
print_newline ()
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.9"
|
||||
version: "0.10"
|
||||
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
|
||||
maintainer: ["Simon Cruanes"]
|
||||
authors: ["Simon Cruanes"]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.9"
|
||||
version: "0.10"
|
||||
synopsis: "Pools of threads supported by a pool of domains"
|
||||
maintainer: ["Simon Cruanes"]
|
||||
authors: ["Simon Cruanes"]
|
||||
|
|
|
|||
|
|
@ -6,18 +6,15 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?name:string ->
|
||||
'a
|
||||
(** Arguments used in {!create}. See {!create} for explanations. *)
|
||||
|
||||
let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () : t =
|
||||
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name
|
||||
~num_threads:1 ()
|
||||
let create ?on_init_thread ?on_exit_thread ?on_exn ?name () : t =
|
||||
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?name ~num_threads:1
|
||||
()
|
||||
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () f =
|
||||
let pool =
|
||||
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name ()
|
||||
in
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?name () f =
|
||||
let pool = create ?on_init_thread ?on_exit_thread ?on_exn ?name () in
|
||||
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
|
||||
f pool
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?name:string ->
|
||||
'a
|
||||
(** Arguments used in {!create}. See {!create} for explanations. *)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ let ( let@ ) = ( @@ )
|
|||
type state = {
|
||||
threads: Thread.t array;
|
||||
q: task_full Bb_queue.t; (** Queue for tasks. *)
|
||||
around_task: WL.around_task;
|
||||
mutable as_runner: t;
|
||||
(* init options *)
|
||||
name: string option;
|
||||
|
|
@ -43,13 +42,10 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?num_threads:int ->
|
||||
?name:string ->
|
||||
'a
|
||||
|
||||
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
|
||||
|
||||
(** Run [task] as is, on the pool. *)
|
||||
let schedule_ (self : state) (task : task_full) : unit =
|
||||
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
|
||||
|
|
@ -88,7 +84,6 @@ let cleanup (self : worker_state) : unit =
|
|||
|
||||
let worker_ops : worker_state WL.ops =
|
||||
let runner (st : worker_state) = st.st.as_runner in
|
||||
let around_task st = st.st.around_task in
|
||||
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
||||
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
in
|
||||
|
|
@ -96,7 +91,6 @@ let worker_ops : worker_state WL.ops =
|
|||
WL.schedule = schedule_w;
|
||||
runner;
|
||||
get_next_task;
|
||||
around_task;
|
||||
on_exn;
|
||||
before_start;
|
||||
cleanup;
|
||||
|
|
@ -104,19 +98,11 @@ let worker_ops : worker_state WL.ops =
|
|||
|
||||
let create_ ?(on_init_thread = default_thread_init_exit_)
|
||||
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
|
||||
?around_task ~threads ?name () : state =
|
||||
(* wrapper *)
|
||||
let around_task =
|
||||
match around_task with
|
||||
| Some (f, g) -> WL.AT_pair (f, g)
|
||||
| None -> default_around_task_
|
||||
in
|
||||
|
||||
~threads ?name () : state =
|
||||
let self =
|
||||
{
|
||||
threads;
|
||||
q = Bb_queue.create ();
|
||||
around_task;
|
||||
as_runner = Runner.dummy;
|
||||
name;
|
||||
on_init_thread;
|
||||
|
|
@ -127,8 +113,7 @@ let create_ ?(on_init_thread = default_thread_init_exit_)
|
|||
self.as_runner <- runner_of_state self;
|
||||
self
|
||||
|
||||
let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||
?name () : t =
|
||||
let create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () : t =
|
||||
let num_domains = Domain_pool_.max_number_of_domains () in
|
||||
|
||||
(* number of threads to run *)
|
||||
|
|
@ -140,8 +125,7 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
|||
let pool =
|
||||
let dummy_thread = Thread.self () in
|
||||
let threads = Array.make num_threads dummy_thread in
|
||||
create_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ~threads ?name
|
||||
()
|
||||
create_ ?on_init_thread ?on_exit_thread ?on_exn ~threads ?name ()
|
||||
in
|
||||
let runner = runner_of_state pool in
|
||||
|
||||
|
|
@ -181,11 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
|||
|
||||
runner
|
||||
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||
?name () f =
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () f =
|
||||
let pool =
|
||||
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||
?name ()
|
||||
create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name ()
|
||||
in
|
||||
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
|
||||
f pool
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?num_threads:int ->
|
||||
?name:string ->
|
||||
'a
|
||||
|
|
@ -35,9 +34,6 @@ val create : (unit -> t, _) create_args
|
|||
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
|
||||
4 the default is [4] (since there is only one domain).
|
||||
@param on_exit_thread called at the end of each worker thread in the pool.
|
||||
@param around_task
|
||||
a pair of [before, after] functions ran around each task. See
|
||||
{!Pool.create_args}.
|
||||
@param name name for the pool, used in tracing (since 0.6) *)
|
||||
|
||||
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
|
||||
|
|
|
|||
|
|
@ -436,6 +436,8 @@ let await (self : 'a t) : 'a =
|
|||
(* un-suspended: we should have a result! *)
|
||||
get_or_fail_exn self
|
||||
|
||||
let yield = Picos.Fiber.yield
|
||||
|
||||
module Infix = struct
|
||||
let[@inline] ( >|= ) x f = map ~f x
|
||||
let[@inline] ( >>= ) x f = bind ~f x
|
||||
|
|
|
|||
|
|
@ -8,12 +8,16 @@
|
|||
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
|
||||
an exception and the corresponding backtrace).
|
||||
|
||||
Using {!spawn}, it's possible to start a bunch of tasks, obtaining futures,
|
||||
and then use {!await} to get their result in the desired order.
|
||||
|
||||
Combinators such as {!map} and {!join_array} can be used to produce futures
|
||||
from other futures (in a monadic way). Some combinators take a [on] argument
|
||||
to specify a runner on which the intermediate computation takes place; for
|
||||
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
|
||||
applicatively; the call to [f] happens on the runner [pool] (once [fut]
|
||||
resolves successfully with a value). *)
|
||||
resolves successfully with a value). Be aware that these combinators do not
|
||||
preserve local storage. *)
|
||||
|
||||
type 'a or_error = ('a, Exn_bt.t) result
|
||||
|
||||
|
|
@ -30,7 +34,8 @@ val make : unit -> 'a t * 'a promise
|
|||
|
||||
val make_promise : unit -> 'a promise
|
||||
(** Same as {!make} but returns a single promise (which can be upcast to a
|
||||
future). This is useful mostly to preserve memory.
|
||||
future). This is useful mostly to preserve memory, you probably don't need
|
||||
it.
|
||||
|
||||
How to upcast to a future in the worst case:
|
||||
{[
|
||||
|
|
@ -40,8 +45,11 @@ val make_promise : unit -> 'a promise
|
|||
@since 0.7 *)
|
||||
|
||||
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
|
||||
; or calls [f] immediately if [fut] is already set. *)
|
||||
(** [on_result fut f] registers [f] to be called in the future when [fut] is
|
||||
set; or calls [f] immediately if [fut] is already set.
|
||||
|
||||
{b NOTE:} it's ill advised to do meaningful work inside the callback [f].
|
||||
Instead, try to spawn another task on the runner, or use {!await}. *)
|
||||
|
||||
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
|
||||
|
|
@ -52,13 +60,14 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
|||
exception Already_fulfilled
|
||||
|
||||
val try_cancel : _ promise -> Exn_bt.t -> bool
|
||||
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
|
||||
returns [false] if the promise is already resolved.
|
||||
@since NEXT_RELEASE *)
|
||||
(** [try_cancel promise ebt] tries to cancel the promise using the given
|
||||
exception, returning [true]. It returns [false] if the promise is already
|
||||
resolved.
|
||||
@since 0.9 *)
|
||||
|
||||
val cancel : _ promise -> Exn_bt.t -> unit
|
||||
(** Silent version of {!try_cancel}, ignoring the result.
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.9 *)
|
||||
|
||||
val fulfill : 'a promise -> 'a or_error -> unit
|
||||
(** Fullfill the promise, setting the future at the same time.
|
||||
|
|
@ -79,6 +88,7 @@ val fail_exn_bt : Exn_bt.t -> _ t
|
|||
@since 0.6 *)
|
||||
|
||||
val of_result : 'a or_error -> 'a t
|
||||
(** Already resolved future from a result. *)
|
||||
|
||||
val is_resolved : _ t -> bool
|
||||
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
|
||||
|
|
@ -136,7 +146,7 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
|
|||
|
||||
val reify_error : 'a t -> 'a or_error t
|
||||
(** [reify_error fut] turns a failing future into a non-failing one that contain
|
||||
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
|
||||
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x].
|
||||
@since 0.4 *)
|
||||
|
||||
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
|
||||
|
|
@ -149,12 +159,18 @@ val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
|
|||
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
|
||||
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
|
||||
or [f x] raises [e].
|
||||
|
||||
This does not preserve local storage of [fut] inside [f].
|
||||
|
||||
@param on if provided, [f] runs on the given runner *)
|
||||
|
||||
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
|
||||
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
|
||||
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
|
||||
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
|
||||
|
||||
This does not preserve local storage of [fut] inside [f].
|
||||
|
||||
@param on if provided, [f] runs on the given runner
|
||||
@since 0.4 *)
|
||||
|
||||
|
|
@ -182,6 +198,7 @@ val join_array : 'a t array -> 'a array t
|
|||
val join_list : 'a t list -> 'a list t
|
||||
(** Wait for all the futures in the list. Fails if any future fails. *)
|
||||
|
||||
(** Advanced primitives for synchronization *)
|
||||
module Advanced : sig
|
||||
val barrier_on_abstract_container_of_futures :
|
||||
iter:(('a t -> unit) -> 'cont -> unit) ->
|
||||
|
|
@ -234,7 +251,9 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
|
|||
|
||||
(** {2 Await}
|
||||
|
||||
{b NOTE} This is only available on OCaml 5. *)
|
||||
This suspends the current task using an OCaml 5 algebraic effect, and makes
|
||||
preparations for the task to be resumed once the future has been resolved.
|
||||
*)
|
||||
|
||||
val await : 'a t -> 'a
|
||||
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
|
||||
|
|
@ -244,7 +263,11 @@ val await : 'a t -> 'a
|
|||
@since 0.3
|
||||
|
||||
This must only be run from inside the runner itself. The runner must support
|
||||
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
|
||||
{!Suspend_}. *)
|
||||
|
||||
val yield : unit -> unit
|
||||
(** Like {!Moonpool.yield}.
|
||||
@since 0.10 *)
|
||||
|
||||
(** {2 Blocking} *)
|
||||
|
||||
|
|
@ -252,7 +275,7 @@ val wait_block : 'a t -> 'a or_error
|
|||
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
|
||||
returns its value.
|
||||
|
||||
{b NOTE}: A word of warning: this will monopolize the calling thread until
|
||||
{b NOTE:} A word of warning: this will monopolize the calling thread until
|
||||
the future resolves. This can also easily cause deadlocks, if enough threads
|
||||
in a pool call [wait_block] on futures running on the same pool or a pool
|
||||
depending on it.
|
||||
|
|
@ -265,7 +288,10 @@ val wait_block : 'a t -> 'a or_error
|
|||
the deadlock. *)
|
||||
|
||||
val wait_block_exn : 'a t -> 'a
|
||||
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
|
||||
(** Same as {!wait_block} but re-raises the exception if the future failed.
|
||||
|
||||
{b NOTE:} do check the cautionary note in {!wait_block} concerning
|
||||
deadlocks. *)
|
||||
|
||||
(** {2 Infix operators}
|
||||
|
||||
|
|
@ -297,9 +323,10 @@ module Infix_local = Infix
|
|||
|
||||
module Private_ : sig
|
||||
val unsafe_promise_of_fut : 'a t -> 'a promise
|
||||
(** please do not use *)
|
||||
(** Do not use unless you know exactly what you are doing. *)
|
||||
|
||||
val as_computation : 'a t -> 'a Picos.Computation.t
|
||||
(** Picos compat *)
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
|
|
|||
|
|
@ -8,15 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
|
|||
in
|
||||
let runner = Fifo_pool.Private_.runner_of_state worker_st in
|
||||
try
|
||||
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
|
||||
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
|
||||
let fut = Fut.spawn ~on:runner (fun () -> f runner) in
|
||||
Fut.on_result fut (fun _ -> Runner.shutdown_without_waiting runner);
|
||||
|
||||
(* run the main thread *)
|
||||
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
||||
Worker_loop_.worker_loop worker_st
|
||||
~block_signals (* do not disturb existing thread *)
|
||||
~ops:Fifo_pool.Private_.worker_ops;
|
||||
|
||||
match Fiber.peek fiber with
|
||||
match Fut.peek fut with
|
||||
| Some (Ok x) -> x
|
||||
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||
| None -> assert false
|
||||
|
|
@ -13,16 +13,18 @@
|
|||
to {!Background_thread} in that there's a single worker to process
|
||||
tasks/fibers.
|
||||
|
||||
This handles effects, including the ones in {!Fiber}.
|
||||
This handles the concurency effects used in moonpool, including [await] and
|
||||
[yield].
|
||||
|
||||
@since 0.6 *)
|
||||
This module was migrated from the late [Moonpool_fib].
|
||||
|
||||
val main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||
@since 0.10 *)
|
||||
|
||||
val main : (Runner.t -> 'a) -> 'a
|
||||
(** [main f] runs [f()] in a scope that handles effects, including
|
||||
{!Fiber.await}.
|
||||
|
||||
This scope can run background tasks as well, in a cooperative fashion. *)
|
||||
|
||||
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
|
||||
(** Same as {!main} but with room for optional arguments.
|
||||
@since 0.7 *)
|
||||
val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a
|
||||
(** Same as {!main} but with room for optional arguments. *)
|
||||
|
|
@ -23,6 +23,7 @@ module Exn_bt = Exn_bt
|
|||
module Fifo_pool = Fifo_pool
|
||||
module Fut = Fut
|
||||
module Lock = Lock
|
||||
module Main = Main
|
||||
module Immediate_runner = struct end
|
||||
module Runner = Runner
|
||||
module Task_local_storage = Task_local_storage
|
||||
|
|
@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage
|
|||
module Trigger = Trigger
|
||||
module Ws_pool = Ws_pool
|
||||
|
||||
(* re-export main *)
|
||||
include Main
|
||||
|
||||
module Private = struct
|
||||
module Ws_deque_ = Ws_deque_
|
||||
module Worker_loop_ = Worker_loop_
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool
|
|||
module Background_thread = Background_thread
|
||||
module Runner = Runner
|
||||
module Trigger = Trigger
|
||||
module Main = Main
|
||||
|
||||
module Immediate_runner : sig end
|
||||
[@@deprecated "use Moonpool_fib.Main"]
|
||||
|
|
@ -80,7 +81,7 @@ val await : 'a Fut.t -> 'a
|
|||
val yield : unit -> unit
|
||||
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
|
||||
>= 5.0.
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.9 *)
|
||||
|
||||
module Lock = Lock
|
||||
module Fut = Fut
|
||||
|
|
@ -205,6 +206,10 @@ module Atomic = Atomic
|
|||
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
|
||||
module on OCaml 5. *)
|
||||
|
||||
include module type of struct
|
||||
include Main
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
(** Private internals, with no stability guarantees *)
|
||||
|
|
|
|||
|
|
@ -13,15 +13,11 @@ type task_full =
|
|||
}
|
||||
-> task_full
|
||||
|
||||
type around_task =
|
||||
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
|
||||
|
||||
exception No_more_tasks
|
||||
|
||||
type 'st ops = {
|
||||
schedule: 'st -> task_full -> unit;
|
||||
get_next_task: 'st -> task_full; (** @raise No_more_tasks *)
|
||||
around_task: 'st -> around_task;
|
||||
on_exn: 'st -> Exn_bt.t -> unit;
|
||||
runner: 'st -> Runner.t;
|
||||
before_start: 'st -> unit;
|
||||
|
|
@ -117,7 +113,6 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
|
|||
let state = ref New
|
||||
|
||||
let run_task (task : task_full) : unit =
|
||||
let (AT_pair (before_task, after_task)) = ops.around_task st in
|
||||
let fiber =
|
||||
match task with
|
||||
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
|
||||
|
|
@ -125,7 +120,8 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
|
|||
|
||||
cur_fiber := fiber;
|
||||
TLS.set k_cur_fiber fiber;
|
||||
let _ctx = before_task runner in
|
||||
|
||||
(* let _ctx = before_task runner in *)
|
||||
|
||||
(* run the task now, catching errors, handling effects *)
|
||||
assert (task != _dummy_task);
|
||||
|
|
@ -140,8 +136,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
|
|||
let ebt = Exn_bt.make e bt in
|
||||
ops.on_exn st ebt);
|
||||
|
||||
after_task runner _ctx;
|
||||
|
||||
(* after_task runner _ctx; *)
|
||||
cur_fiber := _dummy_fiber;
|
||||
TLS.set k_cur_fiber _dummy_fiber
|
||||
|
||||
|
|
@ -149,22 +144,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
|
|||
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
|
||||
state := Ready;
|
||||
|
||||
if block_signals then (
|
||||
try
|
||||
ignore
|
||||
(Unix.sigprocmask SIG_BLOCK
|
||||
[
|
||||
Sys.sigterm;
|
||||
Sys.sigpipe;
|
||||
Sys.sigint;
|
||||
Sys.sigchld;
|
||||
Sys.sigalrm;
|
||||
Sys.sigusr1;
|
||||
Sys.sigusr2;
|
||||
]
|
||||
: _ list)
|
||||
with _ -> ()
|
||||
);
|
||||
if block_signals then Signals_.ignore_signals_ ();
|
||||
|
||||
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
|
||||
|
||||
|
|
|
|||
|
|
@ -18,15 +18,11 @@ type task_full =
|
|||
|
||||
val _dummy_task : task_full
|
||||
|
||||
type around_task =
|
||||
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
|
||||
|
||||
exception No_more_tasks
|
||||
|
||||
type 'st ops = {
|
||||
schedule: 'st -> task_full -> unit;
|
||||
get_next_task: 'st -> task_full;
|
||||
around_task: 'st -> around_task;
|
||||
on_exn: 'st -> Exn_bt.t -> unit;
|
||||
runner: 'st -> Runner.t;
|
||||
before_start: 'st -> unit;
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ type state = {
|
|||
cond: Condition.t;
|
||||
mutable as_runner: t;
|
||||
(* init options *)
|
||||
around_task: WL.around_task;
|
||||
name: string option;
|
||||
on_init_thread: dom_id:int -> t_id:int -> unit -> unit;
|
||||
on_exit_thread: dom_id:int -> t_id:int -> unit -> unit;
|
||||
|
|
@ -198,7 +197,6 @@ let cleanup (self : worker_state) : unit =
|
|||
|
||||
let worker_ops : worker_state WL.ops =
|
||||
let runner (st : worker_state) = st.st.as_runner in
|
||||
let around_task st = st.st.around_task in
|
||||
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
||||
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
in
|
||||
|
|
@ -206,7 +204,6 @@ let worker_ops : worker_state WL.ops =
|
|||
WL.schedule = schedule_from_w;
|
||||
runner;
|
||||
get_next_task;
|
||||
around_task;
|
||||
on_exn;
|
||||
before_start;
|
||||
cleanup;
|
||||
|
|
@ -235,7 +232,6 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?num_threads:int ->
|
||||
?name:string ->
|
||||
'a
|
||||
|
|
@ -243,15 +239,8 @@ type ('a, 'b) create_args =
|
|||
|
||||
let create ?(on_init_thread = default_thread_init_exit_)
|
||||
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
|
||||
?around_task ?num_threads ?name () : t =
|
||||
?num_threads ?name () : t =
|
||||
let pool_id_ = Id.create () in
|
||||
(* wrapper *)
|
||||
let around_task =
|
||||
match around_task with
|
||||
| Some (f, g) -> WL.AT_pair (f, g)
|
||||
| None -> WL.AT_pair (ignore, fun _ _ -> ())
|
||||
in
|
||||
|
||||
let num_domains = Domain_pool_.max_number_of_domains () in
|
||||
let num_threads = Util_pool_.num_threads ?num_threads () in
|
||||
|
||||
|
|
@ -268,7 +257,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
|||
n_waiting_nonzero = true;
|
||||
mutex = Mutex.create ();
|
||||
cond = Condition.create ();
|
||||
around_task;
|
||||
on_exn;
|
||||
on_init_thread;
|
||||
on_exit_thread;
|
||||
|
|
@ -324,11 +312,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
|||
|
||||
pool.as_runner
|
||||
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||
?name () f =
|
||||
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () f =
|
||||
let pool =
|
||||
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||
?name ()
|
||||
create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name ()
|
||||
in
|
||||
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
|
||||
f pool
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ type ('a, 'b) create_args =
|
|||
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
|
||||
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
|
||||
?num_threads:int ->
|
||||
?name:string ->
|
||||
'a
|
||||
|
|
@ -40,11 +39,6 @@ val create : (unit -> t, _) create_args
|
|||
[Domain.recommended_domain_count()], ie one worker thread per CPU core. On
|
||||
OCaml 4 the default is [4] (since there is only one domain).
|
||||
@param on_exit_thread called at the end of each thread in the pool
|
||||
@param around_task
|
||||
a pair of [before, after], where [before pool] is called before a task is
|
||||
processed, on the worker thread about to run it, and returns [x]; and
|
||||
[after pool x] is called by the same thread after the task is over. (since
|
||||
0.2)
|
||||
@param name
|
||||
a name for this thread pool, used if tracing is enabled (since 0.6) *)
|
||||
|
||||
|
|
|
|||
|
|
@ -15,19 +15,23 @@ module Bb_queue = struct
|
|||
if was_empty then Condition.broadcast self.cond;
|
||||
Mutex.unlock self.mutex
|
||||
|
||||
let pop (self : 'a t) : 'a =
|
||||
Mutex.lock self.mutex;
|
||||
let rec loop () =
|
||||
if Queue.is_empty self.q then (
|
||||
Condition.wait self.cond self.mutex;
|
||||
(loop [@tailcall]) ()
|
||||
) else (
|
||||
let x = Queue.pop self.q in
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
)
|
||||
in
|
||||
loop ()
|
||||
let pop (type a) (self : a t) : a =
|
||||
let module M = struct
|
||||
exception Found of a
|
||||
end in
|
||||
try
|
||||
Mutex.lock self.mutex;
|
||||
while true do
|
||||
if Queue.is_empty self.q then
|
||||
Condition.wait self.cond self.mutex
|
||||
else (
|
||||
let x = Queue.pop self.q in
|
||||
Mutex.unlock self.mutex;
|
||||
raise (M.Found x)
|
||||
)
|
||||
done;
|
||||
assert false
|
||||
with M.Found x -> x
|
||||
end
|
||||
|
||||
module Lock = struct
|
||||
|
|
@ -38,13 +42,13 @@ module Lock = struct
|
|||
|
||||
let create content : _ t = { mutex = Mutex.create (); content }
|
||||
|
||||
let with_ (self : _ t) f =
|
||||
let[@inline never] with_ (self : _ t) f =
|
||||
Mutex.lock self.mutex;
|
||||
try
|
||||
let x = f self.content in
|
||||
match f self.content with
|
||||
| x ->
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
with e ->
|
||||
| exception e ->
|
||||
Mutex.unlock self.mutex;
|
||||
raise e
|
||||
|
||||
|
|
@ -95,6 +99,7 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
|
|||
a [Pool.with_] or [Pool.create() … Pool.shutdown()] in a tight loop), and
|
||||
if nothing happens it tries to stop to free resources. *)
|
||||
let work_ idx (st : worker_state) : unit =
|
||||
Signals_.ignore_signals_ ();
|
||||
let main_loop () =
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
|
|
@ -146,7 +151,7 @@ let work_ idx (st : worker_state) : unit =
|
|||
let () =
|
||||
assert (Domain_.is_main_domain ());
|
||||
let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
|
||||
(* thread that stays alive *)
|
||||
(* thread that stays alive since [th_count>0] will always hold *)
|
||||
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
|
||||
domains_.(0) <- Lock.create (Some w, None)
|
||||
|
||||
|
|
@ -154,7 +159,8 @@ let[@inline] max_number_of_domains () : int = Array.length domains_
|
|||
|
||||
let run_on (i : int) (f : unit -> unit) : unit =
|
||||
assert (i < Array.length domains_);
|
||||
let w =
|
||||
|
||||
let w : worker_state =
|
||||
Lock.update_map domains_.(i) (function
|
||||
| (Some w, _) as st ->
|
||||
Atomic.incr w.th_count;
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
(library
|
||||
(name moonpool_fib)
|
||||
(public_name moonpool.fib)
|
||||
(synopsis "Fibers and structured concurrency for Moonpool")
|
||||
(libraries moonpool picos)
|
||||
(flags :standard -open Moonpool_private -open Moonpool))
|
||||
334
src/fib/fiber.ml
334
src/fib/fiber.ml
|
|
@ -1,334 +0,0 @@
|
|||
open Moonpool.Private.Types_
|
||||
module A = Atomic
|
||||
module FM = Handle.Map
|
||||
module Int_map = Map.Make (Int)
|
||||
module PF = Picos.Fiber
|
||||
module FLS = Picos.Fiber.FLS
|
||||
|
||||
type 'a callback = 'a Exn_bt.result -> unit
|
||||
(** Callbacks that are called when a fiber is done. *)
|
||||
|
||||
type cancel_callback = Exn_bt.t -> unit
|
||||
|
||||
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
|
||||
Fut.Private_.unsafe_promise_of_fut
|
||||
|
||||
(* TODO: replace with picos structured at some point? *)
|
||||
module Private_ = struct
|
||||
type pfiber = PF.t
|
||||
|
||||
type 'a t = {
|
||||
id: Handle.t; (** unique identifier for this fiber *)
|
||||
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
|
||||
res: 'a Fut.t;
|
||||
runner: Runner.t;
|
||||
pfiber: pfiber; (** Associated picos fiber *)
|
||||
}
|
||||
|
||||
and 'a state =
|
||||
| Alive of {
|
||||
children: children;
|
||||
on_cancel: cancel_callback Int_map.t;
|
||||
cancel_id: int;
|
||||
}
|
||||
| Terminating_or_done of 'a Exn_bt.result A.t
|
||||
|
||||
and children = any FM.t
|
||||
and any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
(** Key to access the current moonpool.fiber. *)
|
||||
let k_current_fiber : any FLS.t = FLS.create ()
|
||||
|
||||
exception Not_set = FLS.Not_set
|
||||
|
||||
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
|
||||
FLS.get_exn pfiber k_current_fiber
|
||||
|
||||
let[@inline] get_cur_exn () : any =
|
||||
get_cur_from_exn @@ get_current_fiber_exn ()
|
||||
|
||||
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
|
||||
|
||||
let[@inline] is_closed (self : _ t) =
|
||||
match A.get self.state with
|
||||
| Alive _ -> false
|
||||
| Terminating_or_done _ -> true
|
||||
end
|
||||
|
||||
include Private_
|
||||
|
||||
let create_ ~pfiber ~runner ~res () : 'a t =
|
||||
let id = Handle.generate_fresh () in
|
||||
{
|
||||
state =
|
||||
A.make
|
||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
||||
id;
|
||||
res;
|
||||
runner;
|
||||
pfiber;
|
||||
}
|
||||
|
||||
let create_done_ ~res () : _ t =
|
||||
let id = Handle.generate_fresh () in
|
||||
{
|
||||
state =
|
||||
A.make
|
||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
||||
id;
|
||||
res;
|
||||
runner = Runner.dummy;
|
||||
pfiber = Moonpool.Private.Types_._dummy_fiber;
|
||||
}
|
||||
|
||||
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
|
||||
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
|
||||
let[@inline] res self = self.res
|
||||
let[@inline] peek self = Fut.peek self.res
|
||||
let[@inline] is_done self = Fut.is_done self.res
|
||||
let[@inline] is_success self = Fut.is_success self.res
|
||||
let[@inline] is_cancelled self = Fut.is_failed self.res
|
||||
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
|
||||
let[@inline] await self = Fut.await self.res
|
||||
let[@inline] wait_block self = Fut.wait_block self.res
|
||||
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
|
||||
|
||||
(** Resolve [promise] once [children] are all done *)
|
||||
let resolve_once_children_are_done_ ~children ~promise
|
||||
(res : 'a Exn_bt.result A.t) : unit =
|
||||
let n_children = FM.cardinal children in
|
||||
if n_children > 0 then (
|
||||
(* wait for all children to be done *)
|
||||
let n_waiting = A.make (FM.cardinal children) in
|
||||
let on_child_finish (r : _ result) =
|
||||
(* make sure the parent fails if any child fails *)
|
||||
(match r with
|
||||
| Ok _ -> ()
|
||||
| Error ebt -> A.set res (Error ebt));
|
||||
|
||||
(* if we're the last to finish, resolve the parent fiber's [res] *)
|
||||
if A.fetch_and_add n_waiting (-1) = 1 then (
|
||||
let res = A.get res in
|
||||
Fut.fulfill promise res
|
||||
)
|
||||
in
|
||||
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
|
||||
) else
|
||||
Fut.fulfill promise @@ A.get res
|
||||
|
||||
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
|
||||
fun self ebt ->
|
||||
let promise = prom_of_fut self.res in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; cancel_id = _; on_cancel } as old ->
|
||||
let new_st = Terminating_or_done (A.make @@ Error ebt) in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
|
||||
cancel_children_ ~children ebt;
|
||||
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
|
||||
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
(** Cancel eagerly all children *)
|
||||
and cancel_children_ ebt ~children : unit =
|
||||
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
|
||||
|
||||
type cancel_handle = int
|
||||
|
||||
let add_on_cancel (self : _ t) cb : cancel_handle =
|
||||
let h = ref 0 in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; cancel_id; on_cancel } as old ->
|
||||
let new_st =
|
||||
Alive
|
||||
{
|
||||
children;
|
||||
cancel_id = cancel_id + 1;
|
||||
on_cancel = Int_map.add cancel_id cb on_cancel;
|
||||
}
|
||||
in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
h := cancel_id;
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done r ->
|
||||
(match A.get r with
|
||||
| Error ebt -> cb ebt
|
||||
| Ok _ -> ());
|
||||
false
|
||||
do
|
||||
()
|
||||
done;
|
||||
!h
|
||||
|
||||
let remove_on_cancel (self : _ t) h =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ on_cancel; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with on_cancel = Int_map.remove h on_cancel }
|
||||
in
|
||||
not (A.compare_and_set self.state old new_st)
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
|
||||
let h = add_on_cancel self cb in
|
||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
||||
|
||||
(** Successfully resolve the fiber. This might still fail if some children
|
||||
failed. *)
|
||||
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
|
||||
let r = A.make @@ Ok r in
|
||||
let promise = prom_of_fut self.res in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; _ } as old ->
|
||||
let new_st = Terminating_or_done r in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
resolve_once_children_are_done_ ~children ~promise r;
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let remove_child_ (self : _ t) (child : _ t) =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ children; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with children = FM.remove child.id children }
|
||||
in
|
||||
not (A.compare_and_set self.state old new_st)
|
||||
| _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
(** Add a child to [self].
|
||||
@param protected if true, the child's failure will not affect [self]. *)
|
||||
let add_child_ ~protect (self : _ t) (child : _ t) =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ children; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with children = FM.add child.id (Any child) children }
|
||||
in
|
||||
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
(* make sure to remove [child] from [self.children] once it's done;
|
||||
fail [self] is [child] failed and [protect=false] *)
|
||||
Fut.on_result child.res (function
|
||||
| Ok _ -> remove_child_ self child
|
||||
| Error ebt ->
|
||||
(* child failed, we must fail too *)
|
||||
remove_child_ self child;
|
||||
if not protect then resolve_as_failed_ self ebt);
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done r ->
|
||||
(match A.get r with
|
||||
| Error ebt ->
|
||||
(* cancel child immediately *)
|
||||
resolve_as_failed_ child ebt
|
||||
| Ok _ -> ());
|
||||
false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
|
||||
let res, _ = Fut.make () in
|
||||
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
|
||||
|
||||
(* copy local hmap from parent, if present *)
|
||||
Option.iter
|
||||
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
|
||||
parent;
|
||||
|
||||
(match parent with
|
||||
| Some p when is_closed p -> failwith "spawn: nursery is closed"
|
||||
| _ -> ());
|
||||
let fib = create_ ~pfiber ~runner ~res () in
|
||||
|
||||
let run () =
|
||||
(* make sure the fiber is accessible from inside itself *)
|
||||
FLS.set pfiber k_current_fiber (Any fib);
|
||||
try
|
||||
let res = f () in
|
||||
resolve_ok_ fib res
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
let ebt = Exn_bt.make exn bt in
|
||||
resolve_as_failed_ fib ebt
|
||||
in
|
||||
|
||||
Runner.run_async ~fiber:pfiber runner run;
|
||||
|
||||
fib
|
||||
|
||||
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
|
||||
|
||||
let spawn ?on ?(protect = true) f : _ t =
|
||||
(* spawn [f()] with a copy of our local storage *)
|
||||
let (Any p) =
|
||||
try get_cur_exn ()
|
||||
with Not_set ->
|
||||
failwith "Fiber.spawn: must be run from within another fiber."
|
||||
in
|
||||
|
||||
let runner =
|
||||
match on with
|
||||
| Some r -> r
|
||||
| None -> p.runner
|
||||
in
|
||||
let child = spawn_ ~parent:(Some p) ~runner f in
|
||||
add_child_ ~protect p child;
|
||||
child
|
||||
|
||||
let[@inline] spawn_ignore ?on ?protect f : unit =
|
||||
ignore (spawn ?on ?protect f : _ t)
|
||||
|
||||
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
|
||||
|
||||
let[@inline] self () : any =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
|
||||
| f -> f
|
||||
|
||||
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
|
||||
let (Any self) = self () in
|
||||
let h = add_on_cancel self cb in
|
||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
||||
|
||||
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
|
||||
|
||||
let check_if_cancelled () =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set ->
|
||||
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
|
||||
| Any self -> check_if_cancelled_ self
|
||||
|
||||
let yield () : unit =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set ->
|
||||
failwith "Fiber.yield: must be run from inside a fiber."
|
||||
| Any self ->
|
||||
check_if_cancelled_ self;
|
||||
PF.yield ();
|
||||
check_if_cancelled_ self
|
||||
|
|
@ -1,150 +0,0 @@
|
|||
(** Fibers.
|
||||
|
||||
A fiber is a lightweight computation that runs cooperatively alongside other
|
||||
fibers. In the context of moonpool, fibers have additional properties:
|
||||
|
||||
- they run in a moonpool runner
|
||||
- they form a simple supervision tree, enabling a limited form of structured
|
||||
concurrency *)
|
||||
|
||||
type cancel_callback = Exn_bt.t -> unit
|
||||
(** A callback used in case of cancellation *)
|
||||
|
||||
(**/**)
|
||||
|
||||
(** Do not rely on this, it is internal implementation details. *)
|
||||
module Private_ : sig
|
||||
type 'a state
|
||||
type pfiber
|
||||
|
||||
type 'a t = private {
|
||||
id: Handle.t; (** unique identifier for this fiber *)
|
||||
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
|
||||
res: 'a Fut.t;
|
||||
runner: Runner.t;
|
||||
pfiber: pfiber;
|
||||
}
|
||||
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
|
||||
on that. *)
|
||||
|
||||
type any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
exception Not_set
|
||||
|
||||
val get_cur_exn : unit -> any
|
||||
(** [get_cur_exn ()] either returns the current fiber, or
|
||||
@raise Not_set if run outside a fiber. *)
|
||||
|
||||
val get_cur_opt : unit -> any option
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
type 'a t = 'a Private_.t
|
||||
(** A fiber returning a value of type ['a]. *)
|
||||
|
||||
val res : 'a t -> 'a Fut.t
|
||||
(** Future result of the fiber. *)
|
||||
|
||||
type 'a callback = 'a Exn_bt.result -> unit
|
||||
(** Callbacks that are called when a fiber is done. *)
|
||||
|
||||
(** Type erased fiber *)
|
||||
type any = Private_.any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
val return : 'a -> 'a t
|
||||
val fail : Exn_bt.t -> _ t
|
||||
|
||||
val self : unit -> any
|
||||
(** [self ()] is the current fiber. Must be run from inside a fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
val peek : 'a t -> 'a Fut.or_error option
|
||||
(** Peek inside the future result *)
|
||||
|
||||
val is_done : _ t -> bool
|
||||
(** Has the fiber completed? *)
|
||||
|
||||
val is_cancelled : _ t -> bool
|
||||
(** Has the fiber completed with a failure? *)
|
||||
|
||||
val is_success : _ t -> bool
|
||||
(** Has the fiber completed with a value? *)
|
||||
|
||||
val await : 'a t -> 'a
|
||||
(** [await fib] is like [Fut.await (res fib)] *)
|
||||
|
||||
val wait_block_exn : 'a t -> 'a
|
||||
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
|
||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
||||
|
||||
val wait_block : 'a t -> 'a Fut.or_error
|
||||
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
|
||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
||||
|
||||
val check_if_cancelled : unit -> unit
|
||||
(** Check if the current fiber is cancelled, in which case this raises. Must be
|
||||
run from inside a fiber.
|
||||
@raise e if the current fiber is cancelled with exception [e]
|
||||
@raise Failure if not run from a fiber. *)
|
||||
|
||||
val yield : unit -> unit
|
||||
(** Yield control to the scheduler from the current fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
type cancel_handle
|
||||
(** An opaque handle for a single cancel callback in a fiber *)
|
||||
|
||||
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
|
||||
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
|
||||
If [fib] is already cancelled, [cb] is called immediately. *)
|
||||
|
||||
val remove_on_cancel : _ t -> cancel_handle -> unit
|
||||
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
|
||||
[h]. *)
|
||||
|
||||
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
|
||||
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
|
||||
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
|
||||
the fiber being cancelled, this callback is removed. *)
|
||||
|
||||
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
|
||||
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
|
||||
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
|
||||
from the list. *)
|
||||
|
||||
val on_result : 'a t -> 'a callback -> unit
|
||||
(** Wait for fiber to be done and call the callback with the result. If the
|
||||
fiber is done already then the callback is invoked immediately with its
|
||||
result. *)
|
||||
|
||||
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
|
||||
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
|
||||
fiber is not the child of any other fiber: its lifetime is only determined
|
||||
by the lifetime of [f()]. *)
|
||||
|
||||
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
|
||||
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
|
||||
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
|
||||
if the current fiber [parent] fails.
|
||||
|
||||
@param on
|
||||
if provided, start the fiber on the given runner. If not provided, use the
|
||||
parent's runner.
|
||||
@param protect
|
||||
if true, when [f_child] fails, it does not affect [parent]. If false,
|
||||
[f_child] failing also causes [parent] to fail (and therefore all other
|
||||
children of [parent]). Default is [true].
|
||||
|
||||
Must be run from inside a fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
|
||||
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
|
||||
termination of the parent, ie. the parent will exit only after this new
|
||||
fiber exits.
|
||||
@param on the optional runner to use, added since 0.7 *)
|
||||
|
||||
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
||||
(** Like {!spawn_top} but ignores the result.
|
||||
@since 0.7 *)
|
||||
|
|
@ -1 +0,0 @@
|
|||
include Task_local_storage
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
(** Fiber-local storage.
|
||||
|
||||
This storage is associated to the current fiber, just like thread-local
|
||||
storage is associated with the current thread.
|
||||
|
||||
See {!Moonpool.Task_local_storage} for more general information, as this is
|
||||
based on it.
|
||||
|
||||
{b NOTE}: it's important to note that, while each fiber has its own storage,
|
||||
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
|
||||
the storage. Values inside [f1]'s storage will be physically shared with
|
||||
[f2]. It is thus recommended to store only persistent values in the local
|
||||
storage. *)
|
||||
|
||||
include module type of struct
|
||||
include Task_local_storage
|
||||
end
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
module A = Atomic
|
||||
|
||||
type t = int
|
||||
|
||||
let counter_ = A.make 0
|
||||
let equal : t -> t -> bool = ( = )
|
||||
let compare : t -> t -> int = Stdlib.compare
|
||||
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
|
||||
|
||||
(* TODO: better hash *)
|
||||
let[@inline] hash x = x land max_int
|
||||
|
||||
module Set = Set.Make (Int)
|
||||
module Map = Map.Make (Int)
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
(** The unique name of a fiber.
|
||||
|
||||
Each fiber has a unique handle that can be used to refer to it in maps or
|
||||
sets. *)
|
||||
|
||||
type t = private int
|
||||
(** Unique, opaque identifier for a fiber. *)
|
||||
|
||||
val equal : t -> t -> bool
|
||||
val compare : t -> t -> int
|
||||
val hash : t -> int
|
||||
|
||||
val generate_fresh : unit -> t
|
||||
(** Generate a fresh, unique identifier *)
|
||||
|
||||
module Set : Set.S with type elt = t
|
||||
module Map : Map.S with type key = t
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
(** Fibers for moonpool.
|
||||
|
||||
See {!Fiber} for the most important explanations.
|
||||
|
||||
@since 0.6. *)
|
||||
|
||||
module Fiber = Fiber
|
||||
module Fls = Fls
|
||||
module Handle = Handle
|
||||
module Main = Main
|
||||
include Fiber
|
||||
include Main
|
||||
|
|
@ -5,7 +5,6 @@
|
|||
(>= %{ocaml_version} 5.0))
|
||||
(libraries
|
||||
(re_export moonpool)
|
||||
moonpool.fib
|
||||
picos
|
||||
(re_export lwt)
|
||||
lwt.unix))
|
||||
|
|
|
|||
|
|
@ -7,8 +7,6 @@ end
|
|||
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
|
||||
|
||||
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
|
||||
ref (fun ebt ->
|
||||
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
|
||||
|
|
@ -90,8 +88,6 @@ end
|
|||
module Ops = struct
|
||||
type st = Scheduler_state.st
|
||||
|
||||
let around_task _ = default_around_task_
|
||||
|
||||
let schedule (self : st) t =
|
||||
if Atomic.get self.closed then
|
||||
failwith "moonpool-lwt.schedule: scheduler is closed";
|
||||
|
|
@ -122,15 +118,7 @@ module Ops = struct
|
|||
()
|
||||
|
||||
let ops : st WL.ops =
|
||||
{
|
||||
schedule;
|
||||
around_task;
|
||||
get_next_task;
|
||||
on_exn;
|
||||
runner;
|
||||
before_start;
|
||||
cleanup;
|
||||
}
|
||||
{ schedule; get_next_task; on_exn; runner; before_start; cleanup }
|
||||
|
||||
let setup st =
|
||||
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
|
||||
|
|
@ -289,7 +277,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
|
|||
let spawn_lwt f : _ Lwt.t =
|
||||
let st = Main_state.get_st () in
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
|
||||
M.run_async st.as_runner (fun () ->
|
||||
try
|
||||
let x = f () in
|
||||
Lwt.wakeup lwt_prom x
|
||||
|
|
|
|||
15
src/private/signals_.ml
Normal file
15
src/private/signals_.ml
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
let ignore_signals_ () =
|
||||
try
|
||||
Thread.sigmask SIG_BLOCK
|
||||
[
|
||||
Sys.sigpipe;
|
||||
Sys.sigbus;
|
||||
Sys.sigterm;
|
||||
Sys.sigchld;
|
||||
Sys.sigalrm;
|
||||
Sys.sigint;
|
||||
Sys.sigusr1;
|
||||
Sys.sigusr2;
|
||||
]
|
||||
|> ignore
|
||||
with _ -> ()
|
||||
|
|
@ -6,7 +6,6 @@
|
|||
(libraries
|
||||
t_fibers
|
||||
moonpool
|
||||
moonpool.fib
|
||||
trace
|
||||
trace-tef
|
||||
qcheck-core
|
||||
|
|
|
|||
|
|
@ -2,4 +2,4 @@
|
|||
(name t_fibers)
|
||||
(package moonpool)
|
||||
(optional)
|
||||
(libraries moonpool moonpool.fib trace qcheck-core hmap))
|
||||
(libraries moonpool trace qcheck-core hmap))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
open! Moonpool
|
||||
module Chan = Moonpool.Chan
|
||||
module Exn_bt = Moonpool.Exn_bt
|
||||
module A = Atomic
|
||||
module F = Moonpool_fib.Fiber
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
|
|
@ -49,24 +50,23 @@ let logf = Log_.logf
|
|||
let run1 ~runner () =
|
||||
Printf.printf "============\nstart\n%!";
|
||||
let clock = ref TS.init in
|
||||
let fib =
|
||||
F.spawn_top ~on:runner @@ fun () ->
|
||||
let fut =
|
||||
Fut.spawn ~on:runner @@ fun () ->
|
||||
let chan_progress = Chan.create ~max_size:4 () in
|
||||
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
||||
|
||||
let subs =
|
||||
List.init 5 (fun i ->
|
||||
F.spawn ~protect:false @@ fun _n ->
|
||||
Fut.spawn ~on:runner @@ fun _n ->
|
||||
Thread.delay (float i *. 0.01);
|
||||
Chan.pop chans.(i);
|
||||
Chan.push chan_progress i;
|
||||
F.check_if_cancelled ();
|
||||
i)
|
||||
in
|
||||
|
||||
logf (TS.tick_get clock) "wait for subs";
|
||||
|
||||
F.spawn_ignore (fun () ->
|
||||
Moonpool.run_async runner (fun () ->
|
||||
for i = 0 to 4 do
|
||||
Chan.push chans.(i) ();
|
||||
let i' = Chan.pop chan_progress in
|
||||
|
|
@ -78,19 +78,15 @@ let run1 ~runner () =
|
|||
(fun i f ->
|
||||
let clock = ref (0 :: i :: clock0) in
|
||||
logf !clock "await fiber %d" i;
|
||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||
let res = F.await f in
|
||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||
F.yield ();
|
||||
let res = Fut.await f in
|
||||
Fut.yield ();
|
||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||
subs);
|
||||
|
||||
logf (TS.tick_get clock) "main fiber done"
|
||||
in
|
||||
|
||||
Fut.await @@ F.res fib;
|
||||
Fut.await fut;
|
||||
logf (TS.tick_get clock) "main fiber exited";
|
||||
Log_.print_and_clear ();
|
||||
()
|
||||
|
|
@ -99,15 +95,11 @@ let run2 ~runner () =
|
|||
(* same but now, cancel one of the sub-fibers *)
|
||||
Printf.printf "============\nstart\n";
|
||||
|
||||
let clock = ref TS.init in
|
||||
let fib =
|
||||
F.spawn_top ~on:runner @@ fun () ->
|
||||
let@ () =
|
||||
F.with_on_self_cancel (fun ebt ->
|
||||
logf (TS.tick_get clock) "main fiber cancelled with %s"
|
||||
@@ Exn_bt.show ebt)
|
||||
in
|
||||
let to_await = ref [] in
|
||||
|
||||
let clock = ref TS.init in
|
||||
let fut =
|
||||
Fut.spawn ~on:runner @@ fun () ->
|
||||
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
||||
let chan_progress = Chan.create ~max_size:4 () in
|
||||
|
||||
|
|
@ -116,11 +108,7 @@ let run2 ~runner () =
|
|||
let clock0 = !clock in
|
||||
List.init 10 (fun i ->
|
||||
let clock = ref (0 :: i :: clock0) in
|
||||
F.spawn ~protect:false @@ fun _n ->
|
||||
let@ () =
|
||||
F.with_on_self_cancel (fun _ ->
|
||||
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
|
||||
in
|
||||
Fut.spawn ~on:runner @@ fun _n ->
|
||||
Thread.delay 0.002;
|
||||
|
||||
(* sync for determinism *)
|
||||
|
|
@ -132,46 +120,51 @@ let run2 ~runner () =
|
|||
failwith "oh no!"
|
||||
);
|
||||
|
||||
F.check_if_cancelled ();
|
||||
i)
|
||||
in
|
||||
|
||||
let post = TS.tick_get clock in
|
||||
List.iteri
|
||||
(fun i fib ->
|
||||
F.on_result fib (function
|
||||
Fut.on_result fib (function
|
||||
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
||||
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
||||
subs;
|
||||
|
||||
(* sequentialize the fibers, for determinism *)
|
||||
F.spawn_ignore (fun () ->
|
||||
for j = 0 to 9 do
|
||||
Chan.push chans_unblock.(j) ();
|
||||
let j' = Chan.pop chan_progress in
|
||||
assert (j = j')
|
||||
done);
|
||||
let sender =
|
||||
Fut.spawn ~on:runner (fun () ->
|
||||
for j = 0 to 9 do
|
||||
Chan.push chans_unblock.(j) ();
|
||||
let j' = Chan.pop chan_progress in
|
||||
assert (j = j')
|
||||
done)
|
||||
in
|
||||
to_await := sender :: !to_await;
|
||||
|
||||
logf (TS.tick_get clock) "wait for subs";
|
||||
List.iteri
|
||||
(fun i f ->
|
||||
logf (TS.tick_get clock) "await fiber %d" i;
|
||||
let res = F.await f in
|
||||
let res = Fut.await f in
|
||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||
subs;
|
||||
logf (TS.tick_get clock) "yield";
|
||||
F.yield ();
|
||||
Fut.yield ();
|
||||
logf (TS.tick_get clock) "yielded";
|
||||
logf (TS.tick_get clock) "main fiber done"
|
||||
in
|
||||
|
||||
F.on_result fib (function
|
||||
Fut.on_result fut (function
|
||||
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
||||
| Error ebt ->
|
||||
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
||||
|
||||
(try Fut.await @@ F.res fib
|
||||
(try Fut.await fut
|
||||
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
||||
logf (TS.tick_get clock) "main fiber exited";
|
||||
|
||||
List.iter Fut.await !to_await;
|
||||
|
||||
Log_.print_and_clear ();
|
||||
()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
open! Moonpool
|
||||
module A = Atomic
|
||||
module F = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
module F = Moonpool.Fut
|
||||
module FLS = Moonpool.Task_local_storage
|
||||
|
||||
(* ### dummy little tracing system with local storage *)
|
||||
|
||||
|
|
@ -122,7 +122,7 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 2 (fun idx_sub_sub ->
|
||||
F.spawn ~protect:true (fun () ->
|
||||
F.spawn ~on:pool (fun () ->
|
||||
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
||||
in
|
||||
List.iter F.await subs
|
||||
|
|
@ -133,8 +133,7 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 2 (fun k ->
|
||||
F.spawn ~protect:true @@ fun () ->
|
||||
sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||
F.spawn ~on:pool @@ fun () -> sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||
in
|
||||
|
||||
let@ () =
|
||||
|
|
@ -149,16 +148,14 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 5 (fun j ->
|
||||
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||
F.spawn ~on:pool @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||
in
|
||||
|
||||
List.iter F.await subs
|
||||
in
|
||||
|
||||
Printf.printf "run test on pool = %s\n" pool_name;
|
||||
let fibs =
|
||||
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
|
||||
in
|
||||
let fibs = List.init 8 (fun idx -> F.spawn ~on:pool (fun () -> top idx)) in
|
||||
List.iter F.await fibs;
|
||||
|
||||
Printf.printf "tracing complete\n";
|
||||
|
|
|
|||
|
|
@ -2,33 +2,21 @@
|
|||
start
|
||||
1: wait for subs
|
||||
1.0.0: await fiber 0
|
||||
1.0.1: cur fiber[0] is some: true
|
||||
1.0.2: cur fiber[0] is some: true
|
||||
1.0.3: res 0 = 0
|
||||
1.0.1: res 0 = 0
|
||||
1.1.0: await fiber 1
|
||||
1.1.1: cur fiber[1] is some: true
|
||||
1.1.2: cur fiber[1] is some: true
|
||||
1.1.3: res 1 = 1
|
||||
1.1.1: res 1 = 1
|
||||
1.2.0: await fiber 2
|
||||
1.2.1: cur fiber[2] is some: true
|
||||
1.2.2: cur fiber[2] is some: true
|
||||
1.2.3: res 2 = 2
|
||||
1.2.1: res 2 = 2
|
||||
1.3.0: await fiber 3
|
||||
1.3.1: cur fiber[3] is some: true
|
||||
1.3.2: cur fiber[3] is some: true
|
||||
1.3.3: res 3 = 3
|
||||
1.3.1: res 3 = 3
|
||||
1.4.0: await fiber 4
|
||||
1.4.1: cur fiber[4] is some: true
|
||||
1.4.2: cur fiber[4] is some: true
|
||||
1.4.3: res 4 = 4
|
||||
1.4.1: res 4 = 4
|
||||
2: main fiber done
|
||||
3: main fiber exited
|
||||
============
|
||||
start
|
||||
1: start fibers
|
||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||
1.8.1: sub-fiber 8 was cancelled
|
||||
1.9.1: sub-fiber 9 was cancelled
|
||||
2.0: fiber 0 resolved as ok
|
||||
2.1: fiber 1 resolved as ok
|
||||
2.2: fiber 2 resolved as ok
|
||||
|
|
@ -37,8 +25,8 @@ start
|
|||
2.5: fiber 5 resolved as ok
|
||||
2.6: fiber 6 resolved as ok
|
||||
2.7: fiber 7 resolved as error
|
||||
2.8: fiber 8 resolved as error
|
||||
2.9: fiber 9 resolved as error
|
||||
2.8: fiber 8 resolved as ok
|
||||
2.9: fiber 9 resolved as ok
|
||||
3: wait for subs
|
||||
4: await fiber 0
|
||||
5: res 0 = 0
|
||||
|
|
@ -55,7 +43,6 @@ start
|
|||
16: await fiber 6
|
||||
17: res 6 = 6
|
||||
18: await fiber 7
|
||||
19: main fiber cancelled with Failure("oh no!")
|
||||
20: main fiber result: error Failure("oh no!")
|
||||
21: main fib failed with "oh no!"
|
||||
22: main fiber exited
|
||||
19: main fiber result: error Failure("oh no!")
|
||||
20: main fib failed with "oh no!"
|
||||
21: main fiber exited
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ runner = Moonpool_fib.main in
|
||||
let@ runner = Moonpool.main in
|
||||
T_fibers.Fib.run1 ~runner ();
|
||||
T_fibers.Fib.run2 ~runner ()
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -3,7 +3,7 @@ open! Moonpool
|
|||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ _ = Moonpool_fib.main in
|
||||
let@ _ = Moonpool.main in
|
||||
(let@ pool = Ws_pool.with_ () in
|
||||
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
||||
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
open Moonpool
|
||||
module F = Moonpool_fib
|
||||
module F = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let r =
|
||||
F.main @@ fun runner ->
|
||||
let f1 = F.spawn (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn (fun () -> F.await f1 + 10) in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let f1 = F.spawn ~on:runner (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -19,10 +19,10 @@ let () =
|
|||
(* run fibers in the background, await them in the main thread *)
|
||||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||
let r =
|
||||
F.main @@ fun runner ->
|
||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn_top ~on:bg (fun () -> F.await f1 + 10) in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:bg (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -32,8 +32,8 @@ let () =
|
|||
let () =
|
||||
try
|
||||
let _r =
|
||||
F.main @@ fun _r ->
|
||||
let fib = F.spawn (fun () -> failwith "oops") in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||
F.await fib
|
||||
in
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@
|
|||
(libraries
|
||||
t_fibers
|
||||
moonpool
|
||||
moonpool.fib
|
||||
moonpool-lwt
|
||||
hmap
|
||||
trace
|
||||
|
|
|
|||
|
|
@ -2,33 +2,21 @@
|
|||
start
|
||||
1: wait for subs
|
||||
1.0.0: await fiber 0
|
||||
1.0.1: cur fiber[0] is some: true
|
||||
1.0.2: cur fiber[0] is some: true
|
||||
1.0.3: res 0 = 0
|
||||
1.0.1: res 0 = 0
|
||||
1.1.0: await fiber 1
|
||||
1.1.1: cur fiber[1] is some: true
|
||||
1.1.2: cur fiber[1] is some: true
|
||||
1.1.3: res 1 = 1
|
||||
1.1.1: res 1 = 1
|
||||
1.2.0: await fiber 2
|
||||
1.2.1: cur fiber[2] is some: true
|
||||
1.2.2: cur fiber[2] is some: true
|
||||
1.2.3: res 2 = 2
|
||||
1.2.1: res 2 = 2
|
||||
1.3.0: await fiber 3
|
||||
1.3.1: cur fiber[3] is some: true
|
||||
1.3.2: cur fiber[3] is some: true
|
||||
1.3.3: res 3 = 3
|
||||
1.3.1: res 3 = 3
|
||||
1.4.0: await fiber 4
|
||||
1.4.1: cur fiber[4] is some: true
|
||||
1.4.2: cur fiber[4] is some: true
|
||||
1.4.3: res 4 = 4
|
||||
1.4.1: res 4 = 4
|
||||
2: main fiber done
|
||||
3: main fiber exited
|
||||
============
|
||||
start
|
||||
1: start fibers
|
||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||
1.8.1: sub-fiber 8 was cancelled
|
||||
1.9.1: sub-fiber 9 was cancelled
|
||||
2.0: fiber 0 resolved as ok
|
||||
2.1: fiber 1 resolved as ok
|
||||
2.2: fiber 2 resolved as ok
|
||||
|
|
@ -37,8 +25,8 @@ start
|
|||
2.5: fiber 5 resolved as ok
|
||||
2.6: fiber 6 resolved as ok
|
||||
2.7: fiber 7 resolved as error
|
||||
2.8: fiber 8 resolved as error
|
||||
2.9: fiber 9 resolved as error
|
||||
2.8: fiber 8 resolved as ok
|
||||
2.9: fiber 9 resolved as ok
|
||||
3: wait for subs
|
||||
4: await fiber 0
|
||||
5: res 0 = 0
|
||||
|
|
@ -55,7 +43,6 @@ start
|
|||
16: await fiber 6
|
||||
17: res 6 = 6
|
||||
18: await fiber 7
|
||||
19: main fiber cancelled with Failure("oh no!")
|
||||
20: main fiber result: error Failure("oh no!")
|
||||
21: main fib failed with "oh no!"
|
||||
22: main fiber exited
|
||||
19: main fiber result: error Failure("oh no!")
|
||||
20: main fib failed with "oh no!"
|
||||
21: main fiber exited
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,6 @@
|
|||
open Moonpool
|
||||
module M_lwt = Moonpool_lwt
|
||||
module F = Moonpool_fib
|
||||
module F = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
|
|
@ -9,9 +9,9 @@ let () =
|
|||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||
let r =
|
||||
M_lwt.lwt_main @@ fun runner ->
|
||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -24,7 +24,7 @@ let () =
|
|||
try
|
||||
let _r =
|
||||
M_lwt.lwt_main @@ fun runner ->
|
||||
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
|
||||
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||
F.await fib
|
||||
in
|
||||
|
||||
|
|
|
|||
|
|
@ -14,14 +14,11 @@ let run ~kind () =
|
|||
let pool =
|
||||
let on_init_thread ~dom_id:_ ~t_id () =
|
||||
Trace.set_thread_name (Printf.sprintf "pool worker %d" t_id)
|
||||
and around_task =
|
||||
( (fun self -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self)),
|
||||
fun self () -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self) )
|
||||
in
|
||||
|
||||
match kind with
|
||||
| `Simple -> Fifo_pool.create ~num_threads:3 ~on_init_thread ~around_task ()
|
||||
| `Ws_pool -> Ws_pool.create ~num_threads:3 ~on_init_thread ~around_task ()
|
||||
| `Simple -> Fifo_pool.create ~num_threads:3 ~on_init_thread ()
|
||||
| `Ws_pool -> Ws_pool.create ~num_threads:3 ~on_init_thread ()
|
||||
in
|
||||
|
||||
(* make all threads busy *)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue