From 15b39c3541b20c594c9c59f62303f6ee14036a3f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 14 Feb 2025 21:53:15 -0500 Subject: [PATCH] add tracing; improve echo --- examples/echo/dune | 6 +++--- examples/echo/echo.ml | 8 ++++++- src/picos/dune | 9 ++++---- src/picos/nanoev_picos.ml | 29 ++++++++++++++++++++----- src/tiny_httpd/nanoev_tiny_httpd.ml | 16 ++++++-------- src/unix/dune | 4 +++- src/unix/nanoev_unix.ml | 33 +++++++++++++++++++++++------ tests/unix/dune | 5 ++--- 8 files changed, 76 insertions(+), 34 deletions(-) diff --git a/examples/echo/dune b/examples/echo/dune index add5249..7fa6bea 100644 --- a/examples/echo/dune +++ b/examples/echo/dune @@ -1,4 +1,4 @@ - (executable - (name echo) - (libraries nanoev nanoev.unix moonpool nanoev_tiny_httpd)) + (name echo) + (libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef + nanoev_tiny_httpd)) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml index 7ffae54..5cd6bd0 100644 --- a/examples/echo/echo.ml +++ b/examples/echo/echo.ml @@ -80,20 +80,26 @@ let setup_logging () = Logs.set_level ~all:true (Some Logs.Debug) let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + let port_ = ref 8080 in let max_conn = ref 1024 in + let j = ref 8 in Arg.parse (Arg.align [ "--port", Arg.Set_int port_, " set port"; "-p", Arg.Set_int port_, " set port"; + "-j", Arg.Set_int j, " number of threads"; "--debug", Arg.Unit setup_logging, " enable debug"; "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; ]) (fun _ -> raise (Arg.Bad "")) "echo [option]*"; - let@ pool = Moonpool.Ws_pool.with_ () in + let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + let@ _runner = Moonpool_fib.main in let ev = Nanoev_unix.create () in Nanoev_picos.setup_bg_thread ev; diff --git a/src/picos/dune b/src/picos/dune index ca7ccf9..db9792d 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -1,6 +1,5 @@ - (library - (name nanoev_picos) - (public_name nanoev.picos) - (optional) ; picos - (libraries threads picos nanoev)) + (name nanoev_picos) + (public_name nanoev.picos) + (optional) ; picos + (libraries threads picos nanoev)) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 53c594a..45f5cc6 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -1,4 +1,8 @@ -let ( let@ ) = ( @@ ) +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) +end module Global_ = struct type st = @@ -23,6 +27,7 @@ module Global_ = struct x let bg_thread_ ~active ~evloop () : unit = + Trace_.set_thread_name "nanoev.picos.bg-thread"; while Atomic.get active do Nanoev.step evloop done @@ -31,12 +36,15 @@ module Global_ = struct let setup_bg_thread (ev : Nanoev.t) : unit = let@ () = with_lock lock in + (* shutdown existing thread, if any *) (match Atomic.get st with | Some st -> Atomic.set st.active false; Nanoev.wakeup_from_outside st.nanoev; Thread.join st.th | None -> ()); + + (* start new bg thread *) let active = Atomic.make true in Atomic.set st @@ Some @@ -61,11 +69,12 @@ let[@inline] unwrap_ = function let retry_read_ fd f = let ev = get_loop_exn_ () in - let rec loop () = + let[@unroll 1] rec loop () = match f () with | res -> res | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Trace_.message "read must wait"; let trigger = Picos.Trigger.create () in Nanoev.on_readable ev fd trigger () (fun trigger () -> Picos.Trigger.signal trigger); @@ -81,6 +90,7 @@ let retry_write_ fd f = | res -> res | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Trace_.message "write must wait"; let trigger = Picos.Trigger.create () in Nanoev.on_writable ev fd trigger () (fun trigger () -> Picos.Trigger.signal trigger); @@ -89,11 +99,20 @@ let retry_write_ fd f = in loop () -let read fd buf i len : int = retry_read_ fd (fun () -> Unix.read fd buf i len) -let accept fd = retry_read_ fd (fun () -> Unix.accept fd) +let read fd buf i len : int = + retry_read_ fd (fun () -> + Trace_.message "read"; + Unix.read fd buf i len) + +let accept fd = + retry_read_ fd (fun () -> + Trace_.message "accept"; + Unix.accept fd) let write fd buf i len : int = - retry_write_ fd (fun () -> Unix.write fd buf i len) + retry_write_ fd (fun () -> + Trace_.message "write"; + Unix.write fd buf i len) let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 50b7f46..ffcb7d4 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -92,6 +92,7 @@ module In = struct class type t = In_buf.t + (* FIXME: closed should be atomic *) let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Unix.file_descr) : t = let eof = ref false in @@ -197,7 +198,7 @@ module Unix_tcp_server_ = struct TH.IO.TCP_server.stop = (fun () -> self.running <- false); running = (fun () -> self.running); active_connections = - (fun () -> Sem_.num_acquired self.sem_max_connections - 1); + (fun () -> Sem_.num_acquired self.sem_max_connections); endpoint = (fun () -> let addr, port = get_addr_ sock in @@ -207,7 +208,7 @@ module Unix_tcp_server_ = struct after_init tcp_server; (* how to handle a single client *) - let handle_client_unix_ (client_sock : Unix.file_descr) + let handle_client_ (client_sock : Unix.file_descr) (client_addr : Unix.sockaddr) : unit = Log.debug (fun k -> k "t[%d]: serving new client on %s" @@ -237,7 +238,7 @@ module Unix_tcp_server_ = struct Unix.set_nonblock sock; while self.running do - match Unix.accept sock with + match EV.accept sock with | client_sock, client_addr -> (* limit concurrency *) Sem_.acquire self.sem_max_connections; @@ -247,7 +248,7 @@ module Unix_tcp_server_ = struct ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]); self.new_thread (fun () -> try - handle_client_unix_ client_sock client_addr; + handle_client_ client_sock client_addr; Log.debug (fun k -> k "t[%d]: done with client on %s, exiting" (Thread.id @@ Thread.self ()) @@ -269,14 +270,9 @@ module Unix_tcp_server_ = struct (Printexc.raw_backtrace_to_string bt))); if not Sys.win32 then ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ]) - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) - -> - (* wait for the socket to be ready, and re-enter the loop *) - ignore (Unix.select [ sock ] [] [ sock ] 1.0 : _ * _ * _) | exception e -> Log.error (fun k -> - k "Unix.accept raised an exception: %s" (Printexc.to_string e)); - Thread.delay 0.01 + k "Unix.accept raised an exception: %s" (Printexc.to_string e)) done; (* Wait for all threads to be done: this only works if all threads are done. *) diff --git a/src/unix/dune b/src/unix/dune index 4f14854..c40d47f 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -2,4 +2,6 @@ (name nanoev_unix) (public_name nanoev.unix) (synopsis "Unix/select backend") - (libraries nanoev unix)) + (libraries + nanoev + unix)) diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 538f4e3..769c590 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -1,5 +1,9 @@ -let ( let@ ) = ( @@ ) -let now_ : unit -> float = Unix.gettimeofday +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) + let now_ : unit -> float = Unix.gettimeofday +end (** Callback list *) type cbs = @@ -78,10 +82,12 @@ let clear (self : st) = () let wakeup_from_outside (self : st) : unit = - if not (Atomic.exchange self.wakeup_triggered true) then ( + if not (Atomic.exchange self.wakeup_triggered true) then + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in let b = Bytes.make 1 '!' in ignore (Unix.write self.wakeup_wr b 0 1 : int) - ) let get_fd_ (self : st) fd : per_fd = match Hashtbl.find self.fds fd with @@ -92,6 +98,7 @@ let get_fd_ (self : st) fd : per_fd = per_fd let on_readable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in let@ self = with_lock_ self in let per_fd = get_fd_ self fd in per_fd.r <- Sub (x, y, f, per_fd.r); @@ -99,6 +106,7 @@ let on_readable self fd x y f : unit = if Atomic.get self.in_select then wakeup_from_outside self let on_writable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in let@ self = with_lock_ self in let per_fd = get_fd_ self fd in per_fd.w <- Sub (x, y, f, per_fd.w); @@ -106,6 +114,7 @@ let on_writable self fd x y f : unit = if Atomic.get self.in_select then wakeup_from_outside self let run_after_s self time x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in let@ self = with_lock_ self in let deadline = now_ () +. time in Heap.insert self.timer (Timer { deadline; x; y; f }); @@ -113,6 +122,7 @@ let run_after_s self time x y f : unit = let recompute_if_needed (self : st) = if not self.sub_up_to_date then ( + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in self.sub_up_to_date <- true; self.sub_r <- []; self.sub_w <- []; @@ -137,6 +147,7 @@ let rec perform_cbs = function perform_cbs tail let step (self : st) : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in (* gather the subscriptions and timeout *) let timeout, sub_r, sub_w = let@ self = with_lock_ self in @@ -152,6 +163,14 @@ let step (self : st) : unit = (* enter [select] *) Atomic.set self.in_select true; let r_reads, r_writes, _ = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () -> + [ + "timeout", `Float timeout; + "reads", `Int (List.length sub_r); + "writes", `Int (List.length sub_w); + ]) + in Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout in Atomic.set self.in_select false; @@ -174,8 +193,10 @@ let step (self : st) : unit = List.iter (fun fd -> - let per_fd = Hashtbl.find self.fds fd in - ready_r := per_fd :: !ready_r) + if fd != self.wakeup_rd then ( + let per_fd = Hashtbl.find self.fds fd in + ready_r := per_fd :: !ready_r + )) r_reads; List.iter (fun fd -> diff --git a/tests/unix/dune b/tests/unix/dune index 8679c90..3ee6379 100644 --- a/tests/unix/dune +++ b/tests/unix/dune @@ -1,4 +1,3 @@ - (tests - (names t1) - (libraries nanoev nanoev.unix threads)) + (names t1) + (libraries nanoev nanoev.unix threads))