mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-17 16:16:48 -05:00
Compare commits
6 commits
95de0e7e27
...
4b9e480013
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b9e480013 | ||
|
|
f8e553d473 | ||
|
|
7082447073 | ||
|
|
6715502fdd | ||
|
|
e95f0e421d | ||
|
|
3e2ce57669 |
33 changed files with 5913 additions and 6505 deletions
44
README.md
44
README.md
|
|
@ -173,49 +173,9 @@ val expected_sum : int = 5050
|
||||||
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
|
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
|
||||||
an exception and the backtrace associated with the place the exception was caught.
|
an exception and the backtrace associated with the place the exception was caught.
|
||||||
|
|
||||||
### Fibers
|
### Local storage
|
||||||
|
|
||||||
On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
|
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
|
||||||
which provides _lightweight fibers_
|
|
||||||
that can run on any Moonpool runner.
|
|
||||||
These fibers are a sort of lightweight thread, dispatched on the runner's
|
|
||||||
background thread(s).
|
|
||||||
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
|
|
||||||
is done.
|
|
||||||
|
|
||||||
```ocaml
|
|
||||||
# #require "moonpool.fib";;
|
|
||||||
...
|
|
||||||
|
|
||||||
# (* convenient alias *)
|
|
||||||
module F = Moonpool_fib;;
|
|
||||||
module F = Moonpool_fib
|
|
||||||
# F.main (fun _runner ->
|
|
||||||
let f1 = F.spawn (fun () -> fib 10) in
|
|
||||||
let f2 = F.spawn (fun () -> fib 15) in
|
|
||||||
F.await f1 + F.await f2);;
|
|
||||||
- : int = 1076
|
|
||||||
```
|
|
||||||
|
|
||||||
Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
|
|
||||||
the sub-fiber's _parent_.
|
|
||||||
When a parent fails, all its children are cancelled (forced to fail).
|
|
||||||
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).
|
|
||||||
|
|
||||||
Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
|
|
||||||
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
|
|
||||||
as a regular future, too.
|
|
||||||
However, this resolution is only done after all the children of the fiber have
|
|
||||||
resolved — the lifetime of fibers forms a well-nested tree in that sense.
|
|
||||||
|
|
||||||
When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
|
|
||||||
thread on which it was running becomes available again and can go on process another task.
|
|
||||||
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
|
|
||||||
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.
|
|
||||||
|
|
||||||
In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
|
|
||||||
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
|
|
||||||
put persistent data in storage to avoid confusing aliasing).
|
|
||||||
The storage is convenient for carrying around context for cross-cutting concerns such
|
The storage is convenient for carrying around context for cross-cutting concerns such
|
||||||
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
|
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
|
||||||
scope).
|
scope).
|
||||||
|
|
|
||||||
2
dune
2
dune
|
|
@ -3,7 +3,7 @@
|
||||||
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
|
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
|
||||||
|
|
||||||
(mdx
|
(mdx
|
||||||
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
|
(libraries moonpool moonpool.forkjoin threads)
|
||||||
(package moonpool)
|
(package moonpool)
|
||||||
(enabled_if
|
(enabled_if
|
||||||
(>= %{ocaml_version} 5.0)))
|
(>= %{ocaml_version} 5.0)))
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,12 @@
|
||||||
(** Example from
|
(** NOTE: this was an example from
|
||||||
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
|
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
|
||||||
|
there is no cancelation anymore :) *)
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let@ () = Trace_tef.with_setup () in
|
let@ () = Trace_tef.with_setup () in
|
||||||
let@ _ = Moonpool_fib.main in
|
let@ _ = Moonpool.main in
|
||||||
|
|
||||||
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
|
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
|
||||||
let@ runner = Moonpool.Background_thread.with_ () in
|
let@ runner = Moonpool.Background_thread.with_ () in
|
||||||
|
|
@ -13,15 +14,13 @@ let () =
|
||||||
(* Pretend this is some long-running read loop *)
|
(* Pretend this is some long-running read loop *)
|
||||||
for i = 1 to 10 do
|
for i = 1 to 10 do
|
||||||
Printf.printf "MAIN LOOP %d\n%!" i;
|
Printf.printf "MAIN LOOP %d\n%!" i;
|
||||||
Moonpool_fib.check_if_cancelled ();
|
let _ : _ Moonpool.Fut.t =
|
||||||
let _ : _ Moonpool_fib.t =
|
Moonpool.Fut.spawn ~on:runner (fun () ->
|
||||||
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
|
|
||||||
Printf.printf "RUN FIBER %d\n%!" i;
|
Printf.printf "RUN FIBER %d\n%!" i;
|
||||||
Moonpool_fib.check_if_cancelled ();
|
|
||||||
Format.printf "FIBER %d NOT CANCELLED YET@." i;
|
Format.printf "FIBER %d NOT CANCELLED YET@." i;
|
||||||
failwith "BOOM")
|
failwith "BOOM")
|
||||||
in
|
in
|
||||||
Moonpool_fib.yield ();
|
Moonpool.Fut.yield ();
|
||||||
(* Thread.delay 0.2; *)
|
(* Thread.delay 0.2; *)
|
||||||
(* Thread.delay 0.0001; *)
|
(* Thread.delay 0.0001; *)
|
||||||
()
|
()
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
;(package moonpool)
|
;(package moonpool)
|
||||||
(libraries
|
(libraries
|
||||||
moonpool
|
moonpool
|
||||||
moonpool.fib
|
|
||||||
trace
|
trace
|
||||||
trace-tef
|
trace-tef
|
||||||
;tracy-client.trace
|
;tracy-client.trace
|
||||||
|
|
|
||||||
|
|
@ -437,6 +437,8 @@ let await (self : 'a t) : 'a =
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
get_or_fail_exn self
|
get_or_fail_exn self
|
||||||
|
|
||||||
|
let yield = Picos.Fiber.yield
|
||||||
|
|
||||||
module Infix = struct
|
module Infix = struct
|
||||||
let[@inline] ( >|= ) x f = map ~f x
|
let[@inline] ( >|= ) x f = map ~f x
|
||||||
let[@inline] ( >>= ) x f = bind ~f x
|
let[@inline] ( >>= ) x f = bind ~f x
|
||||||
|
|
|
||||||
|
|
@ -8,12 +8,16 @@
|
||||||
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
|
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
|
||||||
an exception and the corresponding backtrace).
|
an exception and the corresponding backtrace).
|
||||||
|
|
||||||
|
Using {!spawn}, it's possible to start a bunch of tasks, obtaining futures,
|
||||||
|
and then use {!await} to get their result in the desired order.
|
||||||
|
|
||||||
Combinators such as {!map} and {!join_array} can be used to produce futures
|
Combinators such as {!map} and {!join_array} can be used to produce futures
|
||||||
from other futures (in a monadic way). Some combinators take a [on] argument
|
from other futures (in a monadic way). Some combinators take a [on] argument
|
||||||
to specify a runner on which the intermediate computation takes place; for
|
to specify a runner on which the intermediate computation takes place; for
|
||||||
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
|
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
|
||||||
applicatively; the call to [f] happens on the runner [pool] (once [fut]
|
applicatively; the call to [f] happens on the runner [pool] (once [fut]
|
||||||
resolves successfully with a value). *)
|
resolves successfully with a value). Be aware that these combinators do not
|
||||||
|
preserve local storage. *)
|
||||||
|
|
||||||
type 'a or_error = ('a, Exn_bt.t) result
|
type 'a or_error = ('a, Exn_bt.t) result
|
||||||
|
|
||||||
|
|
@ -30,7 +34,8 @@ val make : unit -> 'a t * 'a promise
|
||||||
|
|
||||||
val make_promise : unit -> 'a promise
|
val make_promise : unit -> 'a promise
|
||||||
(** Same as {!make} but returns a single promise (which can be upcast to a
|
(** Same as {!make} but returns a single promise (which can be upcast to a
|
||||||
future). This is useful mostly to preserve memory.
|
future). This is useful mostly to preserve memory, you probably don't need
|
||||||
|
it.
|
||||||
|
|
||||||
How to upcast to a future in the worst case:
|
How to upcast to a future in the worst case:
|
||||||
{[
|
{[
|
||||||
|
|
@ -40,8 +45,11 @@ val make_promise : unit -> 'a promise
|
||||||
@since 0.7 *)
|
@since 0.7 *)
|
||||||
|
|
||||||
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||||
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
|
(** [on_result fut f] registers [f] to be called in the future when [fut] is
|
||||||
; or calls [f] immediately if [fut] is already set. *)
|
set; or calls [f] immediately if [fut] is already set.
|
||||||
|
|
||||||
|
{b NOTE:} it's ill advised to do meaningful work inside the callback [f].
|
||||||
|
Instead, try to spawn another task on the runner, or use {!await}. *)
|
||||||
|
|
||||||
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||||
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
|
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
|
||||||
|
|
@ -52,13 +60,14 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
val try_cancel : _ promise -> Exn_bt.t -> bool
|
val try_cancel : _ promise -> Exn_bt.t -> bool
|
||||||
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
|
(** [try_cancel promise ebt] tries to cancel the promise using the given
|
||||||
returns [false] if the promise is already resolved.
|
exception, returning [true]. It returns [false] if the promise is already
|
||||||
@since NEXT_RELEASE *)
|
resolved.
|
||||||
|
@since 0.9 *)
|
||||||
|
|
||||||
val cancel : _ promise -> Exn_bt.t -> unit
|
val cancel : _ promise -> Exn_bt.t -> unit
|
||||||
(** Silent version of {!try_cancel}, ignoring the result.
|
(** Silent version of {!try_cancel}, ignoring the result.
|
||||||
@since NEXT_RELEASE *)
|
@since 0.9 *)
|
||||||
|
|
||||||
val fulfill : 'a promise -> 'a or_error -> unit
|
val fulfill : 'a promise -> 'a or_error -> unit
|
||||||
(** Fullfill the promise, setting the future at the same time.
|
(** Fullfill the promise, setting the future at the same time.
|
||||||
|
|
@ -79,6 +88,7 @@ val fail_exn_bt : Exn_bt.t -> _ t
|
||||||
@since 0.6 *)
|
@since 0.6 *)
|
||||||
|
|
||||||
val of_result : 'a or_error -> 'a t
|
val of_result : 'a or_error -> 'a t
|
||||||
|
(** Already resolved future from a result. *)
|
||||||
|
|
||||||
val is_resolved : _ t -> bool
|
val is_resolved : _ t -> bool
|
||||||
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
|
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
|
||||||
|
|
@ -136,7 +146,7 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
|
||||||
|
|
||||||
val reify_error : 'a t -> 'a or_error t
|
val reify_error : 'a t -> 'a or_error t
|
||||||
(** [reify_error fut] turns a failing future into a non-failing one that contain
|
(** [reify_error fut] turns a failing future into a non-failing one that contain
|
||||||
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
|
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x].
|
||||||
@since 0.4 *)
|
@since 0.4 *)
|
||||||
|
|
||||||
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
|
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
|
||||||
|
|
@ -149,12 +159,18 @@ val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
|
||||||
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
|
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
|
||||||
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
|
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
|
||||||
or [f x] raises [e].
|
or [f x] raises [e].
|
||||||
|
|
||||||
|
This does not preserve local storage of [fut] inside [f].
|
||||||
|
|
||||||
@param on if provided, [f] runs on the given runner *)
|
@param on if provided, [f] runs on the given runner *)
|
||||||
|
|
||||||
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
|
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
|
||||||
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
|
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
|
||||||
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
|
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
|
||||||
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
|
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
|
||||||
|
|
||||||
|
This does not preserve local storage of [fut] inside [f].
|
||||||
|
|
||||||
@param on if provided, [f] runs on the given runner
|
@param on if provided, [f] runs on the given runner
|
||||||
@since 0.4 *)
|
@since 0.4 *)
|
||||||
|
|
||||||
|
|
@ -182,6 +198,7 @@ val join_array : 'a t array -> 'a array t
|
||||||
val join_list : 'a t list -> 'a list t
|
val join_list : 'a t list -> 'a list t
|
||||||
(** Wait for all the futures in the list. Fails if any future fails. *)
|
(** Wait for all the futures in the list. Fails if any future fails. *)
|
||||||
|
|
||||||
|
(** Advanced primitives for synchronization *)
|
||||||
module Advanced : sig
|
module Advanced : sig
|
||||||
val barrier_on_abstract_container_of_futures :
|
val barrier_on_abstract_container_of_futures :
|
||||||
iter:(('a t -> unit) -> 'cont -> unit) ->
|
iter:(('a t -> unit) -> 'cont -> unit) ->
|
||||||
|
|
@ -234,7 +251,9 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
|
||||||
|
|
||||||
(** {2 Await}
|
(** {2 Await}
|
||||||
|
|
||||||
{b NOTE} This is only available on OCaml 5. *)
|
This suspends the current task using an OCaml 5 algebraic effect, and makes
|
||||||
|
preparations for the task to be resumed once the future has been resolved.
|
||||||
|
*)
|
||||||
|
|
||||||
val await : 'a t -> 'a
|
val await : 'a t -> 'a
|
||||||
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
|
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
|
||||||
|
|
@ -244,7 +263,11 @@ val await : 'a t -> 'a
|
||||||
@since 0.3
|
@since 0.3
|
||||||
|
|
||||||
This must only be run from inside the runner itself. The runner must support
|
This must only be run from inside the runner itself. The runner must support
|
||||||
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
|
{!Suspend_}. *)
|
||||||
|
|
||||||
|
val yield : unit -> unit
|
||||||
|
(** Like {!Moonpool.yield}.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
(** {2 Blocking} *)
|
(** {2 Blocking} *)
|
||||||
|
|
||||||
|
|
@ -252,7 +275,7 @@ val wait_block : 'a t -> 'a or_error
|
||||||
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
|
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
|
||||||
returns its value.
|
returns its value.
|
||||||
|
|
||||||
{b NOTE}: A word of warning: this will monopolize the calling thread until
|
{b NOTE:} A word of warning: this will monopolize the calling thread until
|
||||||
the future resolves. This can also easily cause deadlocks, if enough threads
|
the future resolves. This can also easily cause deadlocks, if enough threads
|
||||||
in a pool call [wait_block] on futures running on the same pool or a pool
|
in a pool call [wait_block] on futures running on the same pool or a pool
|
||||||
depending on it.
|
depending on it.
|
||||||
|
|
@ -265,7 +288,10 @@ val wait_block : 'a t -> 'a or_error
|
||||||
the deadlock. *)
|
the deadlock. *)
|
||||||
|
|
||||||
val wait_block_exn : 'a t -> 'a
|
val wait_block_exn : 'a t -> 'a
|
||||||
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
|
(** Same as {!wait_block} but re-raises the exception if the future failed.
|
||||||
|
|
||||||
|
{b NOTE:} do check the cautionary note in {!wait_block} concerning
|
||||||
|
deadlocks. *)
|
||||||
|
|
||||||
(** {2 Infix operators}
|
(** {2 Infix operators}
|
||||||
|
|
||||||
|
|
@ -297,9 +323,10 @@ module Infix_local = Infix
|
||||||
|
|
||||||
module Private_ : sig
|
module Private_ : sig
|
||||||
val unsafe_promise_of_fut : 'a t -> 'a promise
|
val unsafe_promise_of_fut : 'a t -> 'a promise
|
||||||
(** please do not use *)
|
(** Do not use unless you know exactly what you are doing. *)
|
||||||
|
|
||||||
val as_computation : 'a t -> 'a Picos.Computation.t
|
val as_computation : 'a t -> 'a Picos.Computation.t
|
||||||
|
(** Picos compat *)
|
||||||
end
|
end
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
|
||||||
|
|
@ -8,15 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
|
||||||
in
|
in
|
||||||
let runner = Fifo_pool.Private_.runner_of_state worker_st in
|
let runner = Fifo_pool.Private_.runner_of_state worker_st in
|
||||||
try
|
try
|
||||||
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
|
let fiber = Fut.spawn ~on:runner (fun () -> f runner) in
|
||||||
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
|
Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
|
||||||
|
|
||||||
(* run the main thread *)
|
(* run the main thread *)
|
||||||
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
Worker_loop_.worker_loop worker_st
|
||||||
~block_signals (* do not disturb existing thread *)
|
~block_signals (* do not disturb existing thread *)
|
||||||
~ops:Fifo_pool.Private_.worker_ops;
|
~ops:Fifo_pool.Private_.worker_ops;
|
||||||
|
|
||||||
match Fiber.peek fiber with
|
match Fut.peek fiber with
|
||||||
| Some (Ok x) -> x
|
| Some (Ok x) -> x
|
||||||
| Some (Error ebt) -> Exn_bt.raise ebt
|
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||||
| None -> assert false
|
| None -> assert false
|
||||||
|
|
@ -13,16 +13,18 @@
|
||||||
to {!Background_thread} in that there's a single worker to process
|
to {!Background_thread} in that there's a single worker to process
|
||||||
tasks/fibers.
|
tasks/fibers.
|
||||||
|
|
||||||
This handles effects, including the ones in {!Fiber}.
|
This handles the concurency effects used in moonpool, including [await] and
|
||||||
|
[yield].
|
||||||
|
|
||||||
@since 0.6 *)
|
This module was migrated from the late [Moonpool_fib].
|
||||||
|
|
||||||
val main : (Moonpool.Runner.t -> 'a) -> 'a
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val main : (Runner.t -> 'a) -> 'a
|
||||||
(** [main f] runs [f()] in a scope that handles effects, including
|
(** [main f] runs [f()] in a scope that handles effects, including
|
||||||
{!Fiber.await}.
|
{!Fiber.await}.
|
||||||
|
|
||||||
This scope can run background tasks as well, in a cooperative fashion. *)
|
This scope can run background tasks as well, in a cooperative fashion. *)
|
||||||
|
|
||||||
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
|
val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a
|
||||||
(** Same as {!main} but with room for optional arguments.
|
(** Same as {!main} but with room for optional arguments. *)
|
||||||
@since 0.7 *)
|
|
||||||
|
|
@ -23,6 +23,7 @@ module Exn_bt = Exn_bt
|
||||||
module Fifo_pool = Fifo_pool
|
module Fifo_pool = Fifo_pool
|
||||||
module Fut = Fut
|
module Fut = Fut
|
||||||
module Lock = Lock
|
module Lock = Lock
|
||||||
|
module Main = Main
|
||||||
module Immediate_runner = struct end
|
module Immediate_runner = struct end
|
||||||
module Runner = Runner
|
module Runner = Runner
|
||||||
module Task_local_storage = Task_local_storage
|
module Task_local_storage = Task_local_storage
|
||||||
|
|
@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage
|
||||||
module Trigger = Trigger
|
module Trigger = Trigger
|
||||||
module Ws_pool = Ws_pool
|
module Ws_pool = Ws_pool
|
||||||
|
|
||||||
|
(* re-export main *)
|
||||||
|
include Main
|
||||||
|
|
||||||
module Private = struct
|
module Private = struct
|
||||||
module Ws_deque_ = Ws_deque_
|
module Ws_deque_ = Ws_deque_
|
||||||
module Worker_loop_ = Worker_loop_
|
module Worker_loop_ = Worker_loop_
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool
|
||||||
module Background_thread = Background_thread
|
module Background_thread = Background_thread
|
||||||
module Runner = Runner
|
module Runner = Runner
|
||||||
module Trigger = Trigger
|
module Trigger = Trigger
|
||||||
|
module Main = Main
|
||||||
|
|
||||||
module Immediate_runner : sig end
|
module Immediate_runner : sig end
|
||||||
[@@deprecated "use Moonpool_fib.Main"]
|
[@@deprecated "use Moonpool_fib.Main"]
|
||||||
|
|
@ -80,7 +81,7 @@ val await : 'a Fut.t -> 'a
|
||||||
val yield : unit -> unit
|
val yield : unit -> unit
|
||||||
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
|
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
|
||||||
>= 5.0.
|
>= 5.0.
|
||||||
@since NEXT_RELEASE *)
|
@since 0.9 *)
|
||||||
|
|
||||||
module Lock = Lock
|
module Lock = Lock
|
||||||
module Fut = Fut
|
module Fut = Fut
|
||||||
|
|
@ -205,6 +206,10 @@ module Atomic = Atomic
|
||||||
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
|
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
|
||||||
module on OCaml 5. *)
|
module on OCaml 5. *)
|
||||||
|
|
||||||
|
include module type of struct
|
||||||
|
include Main
|
||||||
|
end
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
(** Private internals, with no stability guarantees *)
|
(** Private internals, with no stability guarantees *)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +0,0 @@
|
||||||
(library
|
|
||||||
(name moonpool_fib)
|
|
||||||
(public_name moonpool.fib)
|
|
||||||
(synopsis "Fibers and structured concurrency for Moonpool")
|
|
||||||
(libraries moonpool picos)
|
|
||||||
(flags :standard -open Moonpool_private -open Moonpool))
|
|
||||||
334
src/fib/fiber.ml
334
src/fib/fiber.ml
|
|
@ -1,334 +0,0 @@
|
||||||
open Moonpool.Private.Types_
|
|
||||||
module A = Atomic
|
|
||||||
module FM = Handle.Map
|
|
||||||
module Int_map = Map.Make (Int)
|
|
||||||
module PF = Picos.Fiber
|
|
||||||
module FLS = Picos.Fiber.FLS
|
|
||||||
|
|
||||||
type 'a callback = 'a Exn_bt.result -> unit
|
|
||||||
(** Callbacks that are called when a fiber is done. *)
|
|
||||||
|
|
||||||
type cancel_callback = Exn_bt.t -> unit
|
|
||||||
|
|
||||||
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
|
|
||||||
Fut.Private_.unsafe_promise_of_fut
|
|
||||||
|
|
||||||
(* TODO: replace with picos structured at some point? *)
|
|
||||||
module Private_ = struct
|
|
||||||
type pfiber = PF.t
|
|
||||||
|
|
||||||
type 'a t = {
|
|
||||||
id: Handle.t; (** unique identifier for this fiber *)
|
|
||||||
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
|
|
||||||
res: 'a Fut.t;
|
|
||||||
runner: Runner.t;
|
|
||||||
pfiber: pfiber; (** Associated picos fiber *)
|
|
||||||
}
|
|
||||||
|
|
||||||
and 'a state =
|
|
||||||
| Alive of {
|
|
||||||
children: children;
|
|
||||||
on_cancel: cancel_callback Int_map.t;
|
|
||||||
cancel_id: int;
|
|
||||||
}
|
|
||||||
| Terminating_or_done of 'a Exn_bt.result A.t
|
|
||||||
|
|
||||||
and children = any FM.t
|
|
||||||
and any = Any : _ t -> any [@@unboxed]
|
|
||||||
|
|
||||||
(** Key to access the current moonpool.fiber. *)
|
|
||||||
let k_current_fiber : any FLS.t = FLS.create ()
|
|
||||||
|
|
||||||
exception Not_set = FLS.Not_set
|
|
||||||
|
|
||||||
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
|
|
||||||
FLS.get_exn pfiber k_current_fiber
|
|
||||||
|
|
||||||
let[@inline] get_cur_exn () : any =
|
|
||||||
get_cur_from_exn @@ get_current_fiber_exn ()
|
|
||||||
|
|
||||||
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
|
|
||||||
|
|
||||||
let[@inline] is_closed (self : _ t) =
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive _ -> false
|
|
||||||
| Terminating_or_done _ -> true
|
|
||||||
end
|
|
||||||
|
|
||||||
include Private_
|
|
||||||
|
|
||||||
let create_ ~pfiber ~runner ~res () : 'a t =
|
|
||||||
let id = Handle.generate_fresh () in
|
|
||||||
{
|
|
||||||
state =
|
|
||||||
A.make
|
|
||||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
|
||||||
id;
|
|
||||||
res;
|
|
||||||
runner;
|
|
||||||
pfiber;
|
|
||||||
}
|
|
||||||
|
|
||||||
let create_done_ ~res () : _ t =
|
|
||||||
let id = Handle.generate_fresh () in
|
|
||||||
{
|
|
||||||
state =
|
|
||||||
A.make
|
|
||||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
|
||||||
id;
|
|
||||||
res;
|
|
||||||
runner = Runner.dummy;
|
|
||||||
pfiber = Moonpool.Private.Types_._dummy_fiber;
|
|
||||||
}
|
|
||||||
|
|
||||||
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
|
|
||||||
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
|
|
||||||
let[@inline] res self = self.res
|
|
||||||
let[@inline] peek self = Fut.peek self.res
|
|
||||||
let[@inline] is_done self = Fut.is_done self.res
|
|
||||||
let[@inline] is_success self = Fut.is_success self.res
|
|
||||||
let[@inline] is_cancelled self = Fut.is_failed self.res
|
|
||||||
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
|
|
||||||
let[@inline] await self = Fut.await self.res
|
|
||||||
let[@inline] wait_block self = Fut.wait_block self.res
|
|
||||||
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
|
|
||||||
|
|
||||||
(** Resolve [promise] once [children] are all done *)
|
|
||||||
let resolve_once_children_are_done_ ~children ~promise
|
|
||||||
(res : 'a Exn_bt.result A.t) : unit =
|
|
||||||
let n_children = FM.cardinal children in
|
|
||||||
if n_children > 0 then (
|
|
||||||
(* wait for all children to be done *)
|
|
||||||
let n_waiting = A.make (FM.cardinal children) in
|
|
||||||
let on_child_finish (r : _ result) =
|
|
||||||
(* make sure the parent fails if any child fails *)
|
|
||||||
(match r with
|
|
||||||
| Ok _ -> ()
|
|
||||||
| Error ebt -> A.set res (Error ebt));
|
|
||||||
|
|
||||||
(* if we're the last to finish, resolve the parent fiber's [res] *)
|
|
||||||
if A.fetch_and_add n_waiting (-1) = 1 then (
|
|
||||||
let res = A.get res in
|
|
||||||
Fut.fulfill promise res
|
|
||||||
)
|
|
||||||
in
|
|
||||||
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
|
|
||||||
) else
|
|
||||||
Fut.fulfill promise @@ A.get res
|
|
||||||
|
|
||||||
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
|
|
||||||
fun self ebt ->
|
|
||||||
let promise = prom_of_fut self.res in
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive { children; cancel_id = _; on_cancel } as old ->
|
|
||||||
let new_st = Terminating_or_done (A.make @@ Error ebt) in
|
|
||||||
if A.compare_and_set self.state old new_st then (
|
|
||||||
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
|
|
||||||
cancel_children_ ~children ebt;
|
|
||||||
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
|
|
||||||
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
| Terminating_or_done _ -> false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
(** Cancel eagerly all children *)
|
|
||||||
and cancel_children_ ebt ~children : unit =
|
|
||||||
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
|
|
||||||
|
|
||||||
type cancel_handle = int
|
|
||||||
|
|
||||||
let add_on_cancel (self : _ t) cb : cancel_handle =
|
|
||||||
let h = ref 0 in
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive { children; cancel_id; on_cancel } as old ->
|
|
||||||
let new_st =
|
|
||||||
Alive
|
|
||||||
{
|
|
||||||
children;
|
|
||||||
cancel_id = cancel_id + 1;
|
|
||||||
on_cancel = Int_map.add cancel_id cb on_cancel;
|
|
||||||
}
|
|
||||||
in
|
|
||||||
if A.compare_and_set self.state old new_st then (
|
|
||||||
h := cancel_id;
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
| Terminating_or_done r ->
|
|
||||||
(match A.get r with
|
|
||||||
| Error ebt -> cb ebt
|
|
||||||
| Ok _ -> ());
|
|
||||||
false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done;
|
|
||||||
!h
|
|
||||||
|
|
||||||
let remove_on_cancel (self : _ t) h =
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive ({ on_cancel; _ } as alive) as old ->
|
|
||||||
let new_st =
|
|
||||||
Alive { alive with on_cancel = Int_map.remove h on_cancel }
|
|
||||||
in
|
|
||||||
not (A.compare_and_set self.state old new_st)
|
|
||||||
| Terminating_or_done _ -> false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
|
|
||||||
let h = add_on_cancel self cb in
|
|
||||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
|
||||||
|
|
||||||
(** Successfully resolve the fiber. This might still fail if some children
|
|
||||||
failed. *)
|
|
||||||
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
|
|
||||||
let r = A.make @@ Ok r in
|
|
||||||
let promise = prom_of_fut self.res in
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive { children; _ } as old ->
|
|
||||||
let new_st = Terminating_or_done r in
|
|
||||||
if A.compare_and_set self.state old new_st then (
|
|
||||||
resolve_once_children_are_done_ ~children ~promise r;
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
| Terminating_or_done _ -> false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let remove_child_ (self : _ t) (child : _ t) =
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive ({ children; _ } as alive) as old ->
|
|
||||||
let new_st =
|
|
||||||
Alive { alive with children = FM.remove child.id children }
|
|
||||||
in
|
|
||||||
not (A.compare_and_set self.state old new_st)
|
|
||||||
| _ -> false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
(** Add a child to [self].
|
|
||||||
@param protected if true, the child's failure will not affect [self]. *)
|
|
||||||
let add_child_ ~protect (self : _ t) (child : _ t) =
|
|
||||||
while
|
|
||||||
match A.get self.state with
|
|
||||||
| Alive ({ children; _ } as alive) as old ->
|
|
||||||
let new_st =
|
|
||||||
Alive { alive with children = FM.add child.id (Any child) children }
|
|
||||||
in
|
|
||||||
|
|
||||||
if A.compare_and_set self.state old new_st then (
|
|
||||||
(* make sure to remove [child] from [self.children] once it's done;
|
|
||||||
fail [self] is [child] failed and [protect=false] *)
|
|
||||||
Fut.on_result child.res (function
|
|
||||||
| Ok _ -> remove_child_ self child
|
|
||||||
| Error ebt ->
|
|
||||||
(* child failed, we must fail too *)
|
|
||||||
remove_child_ self child;
|
|
||||||
if not protect then resolve_as_failed_ self ebt);
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
| Terminating_or_done r ->
|
|
||||||
(match A.get r with
|
|
||||||
| Error ebt ->
|
|
||||||
(* cancel child immediately *)
|
|
||||||
resolve_as_failed_ child ebt
|
|
||||||
| Ok _ -> ());
|
|
||||||
false
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
|
|
||||||
let res, _ = Fut.make () in
|
|
||||||
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
|
|
||||||
|
|
||||||
(* copy local hmap from parent, if present *)
|
|
||||||
Option.iter
|
|
||||||
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
|
|
||||||
parent;
|
|
||||||
|
|
||||||
(match parent with
|
|
||||||
| Some p when is_closed p -> failwith "spawn: nursery is closed"
|
|
||||||
| _ -> ());
|
|
||||||
let fib = create_ ~pfiber ~runner ~res () in
|
|
||||||
|
|
||||||
let run () =
|
|
||||||
(* make sure the fiber is accessible from inside itself *)
|
|
||||||
FLS.set pfiber k_current_fiber (Any fib);
|
|
||||||
try
|
|
||||||
let res = f () in
|
|
||||||
resolve_ok_ fib res
|
|
||||||
with exn ->
|
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
|
||||||
let ebt = Exn_bt.make exn bt in
|
|
||||||
resolve_as_failed_ fib ebt
|
|
||||||
in
|
|
||||||
|
|
||||||
Runner.run_async ~fiber:pfiber runner run;
|
|
||||||
|
|
||||||
fib
|
|
||||||
|
|
||||||
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
|
|
||||||
|
|
||||||
let spawn ?on ?(protect = true) f : _ t =
|
|
||||||
(* spawn [f()] with a copy of our local storage *)
|
|
||||||
let (Any p) =
|
|
||||||
try get_cur_exn ()
|
|
||||||
with Not_set ->
|
|
||||||
failwith "Fiber.spawn: must be run from within another fiber."
|
|
||||||
in
|
|
||||||
|
|
||||||
let runner =
|
|
||||||
match on with
|
|
||||||
| Some r -> r
|
|
||||||
| None -> p.runner
|
|
||||||
in
|
|
||||||
let child = spawn_ ~parent:(Some p) ~runner f in
|
|
||||||
add_child_ ~protect p child;
|
|
||||||
child
|
|
||||||
|
|
||||||
let[@inline] spawn_ignore ?on ?protect f : unit =
|
|
||||||
ignore (spawn ?on ?protect f : _ t)
|
|
||||||
|
|
||||||
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
|
|
||||||
|
|
||||||
let[@inline] self () : any =
|
|
||||||
match get_cur_exn () with
|
|
||||||
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
|
|
||||||
| f -> f
|
|
||||||
|
|
||||||
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
|
|
||||||
let (Any self) = self () in
|
|
||||||
let h = add_on_cancel self cb in
|
|
||||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
|
||||||
|
|
||||||
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
|
|
||||||
|
|
||||||
let check_if_cancelled () =
|
|
||||||
match get_cur_exn () with
|
|
||||||
| exception Not_set ->
|
|
||||||
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
|
|
||||||
| Any self -> check_if_cancelled_ self
|
|
||||||
|
|
||||||
let yield () : unit =
|
|
||||||
match get_cur_exn () with
|
|
||||||
| exception Not_set ->
|
|
||||||
failwith "Fiber.yield: must be run from inside a fiber."
|
|
||||||
| Any self ->
|
|
||||||
check_if_cancelled_ self;
|
|
||||||
PF.yield ();
|
|
||||||
check_if_cancelled_ self
|
|
||||||
|
|
@ -1,150 +0,0 @@
|
||||||
(** Fibers.
|
|
||||||
|
|
||||||
A fiber is a lightweight computation that runs cooperatively alongside other
|
|
||||||
fibers. In the context of moonpool, fibers have additional properties:
|
|
||||||
|
|
||||||
- they run in a moonpool runner
|
|
||||||
- they form a simple supervision tree, enabling a limited form of structured
|
|
||||||
concurrency *)
|
|
||||||
|
|
||||||
type cancel_callback = Exn_bt.t -> unit
|
|
||||||
(** A callback used in case of cancellation *)
|
|
||||||
|
|
||||||
(**/**)
|
|
||||||
|
|
||||||
(** Do not rely on this, it is internal implementation details. *)
|
|
||||||
module Private_ : sig
|
|
||||||
type 'a state
|
|
||||||
type pfiber
|
|
||||||
|
|
||||||
type 'a t = private {
|
|
||||||
id: Handle.t; (** unique identifier for this fiber *)
|
|
||||||
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
|
|
||||||
res: 'a Fut.t;
|
|
||||||
runner: Runner.t;
|
|
||||||
pfiber: pfiber;
|
|
||||||
}
|
|
||||||
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
|
|
||||||
on that. *)
|
|
||||||
|
|
||||||
type any = Any : _ t -> any [@@unboxed]
|
|
||||||
|
|
||||||
exception Not_set
|
|
||||||
|
|
||||||
val get_cur_exn : unit -> any
|
|
||||||
(** [get_cur_exn ()] either returns the current fiber, or
|
|
||||||
@raise Not_set if run outside a fiber. *)
|
|
||||||
|
|
||||||
val get_cur_opt : unit -> any option
|
|
||||||
end
|
|
||||||
|
|
||||||
(**/**)
|
|
||||||
|
|
||||||
type 'a t = 'a Private_.t
|
|
||||||
(** A fiber returning a value of type ['a]. *)
|
|
||||||
|
|
||||||
val res : 'a t -> 'a Fut.t
|
|
||||||
(** Future result of the fiber. *)
|
|
||||||
|
|
||||||
type 'a callback = 'a Exn_bt.result -> unit
|
|
||||||
(** Callbacks that are called when a fiber is done. *)
|
|
||||||
|
|
||||||
(** Type erased fiber *)
|
|
||||||
type any = Private_.any = Any : _ t -> any [@@unboxed]
|
|
||||||
|
|
||||||
val return : 'a -> 'a t
|
|
||||||
val fail : Exn_bt.t -> _ t
|
|
||||||
|
|
||||||
val self : unit -> any
|
|
||||||
(** [self ()] is the current fiber. Must be run from inside a fiber.
|
|
||||||
@raise Failure if not run from inside a fiber. *)
|
|
||||||
|
|
||||||
val peek : 'a t -> 'a Fut.or_error option
|
|
||||||
(** Peek inside the future result *)
|
|
||||||
|
|
||||||
val is_done : _ t -> bool
|
|
||||||
(** Has the fiber completed? *)
|
|
||||||
|
|
||||||
val is_cancelled : _ t -> bool
|
|
||||||
(** Has the fiber completed with a failure? *)
|
|
||||||
|
|
||||||
val is_success : _ t -> bool
|
|
||||||
(** Has the fiber completed with a value? *)
|
|
||||||
|
|
||||||
val await : 'a t -> 'a
|
|
||||||
(** [await fib] is like [Fut.await (res fib)] *)
|
|
||||||
|
|
||||||
val wait_block_exn : 'a t -> 'a
|
|
||||||
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
|
|
||||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
|
||||||
|
|
||||||
val wait_block : 'a t -> 'a Fut.or_error
|
|
||||||
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
|
|
||||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
|
||||||
|
|
||||||
val check_if_cancelled : unit -> unit
|
|
||||||
(** Check if the current fiber is cancelled, in which case this raises. Must be
|
|
||||||
run from inside a fiber.
|
|
||||||
@raise e if the current fiber is cancelled with exception [e]
|
|
||||||
@raise Failure if not run from a fiber. *)
|
|
||||||
|
|
||||||
val yield : unit -> unit
|
|
||||||
(** Yield control to the scheduler from the current fiber.
|
|
||||||
@raise Failure if not run from inside a fiber. *)
|
|
||||||
|
|
||||||
type cancel_handle
|
|
||||||
(** An opaque handle for a single cancel callback in a fiber *)
|
|
||||||
|
|
||||||
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
|
|
||||||
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
|
|
||||||
If [fib] is already cancelled, [cb] is called immediately. *)
|
|
||||||
|
|
||||||
val remove_on_cancel : _ t -> cancel_handle -> unit
|
|
||||||
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
|
|
||||||
[h]. *)
|
|
||||||
|
|
||||||
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
|
|
||||||
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
|
|
||||||
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
|
|
||||||
the fiber being cancelled, this callback is removed. *)
|
|
||||||
|
|
||||||
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
|
|
||||||
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
|
|
||||||
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
|
|
||||||
from the list. *)
|
|
||||||
|
|
||||||
val on_result : 'a t -> 'a callback -> unit
|
|
||||||
(** Wait for fiber to be done and call the callback with the result. If the
|
|
||||||
fiber is done already then the callback is invoked immediately with its
|
|
||||||
result. *)
|
|
||||||
|
|
||||||
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
|
|
||||||
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
|
|
||||||
fiber is not the child of any other fiber: its lifetime is only determined
|
|
||||||
by the lifetime of [f()]. *)
|
|
||||||
|
|
||||||
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
|
|
||||||
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
|
|
||||||
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
|
|
||||||
if the current fiber [parent] fails.
|
|
||||||
|
|
||||||
@param on
|
|
||||||
if provided, start the fiber on the given runner. If not provided, use the
|
|
||||||
parent's runner.
|
|
||||||
@param protect
|
|
||||||
if true, when [f_child] fails, it does not affect [parent]. If false,
|
|
||||||
[f_child] failing also causes [parent] to fail (and therefore all other
|
|
||||||
children of [parent]). Default is [true].
|
|
||||||
|
|
||||||
Must be run from inside a fiber.
|
|
||||||
@raise Failure if not run from inside a fiber. *)
|
|
||||||
|
|
||||||
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
|
|
||||||
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
|
|
||||||
termination of the parent, ie. the parent will exit only after this new
|
|
||||||
fiber exits.
|
|
||||||
@param on the optional runner to use, added since 0.7 *)
|
|
||||||
|
|
||||||
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
|
||||||
(** Like {!spawn_top} but ignores the result.
|
|
||||||
@since 0.7 *)
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
include Task_local_storage
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
(** Fiber-local storage.
|
|
||||||
|
|
||||||
This storage is associated to the current fiber, just like thread-local
|
|
||||||
storage is associated with the current thread.
|
|
||||||
|
|
||||||
See {!Moonpool.Task_local_storage} for more general information, as this is
|
|
||||||
based on it.
|
|
||||||
|
|
||||||
{b NOTE}: it's important to note that, while each fiber has its own storage,
|
|
||||||
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
|
|
||||||
the storage. Values inside [f1]'s storage will be physically shared with
|
|
||||||
[f2]. It is thus recommended to store only persistent values in the local
|
|
||||||
storage. *)
|
|
||||||
|
|
||||||
include module type of struct
|
|
||||||
include Task_local_storage
|
|
||||||
end
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
module A = Atomic
|
|
||||||
|
|
||||||
type t = int
|
|
||||||
|
|
||||||
let counter_ = A.make 0
|
|
||||||
let equal : t -> t -> bool = ( = )
|
|
||||||
let compare : t -> t -> int = Stdlib.compare
|
|
||||||
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
|
|
||||||
|
|
||||||
(* TODO: better hash *)
|
|
||||||
let[@inline] hash x = x land max_int
|
|
||||||
|
|
||||||
module Set = Set.Make (Int)
|
|
||||||
module Map = Map.Make (Int)
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
(** The unique name of a fiber.
|
|
||||||
|
|
||||||
Each fiber has a unique handle that can be used to refer to it in maps or
|
|
||||||
sets. *)
|
|
||||||
|
|
||||||
type t = private int
|
|
||||||
(** Unique, opaque identifier for a fiber. *)
|
|
||||||
|
|
||||||
val equal : t -> t -> bool
|
|
||||||
val compare : t -> t -> int
|
|
||||||
val hash : t -> int
|
|
||||||
|
|
||||||
val generate_fresh : unit -> t
|
|
||||||
(** Generate a fresh, unique identifier *)
|
|
||||||
|
|
||||||
module Set : Set.S with type elt = t
|
|
||||||
module Map : Map.S with type key = t
|
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
(** Fibers for moonpool.
|
|
||||||
|
|
||||||
See {!Fiber} for the most important explanations.
|
|
||||||
|
|
||||||
@since 0.6. *)
|
|
||||||
|
|
||||||
module Fiber = Fiber
|
|
||||||
module Fls = Fls
|
|
||||||
module Handle = Handle
|
|
||||||
module Main = Main
|
|
||||||
include Fiber
|
|
||||||
include Main
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
(>= %{ocaml_version} 5.0))
|
(>= %{ocaml_version} 5.0))
|
||||||
(libraries
|
(libraries
|
||||||
(re_export moonpool)
|
(re_export moonpool)
|
||||||
moonpool.fib
|
|
||||||
picos
|
picos
|
||||||
(re_export lwt)
|
(re_export lwt)
|
||||||
lwt.unix))
|
lwt.unix))
|
||||||
|
|
|
||||||
|
|
@ -289,7 +289,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
|
||||||
let spawn_lwt f : _ Lwt.t =
|
let spawn_lwt f : _ Lwt.t =
|
||||||
let st = Main_state.get_st () in
|
let st = Main_state.get_st () in
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
|
M.run_async st.as_runner (fun () ->
|
||||||
try
|
try
|
||||||
let x = f () in
|
let x = f () in
|
||||||
Lwt.wakeup lwt_prom x
|
Lwt.wakeup lwt_prom x
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@
|
||||||
(libraries
|
(libraries
|
||||||
t_fibers
|
t_fibers
|
||||||
moonpool
|
moonpool
|
||||||
moonpool.fib
|
|
||||||
trace
|
trace
|
||||||
trace-tef
|
trace-tef
|
||||||
qcheck-core
|
qcheck-core
|
||||||
|
|
|
||||||
|
|
@ -2,4 +2,4 @@
|
||||||
(name t_fibers)
|
(name t_fibers)
|
||||||
(package moonpool)
|
(package moonpool)
|
||||||
(optional)
|
(optional)
|
||||||
(libraries moonpool moonpool.fib trace qcheck-core hmap))
|
(libraries moonpool trace qcheck-core hmap))
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
open! Moonpool
|
module Chan = Moonpool.Chan
|
||||||
|
module Exn_bt = Moonpool.Exn_bt
|
||||||
module A = Atomic
|
module A = Atomic
|
||||||
module F = Moonpool_fib.Fiber
|
module Fut = Moonpool.Fut
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
|
@ -49,24 +50,23 @@ let logf = Log_.logf
|
||||||
let run1 ~runner () =
|
let run1 ~runner () =
|
||||||
Printf.printf "============\nstart\n%!";
|
Printf.printf "============\nstart\n%!";
|
||||||
let clock = ref TS.init in
|
let clock = ref TS.init in
|
||||||
let fib =
|
let fut =
|
||||||
F.spawn_top ~on:runner @@ fun () ->
|
Fut.spawn ~on:runner @@ fun () ->
|
||||||
let chan_progress = Chan.create ~max_size:4 () in
|
let chan_progress = Chan.create ~max_size:4 () in
|
||||||
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
||||||
|
|
||||||
let subs =
|
let subs =
|
||||||
List.init 5 (fun i ->
|
List.init 5 (fun i ->
|
||||||
F.spawn ~protect:false @@ fun _n ->
|
Fut.spawn ~on:runner @@ fun _n ->
|
||||||
Thread.delay (float i *. 0.01);
|
Thread.delay (float i *. 0.01);
|
||||||
Chan.pop chans.(i);
|
Chan.pop chans.(i);
|
||||||
Chan.push chan_progress i;
|
Chan.push chan_progress i;
|
||||||
F.check_if_cancelled ();
|
|
||||||
i)
|
i)
|
||||||
in
|
in
|
||||||
|
|
||||||
logf (TS.tick_get clock) "wait for subs";
|
logf (TS.tick_get clock) "wait for subs";
|
||||||
|
|
||||||
F.spawn_ignore (fun () ->
|
Moonpool.run_async runner (fun () ->
|
||||||
for i = 0 to 4 do
|
for i = 0 to 4 do
|
||||||
Chan.push chans.(i) ();
|
Chan.push chans.(i) ();
|
||||||
let i' = Chan.pop chan_progress in
|
let i' = Chan.pop chan_progress in
|
||||||
|
|
@ -78,19 +78,15 @@ let run1 ~runner () =
|
||||||
(fun i f ->
|
(fun i f ->
|
||||||
let clock = ref (0 :: i :: clock0) in
|
let clock = ref (0 :: i :: clock0) in
|
||||||
logf !clock "await fiber %d" i;
|
logf !clock "await fiber %d" i;
|
||||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
let res = Fut.await f in
|
||||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
Fut.yield ();
|
||||||
let res = F.await f in
|
|
||||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
|
||||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
|
||||||
F.yield ();
|
|
||||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||||
subs);
|
subs);
|
||||||
|
|
||||||
logf (TS.tick_get clock) "main fiber done"
|
logf (TS.tick_get clock) "main fiber done"
|
||||||
in
|
in
|
||||||
|
|
||||||
Fut.await @@ F.res fib;
|
Fut.await fut;
|
||||||
logf (TS.tick_get clock) "main fiber exited";
|
logf (TS.tick_get clock) "main fiber exited";
|
||||||
Log_.print_and_clear ();
|
Log_.print_and_clear ();
|
||||||
()
|
()
|
||||||
|
|
@ -99,15 +95,11 @@ let run2 ~runner () =
|
||||||
(* same but now, cancel one of the sub-fibers *)
|
(* same but now, cancel one of the sub-fibers *)
|
||||||
Printf.printf "============\nstart\n";
|
Printf.printf "============\nstart\n";
|
||||||
|
|
||||||
let clock = ref TS.init in
|
let to_await = ref [] in
|
||||||
let fib =
|
|
||||||
F.spawn_top ~on:runner @@ fun () ->
|
|
||||||
let@ () =
|
|
||||||
F.with_on_self_cancel (fun ebt ->
|
|
||||||
logf (TS.tick_get clock) "main fiber cancelled with %s"
|
|
||||||
@@ Exn_bt.show ebt)
|
|
||||||
in
|
|
||||||
|
|
||||||
|
let clock = ref TS.init in
|
||||||
|
let fut =
|
||||||
|
Fut.spawn ~on:runner @@ fun () ->
|
||||||
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
||||||
let chan_progress = Chan.create ~max_size:4 () in
|
let chan_progress = Chan.create ~max_size:4 () in
|
||||||
|
|
||||||
|
|
@ -116,11 +108,7 @@ let run2 ~runner () =
|
||||||
let clock0 = !clock in
|
let clock0 = !clock in
|
||||||
List.init 10 (fun i ->
|
List.init 10 (fun i ->
|
||||||
let clock = ref (0 :: i :: clock0) in
|
let clock = ref (0 :: i :: clock0) in
|
||||||
F.spawn ~protect:false @@ fun _n ->
|
Fut.spawn ~on:runner @@ fun _n ->
|
||||||
let@ () =
|
|
||||||
F.with_on_self_cancel (fun _ ->
|
|
||||||
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
|
|
||||||
in
|
|
||||||
Thread.delay 0.002;
|
Thread.delay 0.002;
|
||||||
|
|
||||||
(* sync for determinism *)
|
(* sync for determinism *)
|
||||||
|
|
@ -132,46 +120,51 @@ let run2 ~runner () =
|
||||||
failwith "oh no!"
|
failwith "oh no!"
|
||||||
);
|
);
|
||||||
|
|
||||||
F.check_if_cancelled ();
|
|
||||||
i)
|
i)
|
||||||
in
|
in
|
||||||
|
|
||||||
let post = TS.tick_get clock in
|
let post = TS.tick_get clock in
|
||||||
List.iteri
|
List.iteri
|
||||||
(fun i fib ->
|
(fun i fib ->
|
||||||
F.on_result fib (function
|
Fut.on_result fib (function
|
||||||
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
||||||
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
||||||
subs;
|
subs;
|
||||||
|
|
||||||
(* sequentialize the fibers, for determinism *)
|
(* sequentialize the fibers, for determinism *)
|
||||||
F.spawn_ignore (fun () ->
|
let sender =
|
||||||
for j = 0 to 9 do
|
Fut.spawn ~on:runner (fun () ->
|
||||||
Chan.push chans_unblock.(j) ();
|
for j = 0 to 9 do
|
||||||
let j' = Chan.pop chan_progress in
|
Chan.push chans_unblock.(j) ();
|
||||||
assert (j = j')
|
let j' = Chan.pop chan_progress in
|
||||||
done);
|
assert (j = j')
|
||||||
|
done)
|
||||||
|
in
|
||||||
|
to_await := sender :: !to_await;
|
||||||
|
|
||||||
logf (TS.tick_get clock) "wait for subs";
|
logf (TS.tick_get clock) "wait for subs";
|
||||||
List.iteri
|
List.iteri
|
||||||
(fun i f ->
|
(fun i f ->
|
||||||
logf (TS.tick_get clock) "await fiber %d" i;
|
logf (TS.tick_get clock) "await fiber %d" i;
|
||||||
let res = F.await f in
|
let res = Fut.await f in
|
||||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||||
subs;
|
subs;
|
||||||
logf (TS.tick_get clock) "yield";
|
logf (TS.tick_get clock) "yield";
|
||||||
F.yield ();
|
Fut.yield ();
|
||||||
logf (TS.tick_get clock) "yielded";
|
logf (TS.tick_get clock) "yielded";
|
||||||
logf (TS.tick_get clock) "main fiber done"
|
logf (TS.tick_get clock) "main fiber done"
|
||||||
in
|
in
|
||||||
|
|
||||||
F.on_result fib (function
|
Fut.on_result fut (function
|
||||||
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
||||||
| Error ebt ->
|
| Error ebt ->
|
||||||
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
||||||
|
|
||||||
(try Fut.await @@ F.res fib
|
(try Fut.await fut
|
||||||
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
||||||
logf (TS.tick_get clock) "main fiber exited";
|
logf (TS.tick_get clock) "main fiber exited";
|
||||||
|
|
||||||
|
List.iter Fut.await !to_await;
|
||||||
|
|
||||||
Log_.print_and_clear ();
|
Log_.print_and_clear ();
|
||||||
()
|
()
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
open! Moonpool
|
open! Moonpool
|
||||||
module A = Atomic
|
module A = Atomic
|
||||||
module F = Moonpool_fib.Fiber
|
module F = Moonpool.Fut
|
||||||
module FLS = Moonpool_fib.Fls
|
module FLS = Moonpool.Task_local_storage
|
||||||
|
|
||||||
(* ### dummy little tracing system with local storage *)
|
(* ### dummy little tracing system with local storage *)
|
||||||
|
|
||||||
|
|
@ -122,7 +122,7 @@ let run ~pool ~pool_name () =
|
||||||
|
|
||||||
let subs =
|
let subs =
|
||||||
List.init 2 (fun idx_sub_sub ->
|
List.init 2 (fun idx_sub_sub ->
|
||||||
F.spawn ~protect:true (fun () ->
|
F.spawn ~on:pool (fun () ->
|
||||||
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
||||||
in
|
in
|
||||||
List.iter F.await subs
|
List.iter F.await subs
|
||||||
|
|
@ -133,8 +133,7 @@ let run ~pool ~pool_name () =
|
||||||
|
|
||||||
let subs =
|
let subs =
|
||||||
List.init 2 (fun k ->
|
List.init 2 (fun k ->
|
||||||
F.spawn ~protect:true @@ fun () ->
|
F.spawn ~on:pool @@ fun () -> sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||||
sub_child ~idx ~idx_child ~idx_sub:k ())
|
|
||||||
in
|
in
|
||||||
|
|
||||||
let@ () =
|
let@ () =
|
||||||
|
|
@ -149,16 +148,14 @@ let run ~pool ~pool_name () =
|
||||||
|
|
||||||
let subs =
|
let subs =
|
||||||
List.init 5 (fun j ->
|
List.init 5 (fun j ->
|
||||||
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
|
F.spawn ~on:pool @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||||
in
|
in
|
||||||
|
|
||||||
List.iter F.await subs
|
List.iter F.await subs
|
||||||
in
|
in
|
||||||
|
|
||||||
Printf.printf "run test on pool = %s\n" pool_name;
|
Printf.printf "run test on pool = %s\n" pool_name;
|
||||||
let fibs =
|
let fibs = List.init 8 (fun idx -> F.spawn ~on:pool (fun () -> top idx)) in
|
||||||
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
|
|
||||||
in
|
|
||||||
List.iter F.await fibs;
|
List.iter F.await fibs;
|
||||||
|
|
||||||
Printf.printf "tracing complete\n";
|
Printf.printf "tracing complete\n";
|
||||||
|
|
|
||||||
|
|
@ -2,33 +2,21 @@
|
||||||
start
|
start
|
||||||
1: wait for subs
|
1: wait for subs
|
||||||
1.0.0: await fiber 0
|
1.0.0: await fiber 0
|
||||||
1.0.1: cur fiber[0] is some: true
|
1.0.1: res 0 = 0
|
||||||
1.0.2: cur fiber[0] is some: true
|
|
||||||
1.0.3: res 0 = 0
|
|
||||||
1.1.0: await fiber 1
|
1.1.0: await fiber 1
|
||||||
1.1.1: cur fiber[1] is some: true
|
1.1.1: res 1 = 1
|
||||||
1.1.2: cur fiber[1] is some: true
|
|
||||||
1.1.3: res 1 = 1
|
|
||||||
1.2.0: await fiber 2
|
1.2.0: await fiber 2
|
||||||
1.2.1: cur fiber[2] is some: true
|
1.2.1: res 2 = 2
|
||||||
1.2.2: cur fiber[2] is some: true
|
|
||||||
1.2.3: res 2 = 2
|
|
||||||
1.3.0: await fiber 3
|
1.3.0: await fiber 3
|
||||||
1.3.1: cur fiber[3] is some: true
|
1.3.1: res 3 = 3
|
||||||
1.3.2: cur fiber[3] is some: true
|
|
||||||
1.3.3: res 3 = 3
|
|
||||||
1.4.0: await fiber 4
|
1.4.0: await fiber 4
|
||||||
1.4.1: cur fiber[4] is some: true
|
1.4.1: res 4 = 4
|
||||||
1.4.2: cur fiber[4] is some: true
|
|
||||||
1.4.3: res 4 = 4
|
|
||||||
2: main fiber done
|
2: main fiber done
|
||||||
3: main fiber exited
|
3: main fiber exited
|
||||||
============
|
============
|
||||||
start
|
start
|
||||||
1: start fibers
|
1: start fibers
|
||||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||||
1.8.1: sub-fiber 8 was cancelled
|
|
||||||
1.9.1: sub-fiber 9 was cancelled
|
|
||||||
2.0: fiber 0 resolved as ok
|
2.0: fiber 0 resolved as ok
|
||||||
2.1: fiber 1 resolved as ok
|
2.1: fiber 1 resolved as ok
|
||||||
2.2: fiber 2 resolved as ok
|
2.2: fiber 2 resolved as ok
|
||||||
|
|
@ -37,8 +25,8 @@ start
|
||||||
2.5: fiber 5 resolved as ok
|
2.5: fiber 5 resolved as ok
|
||||||
2.6: fiber 6 resolved as ok
|
2.6: fiber 6 resolved as ok
|
||||||
2.7: fiber 7 resolved as error
|
2.7: fiber 7 resolved as error
|
||||||
2.8: fiber 8 resolved as error
|
2.8: fiber 8 resolved as ok
|
||||||
2.9: fiber 9 resolved as error
|
2.9: fiber 9 resolved as ok
|
||||||
3: wait for subs
|
3: wait for subs
|
||||||
4: await fiber 0
|
4: await fiber 0
|
||||||
5: res 0 = 0
|
5: res 0 = 0
|
||||||
|
|
@ -55,7 +43,6 @@ start
|
||||||
16: await fiber 6
|
16: await fiber 6
|
||||||
17: res 6 = 6
|
17: res 6 = 6
|
||||||
18: await fiber 7
|
18: await fiber 7
|
||||||
19: main fiber cancelled with Failure("oh no!")
|
19: main fiber result: error Failure("oh no!")
|
||||||
20: main fiber result: error Failure("oh no!")
|
20: main fib failed with "oh no!"
|
||||||
21: main fib failed with "oh no!"
|
21: main fiber exited
|
||||||
22: main fiber exited
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let@ runner = Moonpool_fib.main in
|
let@ runner = Moonpool.main in
|
||||||
T_fibers.Fib.run1 ~runner ();
|
T_fibers.Fib.run1 ~runner ();
|
||||||
T_fibers.Fib.run2 ~runner ()
|
T_fibers.Fib.run2 ~runner ()
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -3,7 +3,7 @@ open! Moonpool
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let@ _ = Moonpool_fib.main in
|
let@ _ = Moonpool.main in
|
||||||
(let@ pool = Ws_pool.with_ () in
|
(let@ pool = Ws_pool.with_ () in
|
||||||
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
open Moonpool
|
open Moonpool
|
||||||
module F = Moonpool_fib
|
module F = Moonpool.Fut
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let r =
|
let r =
|
||||||
F.main @@ fun runner ->
|
Moonpool.main @@ fun runner ->
|
||||||
let f1 = F.spawn (fun () -> 1) in
|
let f1 = F.spawn ~on:runner (fun () -> 1) in
|
||||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||||
let f3 = F.spawn (fun () -> F.await f1 + 10) in
|
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||||
let r = F.await f2 + F.await f3 in
|
let r = F.await f2 + F.await f3 in
|
||||||
assert (r = 13);
|
assert (r = 13);
|
||||||
r
|
r
|
||||||
|
|
@ -19,10 +19,10 @@ let () =
|
||||||
(* run fibers in the background, await them in the main thread *)
|
(* run fibers in the background, await them in the main thread *)
|
||||||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||||
let r =
|
let r =
|
||||||
F.main @@ fun runner ->
|
Moonpool.main @@ fun runner ->
|
||||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||||
let f3 = F.spawn_top ~on:bg (fun () -> F.await f1 + 10) in
|
let f3 = F.spawn ~on:bg (fun () -> F.await f1 + 10) in
|
||||||
let r = F.await f2 + F.await f3 in
|
let r = F.await f2 + F.await f3 in
|
||||||
assert (r = 13);
|
assert (r = 13);
|
||||||
r
|
r
|
||||||
|
|
@ -32,8 +32,8 @@ let () =
|
||||||
let () =
|
let () =
|
||||||
try
|
try
|
||||||
let _r =
|
let _r =
|
||||||
F.main @@ fun _r ->
|
Moonpool.main @@ fun runner ->
|
||||||
let fib = F.spawn (fun () -> failwith "oops") in
|
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||||
F.await fib
|
F.await fib
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@
|
||||||
(libraries
|
(libraries
|
||||||
t_fibers
|
t_fibers
|
||||||
moonpool
|
moonpool
|
||||||
moonpool.fib
|
|
||||||
moonpool-lwt
|
moonpool-lwt
|
||||||
hmap
|
hmap
|
||||||
trace
|
trace
|
||||||
|
|
|
||||||
|
|
@ -2,33 +2,21 @@
|
||||||
start
|
start
|
||||||
1: wait for subs
|
1: wait for subs
|
||||||
1.0.0: await fiber 0
|
1.0.0: await fiber 0
|
||||||
1.0.1: cur fiber[0] is some: true
|
1.0.1: res 0 = 0
|
||||||
1.0.2: cur fiber[0] is some: true
|
|
||||||
1.0.3: res 0 = 0
|
|
||||||
1.1.0: await fiber 1
|
1.1.0: await fiber 1
|
||||||
1.1.1: cur fiber[1] is some: true
|
1.1.1: res 1 = 1
|
||||||
1.1.2: cur fiber[1] is some: true
|
|
||||||
1.1.3: res 1 = 1
|
|
||||||
1.2.0: await fiber 2
|
1.2.0: await fiber 2
|
||||||
1.2.1: cur fiber[2] is some: true
|
1.2.1: res 2 = 2
|
||||||
1.2.2: cur fiber[2] is some: true
|
|
||||||
1.2.3: res 2 = 2
|
|
||||||
1.3.0: await fiber 3
|
1.3.0: await fiber 3
|
||||||
1.3.1: cur fiber[3] is some: true
|
1.3.1: res 3 = 3
|
||||||
1.3.2: cur fiber[3] is some: true
|
|
||||||
1.3.3: res 3 = 3
|
|
||||||
1.4.0: await fiber 4
|
1.4.0: await fiber 4
|
||||||
1.4.1: cur fiber[4] is some: true
|
1.4.1: res 4 = 4
|
||||||
1.4.2: cur fiber[4] is some: true
|
|
||||||
1.4.3: res 4 = 4
|
|
||||||
2: main fiber done
|
2: main fiber done
|
||||||
3: main fiber exited
|
3: main fiber exited
|
||||||
============
|
============
|
||||||
start
|
start
|
||||||
1: start fibers
|
1: start fibers
|
||||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||||
1.8.1: sub-fiber 8 was cancelled
|
|
||||||
1.9.1: sub-fiber 9 was cancelled
|
|
||||||
2.0: fiber 0 resolved as ok
|
2.0: fiber 0 resolved as ok
|
||||||
2.1: fiber 1 resolved as ok
|
2.1: fiber 1 resolved as ok
|
||||||
2.2: fiber 2 resolved as ok
|
2.2: fiber 2 resolved as ok
|
||||||
|
|
@ -37,8 +25,8 @@ start
|
||||||
2.5: fiber 5 resolved as ok
|
2.5: fiber 5 resolved as ok
|
||||||
2.6: fiber 6 resolved as ok
|
2.6: fiber 6 resolved as ok
|
||||||
2.7: fiber 7 resolved as error
|
2.7: fiber 7 resolved as error
|
||||||
2.8: fiber 8 resolved as error
|
2.8: fiber 8 resolved as ok
|
||||||
2.9: fiber 9 resolved as error
|
2.9: fiber 9 resolved as ok
|
||||||
3: wait for subs
|
3: wait for subs
|
||||||
4: await fiber 0
|
4: await fiber 0
|
||||||
5: res 0 = 0
|
5: res 0 = 0
|
||||||
|
|
@ -55,7 +43,6 @@ start
|
||||||
16: await fiber 6
|
16: await fiber 6
|
||||||
17: res 6 = 6
|
17: res 6 = 6
|
||||||
18: await fiber 7
|
18: await fiber 7
|
||||||
19: main fiber cancelled with Failure("oh no!")
|
19: main fiber result: error Failure("oh no!")
|
||||||
20: main fiber result: error Failure("oh no!")
|
20: main fib failed with "oh no!"
|
||||||
21: main fib failed with "oh no!"
|
21: main fiber exited
|
||||||
22: main fiber exited
|
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,6 @@
|
||||||
open Moonpool
|
open Moonpool
|
||||||
module M_lwt = Moonpool_lwt
|
module M_lwt = Moonpool_lwt
|
||||||
module F = Moonpool_fib
|
module F = Moonpool.Fut
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
|
@ -9,9 +9,9 @@ let () =
|
||||||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||||
let r =
|
let r =
|
||||||
M_lwt.lwt_main @@ fun runner ->
|
M_lwt.lwt_main @@ fun runner ->
|
||||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||||
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
|
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||||
let r = F.await f2 + F.await f3 in
|
let r = F.await f2 + F.await f3 in
|
||||||
assert (r = 13);
|
assert (r = 13);
|
||||||
r
|
r
|
||||||
|
|
@ -24,7 +24,7 @@ let () =
|
||||||
try
|
try
|
||||||
let _r =
|
let _r =
|
||||||
M_lwt.lwt_main @@ fun runner ->
|
M_lwt.lwt_main @@ fun runner ->
|
||||||
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
|
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||||
F.await fib
|
F.await fib
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue