diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index df4837c2..7646ca11 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -6,11 +6,11 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage type task_full = | T_start of { - ls: Task_local_storage.storage; + ls: Task_local_storage.storage ref; f: task; } | T_resume : { - ls: Task_local_storage.storage; + ls: Task_local_storage.storage ref; k: 'a -> unit; x: 'a; } @@ -30,23 +30,22 @@ let schedule_ (self : state) (task : task_full) : unit = try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task -type worker_state = { cur_ls: Task_local_storage.storage ref } +type worker_state = { mutable cur_ls: Task_local_storage.storage ref option } let k_worker_state : worker_state option ref TLS.key = TLS.new_key (fun () -> ref None) let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = - let w = { cur_ls = ref Task_local_storage.Private_.Storage.dummy } in + let w = { cur_ls = None } in TLS.get k_worker_state := Some w; - TLS.set k_storage (Some w.cur_ls); TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; let (AT_pair (before_task, after_task)) = around_task in let on_suspend () = match !(TLS.get k_worker_state) with - | None -> assert false - | Some w -> !(w.cur_ls) + | Some { cur_ls = Some ls; _ } -> ls + | _ -> assert false in let run_another_task ls task' = schedule_ self @@ T_start { f = task'; ls } in let resume ls k res = schedule_ self @@ T_resume { ls; k; x = res } in @@ -56,7 +55,8 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = match task with | T_start { ls; _ } | T_resume { ls; _ } -> ls in - w.cur_ls := ls; + w.cur_ls <- Some ls; + TLS.set k_storage (Some ls); let _ctx = before_task runner in (* run the task now, catching errors, handling effects *) @@ -74,7 +74,8 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = let bt = Printexc.get_raw_backtrace () in on_exn e bt); after_task runner _ctx; - w.cur_ls := Task_local_storage.Private_.Storage.dummy + w.cur_ls <- None; + TLS.set k_storage None in let main_loop () = diff --git a/src/core/immediate_runner.ml b/src/core/immediate_runner.ml index 9412fd35..4e15c434 100644 --- a/src/core/immediate_runner.ml +++ b/src/core/immediate_runner.ml @@ -4,10 +4,8 @@ include Runner (* convenient alias *) let k_ls = Task_local_storage.Private_.Storage.k_storage -let run_async_ ~ls f = - let cur_ls = ref ls in +let run_async_ ~ls:cur_ls f = TLS.set k_ls (Some cur_ls); - cur_ls := ls; try let x = f () in TLS.set k_ls None; diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 92ed0dd5..df09d409 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -27,14 +27,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) val run_async : - ?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit + ?ls:Task_local_storage.storage ref -> Runner.t -> (unit -> unit) -> unit (** [run_async runner task] schedules the task to run on the given runner. This means [task()] will be executed at some point in the future, possibly in another thread. @since 0.5 *) val run_wait_block : - ?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a + ?ls:Task_local_storage.storage ref -> Runner.t -> (unit -> 'a) -> 'a (** [run_wait_block runner f] schedules [f] for later execution on the runner, like {!run_async}. It then blocks the current thread until [f()] is done executing, diff --git a/src/core/runner.ml b/src/core/runner.ml index 360ec6ba..391fdcd9 100644 --- a/src/core/runner.ml +++ b/src/core/runner.ml @@ -3,7 +3,7 @@ module TLS = Thread_local_storage_ type task = unit -> unit type t = { - run_async: ls:Task_local_storage.storage -> task -> unit; + run_async: ls:Task_local_storage.storage ref -> task -> unit; shutdown: wait:bool -> unit -> unit; size: unit -> int; num_tasks: unit -> int; @@ -11,8 +11,9 @@ type t = { exception Shutdown -let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ()) - (self : t) f : unit = +let[@inline] run_async + ?(ls = ref @@ Task_local_storage.Private_.Storage.create ()) (self : t) f : + unit = self.run_async ~ls f let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () diff --git a/src/core/runner.mli b/src/core/runner.mli index 331e8b50..577a4b39 100644 --- a/src/core/runner.mli +++ b/src/core/runner.mli @@ -33,14 +33,15 @@ val shutdown_without_waiting : t -> unit exception Shutdown -val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit +val run_async : ?ls:Task_local_storage.storage ref -> t -> task -> unit (** [run_async pool f] schedules [f] for later execution on the runner in one of the threads. [f()] will run on one of the runner's worker threads/domains. @param ls if provided, run the task with this initial local storage @raise Shutdown if the runner was shut down before [run_async] was called. *) -val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a +val run_wait_block : + ?ls:Task_local_storage.storage ref -> t -> (unit -> 'a) -> 'a (** [run_wait_block pool f] schedules [f] for later execution on the pool, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -60,7 +61,7 @@ module For_runner_implementors : sig size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.storage ref -> task -> unit) -> unit -> t (** Create a new runner. diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 956dd9ce..367fbae2 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -18,11 +18,11 @@ type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type task_full = | T_start of { - ls: Task_local_storage.storage; + ls: Task_local_storage.storage ref; f: task; } | T_resume : { - ls: Task_local_storage.storage; + ls: Task_local_storage.storage ref; k: 'a -> unit; x: 'a; } @@ -32,7 +32,7 @@ type worker_state = { pool_id_: Id.t; (** Unique per pool *) mutable thread: Thread.t; q: task_full WSQ.t; (** Work stealing queue *) - cur_ls: Task_local_storage.storage ref; (** Task storage *) + mutable cur_ls: Task_local_storage.storage ref option; (** Task storage *) rng: Random.State.t; } (** State for a given worker. Only this worker is @@ -120,17 +120,14 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full) | T_start { ls; _ } | T_resume { ls; _ } -> ls in - w.cur_ls := ls; + w.cur_ls <- Some ls; + TLS.set k_storage (Some ls); let _ctx = before_task runner in - let[@inline] on_suspend () = - let w = - match find_current_worker_ () with - | Some w -> w - | None -> assert false - in - let ls = !(w.cur_ls) in - ls + let[@inline] on_suspend () : _ ref = + match find_current_worker_ () with + | Some { cur_ls = Some w; _ } -> w + | _ -> assert false in let run_another_task ls (task' : task) = @@ -139,7 +136,7 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full) | Some w when Id.equal w.pool_id_ self.id_ -> Some w | _ -> None in - let ls' = Task_local_storage.Private_.Storage.copy ls in + let ls' = ref @@ Task_local_storage.Private_.Storage.copy !ls in schedule_task_ self ~w @@ T_start { ls = ls'; f = task' } in @@ -168,7 +165,8 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full) self.on_exn e bt); after_task runner _ctx; - w.cur_ls := Task_local_storage.Private_.Storage.dummy + w.cur_ls <- None; + TLS.set k_storage None let run_async_ (self : state) ~ls (f : task) : unit = let w = find_current_worker_ () in @@ -291,7 +289,7 @@ type ('a, 'b) create_args = (** Arguments used in {!create}. See {!create} for explanations. *) let dummy_task_ : task_full = - T_start { f = ignore; ls = Task_local_storage.Private_.Storage.dummy } + T_start { f = ignore; ls = ref Task_local_storage.Private_.Storage.dummy } let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) @@ -318,7 +316,7 @@ let create ?(on_init_thread = default_thread_init_exit_) thread = dummy; q = WSQ.create ~dummy:dummy_task_ (); rng = Random.State.make [| i |]; - cur_ls = ref Task_local_storage.Private_.Storage.dummy; + cur_ls = None; }) in @@ -360,7 +358,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let thread = Thread.self () in let t_id = Thread.id thread in on_init_thread ~dom_id:dom_idx ~t_id (); - TLS.set k_storage (Some w.cur_ls); + TLS.set k_storage None; (* set thread name *) Option.iter diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index d8390a2e..73c82ee4 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -16,6 +16,7 @@ module Private_ = struct state: 'a state A.t; (** Current state in the lifetime of the fiber *) res: 'a Fut.t; runner: Runner.t; + ls: Task_local_storage.storage ref; } and 'a state = @@ -28,12 +29,18 @@ module Private_ = struct and children = any FM.t and any = Any : _ t -> any [@@unboxed] + and nursery = Nursery : _ t -> nursery [@@unboxed] (** Key to access the current fiber. *) let k_current_fiber : any option Task_local_storage.key = Task_local_storage.new_key ~init:(fun () -> None) () let[@inline] get_cur () : any option = Task_local_storage.get k_current_fiber + + let[@inline] is_closed (self : _ t) = + match A.get self.state with + | Alive _ -> false + | Terminating_or_done _ -> true end include Private_ @@ -44,6 +51,9 @@ let[@inline] is_done self = Fut.is_done self.res let[@inline] is_success self = Fut.is_success self.res let[@inline] is_cancelled self = Fut.is_failed self.res let[@inline] on_result (self : _ t) f = Fut.on_result self.res f +let[@inline] await self = Fut.await self.res +let[@inline] wait_block self = Fut.wait_block self.res +let[@inline] wait_block_exn self = Fut.wait_block_exn self.res (** Resolve [promise] once [children] are all done *) let resolve_once_children_are_done_ ~children ~promise @@ -92,6 +102,53 @@ let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit = and cancel_children_ ebt ~children : unit = FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children +type cancel_handle = int + +let add_on_cancel (self : _ t) cb : cancel_handle = + let h = ref 0 in + while + match A.get self.state with + | Alive { children; cancel_id; on_cancel } as old -> + let new_st = + Alive + { + children; + cancel_id = cancel_id + 1; + on_cancel = Int_map.add cancel_id cb on_cancel; + } + in + if A.compare_and_set self.state old new_st then ( + h := cancel_id; + false + ) else + true + | Terminating_or_done r -> + (match A.get r with + | Error ebt -> cb ebt + | Ok _ -> ()); + false + do + () + done; + !h + +let remove_on_cancel (self : _ t) h = + while + match A.get self.state with + | Alive ({ on_cancel; _ } as alive) as old -> + let new_st = + Alive { alive with on_cancel = Int_map.remove h on_cancel } + in + not (A.compare_and_set self.state old new_st) + | Terminating_or_done _ -> false + do + () + done + +let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a = + let h = add_on_cancel self cb in + Fun.protect k ~finally:(fun () -> remove_on_cancel self h) + (** Successfully resolve the fiber *) let resolve_ok_ (self : 'a t) (r : 'a) : unit = let r = A.make @@ Ok r in @@ -156,26 +213,28 @@ let add_child_ ~protect (self : _ t) (child : _ t) = () done -let spawn_ ~ls ~on (f : _ -> 'a) : 'a t = +let create_ ~ls ~runner () : 'a t = let id = Handle.generate_fresh () in let res, _promise = Fut.make () in - let fib = - { - state = - A.make - @@ Alive - { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; - id; - res; - runner = on; - } - in + { + state = + A.make + @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; + id; + res; + runner; + ls; + } + +let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t = + if is_closed n then failwith "spawn: nursery is closed"; + let fib = create_ ~ls ~runner:n.runner () in let run () = (* make sure the fiber is accessible from inside itself *) Task_local_storage.set k_current_fiber (Some (Any fib)); try - let res = f () in + let res = f (Nursery fib) in resolve_ok_ fib res with exn -> let bt = Printexc.get_raw_backtrace () in @@ -183,90 +242,64 @@ let spawn_ ~ls ~on (f : _ -> 'a) : 'a t = resolve_as_failed_ fib ebt in - Runner.run_async ?ls on run; + Runner.run_async ~ls n.runner run; fib -let[@inline] spawn_top ~on f : _ t = spawn_ ~ls:None ~on f +let spawn (Nursery n) ?(protect = true) f : _ t = + (* spawn [f()] with a copy of our local storage *) + let ls = ref (Task_local_storage.Private_.Storage.copy !(n.ls)) in + let child = spawn_ ~ls (Nursery n) f in + add_child_ ~protect n child; + child + +let[@inline] spawn_ignore n ?protect f : unit = + ignore (spawn n ?protect f : _ t) + +module Nursery = struct + type t = nursery + + let[@inline] await (Nursery n) : unit = + ignore (await n); + () + + let cancel_with (Nursery n) ebt : unit = resolve_as_failed_ n ebt + + let with_create_top ~on () f = + let n = + create_ + ~ls:(ref @@ Task_local_storage.Private_.Storage.create ()) + ~runner:on () + in + Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n)) + + let with_create_sub ~protect (Nursery parent : t) f = + let n = + create_ + ~ls:(ref @@ Task_local_storage.Private_.Storage.copy !(parent.ls)) + ~runner:parent.runner () + in + add_child_ ~protect parent n; + Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n)) + + let[@inline] with_cancel_callback (Nursery self) cb f = + with_cancel_callback self cb f +end let[@inline] self () : any = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.self: must be run from inside a fiber." | Some f -> f -let spawn_link_ ?(protect = true) parent f : _ t = - (* spawn [f()] with a copy of our local storage *) - let ls = Task_local_storage.Private_.Storage.copy_of_current () in - let child = spawn_ ~ls:(Some ls) ~on:parent.runner f in - add_child_ ~protect parent child; - child - -let spawn_link ?protect f : _ t = - match Task_local_storage.get k_current_fiber with - | None -> failwith "Fiber.spawn_link: must be run from inside a fiber." - | Some (Any parent) -> spawn_link_ ?protect parent f - -let spawn_top_or_link ?protect ~on f : _ t = - match Task_local_storage.get_opt k_current_fiber with - | Some (Some (Any parent)) -> spawn_link_ ?protect parent f - | None | Some None -> spawn_top ~on f - -type cancel_handle = int - -let add_on_cancel (self : _ t) cb : cancel_handle = - let h = ref 0 in - while - match A.get self.state with - | Alive { children; cancel_id; on_cancel } as old -> - let new_st = - Alive - { - children; - cancel_id = cancel_id + 1; - on_cancel = Int_map.add cancel_id cb on_cancel; - } - in - if A.compare_and_set self.state old new_st then ( - h := cancel_id; - false - ) else - true - | Terminating_or_done r -> - (match A.get r with - | Error ebt -> cb ebt - | Ok _ -> ()); - false - do - () - done; - !h - -let remove_on_cancel (self : _ t) h = - while - match A.get self.state with - | Alive ({ on_cancel; _ } as alive) as old -> - let new_st = - Alive { alive with on_cancel = Int_map.remove h on_cancel } - in - not (A.compare_and_set self.state old new_st) - | Terminating_or_done _ -> false - do - () - done - -let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a = - let h = add_on_cancel self cb in - Fun.protect k ~finally:(fun () -> remove_on_cancel self h) +let[@inline] cur_nursery () = + let (Any f) = self () in + Nursery f let with_self_cancel_callback cb (k : unit -> 'a) : 'a = let (Any self) = self () in let h = add_on_cancel self cb in Fun.protect k ~finally:(fun () -> remove_on_cancel self h) -let[@inline] await self = Fut.await self.res -let[@inline] wait_block self = Fut.wait_block self.res -let[@inline] wait_block_exn self = Fut.wait_block_exn self.res - module Suspend_ = Moonpool.Private.Suspend_ let check_if_cancelled () = diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index a53bbeb3..095ab569 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -9,6 +9,46 @@ of structured concurrency *) +type cancel_callback = Exn_bt.t -> unit +(** A callback used in case of cancellation *) + +(** Nurseries. + + Fibers belong in a {b nursery} + (as defined in {{: https://trio.readthedocs.io/en/stable/reference-core.html} Trio}). + + The nursery does multiple things. + + - it stores a runner, so we easily know where to run fibers; + - it makes it clear in signatures that we might run fibers in a function + - it groups cancellation of multiple fibers together + +*) +module Nursery : sig + type t + + val await : t -> unit + (** Await for the nursery to exit. *) + + val with_create_top : on:Runner.t -> unit -> (t -> 'a) -> 'a + (** New toplevel nursery. It runs fibers on the [on] runner. *) + + val with_create_sub : protect:bool -> t -> (t -> 'a) -> 'a + (** Create a sub-nursery. The sub-nursery is cancelled if the + parent is. Cancelling the sub-nursery also cancels the + parent if [protect=false]. When the function returns, + the nursery is closed an no other fiber can be scheduled on it. *) + + val cancel_with : t -> Exn_bt.t -> unit + (** Cancel the nursery (and all its children) with the given exception. *) + + val with_cancel_callback : t -> cancel_callback -> (unit -> 'a) -> 'a + (** [with_cancel_callback nursery cb (fun () -> )] evaluates [e] + in a scope in which, if the nursery [nursery] is cancelled, + [cb()] is called. If [e] returns without the nursery being cancelled, + this callback is removed. *) +end + (**/**) module Private_ : sig @@ -19,6 +59,7 @@ module Private_ : sig state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *) res: 'a Fut.t; runner: Runner.t; + ls: Task_local_storage.storage ref; } (** Type definition, exposed so that {!any} can be unboxed. Please do not rely on that. *) @@ -39,8 +80,6 @@ val res : 'a t -> 'a Fut.t type 'a callback = 'a Exn_bt.result -> unit (** Callbacks that are called when a fiber is done. *) -type cancel_callback = Exn_bt.t -> unit - (** Type erased fiber *) type any = Private_.any = Any : _ t -> any [@@unboxed] @@ -49,6 +88,11 @@ val self : unit -> any Must be run from inside a fiber. @raise Failure if not run from inside a fiber. *) +val cur_nursery : unit -> Nursery.t +(** [cur_nursery ()] returns the nearest nursery. + Must be run from inside a fiber. + @raise Failure if not run from inside a fiber. *) + val peek : 'a t -> 'a Fut.or_error option (** Peek inside the future result *) @@ -107,28 +151,22 @@ val on_result : 'a t -> 'a callback -> unit with the result. If the fiber is done already then the callback is invoked immediately with its result. *) -val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t -(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. - This fiber is not the child of any other fiber: its lifetime - is only determined by the lifetime of [f()]. *) +val spawn : Nursery.t -> ?protect:bool -> (Nursery.t -> 'a) -> 'a t +(** [spawn n f] spawns a new fiber [fib] in the given nursery [n]. -val spawn_link : ?protect:bool -> (unit -> 'a) -> 'a t -(** [spawn_link ~protect f] spawns a sub-fiber [f_child] - from a running fiber [parent]. - The sub-fiber [f_child] is attached to the current fiber and fails - if the current fiber [parent] fails. + The fiber [fib] is attached to the nursery and fails + if the nursery fails. - @param protect if true, when [f_child] fails, it does not - affect [parent]. If false, [f_child] failing also - causes [parent] to fail (and therefore all other children - of [parent]). Default is [true]. + The function [f] is passed a nursery whose lifetime is + the same as the fiber's. - Must be run from inside a fiber. - @raise Failure if not run from inside a fiber. *) + @param protect if true, when [fib] fails, it does not + affect [nursery] (but the failure can still be re-raised + in {!await}). If false, [fib] failing also + causes [nursery] to fail (and therefore all other children + of [nursery] to be cancelled). Default is [true]. *) -val spawn_top_or_link : ?protect:bool -> on:Runner.t -> (unit -> 'a) -> 'a t -(** [spawn_top_or_link ~on ~protect f] runs [f()] in a new fiber. - If this is run from inside a fiber, this behaves like [spawn_link ~protect f] - (links to the parent); otherwise it behaves like [spawn_top ~on f]. - @param protect if false, failure of the new fiber will also cancel - the parent (in case there is a parent). Default [true]. See {!spawn_link}. *) +val spawn_ignore : Nursery.t -> ?protect:bool -> (Nursery.t -> _) -> unit +(** [spawn_ignore n f] is [ignore (spawn n f)]. + The fiber will still affect termination of [n], ie. [n] will exit + only after the fiber exits. *) diff --git a/src/fib/handle.mli b/src/fib/handle.mli index 1fc5b106..b31e61e4 100644 --- a/src/fib/handle.mli +++ b/src/fib/handle.mli @@ -1,4 +1,7 @@ -(** The unique name of a fiber *) +(** The unique name of a fiber. + + Each fiber has a unique handle that can be used to + refer to it in maps or sets. *) type t = private int (** Unique, opaque identifier for a fiber. *) diff --git a/src/lwt/base.ml b/src/lwt/base.ml index 88e7ed3d..3b646b05 100644 --- a/src/lwt/base.ml +++ b/src/lwt/base.ml @@ -144,13 +144,14 @@ let detach_in_runner ~runner f : _ Lwt.t = Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); fut -let main_with_runner ~runner (f : unit -> 'a) : 'a = +let main_with_runner ~runner (f : Fiber.Nursery.t -> 'a) : 'a = let lwt_fut, lwt_prom = Lwt.wait () in + let@ n = Fiber.Nursery.with_create_top ~on:runner () in let _fiber = - Fiber.spawn_top ~on:runner (fun () -> + Fiber.spawn n (fun _n -> try - let x = f () in + let x = f n in Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) with exn -> Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index ac218e0c..bf7492a0 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -136,9 +136,9 @@ val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t (** {2 Wrappers around Lwt_main} *) -val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a +val main_with_runner : runner:Moonpool.Runner.t -> (Fiber.Nursery.t -> 'a) -> 'a (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside a fiber in [runner]. *) -val main : (unit -> 'a) -> 'a +val main : (Fiber.Nursery.t -> 'a) -> 'a (** Like {!main_with_runner} but with a default choice of runner. *)