diff --git a/src/d_pool_.ml b/src/d_pool_.ml index 13468359..43c3be37 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -2,14 +2,13 @@ type domain = Domain_.t type event = | Run of (unit -> unit) (** Run this function *) - | Die (** Nudge the domain, asking it to die *) + | Decr (** Decrease count *) (* State for a domain worker. It should not do too much except for starting new threads for pools. *) type worker_state = { q: event Bb_queue.t; th_count: int Atomic_.t; (** Number of threads on this *) - mutable domain: domain option; } (** Array of (optional) workers. @@ -21,14 +20,66 @@ let domains_ : worker_state option Lock.t array = let n = max 1 (Domain_.recommended_number () - 1) in Array.init n (fun _ -> Lock.create None) -let work_ (st : worker_state) : unit = +(** main work loop for a domain worker. + + A domain worker does two things: + - run functions it's asked to (mainly, to start new threads inside it) + - decrease the refcount when one of these threads stops. The thread + will notify the domain that it's exiting, so the domain can know + how many threads are still using it. If all threads exit, the domain + polls a bit (in case new threads are created really shortly after, + which happens with a [Pool.with_] or [Pool.create() … Pool.shutdown()] + in a tight loop), and if nothing happens it tries to stop to free resources. +*) +let work_ idx (st : worker_state) : unit = Dla_.setup_domain (); - let continue = ref true in - while !continue do - match Bb_queue.pop st.q with - | Run f -> (try f () with _ -> ()) - | Die -> continue := false - done + + let main_loop () = + let continue = ref true in + while !continue do + match Bb_queue.pop st.q with + | Run f -> (try f () with _ -> ()) + | Decr -> + if Atomic_.fetch_and_add st.th_count (-1) = 1 then ( + continue := false; + + (* wait a bit, we might be needed again in a short amount of time *) + try + for _n_attempt = 1 to 50 do + Thread.delay 0.001; + if Atomic_.get st.th_count > 0 then ( + (* needed again! *) + continue := true; + raise Exit + ) + done + with Exit -> () + ) + done + in + + while + main_loop (); + + (* exit: try to remove ourselves from [domains]. If that fails, keep living. *) + let is_alive = + Lock.update_map domains_.(idx) (function + | None -> assert false + | Some _st' -> + assert (st == _st'); + + if Atomic_.get st.th_count > 0 then + (* still alive! *) + Some st, true + else + None, false) + in + + is_alive + do + () + done; + () let[@inline] n_domains () : int = Array.length domains_ @@ -40,34 +91,17 @@ let run_on (i : int) (f : unit -> unit) : unit = Atomic_.incr w.th_count; st, w | None -> - let w = - { th_count = Atomic_.make 1; q = Bb_queue.create (); domain = None } - in - let worker : domain = Domain_.spawn (fun () -> work_ w) in - w.domain <- Some worker; + let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let _worker : domain = Domain_.spawn (fun () -> work_ i w) in Some w, w) in Bb_queue.push w.q (Run f) -let decr_on (i : int) ~(domain_to_join : Domain_.t -> unit) : unit = +let decr_on (i : int) : unit = assert (i < Array.length domains_); - let st_to_kill = - Lock.update_map domains_.(i) (function - | None -> assert false - | Some st -> - if Atomic_.fetch_and_add st.th_count (-1) = 1 then - None, Some st - else - Some st, None) - in - - (* prepare for domain termination outside of critical section *) - match st_to_kill with + match Lock.get domains_.(i) with + | Some st -> Bb_queue.push st.q Decr | None -> () - | Some st -> - (* ask the domain to die *) - Bb_queue.push st.q Die; - Option.iter domain_to_join st.domain let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let q = Bb_queue.create () in diff --git a/src/d_pool_.mli b/src/d_pool_.mli index 378c065a..56001393 100644 --- a/src/d_pool_.mli +++ b/src/d_pool_.mli @@ -15,10 +15,8 @@ val run_on : int -> (unit -> unit) -> unit (** [run_on i f] runs [f()] on the domain with index [i]. Precondition: [0 <= i < n_domains()] *) -val decr_on : int -> domain_to_join:(Domain_.t -> unit) -> unit -(** Signal that a thread is stopping on the domain with index [i]. - @param domain_to_join called with a domain if this domain shuts down - because no one is using it anymore *) +val decr_on : int -> unit +(** Signal that a thread is stopping on the domain with index [i]. *) val run_on_and_wait : int -> (unit -> 'a) -> 'a (** [run_on_and_wait i f] runs [f()] on the domain with index [i], diff --git a/src/pool.ml b/src/pool.ml index 5faefc4d..43cda564 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -22,7 +22,6 @@ type state = { active: bool A.t; threads: Thread.t array; qs: task Bb_queue.t array; - domains_to_join: Domain_.t Bb_queue.t; cur_q: int A.t; (** Selects queue into which to push *) } (** internal state *) @@ -159,19 +158,7 @@ let shutdown_ ~wait (self : state) : unit = (* close the job queues, which will fail future calls to [run], and wake up the subset of [self.threads] that are waiting on them. *) if was_active then Array.iter Bb_queue.close self.qs; - if wait then Array.iter Thread.join self.threads; - Bb_queue.close self.domains_to_join; - - (* now join domains which need to be joined *) - while - match Bb_queue.pop self.domains_to_join with - | exception Bb_queue.Closed -> false - | d -> - Domain_.join d; - true - do - () - done + if wait then Array.iter Thread.join self.threads type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> @@ -212,13 +199,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let pool = let dummy = Thread.self () in - { - active; - threads = Array.make num_threads dummy; - qs; - cur_q = A.make 0; - domains_to_join = Bb_queue.create (); - } + { active; threads = Array.make num_threads dummy; qs; cur_q = A.make 0 } in let runner = @@ -262,8 +243,7 @@ let create ?(on_init_thread = default_thread_init_exit_) (* now run the main loop *) Fun.protect run' ~finally:(fun () -> (* on termination, decrease refcount of underlying domain *) - D_pool_.decr_on dom_idx - ~domain_to_join:(Bb_queue.push pool.domains_to_join)); + D_pool_.decr_on dom_idx); on_exit_thread ~dom_id:dom_idx ~t_id () in