feat: add Runner, change Pool to produce a Runner.t

futures, chans, etc. should use the `Runner.t` abstraction
and not depend on the exact pool implementation to run tasks.

For now `Pool.create` is the only implementation of a runner, but now
it's possible to implement alternatives.
This commit is contained in:
Simon Cruanes 2023-07-09 17:17:00 -04:00
parent 30d2560a27
commit 76ca0f2d88
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 147 additions and 85 deletions

View file

@ -1,15 +1,7 @@
(* TODO: use a better queue for the tasks *) (* TODO: use a better queue for the tasks *)
module A = Atomic_ module A = Atomic_
include Runner
type task = unit -> unit
type t = {
active: bool A.t;
threads: Thread.t array;
qs: task Bb_queue.t array;
cur_q: int A.t; (** Selects queue into which to push *)
}
type thread_loop_wrapper = type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
@ -24,10 +16,16 @@ let add_global_thread_loop_wrapper f : unit =
Domain_.relax () Domain_.relax ()
done done
exception Shutdown type state = {
active: bool A.t;
threads: Thread.t array;
qs: task Bb_queue.t array;
cur_q: int A.t; (** Selects queue into which to push *)
}
(** internal state *)
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let run_direct_ (self : t) (task : task) : unit = let run_direct_ (self : state) (task : task) : unit =
let n_qs = Array.length self.qs in let n_qs = Array.length self.qs in
let offset = A.fetch_and_add self.cur_q 1 in let offset = A.fetch_and_add self.cur_q 1 in
@ -52,37 +50,21 @@ let run_direct_ (self : t) (task : task) : unit =
| Exit -> () | Exit -> ()
| Bb_queue.Closed -> raise Shutdown | Bb_queue.Closed -> raise Shutdown
(** Run [task]. It will be wrapped with an effect handler to let rec run_async_ (self : state) (task : task) : unit =
support {!Fut.await}. *)
let rec run_async (self : t) (task : task) : unit =
let task' () = let task' () =
(* run [f()] and handle [suspend] in it *) (* run [f()] and handle [suspend] in it *)
Suspend_.with_suspend task ~run:(fun ~with_handler task -> Suspend_.with_suspend task ~run:(fun ~with_handler task ->
if with_handler then if with_handler then
run_async self task run_async_ self task
else else
run_direct_ self task) run_direct_ self task)
in in
run_direct_ self task' run_direct_ self task'
let run = run_async let run = run_async
let size_ (self : state) = Array.length self.threads
let run_wait_block self (f : unit -> 'a) : 'a = let num_tasks_ (self : state) : int =
let q = Bb_queue.create () in
run_async self (fun () ->
try
let x = f () in
Bb_queue.push q (Ok x)
with exn ->
let bt = Printexc.get_raw_backtrace () in
Bb_queue.push q (Error (exn, bt)));
match Bb_queue.pop q with
| Ok x -> x
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
let[@inline] size self = Array.length self.threads
let num_tasks (self : t) : int =
let n = ref 0 in let n = ref 0 in
Array.iter (fun q -> n := !n + Bb_queue.size q) self.qs; Array.iter (fun q -> n := !n + Bb_queue.size q) self.qs;
!n !n
@ -112,7 +94,7 @@ exception Got_task of task
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t)
(qs : task Bb_queue.t array) ~(offset : int) : unit = (qs : task Bb_queue.t array) ~(offset : int) : unit =
let num_qs = Array.length qs in let num_qs = Array.length qs in
let (AT_pair (before_task, after_task)) = around_task in let (AT_pair (before_task, after_task)) = around_task in
@ -137,13 +119,13 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
with Got_task f -> f with Got_task f -> f
in in
let _ctx = before_task pool in let _ctx = before_task runner in
(* run the task now, catching errors *) (* run the task now, catching errors *)
(try task () (try task ()
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
on_exn e bt); on_exn e bt);
after_task pool _ctx after_task runner _ctx
done done
in in
@ -162,6 +144,13 @@ let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
via the ancestral technique of the pifomètre). *) via the ancestral technique of the pifomètre). *)
let max_queues = 32 let max_queues = 32
let shutdown_ ~wait (self : state) : unit =
let was_active = A.exchange self.active false in
(* close the job queues, which will fail future calls to [run],
and wake up the subset of [self.threads] that are waiting on them. *)
if was_active then Array.iter Bb_queue.close self.qs;
if wait then Array.iter Thread.join self.threads
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = [])
?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) ?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1)
@ -193,6 +182,15 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ active; threads = Array.make num_threads dummy; qs; cur_q = A.make 0 } { active; threads = Array.make num_threads dummy; qs; cur_q = A.make 0 }
in in
let runner =
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun f -> run_async_ pool f)
~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool)
()
in
(* temporary queue used to obtain thread handles from domains (* temporary queue used to obtain thread handles from domains
on which the thread are started. *) on which the thread are started. *)
let receive_threads = Bb_queue.create () in let receive_threads = Bb_queue.create () in
@ -212,12 +210,14 @@ let create ?(on_init_thread = default_thread_init_exit_)
in in
let run () = let run () =
worker_thread_ pool ~on_exn ~around_task active qs ~offset:i worker_thread_ runner ~on_exn ~around_task active qs ~offset:i
in in
(* the actual worker loop is [worker_thread_], with all (* the actual worker loop is [worker_thread_], with all
wrappers for this pool and for all pools (global_thread_wrappers_) *) wrappers for this pool and for all pools (global_thread_wrappers_) *)
let run' = let run' =
List.fold_left (fun run f -> f ~thread ~pool run) run all_wrappers List.fold_left
(fun run f -> f ~thread ~pool:runner run)
run all_wrappers
in in
(* now run the main loop *) (* now run the main loop *)
@ -247,14 +247,5 @@ let create ?(on_init_thread = default_thread_init_exit_)
let i, th = Bb_queue.pop receive_threads in let i, th = Bb_queue.pop receive_threads in
pool.threads.(i) <- th pool.threads.(i) <- th
done; done;
pool
let shutdown_ ~wait (self : t) : unit = runner
let was_active = A.exchange self.active false in
(* close the job queues, which will fail future calls to [run],
and wake up the subset of [self.threads] that are waiting on them. *)
if was_active then Array.iter Bb_queue.close self.qs;
if wait then Array.iter Thread.join self.threads
let shutdown_without_waiting (self : t) : unit = shutdown_ self ~wait:false
let shutdown (self : t) : unit = shutdown_ self ~wait:true

View file

@ -1,15 +1,19 @@
(** Thread pool. *) (** Thread pool.
type t A pool of threads. The pool contains a fixed number of threads that
(** A pool of threads. The pool contains a fixed number of threads that
wait for work items to come, process these, and loop. wait for work items to come, process these, and loop.
This implements {!Runner.t} since NEXT_RELEASE.
If a pool is no longer needed, {!shutdown} can be used to signal all threads If a pool is no longer needed, {!shutdown} can be used to signal all threads
in it to stop (after they finish their work), and wait for them to stop. in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool The threads are distributed across a fixed domain pool
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5,
simple the single runtime on OCaml 4). *) and simply the single runtime on OCaml 4).
*)
include module type of Runner
type thread_loop_wrapper = type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
@ -54,40 +58,6 @@ val create :
the same thread after the task is over. (since 0.2) the same thread after the task is over. (since 0.2)
*) *)
val size : t -> int
(** Number of threads *)
val num_tasks : t -> int
(** Current number of tasks. This is at best a snapshot, useful for metrics
and debugging.
@since 0.2 *)
val shutdown : t -> unit
(** Shutdown the pool and wait for it to terminate. Idempotent. *)
val shutdown_without_waiting : t -> unit
(** Shutdown the pool, and do not wait for it to terminate. Idempotent.
@since 0.2 *)
exception Shutdown
val run_async : t -> (unit -> unit) -> unit
(** [run_async pool f] schedules [f] for later execution on the pool
in one of the threads. [f()] will run on one of the pool's
worker threads.
@raise Shutdown if the pool was shut down before [run_async] was called.
@since 0.3 *)
val run : t -> (unit -> unit) -> unit val run : t -> (unit -> unit) -> unit
[@@deprecated "use run_async"] [@@deprecated "use run_async"]
(** deprecated alias to {!run_async} *) (** deprecated alias to {!run_async} *)
val run_wait_block : t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}).
@since 0.3 *)

37
src/runner.ml Normal file
View file

@ -0,0 +1,37 @@
type task = unit -> unit
type t = {
run_async: (unit -> unit) -> unit;
shutdown: wait:bool -> unit -> unit;
size: unit -> int;
num_tasks: unit -> int;
}
exception Shutdown
let[@inline] run_async (self : t) f : unit = self.run_async f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
let[@inline] shutdown_without_waiting (self : t) : unit =
self.shutdown ~wait:false ()
let[@inline] num_tasks (self : t) : int = self.num_tasks ()
let[@inline] size (self : t) : int = self.size ()
let run_wait_block self (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in
run_async self (fun () ->
try
let x = f () in
Bb_queue.push q (Ok x)
with exn ->
let bt = Printexc.get_raw_backtrace () in
Bb_queue.push q (Error (exn, bt)));
match Bb_queue.pop q with
| Ok x -> x
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
module For_runner_implementors = struct
let create ~size ~num_tasks ~shutdown ~run_async () : t =
{ size; num_tasks; shutdown; run_async }
end

64
src/runner.mli Normal file
View file

@ -0,0 +1,64 @@
(** Abstract runner.
This provides an abstraction for running tasks in the background.
@since NEXT_RELEASE
*)
type task = unit -> unit
type t = private {
run_async: task -> unit;
shutdown: wait:bool -> unit -> unit;
size: unit -> int;
num_tasks: unit -> int;
}
(** A runner.
If a runner is no longer needed, {!shutdown} can be used to signal all
worker threads
in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool
(whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and
simple the single runtime on OCaml 4). *)
val size : t -> int
(** Number of threads/workers. *)
val num_tasks : t -> int
(** Current number of tasks. This is at best a snapshot, useful for metrics
and debugging. *)
val shutdown : t -> unit
(** Shutdown the runner and wait for it to terminate. Idempotent. *)
val shutdown_without_waiting : t -> unit
(** Shutdown the pool, and do not wait for it to terminate. Idempotent. *)
exception Shutdown
val run_async : t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's
worker threads/domains.
@raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block : t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
and returns its result. If [f()] raises an exception, then [run_wait_block pool f]
will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}). *)
module For_runner_implementors : sig
val create :
size:(unit -> int) ->
num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) ->
run_async:(task -> unit) ->
unit ->
t
(** Create a new runner. *)
end