mirror of
https://github.com/c-cube/nanoev.git
synced 2025-12-06 11:15:48 -05:00
test: better tracing and scalability for echo client/server
This commit is contained in:
parent
74f87af96c
commit
b0a29618e7
4 changed files with 131 additions and 47 deletions
6
tests/posix/echo/README.md
Normal file
6
tests/posix/echo/README.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
|
||||
notes about system limits in Linux:
|
||||
- `ulimit -n 100000` will raise the max number of FDs for a process to 100000
|
||||
- `/proc/sys/net/core/netdev_max_backlog` controls the kernel backlog size, raise it (default is 1000)
|
||||
- `/proc/sys/net/core/somaxconn` is the max size of a socket backlog (as given to `listen()`), raise it (default is 4096)
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
(executables
|
||||
(names echo_server echo_client)
|
||||
(libraries moonpool moonpool.fib nanoev.picos nanoev-posix iostream
|
||||
(libraries moonpool moonpool.fib nanoev-picos nanoev-posix iostream
|
||||
trace.core trace-tef))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
module Trace = Trace_core
|
||||
module F = Moonpool_fib
|
||||
module IO = Nanoev_picos
|
||||
module Sem = Picos_std_sync.Semaphore.Counting
|
||||
|
||||
[@@@ocaml.alert "-deprecated"]
|
||||
|
||||
|
|
@ -8,8 +9,12 @@ let ( let@ ) = ( @@ )
|
|||
let spf = Printf.sprintf
|
||||
let pf = Printf.printf
|
||||
let verbose = ref false
|
||||
let reset_line = "\x1b[2K\r"
|
||||
let n_loops_per_task = 100
|
||||
|
||||
let main ~runner:_ ~port ~unix_sock ~n ~n_conn () =
|
||||
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
|
||||
|
||||
let main ~port ~unix_sock ~n ~n_conn () =
|
||||
pf "connect on %s n=%d n_conn=%d\n%!"
|
||||
(if unix_sock = "" then
|
||||
spf "localhost:%d" port
|
||||
|
|
@ -24,57 +29,91 @@ let main ~port ~unix_sock ~n ~n_conn () =
|
|||
Unix.ADDR_UNIX unix_sock
|
||||
in
|
||||
|
||||
let remaining = Atomic.make n in
|
||||
let all_done = Atomic.make 0 in
|
||||
|
||||
Printf.printf "connecting to port %d\n%!" port;
|
||||
|
||||
let rec run_task () =
|
||||
let n = Atomic.fetch_and_add remaining (-1) in
|
||||
let all_done = Atomic.make false in
|
||||
let n_queries = Atomic.make 0 in
|
||||
|
||||
(* limit simultaneous number of connections *)
|
||||
let sem = Sem.make n_conn in
|
||||
let n_active_conns = Atomic.make 0 in
|
||||
|
||||
let progress_loop () =
|
||||
while not (Atomic.get all_done) do
|
||||
let n_queries = Atomic.get n_queries in
|
||||
let n_conns = Atomic.get n_active_conns in
|
||||
|
||||
(* progress *)
|
||||
Printf.printf "%sdone %d queries, %d active connections%!" reset_line
|
||||
n_queries n_conns;
|
||||
|
||||
Trace.counter_int ~level:Info "n-conns" n_conns;
|
||||
Trace.counter_int ~level:Info "n-queries" n_queries;
|
||||
let gc = Gc.quick_stat () in
|
||||
Trace.counter_int ~level:Info "gc.major" gc.major_collections;
|
||||
Trace.counter_int ~level:Info "gc.minor" gc.minor_collections;
|
||||
Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64);
|
||||
|
||||
Thread.delay 0.2
|
||||
done
|
||||
in
|
||||
|
||||
ignore (Thread.create progress_loop () : Thread.t);
|
||||
|
||||
let run_task () =
|
||||
let _task_sp =
|
||||
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task"
|
||||
~data:(fun () -> [ "n", `Int n ])
|
||||
in
|
||||
if n > 0 then (
|
||||
( (* let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "connect.client" in *)
|
||||
IO.Net_client.with_connect addr
|
||||
@@ fun ic oc ->
|
||||
Sem.acquire sem;
|
||||
( IO.Net_client.with_connect addr @@ fun ic oc ->
|
||||
Atomic.incr n_active_conns;
|
||||
let buf = Bytes.create 32 in
|
||||
|
||||
for _j = 1 to 100 do
|
||||
let _sp =
|
||||
Trace.enter_manual_sub_span ~parent:(Trace.ctx_of_span _task_sp)
|
||||
~__FILE__ ~__LINE__ "write.loop" ~data:(fun () ->
|
||||
[ "iter", `Int _j ])
|
||||
in
|
||||
for _j = 1 to n_loops_per_task do
|
||||
(*let _sp =
|
||||
Trace.enter_manual_sub_span ~parent:_task_sp ~__FILE__ ~__LINE__
|
||||
"write.loop" ~data:(fun () -> [ "iter", `Int _j ])
|
||||
in*)
|
||||
Atomic.incr n_queries;
|
||||
|
||||
Iostream.Out.output_string oc "hello";
|
||||
Iostream.Out_buf.flush oc;
|
||||
|
||||
(* read back what we wrote *)
|
||||
Iostream.In.really_input ic buf 0 (String.length "hello");
|
||||
Trace.exit_manual_span _sp;
|
||||
(* Trace.exit_manual_span _sp; *)
|
||||
F.yield ()
|
||||
done );
|
||||
done;
|
||||
|
||||
Atomic.decr n_active_conns;
|
||||
Sem.release sem );
|
||||
|
||||
(* run another task *)
|
||||
F.spawn_ignore run_task
|
||||
) else (
|
||||
(* if we're the last to exit, resolve the promise *)
|
||||
let n_already_done = Atomic.fetch_and_add all_done 1 in
|
||||
if n_already_done = n_conn - 1 then Printf.printf "all done\n%!"
|
||||
);
|
||||
Trace.exit_manual_span _task_sp
|
||||
in
|
||||
|
||||
let t_start = Mtime_clock.now () in
|
||||
|
||||
(* start the first [n_conn] tasks *)
|
||||
let fibers = List.init n_conn (fun _ -> F.spawn run_task) in
|
||||
let fibers = List.init (n * n_conn) (fun _ -> F.spawn run_task) in
|
||||
List.iter F.await fibers;
|
||||
Atomic.set all_done true;
|
||||
|
||||
let t_stop = Mtime_clock.now () in
|
||||
let elapsed_s =
|
||||
(Mtime.span t_start t_stop |> Mtime.Span.to_uint64_ns |> Int64.to_float)
|
||||
*. 1e-9
|
||||
in
|
||||
|
||||
(* exit when [fut_exit] is resolved *)
|
||||
Printf.printf "done with main\n%!"
|
||||
Printf.printf
|
||||
"%sdone with main (time=%.4fs, n queries=%d, expect=%d, %.3f req/s)\n%!"
|
||||
reset_line elapsed_s (Atomic.get n_queries)
|
||||
(n * n_conn * n_loops_per_task)
|
||||
(float (Atomic.get n_queries) /. elapsed_s)
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
Trace.set_current_level Info;
|
||||
Trace.set_thread_name "main";
|
||||
|
||||
let port = ref 1234 in
|
||||
|
|
@ -96,5 +135,5 @@ let () =
|
|||
let@ () =
|
||||
Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ())
|
||||
in
|
||||
F.main @@ fun _runner ->
|
||||
main ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn ()
|
||||
F.main @@ fun runner ->
|
||||
main ~runner ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn ()
|
||||
|
|
|
|||
|
|
@ -8,13 +8,16 @@ let ( let@ ) = ( @@ )
|
|||
let pf = Printf.printf
|
||||
let spf = Printf.sprintf
|
||||
let verbose = ref false
|
||||
let n_reply_response = Atomic.make 0
|
||||
|
||||
let str_of_sockaddr = function
|
||||
| Unix.ADDR_UNIX s -> s
|
||||
| Unix.ADDR_INET (addr, port) ->
|
||||
spf "%s:%d" (Unix.string_of_inet_addr addr) port
|
||||
|
||||
let main ~port ~unix_sock ~runner () =
|
||||
let main ~port ~unix_sock ~max_conns ~runner () =
|
||||
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
|
||||
|
||||
pf "serve on %s\n%!"
|
||||
(if unix_sock = "" then
|
||||
spf "localhost:%d" port
|
||||
|
|
@ -24,12 +27,15 @@ let main ~port ~unix_sock ~runner () =
|
|||
let addr =
|
||||
if unix_sock = "" then
|
||||
Unix.ADDR_INET (Unix.inet_addr_loopback, port)
|
||||
else
|
||||
else (
|
||||
(* remove leftover unix socket file, if any *)
|
||||
(try Sys.remove unix_sock with _ -> ());
|
||||
Unix.ADDR_UNIX unix_sock
|
||||
)
|
||||
in
|
||||
|
||||
let server =
|
||||
IO.Net_server.establish addr
|
||||
IO.Net_server.establish ?max_connections:max_conns addr
|
||||
~spawn:(fun f -> Moonpool.spawn ~on:runner f)
|
||||
~client_handler:(fun client_addr ic oc ->
|
||||
let _sp =
|
||||
|
|
@ -42,31 +48,63 @@ let main ~port ~unix_sock ~runner () =
|
|||
let buf = Bytes.create 256 in
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
let n = Iostream.In.input ic buf 0 (Bytes.length buf) in
|
||||
if n = 0 then
|
||||
continue := false
|
||||
else (
|
||||
match Iostream.In.input ic buf 0 (Bytes.length buf) with
|
||||
| exception exn ->
|
||||
continue := false;
|
||||
Printf.eprintf "error in client handler: %s\n%!"
|
||||
(Printexc.to_string exn)
|
||||
| 0 -> continue := false
|
||||
| n ->
|
||||
Atomic.incr n_reply_response;
|
||||
Iostream.Out.output oc buf 0 n;
|
||||
Iostream.Out_buf.flush oc
|
||||
)
|
||||
Iostream.Out_buf.flush oc;
|
||||
Picos.Fiber.yield ()
|
||||
done;
|
||||
|
||||
Trace.exit_manual_span _sp;
|
||||
if !verbose then
|
||||
pf "done with client on %s\n%!" (str_of_sockaddr client_addr))
|
||||
in
|
||||
|
||||
Printf.printf "max number of connections: %d\n%!"
|
||||
(IO.Net_server.max_connections server);
|
||||
|
||||
if Trace.enabled () then
|
||||
ignore
|
||||
(Thread.create
|
||||
(fun () ->
|
||||
while IO.Net_server.running server do
|
||||
Trace.counter_int ~level:Info "n-conns"
|
||||
(IO.Net_server.n_active_connections server);
|
||||
let gc = Gc.quick_stat () in
|
||||
Trace.counter_int ~level:Info "gc.major" gc.major_collections;
|
||||
Trace.counter_int ~level:Info "gc.minor" gc.minor_collections;
|
||||
Trace.counter_int ~level:Info "n-reply-response"
|
||||
(Atomic.get n_reply_response);
|
||||
Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64);
|
||||
|
||||
Thread.delay 0.2
|
||||
done)
|
||||
()
|
||||
: Thread.t);
|
||||
|
||||
IO.Net_server.join server;
|
||||
IO.Net_server.shutdown server;
|
||||
print_endline "exit"
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
Trace.set_current_level Info;
|
||||
let port = ref 1234 in
|
||||
let unix_sock = ref "" in
|
||||
let max_conns = ref None in
|
||||
let opts =
|
||||
[
|
||||
"-p", Arg.Set_int port, " port";
|
||||
"--unix", Arg.Set_string unix_sock, " unix socket";
|
||||
( "--max-conns",
|
||||
Arg.Int (fun i -> max_conns := Some i),
|
||||
" max number of connections" );
|
||||
"-v", Arg.Set verbose, " verbose";
|
||||
]
|
||||
|> Arg.align
|
||||
|
|
@ -76,4 +114,5 @@ let () =
|
|||
let@ () =
|
||||
Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ())
|
||||
in
|
||||
F.main @@ fun runner -> main ~port:!port ~unix_sock:!unix_sock ~runner ()
|
||||
F.main @@ fun runner ->
|
||||
main ~port:!port ~unix_sock:!unix_sock ~max_conns:!max_conns ~runner ()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue