From 25104ce3b72419a5253e75f734abb7a90fa46de8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 4 Mar 2024 15:46:25 -0500 Subject: [PATCH] feat fifo: expose private interface for the main runner threads --- src/core/fifo_pool.ml | 38 +++++++++++++++++++++++++------------- src/core/fifo_pool.mli | 14 ++++++++++++++ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index 0023c1a8..415bb8e8 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -107,6 +107,17 @@ type ('a, 'b) create_args = ?name:string -> 'a +let default_around_task_ : around_task = AT_pair (ignore, fun _ _ -> ()) + +let runner_of_state (pool : state) : t = + let run_async ~ls f = schedule_ pool @@ T_start { f; ls } in + Runner.For_runner_implementors.create + ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) + ~run_async + ~size:(fun () -> size_ pool) + ~num_tasks:(fun () -> num_tasks_ pool) + () + let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?around_task ?num_threads ?name () : t = @@ -114,7 +125,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let around_task = match around_task with | Some (f, g) -> AT_pair (f, g) - | None -> AT_pair (ignore, fun _ _ -> ()) + | None -> default_around_task_ in let num_domains = Domain_pool_.n_domains () in @@ -126,20 +137,11 @@ let create ?(on_init_thread = default_thread_init_exit_) let offset = Random.int num_domains in let pool = - let dummy = Thread.self () in - { threads = Array.make num_threads dummy; q = Bb_queue.create () } + let dummy_thread = Thread.self () in + { threads = Array.make num_threads dummy_thread; q = Bb_queue.create () } in - let run_async ~ls f = schedule_ pool @@ T_start { f; ls } in - - let runner = - Runner.For_runner_implementors.create - ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async - ~size:(fun () -> size_ pool) - ~num_tasks:(fun () -> num_tasks_ pool) - () - in + let runner = runner_of_state pool in (* temporary queue used to obtain thread handles from domains on which the thread are started. *) @@ -203,3 +205,13 @@ let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool + +module Private_ = struct + type nonrec state = state + + let create_state ~threads () : state = { threads; q = Bb_queue.create () } + let runner_of_state = runner_of_state + + let run_thread (st : state) (self : t) ~on_exn : unit = + worker_thread_ st self ~on_exn ~around_task:default_around_task_ +end diff --git a/src/core/fifo_pool.mli b/src/core/fifo_pool.mli index 25cde495..28b7a3a9 100644 --- a/src/core/fifo_pool.mli +++ b/src/core/fifo_pool.mli @@ -44,3 +44,17 @@ val with_ : (unit -> (t -> 'a) -> 'a, _) create_args When [f pool] returns or fails, [pool] is shutdown and its resources are released. Most parameters are the same as in {!create}. *) + +(**/**) + +module Private_ : sig + type state + + val create_state : threads:Thread.t array -> unit -> state + val runner_of_state : state -> Runner.t + + val run_thread : + state -> t -> on_exn:(exn -> Printexc.raw_backtrace -> unit) -> unit +end + +(**/**)