Compare commits

..

2 commits

Author SHA1 Message Date
Simon Cruanes
f68fc426b2
Merge 997d996c13 into d957f7b54e 2025-11-12 14:11:48 +00:00
Simon Cruanes
997d996c13
fix test 2025-11-12 09:10:52 -05:00
8 changed files with 40 additions and 59 deletions

View file

@ -27,6 +27,11 @@ type worker_state = {
let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
(*
get_thread_state = TLS.get_opt k_worker_state
*)
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
let shutdown_ ~wait (self : state) : unit =

View file

@ -9,19 +9,19 @@ let k_local_hmap : Hmap.t FLS.t = FLS.create ()
(** Access the local [hmap], or an empty one if not set *)
let[@inline] get_local_hmap () : Hmap.t =
match TLS.get_exn k_cur_st with
match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> Hmap.empty
| { cur_fiber = fiber; _ } -> FLS.get fiber ~default:Hmap.empty k_local_hmap
| fiber -> FLS.get fiber ~default:Hmap.empty k_local_hmap
let[@inline] set_local_hmap (h : Hmap.t) : unit =
match TLS.get_exn k_cur_st with
match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> ()
| { cur_fiber = fiber; _ } -> FLS.set fiber k_local_hmap h
| fiber -> FLS.set fiber k_local_hmap h
let[@inline] update_local_hmap (f : Hmap.t -> Hmap.t) : unit =
match TLS.get_exn k_cur_st with
match TLS.get_exn k_cur_fiber with
| exception TLS.Not_set -> ()
| { cur_fiber = fiber; _ } ->
| fiber ->
let h = FLS.get fiber ~default:Hmap.empty k_local_hmap in
let h = f h in
FLS.set fiber k_local_hmap h

View file

@ -1,7 +1,6 @@
exception Oh_no of Exn_bt.t
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let module WL = Worker_loop_ in
let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -9,17 +8,15 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
in
let runner = Fifo_pool.Private_.runner_of_state worker_st in
try
let fut = Fut.spawn ~on:runner (fun () -> f runner) in
Fut.on_result fut (fun _ -> Runner.shutdown_without_waiting runner);
Thread_local_storage.set Runner.For_runner_implementors.k_cur_st
{ cur_fiber = Picos.Fiber.create ~forbid:true fut; runner };
let fiber = Fut.spawn ~on:runner (fun () -> f runner) in
Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
(* run the main thread *)
WL.worker_loop worker_st ~block_signals (* do not disturb existing thread *)
Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops;
match Fut.peek fut with
match Fut.peek fiber with
| Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false

View file

@ -47,12 +47,7 @@ module For_runner_implementors = struct
let create ~size ~num_tasks ~shutdown ~run_async () : t =
{ size; num_tasks; shutdown; run_async }
type nonrec thread_local_state = thread_local_state = {
mutable runner: t;
mutable cur_fiber: fiber;
}
let k_cur_st : thread_local_state TLS.t = Types_.k_cur_st
let k_cur_runner : t TLS.t = Types_.k_cur_runner
end
let dummy : t =

View file

@ -72,13 +72,7 @@ module For_runner_implementors : sig
{b NOTE}: the runner should support DLA and {!Suspend_} on OCaml 5.x, so
that {!Fork_join} and other 5.x features work properly. *)
type thread_local_state = {
mutable runner: t;
mutable cur_fiber: fiber;
}
(** State set in thread-local-storage for worker threads *)
val k_cur_st : thread_local_state Thread_local_storage.t
val k_cur_runner : t Thread_local_storage.t
(** Key that should be used by each runner to store itself in TLS on every
thread it controls, so that tasks running on these threads can access the
runner. This is necessary for {!get_current_runner} to work. *)

View file

@ -11,12 +11,8 @@ type runner = {
num_tasks: unit -> int;
}
type thread_local_state = {
mutable runner: runner;
mutable cur_fiber: fiber;
}
let k_cur_st : thread_local_state TLS.t = TLS.create ()
let k_cur_runner : runner TLS.t = TLS.create ()
let k_cur_fiber : fiber TLS.t = TLS.create ()
let _dummy_computation : Picos.Computation.packed =
let c = Picos.Computation.create () in
@ -24,15 +20,11 @@ let _dummy_computation : Picos.Computation.packed =
Picos.Computation.Packed c
let _dummy_fiber = Picos.Fiber.create_packed ~forbid:true _dummy_computation
let[@inline] get_current_runner () : _ option =
match TLS.get_exn k_cur_st with
| st -> Some st.runner
| exception TLS.Not_set -> None
let[@inline] get_current_runner () : _ option = TLS.get_opt k_cur_runner
let[@inline] get_current_fiber () : fiber option =
match TLS.get_exn k_cur_st with
| { cur_fiber = f; _ } when f != _dummy_fiber -> Some f
match TLS.get_exn k_cur_fiber with
| f when f != _dummy_fiber -> Some f
| _ -> None
| exception TLS.Not_set -> None
@ -40,7 +32,7 @@ let error_get_current_fiber_ =
"Moonpool: get_current_fiber was called outside of a fiber."
let[@inline] get_current_fiber_exn () : fiber =
match TLS.get_exn k_cur_st with
| { cur_fiber = f; _ } when f != _dummy_fiber -> f
match TLS.get_exn k_cur_fiber with
| f when f != _dummy_fiber -> f
| _ -> failwith error_get_current_fiber_
| exception TLS.Not_set -> failwith error_get_current_fiber_

View file

@ -102,13 +102,7 @@ end
module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
open Args
let cur_st : Runner.For_runner_implementors.thread_local_state Lazy.t =
lazy
(match TLS.get_exn Runner.For_runner_implementors.k_cur_st with
| st -> st
| exception TLS.Not_set ->
failwith "Moonpool: worker loop: no current state set")
let cur_fiber : fiber ref = ref _dummy_fiber
let runner = ops.runner st
type state =
@ -124,7 +118,10 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
in
(Lazy.force cur_st).cur_fiber <- fiber;
cur_fiber := fiber;
TLS.set k_cur_fiber fiber;
(* let _ctx = before_task runner in *)
(* run the task now, catching errors, handling effects *)
assert (task != _dummy_task);
@ -139,7 +136,9 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let ebt = Exn_bt.make e bt in
ops.on_exn st ebt);
(Lazy.force cur_st).cur_fiber <- _dummy_fiber
(* after_task runner _ctx; *)
cur_fiber := _dummy_fiber;
TLS.set k_cur_fiber _dummy_fiber
let setup ~block_signals () : unit =
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
@ -162,9 +161,9 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
with _ -> ()
);
ops.before_start st;
(Lazy.force cur_st).runner <- runner;
()
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
ops.before_start st
let run ?(max_tasks = max_int) () : unit =
if !state <> Ready then invalid_arg "worker_loop.run: not setup";
@ -182,7 +181,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
let teardown () =
if !state <> Torn_down then (
state := Torn_down;
(Lazy.force cur_st).cur_fiber <- _dummy_fiber;
cur_fiber := _dummy_fiber;
ops.cleanup st
)
end

View file

@ -55,8 +55,7 @@ let num_tasks_ (self : state) : int =
!n
(** TLS, used by worker to store their specific state and be able to retrieve it
from tasks when we schedule new sub-tasks. This way we can schedule the new
task directly in the local work queue, where it might be stolen. *)
from tasks when we schedule new sub-tasks. *)
let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option =
@ -180,8 +179,8 @@ and wait_on_main_queue (self : worker_state) : WL.task_full =
let before_start (self : worker_state) : unit =
let t_id = Thread.id @@ Thread.self () in
self.st.on_init_thread ~dom_id:self.dom_id ~t_id ();
TLS.set Runner.For_runner_implementors.k_cur_st
{ cur_fiber = _dummy_fiber; runner = self.st.as_runner };
TLS.set k_cur_fiber _dummy_fiber;
TLS.set Runner.For_runner_implementors.k_cur_runner self.st.as_runner;
TLS.set k_worker_state self;
(* set thread name *)