diff --git a/src/pool.ml b/src/pool.ml index 8e22f664..4264a20f 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -21,16 +21,22 @@ let add_global_thread_loop_wrapper f : unit = () done -let[@inline] run self f : unit = S_queue.push self.q f +exception Shutdown + +let[@inline] run self f : unit = + try S_queue.push self.q f with S_queue.Closed -> raise Shutdown + let size self = Array.length self.threads let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = while A.get active do - let task = S_queue.pop q in - try task () - with e -> - let bt = Printexc.get_raw_backtrace () in - on_exn e bt + match S_queue.pop q with + | exception S_queue.Closed -> () + | task -> + (try task () + with e -> + let bt = Printexc.get_raw_backtrace () in + on_exn e bt) done let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () @@ -111,8 +117,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let shutdown (self : t) : unit = let was_active = A.exchange self.active false in - (* make sure to wakeup all the sleeping threads by scheduling one task each. - This way, a thread that is asleep, waiting for tasks, - will wakeup to process this trivial task, check [self.active], and terminate. *) - if was_active then Array.iter (fun _ -> run self ignore) self.threads; + (* close the job queue, which will fail future calls to [run], + and wake up the subset of [self.threads] that are waiting on it. *) + if was_active then S_queue.close self.q; Array.iter Thread.join self.threads diff --git a/src/pool.mli b/src/pool.mli index b5676d4a..cc90f71e 100644 --- a/src/pool.mli +++ b/src/pool.mli @@ -1,7 +1,15 @@ (** Thread pool. *) type t -(** A pool of threads. *) +(** A pool of threads. The pool contains a fixed number of threads that + wait for work items to come, process these, and loop. + + If a pool is no longer needed, {!shutdown} can be used to signal all threads + in it to stop (after they finish their work), and wait for them to stop. + + The threads are distributed across a fixed domain pool + (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and + simple the single runtime on OCaml 4). *) type thread_loop_wrapper = thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit @@ -39,6 +47,10 @@ val size : t -> int val shutdown : t -> unit (** Shutdown the pool and wait for it to terminate. Idempotent. *) +exception Shutdown + val run : t -> (unit -> unit) -> unit (** [run pool f] schedules [f] for later execution on the pool - in one of the threads. *) + in one of the threads. [f()] will run on one of the pool's + worker threads. + @raise Shutdown if the pool was shut down before [run] was called. *) diff --git a/src/s_queue.ml b/src/s_queue.ml index 8a3e8280..fb639f85 100644 --- a/src/s_queue.ml +++ b/src/s_queue.ml @@ -2,21 +2,45 @@ type 'a t = { mutex: Mutex.t; cond: Condition.t; q: 'a Queue.t; + mutable closed: bool; } +exception Closed + let create () : _ t = - { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () } + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ); + Mutex.unlock self.mutex let push (self : _ t) x : unit = Mutex.lock self.mutex; - Queue.push x self.q; - Condition.signal self.cond; - Mutex.unlock self.mutex + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else ( + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + ) let pop (self : 'a t) : 'a = Mutex.lock self.mutex; let rec loop () = - if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else if Queue.is_empty self.q then ( Condition.wait self.cond self.mutex; (loop [@tailcall]) () ) else ( @@ -26,3 +50,13 @@ let pop (self : 'a t) : 'a = ) in loop () + +let try_pop (self : _ t) : _ option = + Mutex.lock self.mutex; + match Queue.pop self.q with + | x -> + Mutex.unlock self.mutex; + Some x + | exception Queue.Empty -> + Mutex.unlock self.mutex; + None diff --git a/src/s_queue.mli b/src/s_queue.mli index 6adad6da..d09b80c6 100644 --- a/src/s_queue.mli +++ b/src/s_queue.mli @@ -3,5 +3,20 @@ type 'a t val create : unit -> _ t + +exception Closed + val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val try_pop : 'a t -> 'a option +(** [try_pop q] immediately pops the first element of [q], if any, + or returns [None] without blocking. *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *)