fix evloop: pipe draining must catch EAGAIN

This commit is contained in:
Simon Cruanes 2024-06-24 12:11:10 -04:00
parent 76a7ce0f45
commit a19106d74e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -153,6 +153,7 @@ module Ev_loop = struct
(** Is the ev loop thread currently waiting? *) (** Is the ev loop thread currently waiting? *)
pipe_read: Unix.file_descr; (** Main thread only *) pipe_read: Unix.file_descr; (** Main thread only *)
pipe_write: Unix.file_descr; (** Wakeup main thread *) pipe_write: Unix.file_descr; (** Wakeup main thread *)
b4: bytes; (** small buffer *)
} }
let create () : t = let create () : t =
@ -166,6 +167,7 @@ module Ev_loop = struct
actions = Action_queue.create (); actions = Action_queue.create ();
pipe_read; pipe_read;
pipe_write; pipe_write;
b4 = Bytes.create 4;
} }
(** Perform the action from within the ev loop thread *) (** Perform the action from within the ev loop thread *)
@ -188,14 +190,14 @@ module Ev_loop = struct
(** Empty the pipe *) (** Empty the pipe *)
let drain_pipe_ (self : t) : unit = let drain_pipe_ (self : t) : unit =
let b = Bytes.create 1 in
try try
let continue = ref true in let continue = ref true in
while !continue do while !continue do
let n = Unix.read self.pipe_read b 0 1 in match Unix.read self.pipe_read self.b4 0 (Bytes.length self.b4) with
if n = 0 then continue := false | n -> if n = 0 then continue := false
| exception Unix.Unix_error (Unix.EAGAIN, _, _) -> ()
done done
with Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> () with Unix.Unix_error (Unix.EWOULDBLOCK, _, _) -> ()
let run_step_ (self : t) : unit = let run_step_ (self : t) : unit =
perform_pending_actions self; perform_pending_actions self;
@ -207,11 +209,18 @@ module Ev_loop = struct
let reads, writes = IO_tbl.prepare_select self.io_tbl in let reads, writes = IO_tbl.prepare_select self.io_tbl in
A.set self.in_blocking_section true; A.set self.in_blocking_section true;
let reads, writes, _ = let reads, writes, _ =
Tracing_.message
(Printf.sprintf "select: %d read (+selfpipe), %d writes"
(List.length reads) (List.length writes));
let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in
Unix.select (self.pipe_read :: reads) writes [] delay Unix.select (self.pipe_read :: reads) writes [] delay
in in
A.set self.in_blocking_section false; A.set self.in_blocking_section false;
Tracing_.message
(Printf.sprintf "select: %d read ready, %d writes ready"
(List.length reads) (List.length writes));
drain_pipe_ self; drain_pipe_ self;
IO_tbl.handle_ready ~ignore_read:self.pipe_read self.io_tbl reads writes; IO_tbl.handle_ready ~ignore_read:self.pipe_read self.io_tbl reads writes;