full lifecycle for worker domains

domains can now stop when all worker threads running on them are done
This commit is contained in:
Simon Cruanes 2023-08-12 14:09:35 -04:00
parent 5680938a6c
commit 9db0a9fe28
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 48 additions and 23 deletions

View file

@ -1,34 +1,56 @@
type domain = Domain_.t type domain = Domain_.t
let work_ _i q : unit = type event =
| Run of (unit -> unit) (** Run this function *)
| Decr
(** decrement number of threads on this domain. If it reaches 0,
wind down *)
(* State for a domain worker. It should not do too much except for starting
   new threads for pools. *)
type worker_state = {
  q: event Bb_queue.t;
      (** Queue of events for this worker: [work_] pops from it, while
          [run_on] pushes [Run] events and [decr_on] pushes [Decr] events. *)
  th_count: int Atomic_.t;
      (** Number of threads on this domain. It starts at 1 when the worker is
          created in [run_on], is incremented by each further [run_on], and is
          decremented when the worker loop processes a [Decr] event; the loop
          in [work_] only keeps running while it is [> 0]. *)
}
let work_ (st : worker_state) : unit =
Dla_.setup_domain (); Dla_.setup_domain ();
while true do while Atomic_.get st.th_count > 0 do
let f = Bb_queue.pop q in match Bb_queue.pop st.q with
try f () with _ -> () | Run f -> (try f () with _ -> ())
| Decr -> Atomic_.decr st.th_count
done done
(* A domain level worker. It should not do too much except for starting (** Array of (optional) workers.
new threads for pools. *)
(* TODO: take [Run of (unit -> unit) | Incr | Decr] to handle refcounting? *)
type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed]
(* TODO: use [worker option array], with a mechanism to start/stop them *) Workers are started/stopped on demand. *)
let domains_ : worker array lazy_t = let domains_ : worker_state option Lock.t array =
lazy (* number of domains we spawn. Note that we spawn n-1 domains
((* number of domains we spawn. Note that we spawn n-1 domains
because there already is the main domain running. *) because there already is the main domain running. *)
let n = max 1 (Domain_.recommended_number () - 1) in let n = max 1 (Domain_.recommended_number () - 1) in
Array.init n (fun i -> Array.init n (fun _ -> Lock.create None)
let q = Bb_queue.create () in
let _domain : domain = Domain_.spawn (fun () -> work_ i q) in
{ q }))
let[@inline] n_domains () : int = Array.length (Lazy.force domains_) let[@inline] n_domains () : int = Array.length domains_
let run_on (i : int) (f : unit -> unit) : unit = let run_on (i : int) (f : unit -> unit) : unit =
let (lazy arr) = domains_ in assert (i < Array.length domains_);
assert (i < Array.length arr);
Bb_queue.push arr.(i).q f Lock.update domains_.(i) (function
| Some w as st ->
Atomic_.incr w.th_count;
Bb_queue.push w.q (Run f);
st
| None ->
let st = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
let _domain : domain = Domain_.spawn (fun () -> work_ st) in
Bb_queue.push st.q (Run f);
Some st)
(* [decr_on i] tells the worker registered for domain index [i] that one of
   its threads is stopping, by enqueueing a [Decr] event on its queue.
   The counter itself is only decremented later, when the worker loop
   processes that event. If no worker state is currently stored for index
   [i], this is a no-op. *)
let decr_on (i : int) : unit =
  assert (i < Array.length domains_);
  (* Only read the slot: the worker state is not replaced here. *)
  Lock.get domains_.(i)
  |> Option.iter (fun (w : worker_state) -> Bb_queue.push w.q Decr)
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in

View file

@ -15,6 +15,9 @@ val run_on : int -> (unit -> unit) -> unit
(** [run_on i f] runs [f()] on the domain with index [i]. (** [run_on i f] runs [f()] on the domain with index [i].
Precondition: [0 <= i < n_domains()] *) Precondition: [0 <= i < n_domains()] *)
val decr_on : int -> unit
(** [decr_on i] signals that a thread is stopping on the domain with
    index [i]. It does nothing if no worker is currently registered for
    that index.
    Precondition: [i < n_domains()] (checked with an assertion). *)
val run_on_and_wait : int -> (unit -> 'a) -> 'a val run_on_and_wait : int -> (unit -> 'a) -> 'a
(** [run_on_and_wait i f] runs [f()] on the domain with index [i], (** [run_on_and_wait i f] runs [f()] on the domain with index [i],
and blocks until the result of [f()] is returned back. *) and blocks until the result of [f()] is returned back. *)

View file

@ -241,7 +241,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
in in
(* now run the main loop *) (* now run the main loop *)
run' (); Fun.protect run' ~finally:(fun () -> D_pool_.decr_on dom_idx);
on_exit_thread ~dom_id:dom_idx ~t_id () on_exit_thread ~dom_id:dom_idx ~t_id ()
in in