From 3125f3274b757f4833423dfbbedfa9e63d077acf Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 30 Apr 2025 22:16:19 -0400 Subject: [PATCH] wip: posix --- nanoev-posix.opam | 32 ++++++++++ src/posix/dune | 2 +- src/posix/nanoev_posix.ml | 120 +++++++++++++++++++++++++------------- 3 files changed, 113 insertions(+), 41 deletions(-) create mode 100644 nanoev-posix.opam diff --git a/nanoev-posix.opam b/nanoev-posix.opam new file mode 100644 index 0000000..d113361 --- /dev/null +++ b/nanoev-posix.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use mtime+iomux (posix compliant) as a backend for nanoev" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async" "iomux" "nanoev"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "iomux" + "mtime" {>= "2.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/src/posix/dune b/src/posix/dune index e7147f2..7387ac6 100644 --- a/src/posix/dune +++ b/src/posix/dune @@ -3,4 +3,4 @@ (public_name nanoev-posix) (synopsis "posix backend (poll/ppoll+mtime)") (private_modules heap) - (libraries nanoev unix iomux mtime mtime.clock.os)) + (libraries threads nanoev unix iomux mtime mtime.clock.os)) diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml index ad2e0b0..8601421 100644 --- a/src/posix/nanoev_posix.ml +++ b/src/posix/nanoev_posix.ml @@ -18,11 +18,30 @@ module P = Iomux.Poll module Flags = P.Flags open Iomux.Util +(* TODO: remove +module Sync_queue = struct + type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; + } + + let create () : _ t = { q = Queue.create (); mutex = Mutex.create () } + + let push (self : _ t) x : unit = + Mutex.lock self.mutex; + Queue.push x self.q; + Mutex.unlock self.mutex +end +*) + (** Callback list *) type cbs = | Nil | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs +(** Single callback *) +type cb = Cb : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) -> cb + let[@inline] cb_is_empty = function | Nil -> true | Sub _ -> false @@ -38,25 +57,40 @@ type timer_ev = type per_fd = { fd: Unix.file_descr; - idx: int; (** Index in the buffer *) + mutable idx: int; + (** Index in the buffer. Can change because we swap FDs sometimes to + remove items. *) mutable r: cbs; mutable w: cbs; } +let[@inline] per_fd_flags (self : per_fd) : Flags.t = + let fl = ref Flags.empty in + (if self.r <> Nil then fl := Flags.(!fl + pollin)); + (if self.w <> Nil then fl := Flags.(!fl + pollout)); + !fl + +type queued_task = + | Q_timer of timer_ev + | Q_on_readable of Unix.file_descr * cb + | Q_on_writable of Unix.file_descr * cb + | Q_clear + type st = { timer: timer_ev Heap.t; fds: per_fd Fd_tbl.t; poll: P.t; - mutable cur_idx: int; (** index in the [poll] buffer *) - mutable idx_freelist: int list; - (** previously used indices that are recycled *) + mutable len: int; (** length of the active prefix of the [poll] buffer *) wakeup_rd: Unix.file_descr; wakeup_wr: Unix.file_descr; wakeup_triggered: bool Atomic.t; (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) - in_poll: bool Atomic.t; - (** Are we currently inside a call to [poll]? Useful for other threads to - know whether to wake us up via the pipe *) + in_poll: Thread.t option Atomic.t; + (** Are we currently inside a call to [poll], and in which thread? Useful + for other threads to know whether to wake us up via the pipe *) + queued_tasks: queued_task Queue.t; + (** While in [poll()], changes get queued, so we don't invalidate the poll + buffer before the syscall returns *) lock: Mutex.t; } @@ -72,23 +106,7 @@ let rec perform_cbs_closed ~closed = function f ~closed x y; perform_cbs_closed ~closed tail -let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline - -let create_st () : st = - let wakeup_rd, wakeup_wr = Unix.pipe () in - Unix.set_nonblock wakeup_rd; - { - timer = Heap.create ~leq:leq_timer (); - fds = Fd_tbl.create 16; - poll = P.create (); - cur_idx = 0; - idx_freelist = []; - wakeup_rd; - wakeup_wr; - wakeup_triggered = Atomic.make false; - in_poll = Atomic.make false; - lock = Mutex.create (); - } +let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let[@inline] with_lock_ (self : st) f = Mutex.lock self.lock; @@ -100,6 +118,24 @@ let[@inline] with_lock_ (self : st) f = Mutex.unlock self.lock; res +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + Unix.set_nonblock wakeup_rd; + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + len = 0; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make None; + queued_tasks = Queue.create (); + lock = Mutex.create (); + } + +let max_fds (self : st) : int = P.maxfds self.poll + let clear (self : st) = let@ self = with_lock_ self in Heap.clear self.timer; @@ -107,8 +143,7 @@ let clear (self : st) = for i = 0 to P.maxfds self.poll - 1 do P.set_index self.poll i P.invalid_fd Flags.empty done; - self.cur_idx <- 0; - self.idx_freelist <- []; + self.len <- 0; () let wakeup_from_outside (self : st) : unit = @@ -124,36 +159,40 @@ let get_fd_ (self : st) fd : per_fd = | per_fd -> per_fd | exception Not_found -> let idx = - match self.idx_freelist with - | i :: tl -> - self.idx_freelist <- tl; - i - | [] -> - if self.cur_idx = P.maxfds self.poll then - invalid_arg "No available slot in poll"; - let n = self.cur_idx in - self.cur_idx <- self.cur_idx + 1; - n + if self.len = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.len in + self.len <- self.len + 1; + n in let per_fd = { idx; fd; r = Nil; w = Nil } in Fd_tbl.add self.fds fd per_fd; per_fd -let close self fd : unit = +let close_ (self : st) fd : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in let r, w = let@ self = with_lock_ self in match Fd_tbl.find self.fds fd with | per_fd -> Fd_tbl.remove self.fds fd; - self.idx_freelist <- per_fd.idx :: self.idx_freelist; - if Atomic.get self.in_poll then wakeup_from_outside self; + + (* not the last element, move the last element here *) + if per_fd.idx + 1 < self.len then ( + let last_fd = P.get_fd self.poll (self.len - 1) in + match Fd_tbl.find_opt self.fds last_fd with + | None -> () + | Some last_per_fd -> + last_per_fd.idx <- per_fd.idx; + P.set_index self.poll per_fd.idx last_fd (per_fd_flags last_per_fd) + ); + self.len <- self.len - 1; per_fd.r, per_fd.w | exception Not_found -> invalid_arg "File descriptor is not known to Nanoev" in - (* call callbacks outside of the lock *) + (* call callbacks outside of the lock's critical section *) perform_cbs_closed ~closed:true r; perform_cbs_closed ~closed:true w; () @@ -282,6 +321,7 @@ let ops : st Nanoev.Impl.ops = on_readable; on_writable; run_after_s; + max_fds; wakeup_from_outside; clear; }