From 295b3b5c24be521c09d8f63aa6c01880b290b64e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 16:12:49 -0400 Subject: [PATCH] wip: nanoev-posix, using mtime and iomux --- dune-project | 7 + src/posix/dune | 6 + src/posix/heap.ml | 61 ++++++++ src/posix/heap.mli | 13 ++ src/posix/nanoev_posix.ml | 291 +++++++++++++++++++++++++++++++++++++ src/posix/nanoev_posix.mli | 8 + src/unix/dune | 1 + 7 files changed, 387 insertions(+) create mode 100644 src/posix/dune create mode 100644 src/posix/heap.ml create mode 100644 src/posix/heap.mli create mode 100644 src/posix/nanoev_posix.ml create mode 100644 src/posix/nanoev_posix.mli diff --git a/dune-project b/dune-project index e0cd8bb..b5f2f69 100644 --- a/dune-project +++ b/dune-project @@ -26,6 +26,13 @@ (tags (unix select async))) +(package + (name nanoev-posix) + (synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev") + (depends ocaml dune base-unix iomux (mtime (>= 2.0))) + (tags + (unix select async iomux nanoev))) + (package (name nanoev_tiny_httpd) (synopsis "Use nanoev as a basis for tiny_httpd") diff --git a/src/posix/dune b/src/posix/dune new file mode 100644 index 0000000..e7147f2 --- /dev/null +++ b/src/posix/dune @@ -0,0 +1,6 @@ +(library + (name nanoev_posix) + (public_name nanoev-posix) + (synopsis "posix backend (poll/ppoll+mtime)") + (private_modules heap) + (libraries nanoev unix iomux mtime mtime.clock.os)) diff --git a/src/posix/heap.ml b/src/posix/heap.ml new file mode 100644 index 0000000..a9a9c9e --- /dev/null +++ b/src/posix/heap.ml @@ -0,0 +1,61 @@ +type 'a tree = + | E + | N of int * 'a * 'a tree * 'a tree + +type 'a t = { + leq: 'a -> 'a -> bool; + mutable t: 'a tree; +} + +let create ~leq () : _ t = { leq; t = E } + +let[@inline] is_empty (self : _ t) = + match self.t with + | E -> true + | N _ -> false + +exception Empty + +open struct + (** Rank of the tree *) + let[@inline] rank_ = function + | E -> 0 + | N (r, _, _, _) -> r + + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. + We ensure that the right child's rank is ≤ to the rank of the + left child (leftist property). The rank of the resulting node + is the length of the rightmost path. *) + let[@inline] mk_node_ x a b = + if rank_ a >= rank_ b then + N (rank_ b + 1, x, a, b) + else + N (rank_ a + 1, x, b, a) + + let rec merge ~leq t1 t2 = + match t1, t2 with + | t, E -> t + | E, t -> t + | N (_, x, a1, b1), N (_, y, a2, b2) -> + if leq x y then + mk_node_ x a1 (merge ~leq b1 t2) + else + mk_node_ y a2 (merge ~leq t1 b2) +end + +let clear self = self.t <- E + +let[@inline] insert (self : _ t) x : unit = + self.t <- merge ~leq:self.leq self.t (N (1, x, E, E)) + +let[@inline] peek_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, _, _) -> x + +let[@inline] pop_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, l, r) -> + self.t <- merge ~leq:self.leq l r; + x diff --git a/src/posix/heap.mli b/src/posix/heap.mli new file mode 100644 index 0000000..3efd4c4 --- /dev/null +++ b/src/posix/heap.mli @@ -0,0 +1,13 @@ +type 'a t + +val create : leq:('a -> 'a -> bool) -> unit -> 'a t + +val is_empty : _ t -> bool +(** [is_empty h] returns [true] if the heap [h] is empty. *) + +exception Empty + +val clear : _ t -> unit +val insert : 'a t -> 'a -> unit +val peek_min_exn : 'a t -> 'a +val pop_min_exn : 'a t -> 'a diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml new file mode 100644 index 0000000..ad2e0b0 --- /dev/null +++ b/src/posix/nanoev_posix.ml @@ -0,0 +1,291 @@ +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) + let now_ns : unit -> int64 = Mtime_clock.now_ns +end + +module Fd_tbl = Hashtbl.Make (struct + open Iomux.Util + + type t = Unix.file_descr + + let equal a b = Int.equal (fd_of_unix a) (fd_of_unix b) + let hash a = Hashtbl.hash (fd_of_unix a) +end) + +module P = Iomux.Poll +module Flags = P.Flags +open Iomux.Util + +(** Callback list *) +type cbs = + | Nil + | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs + +let[@inline] cb_is_empty = function + | Nil -> true + | Sub _ -> false + +type timer_ev = + | Timer : { + deadline: float; + x: 'a; + y: 'b; + f: 'a -> 'b -> unit; + } + -> timer_ev + +type per_fd = { + fd: Unix.file_descr; + idx: int; (** Index in the buffer *) + mutable r: cbs; + mutable w: cbs; +} + +type st = { + timer: timer_ev Heap.t; + fds: per_fd Fd_tbl.t; + poll: P.t; + mutable cur_idx: int; (** index in the [poll] buffer *) + mutable idx_freelist: int list; + (** previously used indices that are recycled *) + wakeup_rd: Unix.file_descr; + wakeup_wr: Unix.file_descr; + wakeup_triggered: bool Atomic.t; + (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) + in_poll: bool Atomic.t; + (** Are we currently inside a call to [poll]? Useful for other threads to + know whether to wake us up via the pipe *) + lock: Mutex.t; +} + +let rec perform_cbs ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs ~closed tail + +let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail + +let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + Unix.set_nonblock wakeup_rd; + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + cur_idx = 0; + idx_freelist = []; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make false; + lock = Mutex.create (); + } + +let[@inline] with_lock_ (self : st) f = + Mutex.lock self.lock; + match f self with + | exception e -> + Mutex.unlock self.lock; + raise e + | res -> + Mutex.unlock self.lock; + res + +let clear (self : st) = + let@ self = with_lock_ self in + Heap.clear self.timer; + Fd_tbl.clear self.fds; + for i = 0 to P.maxfds self.poll - 1 do + P.set_index self.poll i P.invalid_fd Flags.empty + done; + self.cur_idx <- 0; + self.idx_freelist <- []; + () + +let wakeup_from_outside (self : st) : unit = + if not (Atomic.exchange self.wakeup_triggered true) then + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in + let b = Bytes.make 1 '!' in + ignore (Unix.write self.wakeup_wr b 0 1 : int) + +let get_fd_ (self : st) fd : per_fd = + match Fd_tbl.find self.fds fd with + | per_fd -> per_fd + | exception Not_found -> + let idx = + match self.idx_freelist with + | i :: tl -> + self.idx_freelist <- tl; + i + | [] -> + if self.cur_idx = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.cur_idx in + self.cur_idx <- self.cur_idx + 1; + n + in + let per_fd = { idx; fd; r = Nil; w = Nil } in + Fd_tbl.add self.fds fd per_fd; + per_fd + +let close self fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + let@ self = with_lock_ self in + match Fd_tbl.find self.fds fd with + | per_fd -> + Fd_tbl.remove self.fds fd; + self.idx_freelist <- per_fd.idx :: self.idx_freelist; + if Atomic.get self.in_poll then wakeup_from_outside self; + per_fd.r, per_fd.w + | exception Not_found -> + invalid_arg "File descriptor is not known to Nanoev" + in + + (* call callbacks outside of the lock *) + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + +let on_readable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.r <- Sub (x, y, f, per_fd.r); + (* FIXME: P.set_index *) + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self + +let on_writable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.w <- Sub (x, y, f, per_fd.w); + (* FIXME: P.set_index *) + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self + +let run_after_s self time x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in + let@ self = with_lock_ self in + let deadline = now_ () +. time in + Heap.insert self.timer (Timer { deadline; x; y; f }); + if Atomic.get self.in_select then wakeup_from_outside self + +let recompute_if_needed (self : st) = + if not self.sub_up_to_date then ( + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in + self.sub_up_to_date <- true; + self.sub_r <- []; + self.sub_w <- []; + Hashtbl.iter + (fun fd per_fd -> + if cb_is_empty per_fd.r && cb_is_empty per_fd.w then + Hashtbl.remove self.fds fd; + if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r; + if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w) + self.fds + ) + +let next_deadline_ (self : st) : float option = + match Heap.peek_min_exn self.timer with + | exception Heap.Empty -> None + | Timer t -> Some t.deadline + +let step (self : st) : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in + (* gather the subscriptions and timeout *) + let timeout, sub_r, sub_w = + let@ self = with_lock_ self in + recompute_if_needed self; + let timeout = + match next_deadline_ self with + | None -> 30. + | Some d -> max 0. (d -. now_ ()) + in + timeout, self.sub_r, self.sub_w + in + + (* enter [select] *) + Atomic.set self.in_select true; + let r_reads, r_writes, _ = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () -> + [ + "timeout", `Float timeout; + "reads", `Int (List.length sub_r); + "writes", `Int (List.length sub_w); + ]) + in + Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout + in + Atomic.set self.in_select false; + + (* drain pipe *) + if Atomic.exchange self.wakeup_triggered false then ( + let b1 = Bytes.create 1 in + while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do + () + done + ); + + (* gather the [per_fd] that are ready *) + let ready_r = ref [] in + let ready_w = ref [] in + + (* gather the [per_fd] that have updates *) + (let@ self = with_lock_ self in + if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false; + + List.iter + (fun fd -> + if fd != self.wakeup_rd then ( + let per_fd = Hashtbl.find self.fds fd in + ready_r := per_fd :: !ready_r + )) + r_reads; + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.fds fd in + ready_w := per_fd :: !ready_w) + r_writes); + + (* call callbacks *) + List.iter + (fun fd -> + perform_cbs ~closed:false fd.r; + fd.r <- Nil) + !ready_r; + List.iter + (fun fd -> + perform_cbs ~closed:false fd.w; + fd.w <- Nil) + !ready_w; + + () + +let ops : st Nanoev.Impl.ops = + { + step; + close; + on_readable; + on_writable; + run_after_s; + wakeup_from_outside; + clear; + } + +include Nanoev + +let create () : t = Impl.build ops (create_st ()) diff --git a/src/posix/nanoev_posix.mli b/src/posix/nanoev_posix.mli new file mode 100644 index 0000000..63ee287 --- /dev/null +++ b/src/posix/nanoev_posix.mli @@ -0,0 +1,8 @@ +(** Nano event loop using Poll/Ppoll *) + +include module type of struct + include Nanoev +end + +val create : unit -> t +val create_with : Iomux.Poll.t -> t diff --git a/src/unix/dune b/src/unix/dune index 4f14854..3c0fe50 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -2,4 +2,5 @@ (name nanoev_unix) (public_name nanoev.unix) (synopsis "Unix/select backend") + (private_modules heap) (libraries nanoev unix))