fix: suspend when calling accept/connect

This commit is contained in:
Simon Cruanes 2024-06-25 14:08:39 -04:00
parent a2a0e3d306
commit 879d380faf
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 33 additions and 12 deletions

View file

@ -163,8 +163,19 @@ module TCP_server = struct
match Unix.accept sock with match Unix.accept sock with
| csock, addr -> csock, addr | csock, addr -> csock, addr
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
Ev_loop.wait_readable sock Cancel_handle.dummy ignore; (let cancel = Cancel_handle.create () in
accept_ sock let@ () =
Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel)
in
Tracing_.message "accept: suspend";
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Ev_loop.wait_readable sock cancel (fun _cancel ->
resume sus @@ Ok ()));
});
(accept_ [@tailcall]) sock
class base_server ?(listen = 32) ?(buf_pool = Buf_pool.dummy) class base_server ?(listen = 32) ?(buf_pool = Buf_pool.dummy)
?(buf_size = 4096) ~runner ~(handle : conn_handler) (addr : Sockaddr.t) : ?(buf_size = 4096) ~runner ~(handle : conn_handler) (addr : Sockaddr.t) :
@ -268,7 +279,13 @@ module TCP_client = struct
| exception | exception
Unix.Unix_error Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) -> ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) ->
Ev_loop.wait_writable sock Cancel_handle.dummy ignore; Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Ev_loop.wait_writable sock Cancel_handle.dummy (fun _cancel ->
resume sus @@ Ok ()));
};
connect_ sock addr connect_ sock addr
let with_connect' addr (f : Fd.t -> 'a) : 'a = let with_connect' addr (f : Fd.t -> 'a) : 'a =

View file

@ -81,6 +81,7 @@ module IO_tbl = struct
per_fd per_fd
let add_io_wait (self : t) fd mode (ev : IO_wait.t) = let add_io_wait (self : t) fd mode (ev : IO_wait.t) =
Tracing_.message "add io wait";
let per_fd = get_or_create self fd in let per_fd = get_or_create self fd in
match mode with match mode with
| Read -> | Read ->
@ -154,12 +155,13 @@ module Ev_loop = struct
pipe_read: Unix.file_descr; (** Main thread only *) pipe_read: Unix.file_descr; (** Main thread only *)
pipe_write: Unix.file_descr; (** Wakeup main thread *) pipe_write: Unix.file_descr; (** Wakeup main thread *)
b4: bytes; (** small buffer *) b4: bytes; (** small buffer *)
b1: bytes; (** small buffer *)
} }
let create () : t = let create () : t =
let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in
Unix.set_nonblock pipe_read; Unix.set_nonblock pipe_read;
Unix.set_nonblock pipe_write; (* pipe_write remains blocking *)
{ {
timer = Timer.create (); timer = Timer.create ();
io_tbl = IO_tbl.create (); io_tbl = IO_tbl.create ();
@ -168,6 +170,7 @@ module Ev_loop = struct
pipe_read; pipe_read;
pipe_write; pipe_write;
b4 = Bytes.create 4; b4 = Bytes.create 4;
b1 = Bytes.create 1;
} }
(** Perform the action from within the ev loop thread *) (** Perform the action from within the ev loop thread *)
@ -193,11 +196,13 @@ module Ev_loop = struct
try try
let continue = ref true in let continue = ref true in
while !continue do while !continue do
match Unix.read self.pipe_read self.b4 0 (Bytes.length self.b4) with let n = Unix.read self.pipe_read self.b4 0 (Bytes.length self.b4) in
| n -> if n = 0 then continue := false if n = 0 then
| exception Unix.Unix_error (Unix.EAGAIN, _, _) -> () continue := false
else
Tracing_.message (spf "drained %dB from pipe" n)
done done
with Unix.Unix_error (Unix.EWOULDBLOCK, _, _) -> () with Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> ()
let run_step_ (self : t) : unit = let run_step_ (self : t) : unit =
perform_pending_actions self; perform_pending_actions self;
@ -212,6 +217,7 @@ module Ev_loop = struct
let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in
Unix.select (self.pipe_read :: reads) writes [] delay Unix.select (self.pipe_read :: reads) writes [] delay
in in
A.set self.in_blocking_section false; A.set self.in_blocking_section false;
drain_pipe_ self; drain_pipe_ self;
@ -255,10 +261,8 @@ let[@inline] get_current_ () =
| None -> start_background_loop () | None -> start_background_loop ()
let interrupt_if_in_blocking_section_ (self : Ev_loop.t) = let interrupt_if_in_blocking_section_ (self : Ev_loop.t) =
if A.get self.in_blocking_section then ( if A.get self.in_blocking_section then
let b = Bytes.create 1 in ignore (Unix.write self.pipe_write self.b1 0 1 : int)
ignore (Unix.write self.pipe_write b 0 1 : int)
)
let wait_readable fd cancel f : unit = let wait_readable fd cancel f : unit =
let ev_loop = get_current_ () in let ev_loop = get_current_ () in