From d09da9c0923da24e3a2122636a9d1cdcff9212c6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 28 Oct 2023 13:00:15 -0400 Subject: [PATCH] breaking: change interface for number of threads now the user can specify `num_threads`; if not provided a sensible default is picked. --- src/fifo_pool.ml | 19 ++++++++----------- src/fifo_pool.mli | 8 ++++---- src/util_pool_.ml | 11 +++++++++++ src/util_pool_.mli | 5 +++++ src/ws_pool.ml | 17 ++++++----------- src/ws_pool.mli | 16 ++++++---------- 6 files changed, 40 insertions(+), 36 deletions(-) create mode 100644 src/util_pool_.ml create mode 100644 src/util_pool_.mli diff --git a/src/fifo_pool.ml b/src/fifo_pool.ml index 044e0013..1a95d715 100644 --- a/src/fifo_pool.ml +++ b/src/fifo_pool.ml @@ -56,13 +56,12 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t = + ?around_task ?num_threads () : t = (* wrapper *) let around_task = match around_task with @@ -70,11 +69,10 @@ let create ?(on_init_thread = default_thread_init_exit_) | None -> AT_pair (ignore, fun _ _ -> ()) in - (* number of threads to run *) - let min_threads = max 1 min_threads in let num_domains = D_pool_.n_domains () in - assert (num_domains >= 1); - let num_threads = max min_threads (num_domains * per_domain) in + + (* number of threads to run *) + let num_threads = Util_pool_.num_threads ?num_threads () in (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) let offset = Random.int num_domains in @@ -141,11 +139,10 @@ let create ?(on_init_thread = default_thread_init_exit_) runner -let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () f = +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f + = let pool = - create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool diff --git a/src/fifo_pool.mli b/src/fifo_pool.mli index 252083c5..4371db58 100644 --- a/src/fifo_pool.mli +++ b/src/fifo_pool.mli @@ -21,8 +21,7 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) @@ -30,8 +29,9 @@ val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. @param on_init_thread called at the beginning of each new thread in the pool. @param min minimum size of the pool. See {!Pool.create_args}. - @param per_domain is the number of threads allocated per domain in the fixed - domain pool. See {!Pool.create_args}. + The default is [Domain.recommended_domain_count()], ie one worker per + CPU core. + On OCaml 4 the default is [4] (since there is only one domain). @param on_exit_thread called at the end of each worker thread in the pool. @param around_task a pair of [before, after] functions ran around each task. See {!Pool.create_args}. diff --git a/src/util_pool_.ml b/src/util_pool_.ml new file mode 100644 index 00000000..8207062a --- /dev/null +++ b/src/util_pool_.ml @@ -0,0 +1,11 @@ +let num_threads ?num_threads () : int = + let n_domains = D_pool_.n_domains () in + + (* number of threads to run *) + let num_threads = + match num_threads with + | Some j -> max 1 j + | None -> n_domains + in + + num_threads diff --git a/src/util_pool_.mli b/src/util_pool_.mli new file mode 100644 index 00000000..68fdde22 --- /dev/null +++ b/src/util_pool_.mli @@ -0,0 +1,5 @@ +(** Utils for pools *) + +val num_threads : ?num_threads:int -> unit -> int +(** Number of threads a pool should have. + @param num_threads user-specified number of threads *) diff --git a/src/ws_pool.ml b/src/ws_pool.ml index ca5d2500..179d555a 100644 --- a/src/ws_pool.ml +++ b/src/ws_pool.ml @@ -198,14 +198,13 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t = + ?around_task ?num_threads () : t = (* wrapper *) let around_task = match around_task with @@ -213,11 +212,8 @@ let create ?(on_init_thread = default_thread_init_exit_) | None -> AT_pair (ignore, fun _ _ -> ()) in - (* number of threads to run *) - let min_threads = max 1 min_threads in let num_domains = D_pool_.n_domains () in - assert (num_domains >= 1); - let num_threads = max min_threads (num_domains * per_domain) in + let num_threads = Util_pool_.num_threads ?num_threads () in (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) let offset = Random.int num_domains in @@ -301,11 +297,10 @@ let create ?(on_init_thread = default_thread_init_exit_) runner -let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () f = +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () f + = let pool = - create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain - () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool diff --git a/src/ws_pool.mli b/src/ws_pool.mli index 4775024c..c13e4c75 100644 --- a/src/ws_pool.mli +++ b/src/ws_pool.mli @@ -26,8 +26,7 @@ type ('a, 'b) create_args = ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> - ?min:int -> - ?per_domain:int -> + ?num_threads:int -> 'a (** Arguments used in {!create}. See {!create} for explanations. *) @@ -35,14 +34,11 @@ val create : (unit -> t, _) create_args (** [create ()] makes a new thread pool. @param on_init_thread called at the beginning of each new thread in the pool. - @param min minimum size of the pool. It will be at least [1] internally, - so [0] or negative values make no sense. - @param per_domain is the number of threads allocated per domain in the fixed - domain pool. The default value is [0], but setting, say, [~per_domain:2] - means that if there are [8] domains (which might be the case on an 8-core machine) - then the minimum size of the pool is [16]. - If both [min] and [per_domain] are specified, the maximum of both - [min] and [per_domain * num_of_domains] is used. + @param num_threads size of the pool, ie. number of worker threads. + It will be at least [1] internally, so [0] or negative values make no sense. + The default is [Domain.recommended_domain_count()], ie one worker + thread per CPU core. + On OCaml 4 the default is [4] (since there is only one domain). @param on_exit_thread called at the end of each thread in the pool @param around_task a pair of [before, after], where [before pool] is called before a task is processed,