mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-13 14:30:32 -05:00
perf: implement batch scheduling for the Fifo pool
This commit is contained in:
parent
0f0b887f32
commit
0f6bd6288d
1 changed files with 8 additions and 0 deletions
|
|
@ -15,6 +15,13 @@ let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
|
||||||
let schedule_ (self : state) (task : task) : unit =
|
let schedule_ (self : state) (task : task) : unit =
|
||||||
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
|
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
|
||||||
|
|
||||||
|
let schedule_batch_ (self : state) (batch : task iter) : unit =
|
||||||
|
(* local queue for the batch, so we can transfer the whole batch into
|
||||||
|
[self.q] in one go (one critical section) *)
|
||||||
|
let batch_q = Queue.create () in
|
||||||
|
batch (fun x -> Queue.push x batch_q);
|
||||||
|
Bb_queue.transfer_into batch_q self.q
|
||||||
|
|
||||||
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
||||||
|
|
||||||
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
||||||
|
|
@ -86,6 +93,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
||||||
Runner.For_runner_implementors.create
|
Runner.For_runner_implementors.create
|
||||||
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
|
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
|
||||||
~run_async:(fun f -> schedule_ pool f)
|
~run_async:(fun f -> schedule_ pool f)
|
||||||
|
~run_async_batch:(fun b -> schedule_batch_ pool b)
|
||||||
~size:(fun () -> size_ pool)
|
~size:(fun () -> size_ pool)
|
||||||
~num_tasks:(fun () -> num_tasks_ pool)
|
~num_tasks:(fun () -> num_tasks_ pool)
|
||||||
()
|
()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue