diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index fe6fe22b..dcc59557 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -29,7 +29,6 @@ module Private_ = struct and children = any FM.t and any = Any : _ t -> any [@@unboxed] - and nursery = Nursery : _ t -> nursery [@@unboxed] (** Key to access the current fiber. *) let k_current_fiber : any option Task_local_storage.key = @@ -226,15 +225,17 @@ let create_ ~ls ~runner () : 'a t = ls; } -let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t = - if is_closed n then failwith "spawn: nursery is closed"; - let fib = create_ ~ls ~runner:n.runner () in +let spawn_ ~ls ~parent ~runner (f : unit -> 'a) : 'a t = + (match parent with + | Some p when is_closed p -> failwith "spawn: nursery is closed" + | _ -> ()); + let fib = create_ ~ls ~runner () in let run () = (* make sure the fiber is accessible from inside itself *) Task_local_storage.set k_current_fiber (Some (Any fib)); try - let res = f (Nursery fib) in + let res = f () in resolve_ok_ fib res with exn -> let bt = Printexc.get_raw_backtrace () in @@ -242,57 +243,33 @@ let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t = resolve_as_failed_ fib ebt in - Runner.run_async ~ls n.runner run; + Runner.run_async ~ls runner run; fib -let spawn (Nursery n) ?(protect = true) f : _ t = +let spawn_top ~on f : _ t = + let ls = Task_local_storage.Direct.create () in + spawn_ ~ls ~runner:on ~parent:None f + +let spawn ?(protect = true) f : _ t = (* spawn [f()] with a copy of our local storage *) - let ls = Task_local_storage.Direct.copy n.ls in - let child = spawn_ ~ls (Nursery n) f in - add_child_ ~protect n child; + let (Any p) = + match get_cur () with + | None -> failwith "Fiber.spawn: must be run from within another fiber." + | Some p -> p + in + let ls = Task_local_storage.Direct.copy p.ls in + let child = spawn_ ~ls ~parent:(Some p) ~runner:p.runner f in + add_child_ ~protect p child; child -let[@inline] spawn_ignore n ?protect f : unit = - ignore (spawn n ?protect f : _ t) - -module Nursery = struct - type t = nursery - - let[@inline] runner (Nursery n) = n.runner - - let[@inline] await (Nursery n) : unit = - ignore (await n); - () - - let cancel_with (Nursery n) ebt : unit = resolve_as_failed_ n ebt - - let with_create_top ~on () f = - let n = create_ ~ls:(Task_local_storage.Direct.create ()) ~runner:on () in - Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n)) - - let with_create_sub ~protect (Nursery parent : t) f = - let n = - create_ - ~ls:(Task_local_storage.Direct.copy parent.ls) - ~runner:parent.runner () - in - add_child_ ~protect parent n; - Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n)) - - let[@inline] with_cancel_callback (Nursery self) cb f = - with_cancel_callback self cb f -end +let[@inline] spawn_ignore ?protect f : unit = ignore (spawn ?protect f : _ t) let[@inline] self () : any = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.self: must be run from inside a fiber." | Some f -> f -let[@inline] cur_nursery () = - let (Any f) = self () in - Nursery f - let with_self_cancel_callback cb (k : unit -> 'a) : 'a = let (Any self) = self () in let h = add_on_cancel self cb in diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index 4a6366b8..a8e3b6c6 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -12,46 +12,6 @@ type cancel_callback = Exn_bt.t -> unit (** A callback used in case of cancellation *) -(** Nurseries. - - Fibers belong in a {b nursery} - (as defined in {{: https://trio.readthedocs.io/en/stable/reference-core.html} Trio}). - - The nursery does multiple things. - - - it stores a runner, so we easily know where to run fibers; - - it makes it clear in signatures that we might run fibers in a function - - it groups cancellation of multiple fibers together - -*) -module Nursery : sig - type t - - val runner : t -> Runner.t - (** Recover the runner this nursery uses to spawn fibers *) - - val await : t -> unit - (** Await for the nursery to exit. *) - - val with_create_top : on:Runner.t -> unit -> (t -> 'a) -> 'a - (** New toplevel nursery. It runs fibers on the [on] runner. *) - - val with_create_sub : protect:bool -> t -> (t -> 'a) -> 'a - (** Create a sub-nursery. The sub-nursery is cancelled if the - parent is. Cancelling the sub-nursery also cancels the - parent if [protect=false]. When the function returns, - the nursery is closed an no other fiber can be scheduled on it. *) - - val cancel_with : t -> Exn_bt.t -> unit - (** Cancel the nursery (and all its children) with the given exception. *) - - val with_cancel_callback : t -> cancel_callback -> (unit -> 'a) -> 'a - (** [with_cancel_callback nursery cb (fun () -> )] evaluates [e] - in a scope in which, if the nursery [nursery] is cancelled, - [cb()] is called. If [e] returns without the nursery being cancelled, - this callback is removed. *) -end - (**/**) module Private_ : sig @@ -91,11 +51,6 @@ val self : unit -> any Must be run from inside a fiber. @raise Failure if not run from inside a fiber. *) -val cur_nursery : unit -> Nursery.t -(** [cur_nursery ()] returns the nearest nursery. - Must be run from inside a fiber. - @raise Failure if not run from inside a fiber. *) - val peek : 'a t -> 'a Fut.or_error option (** Peek inside the future result *) @@ -154,22 +109,26 @@ val on_result : 'a t -> 'a callback -> unit with the result. If the fiber is done already then the callback is invoked immediately with its result. *) -val spawn : Nursery.t -> ?protect:bool -> (Nursery.t -> 'a) -> 'a t -(** [spawn n f] spawns a new fiber [fib] in the given nursery [n]. +val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t +(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. + This fiber is not the child of any other fiber: its lifetime + is only determined by the lifetime of [f()]. *) - The fiber [fib] is attached to the nursery and fails - if the nursery fails. +val spawn : ?protect:bool -> (unit -> 'a) -> 'a t +(** [spawn ~protect f] spawns a sub-fiber [f_child] + from a running fiber [parent]. + The sub-fiber [f_child] is attached to the current fiber and fails + if the current fiber [parent] fails. - The function [f] is passed a nursery whose lifetime is - the same as the fiber's. + @param protect if true, when [f_child] fails, it does not + affect [parent]. If false, [f_child] failing also + causes [parent] to fail (and therefore all other children + of [parent]). Default is [true]. - @param protect if true, when [fib] fails, it does not - affect [nursery] (but the failure can still be re-raised - in {!await}). If false, [fib] failing also - causes [nursery] to fail (and therefore all other children - of [nursery] to be cancelled). Default is [true]. *) + Must be run from inside a fiber. + @raise Failure if not run from inside a fiber. *) -val spawn_ignore : Nursery.t -> ?protect:bool -> (Nursery.t -> _) -> unit -(** [spawn_ignore n f] is [ignore (spawn n f)]. - The fiber will still affect termination of [n], ie. [n] will exit - only after the fiber exits. *) +val spawn_ignore : ?protect:bool -> (unit -> _) -> unit +(** [spawn_ignore f] is [ignore (spawn f)]. + The fiber will still affect termination of the parent, ie. the + parent will exit only after this new fiber exits. *) diff --git a/src/lwt/base.ml b/src/lwt/base.ml index 3b646b05..88e7ed3d 100644 --- a/src/lwt/base.ml +++ b/src/lwt/base.ml @@ -144,14 +144,13 @@ let detach_in_runner ~runner f : _ Lwt.t = Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); fut -let main_with_runner ~runner (f : Fiber.Nursery.t -> 'a) : 'a = +let main_with_runner ~runner (f : unit -> 'a) : 'a = let lwt_fut, lwt_prom = Lwt.wait () in - let@ n = Fiber.Nursery.with_create_top ~on:runner () in let _fiber = - Fiber.spawn n (fun _n -> + Fiber.spawn_top ~on:runner (fun () -> try - let x = f n in + let x = f () in Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) with exn -> Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index bf7492a0..ac218e0c 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -136,9 +136,9 @@ val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t (** {2 Wrappers around Lwt_main} *) -val main_with_runner : runner:Moonpool.Runner.t -> (Fiber.Nursery.t -> 'a) -> 'a +val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside a fiber in [runner]. *) -val main : (Fiber.Nursery.t -> 'a) -> 'a +val main : (unit -> 'a) -> 'a (** Like {!main_with_runner} but with a default choice of runner. *) diff --git a/test/fiber/t_fib1.ml b/test/fiber/t_fib1.ml index 09c033f6..45fa2021 100644 --- a/test/fiber/t_fib1.ml +++ b/test/fiber/t_fib1.ml @@ -49,18 +49,17 @@ let logf = Log_.logf let () = Printf.printf "============\nstart\n"; - let@ nursery = F.Nursery.with_create_top ~on:runner () in let clock = ref TS.init in let fib = - F.spawn nursery @@ fun nursery -> + F.spawn_top ~on:runner @@ fun () -> let subs = List.init 5 (fun i -> - F.spawn nursery ~protect:false @@ fun _n -> + F.spawn ~protect:false @@ fun _n -> Thread.delay (float i *. 0.01); i) in - F.spawn_ignore nursery ~protect:false (fun _n -> + F.spawn_ignore ~protect:false (fun _n -> Thread.delay 0.4; TS.tick clock; logf !clock "other fib done"); @@ -91,9 +90,8 @@ let () = Printf.printf "============\nstart\n"; let clock = ref TS.init in - let@ nursery = F.Nursery.with_create_top ~on:runner () in let fib = - F.spawn nursery @@ fun nursery -> + F.spawn_top ~on:runner @@ fun () -> let@ () = F.with_self_cancel_callback (fun ebt -> logf (TS.tick_get clock) "main fiber cancelled with %s" @@ -104,7 +102,7 @@ let () = let subs = List.init 10 (fun i -> let clock = ref (0 :: i :: !clock) in - F.spawn nursery ~protect:false @@ fun _n -> + F.spawn ~protect:false @@ fun _n -> let@ () = F.with_self_cancel_callback (fun _ -> logf (TS.tick_get clock) "sub-fiber %d was cancelled" i) @@ -126,7 +124,7 @@ let () = | Error _ -> logf (i :: post) "fiber %d resolved as error" i)) subs; - F.spawn_ignore nursery ~protect:false (fun _n -> + F.spawn_ignore ~protect:false (fun _n -> Thread.delay 0.2; logf (TS.tick_get clock) "other fib done"); diff --git a/test/fiber/t_fls.ml b/test/fiber/t_fls.ml index 47fd21c4..01ee96ef 100644 --- a/test/fiber/t_fls.ml +++ b/test/fiber/t_fls.ml @@ -96,7 +96,6 @@ module Render = struct end let run ~pool ~pool_name () = - let@ nursery = F.Nursery.with_create_top ~on:pool () in let tracer = Tracer.create () in let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () = @@ -111,7 +110,7 @@ let run ~pool ~pool_name () = done in - let sub_child ~idx ~idx_child ~idx_sub nursery = + let sub_child ~idx ~idx_child ~idx_sub () = let@ () = Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub) in @@ -123,19 +122,19 @@ let run ~pool ~pool_name () = let subs = List.init 2 (fun idx_sub_sub -> - F.spawn ~protect:true nursery (fun _nursery -> + F.spawn ~protect:true (fun () -> sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ())) in List.iter F.await subs in - let top_child ~idx ~idx_child nursery = + let top_child ~idx ~idx_child () = let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in let subs = List.init 2 (fun k -> - F.spawn nursery ~protect:true @@ fun nursery -> - sub_child ~idx ~idx_child ~idx_sub:k nursery) + F.spawn ~protect:true @@ fun () -> + sub_child ~idx ~idx_child ~idx_sub:k ()) in let@ () = @@ -145,13 +144,12 @@ let run ~pool ~pool_name () = List.iter F.await subs in - let top nursery idx = + let top idx = let@ () = Tracer.with_span tracer (spf "top_%d" idx) in let subs = List.init 5 (fun j -> - F.spawn nursery ~protect:true @@ fun nursery -> - top_child ~idx ~idx_child:j nursery) + F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ()) in List.iter F.await subs @@ -159,7 +157,7 @@ let run ~pool ~pool_name () = Printf.printf "run test on pool = %s\n" pool_name; let fibs = - List.init 8 (fun idx -> F.spawn nursery (fun nursery -> top nursery idx)) + List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx)) in List.iter F.wait_block_exn fibs;