diff --git a/src/s_queue.ml b/src/bb_queue.ml similarity index 100% rename from src/s_queue.ml rename to src/bb_queue.ml diff --git a/src/s_queue.mli b/src/bb_queue.mli similarity index 97% rename from src/s_queue.mli rename to src/bb_queue.mli index 644c1b86..4dee92b6 100644 --- a/src/s_queue.mli +++ b/src/bb_queue.mli @@ -1,4 +1,4 @@ -(** Simple blocking queue *) +(** Basic Blocking Queue *) type 'a t diff --git a/src/d_pool_.ml b/src/d_pool_.ml index e2368ceb..4619712b 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -2,19 +2,19 @@ type domain = Domain_.t let work_ _i q : unit = while true do - let f = S_queue.pop q in + let f = Bb_queue.pop q in try f () with _ -> () done (* A domain level worker. It should not do too much except for starting new threads for pools. *) -type worker = { q: (unit -> unit) S_queue.t } [@@unboxed] +type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed] let domains_ : worker array lazy_t = lazy (let n = Domain_.recommended_number () in Array.init n (fun i -> - let q = S_queue.create () in + let q = Bb_queue.create () in let _domain : domain = Domain_.spawn (fun () -> work_ i q) in { q })) @@ -23,11 +23,11 @@ let[@inline] n_domains () : int = Array.length (Lazy.force domains_) let run_on (i : int) (f : unit -> unit) : unit = let (lazy arr) = domains_ in assert (i < Array.length arr); - S_queue.push arr.(i).q f + Bb_queue.push arr.(i).q f let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = - let q = S_queue.create () in + let q = Bb_queue.create () in run_on i (fun () -> let x = f () in - S_queue.push q x); - S_queue.pop q + Bb_queue.push q x); + Bb_queue.pop q diff --git a/src/fut.ml b/src/fut.ml index cd8d2bf3..108575c7 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -290,9 +290,9 @@ let wait_block (self : 'a t) : 'a or_error = | Waiting _ -> let real_block () = (* use queue only once *) - let q = S_queue.create () in - on_result self (fun r -> S_queue.push q r); - S_queue.pop q + let q = Bb_queue.create () in + on_result self (fun r -> Bb_queue.push q r); + Bb_queue.pop q in (* a bit of spinlock *) diff --git a/src/pool.ml b/src/pool.ml index e7dcb1cd..7cd78820 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -5,7 +5,7 @@ module A = Atomic_ type t = { active: bool A.t; threads: Thread.t array; - q: (unit -> unit) S_queue.t; + q: (unit -> unit) Bb_queue.t; } type thread_loop_wrapper = @@ -24,14 +24,14 @@ let add_global_thread_loop_wrapper f : unit = exception Shutdown let[@inline] run self f : unit = - try S_queue.push self.q f with S_queue.Closed -> raise Shutdown + try Bb_queue.push self.q f with Bb_queue.Closed -> raise Shutdown let size self = Array.length self.threads -let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = +let worker_thread_ ~on_exn (active : bool A.t) (q : _ Bb_queue.t) : unit = while A.get active do - match S_queue.pop q with - | exception S_queue.Closed -> () + match Bb_queue.pop q with + | exception Bb_queue.Closed -> () | task -> (try task () with e -> @@ -54,7 +54,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let offset = Random.int n_domains in let active = A.make true in - let q = S_queue.create () in + let q = Bb_queue.create () in let pool = let dummy = Thread.self () in @@ -63,7 +63,7 @@ let create ?(on_init_thread = default_thread_init_exit_) (* temporary queue used to obtain thread handles from domains on which the thread are started. *) - let receive_threads = S_queue.create () in + let receive_threads = Bb_queue.create () in (* start the thread with index [i] *) let start_thread_with_idx i = @@ -96,7 +96,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let create_thread_in_domain () = let thread = Thread.create main_thread_fun () in (* send the thread from the domain back to us *) - S_queue.push receive_threads (i, thread) + Bb_queue.push receive_threads (i, thread) in D_pool_.run_on dom_idx create_thread_in_domain @@ -110,7 +110,7 @@ let create ?(on_init_thread = default_thread_init_exit_) (* receive the newly created threads back from domains *) for _j = 1 to n do - let i, th = S_queue.pop receive_threads in + let i, th = Bb_queue.pop receive_threads in pool.threads.(i) <- th done; pool @@ -119,5 +119,5 @@ let shutdown (self : t) : unit = let was_active = A.exchange self.active false in (* close the job queue, which will fail future calls to [run], and wake up the subset of [self.threads] that are waiting on it. *) - if was_active then S_queue.close self.q; + if was_active then Bb_queue.close self.q; Array.iter Thread.join self.threads