fix fiber: spawn sub-fiber with a copy of current local storage

This commit is contained in:
Simon Cruanes 2024-02-27 22:11:50 -05:00
parent bd7a48a4b4
commit bfd70dc5c2
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 18 additions and 5 deletions

View file

@ -65,6 +65,7 @@ module Private_ = struct
let k_storage = k_ls_values
let[@inline] create () = [||]
let copy = Array.copy
let[@inline] copy_of_current () = copy @@ !(get_cur_ ())
let dummy = [||]
end
end

View file

@ -54,6 +54,7 @@ module Private_ : sig
val k_storage : t ref option Thread_local_storage_.key
val create : unit -> t
val copy : t -> t
val copy_of_current : unit -> t
val dummy : t
end
end

View file

@ -154,7 +154,7 @@ let add_child_ ~protect (self : _ t) (child : _ t) =
()
done
let spawn_ ~on (f : _ -> 'a) : 'a t =
let spawn_ ~ls ~on (f : _ -> 'a) : 'a t =
let id = Handle.generate_fresh () in
let res, _promise = Fut.make () in
let fib =
@ -169,7 +169,6 @@ let spawn_ ~on (f : _ -> 'a) : 'a t =
let run () =
(* make sure the fiber is accessible from inside itself *)
Task_local_storage.set k_current_fiber (Some (Any fib));
assert (Task_local_storage.get k_current_fiber |> Option.is_some);
try
let res = f () in
resolve_ok_ fib res
@ -179,11 +178,11 @@ let spawn_ ~on (f : _ -> 'a) : 'a t =
resolve_as_failed_ fib ebt
in
Runner.run_async on run;
Runner.run_async ?ls on run;
fib
let[@inline] spawn_top ~on f : _ t = spawn_ ~on f
let[@inline] spawn_top ~on f : _ t = spawn_ ~ls:None ~on f
let[@inline] self () : any =
match Task_local_storage.get k_current_fiber with
@ -194,7 +193,9 @@ let spawn_link ~protect f : _ t =
match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.spawn_link: must be run from inside a fiber."
| Some (Any parent) ->
let child = spawn_ ~on:parent.runner f in
(* spawn [f()] with a copy of our local storage *)
let ls = Task_local_storage.Private_.Storage.copy_of_current () in
let child = spawn_ ~ls:(Some ls) ~on:parent.runner f in
add_child_ ~protect parent child;
child
@ -235,6 +236,8 @@ let with_self_cancel_callback cb (k : unit -> 'a) : 'a =
Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self)
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
module Suspend_ = Moonpool.Private.Suspend_

View file

@ -64,6 +64,14 @@ val is_success : _ t -> bool
val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)].
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)].
{b NOTE}: See {!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises.
Must be run from inside a fiber.