Compare commits

...

8 commits

Author SHA1 Message Date
Simon Cruanes
95de0e7e27
test: update readme and the mdx test 2025-10-25 21:50:47 -04:00
Simon Cruanes
4924b5f52b
test: update tests, removing the fibers and cancellation tests 2025-10-25 21:50:47 -04:00
Simon Cruanes
db9cddf999
feat core: add Main, salvaged from moonpool.fib 2025-10-25 21:50:46 -04:00
Simon Cruanes
f9ab951c36
remove moonpool.fib
it's complicated and hard to use in practice, because it's not obvious
if a piece of code is running under another fiber or not, so
`Fiber.spawn` might fail because it has no parent.

So in practice we've been using `Fiber.spawn_top`… which has no
interest over just using `Fut.spawn`.
2025-10-25 21:50:46 -04:00
Simon Cruanes
2aa2612963
doc for Fut 2025-10-25 21:50:46 -04:00
Simon Cruanes
f92efa562d
doc 2025-10-25 21:50:46 -04:00
Simon Cruanes
d957f7b54e
small refactor
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-10-25 21:46:20 -04:00
Simon Cruanes
a26503df0b
refactor chan; fix bug in Chan.try_push
we could return `false` even though we succeeded in pushing a value into
the chan.
2025-10-25 21:21:03 -04:00
36 changed files with 5942 additions and 6531 deletions

View file

@ -173,49 +173,9 @@ val expected_sum : int = 5050
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
an exception and the backtrace associated with the place the exception was caught.
### Fibers
### Local storage
On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
which provides _lightweight fibers_
that can run on any Moonpool runner.
These fibers are a sort of lightweight thread, dispatched on the runner's
background thread(s).
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
is done.
```ocaml
# #require "moonpool.fib";;
...
# (* convenient alias *)
module F = Moonpool_fib;;
module F = Moonpool_fib
# F.main (fun _runner ->
let f1 = F.spawn (fun () -> fib 10) in
let f2 = F.spawn (fun () -> fib 15) in
F.await f1 + F.await f2);;
- : int = 1076
```
Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
the sub-fiber's _parent_.
When a parent fails, all its children are cancelled (forced to fail).
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).
Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
as a regular future, too.
However, this resolution is only done after all the children of the fiber have
resolved — the lifetime of fibers forms a well-nested tree in that sense.
When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
thread on which it was running becomes available again and can go on process another task.
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.
In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
put persistent data in storage to avoid confusing aliasing).
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
The storage is convenient for carrying around context for cross-cutting concerns such
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
scope).

2
dune
View file

@ -3,7 +3,7 @@
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))
(mdx
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
(libraries moonpool moonpool.forkjoin threads)
(package moonpool)
(enabled_if
(>= %{ocaml_version} 5.0)))

View file

@ -1,11 +1,12 @@
(** Example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
(** NOTE: this was an example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
there is no cancelation anymore :) *)
let ( let@ ) = ( @@ )
let () =
let@ () = Trace_tef.with_setup () in
let@ _ = Moonpool_fib.main in
let@ _ = Moonpool.main in
(* let@ runner = Moonpool.Ws_pool.with_ () in *)
let@ runner = Moonpool.Background_thread.with_ () in
@ -13,15 +14,13 @@ let () =
(* Pretend this is some long-running read loop *)
for i = 1 to 10 do
Printf.printf "MAIN LOOP %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
let _ : _ Moonpool_fib.t =
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
let _ : _ Moonpool.Fut.t =
Moonpool.Fut.spawn ~on:runner (fun () ->
Printf.printf "RUN FIBER %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
Format.printf "FIBER %d NOT CANCELLED YET@." i;
failwith "BOOM")
in
Moonpool_fib.yield ();
Moonpool.Fut.yield ();
(* Thread.delay 0.2; *)
(* Thread.delay 0.0001; *)
()

View file

@ -5,7 +5,6 @@
;(package moonpool)
(libraries
moonpool
moonpool.fib
trace
trace-tef
;tracy-client.trace

View file

@ -21,7 +21,6 @@ let create ~max_size () : _ t =
}
let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
let to_awake = Queue.create () in
Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that
outside the critical section*)
Queue.iter Trigger.signal to_awake
Queue.iter Trigger.signal to_awake;
true
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res
Mutex.unlock self.mutex;
true
| _ ->
Mutex.unlock self.mutex;
false
) else
false
let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with
match Queue.pop self.q with
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
Mutex.unlock self.mutex;
if self.closed then
raise Closed
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res
else
None
| x ->
Mutex.unlock self.mutex;
Some x
) else
None
let close (self : _ t) : unit =
let q = Queue.create () in
let triggers_to_signal = Queue.create () in
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Queue.transfer self.pop_waiters q;
Queue.transfer self.push_waiters q
Queue.transfer self.pop_waiters triggers_to_signal;
Queue.transfer self.push_waiters triggers_to_signal
);
Mutex.unlock self.mutex;
Queue.iter Trigger.signal q
Queue.iter Trigger.signal triggers_to_signal
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;

View file

@ -1,7 +1,8 @@
(** Channels.
The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version.
The channels have bounded size. They use effects/await to provide
a direct style implementation. Pushing into a full channel,
or popping from an empty one, will suspend the current task.
The channels became bounded since @0.7 .
*)

View file

@ -431,12 +431,13 @@ let await (self : 'a t) : 'a =
| exception C.Running ->
let trigger = Trigger.create () in
(* suspend until the future is resolved *)
if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger;
if C.try_attach self trigger then Trigger.await_exn trigger;
(* un-suspended: we should have a result! *)
get_or_fail_exn self
let yield = Picos.Fiber.yield
module Infix = struct
let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x

View file

@ -8,12 +8,16 @@
(storing a [Ok x] with [x: 'a]), or fail (storing a [Error (exn, bt)] with
an exception and the corresponding backtrace).
Using {!spawn}, it's possible to start a bunch of tasks, obtaining futures,
and then use {!await} to get their result in the desired order.
Combinators such as {!map} and {!join_array} can be used to produce futures
from other futures (in a monadic way). Some combinators take a [on] argument
to specify a runner on which the intermediate computation takes place; for
example [map ~on:pool ~f fut] maps the value in [fut] using function [f],
applicatively; the call to [f] happens on the runner [pool] (once [fut]
resolves successfully with a value). *)
resolves successfully with a value). Be aware that these combinators do not
preserve local storage. *)
type 'a or_error = ('a, Exn_bt.t) result
@ -30,7 +34,8 @@ val make : unit -> 'a t * 'a promise
val make_promise : unit -> 'a promise
(** Same as {!make} but returns a single promise (which can be upcast to a
future). This is useful mostly to preserve memory.
future). This is useful mostly to preserve memory, you probably don't need
it.
How to upcast to a future in the worst case:
{[
@ -40,8 +45,11 @@ val make_promise : unit -> 'a promise
@since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future when [fut] is set
; or calls [f] immediately if [fut] is already set. *)
(** [on_result fut f] registers [f] to be called in the future when [fut] is
set; or calls [f] immediately if [fut] is already set.
{b NOTE:} it's ill advised to do meaningful work inside the callback [f].
Instead, try to spawn another task on the runner, or use {!await}. *)
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
(** [on_result_ignore fut f] registers [f] to be called in the future when [fut]
@ -52,13 +60,14 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
exception Already_fulfilled
val try_cancel : _ promise -> Exn_bt.t -> bool
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
returns [false] if the promise is already resolved.
@since NEXT_RELEASE *)
(** [try_cancel promise ebt] tries to cancel the promise using the given
exception, returning [true]. It returns [false] if the promise is already
resolved.
@since 0.9 *)
val cancel : _ promise -> Exn_bt.t -> unit
(** Silent version of {!try_cancel}, ignoring the result.
@since NEXT_RELEASE *)
@since 0.9 *)
val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
@ -79,6 +88,7 @@ val fail_exn_bt : Exn_bt.t -> _ t
@since 0.6 *)
val of_result : 'a or_error -> 'a t
(** Already resolved future from a result. *)
val is_resolved : _ t -> bool
(** [is_resolved fut] is [true] iff [fut] is resolved. *)
@ -136,7 +146,7 @@ val spawn_on_current_runner : (unit -> 'a) -> 'a t
val reify_error : 'a t -> 'a or_error t
(** [reify_error fut] turns a failing future into a non-failing one that contain
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x]
[Error (exn, bt)]. A non-failing future returning [x] is turned into [Ok x].
@since 0.4 *)
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t
@ -149,12 +159,18 @@ val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t
(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future
[f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e]
or [f x] raises [e].
This does not preserve local storage of [fut] inside [f].
@param on if provided, [f] runs on the given runner *)
val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t
(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves like
the future [f (Ok x)] if [fut] resolved with [x]; and resolves like the
future [f (Error (exn, bt))] if [fut] fails with [exn] and backtrace [bt].
This does not preserve local storage of [fut] inside [f].
@param on if provided, [f] runs on the given runner
@since 0.4 *)
@ -182,6 +198,7 @@ val join_array : 'a t array -> 'a array t
val join_list : 'a t list -> 'a list t
(** Wait for all the futures in the list. Fails if any future fails. *)
(** Advanced primitives for synchronization *)
module Advanced : sig
val barrier_on_abstract_container_of_futures :
iter:(('a t -> unit) -> 'cont -> unit) ->
@ -234,7 +251,9 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
(** {2 Await}
{b NOTE} This is only available on OCaml 5. *)
This suspends the current task using an OCaml 5 algebraic effect, and makes
preparations for the task to be resumed once the future has been resolved.
*)
val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
@ -244,7 +263,11 @@ val await : 'a t -> 'a
@since 0.3
This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
{!Suspend_}. *)
val yield : unit -> unit
(** Like {!Moonpool.yield}.
@since NEXT_RELEASE *)
(** {2 Blocking} *)
@ -252,7 +275,7 @@ val wait_block : 'a t -> 'a or_error
(** [wait_block fut] blocks the current thread until [fut] is resolved, and
returns its value.
{b NOTE}: A word of warning: this will monopolize the calling thread until
{b NOTE:} A word of warning: this will monopolize the calling thread until
the future resolves. This can also easily cause deadlocks, if enough threads
in a pool call [wait_block] on futures running on the same pool or a pool
depending on it.
@ -265,7 +288,10 @@ val wait_block : 'a t -> 'a or_error
the deadlock. *)
val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *)
(** Same as {!wait_block} but re-raises the exception if the future failed.
{b NOTE:} do check the cautionary note in {!wait_block} concerning
deadlocks. *)
(** {2 Infix operators}
@ -297,9 +323,10 @@ module Infix_local = Infix
module Private_ : sig
val unsafe_promise_of_fut : 'a t -> 'a promise
(** please do not use *)
(** Do not use unless you know exactly what you are doing. *)
val as_computation : 'a t -> 'a Picos.Computation.t
(** Picos compat *)
end
(**/**)

View file

@ -8,15 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
in
let runner = Fifo_pool.Private_.runner_of_state worker_st in
try
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
let fiber = Fut.spawn ~on:runner (fun () -> f runner) in
Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
(* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st
Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with
match Fut.peek fiber with
| Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false

View file

@ -13,16 +13,18 @@
to {!Background_thread} in that there's a single worker to process
tasks/fibers.
This handles effects, including the ones in {!Fiber}.
This handles the concurency effects used in moonpool, including [await] and
[yield].
@since 0.6 *)
This module was migrated from the late [Moonpool_fib].
val main : (Moonpool.Runner.t -> 'a) -> 'a
@since NEXT_RELEASE *)
val main : (Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)
val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments. *)

View file

@ -23,6 +23,7 @@ module Exn_bt = Exn_bt
module Fifo_pool = Fifo_pool
module Fut = Fut
module Lock = Lock
module Main = Main
module Immediate_runner = struct end
module Runner = Runner
module Task_local_storage = Task_local_storage
@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage
module Trigger = Trigger
module Ws_pool = Ws_pool
(* re-export main *)
include Main
module Private = struct
module Ws_deque_ = Ws_deque_
module Worker_loop_ = Worker_loop_

View file

@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool
module Background_thread = Background_thread
module Runner = Runner
module Trigger = Trigger
module Main = Main
module Immediate_runner : sig end
[@@deprecated "use Moonpool_fib.Main"]
@ -80,7 +81,7 @@ val await : 'a Fut.t -> 'a
val yield : unit -> unit
(** Yield from the current task, must be run on a moonpool runner. Only on OCaml
>= 5.0.
@since NEXT_RELEASE *)
@since 0.9 *)
module Lock = Lock
module Fut = Fut
@ -205,6 +206,10 @@ module Atomic = Atomic
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]
module on OCaml 5. *)
include module type of struct
include Main
end
(**/**)
(** Private internals, with no stability guarantees *)

View file

@ -1,6 +0,0 @@
(library
(name moonpool_fib)
(public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos)
(flags :standard -open Moonpool_private -open Moonpool))

View file

@ -1,334 +0,0 @@
open Moonpool.Private.Types_
module A = Atomic
module FM = Handle.Map
module Int_map = Map.Make (Int)
module PF = Picos.Fiber
module FLS = Picos.Fiber.FLS
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
type cancel_callback = Exn_bt.t -> unit
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
Fut.Private_.unsafe_promise_of_fut
(* TODO: replace with picos structured at some point? *)
module Private_ = struct
type pfiber = PF.t
type 'a t = {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber; (** Associated picos fiber *)
}
and 'a state =
| Alive of {
children: children;
on_cancel: cancel_callback Int_map.t;
cancel_id: int;
}
| Terminating_or_done of 'a Exn_bt.result A.t
and children = any FM.t
and any = Any : _ t -> any [@@unboxed]
(** Key to access the current moonpool.fiber. *)
let k_current_fiber : any FLS.t = FLS.create ()
exception Not_set = FLS.Not_set
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
FLS.get_exn pfiber k_current_fiber
let[@inline] get_cur_exn () : any =
get_cur_from_exn @@ get_current_fiber_exn ()
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
let[@inline] is_closed (self : _ t) =
match A.get self.state with
| Alive _ -> false
| Terminating_or_done _ -> true
end
include Private_
let create_ ~pfiber ~runner ~res () : 'a t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner;
pfiber;
}
let create_done_ ~res () : _ t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner = Runner.dummy;
pfiber = Moonpool.Private.Types_._dummy_fiber;
}
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
let[@inline] res self = self.res
let[@inline] peek self = Fut.peek self.res
let[@inline] is_done self = Fut.is_done self.res
let[@inline] is_success self = Fut.is_success self.res
let[@inline] is_cancelled self = Fut.is_failed self.res
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
(** Resolve [promise] once [children] are all done *)
let resolve_once_children_are_done_ ~children ~promise
(res : 'a Exn_bt.result A.t) : unit =
let n_children = FM.cardinal children in
if n_children > 0 then (
(* wait for all children to be done *)
let n_waiting = A.make (FM.cardinal children) in
let on_child_finish (r : _ result) =
(* make sure the parent fails if any child fails *)
(match r with
| Ok _ -> ()
| Error ebt -> A.set res (Error ebt));
(* if we're the last to finish, resolve the parent fiber's [res] *)
if A.fetch_and_add n_waiting (-1) = 1 then (
let res = A.get res in
Fut.fulfill promise res
)
in
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
) else
Fut.fulfill promise @@ A.get res
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
fun self ebt ->
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; cancel_id = _; on_cancel } as old ->
let new_st = Terminating_or_done (A.make @@ Error ebt) in
if A.compare_and_set self.state old new_st then (
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
cancel_children_ ~children ebt;
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
false
) else
true
| Terminating_or_done _ -> false
do
()
done
(** Cancel eagerly all children *)
and cancel_children_ ebt ~children : unit =
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
type cancel_handle = int
let add_on_cancel (self : _ t) cb : cancel_handle =
let h = ref 0 in
while
match A.get self.state with
| Alive { children; cancel_id; on_cancel } as old ->
let new_st =
Alive
{
children;
cancel_id = cancel_id + 1;
on_cancel = Int_map.add cancel_id cb on_cancel;
}
in
if A.compare_and_set self.state old new_st then (
h := cancel_id;
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt -> cb ebt
| Ok _ -> ());
false
do
()
done;
!h
let remove_on_cancel (self : _ t) h =
while
match A.get self.state with
| Alive ({ on_cancel; _ } as alive) as old ->
let new_st =
Alive { alive with on_cancel = Int_map.remove h on_cancel }
in
not (A.compare_and_set self.state old new_st)
| Terminating_or_done _ -> false
do
()
done
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if some children
failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; _ } as old ->
let new_st = Terminating_or_done r in
if A.compare_and_set self.state old new_st then (
resolve_once_children_are_done_ ~children ~promise r;
false
) else
true
| Terminating_or_done _ -> false
do
()
done
let remove_child_ (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.remove child.id children }
in
not (A.compare_and_set self.state old new_st)
| _ -> false
do
()
done
(** Add a child to [self].
@param protected if true, the child's failure will not affect [self]. *)
let add_child_ ~protect (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.add child.id (Any child) children }
in
if A.compare_and_set self.state old new_st then (
(* make sure to remove [child] from [self.children] once it's done;
fail [self] is [child] failed and [protect=false] *)
Fut.on_result child.res (function
| Ok _ -> remove_child_ self child
| Error ebt ->
(* child failed, we must fail too *)
remove_child_ self child;
if not protect then resolve_as_failed_ self ebt);
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt ->
(* cancel child immediately *)
resolve_as_failed_ child ebt
| Ok _ -> ());
false
do
()
done
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
let res, _ = Fut.make () in
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
(* copy local hmap from parent, if present *)
Option.iter
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
parent;
(match parent with
| Some p when is_closed p -> failwith "spawn: nursery is closed"
| _ -> ());
let fib = create_ ~pfiber ~runner ~res () in
let run () =
(* make sure the fiber is accessible from inside itself *)
FLS.set pfiber k_current_fiber (Any fib);
try
let res = f () in
resolve_ok_ fib res
with exn ->
let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make exn bt in
resolve_as_failed_ fib ebt
in
Runner.run_async ~fiber:pfiber runner run;
fib
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
let spawn ?on ?(protect = true) f : _ t =
(* spawn [f()] with a copy of our local storage *)
let (Any p) =
try get_cur_exn ()
with Not_set ->
failwith "Fiber.spawn: must be run from within another fiber."
in
let runner =
match on with
| Some r -> r
| None -> p.runner
in
let child = spawn_ ~parent:(Some p) ~runner f in
add_child_ ~protect p child;
child
let[@inline] spawn_ignore ?on ?protect f : unit =
ignore (spawn ?on ?protect f : _ t)
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
let[@inline] self () : any =
match get_cur_exn () with
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
| f -> f
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
let (Any self) = self () in
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
let check_if_cancelled () =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
| Any self -> check_if_cancelled_ self
let yield () : unit =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.yield: must be run from inside a fiber."
| Any self ->
check_if_cancelled_ self;
PF.yield ();
check_if_cancelled_ self

View file

@ -1,150 +0,0 @@
(** Fibers.
A fiber is a lightweight computation that runs cooperatively alongside other
fibers. In the context of moonpool, fibers have additional properties:
- they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form of structured
concurrency *)
type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *)
(**/**)
(** Do not rely on this, it is internal implementation details. *)
module Private_ : sig
type 'a state
type pfiber
type 'a t = private {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber;
}
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
on that. *)
type any = Any : _ t -> any [@@unboxed]
exception Not_set
val get_cur_exn : unit -> any
(** [get_cur_exn ()] either returns the current fiber, or
@raise Not_set if run outside a fiber. *)
val get_cur_opt : unit -> any option
end
(**/**)
type 'a t = 'a Private_.t
(** A fiber returning a value of type ['a]. *)
val res : 'a t -> 'a Fut.t
(** Future result of the fiber. *)
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
(** Type erased fiber *)
type any = Private_.any = Any : _ t -> any [@@unboxed]
val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t
val self : unit -> any
(** [self ()] is the current fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *)
val is_done : _ t -> bool
(** Has the fiber completed? *)
val is_cancelled : _ t -> bool
(** Has the fiber completed with a failure? *)
val is_success : _ t -> bool
(** Has the fiber completed with a value? *)
val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises. Must be
run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *)
val yield : unit -> unit
(** Yield control to the scheduler from the current fiber.
@raise Failure if not run from inside a fiber. *)
type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
[h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
the fiber being cancelled, this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
from the list. *)
val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback with the result. If the
fiber is done already then the callback is invoked immediately with its
result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
fiber is not the child of any other fiber: its lifetime is only determined
by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
@param on
if provided, start the fiber on the given runner. If not provided, use the
parent's runner.
@param protect
if true, when [f_child] fails, it does not affect [parent]. If false,
[f_child] failing also causes [parent] to fail (and therefore all other
children of [parent]). Default is [true].
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
termination of the parent, ie. the parent will exit only after this new
fiber exits.
@param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result.
@since 0.7 *)

View file

@ -1 +0,0 @@
include Task_local_storage

View file

@ -1,17 +0,0 @@
(** Fiber-local storage.
This storage is associated to the current fiber, just like thread-local
storage is associated with the current thread.
See {!Moonpool.Task_local_storage} for more general information, as this is
based on it.
{b NOTE}: it's important to note that, while each fiber has its own storage,
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
the storage. Values inside [f1]'s storage will be physically shared with
[f2]. It is thus recommended to store only persistent values in the local
storage. *)
include module type of struct
include Task_local_storage
end

View file

@ -1,14 +0,0 @@
module A = Atomic
type t = int
let counter_ = A.make 0
let equal : t -> t -> bool = ( = )
let compare : t -> t -> int = Stdlib.compare
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
(* TODO: better hash *)
let[@inline] hash x = x land max_int
module Set = Set.Make (Int)
module Map = Map.Make (Int)

View file

@ -1,17 +0,0 @@
(** The unique name of a fiber.
Each fiber has a unique handle that can be used to refer to it in maps or
sets. *)
type t = private int
(** Unique, opaque identifier for a fiber. *)
val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t
(** Generate a fresh, unique identifier *)
module Set : Set.S with type elt = t
module Map : Map.S with type key = t

View file

@ -1,12 +0,0 @@
(** Fibers for moonpool.
See {!Fiber} for the most important explanations.
@since 0.6. *)
module Fiber = Fiber
module Fls = Fls
module Handle = Handle
module Main = Main
include Fiber
include Main

View file

@ -64,7 +64,7 @@ module State_ = struct
done;
(* wait for the other computation to be done *)
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
if !must_await then Trigger.await_exn trigger
| Right_solved _ | Both_solved _ -> assert false
end
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
i := !i + len_range
done;
Trigger.await trigger |> Option.iter Exn_bt.raise;
Trigger.await_exn trigger;
Option.iter Exn_bt.raise @@ A.get failure;
()
)

View file

@ -5,7 +5,6 @@
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -289,7 +289,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
M.run_async st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x

View file

@ -6,7 +6,6 @@
(libraries
t_fibers
moonpool
moonpool.fib
trace
trace-tef
qcheck-core

View file

@ -2,4 +2,4 @@
(name t_fibers)
(package moonpool)
(optional)
(libraries moonpool moonpool.fib trace qcheck-core hmap))
(libraries moonpool trace qcheck-core hmap))

View file

@ -1,6 +1,7 @@
open! Moonpool
module Chan = Moonpool.Chan
module Exn_bt = Moonpool.Exn_bt
module A = Atomic
module F = Moonpool_fib.Fiber
module Fut = Moonpool.Fut
let ( let@ ) = ( @@ )
@ -49,24 +50,23 @@ let logf = Log_.logf
let run1 ~runner () =
Printf.printf "============\nstart\n%!";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let fut =
Fut.spawn ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Fut.spawn ~on:runner @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
Moonpool.run_async runner (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
@ -78,19 +78,15 @@ let run1 ~runner () =
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
let res = Fut.await f in
Fut.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.await @@ F.res fib;
Fut.await fut;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
@ -99,15 +95,11 @@ let run2 ~runner () =
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let to_await = ref [] in
let clock = ref TS.init in
let fut =
Fut.spawn ~on:runner @@ fun () ->
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
@ -116,11 +108,7 @@ let run2 ~runner () =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Fut.spawn ~on:runner @@ fun _n ->
Thread.delay 0.002;
(* sync for determinism *)
@ -132,46 +120,51 @@ let run2 ~runner () =
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
Fut.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
let sender =
Fut.spawn ~on:runner (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done)
in
to_await := sender :: !to_await;
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
let res = Fut.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
Fut.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
Fut.on_result fut (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.await @@ F.res fib
(try Fut.await fut
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
List.iter Fut.await !to_await;
Log_.print_and_clear ();
()

View file

@ -1,7 +1,7 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
module F = Moonpool.Fut
module FLS = Moonpool.Task_local_storage
(* ### dummy little tracing system with local storage *)
@ -122,7 +122,7 @@ let run ~pool ~pool_name () =
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
F.spawn ~on:pool (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
@ -133,8 +133,7 @@ let run ~pool ~pool_name () =
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
F.spawn ~on:pool @@ fun () -> sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
@ -149,16 +148,14 @@ let run ~pool ~pool_name () =
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
F.spawn ~on:pool @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
let fibs = List.init 8 (fun idx -> F.spawn ~on:pool (fun () -> top idx)) in
List.iter F.await fibs;
Printf.printf "tracing complete\n";

View file

@ -2,33 +2,21 @@
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.0.1: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.1.1: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.2.1: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.3.1: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
1.4.1: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
@ -37,8 +25,8 @@ start
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
2.8: fiber 8 resolved as ok
2.9: fiber 9 resolved as ok
3: wait for subs
4: await fiber 0
5: res 0 = 0
@ -55,7 +43,6 @@ start
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited
19: main fiber result: error Failure("oh no!")
20: main fib failed with "oh no!"
21: main fiber exited

View file

@ -1,6 +1,6 @@
let ( let@ ) = ( @@ )
let () =
let@ runner = Moonpool_fib.main in
let@ runner = Moonpool.main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()

File diff suppressed because it is too large Load diff

View file

@ -3,7 +3,7 @@ open! Moonpool
let ( let@ ) = ( @@ )
let () =
let@ _ = Moonpool_fib.main in
let@ _ = Moonpool.main in
(let@ pool = Ws_pool.with_ () in
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());

View file

@ -1,14 +1,14 @@
open Moonpool
module F = Moonpool_fib
module F = Moonpool.Fut
let ( let@ ) = ( @@ )
let () =
let r =
F.main @@ fun runner ->
let f1 = F.spawn (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn (fun () -> F.await f1 + 10) in
Moonpool.main @@ fun runner ->
let f1 = F.spawn ~on:runner (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -19,10 +19,10 @@ let () =
(* run fibers in the background, await them in the main thread *)
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
F.main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:bg (fun () -> F.await f1 + 10) in
Moonpool.main @@ fun runner ->
let f1 = F.spawn ~on:bg (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:bg (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -32,8 +32,8 @@ let () =
let () =
try
let _r =
F.main @@ fun _r ->
let fib = F.spawn (fun () -> failwith "oops") in
Moonpool.main @@ fun runner ->
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
F.await fib
in

View file

@ -6,7 +6,6 @@
(libraries
t_fibers
moonpool
moonpool.fib
moonpool-lwt
hmap
trace

View file

@ -2,33 +2,21 @@
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.0.1: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.1.1: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.2.1: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.3.1: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
1.4.1: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
@ -37,8 +25,8 @@ start
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
2.8: fiber 8 resolved as ok
2.9: fiber 9 resolved as ok
3: wait for subs
4: await fiber 0
5: res 0 = 0
@ -55,7 +43,6 @@ start
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited
19: main fiber result: error Failure("oh no!")
20: main fib failed with "oh no!"
21: main fiber exited

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
open Moonpool
module M_lwt = Moonpool_lwt
module F = Moonpool_fib
module F = Moonpool.Fut
let ( let@ ) = ( @@ )
@ -9,9 +9,9 @@ let () =
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
M_lwt.lwt_main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
let f1 = F.spawn ~on:bg (fun () -> 1) in
let f2 = F.spawn ~on:runner (fun () -> 2) in
let f3 = F.spawn ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
@ -24,7 +24,7 @@ let () =
try
let _r =
M_lwt.lwt_main @@ fun runner ->
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
let fib = F.spawn ~on:runner (fun () -> failwith "oops") in
F.await fib
in