add tracing; improve echo

This commit is contained in:
Simon Cruanes 2025-02-14 21:53:15 -05:00
parent ce7ed336c2
commit 15b39c3541
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
8 changed files with 76 additions and 34 deletions

View file

@ -1,4 +1,4 @@
(executable
(name echo)
(libraries nanoev nanoev.unix moonpool nanoev_tiny_httpd))
(libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef
nanoev_tiny_httpd))

View file

@ -80,20 +80,26 @@ let setup_logging () =
Logs.set_level ~all:true (Some Logs.Debug)
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port_ = ref 8080 in
let max_conn = ref 1024 in
let j = ref 8 in
Arg.parse
(Arg.align
[
"--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port";
"-j", Arg.Set_int j, " number of threads";
"--debug", Arg.Unit setup_logging, " enable debug";
"--max-conns", Arg.Set_int max_conn, " maximum concurrent connections";
])
(fun _ -> raise (Arg.Bad ""))
"echo [option]*";
let@ pool = Moonpool.Ws_pool.with_ () in
let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
let@ _runner = Moonpool_fib.main in
let ev = Nanoev_unix.create () in
Nanoev_picos.setup_bg_thread ev;

View file

@ -1,4 +1,3 @@
(library
(name nanoev_picos)
(public_name nanoev.picos)

View file

@ -1,4 +1,8 @@
open struct
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
end
module Global_ = struct
type st =
@ -23,6 +27,7 @@ module Global_ = struct
x
let bg_thread_ ~active ~evloop () : unit =
Trace_.set_thread_name "nanoev.picos.bg-thread";
while Atomic.get active do
Nanoev.step evloop
done
@ -31,12 +36,15 @@ module Global_ = struct
let setup_bg_thread (ev : Nanoev.t) : unit =
let@ () = with_lock lock in
(* shutdown existing thread, if any *)
(match Atomic.get st with
| Some st ->
Atomic.set st.active false;
Nanoev.wakeup_from_outside st.nanoev;
Thread.join st.th
| None -> ());
(* start new bg thread *)
let active = Atomic.make true in
Atomic.set st
@@ Some
@ -61,11 +69,12 @@ let[@inline] unwrap_ = function
let retry_read_ fd f =
let ev = get_loop_exn_ () in
let rec loop () =
let[@unroll 1] rec loop () =
match f () with
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "read must wait";
let trigger = Picos.Trigger.create () in
Nanoev.on_readable ev fd trigger () (fun trigger () ->
Picos.Trigger.signal trigger);
@ -81,6 +90,7 @@ let retry_write_ fd f =
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "write must wait";
let trigger = Picos.Trigger.create () in
Nanoev.on_writable ev fd trigger () (fun trigger () ->
Picos.Trigger.signal trigger);
@ -89,11 +99,20 @@ let retry_write_ fd f =
in
loop ()
let read fd buf i len : int = retry_read_ fd (fun () -> Unix.read fd buf i len)
let accept fd = retry_read_ fd (fun () -> Unix.accept fd)
let read fd buf i len : int =
retry_read_ fd (fun () ->
Trace_.message "read";
Unix.read fd buf i len)
let accept fd =
retry_read_ fd (fun () ->
Trace_.message "accept";
Unix.accept fd)
let write fd buf i len : int =
retry_write_ fd (fun () -> Unix.write fd buf i len)
retry_write_ fd (fun () ->
Trace_.message "write";
Unix.write fd buf i len)
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)

View file

@ -92,6 +92,7 @@ module In = struct
class type t = In_buf.t
(* FIXME: closed should be atomic *)
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) : t =
let eof = ref false in
@ -197,7 +198,7 @@ module Unix_tcp_server_ = struct
TH.IO.TCP_server.stop = (fun () -> self.running <- false);
running = (fun () -> self.running);
active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections - 1);
(fun () -> Sem_.num_acquired self.sem_max_connections);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
@ -207,7 +208,7 @@ module Unix_tcp_server_ = struct
after_init tcp_server;
(* how to handle a single client *)
let handle_client_unix_ (client_sock : Unix.file_descr)
let handle_client_ (client_sock : Unix.file_descr)
(client_addr : Unix.sockaddr) : unit =
Log.debug (fun k ->
k "t[%d]: serving new client on %s"
@ -237,7 +238,7 @@ module Unix_tcp_server_ = struct
Unix.set_nonblock sock;
while self.running do
match Unix.accept sock with
match EV.accept sock with
| client_sock, client_addr ->
(* limit concurrency *)
Sem_.acquire self.sem_max_connections;
@ -247,7 +248,7 @@ module Unix_tcp_server_ = struct
ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]);
self.new_thread (fun () ->
try
handle_client_unix_ client_sock client_addr;
handle_client_ client_sock client_addr;
Log.debug (fun k ->
k "t[%d]: done with client on %s, exiting"
(Thread.id @@ Thread.self ())
@ -269,14 +270,9 @@ module Unix_tcp_server_ = struct
(Printexc.raw_backtrace_to_string bt)));
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ])
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _)
->
(* wait for the socket to be ready, and re-enter the loop *)
ignore (Unix.select [ sock ] [] [ sock ] 1.0 : _ * _ * _)
| exception e ->
Log.error (fun k ->
k "Unix.accept raised an exception: %s" (Printexc.to_string e));
Thread.delay 0.01
k "Unix.accept raised an exception: %s" (Printexc.to_string e))
done;
(* Wait for all threads to be done: this only works if all threads are done. *)

View file

@ -2,4 +2,6 @@
(name nanoev_unix)
(public_name nanoev.unix)
(synopsis "Unix/select backend")
(libraries nanoev unix))
(libraries
nanoev
unix))

View file

@ -1,5 +1,9 @@
open struct
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
let now_ : unit -> float = Unix.gettimeofday
end
(** Callback list *)
type cbs =
@ -78,10 +82,12 @@ let clear (self : st) =
()
let wakeup_from_outside (self : st) : unit =
if not (Atomic.exchange self.wakeup_triggered true) then (
if not (Atomic.exchange self.wakeup_triggered true) then
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside"
in
let b = Bytes.make 1 '!' in
ignore (Unix.write self.wakeup_wr b 0 1 : int)
)
let get_fd_ (self : st) fd : per_fd =
match Hashtbl.find self.fds fd with
@ -92,6 +98,7 @@ let get_fd_ (self : st) fd : per_fd =
per_fd
let on_readable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.r <- Sub (x, y, f, per_fd.r);
@ -99,6 +106,7 @@ let on_readable self fd x y f : unit =
if Atomic.get self.in_select then wakeup_from_outside self
let on_writable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.w <- Sub (x, y, f, per_fd.w);
@ -106,6 +114,7 @@ let on_writable self fd x y f : unit =
if Atomic.get self.in_select then wakeup_from_outside self
let run_after_s self time x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in
let@ self = with_lock_ self in
let deadline = now_ () +. time in
Heap.insert self.timer (Timer { deadline; x; y; f });
@ -113,6 +122,7 @@ let run_after_s self time x y f : unit =
let recompute_if_needed (self : st) =
if not self.sub_up_to_date then (
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in
self.sub_up_to_date <- true;
self.sub_r <- [];
self.sub_w <- [];
@ -137,6 +147,7 @@ let rec perform_cbs = function
perform_cbs tail
let step (self : st) : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
(* gather the subscriptions and timeout *)
let timeout, sub_r, sub_w =
let@ self = with_lock_ self in
@ -152,6 +163,14 @@ let step (self : st) : unit =
(* enter [select] *)
Atomic.set self.in_select true;
let r_reads, r_writes, _ =
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () ->
[
"timeout", `Float timeout;
"reads", `Int (List.length sub_r);
"writes", `Int (List.length sub_w);
])
in
Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout
in
Atomic.set self.in_select false;
@ -174,8 +193,10 @@ let step (self : st) : unit =
List.iter
(fun fd ->
if fd != self.wakeup_rd then (
let per_fd = Hashtbl.find self.fds fd in
ready_r := per_fd :: !ready_r)
ready_r := per_fd :: !ready_r
))
r_reads;
List.iter
(fun fd ->

View file

@ -1,4 +1,3 @@
(tests
(names t1)
(libraries nanoev nanoev.unix threads))