feat fifo: expose private interface for the main runner threads

This commit is contained in:
Simon Cruanes 2024-03-04 15:46:25 -05:00
parent 51459f9b0b
commit 25104ce3b7
2 changed files with 39 additions and 13 deletions

View file

@ -107,6 +107,17 @@ type ('a, 'b) create_args =
?name:string -> ?name:string ->
'a 'a
let default_around_task_ : around_task = AT_pair (ignore, fun _ _ -> ())
let runner_of_state (pool : state) : t =
let run_async ~ls f = schedule_ pool @@ T_start { f; ls } in
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async
~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool)
()
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?num_threads ?name () : t = ?around_task ?num_threads ?name () : t =
@ -114,7 +125,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let around_task = let around_task =
match around_task with match around_task with
| Some (f, g) -> AT_pair (f, g) | Some (f, g) -> AT_pair (f, g)
| None -> AT_pair (ignore, fun _ _ -> ()) | None -> default_around_task_
in in
let num_domains = Domain_pool_.n_domains () in let num_domains = Domain_pool_.n_domains () in
@ -126,20 +137,11 @@ let create ?(on_init_thread = default_thread_init_exit_)
let offset = Random.int num_domains in let offset = Random.int num_domains in
let pool = let pool =
let dummy = Thread.self () in let dummy_thread = Thread.self () in
{ threads = Array.make num_threads dummy; q = Bb_queue.create () } { threads = Array.make num_threads dummy_thread; q = Bb_queue.create () }
in in
let run_async ~ls f = schedule_ pool @@ T_start { f; ls } in let runner = runner_of_state pool in
let runner =
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async
~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool)
()
in
(* temporary queue used to obtain thread handles from domains (* temporary queue used to obtain thread handles from domains
on which the thread are started. *) on which the thread are started. *)
@ -203,3 +205,13 @@ let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
in in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool f pool
module Private_ = struct
type nonrec state = state
let create_state ~threads () : state = { threads; q = Bb_queue.create () }
let runner_of_state = runner_of_state
let run_thread (st : state) (self : t) ~on_exn : unit =
worker_thread_ st self ~on_exn ~around_task:default_around_task_
end

View file

@ -44,3 +44,17 @@ val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
When [f pool] returns or fails, [pool] is shutdown and its resources When [f pool] returns or fails, [pool] is shutdown and its resources
are released. are released.
Most parameters are the same as in {!create}. *) Most parameters are the same as in {!create}. *)
(**/**)
module Private_ : sig
type state
val create_state : threads:Thread.t array -> unit -> state
val runner_of_state : state -> Runner.t
val run_thread :
state -> t -> on_exn:(exn -> Printexc.raw_backtrace -> unit) -> unit
end
(**/**)