rename Pool to Ws_pool; deprecated Moonpool.Pool

This commit is contained in:
Simon Cruanes 2023-10-25 23:40:01 -04:00
parent 30035fa67d
commit 3e614ec992
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
25 changed files with 108 additions and 118 deletions

View file

@ -24,20 +24,20 @@ In addition, some concurrency and parallelism primitives are provided:
## Usage ## Usage
The user can create several thread pools. These pools use regular posix threads, The user can create several thread pools (implementing the interface `Runner.t`).
but the threads are spread across multiple domains (on OCaml 5), which enables These pools use regular posix threads, but the threads are spread across
parallelism. multiple domains (on OCaml 5), which enables parallelism.
The function `Pool.run_async pool task` runs `task()` on one of the workers The function `Runner.run_async pool task` schedules `task()` to run on one of
of `pool`, as soon as one is available. No result is returned. the workers of `pool`, as soon as one is available. No result is returned by `run_async`.
```ocaml ```ocaml
# #require "threads";; # #require "threads";;
# let pool = Moonpool.Pool.create ~min:4 ();; # let pool = Moonpool.Fifo_pool.create ~min:4 ();;
val pool : Moonpool.Runner.t = <abstr> val pool : Moonpool.Runner.t = <abstr>
# begin # begin
Moonpool.Pool.run_async pool Moonpool.Runner.run_async pool
(fun () -> (fun () ->
Thread.delay 0.1; Thread.delay 0.1;
print_endline "running from the pool"); print_endline "running from the pool");
@ -49,11 +49,13 @@ running from the pool
- : unit = () - : unit = ()
``` ```
To wait until the task is done, you can use `Pool.run_wait_block` instead: To wait until the task is done, you can use `Runner.run_wait_block`[^1] instead:
[^1]: beware of deadlock! See documentation for more details.
```ocaml ```ocaml
# begin # begin
Moonpool.Pool.run_wait_block pool Moonpool.Runner.run_wait_block pool
(fun () -> (fun () ->
Thread.delay 0.1; Thread.delay 0.1;
print_endline "running from the pool"); print_endline "running from the pool");
@ -155,7 +157,7 @@ val expected_sum : int = 5050
On OCaml 5, again using effect handlers, the module `Fork_join` On OCaml 5, again using effect handlers, the module `Fork_join`
implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model). implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model).
It must run on a pool (using [Pool.run] or inside a future via [Future.spawn]). It must run on a pool (using [Runner.run_async] or inside a future via [Fut.spawn]).
```ocaml ```ocaml
# let rec select_sort arr i len = # let rec select_sort arr i len =
@ -257,7 +259,7 @@ This works for OCaml >= 4.08.
the same pool, too — this is useful for threads blocking on IO). the same pool, too — this is useful for threads blocking on IO).
A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core. A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core.
Multiple threads have to share a single core and do not run in parallel on it[^1]. Multiple threads have to share a single core and do not run in parallel on it[^2].
We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool. We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool.
TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores, TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores,
@ -273,4 +275,4 @@ MIT license.
$ opam install moonpool $ opam install moonpool
``` ```
[^1]: let's not talk about hyperthreading. [^2]: let's not talk about hyperthreading.

View file

@ -21,7 +21,7 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let create_pool ~psize ~kind () = let create_pool ~psize ~kind () =
match kind with match kind with
| "fifo" -> Fifo_pool.create ~min:psize () | "fifo" -> Fifo_pool.create ~min:psize ()
| "pool" -> Pool.create ~min:psize () | "pool" -> Ws_pool.create ~min:psize ()
| _ -> assert false | _ -> assert false
let run ~psize ~n ~seq ~niter ~kind () : unit = let run ~psize ~n ~seq ~niter ~kind () : unit =
@ -38,7 +38,7 @@ let run ~psize ~n ~seq ~niter ~kind () : unit =
in in
Printf.printf "fib %d = %d\n%!" n res Printf.printf "fib %d = %d\n%!" n res
done; done;
if not seq then Pool.shutdown (Lazy.force pool) if not seq then Ws_pool.shutdown (Lazy.force pool)
let () = let () =
let n = ref 40 in let n = ref 40 in

View file

@ -21,9 +21,9 @@ let with_pool ~kind f =
match kind with match kind with
| "pool" -> | "pool" ->
if !j = 0 then if !j = 0 then
Pool.with_ ~per_domain:1 f Ws_pool.with_ ~per_domain:1 f
else else
Pool.with_ ~min:!j f Ws_pool.with_ ~min:!j f
| "fifo" -> | "fifo" ->
if !j = 0 then if !j = 0 then
Fifo_pool.with_ ~per_domain:1 f Fifo_pool.with_ ~per_domain:1 f
@ -35,7 +35,7 @@ let with_pool ~kind f =
let run_par1 ~kind (num_steps : int) : float = let run_par1 ~kind (num_steps : int) : float =
let@ pool = with_pool ~kind () in let@ pool = with_pool ~kind () in
let num_tasks = Pool.size pool in let num_tasks = Ws_pool.size pool in
let step = 1. /. float num_steps in let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in let global_sum = Lock.create 0. in
@ -64,12 +64,12 @@ let run_par1 ~kind (num_steps : int) : float =
let run_fork_join ~kind num_steps : float = let run_fork_join ~kind num_steps : float =
let@ pool = with_pool ~kind () in let@ pool = with_pool ~kind () in
let num_tasks = Pool.size pool in let num_tasks = Ws_pool.size pool in
let step = 1. /. float num_steps in let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in let global_sum = Lock.create 0. in
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.for_ Fork_join.for_
~chunk_size:(3 + (num_steps / num_tasks)) ~chunk_size:(3 + (num_steps / num_tasks))
num_steps num_steps

View file

@ -97,7 +97,7 @@ let spawn ~on f : _ t =
fulfill promise res fulfill promise res
in in
Pool.run_async on task; Runner.run_async on task;
fut fut
let reify_error (f : 'a t) : 'a or_error t = let reify_error (f : 'a t) : 'a or_error t =
@ -131,7 +131,7 @@ let map ?on ~f fut : _ t =
match on with match on with
| None -> map_and_fulfill () | None -> map_and_fulfill ()
| Some on -> Pool.run_async on map_and_fulfill); | Some on -> Runner.run_async on map_and_fulfill);
fut2 fut2
@ -158,14 +158,14 @@ let bind ?on ~f fut : _ t =
| None -> apply_f_to_res r | None -> apply_f_to_res r
| Some on -> | Some on ->
let fut2, promise = make () in let fut2, promise = make () in
Pool.run_async on (bind_and_fulfill r promise); Runner.run_async on (bind_and_fulfill r promise);
fut2) fut2)
| None -> | None ->
let fut2, promise = make () in let fut2, promise = make () in
on_result fut (fun r -> on_result fut (fun r ->
match on with match on with
| None -> bind_and_fulfill r promise () | None -> bind_and_fulfill r promise ()
| Some on -> Pool.run_async on (bind_and_fulfill r promise)); | Some on -> Runner.run_async on (bind_and_fulfill r promise));
fut2 fut2
@ -403,7 +403,7 @@ module type INFIX = sig
end end
module Infix_ (X : sig module Infix_ (X : sig
val pool : Pool.t option val pool : Runner.t option
end) : INFIX = struct end) : INFIX = struct
let[@inline] ( >|= ) x f = map ?on:X.pool ~f x let[@inline] ( >|= ) x f = map ?on:X.pool ~f x
let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x
@ -420,7 +420,7 @@ end)
include Infix_local include Infix_local
module Infix (X : sig module Infix (X : sig
val pool : Pool.t val pool : Runner.t
end) = end) =
Infix_ (struct Infix_ (struct
let pool = Some X.pool let pool = Some X.pool

View file

@ -9,7 +9,8 @@ module Chan = Chan
module Fork_join = Fork_join module Fork_join = Fork_join
module Fut = Fut module Fut = Fut
module Lock = Lock module Lock = Lock
module Pool = Pool module Pool = Fifo_pool
module Ws_pool = Ws_pool
module Runner = Runner module Runner = Runner
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool

View file

@ -9,10 +9,14 @@
primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}). primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}).
*) *)
module Pool = Pool module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool
module Runner = Runner module Runner = Runner
module Pool = Fifo_pool
[@@deprecated "use Fifo_pool or Ws_pool"]
(** Default pool. Please explicitly pick an implementation instead. *)
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random (** Similar to {!Thread.create}, but it picks a background domain at random
to run the thread. This ensures that we don't always pick the same domain to run the thread. This ensures that we don't always pick the same domain

View file

@ -4,9 +4,6 @@ include Runner
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
type worker_state = { type worker_state = {
mutable thread: Thread.t; mutable thread: Thread.t;
q: task WSQ.t; (** Work stealing queue *) q: task WSQ.t; (** Work stealing queue *)
@ -227,7 +224,6 @@ let shutdown_ ~wait (self : state) : unit =
type ('a, 'b) create_args = type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int -> ?min:int ->
@ -236,9 +232,8 @@ type ('a, 'b) create_args =
(** Arguments used in {!create}. See {!create} for explanations. *) (** Arguments used in {!create}. See {!create} for explanations. *)
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t =
?(per_domain = 0) () : t =
(* wrapper *) (* wrapper *)
let around_task = let around_task =
match around_task with match around_task with
@ -294,16 +289,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
on_init_thread ~dom_id:dom_idx ~t_id (); on_init_thread ~dom_id:dom_idx ~t_id ();
let run () = worker_thread_ pool runner w ~on_exn ~around_task in let run () = worker_thread_ pool runner w ~on_exn ~around_task in
(* the actual worker loop is [worker_thread_], with all
wrappers for this pool and for all pools (global_thread_wrappers_) *)
let run' =
List.fold_left
(fun run f -> f ~thread ~pool:runner run)
run thread_wrappers
in
(* now run the main loop *) (* now run the main loop *)
Fun.protect run' ~finally:(fun () -> Fun.protect run ~finally:(fun () ->
(* on termination, decrease refcount of underlying domain *) (* on termination, decrease refcount of underlying domain *)
D_pool_.decr_on dom_idx); D_pool_.decr_on dom_idx);
on_exit_thread ~dom_id:dom_idx ~t_id () on_exit_thread ~dom_id:dom_idx ~t_id ()
@ -335,11 +323,11 @@ let create ?(on_init_thread = default_thread_init_exit_)
runner runner
let with_ ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
?min ?per_domain () f = () f =
let pool = let pool =
create ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
?min ?per_domain () ()
in in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool f pool

View file

@ -1,7 +1,13 @@
(** Thread pool. (** Work-stealing thread pool.
A pool of threads. The pool contains a fixed number of threads that A pool of threads with a worker-stealing scheduler.
wait for work items to come, process these, and loop. The pool contains a fixed number of threads that wait for work
items to come, process these, and loop.
This is good for CPU-intensive tasks that feature a lot of small tasks.
Note that tasks will not always be processed in the order they are
scheduled, so this is not great for workloads where the latency
of individual tasks matter (for that see {!Fifo_pool}).
This implements {!Runner.t} since 0.3. This implements {!Runner.t} since 0.3.
@ -15,18 +21,9 @@
include module type of Runner include module type of Runner
type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
(** A thread wrapper [f] takes the current thread, the current pool,
and the worker function [loop : unit -> unit] which is
the worker's main loop, and returns a new loop function.
By default it just returns the same loop function but it can be used
to install tracing, effect handlers, etc. *)
type ('a, 'b) create_args = type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int -> ?min:int ->
@ -47,8 +44,6 @@ val create : (unit -> t, _) create_args
If both [min] and [per_domain] are specified, the maximum of both If both [min] and [per_domain] are specified, the maximum of both
[min] and [per_domain * num_of_domains] is used. [min] and [per_domain * num_of_domains] is used.
@param on_exit_thread called at the end of each thread in the pool @param on_exit_thread called at the end of each thread in the pool
@param thread_wrappers a list of {!thread_loop_wrapper} functions
to use for this pool's workers.
@param around_task a pair of [before, after], where [before pool] is called @param around_task a pair of [before, after], where [before pool] is called
before a task is processed, before a task is processed,
on the worker thread about to run it, and returns [x]; and [after pool x] is called by on the worker thread about to run it, and returns [x]; and [after pool x] is called by

View file

@ -26,13 +26,13 @@ let fib ~on x : int Fut.t =
let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let fib_40 : int = let fib_40 : int =
let pool = Pool.create ~min:8 () in let pool = Ws_pool.create ~min:8 () in
fib ~on:pool 40 |> Fut.wait_block_exn fib ~on:pool 40 |> Fut.wait_block_exn
let () = Printf.printf "fib 40 = %d\n%!" fib_40 let () = Printf.printf "fib 40 = %d\n%!" fib_40
let run_test () = let run_test () =
let pool = Pool.create ~min:8 () in let pool = Ws_pool.create ~min:8 () in
assert ( assert (
List.init 10 (fib ~on:pool) List.init 10 (fib ~on:pool)
@ -42,7 +42,7 @@ let run_test () =
let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in
let res = Fut.join_array fibs |> Fut.wait_block in let res = Fut.join_array fibs |> Fut.wait_block in
Pool.shutdown pool; Ws_pool.shutdown pool;
assert (res = Ok (Array.make 3 fib_40)) assert (res = Ok (Array.make 3 fib_40))

View file

@ -27,13 +27,13 @@ let fib ~on x : int Fut.t =
let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let fib_40 : int = let fib_40 : int =
let pool = Pool.create ~min:8 () in let pool = Ws_pool.create ~min:8 () in
fib ~on:pool 40 |> Fut.wait_block_exn fib ~on:pool 40 |> Fut.wait_block_exn
let () = Printf.printf "fib 40 = %d\n%!" fib_40 let () = Printf.printf "fib 40 = %d\n%!" fib_40
let run_test () = let run_test () =
let pool = Pool.create ~min:8 () in let pool = Ws_pool.create ~min:8 () in
assert ( assert (
List.init 10 (fib ~on:pool) List.init 10 (fib ~on:pool)
@ -43,7 +43,7 @@ let run_test () =
let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in
let res = Fut.join_array fibs |> Fut.wait_block in let res = Fut.join_array fibs |> Fut.wait_block in
Pool.shutdown pool; Ws_pool.shutdown pool;
assert (res = Ok (Array.make 3 fib_40)) assert (res = Ok (Array.make 3 fib_40))

View file

@ -22,13 +22,13 @@ let rec fib x : int =
) )
let fib_40 : int = let fib_40 : int =
let@ pool = Pool.with_ ~min:8 () in let@ pool = Ws_pool.with_ ~min:8 () in
Fut.spawn ~on:pool (fun () -> fib 40) |> Fut.wait_block_exn Fut.spawn ~on:pool (fun () -> fib 40) |> Fut.wait_block_exn
let () = Printf.printf "fib 40 = %d\n%!" fib_40 let () = Printf.printf "fib 40 = %d\n%!" fib_40
let run_test () = let run_test () =
let@ pool = Pool.with_ ~min:8 () in let@ pool = Ws_pool.with_ ~min:8 () in
let fut = let fut =
Fut.spawn ~on:pool (fun () -> Fut.spawn ~on:pool (fun () ->
@ -37,7 +37,7 @@ let run_test () =
in in
let res = Fut.wait_block_exn fut in let res = Fut.wait_block_exn fut in
Pool.shutdown pool; Ws_pool.shutdown pool;
assert (res = (Array.make 3 fib_40 |> Array.to_list)) assert (res = (Array.make 3 fib_40 |> Array.to_list))

View file

@ -5,11 +5,11 @@ let ( let@ ) = ( @@ )
open! Moonpool open! Moonpool
let pool = Pool.create ~min:4 () let pool = Ws_pool.create ~min:4 ()
let () = let () =
let x = let x =
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
let x, y = let x, y =
Fork_join.both Fork_join.both
(fun () -> (fun () ->
@ -25,7 +25,7 @@ let () =
let () = let () =
try try
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.both_ignore Fork_join.both_ignore
(fun () -> Thread.delay 0.005) (fun () -> Thread.delay 0.005)
(fun () -> (fun () ->
@ -36,21 +36,21 @@ let () =
let () = let () =
let par_sum = let par_sum =
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.all_init 42 (fun i -> i * i) |> List.fold_left ( + ) 0) Fork_join.all_init 42 (fun i -> i * i) |> List.fold_left ( + ) 0)
in in
let exp_sum = List.init 42 (fun x -> x * x) |> List.fold_left ( + ) 0 in let exp_sum = List.init 42 (fun x -> x * x) |> List.fold_left ( + ) 0 in
assert (par_sum = exp_sum) assert (par_sum = exp_sum)
let () = let () =
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.for_ 0 (fun _ _ -> assert false)); Fork_join.for_ 0 (fun _ _ -> assert false));
() ()
let () = let () =
let total_sum = Atomic.make 0 in let total_sum = Atomic.make 0 in
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.for_ ~chunk_size:5 100 (fun low high -> Fork_join.for_ ~chunk_size:5 100 (fun low high ->
(* iterate on the range sequentially. The range should have 5 items or less. *) (* iterate on the range sequentially. The range should have 5 items or less. *)
let local_sum = ref 0 in let local_sum = ref 0 in
@ -63,7 +63,7 @@ let () =
let () = let () =
let total_sum = Atomic.make 0 in let total_sum = Atomic.make 0 in
Pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
Fork_join.for_ ~chunk_size:1 100 (fun low high -> Fork_join.for_ ~chunk_size:1 100 (fun low high ->
assert (low = high); assert (low = high);
ignore (Atomic.fetch_and_add total_sum low : int))); ignore (Atomic.fetch_and_add total_sum low : int)));
@ -270,7 +270,7 @@ end
let t_eval = let t_eval =
let arb = Q.set_stats [ "size", Evaluator.size ] Evaluator.arb in let arb = Q.set_stats [ "size", Evaluator.size ] Evaluator.arb in
Q.Test.make ~name:"same eval" arb (fun e -> Q.Test.make ~name:"same eval" arb (fun e ->
let@ pool = Pool.with_ ~min:4 () in let@ pool = Ws_pool.with_ ~min:4 () in
(* Printf.eprintf "eval %s\n%!" (Evaluator.show e); *) (* Printf.eprintf "eval %s\n%!" (Evaluator.show e); *)
let x = Evaluator.eval_seq e in let x = Evaluator.eval_seq e in
let y = Evaluator.eval_fork_join ~pool e in let y = Evaluator.eval_fork_join ~pool e in
@ -288,8 +288,8 @@ let t_for_nested ~min ~chunk_size () =
let ref_l2 = List.map (List.map neg) ref_l1 in let ref_l2 = List.map (List.map neg) ref_l1 in
let l1, l2 = let l1, l2 =
let@ pool = Pool.with_ ~min () in let@ pool = Ws_pool.with_ ~min () in
let@ () = Pool.run_wait_block pool in let@ () = Ws_pool.run_wait_block pool in
let l1 = let l1 =
Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) l Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) l
in in
@ -310,8 +310,8 @@ let t_map ~chunk_size () =
Q.Test.make ~name:"map1" Q.Test.make ~name:"map1"
Q.(small_list small_int |> Q.set_stats [ "len", List.length ]) Q.(small_list small_int |> Q.set_stats [ "len", List.length ])
(fun l -> (fun l ->
let@ pool = Pool.with_ ~min:4 () in let@ pool = Ws_pool.with_ ~min:4 () in
let@ () = Pool.run_wait_block pool in let@ () = Ws_pool.run_wait_block pool in
let a1 = let a1 =
Fork_join.map_list ~chunk_size string_of_int l |> Array.of_list Fork_join.map_list ~chunk_size string_of_int l |> Array.of_list

View file

@ -27,8 +27,8 @@ let run ~min () =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "step" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "step" in
let l1, l2 = let l1, l2 =
let@ pool = Pool.with_ ~min () in let@ pool = Ws_pool.with_ ~min () in
let@ () = Pool.run_wait_block pool in let@ () = Ws_pool.run_wait_block pool in
let l1, l2 = let l1, l2 =
Fork_join.both Fork_join.both

View file

@ -2,7 +2,7 @@
open! Moonpool open! Moonpool
let pool = Pool.create ~min:4 () let pool = Ws_pool.create ~min:4 ()
let () = let () =
let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in

View file

@ -34,15 +34,15 @@ let () =
run ~pool ()); run ~pool ());
(print_endline "with WS(1)"; (print_endline "with WS(1)";
let@ pool = Pool.with_ ~min:1 () in let@ pool = Ws_pool.with_ ~min:1 () in
run ~pool ()); run ~pool ());
(print_endline "with WS(2)"; (print_endline "with WS(2)";
let@ pool = Pool.with_ ~min:2 () in let@ pool = Ws_pool.with_ ~min:2 () in
run ~pool ()); run ~pool ());
(print_endline "with WS(4)"; (print_endline "with WS(4)";
let@ pool = Pool.with_ ~min:4 () in let@ pool = Ws_pool.with_ ~min:4 () in
run ~pool ()); run ~pool ());
() ()

View file

@ -59,7 +59,7 @@ let rec quicksort arr i len : unit =
(fun () -> quicksort arr !low (len - (!low - i))) (fun () -> quicksort arr !low (len - (!low - i)))
) )
let pool = Moonpool.Pool.create ~min:8 () let pool = Moonpool.Ws_pool.create ~min:8 ()
let () = let () =
let arr = Array.init 400_000 (fun _ -> Random.int 300_000) in let arr = Array.init 400_000 (fun _ -> Random.int 300_000) in

View file

@ -8,7 +8,7 @@ let rec fib x =
let run ~psize ~n ~j () : _ Fut.t = let run ~psize ~n ~j () : _ Fut.t =
Printf.printf "pool size=%d, n=%d, j=%d\n%!" psize n j; Printf.printf "pool size=%d, n=%d, j=%d\n%!" psize n j;
let pool = Pool.create ~min:psize ~per_domain:0 () in let pool = Ws_pool.create ~min:psize ~per_domain:0 () in
(* TODO: a ppx for tracy so we can use instrumentation *) (* TODO: a ppx for tracy so we can use instrumentation *)
let loop () = let loop () =

View file

@ -1,7 +1,7 @@
open Moonpool open Moonpool
(* large pool, some of our tasks below are long lived *) (* large pool, some of our tasks below are long lived *)
let pool = Pool.create ~min:30 () let pool = Ws_pool.create ~min:30 ()
open (val Fut.infix pool) open (val Fut.infix pool)

View file

@ -5,7 +5,7 @@ let ( let@ ) = ( @@ )
let with_pool ~kind () f = let with_pool ~kind () f =
match kind with match kind with
| `Fifo_pool -> Fifo_pool.with_ ~min:4 () f | `Fifo_pool -> Fifo_pool.with_ ~min:4 () f
| `Pool -> Pool.with_ ~min:4 () f | `Ws_pool -> Ws_pool.with_ ~min:4 () f
let rec fib x = let rec fib x =
if x <= 1 then if x <= 1 then
@ -18,7 +18,7 @@ let () = assert (List.init 10 fib = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let run_test ~pool () = let run_test ~pool () =
let fibs = Array.init 30 (fun n -> Fut.spawn ~on:pool (fun () -> fib n)) in let fibs = Array.init 30 (fun n -> Fut.spawn ~on:pool (fun () -> fib n)) in
let res = Fut.join_array fibs |> Fut.wait_block in let res = Fut.join_array fibs |> Fut.wait_block in
Pool.shutdown pool; Ws_pool.shutdown pool;
assert ( assert (
res res
@ -74,5 +74,5 @@ let run ~kind () =
Array.iter Thread.join jobs Array.iter Thread.join jobs
let () = let () =
run ~kind:`Pool (); run ~kind:`Ws_pool ();
run ~kind:`Fifo_pool () run ~kind:`Fifo_pool ()

View file

@ -25,9 +25,9 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let fib_40 : int lazy_t = let fib_40 : int lazy_t =
lazy lazy
(let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib40" in (let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib40" in
let pool = Pool.create ~min:8 () in let pool = Ws_pool.create ~min:8 () in
let r = fib ~on:pool 40 |> Fut.wait_block_exn in let r = fib ~on:pool 40 |> Fut.wait_block_exn in
Pool.shutdown pool; Ws_pool.shutdown pool;
r) r)
let run_test ~pool () = let run_test ~pool () =
@ -49,7 +49,7 @@ let run_test ~pool () =
let run_test_size ~size () = let run_test_size ~size () =
Printf.printf "test pool(%d)\n%!" size; Printf.printf "test pool(%d)\n%!" size;
let@ pool = Pool.with_ ~min:size () in let@ pool = Ws_pool.with_ ~min:size () in
run_test ~pool () run_test ~pool ()
let run_test_fifo ~size () = let run_test_fifo ~size () =

View file

@ -1,7 +1,7 @@
open! Moonpool open! Moonpool
let pool = Pool.create ~min:4 () let pool = Ws_pool.create ~min:4 ()
let pool2 = Pool.create ~min:2 () let pool2 = Ws_pool.create ~min:2 ()
let () = let () =
let fut = Fut.return 1 in let fut = Fut.return 1 in

View file

@ -8,7 +8,7 @@ let add_test t = tests := t :: !tests
let with_pool ~kind () f = let with_pool ~kind () f =
match kind with match kind with
| `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f
| `Pool -> Pool.with_ ~min:4 ~per_domain:1 () f | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f
let () = let () =
add_test @@ fun ~kind -> add_test @@ fun ~kind ->
@ -48,7 +48,7 @@ let () =
let () = let () =
let tests = let tests =
List.map (fun t -> [ t ~kind:`Fifo_pool; t ~kind:`Pool ]) !tests List.map (fun t -> [ t ~kind:`Fifo_pool; t ~kind:`Ws_pool ]) !tests
|> List.flatten |> List.flatten
in in
QCheck_base_runner.run_tests_main tests QCheck_base_runner.run_tests_main tests

View file

@ -5,7 +5,7 @@ let ( let@ ) = ( @@ )
let with_pool ~kind () f = let with_pool ~kind () f =
match kind with match kind with
| `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f
| `Pool -> Pool.with_ ~min:4 ~per_domain:1 () f | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f
(* test proper resource handling *) (* test proper resource handling *)
let run ~kind () = let run ~kind () =
@ -18,10 +18,10 @@ let run ~kind () =
(* allocate a new pool at each iteration *) (* allocate a new pool at each iteration *)
let@ p = with_pool ~kind () in let@ p = with_pool ~kind () in
Pool.run_wait_block p (fun () -> Atomic.incr a) Ws_pool.run_wait_block p (fun () -> Atomic.incr a)
done; done;
assert (Atomic.get a = 1_000) assert (Atomic.get a = 1_000)
let () = let () =
run ~kind:`Pool (); run ~kind:`Ws_pool ();
run ~kind:`Fifo_pool () run ~kind:`Fifo_pool ()

View file

@ -5,7 +5,7 @@ let ( let@ ) = ( @@ )
let with_pool ~kind ~j () f = let with_pool ~kind ~j () f =
match kind with match kind with
| `Fifo_pool -> Fifo_pool.with_ ~min:j () f | `Fifo_pool -> Fifo_pool.with_ ~min:j () f
| `Pool -> Pool.with_ ~min:j () f | `Ws_pool -> Ws_pool.with_ ~min:j () f
type 'a tree = type 'a tree =
| Leaf of 'a | Leaf of 'a
@ -88,5 +88,5 @@ let () =
(* (*
Tracy_client_trace.setup (); Tracy_client_trace.setup ();
*) *)
run_main ~kind:`Pool (); run_main ~kind:`Ws_pool ();
run_main ~kind:`Fifo_pool () run_main ~kind:`Fifo_pool ()

View file

@ -15,32 +15,32 @@ let run ~kind () =
let on_init_thread ~dom_id:_ ~t_id () = let on_init_thread ~dom_id:_ ~t_id () =
Trace.set_thread_name (Printf.sprintf "pool worker %d" t_id) Trace.set_thread_name (Printf.sprintf "pool worker %d" t_id)
and around_task = and around_task =
( (fun self -> Trace.counter_int "n_tasks" (Pool.num_tasks self)), ( (fun self -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self)),
fun self () -> Trace.counter_int "n_tasks" (Pool.num_tasks self) ) fun self () -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self) )
in in
match kind with match kind with
| `Simple -> Fifo_pool.create ~min:3 ~on_init_thread ~around_task () | `Simple -> Fifo_pool.create ~min:3 ~on_init_thread ~around_task ()
| `Pool -> Pool.create ~min:3 ~on_init_thread ~around_task () | `Ws_pool -> Ws_pool.create ~min:3 ~on_init_thread ~around_task ()
in in
(* make all threads busy *) (* make all threads busy *)
Pool.run_async pool (sleep_for 0.01); Ws_pool.run_async pool (sleep_for 0.01);
Pool.run_async pool (sleep_for 0.01); Ws_pool.run_async pool (sleep_for 0.01);
Pool.run_async pool (sleep_for 0.05); Ws_pool.run_async pool (sleep_for 0.05);
let t = Unix.gettimeofday () in let t = Unix.gettimeofday () in
for _i = 1 to 100 do for _i = 1 to 100 do
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "schedule step" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "schedule step" in
Pool.run_async pool (sleep_for 0.001); Ws_pool.run_async pool (sleep_for 0.001);
Pool.run_async pool (sleep_for 0.001); Ws_pool.run_async pool (sleep_for 0.001);
Pool.run_async pool (sleep_for 0.01) Ws_pool.run_async pool (sleep_for 0.01)
done; done;
Printf.printf "pool size: %d\n%!" (Pool.num_tasks pool); Printf.printf "pool size: %d\n%!" (Ws_pool.num_tasks pool);
(let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "shutdown" in (let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "shutdown" in
Pool.shutdown pool); Ws_pool.shutdown pool);
Printf.printf "pool size after shutdown: %d\n%!" (Pool.num_tasks pool); Printf.printf "pool size after shutdown: %d\n%!" (Ws_pool.num_tasks pool);
let elapsed = Unix.gettimeofday () -. t in let elapsed = Unix.gettimeofday () -. t in
Printf.printf "elapsed: %.4fs\n%!" elapsed Printf.printf "elapsed: %.4fs\n%!" elapsed
@ -49,4 +49,4 @@ let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
run ~kind:`Simple (); run ~kind:`Simple ();
run ~kind:`Pool () run ~kind:`Ws_pool ()