revert nurseries

This commit is contained in:
Simon Cruanes 2024-03-04 10:53:00 -05:00
parent 8c10c2b329
commit 5817a8aee7
6 changed files with 59 additions and 128 deletions

View file

@ -29,7 +29,6 @@ module Private_ = struct
and children = any FM.t and children = any FM.t
and any = Any : _ t -> any [@@unboxed] and any = Any : _ t -> any [@@unboxed]
and nursery = Nursery : _ t -> nursery [@@unboxed]
(** Key to access the current fiber. *) (** Key to access the current fiber. *)
let k_current_fiber : any option Task_local_storage.key = let k_current_fiber : any option Task_local_storage.key =
@ -226,15 +225,17 @@ let create_ ~ls ~runner () : 'a t =
ls; ls;
} }
let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t = let spawn_ ~ls ~parent ~runner (f : unit -> 'a) : 'a t =
if is_closed n then failwith "spawn: nursery is closed"; (match parent with
let fib = create_ ~ls ~runner:n.runner () in | Some p when is_closed p -> failwith "spawn: nursery is closed"
| _ -> ());
let fib = create_ ~ls ~runner () in
let run () = let run () =
(* make sure the fiber is accessible from inside itself *) (* make sure the fiber is accessible from inside itself *)
Task_local_storage.set k_current_fiber (Some (Any fib)); Task_local_storage.set k_current_fiber (Some (Any fib));
try try
let res = f (Nursery fib) in let res = f () in
resolve_ok_ fib res resolve_ok_ fib res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
@ -242,57 +243,33 @@ let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t =
resolve_as_failed_ fib ebt resolve_as_failed_ fib ebt
in in
Runner.run_async ~ls n.runner run; Runner.run_async ~ls runner run;
fib fib
let spawn (Nursery n) ?(protect = true) f : _ t = let spawn_top ~on f : _ t =
let ls = Task_local_storage.Direct.create () in
spawn_ ~ls ~runner:on ~parent:None f
let spawn ?(protect = true) f : _ t =
(* spawn [f()] with a copy of our local storage *) (* spawn [f()] with a copy of our local storage *)
let ls = Task_local_storage.Direct.copy n.ls in let (Any p) =
let child = spawn_ ~ls (Nursery n) f in match get_cur () with
add_child_ ~protect n child; | None -> failwith "Fiber.spawn: must be run from within another fiber."
| Some p -> p
in
let ls = Task_local_storage.Direct.copy p.ls in
let child = spawn_ ~ls ~parent:(Some p) ~runner:p.runner f in
add_child_ ~protect p child;
child child
let[@inline] spawn_ignore n ?protect f : unit = let[@inline] spawn_ignore ?protect f : unit = ignore (spawn ?protect f : _ t)
ignore (spawn n ?protect f : _ t)
module Nursery = struct
type t = nursery
let[@inline] runner (Nursery n) = n.runner
let[@inline] await (Nursery n) : unit =
ignore (await n);
()
let cancel_with (Nursery n) ebt : unit = resolve_as_failed_ n ebt
let with_create_top ~on () f =
let n = create_ ~ls:(Task_local_storage.Direct.create ()) ~runner:on () in
Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n))
let with_create_sub ~protect (Nursery parent : t) f =
let n =
create_
~ls:(Task_local_storage.Direct.copy parent.ls)
~runner:parent.runner ()
in
add_child_ ~protect parent n;
Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n))
let[@inline] with_cancel_callback (Nursery self) cb f =
with_cancel_callback self cb f
end
let[@inline] self () : any = let[@inline] self () : any =
match Task_local_storage.get k_current_fiber with match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.self: must be run from inside a fiber." | None -> failwith "Fiber.self: must be run from inside a fiber."
| Some f -> f | Some f -> f
let[@inline] cur_nursery () =
let (Any f) = self () in
Nursery f
let with_self_cancel_callback cb (k : unit -> 'a) : 'a = let with_self_cancel_callback cb (k : unit -> 'a) : 'a =
let (Any self) = self () in let (Any self) = self () in
let h = add_on_cancel self cb in let h = add_on_cancel self cb in

View file

@ -12,46 +12,6 @@
type cancel_callback = Exn_bt.t -> unit type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *) (** A callback used in case of cancellation *)
(** Nurseries.
Fibers belong in a {b nursery}
(as defined in {{: https://trio.readthedocs.io/en/stable/reference-core.html} Trio}).
The nursery does multiple things.
- it stores a runner, so we easily know where to run fibers;
- it makes it clear in signatures that we might run fibers in a function
- it groups cancellation of multiple fibers together
*)
module Nursery : sig
type t
val runner : t -> Runner.t
(** Recover the runner this nursery uses to spawn fibers *)
val await : t -> unit
(** Await for the nursery to exit. *)
val with_create_top : on:Runner.t -> unit -> (t -> 'a) -> 'a
(** New toplevel nursery. It runs fibers on the [on] runner. *)
val with_create_sub : protect:bool -> t -> (t -> 'a) -> 'a
(** Create a sub-nursery. The sub-nursery is cancelled if the
parent is. Cancelling the sub-nursery also cancels the
parent if [protect=false]. When the function returns,
the nursery is closed an no other fiber can be scheduled on it. *)
val cancel_with : t -> Exn_bt.t -> unit
(** Cancel the nursery (and all its children) with the given exception. *)
val with_cancel_callback : t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_cancel_callback nursery cb (fun () -> <e>)] evaluates [e]
in a scope in which, if the nursery [nursery] is cancelled,
[cb()] is called. If [e] returns without the nursery being cancelled,
this callback is removed. *)
end
(**/**) (**/**)
module Private_ : sig module Private_ : sig
@ -91,11 +51,6 @@ val self : unit -> any
Must be run from inside a fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val cur_nursery : unit -> Nursery.t
(** [cur_nursery ()] returns the nearest nursery.
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *) (** Peek inside the future result *)
@ -154,22 +109,26 @@ val on_result : 'a t -> 'a callback -> unit
with the result. If the fiber is done already then the with the result. If the fiber is done already then the
callback is invoked immediately with its result. *) callback is invoked immediately with its result. *)
val spawn : Nursery.t -> ?protect:bool -> (Nursery.t -> 'a) -> 'a t val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn n f] spawns a new fiber [fib] in the given nursery [n]. (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner.
This fiber is not the child of any other fiber: its lifetime
is only determined by the lifetime of [f()]. *)
The fiber [fib] is attached to the nursery and fails val spawn : ?protect:bool -> (unit -> 'a) -> 'a t
if the nursery fails. (** [spawn ~protect f] spawns a sub-fiber [f_child]
from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
The function [f] is passed a nursery whose lifetime is @param protect if true, when [f_child] fails, it does not
the same as the fiber's. affect [parent]. If false, [f_child] failing also
causes [parent] to fail (and therefore all other children
of [parent]). Default is [true].
@param protect if true, when [fib] fails, it does not Must be run from inside a fiber.
affect [nursery] (but the failure can still be re-raised @raise Failure if not run from inside a fiber. *)
in {!await}). If false, [fib] failing also
causes [nursery] to fail (and therefore all other children
of [nursery] to be cancelled). Default is [true]. *)
val spawn_ignore : Nursery.t -> ?protect:bool -> (Nursery.t -> _) -> unit val spawn_ignore : ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore n f] is [ignore (spawn n f)]. (** [spawn_ignore f] is [ignore (spawn f)].
The fiber will still affect termination of [n], ie. [n] will exit The fiber will still affect termination of the parent, ie. the
only after the fiber exits. *) parent will exit only after this new fiber exits. *)

View file

@ -144,14 +144,13 @@ let detach_in_runner ~runner f : _ Lwt.t =
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut fut
let main_with_runner ~runner (f : Fiber.Nursery.t -> 'a) : 'a = let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
let@ n = Fiber.Nursery.with_create_top ~on:runner () in
let _fiber = let _fiber =
Fiber.spawn n (fun _n -> Fiber.spawn_top ~on:runner (fun () ->
try try
let x = f n in let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn -> with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))

View file

@ -136,9 +136,9 @@ val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (Fiber.Nursery.t -> 'a) -> 'a val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
a fiber in [runner]. *) a fiber in [runner]. *)
val main : (Fiber.Nursery.t -> 'a) -> 'a val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *) (** Like {!main_with_runner} but with a default choice of runner. *)

View file

@ -49,18 +49,17 @@ let logf = Log_.logf
let () = let () =
Printf.printf "============\nstart\n"; Printf.printf "============\nstart\n";
let@ nursery = F.Nursery.with_create_top ~on:runner () in
let clock = ref TS.init in let clock = ref TS.init in
let fib = let fib =
F.spawn nursery @@ fun nursery -> F.spawn_top ~on:runner @@ fun () ->
let subs = let subs =
List.init 5 (fun i -> List.init 5 (fun i ->
F.spawn nursery ~protect:false @@ fun _n -> F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01); Thread.delay (float i *. 0.01);
i) i)
in in
F.spawn_ignore nursery ~protect:false (fun _n -> F.spawn_ignore ~protect:false (fun _n ->
Thread.delay 0.4; Thread.delay 0.4;
TS.tick clock; TS.tick clock;
logf !clock "other fib done"); logf !clock "other fib done");
@ -91,9 +90,8 @@ let () =
Printf.printf "============\nstart\n"; Printf.printf "============\nstart\n";
let clock = ref TS.init in let clock = ref TS.init in
let@ nursery = F.Nursery.with_create_top ~on:runner () in
let fib = let fib =
F.spawn nursery @@ fun nursery -> F.spawn_top ~on:runner @@ fun () ->
let@ () = let@ () =
F.with_self_cancel_callback (fun ebt -> F.with_self_cancel_callback (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s" logf (TS.tick_get clock) "main fiber cancelled with %s"
@ -104,7 +102,7 @@ let () =
let subs = let subs =
List.init 10 (fun i -> List.init 10 (fun i ->
let clock = ref (0 :: i :: !clock) in let clock = ref (0 :: i :: !clock) in
F.spawn nursery ~protect:false @@ fun _n -> F.spawn ~protect:false @@ fun _n ->
let@ () = let@ () =
F.with_self_cancel_callback (fun _ -> F.with_self_cancel_callback (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i) logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
@ -126,7 +124,7 @@ let () =
| Error _ -> logf (i :: post) "fiber %d resolved as error" i)) | Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs; subs;
F.spawn_ignore nursery ~protect:false (fun _n -> F.spawn_ignore ~protect:false (fun _n ->
Thread.delay 0.2; Thread.delay 0.2;
logf (TS.tick_get clock) "other fib done"); logf (TS.tick_get clock) "other fib done");

View file

@ -96,7 +96,6 @@ module Render = struct
end end
let run ~pool ~pool_name () = let run ~pool ~pool_name () =
let@ nursery = F.Nursery.with_create_top ~on:pool () in
let tracer = Tracer.create () in let tracer = Tracer.create () in
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () = let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
@ -111,7 +110,7 @@ let run ~pool ~pool_name () =
done done
in in
let sub_child ~idx ~idx_child ~idx_sub nursery = let sub_child ~idx ~idx_child ~idx_sub () =
let@ () = let@ () =
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub) Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
in in
@ -123,19 +122,19 @@ let run ~pool ~pool_name () =
let subs = let subs =
List.init 2 (fun idx_sub_sub -> List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true nursery (fun _nursery -> F.spawn ~protect:true (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ())) sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in in
List.iter F.await subs List.iter F.await subs
in in
let top_child ~idx ~idx_child nursery = let top_child ~idx ~idx_child () =
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
let subs = let subs =
List.init 2 (fun k -> List.init 2 (fun k ->
F.spawn nursery ~protect:true @@ fun nursery -> F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k nursery) sub_child ~idx ~idx_child ~idx_sub:k ())
in in
let@ () = let@ () =
@ -145,13 +144,12 @@ let run ~pool ~pool_name () =
List.iter F.await subs List.iter F.await subs
in in
let top nursery idx = let top idx =
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
let subs = let subs =
List.init 5 (fun j -> List.init 5 (fun j ->
F.spawn nursery ~protect:true @@ fun nursery -> F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
top_child ~idx ~idx_child:j nursery)
in in
List.iter F.await subs List.iter F.await subs
@ -159,7 +157,7 @@ let run ~pool ~pool_name () =
Printf.printf "run test on pool = %s\n" pool_name; Printf.printf "run test on pool = %s\n" pool_name;
let fibs = let fibs =
List.init 8 (fun idx -> F.spawn nursery (fun nursery -> top nursery idx)) List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in in
List.iter F.wait_block_exn fibs; List.iter F.wait_block_exn fibs;