From 858755e812633b397f3ac5616684cc3205f5fc1b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 10 Jul 2023 01:14:07 -0400 Subject: [PATCH] feat: add `Pool.with_` --- src/pool.ml | 22 ++++++++++++++++++++++ src/pool.mli | 16 +++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/pool.ml b/src/pool.ml index 7fb77ffc..2c74dfc5 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -3,6 +3,8 @@ module A = Atomic_ include Runner +let ( let@ ) = ( @@ ) + type thread_loop_wrapper = thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit @@ -158,6 +160,17 @@ let shutdown_ ~wait (self : state) : unit = if was_active then Array.iter Bb_queue.close self.qs; if wait then Array.iter Thread.join self.threads +type 'a create_args = + ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?thread_wrappers:thread_loop_wrapper list -> + ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> + ?around_task:(t -> 'a) * (t -> 'a -> unit) -> + ?min:int -> + ?per_domain:int -> + 'a +(** Arguments used in {!create}. See {!create} for explanations. *) + let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) ?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) @@ -256,3 +269,12 @@ let create ?(on_init_thread = default_thread_init_exit_) done; runner + +let with_ ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task + ?min ?per_domain () f = + let pool = + create ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task + ?min ?per_domain () + in + let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in + f pool diff --git a/src/pool.mli b/src/pool.mli index 20319a49..b176c405 100644 --- a/src/pool.mli +++ b/src/pool.mli @@ -28,7 +28,7 @@ val add_global_thread_loop_wrapper : thread_loop_wrapper -> unit thread, for all existing pools, and all new pools created with [create]. These wrappers accumulate: they all apply, but their order is not specified. *) -val create : +type 'a create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?thread_wrappers:thread_loop_wrapper list -> @@ -36,8 +36,10 @@ val create : ?around_task:(t -> 'a) * (t -> 'a -> unit) -> ?min:int -> ?per_domain:int -> - unit -> - t + 'a +(** Arguments used in {!create}. See {!create} for explanations. *) + +val create : (unit -> t) create_args (** [create ()] makes a new thread pool. @param on_init_thread called at the beginning of each new thread in the pool. @@ -58,6 +60,14 @@ val create : the same thread after the task is over. (since 0.2) *) +val with_ : (unit -> (t -> 'a) -> 'a) create_args +(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}. + When [f pool] returns or fails, [pool] is shutdown and its resources + are released. + + Most parameters are the same as in {!create}. + @since NEXT_RELEASE *) + val run : t -> (unit -> unit) -> unit [@@deprecated "use run_async"] (** deprecated alias to {!run_async} *)