From b0a29618e73e416cb9e4fc2a1ba30cd4a8d2a8f1 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:48:54 -0400 Subject: [PATCH] test: better tracing and scalability for echo client/server --- tests/posix/echo/README.md | 6 ++ tests/posix/echo/dune | 2 +- tests/posix/echo/echo_client.ml | 111 +++++++++++++++++++++----------- tests/posix/echo/echo_server.ml | 59 ++++++++++++++--- 4 files changed, 131 insertions(+), 47 deletions(-) create mode 100644 tests/posix/echo/README.md diff --git a/tests/posix/echo/README.md b/tests/posix/echo/README.md new file mode 100644 index 0000000..0fdef34 --- /dev/null +++ b/tests/posix/echo/README.md @@ -0,0 +1,6 @@ + +notes about system limits in Linux: +- `ulimit -n 100000` will raise the max number of FDs for a process to 100000 +- `/proc/sys/net/core/netdev_max_backlog` controls the kernel backlog size, raise it (default is 1000) +- `/proc/sys/net/core/somaxconn` is the max size of a socket backlog (as given to `listen()`), raise it (default is 4096) + diff --git a/tests/posix/echo/dune b/tests/posix/echo/dune index 3c94762..21b26c2 100644 --- a/tests/posix/echo/dune +++ b/tests/posix/echo/dune @@ -1,4 +1,4 @@ (executables (names echo_server echo_client) - (libraries moonpool moonpool.fib nanoev.picos nanoev-posix iostream + (libraries moonpool moonpool.fib nanoev-picos nanoev-posix iostream trace.core trace-tef)) diff --git a/tests/posix/echo/echo_client.ml b/tests/posix/echo/echo_client.ml index 5c8db61..859dec1 100644 --- a/tests/posix/echo/echo_client.ml +++ b/tests/posix/echo/echo_client.ml @@ -1,6 +1,7 @@ module Trace = Trace_core module F = Moonpool_fib module IO = Nanoev_picos +module Sem = Picos_std_sync.Semaphore.Counting [@@@ocaml.alert "-deprecated"] @@ -8,8 +9,12 @@ let ( let@ ) = ( @@ ) let spf = Printf.sprintf let pf = Printf.printf let verbose = ref false +let reset_line = "\x1b[2K\r" +let n_loops_per_task = 100 + +let main ~runner:_ ~port ~unix_sock ~n ~n_conn () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; -let main ~port ~unix_sock ~n ~n_conn () = pf "connect on %s n=%d n_conn=%d\n%!" (if unix_sock = "" then spf "localhost:%d" port @@ -24,57 +29,91 @@ let main ~port ~unix_sock ~n ~n_conn () = Unix.ADDR_UNIX unix_sock in - let remaining = Atomic.make n in - let all_done = Atomic.make 0 in - Printf.printf "connecting to port %d\n%!" port; - let rec run_task () = - let n = Atomic.fetch_and_add remaining (-1) in + let all_done = Atomic.make false in + let n_queries = Atomic.make 0 in + + (* limit simultaneous number of connections *) + let sem = Sem.make n_conn in + let n_active_conns = Atomic.make 0 in + + let progress_loop () = + while not (Atomic.get all_done) do + let n_queries = Atomic.get n_queries in + let n_conns = Atomic.get n_active_conns in + + (* progress *) + Printf.printf "%sdone %d queries, %d active connections%!" reset_line + n_queries n_conns; + + Trace.counter_int ~level:Info "n-conns" n_conns; + Trace.counter_int ~level:Info "n-queries" n_queries; + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done + in + + ignore (Thread.create progress_loop () : Thread.t); + + let run_task () = let _task_sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" - ~data:(fun () -> [ "n", `Int n ]) in - if n > 0 then ( - ( (* let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "connect.client" in *) - IO.Net_client.with_connect addr - @@ fun ic oc -> - let buf = Bytes.create 32 in + Sem.acquire sem; + ( IO.Net_client.with_connect addr @@ fun ic oc -> + Atomic.incr n_active_conns; + let buf = Bytes.create 32 in - for _j = 1 to 100 do - let _sp = - Trace.enter_manual_sub_span ~parent:(Trace.ctx_of_span _task_sp) - ~__FILE__ ~__LINE__ "write.loop" ~data:(fun () -> - [ "iter", `Int _j ]) - in - Iostream.Out.output_string oc "hello"; - Iostream.Out_buf.flush oc; + for _j = 1 to n_loops_per_task do + (*let _sp = + Trace.enter_manual_sub_span ~parent:_task_sp ~__FILE__ ~__LINE__ + "write.loop" ~data:(fun () -> [ "iter", `Int _j ]) + in*) + Atomic.incr n_queries; - (* read back what we wrote *) - Iostream.In.really_input ic buf 0 (String.length "hello"); - Trace.exit_manual_span _sp; - F.yield () - done ); + Iostream.Out.output_string oc "hello"; + Iostream.Out_buf.flush oc; + + (* read back what we wrote *) + Iostream.In.really_input ic buf 0 (String.length "hello"); + (* Trace.exit_manual_span _sp; *) + F.yield () + done; + + Atomic.decr n_active_conns; + Sem.release sem ); - (* run another task *) - F.spawn_ignore run_task - ) else ( - (* if we're the last to exit, resolve the promise *) - let n_already_done = Atomic.fetch_and_add all_done 1 in - if n_already_done = n_conn - 1 then Printf.printf "all done\n%!" - ); Trace.exit_manual_span _task_sp in + let t_start = Mtime_clock.now () in + (* start the first [n_conn] tasks *) - let fibers = List.init n_conn (fun _ -> F.spawn run_task) in + let fibers = List.init (n * n_conn) (fun _ -> F.spawn run_task) in List.iter F.await fibers; + Atomic.set all_done true; + + let t_stop = Mtime_clock.now () in + let elapsed_s = + (Mtime.span t_start t_stop |> Mtime.Span.to_uint64_ns |> Int64.to_float) + *. 1e-9 + in (* exit when [fut_exit] is resolved *) - Printf.printf "done with main\n%!" + Printf.printf + "%sdone with main (time=%.4fs, n queries=%d, expect=%d, %.3f req/s)\n%!" + reset_line elapsed_s (Atomic.get n_queries) + (n * n_conn * n_loops_per_task) + (float (Atomic.get n_queries) /. elapsed_s) let () = let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; Trace.set_thread_name "main"; let port = ref 1234 in @@ -96,5 +135,5 @@ let () = let@ () = Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) in - F.main @@ fun _runner -> - main ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () + F.main @@ fun runner -> + main ~runner ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () diff --git a/tests/posix/echo/echo_server.ml b/tests/posix/echo/echo_server.ml index 67405f4..7ed110b 100644 --- a/tests/posix/echo/echo_server.ml +++ b/tests/posix/echo/echo_server.ml @@ -8,13 +8,16 @@ let ( let@ ) = ( @@ ) let pf = Printf.printf let spf = Printf.sprintf let verbose = ref false +let n_reply_response = Atomic.make 0 let str_of_sockaddr = function | Unix.ADDR_UNIX s -> s | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let main ~port ~unix_sock ~runner () = +let main ~port ~unix_sock ~max_conns ~runner () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + pf "serve on %s\n%!" (if unix_sock = "" then spf "localhost:%d" port @@ -24,12 +27,15 @@ let main ~port ~unix_sock ~runner () = let addr = if unix_sock = "" then Unix.ADDR_INET (Unix.inet_addr_loopback, port) - else + else ( + (* remove leftover unix socket file, if any *) + (try Sys.remove unix_sock with _ -> ()); Unix.ADDR_UNIX unix_sock + ) in let server = - IO.Net_server.establish addr + IO.Net_server.establish ?max_connections:max_conns addr ~spawn:(fun f -> Moonpool.spawn ~on:runner f) ~client_handler:(fun client_addr ic oc -> let _sp = @@ -42,31 +48,63 @@ let main ~port ~unix_sock ~runner () = let buf = Bytes.create 256 in let continue = ref true in while !continue do - let n = Iostream.In.input ic buf 0 (Bytes.length buf) in - if n = 0 then - continue := false - else ( + match Iostream.In.input ic buf 0 (Bytes.length buf) with + | exception exn -> + continue := false; + Printf.eprintf "error in client handler: %s\n%!" + (Printexc.to_string exn) + | 0 -> continue := false + | n -> + Atomic.incr n_reply_response; Iostream.Out.output oc buf 0 n; - Iostream.Out_buf.flush oc - ) + Iostream.Out_buf.flush oc; + Picos.Fiber.yield () done; Trace.exit_manual_span _sp; if !verbose then pf "done with client on %s\n%!" (str_of_sockaddr client_addr)) in + + Printf.printf "max number of connections: %d\n%!" + (IO.Net_server.max_connections server); + + if Trace.enabled () then + ignore + (Thread.create + (fun () -> + while IO.Net_server.running server do + Trace.counter_int ~level:Info "n-conns" + (IO.Net_server.n_active_connections server); + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "n-reply-response" + (Atomic.get n_reply_response); + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done) + () + : Thread.t); + IO.Net_server.join server; IO.Net_server.shutdown server; print_endline "exit" let () = let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; let port = ref 1234 in let unix_sock = ref "" in + let max_conns = ref None in let opts = [ "-p", Arg.Set_int port, " port"; "--unix", Arg.Set_string unix_sock, " unix socket"; + ( "--max-conns", + Arg.Int (fun i -> max_conns := Some i), + " max number of connections" ); "-v", Arg.Set verbose, " verbose"; ] |> Arg.align @@ -76,4 +114,5 @@ let () = let@ () = Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) in - F.main @@ fun runner -> main ~port:!port ~unix_sock:!unix_sock ~runner () + F.main @@ fun runner -> + main ~port:!port ~unix_sock:!unix_sock ~max_conns:!max_conns ~runner ()