diff --git a/src/nanoev.ml b/src/nanoev.ml index 4bf425a..00989e9 100644 --- a/src/nanoev.ml +++ b/src/nanoev.ml @@ -32,3 +32,13 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit = ops.run_after_s st time x y f let[@inline] step (Ev (ops, st)) : unit = ops.step st + +(* +let rec read (self:t) fd buf i len : int = + match Unix.read fd buf i len with + | n -> n + | exception Unix.Unix_error (Unix, _, _) -> + read self fd buf i len +*) + + diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 3d54a5a..5c29cf6 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -34,18 +34,29 @@ type st = { mutable sub_w: Unix.file_descr list; mutable sub_up_to_date: bool; (** are [sub_r] and [sub_w] faithful reflections of [fds]? *) + wakeup_rd: Unix.file_descr; + wakeup_wr: Unix.file_descr; + wakeup_triggered: bool Atomic.t; + (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) + in_select: bool Atomic.t; lock: Mutex.t; } let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + Unix.set_nonblock wakeup_rd; { timer = Heap.create ~leq:leq_timer (); fds = Hashtbl.create 16; sub_r = []; sub_w = []; sub_up_to_date = true; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_select = Atomic.make false; lock = Mutex.create (); } @@ -68,8 +79,11 @@ let clear (self : st) = self.sub_up_to_date <- true; () -(* TODO: *) -let wakeup_from_outside _self : unit = () +let wakeup_from_outside (self : st) : unit = + if not (Atomic.exchange self.wakeup_triggered true) then ( + let b = Bytes.make 1 '!' in + ignore (Unix.write self.wakeup_wr b 0 1 : int) + ) let get_fd_ (self : st) fd : per_fd = match Hashtbl.find self.fds fd with @@ -82,17 +96,22 @@ let get_fd_ (self : st) fd : per_fd = let on_readable self fd x y f : unit = let@ self = with_lock_ self in let per_fd = get_fd_ self fd in - per_fd.r <- Sub (x, y, f, per_fd.r) + per_fd.r <- Sub (x, y, f, per_fd.r); + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self let on_writable self fd x y f : unit = let@ self = with_lock_ self in let per_fd = get_fd_ self fd in - per_fd.w <- Sub (x, y, f, per_fd.w) + per_fd.w <- Sub (x, y, f, per_fd.w); + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self let run_after_s self time x y f : unit = let@ self = with_lock_ self in let deadline = now_ () +. time in - Heap.insert self.timer (Timer { deadline; x; y; f }) + Heap.insert self.timer (Timer { deadline; x; y; f }); + if Atomic.get self.in_select then wakeup_from_outside self let recompute_if_needed (self : st) = if not self.sub_up_to_date then ( @@ -108,16 +127,6 @@ let recompute_if_needed (self : st) = self.fds ) -(* - let set fd (ev : Event.t) : unit = - needs_recompute := true; - match Hashtbl.find subs fd with - | exception Not_found -> Hashtbl.add subs fd (ref ev) - | old_ev -> old_ev := Event.(!old_ev lor ev) - - let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds -*) - let next_deadline_ (self : st) : float option = match Heap.peek_min_exn self.timer with | exception Heap.Empty -> None @@ -136,18 +145,26 @@ let step (self : st) : unit = recompute_if_needed self; let timeout = match next_deadline_ self with - | None -> - let has_waiters = self.sub_r != [] || self.sub_w != [] in - if has_waiters then - 1e9 - else - 0. + | None -> 30. | Some d -> max 0. (d -. now_ ()) in timeout, self.sub_r, self.sub_w in - let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in + (* enter [select] *) + Atomic.set self.in_select true; + let r_reads, r_writes, _ = + Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout + in + Atomic.set self.in_select false; + + (* drain pipe *) + if Atomic.exchange self.wakeup_triggered false then ( + let b1 = Bytes.create 1 in + while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do + () + done + ); (* gather the [per_fd] that are ready *) let ready_r = ref [] in