Quickly hacked version joining a previous domain

Ensure domains have been cleaned up before re-using a slot.
This commit is contained in:
David Allsopp 2023-08-29 18:46:53 +01:00 committed by Simon Cruanes
parent 25d42d5b8c
commit c0db72b40c
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -14,11 +14,11 @@ type worker_state = {
(** Array of (optional) workers. (** Array of (optional) workers.
Workers are started/stop on demand. *) Workers are started/stop on demand. *)
let domains_ : worker_state option Lock.t array = let domains_ : (worker_state option * unit Domain.t option) Lock.t array =
(* number of domains we spawn. Note that we spawn n-1 domains (* number of domains we spawn. Note that we spawn n-1 domains
because there already is the main domain running. *) because there already is the main domain running. *)
let n = max 1 (Domain_.recommended_number () - 1) in let n = max 1 (Domain_.recommended_number () - 1) in
Array.init n (fun _ -> Lock.create None) Array.init n (fun _ -> Lock.create (None, None))
(** main work loop for a domain worker. (** main work loop for a domain worker.
@ -64,15 +64,15 @@ let work_ idx (st : worker_state) : unit =
(* exit: try to remove ourselves from [domains]. If that fails, keep living. *) (* exit: try to remove ourselves from [domains]. If that fails, keep living. *)
let is_alive = let is_alive =
Lock.update_map domains_.(idx) (function Lock.update_map domains_.(idx) (function
| None -> assert false | None, _ -> assert false
| Some _st' -> | Some (_st'), dom ->
assert (st == _st'); assert (st == _st');
if Atomic_.get st.th_count > 0 then if Atomic_.get st.th_count > 0 then
(* still alive! *) (* still alive! *)
Some st, true (Some st, dom), true
else else
None, false) (None, dom), false)
in in
is_alive is_alive
@ -87,21 +87,22 @@ let run_on (i : int) (f : unit -> unit) : unit =
assert (i < Array.length domains_); assert (i < Array.length domains_);
let w = let w =
Lock.update_map domains_.(i) (function Lock.update_map domains_.(i) (function
| Some w as st -> | (Some w, _) as st ->
Atomic_.incr w.th_count; Atomic_.incr w.th_count;
st, w st, w
| None -> | None, _dom ->
Option.iter Domain.join _dom;
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
let _worker : domain = Domain_.spawn (fun () -> work_ i w) in let worker : domain = Domain_.spawn (fun () -> work_ i w) in
Some w, w) (Some w, Some worker), w)
in in
Bb_queue.push w.q (Run f) Bb_queue.push w.q (Run f)
let decr_on (i : int) : unit = let decr_on (i : int) : unit =
assert (i < Array.length domains_); assert (i < Array.length domains_);
match Lock.get domains_.(i) with match Lock.get domains_.(i) with
| Some st -> Bb_queue.push st.q Decr | Some st, _ -> Bb_queue.push st.q Decr
| None -> () | None, _ -> ()
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in