fix: implement wakeup_from_outside; fix bugs

This commit is contained in:
Simon Cruanes 2024-10-24 00:06:56 -04:00
parent c346420401
commit 4da0257298
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 49 additions and 22 deletions

View file

@ -32,3 +32,13 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit =
ops.run_after_s st time x y f ops.run_after_s st time x y f
let[@inline] step (Ev (ops, st)) : unit = ops.step st let[@inline] step (Ev (ops, st)) : unit = ops.step st
(*
let rec read (self:t) fd buf i len : int =
match Unix.read fd buf i len with
| n -> n
| exception Unix.Unix_error (Unix, _, _) ->
read self fd buf i len
*)

View file

@ -34,18 +34,29 @@ type st = {
mutable sub_w: Unix.file_descr list; mutable sub_w: Unix.file_descr list;
mutable sub_up_to_date: bool; mutable sub_up_to_date: bool;
(** are [sub_r] and [sub_w] faithful reflections of [fds]? *) (** are [sub_r] and [sub_w] faithful reflections of [fds]? *)
wakeup_rd: Unix.file_descr;
wakeup_wr: Unix.file_descr;
wakeup_triggered: bool Atomic.t;
(** Make [wakeup_from_outside] idempotent within an iteration of [step] *)
in_select: bool Atomic.t;
lock: Mutex.t; lock: Mutex.t;
} }
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st = let create_st () : st =
let wakeup_rd, wakeup_wr = Unix.pipe () in
Unix.set_nonblock wakeup_rd;
{ {
timer = Heap.create ~leq:leq_timer (); timer = Heap.create ~leq:leq_timer ();
fds = Hashtbl.create 16; fds = Hashtbl.create 16;
sub_r = []; sub_r = [];
sub_w = []; sub_w = [];
sub_up_to_date = true; sub_up_to_date = true;
wakeup_rd;
wakeup_wr;
wakeup_triggered = Atomic.make false;
in_select = Atomic.make false;
lock = Mutex.create (); lock = Mutex.create ();
} }
@ -68,8 +79,11 @@ let clear (self : st) =
self.sub_up_to_date <- true; self.sub_up_to_date <- true;
() ()
(* TODO: *) let wakeup_from_outside (self : st) : unit =
let wakeup_from_outside _self : unit = () if not (Atomic.exchange self.wakeup_triggered true) then (
let b = Bytes.make 1 '!' in
ignore (Unix.write self.wakeup_wr b 0 1 : int)
)
let get_fd_ (self : st) fd : per_fd = let get_fd_ (self : st) fd : per_fd =
match Hashtbl.find self.fds fd with match Hashtbl.find self.fds fd with
@ -82,17 +96,22 @@ let get_fd_ (self : st) fd : per_fd =
let on_readable self fd x y f : unit = let on_readable self fd x y f : unit =
let@ self = with_lock_ self in let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in let per_fd = get_fd_ self fd in
per_fd.r <- Sub (x, y, f, per_fd.r) per_fd.r <- Sub (x, y, f, per_fd.r);
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self
let on_writable self fd x y f : unit = let on_writable self fd x y f : unit =
let@ self = with_lock_ self in let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in let per_fd = get_fd_ self fd in
per_fd.w <- Sub (x, y, f, per_fd.w) per_fd.w <- Sub (x, y, f, per_fd.w);
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self
let run_after_s self time x y f : unit = let run_after_s self time x y f : unit =
let@ self = with_lock_ self in let@ self = with_lock_ self in
let deadline = now_ () +. time in let deadline = now_ () +. time in
Heap.insert self.timer (Timer { deadline; x; y; f }) Heap.insert self.timer (Timer { deadline; x; y; f });
if Atomic.get self.in_select then wakeup_from_outside self
let recompute_if_needed (self : st) = let recompute_if_needed (self : st) =
if not self.sub_up_to_date then ( if not self.sub_up_to_date then (
@ -108,16 +127,6 @@ let recompute_if_needed (self : st) =
self.fds self.fds
) )
(*
let set fd (ev : Event.t) : unit =
needs_recompute := true;
match Hashtbl.find subs fd with
| exception Not_found -> Hashtbl.add subs fd (ref ev)
| old_ev -> old_ev := Event.(!old_ev lor ev)
let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds
*)
let next_deadline_ (self : st) : float option = let next_deadline_ (self : st) : float option =
match Heap.peek_min_exn self.timer with match Heap.peek_min_exn self.timer with
| exception Heap.Empty -> None | exception Heap.Empty -> None
@ -136,18 +145,26 @@ let step (self : st) : unit =
recompute_if_needed self; recompute_if_needed self;
let timeout = let timeout =
match next_deadline_ self with match next_deadline_ self with
| None -> | None -> 30.
let has_waiters = self.sub_r != [] || self.sub_w != [] in
if has_waiters then
1e9
else
0.
| Some d -> max 0. (d -. now_ ()) | Some d -> max 0. (d -. now_ ())
in in
timeout, self.sub_r, self.sub_w timeout, self.sub_r, self.sub_w
in in
let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in (* enter [select] *)
Atomic.set self.in_select true;
let r_reads, r_writes, _ =
Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout
in
Atomic.set self.in_select false;
(* drain pipe *)
if Atomic.exchange self.wakeup_triggered false then (
let b1 = Bytes.create 1 in
while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do
()
done
);
(* gather the [per_fd] that are ready *) (* gather the [per_fd] that are ready *)
let ready_r = ref [] in let ready_r = ref [] in