introduce Fiber.Nursery.t; change local storage accordingly

This commit is contained in:
Simon Cruanes 2024-03-01 17:07:52 -05:00
parent cec77d2ee9
commit 37751c79e4
11 changed files with 222 additions and 148 deletions

View file

@ -6,11 +6,11 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage
type task_full = type task_full =
| T_start of { | T_start of {
ls: Task_local_storage.storage; ls: Task_local_storage.storage ref;
f: task; f: task;
} }
| T_resume : { | T_resume : {
ls: Task_local_storage.storage; ls: Task_local_storage.storage ref;
k: 'a -> unit; k: 'a -> unit;
x: 'a; x: 'a;
} }
@ -30,23 +30,22 @@ let schedule_ (self : state) (task : task_full) : unit =
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
type worker_state = { cur_ls: Task_local_storage.storage ref } type worker_state = { mutable cur_ls: Task_local_storage.storage ref option }
let k_worker_state : worker_state option ref TLS.key = let k_worker_state : worker_state option ref TLS.key =
TLS.new_key (fun () -> ref None) TLS.new_key (fun () -> ref None)
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
let w = { cur_ls = ref Task_local_storage.Private_.Storage.dummy } in let w = { cur_ls = None } in
TLS.get k_worker_state := Some w; TLS.get k_worker_state := Some w;
TLS.set k_storage (Some w.cur_ls);
TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner;
let (AT_pair (before_task, after_task)) = around_task in let (AT_pair (before_task, after_task)) = around_task in
let on_suspend () = let on_suspend () =
match !(TLS.get k_worker_state) with match !(TLS.get k_worker_state) with
| None -> assert false | Some { cur_ls = Some ls; _ } -> ls
| Some w -> !(w.cur_ls) | _ -> assert false
in in
let run_another_task ls task' = schedule_ self @@ T_start { f = task'; ls } in let run_another_task ls task' = schedule_ self @@ T_start { f = task'; ls } in
let resume ls k res = schedule_ self @@ T_resume { ls; k; x = res } in let resume ls k res = schedule_ self @@ T_resume { ls; k; x = res } in
@ -56,7 +55,8 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
match task with match task with
| T_start { ls; _ } | T_resume { ls; _ } -> ls | T_start { ls; _ } | T_resume { ls; _ } -> ls
in in
w.cur_ls := ls; w.cur_ls <- Some ls;
TLS.set k_storage (Some ls);
let _ctx = before_task runner in let _ctx = before_task runner in
(* run the task now, catching errors, handling effects *) (* run the task now, catching errors, handling effects *)
@ -74,7 +74,8 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
on_exn e bt); on_exn e bt);
after_task runner _ctx; after_task runner _ctx;
w.cur_ls := Task_local_storage.Private_.Storage.dummy w.cur_ls <- None;
TLS.set k_storage None
in in
let main_loop () = let main_loop () =

View file

@ -4,10 +4,8 @@ include Runner
(* convenient alias *) (* convenient alias *)
let k_ls = Task_local_storage.Private_.Storage.k_storage let k_ls = Task_local_storage.Private_.Storage.k_storage
let run_async_ ~ls f = let run_async_ ~ls:cur_ls f =
let cur_ls = ref ls in
TLS.set k_ls (Some cur_ls); TLS.set k_ls (Some cur_ls);
cur_ls := ls;
try try
let x = f () in let x = f () in
TLS.set k_ls None; TLS.set k_ls None;

View file

@ -27,14 +27,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run all the various threads needed in an application (timers, event loops, etc.) *) to run all the various threads needed in an application (timers, event loops, etc.) *)
val run_async : val run_async :
?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit ?ls:Task_local_storage.storage ref -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run (** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread. at some point in the future, possibly in another thread.
@since 0.5 *) @since 0.5 *)
val run_wait_block : val run_wait_block :
?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a ?ls:Task_local_storage.storage ref -> Runner.t -> (unit -> 'a) -> 'a
(** [run_wait_block runner f] schedules [f] for later execution (** [run_wait_block runner f] schedules [f] for later execution
on the runner, like {!run_async}. on the runner, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,

View file

@ -3,7 +3,7 @@ module TLS = Thread_local_storage_
type task = unit -> unit type task = unit -> unit
type t = { type t = {
run_async: ls:Task_local_storage.storage -> task -> unit; run_async: ls:Task_local_storage.storage ref -> task -> unit;
shutdown: wait:bool -> unit -> unit; shutdown: wait:bool -> unit -> unit;
size: unit -> int; size: unit -> int;
num_tasks: unit -> int; num_tasks: unit -> int;
@ -11,8 +11,9 @@ type t = {
exception Shutdown exception Shutdown
let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ()) let[@inline] run_async
(self : t) f : unit = ?(ls = ref @@ Task_local_storage.Private_.Storage.create ()) (self : t) f :
unit =
self.run_async ~ls f self.run_async ~ls f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()

View file

@ -33,14 +33,15 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit val run_async : ?ls:Task_local_storage.storage ref -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner (** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's in one of the threads. [f()] will run on one of the runner's
worker threads/domains. worker threads/domains.
@param ls if provided, run the task with this initial local storage @param ls if provided, run the task with this initial local storage
@raise Shutdown if the runner was shut down before [run_async] was called. *) @raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a val run_wait_block :
?ls:Task_local_storage.storage ref -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution (** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}. on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,
@ -60,7 +61,7 @@ module For_runner_implementors : sig
size:(unit -> int) -> size:(unit -> int) ->
num_tasks:(unit -> int) -> num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) -> shutdown:(wait:bool -> unit -> unit) ->
run_async:(ls:Task_local_storage.storage -> task -> unit) -> run_async:(ls:Task_local_storage.storage ref -> task -> unit) ->
unit -> unit ->
t t
(** Create a new runner. (** Create a new runner.

View file

@ -18,11 +18,11 @@ type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
type task_full = type task_full =
| T_start of { | T_start of {
ls: Task_local_storage.storage; ls: Task_local_storage.storage ref;
f: task; f: task;
} }
| T_resume : { | T_resume : {
ls: Task_local_storage.storage; ls: Task_local_storage.storage ref;
k: 'a -> unit; k: 'a -> unit;
x: 'a; x: 'a;
} }
@ -32,7 +32,7 @@ type worker_state = {
pool_id_: Id.t; (** Unique per pool *) pool_id_: Id.t; (** Unique per pool *)
mutable thread: Thread.t; mutable thread: Thread.t;
q: task_full WSQ.t; (** Work stealing queue *) q: task_full WSQ.t; (** Work stealing queue *)
cur_ls: Task_local_storage.storage ref; (** Task storage *) mutable cur_ls: Task_local_storage.storage ref option; (** Task storage *)
rng: Random.State.t; rng: Random.State.t;
} }
(** State for a given worker. Only this worker is (** State for a given worker. Only this worker is
@ -120,17 +120,14 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full)
| T_start { ls; _ } | T_resume { ls; _ } -> ls | T_start { ls; _ } | T_resume { ls; _ } -> ls
in in
w.cur_ls := ls; w.cur_ls <- Some ls;
TLS.set k_storage (Some ls);
let _ctx = before_task runner in let _ctx = before_task runner in
let[@inline] on_suspend () = let[@inline] on_suspend () : _ ref =
let w = match find_current_worker_ () with
match find_current_worker_ () with | Some { cur_ls = Some w; _ } -> w
| Some w -> w | _ -> assert false
| None -> assert false
in
let ls = !(w.cur_ls) in
ls
in in
let run_another_task ls (task' : task) = let run_another_task ls (task' : task) =
@ -139,7 +136,7 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full)
| Some w when Id.equal w.pool_id_ self.id_ -> Some w | Some w when Id.equal w.pool_id_ self.id_ -> Some w
| _ -> None | _ -> None
in in
let ls' = Task_local_storage.Private_.Storage.copy ls in let ls' = ref @@ Task_local_storage.Private_.Storage.copy !ls in
schedule_task_ self ~w @@ T_start { ls = ls'; f = task' } schedule_task_ self ~w @@ T_start { ls = ls'; f = task' }
in in
@ -168,7 +165,8 @@ let run_task_now_ (self : state) ~runner ~(w : worker_state) (task : task_full)
self.on_exn e bt); self.on_exn e bt);
after_task runner _ctx; after_task runner _ctx;
w.cur_ls := Task_local_storage.Private_.Storage.dummy w.cur_ls <- None;
TLS.set k_storage None
let run_async_ (self : state) ~ls (f : task) : unit = let run_async_ (self : state) ~ls (f : task) : unit =
let w = find_current_worker_ () in let w = find_current_worker_ () in
@ -291,7 +289,7 @@ type ('a, 'b) create_args =
(** Arguments used in {!create}. See {!create} for explanations. *) (** Arguments used in {!create}. See {!create} for explanations. *)
let dummy_task_ : task_full = let dummy_task_ : task_full =
T_start { f = ignore; ls = Task_local_storage.Private_.Storage.dummy } T_start { f = ignore; ls = ref Task_local_storage.Private_.Storage.dummy }
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
@ -318,7 +316,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
thread = dummy; thread = dummy;
q = WSQ.create ~dummy:dummy_task_ (); q = WSQ.create ~dummy:dummy_task_ ();
rng = Random.State.make [| i |]; rng = Random.State.make [| i |];
cur_ls = ref Task_local_storage.Private_.Storage.dummy; cur_ls = None;
}) })
in in
@ -360,7 +358,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let thread = Thread.self () in let thread = Thread.self () in
let t_id = Thread.id thread in let t_id = Thread.id thread in
on_init_thread ~dom_id:dom_idx ~t_id (); on_init_thread ~dom_id:dom_idx ~t_id ();
TLS.set k_storage (Some w.cur_ls); TLS.set k_storage None;
(* set thread name *) (* set thread name *)
Option.iter Option.iter

View file

@ -16,6 +16,7 @@ module Private_ = struct
state: 'a state A.t; (** Current state in the lifetime of the fiber *) state: 'a state A.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t; res: 'a Fut.t;
runner: Runner.t; runner: Runner.t;
ls: Task_local_storage.storage ref;
} }
and 'a state = and 'a state =
@ -28,12 +29,18 @@ module Private_ = struct
and children = any FM.t and children = any FM.t
and any = Any : _ t -> any [@@unboxed] and any = Any : _ t -> any [@@unboxed]
and nursery = Nursery : _ t -> nursery [@@unboxed]
(** Key to access the current fiber. *) (** Key to access the current fiber. *)
let k_current_fiber : any option Task_local_storage.key = let k_current_fiber : any option Task_local_storage.key =
Task_local_storage.new_key ~init:(fun () -> None) () Task_local_storage.new_key ~init:(fun () -> None) ()
let[@inline] get_cur () : any option = Task_local_storage.get k_current_fiber let[@inline] get_cur () : any option = Task_local_storage.get k_current_fiber
let[@inline] is_closed (self : _ t) =
match A.get self.state with
| Alive _ -> false
| Terminating_or_done _ -> true
end end
include Private_ include Private_
@ -44,6 +51,9 @@ let[@inline] is_done self = Fut.is_done self.res
let[@inline] is_success self = Fut.is_success self.res let[@inline] is_success self = Fut.is_success self.res
let[@inline] is_cancelled self = Fut.is_failed self.res let[@inline] is_cancelled self = Fut.is_failed self.res
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
(** Resolve [promise] once [children] are all done *) (** Resolve [promise] once [children] are all done *)
let resolve_once_children_are_done_ ~children ~promise let resolve_once_children_are_done_ ~children ~promise
@ -92,6 +102,53 @@ let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
and cancel_children_ ebt ~children : unit = and cancel_children_ ebt ~children : unit =
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
type cancel_handle = int
let add_on_cancel (self : _ t) cb : cancel_handle =
let h = ref 0 in
while
match A.get self.state with
| Alive { children; cancel_id; on_cancel } as old ->
let new_st =
Alive
{
children;
cancel_id = cancel_id + 1;
on_cancel = Int_map.add cancel_id cb on_cancel;
}
in
if A.compare_and_set self.state old new_st then (
h := cancel_id;
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt -> cb ebt
| Ok _ -> ());
false
do
()
done;
!h
let remove_on_cancel (self : _ t) h =
while
match A.get self.state with
| Alive ({ on_cancel; _ } as alive) as old ->
let new_st =
Alive { alive with on_cancel = Int_map.remove h on_cancel }
in
not (A.compare_and_set self.state old new_st)
| Terminating_or_done _ -> false
do
()
done
let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber *) (** Successfully resolve the fiber *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit = let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in let r = A.make @@ Ok r in
@ -156,26 +213,28 @@ let add_child_ ~protect (self : _ t) (child : _ t) =
() ()
done done
let spawn_ ~ls ~on (f : _ -> 'a) : 'a t = let create_ ~ls ~runner () : 'a t =
let id = Handle.generate_fresh () in let id = Handle.generate_fresh () in
let res, _promise = Fut.make () in let res, _promise = Fut.make () in
let fib = {
{ state =
state = A.make
A.make @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
@@ Alive id;
{ children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; res;
id; runner;
res; ls;
runner = on; }
}
in let spawn_ ~ls (Nursery n) (f : nursery -> 'a) : 'a t =
if is_closed n then failwith "spawn: nursery is closed";
let fib = create_ ~ls ~runner:n.runner () in
let run () = let run () =
(* make sure the fiber is accessible from inside itself *) (* make sure the fiber is accessible from inside itself *)
Task_local_storage.set k_current_fiber (Some (Any fib)); Task_local_storage.set k_current_fiber (Some (Any fib));
try try
let res = f () in let res = f (Nursery fib) in
resolve_ok_ fib res resolve_ok_ fib res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
@ -183,90 +242,64 @@ let spawn_ ~ls ~on (f : _ -> 'a) : 'a t =
resolve_as_failed_ fib ebt resolve_as_failed_ fib ebt
in in
Runner.run_async ?ls on run; Runner.run_async ~ls n.runner run;
fib fib
let[@inline] spawn_top ~on f : _ t = spawn_ ~ls:None ~on f let spawn (Nursery n) ?(protect = true) f : _ t =
(* spawn [f()] with a copy of our local storage *)
let ls = ref (Task_local_storage.Private_.Storage.copy !(n.ls)) in
let child = spawn_ ~ls (Nursery n) f in
add_child_ ~protect n child;
child
let[@inline] spawn_ignore n ?protect f : unit =
ignore (spawn n ?protect f : _ t)
module Nursery = struct
type t = nursery
let[@inline] await (Nursery n) : unit =
ignore (await n);
()
let cancel_with (Nursery n) ebt : unit = resolve_as_failed_ n ebt
let with_create_top ~on () f =
let n =
create_
~ls:(ref @@ Task_local_storage.Private_.Storage.create ())
~runner:on ()
in
Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n))
let with_create_sub ~protect (Nursery parent : t) f =
let n =
create_
~ls:(ref @@ Task_local_storage.Private_.Storage.copy !(parent.ls))
~runner:parent.runner ()
in
add_child_ ~protect parent n;
Fun.protect ~finally:(fun () -> resolve_ok_ n ()) (fun () -> f (Nursery n))
let[@inline] with_cancel_callback (Nursery self) cb f =
with_cancel_callback self cb f
end
let[@inline] self () : any = let[@inline] self () : any =
match Task_local_storage.get k_current_fiber with match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.self: must be run from inside a fiber." | None -> failwith "Fiber.self: must be run from inside a fiber."
| Some f -> f | Some f -> f
let spawn_link_ ?(protect = true) parent f : _ t = let[@inline] cur_nursery () =
(* spawn [f()] with a copy of our local storage *) let (Any f) = self () in
let ls = Task_local_storage.Private_.Storage.copy_of_current () in Nursery f
let child = spawn_ ~ls:(Some ls) ~on:parent.runner f in
add_child_ ~protect parent child;
child
let spawn_link ?protect f : _ t =
match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.spawn_link: must be run from inside a fiber."
| Some (Any parent) -> spawn_link_ ?protect parent f
let spawn_top_or_link ?protect ~on f : _ t =
match Task_local_storage.get_opt k_current_fiber with
| Some (Some (Any parent)) -> spawn_link_ ?protect parent f
| None | Some None -> spawn_top ~on f
type cancel_handle = int
let add_on_cancel (self : _ t) cb : cancel_handle =
let h = ref 0 in
while
match A.get self.state with
| Alive { children; cancel_id; on_cancel } as old ->
let new_st =
Alive
{
children;
cancel_id = cancel_id + 1;
on_cancel = Int_map.add cancel_id cb on_cancel;
}
in
if A.compare_and_set self.state old new_st then (
h := cancel_id;
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt -> cb ebt
| Ok _ -> ());
false
do
()
done;
!h
let remove_on_cancel (self : _ t) h =
while
match A.get self.state with
| Alive ({ on_cancel; _ } as alive) as old ->
let new_st =
Alive { alive with on_cancel = Int_map.remove h on_cancel }
in
not (A.compare_and_set self.state old new_st)
| Terminating_or_done _ -> false
do
()
done
let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
let with_self_cancel_callback cb (k : unit -> 'a) : 'a = let with_self_cancel_callback cb (k : unit -> 'a) : 'a =
let (Any self) = self () in let (Any self) = self () in
let h = add_on_cancel self cb in let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h) Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
module Suspend_ = Moonpool.Private.Suspend_ module Suspend_ = Moonpool.Private.Suspend_
let check_if_cancelled () = let check_if_cancelled () =

View file

@ -9,6 +9,46 @@
of structured concurrency of structured concurrency
*) *)
type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *)
(** Nurseries.
Fibers belong in a {b nursery}
(as defined in {{: https://trio.readthedocs.io/en/stable/reference-core.html} Trio}).
The nursery does multiple things.
- it stores a runner, so we easily know where to run fibers;
- it makes it clear in signatures that we might run fibers in a function
- it groups cancellation of multiple fibers together
*)
module Nursery : sig
type t
val await : t -> unit
(** Await for the nursery to exit. *)
val with_create_top : on:Runner.t -> unit -> (t -> 'a) -> 'a
(** New toplevel nursery. It runs fibers on the [on] runner. *)
val with_create_sub : protect:bool -> t -> (t -> 'a) -> 'a
(** Create a sub-nursery. The sub-nursery is cancelled if the
parent is. Cancelling the sub-nursery also cancels the
parent if [protect=false]. When the function returns,
the nursery is closed an no other fiber can be scheduled on it. *)
val cancel_with : t -> Exn_bt.t -> unit
(** Cancel the nursery (and all its children) with the given exception. *)
val with_cancel_callback : t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_cancel_callback nursery cb (fun () -> <e>)] evaluates [e]
in a scope in which, if the nursery [nursery] is cancelled,
[cb()] is called. If [e] returns without the nursery being cancelled,
this callback is removed. *)
end
(**/**) (**/**)
module Private_ : sig module Private_ : sig
@ -19,6 +59,7 @@ module Private_ : sig
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *) state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t; res: 'a Fut.t;
runner: Runner.t; runner: Runner.t;
ls: Task_local_storage.storage ref;
} }
(** Type definition, exposed so that {!any} can be unboxed. (** Type definition, exposed so that {!any} can be unboxed.
Please do not rely on that. *) Please do not rely on that. *)
@ -39,8 +80,6 @@ val res : 'a t -> 'a Fut.t
type 'a callback = 'a Exn_bt.result -> unit type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *) (** Callbacks that are called when a fiber is done. *)
type cancel_callback = Exn_bt.t -> unit
(** Type erased fiber *) (** Type erased fiber *)
type any = Private_.any = Any : _ t -> any [@@unboxed] type any = Private_.any = Any : _ t -> any [@@unboxed]
@ -49,6 +88,11 @@ val self : unit -> any
Must be run from inside a fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *) @raise Failure if not run from inside a fiber. *)
val cur_nursery : unit -> Nursery.t
(** [cur_nursery ()] returns the nearest nursery.
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *) (** Peek inside the future result *)
@ -107,28 +151,22 @@ val on_result : 'a t -> 'a callback -> unit
with the result. If the fiber is done already then the with the result. If the fiber is done already then the
callback is invoked immediately with its result. *) callback is invoked immediately with its result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t val spawn : Nursery.t -> ?protect:bool -> (Nursery.t -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. (** [spawn n f] spawns a new fiber [fib] in the given nursery [n].
This fiber is not the child of any other fiber: its lifetime
is only determined by the lifetime of [f()]. *)
val spawn_link : ?protect:bool -> (unit -> 'a) -> 'a t The fiber [fib] is attached to the nursery and fails
(** [spawn_link ~protect f] spawns a sub-fiber [f_child] if the nursery fails.
from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
@param protect if true, when [f_child] fails, it does not The function [f] is passed a nursery whose lifetime is
affect [parent]. If false, [f_child] failing also the same as the fiber's.
causes [parent] to fail (and therefore all other children
of [parent]). Default is [true].
Must be run from inside a fiber. @param protect if true, when [fib] fails, it does not
@raise Failure if not run from inside a fiber. *) affect [nursery] (but the failure can still be re-raised
in {!await}). If false, [fib] failing also
causes [nursery] to fail (and therefore all other children
of [nursery] to be cancelled). Default is [true]. *)
val spawn_top_or_link : ?protect:bool -> on:Runner.t -> (unit -> 'a) -> 'a t val spawn_ignore : Nursery.t -> ?protect:bool -> (Nursery.t -> _) -> unit
(** [spawn_top_or_link ~on ~protect f] runs [f()] in a new fiber. (** [spawn_ignore n f] is [ignore (spawn n f)].
If this is run from inside a fiber, this behaves like [spawn_link ~protect f] The fiber will still affect termination of [n], ie. [n] will exit
(links to the parent); otherwise it behaves like [spawn_top ~on f]. only after the fiber exits. *)
@param protect if false, failure of the new fiber will also cancel
the parent (in case there is a parent). Default [true]. See {!spawn_link}. *)

View file

@ -1,4 +1,7 @@
(** The unique name of a fiber *) (** The unique name of a fiber.
Each fiber has a unique handle that can be used to
refer to it in maps or sets. *)
type t = private int type t = private int
(** Unique, opaque identifier for a fiber. *) (** Unique, opaque identifier for a fiber. *)

View file

@ -144,13 +144,14 @@ let detach_in_runner ~runner f : _ Lwt.t =
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut fut
let main_with_runner ~runner (f : unit -> 'a) : 'a = let main_with_runner ~runner (f : Fiber.Nursery.t -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
let@ n = Fiber.Nursery.with_create_top ~on:runner () in
let _fiber = let _fiber =
Fiber.spawn_top ~on:runner (fun () -> Fiber.spawn n (fun _n ->
try try
let x = f () in let x = f n in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn -> with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))

View file

@ -136,9 +136,9 @@ val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a val main_with_runner : runner:Moonpool.Runner.t -> (Fiber.Nursery.t -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
a fiber in [runner]. *) a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a val main : (Fiber.Nursery.t -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *) (** Like {!main_with_runner} but with a default choice of runner. *)