domain pool: let domains live a bit longer

This is an optimization to ensure we don't stop and restart domains too often,
which harms performance really badly.
This commit is contained in:
Simon Cruanes 2023-08-13 22:48:04 -04:00
parent cfbcc72648
commit 18d5bad2a9
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 70 additions and 58 deletions

View file

@ -2,14 +2,13 @@ type domain = Domain_.t
type event = type event =
| Run of (unit -> unit) (** Run this function *) | Run of (unit -> unit) (** Run this function *)
| Die (** Nudge the domain, asking it to die *) | Decr (** Decrease count *)
(* State for a domain worker. It should not do too much except for starting (* State for a domain worker. It should not do too much except for starting
new threads for pools. *) new threads for pools. *)
type worker_state = { type worker_state = {
q: event Bb_queue.t; q: event Bb_queue.t;
th_count: int Atomic_.t; (** Number of threads on this *) th_count: int Atomic_.t; (** Number of threads on this *)
mutable domain: domain option;
} }
(** Array of (optional) workers. (** Array of (optional) workers.
@ -21,14 +20,66 @@ let domains_ : worker_state option Lock.t array =
let n = max 1 (Domain_.recommended_number () - 1) in let n = max 1 (Domain_.recommended_number () - 1) in
Array.init n (fun _ -> Lock.create None) Array.init n (fun _ -> Lock.create None)
let work_ (st : worker_state) : unit = (** main work loop for a domain worker.
A domain worker does two things:
- run functions it's asked to (mainly, to start new threads inside it)
- decrease the refcount when one of these threads stops. The thread
will notify the domain that it's exiting, so the domain can know
how many threads are still using it. If all threads exit, the domain
polls a bit (in case new threads are created really shortly after,
which happens with a [Pool.with_] or [Pool.create() Pool.shutdown()]
in a tight loop), and if nothing happens it tries to stop to free resources.
*)
let work_ idx (st : worker_state) : unit =
Dla_.setup_domain (); Dla_.setup_domain ();
let continue = ref true in
while !continue do let main_loop () =
match Bb_queue.pop st.q with let continue = ref true in
| Run f -> (try f () with _ -> ()) while !continue do
| Die -> continue := false match Bb_queue.pop st.q with
done | Run f -> (try f () with _ -> ())
| Decr ->
if Atomic_.fetch_and_add st.th_count (-1) = 1 then (
continue := false;
(* wait a bit, we might be needed again in a short amount of time *)
try
for _n_attempt = 1 to 50 do
Thread.delay 0.001;
if Atomic_.get st.th_count > 0 then (
(* needed again! *)
continue := true;
raise Exit
)
done
with Exit -> ()
)
done
in
while
main_loop ();
(* exit: try to remove ourselves from [domains]. If that fails, keep living. *)
let is_alive =
Lock.update_map domains_.(idx) (function
| None -> assert false
| Some _st' ->
assert (st == _st');
if Atomic_.get st.th_count > 0 then
(* still alive! *)
Some st, true
else
None, false)
in
is_alive
do
()
done;
()
let[@inline] n_domains () : int = Array.length domains_ let[@inline] n_domains () : int = Array.length domains_
@ -40,34 +91,17 @@ let run_on (i : int) (f : unit -> unit) : unit =
Atomic_.incr w.th_count; Atomic_.incr w.th_count;
st, w st, w
| None -> | None ->
let w = let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
{ th_count = Atomic_.make 1; q = Bb_queue.create (); domain = None } let _worker : domain = Domain_.spawn (fun () -> work_ i w) in
in
let worker : domain = Domain_.spawn (fun () -> work_ w) in
w.domain <- Some worker;
Some w, w) Some w, w)
in in
Bb_queue.push w.q (Run f) Bb_queue.push w.q (Run f)
let decr_on (i : int) ~(domain_to_join : Domain_.t -> unit) : unit = let decr_on (i : int) : unit =
assert (i < Array.length domains_); assert (i < Array.length domains_);
let st_to_kill = match Lock.get domains_.(i) with
Lock.update_map domains_.(i) (function | Some st -> Bb_queue.push st.q Decr
| None -> assert false
| Some st ->
if Atomic_.fetch_and_add st.th_count (-1) = 1 then
None, Some st
else
Some st, None)
in
(* prepare for domain termination outside of critical section *)
match st_to_kill with
| None -> () | None -> ()
| Some st ->
(* ask the domain to die *)
Bb_queue.push st.q Die;
Option.iter domain_to_join st.domain
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in

View file

@ -15,10 +15,8 @@ val run_on : int -> (unit -> unit) -> unit
(** [run_on i f] runs [f()] on the domain with index [i]. (** [run_on i f] runs [f()] on the domain with index [i].
Precondition: [0 <= i < n_domains()] *) Precondition: [0 <= i < n_domains()] *)
val decr_on : int -> domain_to_join:(Domain_.t -> unit) -> unit val decr_on : int -> unit
(** Signal that a thread is stopping on the domain with index [i]. (** Signal that a thread is stopping on the domain with index [i]. *)
@param domain_to_join called with a domain if this domain shuts down
because no one is using it anymore *)
val run_on_and_wait : int -> (unit -> 'a) -> 'a val run_on_and_wait : int -> (unit -> 'a) -> 'a
(** [run_on_and_wait i f] runs [f()] on the domain with index [i], (** [run_on_and_wait i f] runs [f()] on the domain with index [i],

View file

@ -22,7 +22,6 @@ type state = {
active: bool A.t; active: bool A.t;
threads: Thread.t array; threads: Thread.t array;
qs: task Bb_queue.t array; qs: task Bb_queue.t array;
domains_to_join: Domain_.t Bb_queue.t;
cur_q: int A.t; (** Selects queue into which to push *) cur_q: int A.t; (** Selects queue into which to push *)
} }
(** internal state *) (** internal state *)
@ -159,19 +158,7 @@ let shutdown_ ~wait (self : state) : unit =
(* close the job queues, which will fail future calls to [run], (* close the job queues, which will fail future calls to [run],
and wake up the subset of [self.threads] that are waiting on them. *) and wake up the subset of [self.threads] that are waiting on them. *)
if was_active then Array.iter Bb_queue.close self.qs; if was_active then Array.iter Bb_queue.close self.qs;
if wait then Array.iter Thread.join self.threads; if wait then Array.iter Thread.join self.threads
Bb_queue.close self.domains_to_join;
(* now join domains which need to be joined *)
while
match Bb_queue.pop self.domains_to_join with
| exception Bb_queue.Closed -> false
| d ->
Domain_.join d;
true
do
()
done
type ('a, 'b) create_args = type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
@ -212,13 +199,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let pool = let pool =
let dummy = Thread.self () in let dummy = Thread.self () in
{ { active; threads = Array.make num_threads dummy; qs; cur_q = A.make 0 }
active;
threads = Array.make num_threads dummy;
qs;
cur_q = A.make 0;
domains_to_join = Bb_queue.create ();
}
in in
let runner = let runner =
@ -262,8 +243,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* now run the main loop *) (* now run the main loop *)
Fun.protect run' ~finally:(fun () -> Fun.protect run' ~finally:(fun () ->
(* on termination, decrease refcount of underlying domain *) (* on termination, decrease refcount of underlying domain *)
D_pool_.decr_on dom_idx D_pool_.decr_on dom_idx);
~domain_to_join:(Bb_queue.push pool.domains_to_join));
on_exit_thread ~dom_id:dom_idx ~t_id () on_exit_thread ~dom_id:dom_idx ~t_id ()
in in