From 4ff45df7e776cb81c9b25daaf0fcb8a2e175efc3 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Feb 2024 23:05:30 -0500 Subject: [PATCH] basic echo client for moonpool-lwt --- test/lwt/dune | 2 +- test/lwt/echo_client.ml | 76 +++++++++++++++++++++++++++++++++++++++++ test/lwt/echo_server.ml | 2 +- 3 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 test/lwt/echo_client.ml diff --git a/test/lwt/dune b/test/lwt/dune index 899cfaaf..112d0e35 100644 --- a/test/lwt/dune +++ b/test/lwt/dune @@ -1,3 +1,3 @@ (executables - (names echo_server) + (names echo_server echo_client) (libraries moonpool moonpool-lwt lwt lwt.unix trace.core trace-tef)) diff --git a/test/lwt/echo_client.ml b/test/lwt/echo_client.ml new file mode 100644 index 00000000..20615bef --- /dev/null +++ b/test/lwt/echo_client.ml @@ -0,0 +1,76 @@ +module M = Moonpool +module M_lwt = Moonpool_lwt +module Trace = Trace_core + +let ( let@ ) = ( @@ ) + +let main ~port ~runner ~n ~n_conn () : unit Lwt.t = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + + let remaining = Atomic.make n in + let all_done = Atomic.make 0 in + + let fut_exit, prom_exit = M.Fut.make () in + + Printf.printf "connecting to port %d\n%!" port; + let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + + let rec run_task () = + (* Printf.printf "running task\n%!"; *) + let n = Atomic.fetch_and_add remaining (-1) in + if n > 0 then ( + ( (* let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "connect.client" in *) + M_lwt.TCP_client.with_connect addr + @@ fun ic oc -> + let buf = Bytes.create 32 in + + for _j = 1 to 100 do + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "write.loop" in + M_lwt.IO_out.output_string oc "hello"; + + (* read back something *) + let _n = M_lwt.IO_in.really_input ic buf in + () + done ); + + (* run another task *) M.Runner.run_async runner run_task + ) else ( + (* if we're the last to exit, resolve the promise *) + let n_already_done = Atomic.fetch_and_add all_done 1 in + if n_already_done = n_conn - 1 then ( + Printf.printf "all done\n%!"; + M.Fut.fulfill prom_exit @@ Ok () + ) + ) + in + + (* start the first [n_conn] tasks *) + for _i = 1 to n_conn do + M.Runner.run_async runner run_task + done; + + (* exit when [fut_exit] is resolved *) + M_lwt.lwt_of_fut fut_exit + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + let port = ref 0 in + let j = ref 4 in + let n_conn = ref 100 in + let n = ref 50_000 in + + let opts = + [ + "-p", Arg.Set_int port, " port"; + "j", Arg.Set_int j, " number of threads"; + "-n", Arg.Set_int n, " total number of connections"; + "--n-conn", Arg.Set_int n_conn, " number of parallel connections"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo client"; + + let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in + Lwt_engine.set @@ new Lwt_engine.libev (); + Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn () diff --git a/test/lwt/echo_server.ml b/test/lwt/echo_server.ml index a1ba88aa..7d95fa2a 100644 --- a/test/lwt/echo_server.ml +++ b/test/lwt/echo_server.ml @@ -55,7 +55,7 @@ let () = let opts = [ - "-p", Arg.Set_int port, " port"; "j", Arg.Set_int j, " number of threads"; + "-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads"; ] |> Arg.align in