feat: add Pool.with_

This commit is contained in:
Simon Cruanes 2023-07-10 01:14:07 -04:00
parent b080c962e1
commit 858755e812
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 35 additions and 3 deletions

View file

@ -3,6 +3,8 @@
module A = Atomic_ module A = Atomic_
include Runner include Runner
let ( let@ ) = ( @@ )
type thread_loop_wrapper = type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
@ -158,6 +160,17 @@ let shutdown_ ~wait (self : state) : unit =
if was_active then Array.iter Bb_queue.close self.qs; if was_active then Array.iter Bb_queue.close self.qs;
if wait then Array.iter Thread.join self.threads if wait then Array.iter Thread.join self.threads
type 'a create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'a) * (t -> 'a -> unit) ->
?min:int ->
?per_domain:int ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = [])
?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) ?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1)
@ -256,3 +269,12 @@ let create ?(on_init_thread = default_thread_init_exit_)
done; done;
runner runner
let with_ ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task
?min ?per_domain () f =
let pool =
create ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task
?min ?per_domain ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool

View file

@ -28,7 +28,7 @@ val add_global_thread_loop_wrapper : thread_loop_wrapper -> unit
thread, for all existing pools, and all new pools created with [create]. thread, for all existing pools, and all new pools created with [create].
These wrappers accumulate: they all apply, but their order is not specified. *) These wrappers accumulate: they all apply, but their order is not specified. *)
val create : type 'a create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list -> ?thread_wrappers:thread_loop_wrapper list ->
@ -36,8 +36,10 @@ val create :
?around_task:(t -> 'a) * (t -> 'a -> unit) -> ?around_task:(t -> 'a) * (t -> 'a -> unit) ->
?min:int -> ?min:int ->
?per_domain:int -> ?per_domain:int ->
unit -> 'a
t (** Arguments used in {!create}. See {!create} for explanations. *)
val create : (unit -> t) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread @param on_init_thread called at the beginning of each new thread
in the pool. in the pool.
@ -58,6 +60,14 @@ val create :
the same thread after the task is over. (since 0.2) the same thread after the task is over. (since 0.2)
*) *)
val with_ : (unit -> (t -> 'a) -> 'a) create_args
(** [with_ () f] calls [f pool], where [pool] is obtained via {!create}.
When [f pool] returns or fails, [pool] is shutdown and its resources
are released.
Most parameters are the same as in {!create}.
@since NEXT_RELEASE *)
val run : t -> (unit -> unit) -> unit val run : t -> (unit -> unit) -> unit
[@@deprecated "use run_async"] [@@deprecated "use run_async"]
(** deprecated alias to {!run_async} *) (** deprecated alias to {!run_async} *)