From 9db0a9fe28bf12eafc4363d0663025c157429dd6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 12 Aug 2023 14:09:35 -0400 Subject: [PATCH 1/2] full lifecycle for worker domains domains can now stop when all worker threads running on them are done --- src/d_pool_.ml | 66 ++++++++++++++++++++++++++++++++----------------- src/d_pool_.mli | 3 +++ src/pool.ml | 2 +- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/src/d_pool_.ml b/src/d_pool_.ml index af0f7811..076d61ba 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -1,34 +1,56 @@ type domain = Domain_.t -let work_ _i q : unit = +type event = + | Run of (unit -> unit) (** Run this function *) + | Decr + (** decrement number of threads on this domain. If it reaches 0, + wind down *) + +(* State for a domain worker. It should not do too much except for starting + new threads for pools. *) +type worker_state = { + q: event Bb_queue.t; + th_count: int Atomic_.t; (** Number of threads on this *) +} + +let work_ (st : worker_state) : unit = Dla_.setup_domain (); - while true do - let f = Bb_queue.pop q in - try f () with _ -> () + while Atomic_.get st.th_count > 0 do + match Bb_queue.pop st.q with + | Run f -> (try f () with _ -> ()) + | Decr -> Atomic_.decr st.th_count done -(* A domain level worker. It should not do too much except for starting - new threads for pools. *) -(* TODO: take [Run of (unit -> unit) | Incr | Decr] to handle refcounting? *) -type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed] +(** Array of (optional) workers. -(* TODO: use [worker option array], with a mechanism to start/stop them *) -let domains_ : worker array lazy_t = - lazy - ((* number of domains we spawn. Note that we spawn n-1 domains - because there already is the main domain running. *) - let n = max 1 (Domain_.recommended_number () - 1) in - Array.init n (fun i -> - let q = Bb_queue.create () in - let _domain : domain = Domain_.spawn (fun () -> work_ i q) in - { q })) + Workers are started/stop on demand. *) +let domains_ : worker_state option Lock.t array = + (* number of domains we spawn. Note that we spawn n-1 domains + because there already is the main domain running. *) + let n = max 1 (Domain_.recommended_number () - 1) in + Array.init n (fun _ -> Lock.create None) -let[@inline] n_domains () : int = Array.length (Lazy.force domains_) +let[@inline] n_domains () : int = Array.length domains_ let run_on (i : int) (f : unit -> unit) : unit = - let (lazy arr) = domains_ in - assert (i < Array.length arr); - Bb_queue.push arr.(i).q f + assert (i < Array.length domains_); + + Lock.update domains_.(i) (function + | Some w as st -> + Atomic_.incr w.th_count; + Bb_queue.push w.q (Run f); + st + | None -> + let st = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let _domain : domain = Domain_.spawn (fun () -> work_ st) in + Bb_queue.push st.q (Run f); + Some st) + +let decr_on (i : int) : unit = + assert (i < Array.length domains_); + match Lock.get domains_.(i) with + | None -> () + | Some st -> Bb_queue.push st.q Decr let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let q = Bb_queue.create () in diff --git a/src/d_pool_.mli b/src/d_pool_.mli index 1dd05d5f..dd0902af 100644 --- a/src/d_pool_.mli +++ b/src/d_pool_.mli @@ -15,6 +15,9 @@ val run_on : int -> (unit -> unit) -> unit (** [run_on i f] runs [f()] on the domain with index [i]. Precondition: [0 <= i < n_domains()] *) +val decr_on : int -> unit +(** Signal that a thread is stopping on the domain with index [i] *) + val run_on_and_wait : int -> (unit -> 'a) -> 'a (** [run_on_and_wait i f] runs [f()] on the domain with index [i], and blocks until the result of [f()] is returned back. *) diff --git a/src/pool.ml b/src/pool.ml index 55d78a10..eded6b24 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -241,7 +241,7 @@ let create ?(on_init_thread = default_thread_init_exit_) in (* now run the main loop *) - run' (); + Fun.protect run' ~finally:(fun () -> D_pool_.decr_on dom_idx); on_exit_thread ~dom_id:dom_idx ~t_id () in From e38ee31b9363a661e0039488413e3df21cb3dbc9 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 12 Aug 2023 22:06:58 -0400 Subject: [PATCH 2/2] bugfix: forgot to dispose of the worker state on winding down --- src/d_pool_.ml | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/d_pool_.ml b/src/d_pool_.ml index 076d61ba..2dedc339 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -13,14 +13,6 @@ type worker_state = { th_count: int Atomic_.t; (** Number of threads on this *) } -let work_ (st : worker_state) : unit = - Dla_.setup_domain (); - while Atomic_.get st.th_count > 0 do - match Bb_queue.pop st.q with - | Run f -> (try f () with _ -> ()) - | Decr -> Atomic_.decr st.th_count - done - (** Array of (optional) workers. Workers are started/stop on demand. *) @@ -30,6 +22,16 @@ let domains_ : worker_state option Lock.t array = let n = max 1 (Domain_.recommended_number () - 1) in Array.init n (fun _ -> Lock.create None) +let work_ idx (st : worker_state) : unit = + Dla_.setup_domain (); + while Atomic_.get st.th_count > 0 do + match Bb_queue.pop st.q with + | Run f -> (try f () with _ -> ()) + | Decr -> + if Atomic_.fetch_and_add st.th_count (-1) = 1 then + Lock.set domains_.(idx) None + done + let[@inline] n_domains () : int = Array.length domains_ let run_on (i : int) (f : unit -> unit) : unit = @@ -42,7 +44,7 @@ let run_on (i : int) (f : unit -> unit) : unit = st | None -> let st = { th_count = Atomic_.make 1; q = Bb_queue.create () } in - let _domain : domain = Domain_.spawn (fun () -> work_ st) in + let _domain : domain = Domain_.spawn (fun () -> work_ i st) in Bb_queue.push st.q (Run f); Some st)