mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
test: add load test for chans
This commit is contained in:
parent
25e8dcdbce
commit
eab774813d
2 changed files with 96 additions and 1 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
(tests
|
(tests
|
||||||
(names t_fib t_bench1 t_fib_rec t_futs1 t_tree_futs t_props)
|
(names t_fib t_bench1 t_fib_rec t_futs1 t_tree_futs t_props t_chan_train)
|
||||||
(libraries moonpool qcheck-core qcheck-core.runner
|
(libraries moonpool qcheck-core qcheck-core.runner
|
||||||
;tracy-client.trace
|
;tracy-client.trace
|
||||||
trace))
|
trace))
|
||||||
|
|
|
||||||
95
test/t_chan_train.ml
Normal file
95
test/t_chan_train.ml
Normal file
|
|
@ -0,0 +1,95 @@
|
||||||
|
open Moonpool
|
||||||
|
|
||||||
|
(* large pool, some of our tasks below are long lived *)
|
||||||
|
let pool = Pool.create ~min:30 ()
|
||||||
|
|
||||||
|
open (val Fut.infix pool)
|
||||||
|
|
||||||
|
type event =
|
||||||
|
| E_int of int
|
||||||
|
| E_close
|
||||||
|
|
||||||
|
let mk_chan (ic : event Chan.t) : event Chan.t =
|
||||||
|
let out = Chan.create () in
|
||||||
|
|
||||||
|
let rec loop () =
|
||||||
|
let* ev = Chan.pop ic in
|
||||||
|
Chan.push out ev;
|
||||||
|
match ev with
|
||||||
|
| E_close -> Fut.return ()
|
||||||
|
| E_int _x -> loop ()
|
||||||
|
in
|
||||||
|
|
||||||
|
ignore (Fut.spawn ~on:pool loop : _ Fut.t);
|
||||||
|
out
|
||||||
|
|
||||||
|
(* a train of channels connected to one another, with a
|
||||||
|
loop pushing events from the input to the output *)
|
||||||
|
let rec mk_train n ic : _ Chan.t =
|
||||||
|
if n = 0 then
|
||||||
|
ic
|
||||||
|
else (
|
||||||
|
let c = mk_chan ic in
|
||||||
|
mk_train (n - 1) c
|
||||||
|
)
|
||||||
|
|
||||||
|
let run () =
|
||||||
|
let start = Unix.gettimeofday () in
|
||||||
|
|
||||||
|
let n_trains = 5 in
|
||||||
|
let len_train = 100 in
|
||||||
|
let n_events = 1_000 in
|
||||||
|
let range = 5 in
|
||||||
|
|
||||||
|
(* start trains *)
|
||||||
|
let trains =
|
||||||
|
List.init n_trains (fun _ ->
|
||||||
|
let c = Chan.create () in
|
||||||
|
let out = mk_train len_train c in
|
||||||
|
c, out)
|
||||||
|
in
|
||||||
|
|
||||||
|
let pushers =
|
||||||
|
List.map
|
||||||
|
(fun (ic, _oc) ->
|
||||||
|
Fut.spawn ~on:pool (fun () ->
|
||||||
|
for i = 1 to n_events do
|
||||||
|
Chan.push ic (E_int (i mod range))
|
||||||
|
done;
|
||||||
|
Chan.push ic E_close))
|
||||||
|
trains
|
||||||
|
in
|
||||||
|
|
||||||
|
let gatherers =
|
||||||
|
List.map
|
||||||
|
(fun (_ic, oc) ->
|
||||||
|
let sum = ref 0 in
|
||||||
|
try
|
||||||
|
while true do
|
||||||
|
match Chan.pop_block_exn oc with
|
||||||
|
| E_close -> raise Exit
|
||||||
|
| E_int x -> sum := !sum + x
|
||||||
|
done;
|
||||||
|
assert false
|
||||||
|
with Exit -> !sum)
|
||||||
|
trains
|
||||||
|
in
|
||||||
|
|
||||||
|
Fut.wait_block_exn (Fut.wait_list pushers);
|
||||||
|
|
||||||
|
let expected_sum =
|
||||||
|
let sum = ref 0 in
|
||||||
|
for i = 1 to n_events do
|
||||||
|
sum := !sum + (i mod range)
|
||||||
|
done;
|
||||||
|
!sum
|
||||||
|
in
|
||||||
|
|
||||||
|
Printf.printf "got %d events in %d trains (len=%d) in %.2fs\n%!" n_events
|
||||||
|
n_trains len_train
|
||||||
|
(Unix.gettimeofday () -. start);
|
||||||
|
|
||||||
|
assert (gatherers = List.init n_trains (fun _ -> expected_sum));
|
||||||
|
()
|
||||||
|
|
||||||
|
let () = run ()
|
||||||
Loading…
Add table
Reference in a new issue