fix(pool): make sure the work queue is closed properly

this way we can't submit new jobs after the pool has been shutdown.
This commit is contained in:
Simon Cruanes 2023-06-05 13:04:10 -04:00
parent 6ffbd15a34
commit adfa1e62cb
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 83 additions and 17 deletions

View file

@ -21,16 +21,22 @@ let add_global_thread_loop_wrapper f : unit =
()
done
let[@inline] run self f : unit = S_queue.push self.q f
exception Shutdown
let[@inline] run self f : unit =
try S_queue.push self.q f with S_queue.Closed -> raise Shutdown
let size self = Array.length self.threads
let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit =
while A.get active do
let task = S_queue.pop q in
try task ()
with e ->
let bt = Printexc.get_raw_backtrace () in
on_exn e bt
match S_queue.pop q with
| exception S_queue.Closed -> ()
| task ->
(try task ()
with e ->
let bt = Printexc.get_raw_backtrace () in
on_exn e bt)
done
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
@ -111,8 +117,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let shutdown (self : t) : unit =
let was_active = A.exchange self.active false in
(* make sure to wakeup all the sleeping threads by scheduling one task each.
This way, a thread that is asleep, waiting for tasks,
will wakeup to process this trivial task, check [self.active], and terminate. *)
if was_active then Array.iter (fun _ -> run self ignore) self.threads;
(* close the job queue, which will fail future calls to [run],
and wake up the subset of [self.threads] that are waiting on it. *)
if was_active then S_queue.close self.q;
Array.iter Thread.join self.threads

View file

@ -1,7 +1,15 @@
(** Thread pool. *)
type t
(** A pool of threads. *)
(** A pool of threads. The pool contains a fixed number of threads that
wait for work items to come, process these, and loop.
If a pool is no longer needed, {!shutdown} can be used to signal all threads
in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and
simple the single runtime on OCaml 4). *)
type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
@ -39,6 +47,10 @@ val size : t -> int
val shutdown : t -> unit
(** Shutdown the pool and wait for it to terminate. Idempotent. *)
exception Shutdown
val run : t -> (unit -> unit) -> unit
(** [run pool f] schedules [f] for later execution on the pool
in one of the threads. *)
in one of the threads. [f()] will run on one of the pool's
worker threads.
@raise Shutdown if the pool was shut down before [run] was called. *)

View file

@ -2,21 +2,45 @@ type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{ mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () }
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else (
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
)
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
@ -26,3 +50,13 @@ let pop (self : 'a t) : 'a =
)
in
loop ()
let try_pop (self : _ t) : _ option =
Mutex.lock self.mutex;
match Queue.pop self.q with
| x ->
Mutex.unlock self.mutex;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
None

View file

@ -3,5 +3,20 @@
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val try_pop : 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any,
or returns [None] without blocking. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)