refactor: rename little blocking queue

This commit is contained in:
Simon Cruanes 2023-06-08 14:22:23 -04:00
parent b9ee0d71a1
commit d2521472c0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 21 additions and 21 deletions

View file

@ -1,4 +1,4 @@
(** Simple blocking queue *) (** Basic Blocking Queue *)
type 'a t type 'a t

View file

@ -2,19 +2,19 @@ type domain = Domain_.t
let work_ _i q : unit = let work_ _i q : unit =
while true do while true do
let f = S_queue.pop q in let f = Bb_queue.pop q in
try f () with _ -> () try f () with _ -> ()
done done
(* A domain level worker. It should not do too much except for starting (* A domain level worker. It should not do too much except for starting
new threads for pools. *) new threads for pools. *)
type worker = { q: (unit -> unit) S_queue.t } [@@unboxed] type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed]
let domains_ : worker array lazy_t = let domains_ : worker array lazy_t =
lazy lazy
(let n = Domain_.recommended_number () in (let n = Domain_.recommended_number () in
Array.init n (fun i -> Array.init n (fun i ->
let q = S_queue.create () in let q = Bb_queue.create () in
let _domain : domain = Domain_.spawn (fun () -> work_ i q) in let _domain : domain = Domain_.spawn (fun () -> work_ i q) in
{ q })) { q }))
@ -23,11 +23,11 @@ let[@inline] n_domains () : int = Array.length (Lazy.force domains_)
let run_on (i : int) (f : unit -> unit) : unit = let run_on (i : int) (f : unit -> unit) : unit =
let (lazy arr) = domains_ in let (lazy arr) = domains_ in
assert (i < Array.length arr); assert (i < Array.length arr);
S_queue.push arr.(i).q f Bb_queue.push arr.(i).q f
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
let q = S_queue.create () in let q = Bb_queue.create () in
run_on i (fun () -> run_on i (fun () ->
let x = f () in let x = f () in
S_queue.push q x); Bb_queue.push q x);
S_queue.pop q Bb_queue.pop q

View file

@ -290,9 +290,9 @@ let wait_block (self : 'a t) : 'a or_error =
| Waiting _ -> | Waiting _ ->
let real_block () = let real_block () =
(* use queue only once *) (* use queue only once *)
let q = S_queue.create () in let q = Bb_queue.create () in
on_result self (fun r -> S_queue.push q r); on_result self (fun r -> Bb_queue.push q r);
S_queue.pop q Bb_queue.pop q
in in
(* a bit of spinlock *) (* a bit of spinlock *)

View file

@ -5,7 +5,7 @@ module A = Atomic_
type t = { type t = {
active: bool A.t; active: bool A.t;
threads: Thread.t array; threads: Thread.t array;
q: (unit -> unit) S_queue.t; q: (unit -> unit) Bb_queue.t;
} }
type thread_loop_wrapper = type thread_loop_wrapper =
@ -24,14 +24,14 @@ let add_global_thread_loop_wrapper f : unit =
exception Shutdown exception Shutdown
let[@inline] run self f : unit = let[@inline] run self f : unit =
try S_queue.push self.q f with S_queue.Closed -> raise Shutdown try Bb_queue.push self.q f with Bb_queue.Closed -> raise Shutdown
let size self = Array.length self.threads let size self = Array.length self.threads
let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = let worker_thread_ ~on_exn (active : bool A.t) (q : _ Bb_queue.t) : unit =
while A.get active do while A.get active do
match S_queue.pop q with match Bb_queue.pop q with
| exception S_queue.Closed -> () | exception Bb_queue.Closed -> ()
| task -> | task ->
(try task () (try task ()
with e -> with e ->
@ -54,7 +54,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let offset = Random.int n_domains in let offset = Random.int n_domains in
let active = A.make true in let active = A.make true in
let q = S_queue.create () in let q = Bb_queue.create () in
let pool = let pool =
let dummy = Thread.self () in let dummy = Thread.self () in
@ -63,7 +63,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* temporary queue used to obtain thread handles from domains (* temporary queue used to obtain thread handles from domains
on which the thread are started. *) on which the thread are started. *)
let receive_threads = S_queue.create () in let receive_threads = Bb_queue.create () in
(* start the thread with index [i] *) (* start the thread with index [i] *)
let start_thread_with_idx i = let start_thread_with_idx i =
@ -96,7 +96,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let create_thread_in_domain () = let create_thread_in_domain () =
let thread = Thread.create main_thread_fun () in let thread = Thread.create main_thread_fun () in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
S_queue.push receive_threads (i, thread) Bb_queue.push receive_threads (i, thread)
in in
D_pool_.run_on dom_idx create_thread_in_domain D_pool_.run_on dom_idx create_thread_in_domain
@ -110,7 +110,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* receive the newly created threads back from domains *) (* receive the newly created threads back from domains *)
for _j = 1 to n do for _j = 1 to n do
let i, th = S_queue.pop receive_threads in let i, th = Bb_queue.pop receive_threads in
pool.threads.(i) <- th pool.threads.(i) <- th
done; done;
pool pool
@ -119,5 +119,5 @@ let shutdown (self : t) : unit =
let was_active = A.exchange self.active false in let was_active = A.exchange self.active false in
(* close the job queue, which will fail future calls to [run], (* close the job queue, which will fail future calls to [run],
and wake up the subset of [self.threads] that are waiting on it. *) and wake up the subset of [self.threads] that are waiting on it. *)
if was_active then S_queue.close self.q; if was_active then Bb_queue.close self.q;
Array.iter Thread.join self.threads Array.iter Thread.join self.threads