diff --git a/src/pool.ml b/src/pool.ml index 9cef70ab..34fe3cb7 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -1,15 +1,7 @@ (* TODO: use a better queue for the tasks *) module A = Atomic_ - -type task = unit -> unit - -type t = { - active: bool A.t; - threads: Thread.t array; - qs: task Bb_queue.t array; - cur_q: int A.t; (** Selects queue into which to push *) -} +include Runner type thread_loop_wrapper = thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit @@ -24,10 +16,16 @@ let add_global_thread_loop_wrapper f : unit = Domain_.relax () done -exception Shutdown +type state = { + active: bool A.t; + threads: Thread.t array; + qs: task Bb_queue.t array; + cur_q: int A.t; (** Selects queue into which to push *) +} +(** internal state *) (** Run [task] as is, on the pool. *) -let run_direct_ (self : t) (task : task) : unit = +let run_direct_ (self : state) (task : task) : unit = let n_qs = Array.length self.qs in let offset = A.fetch_and_add self.cur_q 1 in @@ -52,37 +50,21 @@ let run_direct_ (self : t) (task : task) : unit = | Exit -> () | Bb_queue.Closed -> raise Shutdown -(** Run [task]. It will be wrapped with an effect handler to - support {!Fut.await}. *) -let rec run_async (self : t) (task : task) : unit = +let rec run_async_ (self : state) (task : task) : unit = let task' () = (* run [f()] and handle [suspend] in it *) Suspend_.with_suspend task ~run:(fun ~with_handler task -> if with_handler then - run_async self task + run_async_ self task else run_direct_ self task) in run_direct_ self task' let run = run_async +let size_ (self : state) = Array.length self.threads -let run_wait_block self (f : unit -> 'a) : 'a = - let q = Bb_queue.create () in - run_async self (fun () -> - try - let x = f () in - Bb_queue.push q (Ok x) - with exn -> - let bt = Printexc.get_raw_backtrace () in - Bb_queue.push q (Error (exn, bt))); - match Bb_queue.pop q with - | Ok x -> x - | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt - -let[@inline] size self = Array.length self.threads - -let num_tasks (self : t) : int = +let num_tasks_ (self : state) : int = let n = ref 0 in Array.iter (fun q -> n := !n + Bb_queue.size q) self.qs; !n @@ -112,7 +94,7 @@ exception Got_task of task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task -let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) +let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t) (qs : task Bb_queue.t array) ~(offset : int) : unit = let num_qs = Array.length qs in let (AT_pair (before_task, after_task)) = around_task in @@ -137,13 +119,13 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) with Got_task f -> f in - let _ctx = before_task pool in + let _ctx = before_task runner in (* run the task now, catching errors *) (try task () with e -> let bt = Printexc.get_raw_backtrace () in on_exn e bt); - after_task pool _ctx + after_task runner _ctx done in @@ -162,6 +144,13 @@ let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () via the ancestral technique of the pifomètre). *) let max_queues = 32 +let shutdown_ ~wait (self : state) : unit = + let was_active = A.exchange self.active false in + (* close the job queues, which will fail future calls to [run], + and wake up the subset of [self.threads] that are waiting on them. *) + if was_active then Array.iter Bb_queue.close self.qs; + if wait then Array.iter Thread.join self.threads + let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) ?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) @@ -193,6 +182,15 @@ let create ?(on_init_thread = default_thread_init_exit_) { active; threads = Array.make num_threads dummy; qs; cur_q = A.make 0 } in + let runner = + Runner.For_runner_implementors.create + ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) + ~run_async:(fun f -> run_async_ pool f) + ~size:(fun () -> size_ pool) + ~num_tasks:(fun () -> num_tasks_ pool) + () + in + (* temporary queue used to obtain thread handles from domains on which the thread are started. *) let receive_threads = Bb_queue.create () in @@ -212,12 +210,14 @@ let create ?(on_init_thread = default_thread_init_exit_) in let run () = - worker_thread_ pool ~on_exn ~around_task active qs ~offset:i + worker_thread_ runner ~on_exn ~around_task active qs ~offset:i in (* the actual worker loop is [worker_thread_], with all wrappers for this pool and for all pools (global_thread_wrappers_) *) let run' = - List.fold_left (fun run f -> f ~thread ~pool run) run all_wrappers + List.fold_left + (fun run f -> f ~thread ~pool:runner run) + run all_wrappers in (* now run the main loop *) @@ -247,14 +247,5 @@ let create ?(on_init_thread = default_thread_init_exit_) let i, th = Bb_queue.pop receive_threads in pool.threads.(i) <- th done; - pool -let shutdown_ ~wait (self : t) : unit = - let was_active = A.exchange self.active false in - (* close the job queues, which will fail future calls to [run], - and wake up the subset of [self.threads] that are waiting on them. *) - if was_active then Array.iter Bb_queue.close self.qs; - if wait then Array.iter Thread.join self.threads - -let shutdown_without_waiting (self : t) : unit = shutdown_ self ~wait:false -let shutdown (self : t) : unit = shutdown_ self ~wait:true + runner diff --git a/src/pool.mli b/src/pool.mli index ecf82288..20319a49 100644 --- a/src/pool.mli +++ b/src/pool.mli @@ -1,15 +1,19 @@ -(** Thread pool. *) +(** Thread pool. -type t -(** A pool of threads. The pool contains a fixed number of threads that + A pool of threads. The pool contains a fixed number of threads that wait for work items to come, process these, and loop. + This implements {!Runner.t} since NEXT_RELEASE. + If a pool is no longer needed, {!shutdown} can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop. The threads are distributed across a fixed domain pool - (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and - simple the single runtime on OCaml 4). *) + (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, + and simply the single runtime on OCaml 4). + *) + +include module type of Runner type thread_loop_wrapper = thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit @@ -54,40 +58,6 @@ val create : the same thread after the task is over. (since 0.2) *) -val size : t -> int -(** Number of threads *) - -val num_tasks : t -> int -(** Current number of tasks. This is at best a snapshot, useful for metrics - and debugging. - @since 0.2 *) - -val shutdown : t -> unit -(** Shutdown the pool and wait for it to terminate. Idempotent. *) - -val shutdown_without_waiting : t -> unit -(** Shutdown the pool, and do not wait for it to terminate. Idempotent. - @since 0.2 *) - -exception Shutdown - -val run_async : t -> (unit -> unit) -> unit -(** [run_async pool f] schedules [f] for later execution on the pool - in one of the threads. [f()] will run on one of the pool's - worker threads. - @raise Shutdown if the pool was shut down before [run_async] was called. - @since 0.3 *) - val run : t -> (unit -> unit) -> unit [@@deprecated "use run_async"] (** deprecated alias to {!run_async} *) - -val run_wait_block : t -> (unit -> 'a) -> 'a -(** [run_wait_block pool f] schedules [f] for later execution - on the pool, like {!run_async}. - It then blocks the current thread until [f()] is done executing, - and returns its result. If [f()] raises an exception, then [run_wait_block pool f] - will raise it as well. - - {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}). - @since 0.3 *) diff --git a/src/runner.ml b/src/runner.ml new file mode 100644 index 00000000..91cde5a2 --- /dev/null +++ b/src/runner.ml @@ -0,0 +1,37 @@ +type task = unit -> unit + +type t = { + run_async: (unit -> unit) -> unit; + shutdown: wait:bool -> unit -> unit; + size: unit -> int; + num_tasks: unit -> int; +} + +exception Shutdown + +let[@inline] run_async (self : t) f : unit = self.run_async f +let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () + +let[@inline] shutdown_without_waiting (self : t) : unit = + self.shutdown ~wait:false () + +let[@inline] num_tasks (self : t) : int = self.num_tasks () +let[@inline] size (self : t) : int = self.size () + +let run_wait_block self (f : unit -> 'a) : 'a = + let q = Bb_queue.create () in + run_async self (fun () -> + try + let x = f () in + Bb_queue.push q (Ok x) + with exn -> + let bt = Printexc.get_raw_backtrace () in + Bb_queue.push q (Error (exn, bt))); + match Bb_queue.pop q with + | Ok x -> x + | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt + +module For_runner_implementors = struct + let create ~size ~num_tasks ~shutdown ~run_async () : t = + { size; num_tasks; shutdown; run_async } +end diff --git a/src/runner.mli b/src/runner.mli new file mode 100644 index 00000000..49d597ff --- /dev/null +++ b/src/runner.mli @@ -0,0 +1,64 @@ +(** Abstract runner. + + This provides an abstraction for running tasks in the background. + @since NEXT_RELEASE +*) + +type task = unit -> unit + +type t = private { + run_async: task -> unit; + shutdown: wait:bool -> unit -> unit; + size: unit -> int; + num_tasks: unit -> int; +} +(** A runner. + + If a runner is no longer needed, {!shutdown} can be used to signal all + worker threads + in it to stop (after they finish their work), and wait for them to stop. + + The threads are distributed across a fixed domain pool + (whose size is determined by {!Domain.recommended_domain_count} on OCaml 5, and + simple the single runtime on OCaml 4). *) + +val size : t -> int +(** Number of threads/workers. *) + +val num_tasks : t -> int +(** Current number of tasks. This is at best a snapshot, useful for metrics + and debugging. *) + +val shutdown : t -> unit +(** Shutdown the runner and wait for it to terminate. Idempotent. *) + +val shutdown_without_waiting : t -> unit +(** Shutdown the pool, and do not wait for it to terminate. Idempotent. *) + +exception Shutdown + +val run_async : t -> task -> unit +(** [run_async pool f] schedules [f] for later execution on the runner + in one of the threads. [f()] will run on one of the runner's + worker threads/domains. + @raise Shutdown if the runner was shut down before [run_async] was called. *) + +val run_wait_block : t -> (unit -> 'a) -> 'a +(** [run_wait_block pool f] schedules [f] for later execution + on the pool, like {!run_async}. + It then blocks the current thread until [f()] is done executing, + and returns its result. If [f()] raises an exception, then [run_wait_block pool f] + will raise it as well. + + {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}). *) + +module For_runner_implementors : sig + val create : + size:(unit -> int) -> + num_tasks:(unit -> int) -> + shutdown:(wait:bool -> unit -> unit) -> + run_async:(task -> unit) -> + unit -> + t + (** Create a new runner. *) +end