mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
Compare commits
8 commits
4b9e480013
...
95de0e7e27
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
95de0e7e27 | ||
|
|
4924b5f52b | ||
|
|
db9cddf999 | ||
|
|
f9ab951c36 | ||
|
|
2aa2612963 | ||
|
|
f92efa562d | ||
|
|
d957f7b54e | ||
|
|
a26503df0b |
36 changed files with 5942 additions and 6531 deletions
44
README.md
44
README.md
|
|
@ -173,49 +173,9 @@ val expected_sum : int = 5050
|
|||
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
|
||||
an exception and the backtrace associated with the place the exception was caught.
|
||||
|
||||
### Fibers
|
||||
### Local storage
|
||||
|
||||
On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
|
||||
which provides _lightweight fibers_
|
||||
that can run on any Moonpool runner.
|
||||
These fibers are a sort of lightweight thread, dispatched on the runner's
|
||||
background thread(s).
|
||||
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
|
||||
is done.
|
||||
|
||||
```ocaml
|
||||
# #require "moonpool.fib";;
|
||||
...
|
||||
|
||||
# (* convenient alias *)
|
||||
module F = Moonpool_fib;;
|
||||
module F = Moonpool_fib
|
||||
# F.main (fun _runner ->
|
||||
let f1 = F.spawn (fun () -> fib 10) in
|
||||
let f2 = F.spawn (fun () -> fib 15) in
|
||||
F.await f1 + F.await f2);;
|
||||
- : int = 1076
|
||||
```
|
||||
|
||||
Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
|
||||
the sub-fiber's _parent_.
|
||||
When a parent fails, all its children are cancelled (forced to fail).
|
||||
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).
|
||||
|
||||
Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
|
||||
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
|
||||
as a regular future, too.
|
||||
However, this resolution is only done after all the children of the fiber have
|
||||
resolved — the lifetime of fibers forms a well-nested tree in that sense.
|
||||
|
||||
When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
|
||||
thread on which it was running becomes available again and can go on process another task.
|
||||
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
|
||||
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.
|
||||
|
||||
In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
|
||||
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
|
||||
put persistent data in storage to avoid confusing aliasing).
|
||||
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
|
||||
The storage is convenient for carrying around context for cross-cutting concerns such
|
||||
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
|
||||
scope).
|
||||
|
|
|
|||
2
dune
2
dune
|
|
@ -3,7 +3,7 @@
|
|||
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
|
||||
|
||||
(mdx
|
||||
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
|
||||
(libraries moonpool moonpool.forkjoin threads)
|
||||
(package moonpool)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0)))
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
(** Example from
|
||||
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
|
||||
(** NOTE: this was an example from
|
||||
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
|
||||
there is no cancelation anymore :) *)
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
let@ _ = Moonpool_fib.main in
|
||||
let@ _ = Moonpool.main in
|
||||
|
||||
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
|
||||
let@ runner = Moonpool.Background_thread.with_ () in
|
||||
|
|
@ -13,15 +14,13 @@ let () =
|
|||
(* Pretend this is some long-running read loop *)
|
||||
for i = 1 to 10 do
|
||||
Printf.printf "MAIN LOOP %d\n%!" i;
|
||||
Moonpool_fib.check_if_cancelled ();
|
||||
let _ : _ Moonpool_fib.t =
|
||||
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
|
||||
let _ : _ Moonpool.Fut.t =
|
||||
Moonpool.Fut.spawn ~on:runner (fun () ->
|
||||
Printf.printf "RUN FIBER %d\n%!" i;
|
||||
Moonpool_fib.check_if_cancelled ();
|
||||
Format.printf "FIBER %d NOT CANCELLED YET@." i;
|
||||
failwith "BOOM")
|
||||
in
|
||||
Moonpool_fib.yield ();
|
||||
Moonpool.Fut.yield ();
|
||||
(* Thread.delay 0.2; *)
|
||||
(* Thread.delay 0.0001; *)
|
||||
()
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
;(package moonpool)
|
||||
(libraries
|
||||
moonpool
|
||||
moonpool.fib
|
||||
trace
|
||||
trace-tef
|
||||
;tracy-client.trace
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ let create ~max_size () : _ t =
|
|||
}
|
||||
|
||||
let try_push (self : _ t) x : bool =
|
||||
let res = ref false in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
|
|
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
|
|||
let to_awake = Queue.create () in
|
||||
Queue.push x self.q;
|
||||
Queue.transfer self.pop_waiters to_awake;
|
||||
res := true;
|
||||
Mutex.unlock self.mutex;
|
||||
(* wake up pop triggers if needed. Be careful to do that
|
||||
outside the critical section*)
|
||||
Queue.iter Trigger.signal to_awake
|
||||
Queue.iter Trigger.signal to_awake;
|
||||
true
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ -> Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
Mutex.unlock self.mutex;
|
||||
true
|
||||
| _ ->
|
||||
Mutex.unlock self.mutex;
|
||||
false
|
||||
) else
|
||||
false
|
||||
|
||||
let try_pop (type elt) self : elt option =
|
||||
let res = ref None in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
(match Queue.pop self.q with
|
||||
match Queue.pop self.q with
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
Mutex.unlock self.mutex;
|
||||
if self.closed then
|
||||
raise Closed
|
||||
)
|
||||
| x -> res := Some x);
|
||||
Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
else
|
||||
None
|
||||
| x ->
|
||||
Mutex.unlock self.mutex;
|
||||
Some x
|
||||
) else
|
||||
None
|
||||
|
||||
let close (self : _ t) : unit =
|
||||
let q = Queue.create () in
|
||||
let triggers_to_signal = Queue.create () in
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
Queue.transfer self.pop_waiters q;
|
||||
Queue.transfer self.push_waiters q
|
||||
Queue.transfer self.pop_waiters triggers_to_signal;
|
||||
Queue.transfer self.push_waiters triggers_to_signal
|
||||
);
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal q
|
||||
Queue.iter Trigger.signal triggers_to_signal
|
||||
|
||||
let rec push (self : _ t) x : unit =
|
||||
Mutex.lock self.mutex;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
(** Channels.
|
||||
|
||||
The channels have bounded size. Push/pop return futures or can use effects
|
||||
to provide an [await]-friendly version.
|
||||
The channels have bounded size. They use effects/await to provide
|
||||
a direct style implementation. Pushing into a full channel,
|
||||
or popping from an empty one, will suspend the current task.
|
||||
|
||||
The channels became bounded since @0.7 .
|
||||
*)
|
||||
|
|
|
|||
|
|
@ -431,12 +431,13 @@ let await (self : 'a t) : 'a =
|
|||
| exception C.Running ->
|
||||
let trigger = Trigger.create () in
|
||||
(* suspend until the future is resolved *)
|
||||
if C.try_attach self trigger then
|
||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||
if C.try_attach self trigger then Trigger.await_exn trigger;
|
||||
|
||||
(* un-suspended: we should have a result! *)
|
||||
get_or_fail_exn self
|
||||
|
||||
let yield = Picos.Fiber.yield
|
||||
|
||||
module Infix = struct
|
||||
let[@inline] ( >|= ) x f = map ~f x
|
||||
let[@inline] ( >>= ) x f = bind ~f x
|
||||
|
|
|
|||
|
|
@ -8,12 +8,16 @@
|
|||
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
|
||||
an exception and the corresponding backtrace).
|
||||
|
||||
Using {!spawn}, it's possible to start a bunch of tasks, obtaining futures,
|
||||
and then use {!await} to get their result in the desired order.
|
||||
|
||||
Combinators such as {!map} and {!join_array} can be used to produce futures
|
||||
from other futures (in a monadic way). Some combinators take a [on] argument
|
||||
to specify a runner on which the intermediate computation takes place; for
|
||||
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
|
||||
applicatively; the call to [f] happens on the runner [pool] (once [fut]
|
||||
resolves successfully with a value). *)
|
||||
resolves successfully with a value). Be aware that these combinators do not
|
||||
preserve local storage. *)
|
||||
|
||||
type 'a or_error = ('a, Exn_bt.t) result
|
||||
|
||||
|
|
@ -30,7 +34,8 @@ val make : unit -> 'a t * 'a promise
|
|||
|
||||
val make_promise : unit -> 'a promise
|
||||
(** Same as {!make} but returns a single promise (which can be upcast to a
|
||||
future). This is useful mostly to preserve memory.
|
||||
future). This is useful mostly to preserve memory, you probably don't need
|
||||
it.
|
||||
|
||||
How to upcast to a future in the worst case:
|
||||
{[
|
||||
|
|
@ -40,8 +45,11 @@ val make_promise : unit -> 'a promise
|
|||
@since 0.7 *)
|
||||
|
||||
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
|
||||
; or calls [f] immediately if [fut] is already set. *)
|
||||
(** [on_result fut f] registers [f] to be called in the future when [fut] is
|
||||
set; or calls [f] immediately if [fut] is already set.
|
||||
|
||||
{b NOTE:} it's ill advised to do meaningful work inside the callback [f].
|
||||
Instead, try to spawn another task on the runner, or use {!await}. *)
|
||||
|
||||
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
|
||||
|
|
@ -52,13 +60,14 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
|||
exception Already_fulfilled
|
||||
|
||||
val try_cancel : _ promise -> Exn_bt.t -> bool
|
||||
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
|
||||
returns [false] if the promise is already resolved.
|
||||
@since NEXT_RELEASE *)
|
||||
(** [try_cancel promise ebt] tries to cancel the promise using the given
|
||||
exception, returning [true]. It returns [false] if the promise is already
|
||||
resolved.
|
||||
@since 0.9 *)
|
||||
|
||||
val cancel : _ promise -> Exn_bt.t -> unit
|
||||
(** Silent version of {!try_cancel}, ignoring the result.
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.9 *)
|
||||
|
||||
val fulfill : 'a promise -> 'a or_error -> unit
|
||||
(** Fullfill the promise, setting the future at the same time.
|
||||
|
|
@ -79,6 +88,7 @@ val fail_exn_bt : Exn_bt.t -> _ t
|
|||
@since 0.6 *)
|
||||
|
||||
val of_result : 'a or_error -> 'a t
|
||||
(** Already resolved future from a result. *)
|
||||
|
||||
val is_resolved : _ t -> bool
|
||||
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
|
||||
|
|
@ -136,7 +146,7 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
|
|||
|
||||
val reify_error : 'a t -> 'a or_error t
|
||||
(** [reify_error fut] turns a failing future into a non-failing one that contain
|
||||
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
|
||||
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x].
|
||||
@since 0.4 *)
|
||||
|
||||
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
|
||||
|
|
@ -149,12 +159,18 @@ val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
|
|||
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
|
||||
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
|
||||
or [f x] raises [e].
|
||||
|
||||
This does not preserve local storage of [fut] inside [f].
|
||||
|
||||
@param on if provided, [f] runs on the given runner *)
|
||||
|
||||
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
|
||||
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
|
||||
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
|
||||
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
|
||||
|
||||
This does not preserve local storage of [fut] inside [f].
|
||||
|
||||
@param on if provided, [f] runs on the given runner
|
||||
@since 0.4 *)
|
||||
|
||||
|
|
@ -182,6 +198,7 @@ val join_array : 'a t array -> 'a array t
|
|||
val join_list : 'a t list -> 'a list t
|
||||
(** Wait for all the futures in the list. Fails if any future fails. *)
|
||||
|
||||
(** Advanced primitives for synchronization *)
|
||||
module Advanced : sig
|
||||
val barrier_on_abstract_container_of_futures :
|
||||
iter:(('a t -> unit) -> 'cont -> unit) ->
|
||||
|
|
@ -234,7 +251,9 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
|
|||
|
||||
(** {2 Await}
|
||||
|
||||
{b NOTE} This is only available on OCaml 5. *)
|
||||
This suspends the current task using an OCaml 5 algebraic effect, and makes
|
||||
preparations for the task to be resumed once the future has been resolved.
|
||||
*)
|
||||
|
||||
val await : 'a t -> 'a
|
||||
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
|
||||
|
|
@ -244,7 +263,11 @@ val await : 'a t -> 'a
|
|||
@since 0.3
|
||||
|
||||
This must only be run from inside the runner itself. The runner must support
|
||||
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
|
||||
{!Suspend_}. *)
|
||||
|
||||
val yield : unit -> unit
|
||||
(** Like {!Moonpool.yield}.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
(** {2 Blocking} *)
|
||||
|
||||
|
|
@ -252,7 +275,7 @@ val wait_block : 'a t -> 'a or_error
|
|||
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
|
||||
returns its value.
|
||||
|
||||
{b NOTE}: A word of warning: this will monopolize the calling thread until
|
||||
{b NOTE:} A word of warning: this will monopolize the calling thread until
|
||||
the future resolves. This can also easily cause deadlocks, if enough threads
|
||||
in a pool call [wait_block] on futures running on the same pool or a pool
|
||||
depending on it.
|
||||
|
|
@ -265,7 +288,10 @@ val wait_block : 'a t -> 'a or_error
|
|||
the deadlock. *)
|
||||
|
||||
val wait_block_exn : 'a t -> 'a
|
||||
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
|
||||
(** Same as {!wait_block} but re-raises the exception if the future failed.
|
||||
|
||||
{b NOTE:} do check the cautionary note in {!wait_block} concerning
|
||||
deadlocks. *)
|
||||
|
||||
(** {2 Infix operators}
|
||||
|
||||
|
|
@ -297,9 +323,10 @@ module Infix_local = Infix
|
|||
|
||||
module Private_ : sig
|
||||
val unsafe_promise_of_fut : 'a t -> 'a promise
|
||||
(** please do not use *)
|
||||
(** Do not use unless you know exactly what you are doing. *)
|
||||
|
||||
val as_computation : 'a t -> 'a Picos.Computation.t
|
||||
(** Picos compat *)
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
|
|
|||
|
|
@ -8,15 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
|
|||
in
|
||||
let runner = Fifo_pool.Private_.runner_of_state worker_st in
|
||||
try
|
||||
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
|
||||
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
|
||||
let fiber = Fut.spawn ~on:runner (fun () -> f runner) in
|
||||
Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
|
||||
|
||||
(* run the main thread *)
|
||||
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
||||
Worker_loop_.worker_loop worker_st
|
||||
~block_signals (* do not disturb existing thread *)
|
||||
~ops:Fifo_pool.Private_.worker_ops;
|
||||
|
||||
match Fiber.peek fiber with
|
||||
match Fut.peek fiber with
|
||||
| Some (Ok x) -> x
|
||||
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||
| None -> assert false
|
||||
|
|
@ -13,16 +13,18 @@
|
|||
to {!Background_thread} in that there's a single worker to process
|
||||
tasks/fibers.
|
||||
|
||||
This handles effects, including the ones in {!Fiber}.
|
||||
This handles the concurency effects used in moonpool, including [await] and
|
||||
[yield].
|
||||
|
||||
@since 0.6 *)
|
||||
This module was migrated from the late [Moonpool_fib].
|
||||
|
||||
val main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
val main : (Runner.t -> 'a) -> 'a
|
||||
(** [main f] runs [f()] in a scope that handles effects, including
|
||||
{!Fiber.await}.
|
||||
|
||||
This scope can run background tasks as well, in a cooperative fashion. *)
|
||||
|
||||
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
|
||||
(** Same as {!main} but with room for optional arguments.
|
||||
@since 0.7 *)
|
||||
val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a
|
||||
(** Same as {!main} but with room for optional arguments. *)
|
||||
|
|
@ -23,6 +23,7 @@ module Exn_bt = Exn_bt
|
|||
module Fifo_pool = Fifo_pool
|
||||
module Fut = Fut
|
||||
module Lock = Lock
|
||||
module Main = Main
|
||||
module Immediate_runner = struct end
|
||||
module Runner = Runner
|
||||
module Task_local_storage = Task_local_storage
|
||||
|
|
@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage
|
|||
module Trigger = Trigger
|
||||
module Ws_pool = Ws_pool
|
||||
|
||||
(* re-export main *)
|
||||
include Main
|
||||
|
||||
module Private = struct
|
||||
module Ws_deque_ = Ws_deque_
|
||||
module Worker_loop_ = Worker_loop_
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool
|
|||
module Background_thread = Background_thread
|
||||
module Runner = Runner
|
||||
module Trigger = Trigger
|
||||
module Main = Main
|
||||
|
||||
module Immediate_runner : sig end
|
||||
[@@deprecated "use Moonpool_fib.Main"]
|
||||
|
|
@ -80,7 +81,7 @@ val await : 'a Fut.t -> 'a
|
|||
val yield : unit -> unit
|
||||
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
|
||||
>= 5.0.
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.9 *)
|
||||
|
||||
module Lock = Lock
|
||||
module Fut = Fut
|
||||
|
|
@ -205,6 +206,10 @@ module Atomic = Atomic
|
|||
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
|
||||
module on OCaml 5. *)
|
||||
|
||||
include module type of struct
|
||||
include Main
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
(** Private internals, with no stability guarantees *)
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
(library
|
||||
(name moonpool_fib)
|
||||
(public_name moonpool.fib)
|
||||
(synopsis "Fibers and structured concurrency for Moonpool")
|
||||
(libraries moonpool picos)
|
||||
(flags :standard -open Moonpool_private -open Moonpool))
|
||||
334
src/fib/fiber.ml
334
src/fib/fiber.ml
|
|
@ -1,334 +0,0 @@
|
|||
open Moonpool.Private.Types_
|
||||
module A = Atomic
|
||||
module FM = Handle.Map
|
||||
module Int_map = Map.Make (Int)
|
||||
module PF = Picos.Fiber
|
||||
module FLS = Picos.Fiber.FLS
|
||||
|
||||
type 'a callback = 'a Exn_bt.result -> unit
|
||||
(** Callbacks that are called when a fiber is done. *)
|
||||
|
||||
type cancel_callback = Exn_bt.t -> unit
|
||||
|
||||
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
|
||||
Fut.Private_.unsafe_promise_of_fut
|
||||
|
||||
(* TODO: replace with picos structured at some point? *)
|
||||
module Private_ = struct
|
||||
type pfiber = PF.t
|
||||
|
||||
type 'a t = {
|
||||
id: Handle.t; (** unique identifier for this fiber *)
|
||||
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
|
||||
res: 'a Fut.t;
|
||||
runner: Runner.t;
|
||||
pfiber: pfiber; (** Associated picos fiber *)
|
||||
}
|
||||
|
||||
and 'a state =
|
||||
| Alive of {
|
||||
children: children;
|
||||
on_cancel: cancel_callback Int_map.t;
|
||||
cancel_id: int;
|
||||
}
|
||||
| Terminating_or_done of 'a Exn_bt.result A.t
|
||||
|
||||
and children = any FM.t
|
||||
and any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
(** Key to access the current moonpool.fiber. *)
|
||||
let k_current_fiber : any FLS.t = FLS.create ()
|
||||
|
||||
exception Not_set = FLS.Not_set
|
||||
|
||||
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
|
||||
FLS.get_exn pfiber k_current_fiber
|
||||
|
||||
let[@inline] get_cur_exn () : any =
|
||||
get_cur_from_exn @@ get_current_fiber_exn ()
|
||||
|
||||
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
|
||||
|
||||
let[@inline] is_closed (self : _ t) =
|
||||
match A.get self.state with
|
||||
| Alive _ -> false
|
||||
| Terminating_or_done _ -> true
|
||||
end
|
||||
|
||||
include Private_
|
||||
|
||||
let create_ ~pfiber ~runner ~res () : 'a t =
|
||||
let id = Handle.generate_fresh () in
|
||||
{
|
||||
state =
|
||||
A.make
|
||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
||||
id;
|
||||
res;
|
||||
runner;
|
||||
pfiber;
|
||||
}
|
||||
|
||||
let create_done_ ~res () : _ t =
|
||||
let id = Handle.generate_fresh () in
|
||||
{
|
||||
state =
|
||||
A.make
|
||||
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
|
||||
id;
|
||||
res;
|
||||
runner = Runner.dummy;
|
||||
pfiber = Moonpool.Private.Types_._dummy_fiber;
|
||||
}
|
||||
|
||||
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
|
||||
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
|
||||
let[@inline] res self = self.res
|
||||
let[@inline] peek self = Fut.peek self.res
|
||||
let[@inline] is_done self = Fut.is_done self.res
|
||||
let[@inline] is_success self = Fut.is_success self.res
|
||||
let[@inline] is_cancelled self = Fut.is_failed self.res
|
||||
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
|
||||
let[@inline] await self = Fut.await self.res
|
||||
let[@inline] wait_block self = Fut.wait_block self.res
|
||||
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
|
||||
|
||||
(** Resolve [promise] once [children] are all done *)
|
||||
let resolve_once_children_are_done_ ~children ~promise
|
||||
(res : 'a Exn_bt.result A.t) : unit =
|
||||
let n_children = FM.cardinal children in
|
||||
if n_children > 0 then (
|
||||
(* wait for all children to be done *)
|
||||
let n_waiting = A.make (FM.cardinal children) in
|
||||
let on_child_finish (r : _ result) =
|
||||
(* make sure the parent fails if any child fails *)
|
||||
(match r with
|
||||
| Ok _ -> ()
|
||||
| Error ebt -> A.set res (Error ebt));
|
||||
|
||||
(* if we're the last to finish, resolve the parent fiber's [res] *)
|
||||
if A.fetch_and_add n_waiting (-1) = 1 then (
|
||||
let res = A.get res in
|
||||
Fut.fulfill promise res
|
||||
)
|
||||
in
|
||||
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
|
||||
) else
|
||||
Fut.fulfill promise @@ A.get res
|
||||
|
||||
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
|
||||
fun self ebt ->
|
||||
let promise = prom_of_fut self.res in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; cancel_id = _; on_cancel } as old ->
|
||||
let new_st = Terminating_or_done (A.make @@ Error ebt) in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
|
||||
cancel_children_ ~children ebt;
|
||||
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
|
||||
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
(** Cancel eagerly all children *)
|
||||
and cancel_children_ ebt ~children : unit =
|
||||
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
|
||||
|
||||
type cancel_handle = int
|
||||
|
||||
let add_on_cancel (self : _ t) cb : cancel_handle =
|
||||
let h = ref 0 in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; cancel_id; on_cancel } as old ->
|
||||
let new_st =
|
||||
Alive
|
||||
{
|
||||
children;
|
||||
cancel_id = cancel_id + 1;
|
||||
on_cancel = Int_map.add cancel_id cb on_cancel;
|
||||
}
|
||||
in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
h := cancel_id;
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done r ->
|
||||
(match A.get r with
|
||||
| Error ebt -> cb ebt
|
||||
| Ok _ -> ());
|
||||
false
|
||||
do
|
||||
()
|
||||
done;
|
||||
!h
|
||||
|
||||
let remove_on_cancel (self : _ t) h =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ on_cancel; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with on_cancel = Int_map.remove h on_cancel }
|
||||
in
|
||||
not (A.compare_and_set self.state old new_st)
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
|
||||
let h = add_on_cancel self cb in
|
||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
||||
|
||||
(** Successfully resolve the fiber. This might still fail if some children
|
||||
failed. *)
|
||||
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
|
||||
let r = A.make @@ Ok r in
|
||||
let promise = prom_of_fut self.res in
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive { children; _ } as old ->
|
||||
let new_st = Terminating_or_done r in
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
resolve_once_children_are_done_ ~children ~promise r;
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let remove_child_ (self : _ t) (child : _ t) =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ children; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with children = FM.remove child.id children }
|
||||
in
|
||||
not (A.compare_and_set self.state old new_st)
|
||||
| _ -> false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
(** Add a child to [self].
|
||||
@param protected if true, the child's failure will not affect [self]. *)
|
||||
let add_child_ ~protect (self : _ t) (child : _ t) =
|
||||
while
|
||||
match A.get self.state with
|
||||
| Alive ({ children; _ } as alive) as old ->
|
||||
let new_st =
|
||||
Alive { alive with children = FM.add child.id (Any child) children }
|
||||
in
|
||||
|
||||
if A.compare_and_set self.state old new_st then (
|
||||
(* make sure to remove [child] from [self.children] once it's done;
|
||||
fail [self] is [child] failed and [protect=false] *)
|
||||
Fut.on_result child.res (function
|
||||
| Ok _ -> remove_child_ self child
|
||||
| Error ebt ->
|
||||
(* child failed, we must fail too *)
|
||||
remove_child_ self child;
|
||||
if not protect then resolve_as_failed_ self ebt);
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Terminating_or_done r ->
|
||||
(match A.get r with
|
||||
| Error ebt ->
|
||||
(* cancel child immediately *)
|
||||
resolve_as_failed_ child ebt
|
||||
| Ok _ -> ());
|
||||
false
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
|
||||
let res, _ = Fut.make () in
|
||||
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
|
||||
|
||||
(* copy local hmap from parent, if present *)
|
||||
Option.iter
|
||||
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
|
||||
parent;
|
||||
|
||||
(match parent with
|
||||
| Some p when is_closed p -> failwith "spawn: nursery is closed"
|
||||
| _ -> ());
|
||||
let fib = create_ ~pfiber ~runner ~res () in
|
||||
|
||||
let run () =
|
||||
(* make sure the fiber is accessible from inside itself *)
|
||||
FLS.set pfiber k_current_fiber (Any fib);
|
||||
try
|
||||
let res = f () in
|
||||
resolve_ok_ fib res
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
let ebt = Exn_bt.make exn bt in
|
||||
resolve_as_failed_ fib ebt
|
||||
in
|
||||
|
||||
Runner.run_async ~fiber:pfiber runner run;
|
||||
|
||||
fib
|
||||
|
||||
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
|
||||
|
||||
let spawn ?on ?(protect = true) f : _ t =
|
||||
(* spawn [f()] with a copy of our local storage *)
|
||||
let (Any p) =
|
||||
try get_cur_exn ()
|
||||
with Not_set ->
|
||||
failwith "Fiber.spawn: must be run from within another fiber."
|
||||
in
|
||||
|
||||
let runner =
|
||||
match on with
|
||||
| Some r -> r
|
||||
| None -> p.runner
|
||||
in
|
||||
let child = spawn_ ~parent:(Some p) ~runner f in
|
||||
add_child_ ~protect p child;
|
||||
child
|
||||
|
||||
let[@inline] spawn_ignore ?on ?protect f : unit =
|
||||
ignore (spawn ?on ?protect f : _ t)
|
||||
|
||||
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
|
||||
|
||||
let[@inline] self () : any =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
|
||||
| f -> f
|
||||
|
||||
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
|
||||
let (Any self) = self () in
|
||||
let h = add_on_cancel self cb in
|
||||
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
|
||||
|
||||
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
|
||||
|
||||
let check_if_cancelled () =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set ->
|
||||
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
|
||||
| Any self -> check_if_cancelled_ self
|
||||
|
||||
let yield () : unit =
|
||||
match get_cur_exn () with
|
||||
| exception Not_set ->
|
||||
failwith "Fiber.yield: must be run from inside a fiber."
|
||||
| Any self ->
|
||||
check_if_cancelled_ self;
|
||||
PF.yield ();
|
||||
check_if_cancelled_ self
|
||||
|
|
@ -1,150 +0,0 @@
|
|||
(** Fibers.
|
||||
|
||||
A fiber is a lightweight computation that runs cooperatively alongside other
|
||||
fibers. In the context of moonpool, fibers have additional properties:
|
||||
|
||||
- they run in a moonpool runner
|
||||
- they form a simple supervision tree, enabling a limited form of structured
|
||||
concurrency *)
|
||||
|
||||
type cancel_callback = Exn_bt.t -> unit
|
||||
(** A callback used in case of cancellation *)
|
||||
|
||||
(**/**)
|
||||
|
||||
(** Do not rely on this, it is internal implementation details. *)
|
||||
module Private_ : sig
|
||||
type 'a state
|
||||
type pfiber
|
||||
|
||||
type 'a t = private {
|
||||
id: Handle.t; (** unique identifier for this fiber *)
|
||||
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
|
||||
res: 'a Fut.t;
|
||||
runner: Runner.t;
|
||||
pfiber: pfiber;
|
||||
}
|
||||
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
|
||||
on that. *)
|
||||
|
||||
type any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
exception Not_set
|
||||
|
||||
val get_cur_exn : unit -> any
|
||||
(** [get_cur_exn ()] either returns the current fiber, or
|
||||
@raise Not_set if run outside a fiber. *)
|
||||
|
||||
val get_cur_opt : unit -> any option
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
type 'a t = 'a Private_.t
|
||||
(** A fiber returning a value of type ['a]. *)
|
||||
|
||||
val res : 'a t -> 'a Fut.t
|
||||
(** Future result of the fiber. *)
|
||||
|
||||
type 'a callback = 'a Exn_bt.result -> unit
|
||||
(** Callbacks that are called when a fiber is done. *)
|
||||
|
||||
(** Type erased fiber *)
|
||||
type any = Private_.any = Any : _ t -> any [@@unboxed]
|
||||
|
||||
val return : 'a -> 'a t
|
||||
val fail : Exn_bt.t -> _ t
|
||||
|
||||
val self : unit -> any
|
||||
(** [self ()] is the current fiber. Must be run from inside a fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
val peek : 'a t -> 'a Fut.or_error option
|
||||
(** Peek inside the future result *)
|
||||
|
||||
val is_done : _ t -> bool
|
||||
(** Has the fiber completed? *)
|
||||
|
||||
val is_cancelled : _ t -> bool
|
||||
(** Has the fiber completed with a failure? *)
|
||||
|
||||
val is_success : _ t -> bool
|
||||
(** Has the fiber completed with a value? *)
|
||||
|
||||
val await : 'a t -> 'a
|
||||
(** [await fib] is like [Fut.await (res fib)] *)
|
||||
|
||||
val wait_block_exn : 'a t -> 'a
|
||||
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
|
||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
||||
|
||||
val wait_block : 'a t -> 'a Fut.or_error
|
||||
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
|
||||
{!Fut.wait_block} for warnings about deadlocks. *)
|
||||
|
||||
val check_if_cancelled : unit -> unit
|
||||
(** Check if the current fiber is cancelled, in which case this raises. Must be
|
||||
run from inside a fiber.
|
||||
@raise e if the current fiber is cancelled with exception [e]
|
||||
@raise Failure if not run from a fiber. *)
|
||||
|
||||
val yield : unit -> unit
|
||||
(** Yield control to the scheduler from the current fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
type cancel_handle
|
||||
(** An opaque handle for a single cancel callback in a fiber *)
|
||||
|
||||
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
|
||||
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
|
||||
If [fib] is already cancelled, [cb] is called immediately. *)
|
||||
|
||||
val remove_on_cancel : _ t -> cancel_handle -> unit
|
||||
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
|
||||
[h]. *)
|
||||
|
||||
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
|
||||
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
|
||||
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
|
||||
the fiber being cancelled, this callback is removed. *)
|
||||
|
||||
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
|
||||
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
|
||||
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
|
||||
from the list. *)
|
||||
|
||||
val on_result : 'a t -> 'a callback -> unit
|
||||
(** Wait for fiber to be done and call the callback with the result. If the
|
||||
fiber is done already then the callback is invoked immediately with its
|
||||
result. *)
|
||||
|
||||
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
|
||||
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
|
||||
fiber is not the child of any other fiber: its lifetime is only determined
|
||||
by the lifetime of [f()]. *)
|
||||
|
||||
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
|
||||
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
|
||||
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
|
||||
if the current fiber [parent] fails.
|
||||
|
||||
@param on
|
||||
if provided, start the fiber on the given runner. If not provided, use the
|
||||
parent's runner.
|
||||
@param protect
|
||||
if true, when [f_child] fails, it does not affect [parent]. If false,
|
||||
[f_child] failing also causes [parent] to fail (and therefore all other
|
||||
children of [parent]). Default is [true].
|
||||
|
||||
Must be run from inside a fiber.
|
||||
@raise Failure if not run from inside a fiber. *)
|
||||
|
||||
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
|
||||
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
|
||||
termination of the parent, ie. the parent will exit only after this new
|
||||
fiber exits.
|
||||
@param on the optional runner to use, added since 0.7 *)
|
||||
|
||||
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
||||
(** Like {!spawn_top} but ignores the result.
|
||||
@since 0.7 *)
|
||||
|
|
@ -1 +0,0 @@
|
|||
include Task_local_storage
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
(** Fiber-local storage.
|
||||
|
||||
This storage is associated to the current fiber, just like thread-local
|
||||
storage is associated with the current thread.
|
||||
|
||||
See {!Moonpool.Task_local_storage} for more general information, as this is
|
||||
based on it.
|
||||
|
||||
{b NOTE}: it's important to note that, while each fiber has its own storage,
|
||||
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
|
||||
the storage. Values inside [f1]'s storage will be physically shared with
|
||||
[f2]. It is thus recommended to store only persistent values in the local
|
||||
storage. *)
|
||||
|
||||
include module type of struct
|
||||
include Task_local_storage
|
||||
end
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
module A = Atomic
|
||||
|
||||
type t = int
|
||||
|
||||
let counter_ = A.make 0
|
||||
let equal : t -> t -> bool = ( = )
|
||||
let compare : t -> t -> int = Stdlib.compare
|
||||
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
|
||||
|
||||
(* TODO: better hash *)
|
||||
let[@inline] hash x = x land max_int
|
||||
|
||||
module Set = Set.Make (Int)
|
||||
module Map = Map.Make (Int)
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
(** The unique name of a fiber.
|
||||
|
||||
Each fiber has a unique handle that can be used to refer to it in maps or
|
||||
sets. *)
|
||||
|
||||
type t = private int
|
||||
(** Unique, opaque identifier for a fiber. *)
|
||||
|
||||
val equal : t -> t -> bool
|
||||
val compare : t -> t -> int
|
||||
val hash : t -> int
|
||||
|
||||
val generate_fresh : unit -> t
|
||||
(** Generate a fresh, unique identifier *)
|
||||
|
||||
module Set : Set.S with type elt = t
|
||||
module Map : Map.S with type key = t
|
||||
|
|
@ -1,12 +0,0 @@
|
|||
(** Fibers for moonpool.
|
||||
|
||||
See {!Fiber} for the most important explanations.
|
||||
|
||||
@since 0.6. *)
|
||||
|
||||
module Fiber = Fiber
|
||||
module Fls = Fls
|
||||
module Handle = Handle
|
||||
module Main = Main
|
||||
include Fiber
|
||||
include Main
|
||||
|
|
@ -64,7 +64,7 @@ module State_ = struct
|
|||
done;
|
||||
|
||||
(* wait for the other computation to be done *)
|
||||
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
|
||||
if !must_await then Trigger.await_exn trigger
|
||||
| Right_solved _ | Both_solved _ -> assert false
|
||||
end
|
||||
|
||||
|
|
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
|||
i := !i + len_range
|
||||
done;
|
||||
|
||||
Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||
Trigger.await_exn trigger;
|
||||
Option.iter Exn_bt.raise @@ A.get failure;
|
||||
()
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
(>= %{ocaml_version} 5.0))
|
||||
(libraries
|
||||
(re_export moonpool)
|
||||
moonpool.fib
|
||||
picos
|
||||
(re_export lwt)
|
||||
lwt.unix))
|
||||
|
|
|
|||
|
|
@ -289,7 +289,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
|
|||
let spawn_lwt f : _ Lwt.t =
|
||||
let st = Main_state.get_st () in
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
|
||||
M.run_async st.as_runner (fun () ->
|
||||
try
|
||||
let x = f () in
|
||||
Lwt.wakeup lwt_prom x
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@
|
|||
(libraries
|
||||
t_fibers
|
||||
moonpool
|
||||
moonpool.fib
|
||||
trace
|
||||
trace-tef
|
||||
qcheck-core
|
||||
|
|
|
|||
|
|
@ -2,4 +2,4 @@
|
|||
(name t_fibers)
|
||||
(package moonpool)
|
||||
(optional)
|
||||
(libraries moonpool moonpool.fib trace qcheck-core hmap))
|
||||
(libraries moonpool trace qcheck-core hmap))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
open! Moonpool
|
||||
module Chan = Moonpool.Chan
|
||||
module Exn_bt = Moonpool.Exn_bt
|
||||
module A = Atomic
|
||||
module F = Moonpool_fib.Fiber
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
|
|
@ -49,24 +50,23 @@ let logf = Log_.logf
|
|||
let run1 ~runner () =
|
||||
Printf.printf "============\nstart\n%!";
|
||||
let clock = ref TS.init in
|
||||
let fib =
|
||||
F.spawn_top ~on:runner @@ fun () ->
|
||||
let fut =
|
||||
Fut.spawn ~on:runner @@ fun () ->
|
||||
let chan_progress = Chan.create ~max_size:4 () in
|
||||
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
||||
|
||||
let subs =
|
||||
List.init 5 (fun i ->
|
||||
F.spawn ~protect:false @@ fun _n ->
|
||||
Fut.spawn ~on:runner @@ fun _n ->
|
||||
Thread.delay (float i *. 0.01);
|
||||
Chan.pop chans.(i);
|
||||
Chan.push chan_progress i;
|
||||
F.check_if_cancelled ();
|
||||
i)
|
||||
in
|
||||
|
||||
logf (TS.tick_get clock) "wait for subs";
|
||||
|
||||
F.spawn_ignore (fun () ->
|
||||
Moonpool.run_async runner (fun () ->
|
||||
for i = 0 to 4 do
|
||||
Chan.push chans.(i) ();
|
||||
let i' = Chan.pop chan_progress in
|
||||
|
|
@ -78,19 +78,15 @@ let run1 ~runner () =
|
|||
(fun i f ->
|
||||
let clock = ref (0 :: i :: clock0) in
|
||||
logf !clock "await fiber %d" i;
|
||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||
let res = F.await f in
|
||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||
F.yield ();
|
||||
let res = Fut.await f in
|
||||
Fut.yield ();
|
||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||
subs);
|
||||
|
||||
logf (TS.tick_get clock) "main fiber done"
|
||||
in
|
||||
|
||||
Fut.await @@ F.res fib;
|
||||
Fut.await fut;
|
||||
logf (TS.tick_get clock) "main fiber exited";
|
||||
Log_.print_and_clear ();
|
||||
()
|
||||
|
|
@ -99,15 +95,11 @@ let run2 ~runner () =
|
|||
(* same but now, cancel one of the sub-fibers *)
|
||||
Printf.printf "============\nstart\n";
|
||||
|
||||
let clock = ref TS.init in
|
||||
let fib =
|
||||
F.spawn_top ~on:runner @@ fun () ->
|
||||
let@ () =
|
||||
F.with_on_self_cancel (fun ebt ->
|
||||
logf (TS.tick_get clock) "main fiber cancelled with %s"
|
||||
@@ Exn_bt.show ebt)
|
||||
in
|
||||
let to_await = ref [] in
|
||||
|
||||
let clock = ref TS.init in
|
||||
let fut =
|
||||
Fut.spawn ~on:runner @@ fun () ->
|
||||
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
||||
let chan_progress = Chan.create ~max_size:4 () in
|
||||
|
||||
|
|
@ -116,11 +108,7 @@ let run2 ~runner () =
|
|||
let clock0 = !clock in
|
||||
List.init 10 (fun i ->
|
||||
let clock = ref (0 :: i :: clock0) in
|
||||
F.spawn ~protect:false @@ fun _n ->
|
||||
let@ () =
|
||||
F.with_on_self_cancel (fun _ ->
|
||||
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
|
||||
in
|
||||
Fut.spawn ~on:runner @@ fun _n ->
|
||||
Thread.delay 0.002;
|
||||
|
||||
(* sync for determinism *)
|
||||
|
|
@ -132,46 +120,51 @@ let run2 ~runner () =
|
|||
failwith "oh no!"
|
||||
);
|
||||
|
||||
F.check_if_cancelled ();
|
||||
i)
|
||||
in
|
||||
|
||||
let post = TS.tick_get clock in
|
||||
List.iteri
|
||||
(fun i fib ->
|
||||
F.on_result fib (function
|
||||
Fut.on_result fib (function
|
||||
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
||||
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
||||
subs;
|
||||
|
||||
(* sequentialize the fibers, for determinism *)
|
||||
F.spawn_ignore (fun () ->
|
||||
for j = 0 to 9 do
|
||||
Chan.push chans_unblock.(j) ();
|
||||
let j' = Chan.pop chan_progress in
|
||||
assert (j = j')
|
||||
done);
|
||||
let sender =
|
||||
Fut.spawn ~on:runner (fun () ->
|
||||
for j = 0 to 9 do
|
||||
Chan.push chans_unblock.(j) ();
|
||||
let j' = Chan.pop chan_progress in
|
||||
assert (j = j')
|
||||
done)
|
||||
in
|
||||
to_await := sender :: !to_await;
|
||||
|
||||
logf (TS.tick_get clock) "wait for subs";
|
||||
List.iteri
|
||||
(fun i f ->
|
||||
logf (TS.tick_get clock) "await fiber %d" i;
|
||||
let res = F.await f in
|
||||
let res = Fut.await f in
|
||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||
subs;
|
||||
logf (TS.tick_get clock) "yield";
|
||||
F.yield ();
|
||||
Fut.yield ();
|
||||
logf (TS.tick_get clock) "yielded";
|
||||
logf (TS.tick_get clock) "main fiber done"
|
||||
in
|
||||
|
||||
F.on_result fib (function
|
||||
Fut.on_result fut (function
|
||||
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
||||
| Error ebt ->
|
||||
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
||||
|
||||
(try Fut.await @@ F.res fib
|
||||
(try Fut.await fut
|
||||
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
||||
logf (TS.tick_get clock) "main fiber exited";
|
||||
|
||||
List.iter Fut.await !to_await;
|
||||
|
||||
Log_.print_and_clear ();
|
||||
()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
open! Moonpool
|
||||
module A = Atomic
|
||||
module F = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
module F = Moonpool.Fut
|
||||
module FLS = Moonpool.Task_local_storage
|
||||
|
||||
(* ### dummy little tracing system with local storage *)
|
||||
|
||||
|
|
@ -122,7 +122,7 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 2 (fun idx_sub_sub ->
|
||||
F.spawn ~protect:true (fun () ->
|
||||
F.spawn ~on:pool (fun () ->
|
||||
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
||||
in
|
||||
List.iter F.await subs
|
||||
|
|
@ -133,8 +133,7 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 2 (fun k ->
|
||||
F.spawn ~protect:true @@ fun () ->
|
||||
sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||
F.spawn ~on:pool @@ fun () -> sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||
in
|
||||
|
||||
let@ () =
|
||||
|
|
@ -149,16 +148,14 @@ let run ~pool ~pool_name () =
|
|||
|
||||
let subs =
|
||||
List.init 5 (fun j ->
|
||||
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||
F.spawn ~on:pool @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||
in
|
||||
|
||||
List.iter F.await subs
|
||||
in
|
||||
|
||||
Printf.printf "run test on pool = %s\n" pool_name;
|
||||
let fibs =
|
||||
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
|
||||
in
|
||||
let fibs = List.init 8 (fun idx -> F.spawn ~on:pool (fun () -> top idx)) in
|
||||
List.iter F.await fibs;
|
||||
|
||||
Printf.printf "tracing complete\n";
|
||||
|
|
|
|||
|
|
@ -2,33 +2,21 @@
|
|||
start
|
||||
1: wait for subs
|
||||
1.0.0: await fiber 0
|
||||
1.0.1: cur fiber[0] is some: true
|
||||
1.0.2: cur fiber[0] is some: true
|
||||
1.0.3: res 0 = 0
|
||||
1.0.1: res 0 = 0
|
||||
1.1.0: await fiber 1
|
||||
1.1.1: cur fiber[1] is some: true
|
||||
1.1.2: cur fiber[1] is some: true
|
||||
1.1.3: res 1 = 1
|
||||
1.1.1: res 1 = 1
|
||||
1.2.0: await fiber 2
|
||||
1.2.1: cur fiber[2] is some: true
|
||||
1.2.2: cur fiber[2] is some: true
|
||||
1.2.3: res 2 = 2
|
||||
1.2.1: res 2 = 2
|
||||
1.3.0: await fiber 3
|
||||
1.3.1: cur fiber[3] is some: true
|
||||
1.3.2: cur fiber[3] is some: true
|
||||
1.3.3: res 3 = 3
|
||||
1.3.1: res 3 = 3
|
||||
1.4.0: await fiber 4
|
||||
1.4.1: cur fiber[4] is some: true
|
||||
1.4.2: cur fiber[4] is some: true
|
||||
1.4.3: res 4 = 4
|
||||
1.4.1: res 4 = 4
|
||||
2: main fiber done
|
||||
3: main fiber exited
|
||||
============
|
||||
start
|
||||
1: start fibers
|
||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||
1.8.1: sub-fiber 8 was cancelled
|
||||
1.9.1: sub-fiber 9 was cancelled
|
||||
2.0: fiber 0 resolved as ok
|
||||
2.1: fiber 1 resolved as ok
|
||||
2.2: fiber 2 resolved as ok
|
||||
|
|
@ -37,8 +25,8 @@ start
|
|||
2.5: fiber 5 resolved as ok
|
||||
2.6: fiber 6 resolved as ok
|
||||
2.7: fiber 7 resolved as error
|
||||
2.8: fiber 8 resolved as error
|
||||
2.9: fiber 9 resolved as error
|
||||
2.8: fiber 8 resolved as ok
|
||||
2.9: fiber 9 resolved as ok
|
||||
3: wait for subs
|
||||
4: await fiber 0
|
||||
5: res 0 = 0
|
||||
|
|
@ -55,7 +43,6 @@ start
|
|||
16: await fiber 6
|
||||
17: res 6 = 6
|
||||
18: await fiber 7
|
||||
19: main fiber cancelled with Failure("oh no!")
|
||||
20: main fiber result: error Failure("oh no!")
|
||||
21: main fib failed with "oh no!"
|
||||
22: main fiber exited
|
||||
19: main fiber result: error Failure("oh no!")
|
||||
20: main fib failed with "oh no!"
|
||||
21: main fiber exited
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ runner = Moonpool_fib.main in
|
||||
let@ runner = Moonpool.main in
|
||||
T_fibers.Fib.run1 ~runner ();
|
||||
T_fibers.Fib.run2 ~runner ()
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -3,7 +3,7 @@ open! Moonpool
|
|||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let@ _ = Moonpool_fib.main in
|
||||
let@ _ = Moonpool.main in
|
||||
(let@ pool = Ws_pool.with_ () in
|
||||
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
||||
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
open Moonpool
|
||||
module F = Moonpool_fib
|
||||
module F = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let () =
|
||||
let r =
|
||||
F.main @@ fun runner ->
|
||||
let f1 = F.spawn (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn (fun () -> F.await f1 + 10) in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let f1 = F.spawn ~on:runner (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -19,10 +19,10 @@ let () =
|
|||
(* run fibers in the background, await them in the main thread *)
|
||||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||
let r =
|
||||
F.main @@ fun runner ->
|
||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn_top ~on:bg (fun () -> F.await f1 + 10) in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:bg (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -32,8 +32,8 @@ let () =
|
|||
let () =
|
||||
try
|
||||
let _r =
|
||||
F.main @@ fun _r ->
|
||||
let fib = F.spawn (fun () -> failwith "oops") in
|
||||
Moonpool.main @@ fun runner ->
|
||||
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||
F.await fib
|
||||
in
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@
|
|||
(libraries
|
||||
t_fibers
|
||||
moonpool
|
||||
moonpool.fib
|
||||
moonpool-lwt
|
||||
hmap
|
||||
trace
|
||||
|
|
|
|||
|
|
@ -2,33 +2,21 @@
|
|||
start
|
||||
1: wait for subs
|
||||
1.0.0: await fiber 0
|
||||
1.0.1: cur fiber[0] is some: true
|
||||
1.0.2: cur fiber[0] is some: true
|
||||
1.0.3: res 0 = 0
|
||||
1.0.1: res 0 = 0
|
||||
1.1.0: await fiber 1
|
||||
1.1.1: cur fiber[1] is some: true
|
||||
1.1.2: cur fiber[1] is some: true
|
||||
1.1.3: res 1 = 1
|
||||
1.1.1: res 1 = 1
|
||||
1.2.0: await fiber 2
|
||||
1.2.1: cur fiber[2] is some: true
|
||||
1.2.2: cur fiber[2] is some: true
|
||||
1.2.3: res 2 = 2
|
||||
1.2.1: res 2 = 2
|
||||
1.3.0: await fiber 3
|
||||
1.3.1: cur fiber[3] is some: true
|
||||
1.3.2: cur fiber[3] is some: true
|
||||
1.3.3: res 3 = 3
|
||||
1.3.1: res 3 = 3
|
||||
1.4.0: await fiber 4
|
||||
1.4.1: cur fiber[4] is some: true
|
||||
1.4.2: cur fiber[4] is some: true
|
||||
1.4.3: res 4 = 4
|
||||
1.4.1: res 4 = 4
|
||||
2: main fiber done
|
||||
3: main fiber exited
|
||||
============
|
||||
start
|
||||
1: start fibers
|
||||
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||
1.8.1: sub-fiber 8 was cancelled
|
||||
1.9.1: sub-fiber 9 was cancelled
|
||||
2.0: fiber 0 resolved as ok
|
||||
2.1: fiber 1 resolved as ok
|
||||
2.2: fiber 2 resolved as ok
|
||||
|
|
@ -37,8 +25,8 @@ start
|
|||
2.5: fiber 5 resolved as ok
|
||||
2.6: fiber 6 resolved as ok
|
||||
2.7: fiber 7 resolved as error
|
||||
2.8: fiber 8 resolved as error
|
||||
2.9: fiber 9 resolved as error
|
||||
2.8: fiber 8 resolved as ok
|
||||
2.9: fiber 9 resolved as ok
|
||||
3: wait for subs
|
||||
4: await fiber 0
|
||||
5: res 0 = 0
|
||||
|
|
@ -55,7 +43,6 @@ start
|
|||
16: await fiber 6
|
||||
17: res 6 = 6
|
||||
18: await fiber 7
|
||||
19: main fiber cancelled with Failure("oh no!")
|
||||
20: main fiber result: error Failure("oh no!")
|
||||
21: main fib failed with "oh no!"
|
||||
22: main fiber exited
|
||||
19: main fiber result: error Failure("oh no!")
|
||||
20: main fib failed with "oh no!"
|
||||
21: main fiber exited
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,6 @@
|
|||
open Moonpool
|
||||
module M_lwt = Moonpool_lwt
|
||||
module F = Moonpool_fib
|
||||
module F = Moonpool.Fut
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
|
|
@ -9,9 +9,9 @@ let () =
|
|||
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||
let r =
|
||||
M_lwt.lwt_main @@ fun runner ->
|
||||
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let f1 = F.spawn ~on:bg (fun () -> 1) in
|
||||
let f2 = F.spawn ~on:runner (fun () -> 2) in
|
||||
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
|
||||
let r = F.await f2 + F.await f3 in
|
||||
assert (r = 13);
|
||||
r
|
||||
|
|
@ -24,7 +24,7 @@ let () =
|
|||
try
|
||||
let _r =
|
||||
M_lwt.lwt_main @@ fun runner ->
|
||||
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
|
||||
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
|
||||
F.await fib
|
||||
in
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue