From 00229d652fb7a409353a2f69ea7b8f3a1de016c6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 16:12:44 -0400 Subject: [PATCH 01/28] ocamlformat --- .ocamlformat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ocamlformat b/.ocamlformat index 7818345..f33f722 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,4 +1,4 @@ -version = 0.26.2 +version = 0.27.0 profile=conventional margin=80 if-then-else=k-r From 295b3b5c24be521c09d8f63aa6c01880b290b64e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 16:12:49 -0400 Subject: [PATCH 02/28] wip: nanoev-posix, using mtime and iomux --- dune-project | 7 + src/posix/dune | 6 + src/posix/heap.ml | 61 ++++++++ src/posix/heap.mli | 13 ++ src/posix/nanoev_posix.ml | 291 +++++++++++++++++++++++++++++++++++++ src/posix/nanoev_posix.mli | 8 + src/unix/dune | 1 + 7 files changed, 387 insertions(+) create mode 100644 src/posix/dune create mode 100644 src/posix/heap.ml create mode 100644 src/posix/heap.mli create mode 100644 src/posix/nanoev_posix.ml create mode 100644 src/posix/nanoev_posix.mli diff --git a/dune-project b/dune-project index e0cd8bb..b5f2f69 100644 --- a/dune-project +++ b/dune-project @@ -26,6 +26,13 @@ (tags (unix select async))) +(package + (name nanoev-posix) + (synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev") + (depends ocaml dune base-unix iomux (mtime (>= 2.0))) + (tags + (unix select async iomux nanoev))) + (package (name nanoev_tiny_httpd) (synopsis "Use nanoev as a basis for tiny_httpd") diff --git a/src/posix/dune b/src/posix/dune new file mode 100644 index 0000000..e7147f2 --- /dev/null +++ b/src/posix/dune @@ -0,0 +1,6 @@ +(library + (name nanoev_posix) + (public_name nanoev-posix) + (synopsis "posix backend (poll/ppoll+mtime)") + (private_modules heap) + (libraries nanoev unix iomux mtime mtime.clock.os)) diff --git a/src/posix/heap.ml b/src/posix/heap.ml new file mode 100644 index 0000000..a9a9c9e --- /dev/null +++ b/src/posix/heap.ml @@ -0,0 +1,61 @@ +type 'a tree = + | E + | N of int * 'a * 'a tree * 'a tree + +type 'a t = { + leq: 'a -> 'a -> bool; + mutable t: 'a tree; +} + +let create ~leq () : _ t = { leq; t = E } + +let[@inline] is_empty (self : _ t) = + match self.t with + | E -> true + | N _ -> false + +exception Empty + +open struct + (** Rank of the tree *) + let[@inline] rank_ = function + | E -> 0 + | N (r, _, _, _) -> r + + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. + We ensure that the right child's rank is ≤ to the rank of the + left child (leftist property). The rank of the resulting node + is the length of the rightmost path. *) + let[@inline] mk_node_ x a b = + if rank_ a >= rank_ b then + N (rank_ b + 1, x, a, b) + else + N (rank_ a + 1, x, b, a) + + let rec merge ~leq t1 t2 = + match t1, t2 with + | t, E -> t + | E, t -> t + | N (_, x, a1, b1), N (_, y, a2, b2) -> + if leq x y then + mk_node_ x a1 (merge ~leq b1 t2) + else + mk_node_ y a2 (merge ~leq t1 b2) +end + +let clear self = self.t <- E + +let[@inline] insert (self : _ t) x : unit = + self.t <- merge ~leq:self.leq self.t (N (1, x, E, E)) + +let[@inline] peek_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, _, _) -> x + +let[@inline] pop_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, l, r) -> + self.t <- merge ~leq:self.leq l r; + x diff --git a/src/posix/heap.mli b/src/posix/heap.mli new file mode 100644 index 0000000..3efd4c4 --- /dev/null +++ b/src/posix/heap.mli @@ -0,0 +1,13 @@ +type 'a t + +val create : leq:('a -> 'a -> bool) -> unit -> 'a t + +val is_empty : _ t -> bool +(** [is_empty h] returns [true] if the heap [h] is empty. *) + +exception Empty + +val clear : _ t -> unit +val insert : 'a t -> 'a -> unit +val peek_min_exn : 'a t -> 'a +val pop_min_exn : 'a t -> 'a diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml new file mode 100644 index 0000000..ad2e0b0 --- /dev/null +++ b/src/posix/nanoev_posix.ml @@ -0,0 +1,291 @@ +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) + let now_ns : unit -> int64 = Mtime_clock.now_ns +end + +module Fd_tbl = Hashtbl.Make (struct + open Iomux.Util + + type t = Unix.file_descr + + let equal a b = Int.equal (fd_of_unix a) (fd_of_unix b) + let hash a = Hashtbl.hash (fd_of_unix a) +end) + +module P = Iomux.Poll +module Flags = P.Flags +open Iomux.Util + +(** Callback list *) +type cbs = + | Nil + | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs + +let[@inline] cb_is_empty = function + | Nil -> true + | Sub _ -> false + +type timer_ev = + | Timer : { + deadline: float; + x: 'a; + y: 'b; + f: 'a -> 'b -> unit; + } + -> timer_ev + +type per_fd = { + fd: Unix.file_descr; + idx: int; (** Index in the buffer *) + mutable r: cbs; + mutable w: cbs; +} + +type st = { + timer: timer_ev Heap.t; + fds: per_fd Fd_tbl.t; + poll: P.t; + mutable cur_idx: int; (** index in the [poll] buffer *) + mutable idx_freelist: int list; + (** previously used indices that are recycled *) + wakeup_rd: Unix.file_descr; + wakeup_wr: Unix.file_descr; + wakeup_triggered: bool Atomic.t; + (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) + in_poll: bool Atomic.t; + (** Are we currently inside a call to [poll]? Useful for other threads to + know whether to wake us up via the pipe *) + lock: Mutex.t; +} + +let rec perform_cbs ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs ~closed tail + +let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail + +let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + Unix.set_nonblock wakeup_rd; + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + cur_idx = 0; + idx_freelist = []; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make false; + lock = Mutex.create (); + } + +let[@inline] with_lock_ (self : st) f = + Mutex.lock self.lock; + match f self with + | exception e -> + Mutex.unlock self.lock; + raise e + | res -> + Mutex.unlock self.lock; + res + +let clear (self : st) = + let@ self = with_lock_ self in + Heap.clear self.timer; + Fd_tbl.clear self.fds; + for i = 0 to P.maxfds self.poll - 1 do + P.set_index self.poll i P.invalid_fd Flags.empty + done; + self.cur_idx <- 0; + self.idx_freelist <- []; + () + +let wakeup_from_outside (self : st) : unit = + if not (Atomic.exchange self.wakeup_triggered true) then + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in + let b = Bytes.make 1 '!' in + ignore (Unix.write self.wakeup_wr b 0 1 : int) + +let get_fd_ (self : st) fd : per_fd = + match Fd_tbl.find self.fds fd with + | per_fd -> per_fd + | exception Not_found -> + let idx = + match self.idx_freelist with + | i :: tl -> + self.idx_freelist <- tl; + i + | [] -> + if self.cur_idx = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.cur_idx in + self.cur_idx <- self.cur_idx + 1; + n + in + let per_fd = { idx; fd; r = Nil; w = Nil } in + Fd_tbl.add self.fds fd per_fd; + per_fd + +let close self fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + let@ self = with_lock_ self in + match Fd_tbl.find self.fds fd with + | per_fd -> + Fd_tbl.remove self.fds fd; + self.idx_freelist <- per_fd.idx :: self.idx_freelist; + if Atomic.get self.in_poll then wakeup_from_outside self; + per_fd.r, per_fd.w + | exception Not_found -> + invalid_arg "File descriptor is not known to Nanoev" + in + + (* call callbacks outside of the lock *) + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + +let on_readable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.r <- Sub (x, y, f, per_fd.r); + (* FIXME: P.set_index *) + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self + +let on_writable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.w <- Sub (x, y, f, per_fd.w); + (* FIXME: P.set_index *) + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self + +let run_after_s self time x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in + let@ self = with_lock_ self in + let deadline = now_ () +. time in + Heap.insert self.timer (Timer { deadline; x; y; f }); + if Atomic.get self.in_select then wakeup_from_outside self + +let recompute_if_needed (self : st) = + if not self.sub_up_to_date then ( + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in + self.sub_up_to_date <- true; + self.sub_r <- []; + self.sub_w <- []; + Hashtbl.iter + (fun fd per_fd -> + if cb_is_empty per_fd.r && cb_is_empty per_fd.w then + Hashtbl.remove self.fds fd; + if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r; + if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w) + self.fds + ) + +let next_deadline_ (self : st) : float option = + match Heap.peek_min_exn self.timer with + | exception Heap.Empty -> None + | Timer t -> Some t.deadline + +let step (self : st) : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in + (* gather the subscriptions and timeout *) + let timeout, sub_r, sub_w = + let@ self = with_lock_ self in + recompute_if_needed self; + let timeout = + match next_deadline_ self with + | None -> 30. + | Some d -> max 0. (d -. now_ ()) + in + timeout, self.sub_r, self.sub_w + in + + (* enter [select] *) + Atomic.set self.in_select true; + let r_reads, r_writes, _ = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () -> + [ + "timeout", `Float timeout; + "reads", `Int (List.length sub_r); + "writes", `Int (List.length sub_w); + ]) + in + Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout + in + Atomic.set self.in_select false; + + (* drain pipe *) + if Atomic.exchange self.wakeup_triggered false then ( + let b1 = Bytes.create 1 in + while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do + () + done + ); + + (* gather the [per_fd] that are ready *) + let ready_r = ref [] in + let ready_w = ref [] in + + (* gather the [per_fd] that have updates *) + (let@ self = with_lock_ self in + if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false; + + List.iter + (fun fd -> + if fd != self.wakeup_rd then ( + let per_fd = Hashtbl.find self.fds fd in + ready_r := per_fd :: !ready_r + )) + r_reads; + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.fds fd in + ready_w := per_fd :: !ready_w) + r_writes); + + (* call callbacks *) + List.iter + (fun fd -> + perform_cbs ~closed:false fd.r; + fd.r <- Nil) + !ready_r; + List.iter + (fun fd -> + perform_cbs ~closed:false fd.w; + fd.w <- Nil) + !ready_w; + + () + +let ops : st Nanoev.Impl.ops = + { + step; + close; + on_readable; + on_writable; + run_after_s; + wakeup_from_outside; + clear; + } + +include Nanoev + +let create () : t = Impl.build ops (create_st ()) diff --git a/src/posix/nanoev_posix.mli b/src/posix/nanoev_posix.mli new file mode 100644 index 0000000..63ee287 --- /dev/null +++ b/src/posix/nanoev_posix.mli @@ -0,0 +1,8 @@ +(** Nano event loop using Poll/Ppoll *) + +include module type of struct + include Nanoev +end + +val create : unit -> t +val create_with : Iomux.Poll.t -> t diff --git a/src/unix/dune b/src/unix/dune index 4f14854..3c0fe50 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -2,4 +2,5 @@ (name nanoev_unix) (public_name nanoev.unix) (synopsis "Unix/select backend") + (private_modules heap) (libraries nanoev unix)) From 8de69367875c3bf5caab86f410ec84a32fd6bf69 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 21:46:03 -0400 Subject: [PATCH 03/28] format --- src/picos/nanoev_picos.mli | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index ef795bb..7dfbe1b 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -4,29 +4,33 @@ val setup_bg_thread : Nanoev.t -> unit (** Install this event loop in a background thread *) val has_bg_thread : unit -> bool -(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev loop *) +(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev + loop *) (** {2 Non blocking IO primitives} *) val read : Unix.file_descr -> bytes -> int -> int -> int (** Read from the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) val write : Unix.file_descr -> bytes -> int -> int -> int (** Write into the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) val close : Unix.file_descr -> unit (** Close the file descriptor - @raise Unix.Unix_error when it fails *) + @raise Unix.Unix_error when it fails *) val connect : Unix.file_descr -> Unix.sockaddr -> unit +(** Connect this FD to the remote address. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr (** Accept a connection on this fd. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) val sleep : float -> unit From 7a0c3e127935e63d565a0081f16832c570d81815 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 22:16:06 -0400 Subject: [PATCH 04/28] add `max_fds` to nanoev core interface --- src/core/nanoev.ml | 2 ++ src/core/nanoev.mli | 4 ++++ src/unix/nanoev_unix.ml | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/src/core/nanoev.ml b/src/core/nanoev.ml index c15935e..75d34b7 100644 --- a/src/core/nanoev.ml +++ b/src/core/nanoev.ml @@ -7,6 +7,7 @@ module Impl = struct clear: 'st -> unit; wakeup_from_outside: 'st -> unit; close: 'st -> Unix.file_descr -> unit; + max_fds: 'st -> int; on_readable: 'a 'b. 'st -> @@ -39,6 +40,7 @@ type t = Impl.t let[@inline] clear (Ev (ops, st)) = ops.clear st let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st let[@inline] close (Ev (ops, st)) fd = ops.close st fd +let[@inline] max_fds (Ev (ops, st)) = ops.max_fds st let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = ops.on_readable st fd x y f diff --git a/src/core/nanoev.mli b/src/core/nanoev.mli index 4792cd6..213b9de 100644 --- a/src/core/nanoev.mli +++ b/src/core/nanoev.mli @@ -9,6 +9,7 @@ module Impl : sig clear: 'st -> unit; wakeup_from_outside: 'st -> unit; close: 'st -> Unix.file_descr -> unit; + max_fds: 'st -> int; on_readable: 'a 'b. 'st -> @@ -43,6 +44,9 @@ val step : t -> unit val close : t -> Unix.file_descr -> unit (** Close the file descriptor and clean it up *) +val max_fds : t -> int +(** Maximum number of file descriptors that can be observed at once. *) + val on_readable : t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 7d656c1..b74ea06 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -243,12 +243,16 @@ let step (self : st) : unit = () +(* limit for select is fixed and known *) +let max_fds _ = 1024 + let ops : st Nanoev.Impl.ops = { step; close; on_readable; on_writable; + max_fds; run_after_s; wakeup_from_outside; clear; From 3125f3274b757f4833423dfbbedfa9e63d077acf Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 22:16:19 -0400 Subject: [PATCH 05/28] wip: posix --- nanoev-posix.opam | 32 ++++++++++ src/posix/dune | 2 +- src/posix/nanoev_posix.ml | 120 +++++++++++++++++++++++++------------- 3 files changed, 113 insertions(+), 41 deletions(-) create mode 100644 nanoev-posix.opam diff --git a/nanoev-posix.opam b/nanoev-posix.opam new file mode 100644 index 0000000..d113361 --- /dev/null +++ b/nanoev-posix.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use mtime+iomux (posix compliant) as a backend for nanoev" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async" "iomux" "nanoev"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "iomux" + "mtime" {>= "2.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/src/posix/dune b/src/posix/dune index e7147f2..7387ac6 100644 --- a/src/posix/dune +++ b/src/posix/dune @@ -3,4 +3,4 @@ (public_name nanoev-posix) (synopsis "posix backend (poll/ppoll+mtime)") (private_modules heap) - (libraries nanoev unix iomux mtime mtime.clock.os)) + (libraries threads nanoev unix iomux mtime mtime.clock.os)) diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml index ad2e0b0..8601421 100644 --- a/src/posix/nanoev_posix.ml +++ b/src/posix/nanoev_posix.ml @@ -18,11 +18,30 @@ module P = Iomux.Poll module Flags = P.Flags open Iomux.Util +(* TODO: remove +module Sync_queue = struct + type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; + } + + let create () : _ t = { q = Queue.create (); mutex = Mutex.create () } + + let push (self : _ t) x : unit = + Mutex.lock self.mutex; + Queue.push x self.q; + Mutex.unlock self.mutex +end +*) + (** Callback list *) type cbs = | Nil | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs +(** Single callback *) +type cb = Cb : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) -> cb + let[@inline] cb_is_empty = function | Nil -> true | Sub _ -> false @@ -38,25 +57,40 @@ type timer_ev = type per_fd = { fd: Unix.file_descr; - idx: int; (** Index in the buffer *) + mutable idx: int; + (** Index in the buffer. Can change because we swap FDs sometimes to + remove items. *) mutable r: cbs; mutable w: cbs; } +let[@inline] per_fd_flags (self : per_fd) : Flags.t = + let fl = ref Flags.empty in + (if self.r <> Nil then fl := Flags.(!fl + pollin)); + (if self.w <> Nil then fl := Flags.(!fl + pollout)); + !fl + +type queued_task = + | Q_timer of timer_ev + | Q_on_readable of Unix.file_descr * cb + | Q_on_writable of Unix.file_descr * cb + | Q_clear + type st = { timer: timer_ev Heap.t; fds: per_fd Fd_tbl.t; poll: P.t; - mutable cur_idx: int; (** index in the [poll] buffer *) - mutable idx_freelist: int list; - (** previously used indices that are recycled *) + mutable len: int; (** length of the active prefix of the [poll] buffer *) wakeup_rd: Unix.file_descr; wakeup_wr: Unix.file_descr; wakeup_triggered: bool Atomic.t; (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) - in_poll: bool Atomic.t; - (** Are we currently inside a call to [poll]? Useful for other threads to - know whether to wake us up via the pipe *) + in_poll: Thread.t option Atomic.t; + (** Are we currently inside a call to [poll], and in which thread? Useful + for other threads to know whether to wake us up via the pipe *) + queued_tasks: queued_task Queue.t; + (** While in [poll()], changes get queued, so we don't invalidate the poll + buffer before the syscall returns *) lock: Mutex.t; } @@ -72,23 +106,7 @@ let rec perform_cbs_closed ~closed = function f ~closed x y; perform_cbs_closed ~closed tail -let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline - -let create_st () : st = - let wakeup_rd, wakeup_wr = Unix.pipe () in - Unix.set_nonblock wakeup_rd; - { - timer = Heap.create ~leq:leq_timer (); - fds = Fd_tbl.create 16; - poll = P.create (); - cur_idx = 0; - idx_freelist = []; - wakeup_rd; - wakeup_wr; - wakeup_triggered = Atomic.make false; - in_poll = Atomic.make false; - lock = Mutex.create (); - } +let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let[@inline] with_lock_ (self : st) f = Mutex.lock self.lock; @@ -100,6 +118,24 @@ let[@inline] with_lock_ (self : st) f = Mutex.unlock self.lock; res +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + Unix.set_nonblock wakeup_rd; + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + len = 0; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make None; + queued_tasks = Queue.create (); + lock = Mutex.create (); + } + +let max_fds (self : st) : int = P.maxfds self.poll + let clear (self : st) = let@ self = with_lock_ self in Heap.clear self.timer; @@ -107,8 +143,7 @@ let clear (self : st) = for i = 0 to P.maxfds self.poll - 1 do P.set_index self.poll i P.invalid_fd Flags.empty done; - self.cur_idx <- 0; - self.idx_freelist <- []; + self.len <- 0; () let wakeup_from_outside (self : st) : unit = @@ -124,36 +159,40 @@ let get_fd_ (self : st) fd : per_fd = | per_fd -> per_fd | exception Not_found -> let idx = - match self.idx_freelist with - | i :: tl -> - self.idx_freelist <- tl; - i - | [] -> - if self.cur_idx = P.maxfds self.poll then - invalid_arg "No available slot in poll"; - let n = self.cur_idx in - self.cur_idx <- self.cur_idx + 1; - n + if self.len = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.len in + self.len <- self.len + 1; + n in let per_fd = { idx; fd; r = Nil; w = Nil } in Fd_tbl.add self.fds fd per_fd; per_fd -let close self fd : unit = +let close_ (self : st) fd : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in let r, w = let@ self = with_lock_ self in match Fd_tbl.find self.fds fd with | per_fd -> Fd_tbl.remove self.fds fd; - self.idx_freelist <- per_fd.idx :: self.idx_freelist; - if Atomic.get self.in_poll then wakeup_from_outside self; + + (* not the last element, move the last element here *) + if per_fd.idx + 1 < self.len then ( + let last_fd = P.get_fd self.poll (self.len - 1) in + match Fd_tbl.find_opt self.fds last_fd with + | None -> () + | Some last_per_fd -> + last_per_fd.idx <- per_fd.idx; + P.set_index self.poll per_fd.idx last_fd (per_fd_flags last_per_fd) + ); + self.len <- self.len - 1; per_fd.r, per_fd.w | exception Not_found -> invalid_arg "File descriptor is not known to Nanoev" in - (* call callbacks outside of the lock *) + (* call callbacks outside of the lock's critical section *) perform_cbs_closed ~closed:true r; perform_cbs_closed ~closed:true w; () @@ -282,6 +321,7 @@ let ops : st Nanoev.Impl.ops = on_readable; on_writable; run_after_s; + max_fds; wakeup_from_outside; clear; } From 4c3c53ee169affe609c967cf8bef22cadd548fb7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:22:32 -0400 Subject: [PATCH 06/28] feat(picos): add shutdown and max_fds --- src/picos/nanoev_picos.ml | 19 +++++++++++++++++++ src/picos/nanoev_picos.mli | 22 +++++++++++++++++----- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 9920345..0835252 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -55,10 +55,24 @@ module Global_ = struct nanoev = ev; th = Thread.create (bg_thread_ ~active ~evloop:ev) (); } + + let shutdown_bg_thread () = + let@ () = with_lock lock in + match Atomic.exchange st None with + | None -> () + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th end let has_bg_thread = Global_.has_bg_thread let setup_bg_thread = Global_.setup_bg_thread +let shutdown_bg_thread = Global_.shutdown_bg_thread + +let with_setup_bg_thread ev f = + setup_bg_thread ev; + Fun.protect ~finally:shutdown_bg_thread f let[@inline] get_loop_exn_ () : Nanoev.t = match Atomic.get Global_.st with @@ -136,6 +150,11 @@ let write fd buf i len : int = let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) +let[@inline] max_fds () = + match Atomic.get Global_.st with + | None -> 1024 + | Some st -> Nanoev.max_fds st.nanoev + let sleep t = if t > 0. then ( let ev = get_loop_exn_ () in diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 7dfbe1b..a35bb4b 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,11 +1,18 @@ (** Basic interface with picos *) -val setup_bg_thread : Nanoev.t -> unit -(** Install this event loop in a background thread *) +module Background_thread : sig + val setup_bg_thread : Nanoev.t -> unit + (** Install this event loop in a background thread *) -val has_bg_thread : unit -> bool -(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev - loop *) + val shutdown_bg_thread : unit -> unit + (** Shutdown background thread, assuming {! has_bg_thread} returns [true] *) + + val with_setup_bg_thread : Nanoev.t -> (unit -> 'a) -> 'a + + val has_bg_thread : unit -> bool + (** [has_bg_thread ()] is [true] iff a background thread is running a nanoev + loop *) +end (** {2 Non blocking IO primitives} *) @@ -33,4 +40,9 @@ val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr @raise Nanoev.Closed if the FD is closed. @raise Unix.Unix_error for other errors *) +val max_fds : unit -> int +(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} +*) + val sleep : float -> unit +(** Suspend current fiber for [n] seconds *) From 26bdb34cba835d4f54c7b927358835aba8e56793 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:39:00 -0400 Subject: [PATCH 07/28] feat(tiny_httpd): adjust buffer pool size to number of connections --- src/tiny_httpd/nanoev_tiny_httpd.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 7369fde..71ebb65 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -281,7 +281,7 @@ end open struct let get_max_connection_ ?(max_connections = 2048) () : int = - let max_connections = max 4 max_connections in + let max_connections = min (max 4 @@ EV.max_fds ()) max_connections in max_connections let clear_slice (slice : Slice.t) = @@ -294,12 +294,13 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = let max_connections = get_max_connection_ ?max_connections () in + let max_pool_size = max_connections * 2 in let server = { Unix_tcp_server_.addr; new_thread; buf_pool = - Pool.create ~clear:Buf.clear_and_zero + Pool.create ~clear:Buf.clear_and_zero ~max_size:max_pool_size ~mk_item:(fun () -> Buf.create ?size:buf_size ()) (); slice_pool = From adc468b59df89d9e4e73ac14a5e64ef0bff0f862 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:39:23 -0400 Subject: [PATCH 08/28] feat(posix): first working version --- src/posix/nanoev_posix.ml | 477 +++++++++++++++++++++---------------- src/posix/nanoev_posix.mli | 4 +- 2 files changed, 270 insertions(+), 211 deletions(-) diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml index 8601421..ccc6806 100644 --- a/src/posix/nanoev_posix.ml +++ b/src/posix/nanoev_posix.ml @@ -3,6 +3,8 @@ open struct let ( let@ ) = ( @@ ) let now_ns : unit -> int64 = Mtime_clock.now_ns + let[@inline] ns_of_s (t : float) : int64 = Int64.of_float (t *. 1e9) + let[@inline] ns_to_s (t : int64) : float = Int64.to_float t /. 1e9 end module Fd_tbl = Hashtbl.Make (struct @@ -16,9 +18,7 @@ end) module P = Iomux.Poll module Flags = P.Flags -open Iomux.Util -(* TODO: remove module Sync_queue = struct type 'a t = { q: 'a Queue.t; @@ -31,68 +31,133 @@ module Sync_queue = struct Mutex.lock self.mutex; Queue.push x self.q; Mutex.unlock self.mutex + + let transfer (self : _ t) q : unit = + Mutex.lock self.mutex; + Queue.transfer self.q q; + Mutex.unlock self.mutex end -*) (** Callback list *) type cbs = | Nil | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs -(** Single callback *) -type cb = Cb : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) -> cb - -let[@inline] cb_is_empty = function - | Nil -> true - | Sub _ -> false - type timer_ev = | Timer : { - deadline: float; + deadline: int64; x: 'a; y: 'b; f: 'a -> 'b -> unit; } -> timer_ev -type per_fd = { +type fd_data = { fd: Unix.file_descr; mutable idx: int; - (** Index in the buffer. Can change because we swap FDs sometimes to - remove items. *) + (** Index in the poll buffer. Mutable because we might change it when we + swap FDs to remove items. *) mutable r: cbs; mutable w: cbs; } +(** Data associated to a given FD *) -let[@inline] per_fd_flags (self : per_fd) : Flags.t = +let[@inline] fd_flags (self : fd_data) : Flags.t = let fl = ref Flags.empty in - (if self.r <> Nil then fl := Flags.(!fl + pollin)); - (if self.w <> Nil then fl := Flags.(!fl + pollout)); + (if self.r != Nil then fl := Flags.(!fl + pollin)); + (if self.w != Nil then fl := Flags.(!fl + pollout)); !fl type queued_task = - | Q_timer of timer_ev - | Q_on_readable of Unix.file_descr * cb - | Q_on_writable of Unix.file_descr * cb + | Q_run_after of timer_ev + | Q_on_readable : + Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit) + -> queued_task + | Q_on_writable : + Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit) + -> queued_task | Q_clear + | Q_close of Unix.file_descr type st = { timer: timer_ev Heap.t; - fds: per_fd Fd_tbl.t; + fds: fd_data Fd_tbl.t; poll: P.t; mutable len: int; (** length of the active prefix of the [poll] buffer *) wakeup_rd: Unix.file_descr; wakeup_wr: Unix.file_descr; wakeup_triggered: bool Atomic.t; (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) - in_poll: Thread.t option Atomic.t; + in_poll: bool Atomic.t; (** Are we currently inside a call to [poll], and in which thread? Useful for other threads to know whether to wake us up via the pipe *) - queued_tasks: queued_task Queue.t; + mutable owner_thread: int; + (** Thread allowed to perform operations on this poll instance. Starts at + [-1]. *) + queued_tasks: queued_task Sync_queue.t; (** While in [poll()], changes get queued, so we don't invalidate the poll buffer before the syscall returns *) - lock: Mutex.t; } +(* TODO: [Thread.t] field to remember the owner thread, and + thread-safe queue for externally queued tasks. + Only owner thread can call [step]. *) + +let[@inline] queue_task_ (self : st) t : unit = + Sync_queue.push self.queued_tasks t + +(** [true] if called from the owner thread *) +let[@inline] in_owner_thread (self : st) : bool = + self.owner_thread != -1 && self.owner_thread == Thread.(id (self ())) + +let[@inline] in_poll (self : st) : bool = Atomic.get self.in_poll +let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + (* reading end must be non blocking so it's not always immediately + ready; writing end is blocking to make it simpler to wakeup from other + threads *) + Unix.set_nonblock wakeup_rd; + let self = + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + len = 0; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make false; + owner_thread = -1; + queued_tasks = Sync_queue.create (); + } + in + + (* always watch for the pipe being readable *) + P.set_index self.poll 0 self.wakeup_rd Flags.pollin; + self.len <- 1; + + self + +let max_fds (self : st) : int = P.maxfds self.poll + +let[@inline never] wakeup_real_ (self : st) : unit = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in + let b = Bytes.make 1 '!' in + ignore (Unix.write self.wakeup_wr b 0 1 : int) + +let[@inline] wakeup_ (self : st) : unit = + if not (Atomic.exchange self.wakeup_triggered true) then wakeup_real_ self + +let wakeup_from_outside (self : st) : unit = + let already_awake = + (* to avoid race conditions we only take the shortcut if + this is called from the owner thread *) + in_owner_thread self && not (Atomic.get self.in_poll) + in + if not already_awake then wakeup_ self let rec perform_cbs ~closed = function | Nil -> () @@ -100,217 +165,209 @@ let rec perform_cbs ~closed = function f ~closed x y; perform_cbs ~closed tail -let rec perform_cbs_closed ~closed = function - | Nil -> () - | Sub (x, y, f, tail) -> - f ~closed x y; - perform_cbs_closed ~closed tail +(** Change the event loop right now. This must be called only from the owner + thread and outside of [poll]. *) +module Run_now_ = struct + let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail -let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + let clear_ (self : st) : unit = + Heap.clear self.timer; + Fd_tbl.clear self.fds; + for i = 0 to P.maxfds self.poll - 1 do + P.set_index self.poll i P.invalid_fd Flags.empty + done; + Atomic.set self.wakeup_triggered false; + self.len <- 0; + () -let[@inline] with_lock_ (self : st) f = - Mutex.lock self.lock; - match f self with - | exception e -> - Mutex.unlock self.lock; - raise e - | res -> - Mutex.unlock self.lock; - res + let get_fd_ (self : st) fd : fd_data = + (* assert (in_owner_thread self && not (in_poll self)); *) + match Fd_tbl.find self.fds fd with + | per_fd -> per_fd + | exception Not_found -> + let idx = + if self.len = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.len in + self.len <- self.len + 1; + n + in + let per_fd = { idx; fd; r = Nil; w = Nil } in + Fd_tbl.add self.fds fd per_fd; + per_fd -let create_st () : st = - let wakeup_rd, wakeup_wr = Unix.pipe () in - Unix.set_nonblock wakeup_rd; - { - timer = Heap.create ~leq:leq_timer (); - fds = Fd_tbl.create 16; - poll = P.create (); - len = 0; - wakeup_rd; - wakeup_wr; - wakeup_triggered = Atomic.make false; - in_poll = Atomic.make None; - queued_tasks = Queue.create (); - lock = Mutex.create (); - } + let remove_fd_ (self : st) (fd_data : fd_data) : unit = + Fd_tbl.remove self.fds fd_data.fd; + P.set_index self.poll fd_data.idx P.invalid_fd Flags.empty; -let max_fds (self : st) : int = P.maxfds self.poll + (* assert (in_owner_thread self && not (in_poll self)); *) + if fd_data.idx > 0 && fd_data.idx + 1 < self.len then ( + (* not the last element nor the first (pipe_rd), move the last element + here to keep the buffer non sparse *) + let last_fd = P.get_fd self.poll (self.len - 1) in + assert (last_fd <> fd_data.fd); + match Fd_tbl.find_opt self.fds last_fd with + | None -> assert false + | Some last_fd_data -> + (* move the last FD to [idx] *) + last_fd_data.idx <- fd_data.idx; + P.set_index self.poll fd_data.idx last_fd (fd_flags last_fd_data) + ); + + self.len <- self.len - 1; + () + + let close_ (self : st) fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + match Fd_tbl.find self.fds fd with + | fd_data -> + remove_fd_ self fd_data; + fd_data.r, fd_data.w + | exception Not_found -> Nil, Nil + in + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + + let on_readable_ self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in + let fd_data = get_fd_ self fd in + fd_data.r <- Sub (x, y, f, fd_data.r); + P.set_index self.poll fd_data.idx fd (fd_flags fd_data) + + let on_writable_ self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in + let fd_data = get_fd_ self fd in + fd_data.w <- Sub (x, y, f, fd_data.w); + P.set_index self.poll fd_data.idx fd (fd_flags fd_data) + + let run_after_s_ self ev : unit = Heap.insert self.timer ev + + let perform_task_ self (t : queued_task) : unit = + match t with + | Q_run_after t -> run_after_s_ self t + | Q_on_readable (fd, x, y, f) -> on_readable_ self fd x y f + | Q_on_writable (fd, x, y, f) -> on_writable_ self fd x y f + | Q_clear -> clear_ self + | Q_close fd -> close_ self fd +end let clear (self : st) = - let@ self = with_lock_ self in - Heap.clear self.timer; - Fd_tbl.clear self.fds; - for i = 0 to P.maxfds self.poll - 1 do - P.set_index self.poll i P.invalid_fd Flags.empty - done; - self.len <- 0; - () - -let wakeup_from_outside (self : st) : unit = - if not (Atomic.exchange self.wakeup_triggered true) then - let@ _sp = - Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" - in - let b = Bytes.make 1 '!' in - ignore (Unix.write self.wakeup_wr b 0 1 : int) - -let get_fd_ (self : st) fd : per_fd = - match Fd_tbl.find self.fds fd with - | per_fd -> per_fd - | exception Not_found -> - let idx = - if self.len = P.maxfds self.poll then - invalid_arg "No available slot in poll"; - let n = self.len in - self.len <- self.len + 1; - n - in - let per_fd = { idx; fd; r = Nil; w = Nil } in - Fd_tbl.add self.fds fd per_fd; - per_fd - -let close_ (self : st) fd : unit = - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in - let r, w = - let@ self = with_lock_ self in - match Fd_tbl.find self.fds fd with - | per_fd -> - Fd_tbl.remove self.fds fd; - - (* not the last element, move the last element here *) - if per_fd.idx + 1 < self.len then ( - let last_fd = P.get_fd self.poll (self.len - 1) in - match Fd_tbl.find_opt self.fds last_fd with - | None -> () - | Some last_per_fd -> - last_per_fd.idx <- per_fd.idx; - P.set_index self.poll per_fd.idx last_fd (per_fd_flags last_per_fd) - ); - self.len <- self.len - 1; - per_fd.r, per_fd.w - | exception Not_found -> - invalid_arg "File descriptor is not known to Nanoev" - in - - (* call callbacks outside of the lock's critical section *) - perform_cbs_closed ~closed:true r; - perform_cbs_closed ~closed:true w; - () - -let on_readable self fd x y f : unit = - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in - let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in - per_fd.r <- Sub (x, y, f, per_fd.r); - (* FIXME: P.set_index *) - self.sub_up_to_date <- false; - if Atomic.get self.in_select then wakeup_from_outside self - -let on_writable self fd x y f : unit = - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in - let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in - per_fd.w <- Sub (x, y, f, per_fd.w); - (* FIXME: P.set_index *) - self.sub_up_to_date <- false; - if Atomic.get self.in_select then wakeup_from_outside self - -let run_after_s self time x y f : unit = - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in - let@ self = with_lock_ self in - let deadline = now_ () +. time in - Heap.insert self.timer (Timer { deadline; x; y; f }); - if Atomic.get self.in_select then wakeup_from_outside self - -let recompute_if_needed (self : st) = - if not self.sub_up_to_date then ( - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in - self.sub_up_to_date <- true; - self.sub_r <- []; - self.sub_w <- []; - Hashtbl.iter - (fun fd per_fd -> - if cb_is_empty per_fd.r && cb_is_empty per_fd.w then - Hashtbl.remove self.fds fd; - if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r; - if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w) - self.fds + if in_owner_thread self && not (in_poll self) then + Run_now_.clear_ self + else ( + queue_task_ self @@ Q_clear; + wakeup_from_outside self ) -let next_deadline_ (self : st) : float option = +let close (self : st) fd : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.close_ self fd + else ( + queue_task_ self @@ Q_close fd; + wakeup_from_outside self + ) + +let on_readable self fd x y f : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.on_readable_ self fd x y f + else ( + queue_task_ self @@ Q_on_readable (fd, x, y, f); + wakeup_from_outside self + ) + +let on_writable self fd x y f : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.on_writable_ self fd x y f + else ( + queue_task_ self @@ Q_on_writable (fd, x, y, f); + wakeup_from_outside self + ) + +let run_after_s self (time : float) x y f : unit = + let deadline = Int64.add (now_ns ()) (ns_of_s time) in + let ev = Timer { deadline; x; y; f } in + if in_owner_thread self && not (in_poll self) then + Run_now_.run_after_s_ self ev + else ( + queue_task_ self @@ Q_run_after ev; + wakeup_from_outside self + ) + +let next_deadline_ (self : st) : int64 option = match Heap.peek_min_exn self.timer with | exception Heap.Empty -> None | Timer t -> Some t.deadline let step (self : st) : unit = - let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in - (* gather the subscriptions and timeout *) - let timeout, sub_r, sub_w = - let@ self = with_lock_ self in - recompute_if_needed self; - let timeout = - match next_deadline_ self with - | None -> 30. - | Some d -> max 0. (d -. now_ ()) - in - timeout, self.sub_r, self.sub_w + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in + + self.owner_thread <- Thread.(id (self ())); + let timeout_ns : int64 = + match next_deadline_ self with + | None -> 30_000_000_000L + | Some d -> Int64.max 0L (Int64.sub d (now_ns ())) in - (* enter [select] *) - Atomic.set self.in_select true; - let r_reads, r_writes, _ = + (* process all queued tasks. + + NOTE: race condition: if another thread queues tasks after we do + the transfer, it will call [wakeup_from_outside] and make the pipe_rd FD + readable. So as soon as we call [poll], it will return and we will find + the queued tasks waiting for us. *) + let local_q = Queue.create () in + Sync_queue.transfer self.queued_tasks local_q; + while not (Queue.is_empty local_q) do + let t = Queue.pop local_q in + Run_now_.perform_task_ self t + done; + + Atomic.set self.in_poll true; + + (* enter [poll] *) + let num_ready_fds = let@ _sp = - Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () -> - [ - "timeout", `Float timeout; - "reads", `Int (List.length sub_r); - "writes", `Int (List.length sub_w); - ]) + Trace_.with_span ~__FILE__ ~__LINE__ "poll" ~data:(fun () -> + [ "timeout", `Float (ns_to_s timeout_ns); "len", `Int self.len ]) in - Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout + P.ppoll_or_poll self.poll self.len (Nanoseconds timeout_ns) in - Atomic.set self.in_select false; - (* drain pipe *) + Atomic.set self.in_poll false; + + (* drain notification pipe *) if Atomic.exchange self.wakeup_triggered false then ( - let b1 = Bytes.create 1 in - while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do + let b1 = Bytes.create 8 in + while try Unix.read self.wakeup_rd b1 0 8 > 0 with _ -> false do () done ); - (* gather the [per_fd] that are ready *) - let ready_r = ref [] in - let ready_w = ref [] in - - (* gather the [per_fd] that have updates *) - (let@ self = with_lock_ self in - if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false; - - List.iter - (fun fd -> - if fd != self.wakeup_rd then ( - let per_fd = Hashtbl.find self.fds fd in - ready_r := per_fd :: !ready_r - )) - r_reads; - List.iter - (fun fd -> - let per_fd = Hashtbl.find self.fds fd in - ready_w := per_fd :: !ready_w) - r_writes); - (* call callbacks *) - List.iter - (fun fd -> - perform_cbs ~closed:false fd.r; - fd.r <- Nil) - !ready_r; - List.iter - (fun fd -> - perform_cbs ~closed:false fd.w; - fd.w <- Nil) - !ready_w; + P.iter_ready self.poll num_ready_fds (fun _idx fd flags -> + if fd <> self.wakeup_rd then ( + let fd_data = + try Fd_tbl.find self.fds fd with Not_found -> assert false + in + + if Flags.mem Flags.pollin flags then ( + let r = fd_data.r in + fd_data.r <- Nil; + perform_cbs ~closed:false r + ); + if Flags.mem Flags.pollout flags then ( + let w = fd_data.w in + fd_data.w <- Nil; + perform_cbs ~closed:false w + ); + + if Flags.empty = fd_flags fd_data then Run_now_.remove_fd_ self fd_data + )); () diff --git a/src/posix/nanoev_posix.mli b/src/posix/nanoev_posix.mli index 63ee287..5d23b61 100644 --- a/src/posix/nanoev_posix.mli +++ b/src/posix/nanoev_posix.mli @@ -5,4 +5,6 @@ include module type of struct end val create : unit -> t -val create_with : Iomux.Poll.t -> t +(** Create a new nanoev loop using [Iomux] (poll/ppoll). + + {b NOTE}: this is NOT thread-safe *) From bbd77c473086eb5f38df153fd6dfb26ac7efd9bd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:39:45 -0400 Subject: [PATCH 09/28] add posix backend to test and to `echo.ml` --- examples/echo/dune | 4 ++-- examples/echo/echo.ml | 35 +++++++++++++++++++++++++++++------ tests/posix/dune | 3 +++ tests/posix/t1.expected | 3 +++ tests/posix/t1.ml | 29 +++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 8 deletions(-) create mode 100644 tests/posix/dune create mode 100644 tests/posix/t1.expected create mode 100644 tests/posix/t1.ml diff --git a/examples/echo/dune b/examples/echo/dune index 7fa6bea..99959c7 100644 --- a/examples/echo/dune +++ b/examples/echo/dune @@ -1,4 +1,4 @@ (executable (name echo) - (libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef - nanoev_tiny_httpd)) + (libraries nanoev nanoev.unix nanoev-posix moonpool moonpool.fib trace + trace-tef nanoev_tiny_httpd)) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml index a5543b7..330897f 100644 --- a/examples/echo/echo.ml +++ b/examples/echo/echo.ml @@ -86,6 +86,13 @@ let () = let port_ = ref 8080 in let max_conn = ref 1024 in let j = ref 8 in + let backend = ref `Posix in + + let set_backend = function + | "posix" | "poll" | "default" -> backend := `Posix + | "unix" | "select" -> backend := `Unix + | s -> failwith @@ Printf.sprintf "unknown backend %S" s + in Arg.parse (Arg.align [ @@ -94,15 +101,30 @@ let () = "-j", Arg.Set_int j, " number of threads"; "--debug", Arg.Unit setup_logging, " enable debug"; "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; + ( "--backend", + Arg.Symbol + ([ "posix"; "default"; "unix"; "select"; "poll" ], set_backend), + " event loop backend" ); ]) (fun _ -> raise (Arg.Bad "")) "echo [option]*"; - let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in - let@ _runner = Moonpool_fib.main in + let@ pool = + fun yield -> + if !j > 1 then + let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + let@ _runner = Moonpool_fib.main in + yield pool + else + Moonpool_fib.main yield + in - let ev = Nanoev_unix.create () in - Nanoev_picos.setup_bg_thread ev; + let ev = + match !backend with + | `Posix -> Nanoev_posix.create () + | `Unix -> Nanoev_unix.create () + in + let@ () = Nanoev_picos.with_setup_bg_thread ev in let server = Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ @@ -273,8 +295,9 @@ let () = let s = to_string_top h in Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); - Printf.printf "listening on http://%s:%d\n%!" (Server.addr server) - (Server.port server); + Printf.printf + "listening on http://%s:%d with %d threads, %d max connections\n%!" + (Server.addr server) (Server.port server) !j !max_conn; match Server.run server with | Ok () -> () | Error e -> raise e diff --git a/tests/posix/dune b/tests/posix/dune new file mode 100644 index 0000000..08e8e1a --- /dev/null +++ b/tests/posix/dune @@ -0,0 +1,3 @@ +(tests + (names t1) + (libraries nanoev nanoev-posix threads)) diff --git a/tests/posix/t1.expected b/tests/posix/t1.expected new file mode 100644 index 0000000..a36183e --- /dev/null +++ b/tests/posix/t1.expected @@ -0,0 +1,3 @@ +writing +can read +done writing diff --git a/tests/posix/t1.ml b/tests/posix/t1.ml new file mode 100644 index 0000000..366a5c9 --- /dev/null +++ b/tests/posix/t1.ml @@ -0,0 +1,29 @@ +module E = Nanoev_posix + +let mkpipe () : Unix.file_descr * Unix.file_descr = + let f1, f2 = Unix.pipe () in + Unix.set_nonblock f1; + Unix.set_nonblock f2; + f1, f2 + +let loop (e : E.t) = + while true do + E.step e + done + +let () = + let ev = E.create () in + ignore (Thread.create loop ev : Thread.t); + let rd, wr = mkpipe () in + E.on_readable ev rd () () (fun ~closed () () -> + if closed then + print_endline "closed!" + else + print_endline "can read"); + Thread.delay 0.05; + print_endline "writing"; + ignore + (Unix.write wr (Bytes.unsafe_of_string "hello") 0 (String.length "hello") + : int); + Thread.delay 0.1; + print_endline "done writing" From c2b7c0e39db239479548a0a3ad315b91b63f0c90 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:41:07 -0400 Subject: [PATCH 10/28] remove comment --- src/posix/nanoev_posix.mli | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/posix/nanoev_posix.mli b/src/posix/nanoev_posix.mli index 5d23b61..60e88ee 100644 --- a/src/posix/nanoev_posix.mli +++ b/src/posix/nanoev_posix.mli @@ -5,6 +5,4 @@ include module type of struct end val create : unit -> t -(** Create a new nanoev loop using [Iomux] (poll/ppoll). - - {b NOTE}: this is NOT thread-safe *) +(** Create a new nanoev loop using [Iomux] (poll/ppoll). *) From 14d744c3698afbf3f10995cef79240cb2c6a897c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 12:05:51 -0400 Subject: [PATCH 11/28] refactor picos: move whole setup into `Background_thread` mod --- src/picos/nanoev_picos.ml | 14 ++++++++------ src/picos/nanoev_picos.mli | 13 ++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 0835252..414bf38 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -66,13 +66,15 @@ module Global_ = struct Thread.join st.th end -let has_bg_thread = Global_.has_bg_thread -let setup_bg_thread = Global_.setup_bg_thread -let shutdown_bg_thread = Global_.shutdown_bg_thread +module Background_thread = struct + let is_setup = Global_.has_bg_thread + let setup = Global_.setup_bg_thread + let shutdown = Global_.shutdown_bg_thread -let with_setup_bg_thread ev f = - setup_bg_thread ev; - Fun.protect ~finally:shutdown_bg_thread f + let with_setup ev f = + setup ev; + Fun.protect ~finally:shutdown f +end let[@inline] get_loop_exn_ () : Nanoev.t = match Atomic.get Global_.st with diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index a35bb4b..27310dc 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,17 +1,16 @@ (** Basic interface with picos *) module Background_thread : sig - val setup_bg_thread : Nanoev.t -> unit + val setup : Nanoev.t -> unit (** Install this event loop in a background thread *) - val shutdown_bg_thread : unit -> unit - (** Shutdown background thread, assuming {! has_bg_thread} returns [true] *) + val shutdown : unit -> unit + (** Shutdown background thread, assuming {! is_setup} returns [true] *) - val with_setup_bg_thread : Nanoev.t -> (unit -> 'a) -> 'a + val with_setup : Nanoev.t -> (unit -> 'a) -> 'a - val has_bg_thread : unit -> bool - (** [has_bg_thread ()] is [true] iff a background thread is running a nanoev - loop *) + val is_setup : unit -> bool + (** [is_setup()] is [true] iff a background thread is running a nanoev loop *) end (** {2 Non blocking IO primitives} *) From 9b7c628506747885b47614befae77d944c312b02 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 12:06:07 -0400 Subject: [PATCH 12/28] fix: update example --- examples/echo/echo.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml index 330897f..804e31f 100644 --- a/examples/echo/echo.ml +++ b/examples/echo/echo.ml @@ -124,7 +124,7 @@ let () = | `Posix -> Nanoev_posix.create () | `Unix -> Nanoev_unix.create () in - let@ () = Nanoev_picos.with_setup_bg_thread ev in + let@ () = Nanoev_picos.Background_thread.with_setup ev in let server = Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ From ff870e7fa77ee26ad445bdbbf007b50126dc91da Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 12:06:15 -0400 Subject: [PATCH 13/28] tiny_httpd: limit max pool size to 4096 --- src/tiny_httpd/nanoev_tiny_httpd.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 71ebb65..290b0ad 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -294,7 +294,7 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = let max_connections = get_max_connection_ ?max_connections () in - let max_pool_size = max_connections * 2 in + let max_pool_size = min 4096 max_connections * 2 in let server = { Unix_tcp_server_.addr; From 34a1cc17693f9ec206275a47527d713a4e20c97f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:22:15 -0400 Subject: [PATCH 14/28] tiny_httpd: use picos semaphore; tweak pool size, buf size --- dune-project | 3 +- nanoev_tiny_httpd.opam | 3 +- src/tiny_httpd/dune | 1 + src/tiny_httpd/nanoev_tiny_httpd.ml | 76 +++++++++------------------- src/tiny_httpd/nanoev_tiny_httpd.mli | 1 + 5 files changed, 31 insertions(+), 53 deletions(-) diff --git a/dune-project b/dune-project index b5f2f69..ff2150c 100644 --- a/dune-project +++ b/dune-project @@ -40,7 +40,8 @@ ocaml dune nanoev - picos + (picos (>= 0.6)) + picos_std (tiny_httpd (>= 0.17))) (tags (nanoev http))) diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam index 9504331..ea48089 100644 --- a/nanoev_tiny_httpd.opam +++ b/nanoev_tiny_httpd.opam @@ -11,7 +11,8 @@ depends: [ "ocaml" "dune" {>= "2.7"} "nanoev" - "picos" + "picos" {>= "0.6"} + "picos_std" "tiny_httpd" {>= "0.17"} "odoc" {with-doc} ] diff --git a/src/tiny_httpd/dune b/src/tiny_httpd/dune index a372e9d..d68f539 100644 --- a/src/tiny_httpd/dune +++ b/src/tiny_httpd/dune @@ -6,5 +6,6 @@ picos (re_export nanoev) nanoev.picos + picos_std.sync (re_export iostream) (re_export tiny_httpd))) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 290b0ad..46314e8 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -5,45 +5,8 @@ module Slice = Iostream.Slice module Pool = TH.Pool module Buf = TH.Buf -let unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - +module Sem_ = Picos_std_sync.Semaphore.Counting (** Non blocking semaphore *) -module Sem_ = struct - type t = { - mutable n: int; - max: int; - waiting: Picos.Trigger.t Queue.t; - mutex: Mutex.t; - } - - let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; max = n; mutex = Mutex.create (); waiting = Queue.create () } - - let acquire self = - Mutex.lock self.mutex; - while self.n = 0 do - let tr = Picos.Trigger.create () in - Queue.push tr self.waiting; - Mutex.unlock self.mutex; - let res = Picos.Trigger.await tr in - unwrap_ res; - Mutex.lock self.mutex - done; - assert (self.n > 0); - self.n <- self.n - 1; - Mutex.unlock self.mutex - - let release self = - Mutex.lock self.mutex; - self.n <- self.n + 1; - Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting); - Mutex.unlock self.mutex - - let num_acquired self = self.max - self.n -end module Out = struct open Iostream @@ -147,7 +110,7 @@ module Unix_tcp_server_ = struct new_thread: (unit -> unit) -> unit; timeout: float; masksigpipe: bool; - mutable running: bool; (* TODO: use an atomic? *) + running: bool Atomic.t; } let shutdown_silent_ fd = @@ -183,7 +146,7 @@ module Unix_tcp_server_ = struct let inet_addr = Unix.inet_addr_of_string self.addr in Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); - let n_listen = 2 * self.max_connections in + let n_listen = self.max_connections in Unix.listen sock n_listen ); @@ -191,10 +154,16 @@ module Unix_tcp_server_ = struct let tcp_server = { - TH.IO.TCP_server.stop = (fun () -> self.running <- false); - running = (fun () -> self.running); + TH.IO.TCP_server.stop = + (fun () -> + Atomic.set self.running false; + + (* close accept socket so the main loop will return *) + try Unix.close sock with _ -> ()); + running = (fun () -> Atomic.get self.running); active_connections = - (fun () -> Sem_.num_acquired self.sem_max_connections); + (fun () -> + self.max_connections - Sem_.get_value self.sem_max_connections); endpoint = (fun () -> let addr, port = get_addr_ sock in @@ -233,7 +202,7 @@ module Unix_tcp_server_ = struct in Unix.set_nonblock sock; - while self.running do + while Atomic.get self.running do match EV.accept sock with | client_sock, client_addr -> (* limit concurrency *) @@ -272,7 +241,7 @@ module Unix_tcp_server_ = struct done; (* Wait for all threads to be done: this only works if all threads are done. *) - Unix.close sock; + (try Unix.close sock with _ -> ()); (* TODO? *) (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) ()); @@ -290,11 +259,16 @@ open struct slice.len <- 0 end -let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) - ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") - ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = +let create ?(masksigpipe = not Sys.win32) ?max_connections ?max_buf_pool_size + ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday) + ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () : + TH.Server.t = let max_connections = get_max_connection_ ?max_connections () in - let max_pool_size = min 4096 max_connections * 2 in + let max_pool_size = + match max_buf_pool_size with + | None -> min 4096 max_connections * 2 + | Some m -> m + in let server = { Unix_tcp_server_.addr; @@ -309,11 +283,11 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) (let buf_size = Option.value buf_size ~default:4096 in fun () -> Slice.create buf_size) (); - running = true; + running = Atomic.make true; port; sock; max_connections; - sem_max_connections = Sem_.create max_connections; + sem_max_connections = Sem_.make max_connections; masksigpipe; timeout; } diff --git a/src/tiny_httpd/nanoev_tiny_httpd.mli b/src/tiny_httpd/nanoev_tiny_httpd.mli index f0cd9af..7c858e5 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.mli +++ b/src/tiny_httpd/nanoev_tiny_httpd.mli @@ -3,6 +3,7 @@ module TH = Tiny_httpd_core val create : ?masksigpipe:bool -> ?max_connections:int -> + ?max_buf_pool_size:int -> ?timeout:float -> ?buf_size:int -> ?get_time_s:(unit -> float) -> From bd983a395c2944e405e2180452478eddb3b89568 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:22:45 -0400 Subject: [PATCH 15/28] picos: simplify a bit read/write retry loops --- src/picos/nanoev_picos.ml | 72 ++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 414bf38..a11c89f 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -85,48 +85,42 @@ let[@inline] unwrap_ = function | None -> () | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt -let retry_read_ fd f = - let ev = get_loop_exn_ () in - let[@unroll 1] rec loop () = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - Trace_.message "read must wait"; - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - loop () - in - loop () +let[@unroll 1] rec retry_read_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "read must wait"; *) + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + let ev = get_loop_exn_ () in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_read_ fd f -let retry_write_ fd f = - let ev = get_loop_exn_ () in - let rec loop () = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - Trace_.message "write must wait"; - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - loop () - in - loop () +let[@unroll 1] rec retry_write_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "write must wait"; *) + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_write_ fd f let read fd buf i len : int = try retry_read_ fd (fun () -> - Trace_.message "read"; + (* Trace_.message "read"; *) Unix.read fd buf i len) with Closed -> 0 @@ -138,7 +132,7 @@ let close fd = let accept fd = try retry_read_ fd (fun () -> - Trace_.message "accept"; + (* Trace_.message "accept"; *) Unix.accept fd) with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> raise Closed @@ -146,7 +140,7 @@ let accept fd = let write fd buf i len : int = try retry_write_ fd (fun () -> - Trace_.message "write"; + (* Trace_.message "write"; *) Unix.write fd buf i len) with Closed -> 0 From fcaa4b6636d597e956f1f50657f468480acbe46d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:23:01 -0400 Subject: [PATCH 16/28] echo: emit metrics, allow user to set buf size/buf pool size --- examples/echo/echo.ml | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml index 804e31f..0519801 100644 --- a/examples/echo/echo.ml +++ b/examples/echo/echo.ml @@ -79,7 +79,16 @@ let setup_logging () = Logs.set_reporter @@ Logs.format_reporter (); Logs.set_level ~all:true (Some Logs.Debug) +let emit_metrics_ pool server () = + while true do + Trace.counter_int ~level:Info "pool.tasks" (Moonpool.Runner.num_tasks pool); + Trace.counter_int ~level:Info "http.active-conns" + (Server.active_connections server); + Thread.delay 0.3 + done + let () = + Trace.set_current_level Info; let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; @@ -87,6 +96,8 @@ let () = let max_conn = ref 1024 in let j = ref 8 in let backend = ref `Posix in + let buf_size = ref 4096 in + let max_buf_pool_size = ref None in let set_backend = function | "posix" | "poll" | "default" -> backend := `Posix @@ -99,6 +110,10 @@ let () = "--port", Arg.Set_int port_, " set port"; "-p", Arg.Set_int port_, " set port"; "-j", Arg.Set_int j, " number of threads"; + ( "--max-buf-pool-size", + Arg.Int (fun i -> max_buf_pool_size := Some i), + " max buffer pool size" ); + "--buf-size", Arg.Set_int buf_size, " buffer size"; "--debug", Arg.Unit setup_logging, " enable debug"; "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; ( "--backend", @@ -128,6 +143,7 @@ let () = let server = Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ + ?max_buf_pool_size:!max_buf_pool_size ~buf_size:!buf_size ~max_connections:!max_conn () in @@ -296,8 +312,13 @@ let () = Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); Printf.printf - "listening on http://%s:%d with %d threads, %d max connections\n%!" - (Server.addr server) (Server.port server) !j !max_conn; + "listening on http://%s:%d with %d threads, %d max connections, %d max fds\n\ + %!" + (Server.addr server) (Server.port server) !j !max_conn (Nanoev.max_fds ev); + + if Trace.enabled () then + ignore (Thread.create (emit_metrics_ pool server) () : Thread.t); + match Server.run server with | Ok () -> () | Error e -> raise e From 6e2d11384b5a1e43a631243e348894be321f6cc4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:23:53 -0400 Subject: [PATCH 17/28] CI --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index fb8daa9..0a73e77 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -56,6 +56,6 @@ jobs: dune-cache: true allow-prerelease-opam: true - - run: opam install ocamlformat.0.26.2 + - run: opam install ocamlformat.0.27.0 - run: opam exec -- make format-check From 90311ad4fab675812a0d7e67d951b25b3236541c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:32:37 -0400 Subject: [PATCH 18/28] format --- src/posix/heap.ml | 8 ++++---- src/unix/heap.ml | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/posix/heap.ml b/src/posix/heap.ml index a9a9c9e..1a553d3 100644 --- a/src/posix/heap.ml +++ b/src/posix/heap.ml @@ -22,10 +22,10 @@ open struct | E -> 0 | N (r, _, _, _) -> r - (** Make a balanced node labelled with [x], and subtrees [a] and [b]. - We ensure that the right child's rank is ≤ to the rank of the - left child (leftist property). The rank of the resulting node - is the length of the rightmost path. *) + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. We + ensure that the right child's rank is ≤ to the rank of the left child + (leftist property). The rank of the resulting node is the length of the + rightmost path. *) let[@inline] mk_node_ x a b = if rank_ a >= rank_ b then N (rank_ b + 1, x, a, b) diff --git a/src/unix/heap.ml b/src/unix/heap.ml index a9a9c9e..1a553d3 100644 --- a/src/unix/heap.ml +++ b/src/unix/heap.ml @@ -22,10 +22,10 @@ open struct | E -> 0 | N (r, _, _, _) -> r - (** Make a balanced node labelled with [x], and subtrees [a] and [b]. - We ensure that the right child's rank is ≤ to the rank of the - left child (leftist property). The rank of the resulting node - is the length of the rightmost path. *) + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. We + ensure that the right child's rank is ≤ to the rank of the left child + (leftist property). The rank of the resulting node is the length of the + rightmost path. *) let[@inline] mk_node_ x a b = if rank_ a >= rank_ b then N (rank_ b + 1, x, a, b) From 653fddb850fe4d9d6f0e04209b1969831db1cd31 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 23:20:49 -0400 Subject: [PATCH 19/28] feat(nanoev.picos): dep on iostream, add IO channels+Net --- dune-project | 18 ++- nanoev.opam | 1 + src/picos/IO_in.ml | 152 +++++++++++++++++++++++++ src/picos/IO_out.ml | 118 +++++++++++++++++++ src/picos/background_thread.ml | 7 ++ src/picos/background_thread.mli | 10 ++ src/picos/base.ml | 96 ++++++++++++++++ src/picos/base.mli | 37 ++++++ src/picos/common_.ml | 6 + src/picos/dune | 2 +- src/picos/global_.ml | 59 ++++++++++ src/picos/nanoev_picos.ml | 168 ++-------------------------- src/picos/nanoev_picos.mli | 49 ++------ src/picos/net_client.ml | 20 ++++ src/picos/net_server.ml | 48 ++++++++ src/picos/net_server.mli | 19 ++++ src/tiny_httpd/nanoev_tiny_httpd.ml | 2 +- 17 files changed, 605 insertions(+), 207 deletions(-) create mode 100644 src/picos/IO_in.ml create mode 100644 src/picos/IO_out.ml create mode 100644 src/picos/background_thread.ml create mode 100644 src/picos/background_thread.mli create mode 100644 src/picos/base.ml create mode 100644 src/picos/base.mli create mode 100644 src/picos/common_.ml create mode 100644 src/picos/global_.ml create mode 100644 src/picos/net_client.ml create mode 100644 src/picos/net_server.ml create mode 100644 src/picos/net_server.mli diff --git a/dune-project b/dune-project index ff2150c..4035343 100644 --- a/dune-project +++ b/dune-project @@ -20,9 +20,14 @@ (synopsis "Tiny event loop abstraction") (depends ocaml dune base-unix) (depopts - (trace (>= 0.7)) + (trace + (>= 0.7)) + (iostream + (>= 0.3)) (picos - (and (>= 0.5) (< 0.7)))) + (and + (>= 0.5) + (< 0.7)))) (tags (unix select async))) @@ -40,9 +45,12 @@ ocaml dune nanoev - (picos (>= 0.6)) + (picos + (>= 0.6)) picos_std - (tiny_httpd (>= 0.17))) - (tags (nanoev http))) + (tiny_httpd + (>= 0.17))) + (tags + (nanoev http))) ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html diff --git a/nanoev.opam b/nanoev.opam index 9fa7ba9..9aeadaa 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -15,6 +15,7 @@ depends: [ ] depopts: [ "trace" {>= "0.7"} + "iostream" {>= "0.3"} "picos" {>= "0.5" & < "0.7"} ] build: [ diff --git a/src/picos/IO_in.ml b/src/picos/IO_in.ml new file mode 100644 index 0000000..6184eba --- /dev/null +++ b/src/picos/IO_in.ml @@ -0,0 +1,152 @@ +open Common_ + +class type t = object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) +end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := Base.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/picos/IO_out.ml b/src/picos/IO_out.ml new file mode 100644 index 0000000..b98525f --- /dev/null +++ b/src/picos/IO_out.ml @@ -0,0 +1,118 @@ +open Common_ + +class type t = object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit +end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + Base.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/picos/background_thread.ml b/src/picos/background_thread.ml new file mode 100644 index 0000000..261daad --- /dev/null +++ b/src/picos/background_thread.ml @@ -0,0 +1,7 @@ +let is_setup = Global_.has_bg_thread +let setup = Global_.setup_bg_thread +let shutdown = Global_.shutdown_bg_thread + +let with_setup ev f = + setup ev; + Fun.protect ~finally:shutdown f diff --git a/src/picos/background_thread.mli b/src/picos/background_thread.mli new file mode 100644 index 0000000..6e4a531 --- /dev/null +++ b/src/picos/background_thread.mli @@ -0,0 +1,10 @@ +val setup : Nanoev.t -> unit +(** Install this event loop in a background thread *) + +val shutdown : unit -> unit +(** Shutdown background thread, assuming {! is_setup} returns [true] *) + +val with_setup : Nanoev.t -> (unit -> 'a) -> 'a + +val is_setup : unit -> bool +(** [is_setup()] is [true] iff a background thread is running a nanoev loop *) diff --git a/src/picos/base.ml b/src/picos/base.ml new file mode 100644 index 0000000..a34577f --- /dev/null +++ b/src/picos/base.ml @@ -0,0 +1,96 @@ +open Common_ + +let[@inline] get_loop_exn_ () : Nanoev.t = + match Atomic.get Global_.st with + | None -> failwith "No nanoev loop installed." + | Some st -> st.nanoev + +let[@inline] unwrap_ = function + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + +let[@unroll 1] rec retry_read_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "read must wait"; *) + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + let ev = get_loop_exn_ () in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_read_ fd f + +let[@unroll 1] rec retry_write_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "write must wait"; *) + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_write_ fd f + +let read fd buf i len : int = + try + retry_read_ fd (fun () -> + (* Trace_.message "read"; *) + Unix.read fd buf i len) + with Closed -> 0 + +let close fd = + Unix.close fd; + let ev = get_loop_exn_ () in + Nanoev.close ev fd + +let accept fd = + try + retry_read_ fd (fun () -> + (* Trace_.message "accept"; *) + Unix.accept fd) + with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> + raise Closed + +let write_once fd buf i len : int = + try + retry_write_ fd (fun () -> + (* Trace_.message "write"; *) + Unix.write fd buf i len) + with Closed -> 0 + +let rec write fd buf i len = + if len > 0 then ( + let n = write_once fd buf i len in + if n < len then write fd buf (i + n) (len - n) + ) + +let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) + +let[@inline] max_fds () = + match Atomic.get Global_.st with + | None -> 1024 + | Some st -> Nanoev.max_fds st.nanoev + +let sleep t = + if t > 0. then ( + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + Nanoev.run_after_s ev t trigger () (fun trigger () -> + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_ + ) + +module Raw = struct + let retry_read = retry_read_ + let retry_write = retry_write_ +end diff --git a/src/picos/base.mli b/src/picos/base.mli new file mode 100644 index 0000000..7b949e9 --- /dev/null +++ b/src/picos/base.mli @@ -0,0 +1,37 @@ +val read : Unix.file_descr -> bytes -> int -> int -> int +(** Read from the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write_once : Unix.file_descr -> bytes -> int -> int -> int +(** Write into the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write : Unix.file_descr -> bytes -> int -> int -> unit + +val close : Unix.file_descr -> unit +(** Close the file descriptor + @raise Unix.Unix_error when it fails *) + +val connect : Unix.file_descr -> Unix.sockaddr -> unit +(** Connect this FD to the remote address. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +(** Accept a connection on this fd. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val max_fds : unit -> int +(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} +*) + +val sleep : float -> unit +(** Suspend current fiber for [n] seconds *) + +module Raw : sig + val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a + val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a +end diff --git a/src/picos/common_.ml b/src/picos/common_.ml new file mode 100644 index 0000000..d35dd22 --- /dev/null +++ b/src/picos/common_.ml @@ -0,0 +1,6 @@ +module Trace_ = Nanoev.Trace_ + +let ( let@ ) = ( @@ ) +let _default_buf_size = 4 * 1024 + +exception Closed = Nanoev.Closed diff --git a/src/picos/dune b/src/picos/dune index db9792d..fb37e29 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -2,4 +2,4 @@ (name nanoev_picos) (public_name nanoev.picos) (optional) ; picos - (libraries threads picos nanoev)) + (libraries threads picos iostream nanoev)) diff --git a/src/picos/global_.ml b/src/picos/global_.ml new file mode 100644 index 0000000..2e590b8 --- /dev/null +++ b/src/picos/global_.ml @@ -0,0 +1,59 @@ +open Common_ + +type st = + | None + | Some of { + active: bool Atomic.t; + nanoev: Nanoev.t; + th: Thread.t; + } + +let st : st Atomic.t = Atomic.make None +let lock = Mutex.create () + +let with_lock lock f = + Mutex.lock lock; + match f () with + | exception e -> + Mutex.unlock lock; + raise e + | x -> + Mutex.unlock lock; + x + +let bg_thread_ ~active ~evloop () : unit = + Trace_.set_thread_name "nanoev.picos.bg-thread"; + while Atomic.get active do + Nanoev.step evloop + done + +let[@inline] has_bg_thread () = Atomic.get st <> None + +let setup_bg_thread (ev : Nanoev.t) : unit = + let@ () = with_lock lock in + (* shutdown existing thread, if any *) + (match Atomic.get st with + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th + | None -> ()); + + (* start new bg thread *) + let active = Atomic.make true in + Atomic.set st + @@ Some + { + active; + nanoev = ev; + th = Thread.create (bg_thread_ ~active ~evloop:ev) (); + } + +let shutdown_bg_thread () = + let@ () = with_lock lock in + match Atomic.exchange st None with + | None -> () + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index a11c89f..8029fda 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -1,161 +1,7 @@ -open struct - module Trace_ = Nanoev.Trace_ - - let ( let@ ) = ( @@ ) -end - -exception Closed = Nanoev.Closed - -module Global_ = struct - type st = - | None - | Some of { - active: bool Atomic.t; - nanoev: Nanoev.t; - th: Thread.t; - } - - let st : st Atomic.t = Atomic.make None - let lock = Mutex.create () - - let with_lock lock f = - Mutex.lock lock; - match f () with - | exception e -> - Mutex.unlock lock; - raise e - | x -> - Mutex.unlock lock; - x - - let bg_thread_ ~active ~evloop () : unit = - Trace_.set_thread_name "nanoev.picos.bg-thread"; - while Atomic.get active do - Nanoev.step evloop - done - - let[@inline] has_bg_thread () = Atomic.get st <> None - - let setup_bg_thread (ev : Nanoev.t) : unit = - let@ () = with_lock lock in - (* shutdown existing thread, if any *) - (match Atomic.get st with - | Some st -> - Atomic.set st.active false; - Nanoev.wakeup_from_outside st.nanoev; - Thread.join st.th - | None -> ()); - - (* start new bg thread *) - let active = Atomic.make true in - Atomic.set st - @@ Some - { - active; - nanoev = ev; - th = Thread.create (bg_thread_ ~active ~evloop:ev) (); - } - - let shutdown_bg_thread () = - let@ () = with_lock lock in - match Atomic.exchange st None with - | None -> () - | Some st -> - Atomic.set st.active false; - Nanoev.wakeup_from_outside st.nanoev; - Thread.join st.th -end - -module Background_thread = struct - let is_setup = Global_.has_bg_thread - let setup = Global_.setup_bg_thread - let shutdown = Global_.shutdown_bg_thread - - let with_setup ev f = - setup ev; - Fun.protect ~finally:shutdown f -end - -let[@inline] get_loop_exn_ () : Nanoev.t = - match Atomic.get Global_.st with - | None -> failwith "No nanoev loop installed." - | Some st -> st.nanoev - -let[@inline] unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - -let[@unroll 1] rec retry_read_ fd f = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - (* Trace_.message "read must wait"; *) - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - let ev = get_loop_exn_ () in - Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - retry_read_ fd f - -let[@unroll 1] rec retry_write_ fd f = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - (* Trace_.message "write must wait"; *) - let ev = get_loop_exn_ () in - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - retry_write_ fd f - -let read fd buf i len : int = - try - retry_read_ fd (fun () -> - (* Trace_.message "read"; *) - Unix.read fd buf i len) - with Closed -> 0 - -let close fd = - Unix.close fd; - let ev = get_loop_exn_ () in - Nanoev.close ev fd - -let accept fd = - try - retry_read_ fd (fun () -> - (* Trace_.message "accept"; *) - Unix.accept fd) - with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> - raise Closed - -let write fd buf i len : int = - try - retry_write_ fd (fun () -> - (* Trace_.message "write"; *) - Unix.write fd buf i len) - with Closed -> 0 - -let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) - -let[@inline] max_fds () = - match Atomic.get Global_.st with - | None -> 1024 - | Some st -> Nanoev.max_fds st.nanoev - -let sleep t = - if t > 0. then ( - let ev = get_loop_exn_ () in - let trigger = Picos.Trigger.create () in - Nanoev.run_after_s ev t trigger () (fun trigger () -> - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_ - ) +module Background_thread = Background_thread +module Base = Base +include Base +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 27310dc..c89e3a5 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,47 +1,18 @@ (** Basic interface with picos *) -module Background_thread : sig - val setup : Nanoev.t -> unit - (** Install this event loop in a background thread *) - - val shutdown : unit -> unit - (** Shutdown background thread, assuming {! is_setup} returns [true] *) - - val with_setup : Nanoev.t -> (unit -> 'a) -> 'a - - val is_setup : unit -> bool - (** [is_setup()] is [true] iff a background thread is running a nanoev loop *) -end +module Background_thread = Background_thread (** {2 Non blocking IO primitives} *) -val read : Unix.file_descr -> bytes -> int -> int -> int -(** Read from the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +module Base = Base -val write : Unix.file_descr -> bytes -> int -> int -> int -(** Write into the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +include module type of struct + include Base +end -val close : Unix.file_descr -> unit -(** Close the file descriptor - @raise Unix.Unix_error when it fails *) +(** {2 Building blocks on top of {!Base}} *) -val connect : Unix.file_descr -> Unix.sockaddr -> unit -(** Connect this FD to the remote address. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) - -val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr -(** Accept a connection on this fd. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) - -val max_fds : unit -> int -(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} -*) - -val sleep : float -> unit -(** Suspend current fiber for [n] seconds *) +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/net_client.ml b/src/picos/net_client.ml new file mode 100644 index 0000000..1bb3cf5 --- /dev/null +++ b/src/picos/net_client.ml @@ -0,0 +1,20 @@ +open Common_ + +let connect addr : Unix.file_descr = + let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + (* connect asynchronously *) + Base.Raw.retry_write sock (fun () -> Unix.connect sock addr); + sock + +let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = connect addr in + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml new file mode 100644 index 0000000..8c91799 --- /dev/null +++ b/src/picos/net_server.ml @@ -0,0 +1,48 @@ +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit + +type t = { + active: bool Atomic.t; + sock: Unix.file_descr; + client_handler: client_handler; + spawn: (unit -> unit) -> unit Picos.Computation.t; + mutable running: unit Picos.Computation.t option; +} + +let shutdown (self : t) = + if Atomic.exchange self.active false then + Option.iter Picos.Computation.await self.running + +open struct + let run (self : t) () : unit = + while Atomic.get self.active do + let client_sock, client_addr = Base.accept self.sock in + let comp = + self.spawn (fun () -> + let ic = IO_in.of_unix_fd client_sock in + let oc = IO_out.of_unix_fd client_sock in + self.client_handler client_addr ic oc) + in + ignore (comp : _ Picos.Computation.t) + done +end + +let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t + = + let domain = Unix.domain_of_sockaddr addr in + let sock = Unix.socket domain Unix.SOCK_STREAM 0 in + Unix.bind sock addr; + Unix.listen sock backlog; + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.SO_REUSEADDR true; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + let server = + { active = Atomic.make true; spawn; sock; client_handler; running = None } + in + + server.running <- Some (spawn (run server)); + server + +let with_ ?backlog ~spawn ~client_handler addr f = + let server = establish ?backlog ~spawn ~client_handler addr in + Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server) diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli new file mode 100644 index 0000000..7432335 --- /dev/null +++ b/src/picos/net_server.mli @@ -0,0 +1,19 @@ +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit +type t + +val shutdown : t -> unit + +val establish : + ?backlog:int -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + t + +val with_ : + ?backlog:int -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + (t -> 'a) -> + 'a diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 46314e8..b637122 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -23,7 +23,7 @@ module Out = struct let i = ref i in let len = ref len0 in while !len > 0 do - match EV.write fd bs !i !len with + match EV.write_once fd bs !i !len with | 0 -> failwith "write failed" | n -> i := !i + n; From 8077a7d493f22235093a290b46e07a4847a4563c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 23:22:17 -0400 Subject: [PATCH 20/28] wip: tests --- dune-project | 11 ++++++++++- nanoev-posix.opam | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/dune-project b/dune-project index 4035343..f0a6568 100644 --- a/dune-project +++ b/dune-project @@ -34,7 +34,16 @@ (package (name nanoev-posix) (synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev") - (depends ocaml dune base-unix iomux (mtime (>= 2.0))) + (depends + ocaml + dune + base-unix + iomux + (mtime + (>= 2.0)) + (moonpool :with-test) + (trace :with-test) + (trace-tef :with-test)) (tags (unix select async iomux nanoev))) diff --git a/nanoev-posix.opam b/nanoev-posix.opam index d113361..8b82637 100644 --- a/nanoev-posix.opam +++ b/nanoev-posix.opam @@ -13,6 +13,9 @@ depends: [ "base-unix" "iomux" "mtime" {>= "2.0"} + "moonpool" {with-test} + "trace" {with-test} + "trace-tef" {with-test} "odoc" {with-doc} ] build: [ From c8d88e388740f9672775688de100d68b924d5870 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 23:30:00 -0400 Subject: [PATCH 21/28] wip: echo test --- tests/posix/echo/dune | 4 ++ tests/posix/echo/echo_client.ml | 83 +++++++++++++++++++++++++++++++++ tests/posix/echo/echo_server.ml | 60 ++++++++++++++++++++++++ 3 files changed, 147 insertions(+) create mode 100644 tests/posix/echo/dune create mode 100644 tests/posix/echo/echo_client.ml create mode 100644 tests/posix/echo/echo_server.ml diff --git a/tests/posix/echo/dune b/tests/posix/echo/dune new file mode 100644 index 0000000..3c94762 --- /dev/null +++ b/tests/posix/echo/dune @@ -0,0 +1,4 @@ +(executables + (names echo_server echo_client) + (libraries moonpool moonpool.fib nanoev.picos nanoev-posix iostream + trace.core trace-tef)) diff --git a/tests/posix/echo/echo_client.ml b/tests/posix/echo/echo_client.ml new file mode 100644 index 0000000..133d397 --- /dev/null +++ b/tests/posix/echo/echo_client.ml @@ -0,0 +1,83 @@ +module Trace = Trace_core +module F = Moonpool_fib +module IO = Nanoev_picos + +[@@@ocaml.alert "-deprecated"] + +let ( let@ ) = ( @@ ) +let pf = Printf.printf +let verbose = ref false + +let main ~port ~n ~n_conn () = + pf "connect on localhost:%d n=%d n_conn=%d\n%!" port n n_conn; + + let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + + let remaining = Atomic.make n in + let all_done = Atomic.make 0 in + + Printf.printf "connecting to port %d\n%!" port; + + let rec run_task () = + let n = Atomic.fetch_and_add remaining (-1) in + let _task_sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" + ~data:(fun () -> [ "n", `Int n ]) + in + if n > 0 then ( + ( (* let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "connect.client" in *) + IO.Net_client.with_connect addr + @@ fun ic oc -> + let buf = Bytes.create 32 in + + for _j = 1 to 100 do + let _sp = + Trace.enter_manual_sub_span ~parent:(Trace.ctx_of_span _task_sp) + ~__FILE__ ~__LINE__ "write.loop" ~data:(fun () -> + [ "iter", `Int _j ]) + in + Iostream.Out.output_string oc "hello"; + Iostream.Out_buf.flush oc; + + (* read back what we wrote *) + Iostream.In.really_input ic buf 0 (String.length "hello"); + Trace.exit_manual_span _sp; + F.yield () + done ); + + (* run another task *) + F.spawn_ignore run_task + ) else ( + (* if we're the last to exit, resolve the promise *) + let n_already_done = Atomic.fetch_and_add all_done 1 in + if n_already_done = n_conn - 1 then Printf.printf "all done\n%!" + ); + Trace.exit_manual_span _task_sp + in + + (* start the first [n_conn] tasks *) + let fibers = List.init n_conn (fun _ -> F.spawn run_task) in + List.iter F.await fibers; + + (* exit when [fut_exit] is resolved *) + Printf.printf "done with main\n%!" + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + + let port = ref 1234 in + let n = ref 1000 in + let n_conn = ref 20 in + let opts = + [ + "-p", Arg.Set_int port, " port"; + "-v", Arg.Set verbose, " verbose"; + "-n", Arg.Set_int n, " number of iterations"; + "--n-conn", Arg.Set_int n_conn, " number of simultaneous connections"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo_client"; + + F.main @@ fun _runner -> main ~port:!port ~n:!n ~n_conn:!n_conn () diff --git a/tests/posix/echo/echo_server.ml b/tests/posix/echo/echo_server.ml new file mode 100644 index 0000000..1e5e291 --- /dev/null +++ b/tests/posix/echo/echo_server.ml @@ -0,0 +1,60 @@ +module F = Moonpool_fib +module IO = Nanoev_picos +module Trace = Trace_core + +[@@@ocaml.alert "-deprecated"] + +let ( let@ ) = ( @@ ) +let pf = Printf.printf +let spf = Printf.sprintf +let verbose = ref false + +let str_of_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +let main ~port ~runner () = + pf "serve on localhost:%d\n%!" port; + + let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + let server = + IO.Net_server.establish addr + ~spawn:(fun f -> Moonpool.spawn ~on:runner f) + ~client_handler:(fun client_addr ic oc -> + let _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "serve" + in + + if !verbose then + pf "handle client on %s\n%!" (str_of_sockaddr client_addr); + + let buf = Bytes.create 256 in + let continue = ref true in + while !continue do + let n = Iostream.In.input ic buf 0 (Bytes.length buf) in + if n = 0 then + continue := false + else ( + Iostream.Out.output oc buf 0 n; + Iostream.Out_buf.flush oc + ) + done; + + Trace.exit_manual_span _sp; + if !verbose then + pf "done with client on %s\n%!" (str_of_sockaddr client_addr)) + in + IO.Net_server.shutdown server; + print_endline "exit" + +let () = + let@ () = Trace_tef.with_setup () in + let port = ref 1234 in + let opts = + [ "-p", Arg.Set_int port, " port"; "-v", Arg.Set verbose, " verbose" ] + |> Arg.align + in + Arg.parse opts ignore "echo_server"; + + F.main @@ fun runner -> main ~port:!port ~runner () From 299dd9dddb9c2d42e2ae7600ea87b1264b3485e4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 00:27:11 -0400 Subject: [PATCH 22/28] improve server --- src/picos/base.ml | 12 ++++++++++-- src/picos/net_server.ml | 5 ++--- src/picos/net_server.mli | 1 + 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/picos/base.ml b/src/picos/base.ml index a34577f..142e467 100644 --- a/src/picos/base.ml +++ b/src/picos/base.ml @@ -13,7 +13,11 @@ let[@unroll 1] rec retry_read_ fd f = match f () with | res -> res | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Unix.Unix_error + ( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS + | Unix.ECONNRESET ), + _, + _ ) -> (* Trace_.message "read must wait"; *) let trigger = Picos.Trigger.create () in let closed_r = ref false in @@ -29,7 +33,11 @@ let[@unroll 1] rec retry_write_ fd f = match f () with | res -> res | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Unix.Unix_error + ( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS + | Unix.ECONNRESET ), + _, + _ ) -> (* Trace_.message "write must wait"; *) let ev = get_loop_exn_ () in let trigger = Picos.Trigger.create () in diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml index 8c91799..ca74b0c 100644 --- a/src/picos/net_server.ml +++ b/src/picos/net_server.ml @@ -8,9 +8,8 @@ type t = { mutable running: unit Picos.Computation.t option; } -let shutdown (self : t) = - if Atomic.exchange self.active false then - Option.iter Picos.Computation.await self.running +let join (self : t) : unit = Option.iter Picos.Computation.await self.running +let shutdown (self : t) = if Atomic.exchange self.active false then () open struct let run (self : t) () : unit = diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli index 7432335..178fc0d 100644 --- a/src/picos/net_server.mli +++ b/src/picos/net_server.mli @@ -1,6 +1,7 @@ type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit type t +val join : t -> unit val shutdown : t -> unit val establish : From caeae5794c63ae131866520ac222dbd5dfd0b843 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 00:27:21 -0400 Subject: [PATCH 23/28] wip: improve echo server/client --- tests/posix/echo/echo_client.ml | 25 +++++++++++++++++++++---- tests/posix/echo/echo_server.ml | 29 ++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/tests/posix/echo/echo_client.ml b/tests/posix/echo/echo_client.ml index 133d397..5c8db61 100644 --- a/tests/posix/echo/echo_client.ml +++ b/tests/posix/echo/echo_client.ml @@ -5,13 +5,24 @@ module IO = Nanoev_picos [@@@ocaml.alert "-deprecated"] let ( let@ ) = ( @@ ) +let spf = Printf.sprintf let pf = Printf.printf let verbose = ref false -let main ~port ~n ~n_conn () = - pf "connect on localhost:%d n=%d n_conn=%d\n%!" port n n_conn; +let main ~port ~unix_sock ~n ~n_conn () = + pf "connect on %s n=%d n_conn=%d\n%!" + (if unix_sock = "" then + spf "localhost:%d" port + else + spf "unix:%S" unix_sock) + n n_conn; - let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + let addr = + if unix_sock = "" then + Unix.ADDR_INET (Unix.inet_addr_loopback, port) + else + Unix.ADDR_UNIX unix_sock + in let remaining = Atomic.make n in let all_done = Atomic.make 0 in @@ -67,6 +78,7 @@ let () = Trace.set_thread_name "main"; let port = ref 1234 in + let unix_sock = ref "" in let n = ref 1000 in let n_conn = ref 20 in let opts = @@ -74,10 +86,15 @@ let () = "-p", Arg.Set_int port, " port"; "-v", Arg.Set verbose, " verbose"; "-n", Arg.Set_int n, " number of iterations"; + "--unix", Arg.Set_string unix_sock, " unix socket"; "--n-conn", Arg.Set_int n_conn, " number of simultaneous connections"; ] |> Arg.align in Arg.parse opts ignore "echo_client"; - F.main @@ fun _runner -> main ~port:!port ~n:!n ~n_conn:!n_conn () + let@ () = + Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) + in + F.main @@ fun _runner -> + main ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () diff --git a/tests/posix/echo/echo_server.ml b/tests/posix/echo/echo_server.ml index 1e5e291..67405f4 100644 --- a/tests/posix/echo/echo_server.ml +++ b/tests/posix/echo/echo_server.ml @@ -14,10 +14,20 @@ let str_of_sockaddr = function | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let main ~port ~runner () = - pf "serve on localhost:%d\n%!" port; +let main ~port ~unix_sock ~runner () = + pf "serve on %s\n%!" + (if unix_sock = "" then + spf "localhost:%d" port + else + spf "unix:%S" unix_sock); + + let addr = + if unix_sock = "" then + Unix.ADDR_INET (Unix.inet_addr_loopback, port) + else + Unix.ADDR_UNIX unix_sock + in - let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in let server = IO.Net_server.establish addr ~spawn:(fun f -> Moonpool.spawn ~on:runner f) @@ -45,16 +55,25 @@ let main ~port ~runner () = if !verbose then pf "done with client on %s\n%!" (str_of_sockaddr client_addr)) in + IO.Net_server.join server; IO.Net_server.shutdown server; print_endline "exit" let () = let@ () = Trace_tef.with_setup () in let port = ref 1234 in + let unix_sock = ref "" in let opts = - [ "-p", Arg.Set_int port, " port"; "-v", Arg.Set verbose, " verbose" ] + [ + "-p", Arg.Set_int port, " port"; + "--unix", Arg.Set_string unix_sock, " unix socket"; + "-v", Arg.Set verbose, " verbose"; + ] |> Arg.align in Arg.parse opts ignore "echo_server"; - F.main @@ fun runner -> main ~port:!port ~runner () + let@ () = + Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) + in + F.main @@ fun runner -> main ~port:!port ~unix_sock:!unix_sock ~runner () From 1dcadb34707bfd6b4ec34d183a71de1a5efca350 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:47:40 -0400 Subject: [PATCH 24/28] add nanoev-picos as a package, also using picos_std --- dune-project | 26 +++++++++++-- nanoev-picos.opam | 34 +++++++++++++++++ nanoev-posix.opam | 2 + nanoev.opam | 2 - nanoev_tiny_httpd.opam | 5 ++- src/picos/dune | 5 +-- src/picos/net_client.ml | 7 +++- src/picos/net_server.ml | 80 ++++++++++++++++++++++++++++++++++++---- src/picos/net_server.mli | 17 +++++++++ 9 files changed, 157 insertions(+), 21 deletions(-) create mode 100644 nanoev-picos.opam diff --git a/dune-project b/dune-project index f0a6568..b9b1aca 100644 --- a/dune-project +++ b/dune-project @@ -21,10 +21,26 @@ (depends ocaml dune base-unix) (depopts (trace - (>= 0.7)) + (>= 0.7))) + (tags + (unix select async))) + +(package + (name nanoev-picos) + (synopsis "Use nanoev from picos") + (depends + ocaml + dune + base-unix + (nanoev + (= :version)) (iostream (>= 0.3)) (picos + (and + (>= 0.5) + (< 0.7))) + (picos_std (and (>= 0.5) (< 0.7)))) @@ -39,6 +55,8 @@ dune base-unix iomux + (nanoev (= :version)) + (nanoev-picos (= :version)) (mtime (>= 2.0)) (moonpool :with-test) @@ -53,9 +71,9 @@ (depends ocaml dune - nanoev - (picos - (>= 0.6)) + (nanoev (= :version)) + (nanoev-picos (= :version)) + picos picos_std (tiny_httpd (>= 0.17))) diff --git a/nanoev-picos.opam b/nanoev-picos.opam new file mode 100644 index 0000000..44e1230 --- /dev/null +++ b/nanoev-picos.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use nanoev from picos" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "nanoev" {= version} + "iostream" {>= "0.3"} + "picos" {>= "0.5" & < "0.7"} + "picos_std" {>= "0.5" & < "0.7"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/nanoev-posix.opam b/nanoev-posix.opam index 8b82637..b9df267 100644 --- a/nanoev-posix.opam +++ b/nanoev-posix.opam @@ -12,6 +12,8 @@ depends: [ "dune" {>= "2.7"} "base-unix" "iomux" + "nanoev" {= version} + "nanoev-picos" {= version} "mtime" {>= "2.0"} "moonpool" {with-test} "trace" {with-test} diff --git a/nanoev.opam b/nanoev.opam index 9aeadaa..8268f40 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -15,8 +15,6 @@ depends: [ ] depopts: [ "trace" {>= "0.7"} - "iostream" {>= "0.3"} - "picos" {>= "0.5" & < "0.7"} ] build: [ ["dune" "subst"] {dev} diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam index ea48089..08833e7 100644 --- a/nanoev_tiny_httpd.opam +++ b/nanoev_tiny_httpd.opam @@ -10,8 +10,9 @@ bug-reports: "https://github.com/c-cube/nanoev/issues" depends: [ "ocaml" "dune" {>= "2.7"} - "nanoev" - "picos" {>= "0.6"} + "nanoev" {= version} + "nanoev-picos" {= version} + "picos" "picos_std" "tiny_httpd" {>= "0.17"} "odoc" {with-doc} diff --git a/src/picos/dune b/src/picos/dune index fb37e29..de66efd 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -1,5 +1,4 @@ (library (name nanoev_picos) - (public_name nanoev.picos) - (optional) ; picos - (libraries threads picos iostream nanoev)) + (public_name nanoev-picos) + (libraries threads picos picos_std.sync iostream nanoev)) diff --git a/src/picos/net_client.ml b/src/picos/net_client.ml index 1bb3cf5..20ec2c4 100644 --- a/src/picos/net_client.ml +++ b/src/picos/net_client.ml @@ -6,7 +6,7 @@ let connect addr : Unix.file_descr = (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); (* connect asynchronously *) - Base.Raw.retry_write sock (fun () -> Unix.connect sock addr); + Base.connect sock addr; sock let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = @@ -15,6 +15,9 @@ let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = let ic = IO_in.of_unix_fd sock in let oc = IO_out.of_unix_fd sock in - let finally () = try Unix.close sock with _ -> () in + let finally () = + (try Unix.shutdown sock Unix.SHUTDOWN_ALL with _ -> ()); + try Unix.close sock with _ -> () + in let@ () = Fun.protect ~finally in f ic oc diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml index ca74b0c..642ba48 100644 --- a/src/picos/net_server.ml +++ b/src/picos/net_server.ml @@ -1,3 +1,5 @@ +module Sem = Picos_std_sync.Semaphore.Counting + type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit type t = { @@ -5,30 +7,81 @@ type t = { sock: Unix.file_descr; client_handler: client_handler; spawn: (unit -> unit) -> unit Picos.Computation.t; + max_conns: int; + sem: Sem.t; mutable running: unit Picos.Computation.t option; + exn_handler: exn -> Printexc.raw_backtrace -> unit; } -let join (self : t) : unit = Option.iter Picos.Computation.await self.running +let[@inline] join (self : t) : unit = + Option.iter Picos.Computation.await self.running + +let[@inline] max_connections self = self.max_conns + +let[@inline] n_active_connections (self : t) : int = + self.max_conns - Sem.get_value self.sem + +let[@inline] running (self : t) : bool = Atomic.get self.active let shutdown (self : t) = if Atomic.exchange self.active false then () open struct + let default_exn_handler exn bt = + Printf.eprintf "uncaught exception in network server: %s\n%s%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) + let run (self : t) () : unit = while Atomic.get self.active do let client_sock, client_addr = Base.accept self.sock in - let comp = + Sem.acquire self.sem; + + let cleanup () = + (try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ()); + (* TODO: close in nanoev too *) + (try Unix.close client_sock with _ -> ()); + Sem.release self.sem + in + + let comp : _ Picos.Computation.t = self.spawn (fun () -> let ic = IO_in.of_unix_fd client_sock in let oc = IO_out.of_unix_fd client_sock in - self.client_handler client_addr ic oc) + try + self.client_handler client_addr ic oc; + cleanup () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + self.exn_handler exn bt) in ignore (comp : _ Picos.Computation.t) done end -let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t - = +let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler) + ~spawn ~(client_handler : client_handler) addr : t = + let ev = + match Atomic.get Global_.st with + | Some { nanoev = ev; _ } -> ev + | None -> invalid_arg "Nanoev_picos.Net_server: no event loop installed" + in + + let max_connections = + match max_connections with + | None -> Nanoev.max_fds ev + | Some n -> min (Nanoev.max_fds ev) n + in + let sem = Sem.make max_connections in + + let backlog = + match backlog with + | Some n -> max 4 n + | None -> max 4 max_connections + in + let domain = Unix.domain_of_sockaddr addr in let sock = Unix.socket domain Unix.SOCK_STREAM 0 in + Unix.bind sock addr; Unix.listen sock backlog; Unix.set_nonblock sock; @@ -36,12 +89,23 @@ let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); let server = - { active = Atomic.make true; spawn; sock; client_handler; running = None } + { + active = Atomic.make true; + max_conns = max_connections; + sem; + spawn; + sock; + client_handler; + running = None; + exn_handler; + } in server.running <- Some (spawn (run server)); server -let with_ ?backlog ~spawn ~client_handler addr f = - let server = establish ?backlog ~spawn ~client_handler addr in +let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f = + let server = + establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr + in Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server) diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli index 178fc0d..8f9cf62 100644 --- a/src/picos/net_server.mli +++ b/src/picos/net_server.mli @@ -2,17 +2,34 @@ type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit type t val join : t -> unit +(** Wait for server to shutdown *) + val shutdown : t -> unit +(** Ask the server to stop *) + +val running : t -> bool +val max_connections : t -> int +val n_active_connections : t -> int val establish : ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> spawn:((unit -> unit) -> unit Picos.Computation.t) -> client_handler:client_handler -> Unix.sockaddr -> t +(** Create and start a new server on the given socket address. + @param spawn used to spawn a new computation per client + @param client_handler + the logic for talking to a client, will run in its own computation + @param backlog number of connections waiting in the listening socket + @param max_connections max number of simultaneous connections *) val with_ : ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> spawn:((unit -> unit) -> unit Picos.Computation.t) -> client_handler:client_handler -> Unix.sockaddr -> From 03af765e43ef84db13b7eb89a896ec859e4447be Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:47:57 -0400 Subject: [PATCH 25/28] fix(unix): properly implement timers(!) --- src/unix/nanoev_unix.ml | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index b74ea06..17b2088 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -174,17 +174,35 @@ let next_deadline_ (self : st) : float option = let step (self : st) : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in (* gather the subscriptions and timeout *) + let now = now_ () in let timeout, sub_r, sub_w = let@ self = with_lock_ self in recompute_if_needed self; let timeout = match next_deadline_ self with | None -> 30. - | Some d -> max 0. (d -. now_ ()) + | Some d -> max 0. (d -. now) in timeout, self.sub_r, self.sub_w in + (* run timers *) + while + if Heap.is_empty self.timer then + false + else ( + let (Timer t) = Heap.peek_min_exn self.timer in + if t.deadline <= now then ( + ignore (Heap.pop_min_exn self.timer : timer_ev); + t.f t.x t.y; + true + ) else + false + ) + do + () + done; + (* enter [select] *) Atomic.set self.in_select true; let r_reads, r_writes, _ = From abd651428d0bed8643c884cc3fd5235b6b356d5c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:48:12 -0400 Subject: [PATCH 26/28] fix(posix): implement timers --- src/posix/nanoev_posix.ml | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml index ccc6806..2e9a5ca 100644 --- a/src/posix/nanoev_posix.ml +++ b/src/posix/nanoev_posix.ml @@ -98,9 +98,6 @@ type st = { (** While in [poll()], changes get queued, so we don't invalidate the poll buffer before the syscall returns *) } -(* TODO: [Thread.t] field to remember the owner thread, and - thread-safe queue for externally queued tasks. - Only owner thread can call [step]. *) let[@inline] queue_task_ (self : st) t : unit = Sync_queue.push self.queued_tasks t @@ -308,12 +305,30 @@ let step (self : st) : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in self.owner_thread <- Thread.(id (self ())); + let now = now_ns () in let timeout_ns : int64 = match next_deadline_ self with | None -> 30_000_000_000L - | Some d -> Int64.max 0L (Int64.sub d (now_ns ())) + | Some d -> Int64.max 0L (Int64.sub d now) in + (* run timers *) + while + if Heap.is_empty self.timer then + false + else ( + let (Timer t) = Heap.peek_min_exn self.timer in + if t.deadline <= now then ( + ignore (Heap.pop_min_exn self.timer : timer_ev); + t.f t.x t.y; + true + ) else + false + ) + do + () + done; + (* process all queued tasks. NOTE: race condition: if another thread queues tasks after we do From 74f87af96cd51b0e52f9840bd1330cd53b10417b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:48:47 -0400 Subject: [PATCH 27/28] detail in tiny_httpd --- src/tiny_httpd/dune | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiny_httpd/dune b/src/tiny_httpd/dune index d68f539..12a3c11 100644 --- a/src/tiny_httpd/dune +++ b/src/tiny_httpd/dune @@ -5,7 +5,7 @@ threads picos (re_export nanoev) - nanoev.picos + (re_export nanoev-picos) picos_std.sync (re_export iostream) (re_export tiny_httpd))) From b0a29618e73e416cb9e4fc2a1ba30cd4a8d2a8f1 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:48:54 -0400 Subject: [PATCH 28/28] test: better tracing and scalability for echo client/server --- tests/posix/echo/README.md | 6 ++ tests/posix/echo/dune | 2 +- tests/posix/echo/echo_client.ml | 111 +++++++++++++++++++++----------- tests/posix/echo/echo_server.ml | 59 ++++++++++++++--- 4 files changed, 131 insertions(+), 47 deletions(-) create mode 100644 tests/posix/echo/README.md diff --git a/tests/posix/echo/README.md b/tests/posix/echo/README.md new file mode 100644 index 0000000..0fdef34 --- /dev/null +++ b/tests/posix/echo/README.md @@ -0,0 +1,6 @@ + +notes about system limits in Linux: +- `ulimit -n 100000` will raise the max number of FDs for a process to 100000 +- `/proc/sys/net/core/netdev_max_backlog` controls the kernel backlog size, raise it (default is 1000) +- `/proc/sys/net/core/somaxconn` is the max size of a socket backlog (as given to `listen()`), raise it (default is 4096) + diff --git a/tests/posix/echo/dune b/tests/posix/echo/dune index 3c94762..21b26c2 100644 --- a/tests/posix/echo/dune +++ b/tests/posix/echo/dune @@ -1,4 +1,4 @@ (executables (names echo_server echo_client) - (libraries moonpool moonpool.fib nanoev.picos nanoev-posix iostream + (libraries moonpool moonpool.fib nanoev-picos nanoev-posix iostream trace.core trace-tef)) diff --git a/tests/posix/echo/echo_client.ml b/tests/posix/echo/echo_client.ml index 5c8db61..859dec1 100644 --- a/tests/posix/echo/echo_client.ml +++ b/tests/posix/echo/echo_client.ml @@ -1,6 +1,7 @@ module Trace = Trace_core module F = Moonpool_fib module IO = Nanoev_picos +module Sem = Picos_std_sync.Semaphore.Counting [@@@ocaml.alert "-deprecated"] @@ -8,8 +9,12 @@ let ( let@ ) = ( @@ ) let spf = Printf.sprintf let pf = Printf.printf let verbose = ref false +let reset_line = "\x1b[2K\r" +let n_loops_per_task = 100 + +let main ~runner:_ ~port ~unix_sock ~n ~n_conn () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; -let main ~port ~unix_sock ~n ~n_conn () = pf "connect on %s n=%d n_conn=%d\n%!" (if unix_sock = "" then spf "localhost:%d" port @@ -24,57 +29,91 @@ let main ~port ~unix_sock ~n ~n_conn () = Unix.ADDR_UNIX unix_sock in - let remaining = Atomic.make n in - let all_done = Atomic.make 0 in - Printf.printf "connecting to port %d\n%!" port; - let rec run_task () = - let n = Atomic.fetch_and_add remaining (-1) in + let all_done = Atomic.make false in + let n_queries = Atomic.make 0 in + + (* limit simultaneous number of connections *) + let sem = Sem.make n_conn in + let n_active_conns = Atomic.make 0 in + + let progress_loop () = + while not (Atomic.get all_done) do + let n_queries = Atomic.get n_queries in + let n_conns = Atomic.get n_active_conns in + + (* progress *) + Printf.printf "%sdone %d queries, %d active connections%!" reset_line + n_queries n_conns; + + Trace.counter_int ~level:Info "n-conns" n_conns; + Trace.counter_int ~level:Info "n-queries" n_queries; + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done + in + + ignore (Thread.create progress_loop () : Thread.t); + + let run_task () = let _task_sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" - ~data:(fun () -> [ "n", `Int n ]) in - if n > 0 then ( - ( (* let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "connect.client" in *) - IO.Net_client.with_connect addr - @@ fun ic oc -> - let buf = Bytes.create 32 in + Sem.acquire sem; + ( IO.Net_client.with_connect addr @@ fun ic oc -> + Atomic.incr n_active_conns; + let buf = Bytes.create 32 in - for _j = 1 to 100 do - let _sp = - Trace.enter_manual_sub_span ~parent:(Trace.ctx_of_span _task_sp) - ~__FILE__ ~__LINE__ "write.loop" ~data:(fun () -> - [ "iter", `Int _j ]) - in - Iostream.Out.output_string oc "hello"; - Iostream.Out_buf.flush oc; + for _j = 1 to n_loops_per_task do + (*let _sp = + Trace.enter_manual_sub_span ~parent:_task_sp ~__FILE__ ~__LINE__ + "write.loop" ~data:(fun () -> [ "iter", `Int _j ]) + in*) + Atomic.incr n_queries; - (* read back what we wrote *) - Iostream.In.really_input ic buf 0 (String.length "hello"); - Trace.exit_manual_span _sp; - F.yield () - done ); + Iostream.Out.output_string oc "hello"; + Iostream.Out_buf.flush oc; + + (* read back what we wrote *) + Iostream.In.really_input ic buf 0 (String.length "hello"); + (* Trace.exit_manual_span _sp; *) + F.yield () + done; + + Atomic.decr n_active_conns; + Sem.release sem ); - (* run another task *) - F.spawn_ignore run_task - ) else ( - (* if we're the last to exit, resolve the promise *) - let n_already_done = Atomic.fetch_and_add all_done 1 in - if n_already_done = n_conn - 1 then Printf.printf "all done\n%!" - ); Trace.exit_manual_span _task_sp in + let t_start = Mtime_clock.now () in + (* start the first [n_conn] tasks *) - let fibers = List.init n_conn (fun _ -> F.spawn run_task) in + let fibers = List.init (n * n_conn) (fun _ -> F.spawn run_task) in List.iter F.await fibers; + Atomic.set all_done true; + + let t_stop = Mtime_clock.now () in + let elapsed_s = + (Mtime.span t_start t_stop |> Mtime.Span.to_uint64_ns |> Int64.to_float) + *. 1e-9 + in (* exit when [fut_exit] is resolved *) - Printf.printf "done with main\n%!" + Printf.printf + "%sdone with main (time=%.4fs, n queries=%d, expect=%d, %.3f req/s)\n%!" + reset_line elapsed_s (Atomic.get n_queries) + (n * n_conn * n_loops_per_task) + (float (Atomic.get n_queries) /. elapsed_s) let () = let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; Trace.set_thread_name "main"; let port = ref 1234 in @@ -96,5 +135,5 @@ let () = let@ () = Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) in - F.main @@ fun _runner -> - main ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () + F.main @@ fun runner -> + main ~runner ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () diff --git a/tests/posix/echo/echo_server.ml b/tests/posix/echo/echo_server.ml index 67405f4..7ed110b 100644 --- a/tests/posix/echo/echo_server.ml +++ b/tests/posix/echo/echo_server.ml @@ -8,13 +8,16 @@ let ( let@ ) = ( @@ ) let pf = Printf.printf let spf = Printf.sprintf let verbose = ref false +let n_reply_response = Atomic.make 0 let str_of_sockaddr = function | Unix.ADDR_UNIX s -> s | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let main ~port ~unix_sock ~runner () = +let main ~port ~unix_sock ~max_conns ~runner () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + pf "serve on %s\n%!" (if unix_sock = "" then spf "localhost:%d" port @@ -24,12 +27,15 @@ let main ~port ~unix_sock ~runner () = let addr = if unix_sock = "" then Unix.ADDR_INET (Unix.inet_addr_loopback, port) - else + else ( + (* remove leftover unix socket file, if any *) + (try Sys.remove unix_sock with _ -> ()); Unix.ADDR_UNIX unix_sock + ) in let server = - IO.Net_server.establish addr + IO.Net_server.establish ?max_connections:max_conns addr ~spawn:(fun f -> Moonpool.spawn ~on:runner f) ~client_handler:(fun client_addr ic oc -> let _sp = @@ -42,31 +48,63 @@ let main ~port ~unix_sock ~runner () = let buf = Bytes.create 256 in let continue = ref true in while !continue do - let n = Iostream.In.input ic buf 0 (Bytes.length buf) in - if n = 0 then - continue := false - else ( + match Iostream.In.input ic buf 0 (Bytes.length buf) with + | exception exn -> + continue := false; + Printf.eprintf "error in client handler: %s\n%!" + (Printexc.to_string exn) + | 0 -> continue := false + | n -> + Atomic.incr n_reply_response; Iostream.Out.output oc buf 0 n; - Iostream.Out_buf.flush oc - ) + Iostream.Out_buf.flush oc; + Picos.Fiber.yield () done; Trace.exit_manual_span _sp; if !verbose then pf "done with client on %s\n%!" (str_of_sockaddr client_addr)) in + + Printf.printf "max number of connections: %d\n%!" + (IO.Net_server.max_connections server); + + if Trace.enabled () then + ignore + (Thread.create + (fun () -> + while IO.Net_server.running server do + Trace.counter_int ~level:Info "n-conns" + (IO.Net_server.n_active_connections server); + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "n-reply-response" + (Atomic.get n_reply_response); + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done) + () + : Thread.t); + IO.Net_server.join server; IO.Net_server.shutdown server; print_endline "exit" let () = let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; let port = ref 1234 in let unix_sock = ref "" in + let max_conns = ref None in let opts = [ "-p", Arg.Set_int port, " port"; "--unix", Arg.Set_string unix_sock, " unix socket"; + ( "--max-conns", + Arg.Int (fun i -> max_conns := Some i), + " max number of connections" ); "-v", Arg.Set verbose, " verbose"; ] |> Arg.align @@ -76,4 +114,5 @@ let () = let@ () = Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) in - F.main @@ fun runner -> main ~port:!port ~unix_sock:!unix_sock ~runner () + F.main @@ fun runner -> + main ~port:!port ~unix_sock:!unix_sock ~max_conns:!max_conns ~runner ()