mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
feat runner: add run_async_batch
This enables optimizations for the case where we schedule a batch of tasks all at once. Scheduling a batch might cost fewer critical sections or wakeup checks than scheduling tasks one by one.
This commit is contained in:
parent
678664b00d
commit
0f0b887f32
2 changed files with 23 additions and 2 deletions
|
|
@ -1,7 +1,9 @@
|
||||||
type task = unit -> unit
|
type task = unit -> unit
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
run_async: (unit -> unit) -> unit;
|
run_async: (unit -> unit) -> unit;
|
||||||
|
run_async_batch: task iter -> unit;
|
||||||
shutdown: wait:bool -> unit -> unit;
|
shutdown: wait:bool -> unit -> unit;
|
||||||
size: unit -> int;
|
size: unit -> int;
|
||||||
num_tasks: unit -> int;
|
num_tasks: unit -> int;
|
||||||
|
|
@ -10,6 +12,7 @@ type t = {
|
||||||
exception Shutdown
|
exception Shutdown
|
||||||
|
|
||||||
let[@inline] run_async (self : t) f : unit = self.run_async f
|
let[@inline] run_async (self : t) f : unit = self.run_async f
|
||||||
|
let[@inline] run_async_batch (self : t) b : unit = self.run_async_batch b
|
||||||
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
|
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
|
||||||
|
|
||||||
let[@inline] shutdown_without_waiting (self : t) : unit =
|
let[@inline] shutdown_without_waiting (self : t) : unit =
|
||||||
|
|
@ -32,6 +35,13 @@ let run_wait_block self (f : unit -> 'a) : 'a =
|
||||||
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
|
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
|
||||||
|
|
||||||
module For_runner_implementors = struct
|
module For_runner_implementors = struct
|
||||||
let create ~size ~num_tasks ~shutdown ~run_async () : t =
|
let create ~size ~num_tasks ~shutdown ~run_async ?run_async_batch () : t =
|
||||||
{ size; num_tasks; shutdown; run_async }
|
let run_async_batch =
|
||||||
|
match run_async_batch with
|
||||||
|
| Some f -> f
|
||||||
|
| None ->
|
||||||
|
(* default is to just schedule each task *)
|
||||||
|
fun b -> b run_async
|
||||||
|
in
|
||||||
|
{ size; num_tasks; shutdown; run_async; run_async_batch }
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,16 @@ val run_async : t -> task -> unit
|
||||||
worker threads/domains.
|
worker threads/domains.
|
||||||
@raise Shutdown if the runner was shut down before [run_async] was called. *)
|
@raise Shutdown if the runner was shut down before [run_async] was called. *)
|
||||||
|
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
|
||||||
|
val run_async_batch : t -> task iter -> unit
|
||||||
|
(** [run_async_batch r batch] schedules all tasks from the batch
|
||||||
|
into [r].
|
||||||
|
It might be more efficient than calling {!run_async}
|
||||||
|
on each individual task.
|
||||||
|
@since NEXT_RELEASE
|
||||||
|
*)
|
||||||
|
|
||||||
val run_wait_block : t -> (unit -> 'a) -> 'a
|
val run_wait_block : t -> (unit -> 'a) -> 'a
|
||||||
(** [run_wait_block pool f] schedules [f] for later execution
|
(** [run_wait_block pool f] schedules [f] for later execution
|
||||||
on the pool, like {!run_async}.
|
on the pool, like {!run_async}.
|
||||||
|
|
@ -57,6 +67,7 @@ module For_runner_implementors : sig
|
||||||
num_tasks:(unit -> int) ->
|
num_tasks:(unit -> int) ->
|
||||||
shutdown:(wait:bool -> unit -> unit) ->
|
shutdown:(wait:bool -> unit -> unit) ->
|
||||||
run_async:(task -> unit) ->
|
run_async:(task -> unit) ->
|
||||||
|
?run_async_batch:(task iter -> unit) ->
|
||||||
unit ->
|
unit ->
|
||||||
t
|
t
|
||||||
(** Create a new runner.
|
(** Create a new runner.
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue