From eab774813d61d3f60e7400e339e391b939d5d1bd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Jun 2023 20:58:28 -0400 Subject: [PATCH] test: add load test for chans --- test/dune | 2 +- test/t_chan_train.ml | 95 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 test/t_chan_train.ml diff --git a/test/dune b/test/dune index 5b210cad..50a73317 100644 --- a/test/dune +++ b/test/dune @@ -1,5 +1,5 @@ (tests - (names t_fib t_bench1 t_fib_rec t_futs1 t_tree_futs t_props) + (names t_fib t_bench1 t_fib_rec t_futs1 t_tree_futs t_props t_chan_train) (libraries moonpool qcheck-core qcheck-core.runner ;tracy-client.trace trace)) diff --git a/test/t_chan_train.ml b/test/t_chan_train.ml new file mode 100644 index 00000000..90a241c3 --- /dev/null +++ b/test/t_chan_train.ml @@ -0,0 +1,95 @@ +open Moonpool + +(* large pool, some of our tasks below are long lived *) +let pool = Pool.create ~min:30 () + +open (val Fut.infix pool) + +type event = + | E_int of int + | E_close + +let mk_chan (ic : event Chan.t) : event Chan.t = + let out = Chan.create () in + + let rec loop () = + let* ev = Chan.pop ic in + Chan.push out ev; + match ev with + | E_close -> Fut.return () + | E_int _x -> loop () + in + + ignore (Fut.spawn ~on:pool loop : _ Fut.t); + out + +(* a train of channels connected to one another, with a + loop pushing events from the input to the output *) +let rec mk_train n ic : _ Chan.t = + if n = 0 then + ic + else ( + let c = mk_chan ic in + mk_train (n - 1) c + ) + +let run () = + let start = Unix.gettimeofday () in + + let n_trains = 5 in + let len_train = 100 in + let n_events = 1_000 in + let range = 5 in + + (* start trains *) + let trains = + List.init n_trains (fun _ -> + let c = Chan.create () in + let out = mk_train len_train c in + c, out) + in + + let pushers = + List.map + (fun (ic, _oc) -> + Fut.spawn ~on:pool (fun () -> + for i = 1 to n_events do + Chan.push ic (E_int (i mod range)) + done; + Chan.push ic E_close)) + trains + in + + let gatherers = + List.map + (fun (_ic, oc) -> + let sum = ref 0 in + try + while true do + match Chan.pop_block_exn oc with + | E_close -> raise Exit + | E_int x -> sum := !sum + x + done; + assert false + with Exit -> !sum) + trains + in + + Fut.wait_block_exn (Fut.wait_list pushers); + + let expected_sum = + let sum = ref 0 in + for i = 1 to n_events do + sum := !sum + (i mod range) + done; + !sum + in + + Printf.printf "got %d events in %d trains (len=%d) in %.2fs\n%!" n_events + n_trains len_train + (Unix.gettimeofday () -. start); + + assert (gatherers = List.init n_trains (fun _ -> expected_sum)); + () + +let () = run ()