mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
bugfix: forgot to dispose of the worker state on winding down
This commit is contained in:
parent
9db0a9fe28
commit
e38ee31b93
1 changed files with 11 additions and 9 deletions
|
|
@ -13,14 +13,6 @@ type worker_state = {
|
|||
th_count: int Atomic_.t; (** Number of threads on this *)
|
||||
}
|
||||
|
||||
let work_ (st : worker_state) : unit =
|
||||
Dla_.setup_domain ();
|
||||
while Atomic_.get st.th_count > 0 do
|
||||
match Bb_queue.pop st.q with
|
||||
| Run f -> (try f () with _ -> ())
|
||||
| Decr -> Atomic_.decr st.th_count
|
||||
done
|
||||
|
||||
(** Array of (optional) workers.
|
||||
|
||||
Workers are started/stop on demand. *)
|
||||
|
|
@ -30,6 +22,16 @@ let domains_ : worker_state option Lock.t array =
|
|||
let n = max 1 (Domain_.recommended_number () - 1) in
|
||||
Array.init n (fun _ -> Lock.create None)
|
||||
|
||||
let work_ idx (st : worker_state) : unit =
|
||||
Dla_.setup_domain ();
|
||||
while Atomic_.get st.th_count > 0 do
|
||||
match Bb_queue.pop st.q with
|
||||
| Run f -> (try f () with _ -> ())
|
||||
| Decr ->
|
||||
if Atomic_.fetch_and_add st.th_count (-1) = 1 then
|
||||
Lock.set domains_.(idx) None
|
||||
done
|
||||
|
||||
let[@inline] n_domains () : int = Array.length domains_
|
||||
|
||||
let run_on (i : int) (f : unit -> unit) : unit =
|
||||
|
|
@ -42,7 +44,7 @@ let run_on (i : int) (f : unit -> unit) : unit =
|
|||
st
|
||||
| None ->
|
||||
let st = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
|
||||
let _domain : domain = Domain_.spawn (fun () -> work_ st) in
|
||||
let _domain : domain = Domain_.spawn (fun () -> work_ i st) in
|
||||
Bb_queue.push st.q (Run f);
|
||||
Some st)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue