mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
Merge pull request #8 from c-cube/wip-7
full lifecycle for worker domains
This commit is contained in:
commit
7b0e7de94d
3 changed files with 52 additions and 25 deletions
|
|
@ -1,34 +1,58 @@
|
||||||
type domain = Domain_.t
|
type domain = Domain_.t
|
||||||
|
|
||||||
let work_ _i q : unit =
|
type event =
|
||||||
|
| Run of (unit -> unit) (** Run this function *)
|
||||||
|
| Decr
|
||||||
|
(** decrement number of threads on this domain. If it reaches 0,
|
||||||
|
wind down *)
|
||||||
|
|
||||||
|
(* State for a domain worker. It should not do too much except for starting
|
||||||
|
new threads for pools. *)
|
||||||
|
type worker_state = {
|
||||||
|
q: event Bb_queue.t;
|
||||||
|
th_count: int Atomic_.t; (** Number of threads on this *)
|
||||||
|
}
|
||||||
|
|
||||||
|
(** Array of (optional) workers.
|
||||||
|
|
||||||
|
Workers are started/stop on demand. *)
|
||||||
|
let domains_ : worker_state option Lock.t array =
|
||||||
|
(* number of domains we spawn. Note that we spawn n-1 domains
|
||||||
|
because there already is the main domain running. *)
|
||||||
|
let n = max 1 (Domain_.recommended_number () - 1) in
|
||||||
|
Array.init n (fun _ -> Lock.create None)
|
||||||
|
|
||||||
|
let work_ idx (st : worker_state) : unit =
|
||||||
Dla_.setup_domain ();
|
Dla_.setup_domain ();
|
||||||
while true do
|
while Atomic_.get st.th_count > 0 do
|
||||||
let f = Bb_queue.pop q in
|
match Bb_queue.pop st.q with
|
||||||
try f () with _ -> ()
|
| Run f -> (try f () with _ -> ())
|
||||||
|
| Decr ->
|
||||||
|
if Atomic_.fetch_and_add st.th_count (-1) = 1 then
|
||||||
|
Lock.set domains_.(idx) None
|
||||||
done
|
done
|
||||||
|
|
||||||
(* A domain level worker. It should not do too much except for starting
|
let[@inline] n_domains () : int = Array.length domains_
|
||||||
new threads for pools. *)
|
|
||||||
(* TODO: take [Run of (unit -> unit) | Incr | Decr] to handle refcounting? *)
|
|
||||||
type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed]
|
|
||||||
|
|
||||||
(* TODO: use [worker option array], with a mechanism to start/stop them *)
|
|
||||||
let domains_ : worker array lazy_t =
|
|
||||||
lazy
|
|
||||||
((* number of domains we spawn. Note that we spawn n-1 domains
|
|
||||||
because there already is the main domain running. *)
|
|
||||||
let n = max 1 (Domain_.recommended_number () - 1) in
|
|
||||||
Array.init n (fun i ->
|
|
||||||
let q = Bb_queue.create () in
|
|
||||||
let _domain : domain = Domain_.spawn (fun () -> work_ i q) in
|
|
||||||
{ q }))
|
|
||||||
|
|
||||||
let[@inline] n_domains () : int = Array.length (Lazy.force domains_)
|
|
||||||
|
|
||||||
let run_on (i : int) (f : unit -> unit) : unit =
|
let run_on (i : int) (f : unit -> unit) : unit =
|
||||||
let (lazy arr) = domains_ in
|
assert (i < Array.length domains_);
|
||||||
assert (i < Array.length arr);
|
|
||||||
Bb_queue.push arr.(i).q f
|
Lock.update domains_.(i) (function
|
||||||
|
| Some w as st ->
|
||||||
|
Atomic_.incr w.th_count;
|
||||||
|
Bb_queue.push w.q (Run f);
|
||||||
|
st
|
||||||
|
| None ->
|
||||||
|
let st = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
|
||||||
|
let _domain : domain = Domain_.spawn (fun () -> work_ i st) in
|
||||||
|
Bb_queue.push st.q (Run f);
|
||||||
|
Some st)
|
||||||
|
|
||||||
|
let decr_on (i : int) : unit =
|
||||||
|
assert (i < Array.length domains_);
|
||||||
|
match Lock.get domains_.(i) with
|
||||||
|
| None -> ()
|
||||||
|
| Some st -> Bb_queue.push st.q Decr
|
||||||
|
|
||||||
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
|
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
|
||||||
let q = Bb_queue.create () in
|
let q = Bb_queue.create () in
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,9 @@ val run_on : int -> (unit -> unit) -> unit
|
||||||
(** [run_on i f] runs [f()] on the domain with index [i].
|
(** [run_on i f] runs [f()] on the domain with index [i].
|
||||||
Precondition: [0 <= i < n_domains()] *)
|
Precondition: [0 <= i < n_domains()] *)
|
||||||
|
|
||||||
|
val decr_on : int -> unit
|
||||||
|
(** Signal that a thread is stopping on the domain with index [i] *)
|
||||||
|
|
||||||
val run_on_and_wait : int -> (unit -> 'a) -> 'a
|
val run_on_and_wait : int -> (unit -> 'a) -> 'a
|
||||||
(** [run_on_and_wait i f] runs [f()] on the domain with index [i],
|
(** [run_on_and_wait i f] runs [f()] on the domain with index [i],
|
||||||
and blocks until the result of [f()] is returned back. *)
|
and blocks until the result of [f()] is returned back. *)
|
||||||
|
|
|
||||||
|
|
@ -241,7 +241,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
||||||
in
|
in
|
||||||
|
|
||||||
(* now run the main loop *)
|
(* now run the main loop *)
|
||||||
run' ();
|
Fun.protect run' ~finally:(fun () -> D_pool_.decr_on dom_idx);
|
||||||
on_exit_thread ~dom_id:dom_idx ~t_id ()
|
on_exit_thread ~dom_id:dom_idx ~t_id ()
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue