(** [ns_of_s t] converts a duration [t] expressed in seconds into
    nanoseconds.

    [Int64.of_float] is undefined when its argument is NaN or falls outside
    of the [int64] range, so the conversion saturates instead:
    - NaN is mapped to [0L];
    - values at or above [2^63] ns (including [infinity]) give
      [Int64.max_int];
    - values at or below [-2^63] ns (including [neg_infinity]) give
      [Int64.min_int].

    NOTE(review): a caller that adds the result to a clock reading must
    still guard against [Int64.add] wrapping around for saturated values. *)
let[@inline] ns_of_s (t : float) : int64 =
  let ns = t *. 1e9 in
  (* [Int64.to_float Int64.max_int] rounds up to exactly 2^63, the first
     float that no longer fits in an [int64] *)
  let upper = Int64.to_float Int64.max_int in
  let lower = Int64.to_float Int64.min_int in
  if Float.is_nan ns then 0L
  else if ns >= upper then Int64.max_int
  else if ns <= lower then Int64.min_int
  else Int64.of_float ns
(** [fd_flags self] is the set of poll events [self] is currently interested
    in: [pollin] is included iff the read callback list [self.r] is non
    empty, and [pollout] iff the write callback list [self.w] is non empty.
    With no callback at all the result is [Flags.empty]. *)
let[@inline] fd_flags (self : fd_data) : Flags.t =
  let with_read =
    match self.r with
    | Nil -> Flags.empty
    | Sub _ -> Flags.(empty + pollin)
  in
  match self.w with
  | Nil -> with_read
  | Sub _ -> Flags.(with_read + pollout)
(** [in_owner_thread self] is [true] iff an owner thread has been recorded
    for [self] (i.e. [self.owner_thread] is not the initial [-1]) and the
    calling thread is that owner. *)
let[@inline] in_owner_thread (self : st) : bool =
  match self.owner_thread with
  | -1 -> false (* no owner recorded yet *)
  | owner -> Int.equal owner (Thread.id (Thread.self ()))
(** Change the event loop right now. This must be called only from the owner
    thread and outside of [poll]. *)
module Run_now_ = struct
  (** [perform_cbs_closed ~closed cbs] calls every callback of [cbs], in list
      order, passing it [~closed]. *)
  let rec perform_cbs_closed ~closed = function
    | Nil -> ()
    | Sub (x, y, f, tail) ->
      f ~closed x y;
      perform_cbs_closed ~closed tail

  (** [clear_ self] drops every timer and every FD subscription. The dropped
      callbacks are not invoked.

      Slot 0 of the poll buffer is reserved for the wakeup pipe (it is
      installed there by [create_st]), so it is re-registered rather than
      wiped, and [self.len] goes back to [1]. Otherwise other threads could
      no longer wake the loop up, and the next registered FD would land in
      the reserved slot.

      [self.wakeup_triggered] is deliberately left untouched: if a wakeup
      byte is pending in the pipe, the flag must stay set so that [step]
      drains it. *)
  let clear_ (self : st) : unit =
    Heap.clear self.timer;
    Fd_tbl.clear self.fds;
    for i = 1 to P.maxfds self.poll - 1 do
      P.set_index self.poll i P.invalid_fd Flags.empty
    done;
    P.set_index self.poll 0 self.wakeup_rd Flags.pollin;
    self.len <- 1

  (** [get_fd_ self fd] returns the data attached to [fd], allocating a fresh
      slot at the end of the active prefix of the poll buffer if [fd] is not
      known yet.
      @raise Invalid_argument if the poll buffer has no free slot left. *)
  let get_fd_ (self : st) fd : fd_data =
    (* assert (in_owner_thread self && not (in_poll self)); *)
    match Fd_tbl.find_opt self.fds fd with
    | Some fd_data -> fd_data
    | None ->
      if self.len = P.maxfds self.poll then
        invalid_arg "No available slot in poll";
      let idx = self.len in
      self.len <- idx + 1;
      let fd_data = { idx; fd; r = Nil; w = Nil } in
      Fd_tbl.add self.fds fd fd_data;
      fd_data

  (** [remove_fd_ self fd_data] forgets [fd_data]: it is removed from the
      table and its slot in the poll buffer is released. To keep the active
      prefix dense, the last active slot is moved into the freed one (unless
      the freed slot already is the last one). The vacated slot is always
      reset to [P.invalid_fd] so that no stale copy of a moved FD remains
      past the active prefix. *)
  let remove_fd_ (self : st) (fd_data : fd_data) : unit =
    (* assert (in_owner_thread self && not (in_poll self)); *)
    Fd_tbl.remove self.fds fd_data.fd;
    let last = self.len - 1 in
    if fd_data.idx > 0 && fd_data.idx < last then (
      (* not the last element nor the first (pipe_rd), move the last element
         here to keep the buffer non sparse *)
      let last_fd = P.get_fd self.poll last in
      assert (last_fd <> fd_data.fd);
      match Fd_tbl.find_opt self.fds last_fd with
      | None -> assert false
      | Some last_fd_data ->
        last_fd_data.idx <- fd_data.idx;
        P.set_index self.poll fd_data.idx last_fd (fd_flags last_fd_data);
        P.set_index self.poll last P.invalid_fd Flags.empty
    ) else
      P.set_index self.poll fd_data.idx P.invalid_fd Flags.empty;
    self.len <- last

  (** [close_ self fd] unregisters [fd] and then calls all its pending read
      and write callbacks with [~closed:true]. An unknown [fd] is ignored. *)
  let close_ (self : st) fd : unit =
    let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
    let r, w =
      match Fd_tbl.find_opt self.fds fd with
      | Some fd_data ->
        remove_fd_ self fd_data;
        fd_data.r, fd_data.w
      | None -> Nil, Nil
    in
    perform_cbs_closed ~closed:true r;
    perform_cbs_closed ~closed:true w

  (** [on_readable_ self fd x y f] adds [f] (with its arguments [x] and [y])
      to the read callbacks of [fd] and updates the events polled for [fd].
      @raise Invalid_argument if [fd] is new and the poll buffer is full. *)
  let on_readable_ self fd x y f : unit =
    let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
    let fd_data = get_fd_ self fd in
    fd_data.r <- Sub (x, y, f, fd_data.r);
    P.set_index self.poll fd_data.idx fd (fd_flags fd_data)

  (** [on_writable_ self fd x y f] adds [f] (with its arguments [x] and [y])
      to the write callbacks of [fd] and updates the events polled for [fd].
      @raise Invalid_argument if [fd] is new and the poll buffer is full. *)
  let on_writable_ self fd x y f : unit =
    let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
    let fd_data = get_fd_ self fd in
    fd_data.w <- Sub (x, y, f, fd_data.w);
    P.set_index self.poll fd_data.idx fd (fd_flags fd_data)

  (** [run_after_s_ self ev] inserts the timer event [ev] into the timer
      heap. *)
  let run_after_s_ self ev : unit = Heap.insert self.timer ev

  (** [perform_task_ self t] applies the queued task [t] to the loop. *)
  let perform_task_ self (t : queued_task) : unit =
    match t with
    | Q_run_after t -> run_after_s_ self t
    | Q_on_readable (fd, x, y, f) -> on_readable_ self fd x y f
    | Q_on_writable (fd, x, y, f) -> on_writable_ self fd x y f
    | Q_clear -> clear_ self
    | Q_close fd -> close_ self fd
end
(** [close self fd] unregisters [fd] from the loop.

    When called from the owner thread while the loop is not inside [poll],
    the change is applied immediately. Otherwise a [Q_close] task is queued
    and the loop is woken up. *)
let close (self : st) fd : unit =
  let can_run_now = in_owner_thread self && not (in_poll self) in
  match can_run_now with
  | true -> Run_now_.close_ self fd
  | false ->
    queue_task_ self (Q_close fd);
    wakeup_from_outside self
(** [step self] runs one iteration of the event loop: it records the calling
    thread as the owner, applies the tasks queued by other threads (or
    queued while inside [poll]), waits in [poll] for at most the delay until
    the next timer deadline (30s when there is no timer), drains the wakeup
    pipe, and finally runs the callbacks of the FDs reported as ready.

    NOTE(review): nothing in this function pops expired timers from
    [self.timer]; the deadline only bounds the poll timeout. Confirm that
    timers are fired elsewhere, otherwise they never run. *)
let step (self : st) : unit =
  let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in

  self.owner_thread <- Thread.id (Thread.self ());

  (* process all queued tasks.

     NOTE: race condition: if another thread queues tasks after we do
     the transfer, it will call [wakeup_from_outside] and make the pipe_rd FD
     readable. So as soon as we call [poll], it will return and we will find
     the queued tasks waiting for us. *)
  let local_q = Queue.create () in
  Sync_queue.transfer self.queued_tasks local_q;
  Queue.iter (Run_now_.perform_task_ self) local_q;

  (* compute the timeout only now, so that timers scheduled by the queued
     tasks above are taken into account *)
  let timeout_ns : int64 =
    match next_deadline_ self with
    | None -> 30_000_000_000L
    | Some d -> Int64.max 0L (Int64.sub d (now_ns ()))
  in

  (* enter [poll]; [in_poll] is reset even if the syscall raises, otherwise
     every later operation from the owner thread would be queued *)
  Atomic.set self.in_poll true;
  let num_ready_fds =
    Fun.protect
      ~finally:(fun () -> Atomic.set self.in_poll false)
      (fun () ->
        let@ _sp =
          Trace_.with_span ~__FILE__ ~__LINE__ "poll" ~data:(fun () ->
              [ "timeout", `Float (ns_to_s timeout_ns); "len", `Int self.len ])
        in
        P.ppoll_or_poll self.poll self.len (Nanoseconds timeout_ns))
  in

  (* drain notification pipe. The flag is reset only after the pipe is
     empty: while it is still set no other thread writes to the pipe, so we
     can never end up with the flag set and an empty pipe, which would make
     later wakeups no-ops. *)
  if Atomic.get self.wakeup_triggered then (
    let b1 = Bytes.create 8 in
    (* the reading end is non blocking: [Unix_error] means the pipe is
       empty (or the read was interrupted) *)
    while
      try Unix.read self.wakeup_rd b1 0 8 > 0 with Unix.Unix_error _ -> false
    do
      ()
    done;
    Atomic.set self.wakeup_triggered false
  );

  (* snapshot the ready FDs first: callbacks may register or close FDs,
     which modifies the poll buffer we would otherwise be iterating on *)
  let ready = ref [] in
  P.iter_ready self.poll num_ready_fds (fun _idx fd flags ->
      if fd <> self.wakeup_rd then ready := (fd, flags) :: !ready);

  (* NOTE(review): an FD reported with neither [pollin] nor [pollout] (for
     instance only an error/hangup condition) runs no callback and stays
     registered; confirm how such events should be surfaced. *)
  List.iter
    (fun (fd, flags) ->
      match Fd_tbl.find_opt self.fds fd with
      | None -> () (* closed by a callback that ran earlier in this step *)
      | Some fd_data ->
        (* [false] once a callback closed (or closed and re-registered) the
           FD, in which case [fd_data] is stale and must not be touched *)
        let still_registered () =
          match Fd_tbl.find_opt self.fds fd with
          | Some cur -> cur == fd_data
          | None -> false
        in

        if Flags.mem Flags.pollin flags then (
          let r = fd_data.r in
          fd_data.r <- Nil;
          perform_cbs ~closed:false r
        );
        if Flags.mem Flags.pollout flags && still_registered () then (
          let w = fd_data.w in
          fd_data.w <- Nil;
          perform_cbs ~closed:false w
        );

        if still_registered () then (
          let fl = fd_flags fd_data in
          if fl = Flags.empty then Run_now_.remove_fd_ self fd_data
          else
            (* keep the polled events in sync with the remaining callbacks,
               otherwise [poll] keeps reporting an event nobody waits for *)
            P.set_index self.poll fd_data.idx fd fl
        ))
    (List.rev !ready)