diff --git a/src/fifo_pool.ml b/src/fifo_pool.ml index 1a95d715..c4457e60 100644 --- a/src/fifo_pool.ml +++ b/src/fifo_pool.ml @@ -15,6 +15,13 @@ let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q let schedule_ (self : state) (task : task) : unit = try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown +let schedule_batch_ (self : state) (batch : task iter) : unit = + (* local queue for the batch, so we can transfer the whole batch into + [self.q] in one go (one critical section) *) + let batch_q = Queue.create () in + batch (fun x -> Queue.push x batch_q); + Bb_queue.transfer_into batch_q self.q + type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = @@ -86,6 +93,7 @@ let create ?(on_init_thread = default_thread_init_exit_) Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~run_async:(fun f -> schedule_ pool f) + ~run_async_batch:(fun b -> schedule_batch_ pool b) ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) ()