Compare commits

..

2 commits

Author SHA1 Message Date
Simon Cruanes
f68fc426b2
Merge 997d996c13 into d957f7b54e 2025-11-12 14:11:48 +00:00
Simon Cruanes
997d996c13
fix test 2025-11-12 09:10:52 -05:00
8 changed files with 40 additions and 59 deletions

View file

@ -27,6 +27,11 @@ type worker_state = {
let[@inline] size_ (self : state) = Array.length self.threads let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
(*
get_thread_state = TLS.get_opt k_worker_state
*)
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
let shutdown_ ~wait (self : state) : unit = let shutdown_ ~wait (self : state) : unit =

View file

@ -9,19 +9,19 @@ let k_local_hmap : Hmap.t FLS.t = FLS.create ()
(** Access the local [hmap], or an empty one if not set *) (** Access the local [hmap], or an empty one if not set *)
let[@inline] get_local_hmap () : Hmap.t = let[@inline] get_local_hmap () : Hmap.t =
match TLS.get_exn k_cur_st with match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> Hmap.empty | exception TLS.Not_set -> Hmap.empty
| { cur_fiber = fiber; _ } -> FLS.get fiber ~default:Hmap.empty k_local_hmap | fiber -> FLS.get fiber ~default:Hmap.empty k_local_hmap
let[@inline] set_local_hmap (h : Hmap.t) : unit = let[@inline] set_local_hmap (h : Hmap.t) : unit =
match TLS.get_exn k_cur_st with match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> () | exception TLS.Not_set -> ()
| { cur_fiber = fiber; _ } -> FLS.set fiber k_local_hmap h | fiber -> FLS.set fiber k_local_hmap h
let[@inline] update_local_hmap (f : Hmap.t -> Hmap.t) : unit = let[@inline] update_local_hmap (f : Hmap.t -> Hmap.t) : unit =
match TLS.get_exn k_cur_st with match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> () | exception TLS.Not_set -> ()
| { cur_fiber = fiber; _ } -> | fiber ->
let h = FLS.get fiber ~default:Hmap.empty k_local_hmap in let h = FLS.get fiber ~default:Hmap.empty k_local_hmap in
let h = f h in let h = f h in
FLS.set fiber k_local_hmap h FLS.set fiber k_local_hmap h

View file

@ -1,7 +1,6 @@
exception Oh_no of Exn_bt.t exception Oh_no of Exn_bt.t
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a = let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let module WL = Worker_loop_ in
let worker_st = let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -9,17 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
in in
let runner = Fifo_pool.Private_.runner_of_state worker_st in let runner = Fifo_pool.Private_.runner_of_state worker_st in
try try
let fut = Fut.spawn ~on:runner (fun () -> f runner) in let fiber = Fut.spawn ~on:runner (fun () -> f runner) in
Fut.on_result fut (fun _ -> Runner.shutdown_without_waiting runner); Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
Thread_local_storage.set Runner.For_runner_implementors.k_cur_st
{ cur_fiber = Picos.Fiber.create ~forbid:true fut; runner };
(* run the main thread *) (* run the main thread *)
WL.worker_loop worker_st ~block_signals (* do not disturb existing thread *) Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops; ~ops:Fifo_pool.Private_.worker_ops;
match Fut.peek fut with match Fut.peek fiber with
| Some (Ok x) -> x | Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt | Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false | None -> assert false

View file

@ -47,12 +47,7 @@ module For_runner_implementors = struct
let create ~size ~num_tasks ~shutdown ~run_async () : t = let create ~size ~num_tasks ~shutdown ~run_async () : t =
{ size; num_tasks; shutdown; run_async } { size; num_tasks; shutdown; run_async }
type nonrec thread_local_state = thread_local_state = { let k_cur_runner : t TLS.t = Types_.k_cur_runner
mutable runner: t;
mutable cur_fiber: fiber;
}
let k_cur_st : thread_local_state TLS.t = Types_.k_cur_st
end end
let dummy : t = let dummy : t =

View file

@ -72,13 +72,7 @@ module For_runner_implementors : sig
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so {b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so
that {!Fork_join} and other 5.x features work properly. *) that {!Fork_join} and other 5.x features work properly. *)
type thread_local_state = { val k_cur_runner : t Thread_local_storage.t
mutable runner: t;
mutable cur_fiber: fiber;
}
(** State set in thread-local-storage for worker threads *)
val k_cur_st : thread_local_state Thread_local_storage.t
(** Key that should be used by each runner to store itself in TLS on every (** Key that should be used by each runner to store itself in TLS on every
thread it controls, so that tasks running on these threads can access the thread it controls, so that tasks running on these threads can access the
runner. This is necessary for {!get_current_runner} to work. *) runner. This is necessary for {!get_current_runner} to work. *)

View file

@ -11,12 +11,8 @@ type runner = {
num_tasks: unit -> int; num_tasks: unit -> int;
} }
type thread_local_state = { let k_cur_runner : runner TLS.t = TLS.create ()
mutable runner: runner; let k_cur_fiber : fiber TLS.t = TLS.create ()
mutable cur_fiber: fiber;
}
let k_cur_st : thread_local_state TLS.t = TLS.create ()
let _dummy_computation : Picos.Computation.packed = let _dummy_computation : Picos.Computation.packed =
let c = Picos.Computation.create () in let c = Picos.Computation.create () in
@ -24,15 +20,11 @@ let _dummy_computation : Picos.Computation.packed =
Picos.Computation.Packed c Picos.Computation.Packed c
let _dummy_fiber = Picos.Fiber.create_packed ~forbid:true _dummy_computation let _dummy_fiber = Picos.Fiber.create_packed ~forbid:true _dummy_computation
let[@inline] get_current_runner () : _ option = TLS.get_opt k_cur_runner
let[@inline] get_current_runner () : _ option =
match TLS.get_exn k_cur_st with
| st -> Some st.runner
| exception TLS.Not_set -> None
let[@inline] get_current_fiber () : fiber option = let[@inline] get_current_fiber () : fiber option =
match TLS.get_exn k_cur_st with match TLS.get_exn k_cur_fiber with
| { cur_fiber = f; _ } when f != _dummy_fiber -> Some f | f when f != _dummy_fiber -> Some f
| _ -> None | _ -> None
| exception TLS.Not_set -> None | exception TLS.Not_set -> None
@ -40,7 +32,7 @@ let error_get_current_fiber_ =
"Moonpool: get_current_fiber was called outside of a fiber." "Moonpool: get_current_fiber was called outside of a fiber."
let[@inline] get_current_fiber_exn () : fiber = let[@inline] get_current_fiber_exn () : fiber =
match TLS.get_exn k_cur_st with match TLS.get_exn k_cur_fiber with
| { cur_fiber = f; _ } when f != _dummy_fiber -> f | f when f != _dummy_fiber -> f
| _ -> failwith error_get_current_fiber_ | _ -> failwith error_get_current_fiber_
| exception TLS.Not_set -> failwith error_get_current_fiber_ | exception TLS.Not_set -> failwith error_get_current_fiber_

View file

@ -102,13 +102,7 @@ end
module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
open Args open Args
let cur_st : Runner.For_runner_implementors.thread_local_state Lazy.t = let cur_fiber : fiber ref = ref _dummy_fiber
lazy
(match TLS.get_exn Runner.For_runner_implementors.k_cur_st with
| st -> st
| exception TLS.Not_set ->
failwith "Moonpool: worker loop: no current state set")
let runner = ops.runner st let runner = ops.runner st
type state = type state =
@ -124,7 +118,10 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber | T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
in in
(Lazy.force cur_st).cur_fiber <- fiber; cur_fiber := fiber;
TLS.set k_cur_fiber fiber;
(* let _ctx = before_task runner in *)
(* run the task now, catching errors, handling effects *) (* run the task now, catching errors, handling effects *)
assert (task != _dummy_task); assert (task != _dummy_task);
@ -139,7 +136,9 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let ebt = Exn_bt.make e bt in let ebt = Exn_bt.make e bt in
ops.on_exn st ebt); ops.on_exn st ebt);
(Lazy.force cur_st).cur_fiber <- _dummy_fiber (* after_task runner _ctx; *)
cur_fiber := _dummy_fiber;
TLS.set k_cur_fiber _dummy_fiber
let setup ~block_signals () : unit = let setup ~block_signals () : unit =
if !state <> New then invalid_arg "worker_loop.setup: not a new instance"; if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
@ -162,9 +161,9 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
with _ -> () with _ -> ()
); );
ops.before_start st; TLS.set Runner.For_runner_implementors.k_cur_runner runner;
(Lazy.force cur_st).runner <- runner;
() ops.before_start st
let run ?(max_tasks = max_int) () : unit = let run ?(max_tasks = max_int) () : unit =
if !state <> Ready then invalid_arg "worker_loop.run: not setup"; if !state <> Ready then invalid_arg "worker_loop.run: not setup";
@ -182,7 +181,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let teardown () = let teardown () =
if !state <> Torn_down then ( if !state <> Torn_down then (
state := Torn_down; state := Torn_down;
(Lazy.force cur_st).cur_fiber <- _dummy_fiber; cur_fiber := _dummy_fiber;
ops.cleanup st ops.cleanup st
) )
end end

View file

@ -55,8 +55,7 @@ let num_tasks_ (self : state) : int =
!n !n
(** TLS, used by worker to store their specific state and be able to retrieve it (** TLS, used by worker to store their specific state and be able to retrieve it
from tasks when we schedule new sub-tasks. This way we can schedule the new from tasks when we schedule new sub-tasks. *)
task directly in the local work queue, where it might be stolen. *)
let k_worker_state : worker_state TLS.t = TLS.create () let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option = let[@inline] get_current_worker_ () : worker_state option =
@ -180,8 +179,8 @@ and wait_on_main_queue (self : worker_state) : WL.task_full =
let before_start (self : worker_state) : unit = let before_start (self : worker_state) : unit =
let t_id = Thread.id @@ Thread.self () in let t_id = Thread.id @@ Thread.self () in
self.st.on_init_thread ~dom_id:self.dom_id ~t_id (); self.st.on_init_thread ~dom_id:self.dom_id ~t_id ();
TLS.set Runner.For_runner_implementors.k_cur_st TLS.set k_cur_fiber _dummy_fiber;
{ cur_fiber = _dummy_fiber; runner = self.st.as_runner }; TLS.set Runner.For_runner_implementors.k_cur_runner self.st.as_runner;
TLS.set k_worker_state self; TLS.set k_worker_state self;
(* set thread name *) (* set thread name *)