From 879d380faf527d5125282da1a0013c3df274f237 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 25 Jun 2024 14:08:39 -0400 Subject: [PATCH] fix: suspend when calling accept/connect --- src/unix/async_io.ml | 23 ++++++++++++++++++++--- src/unix/ev_loop.ml | 22 +++++++++++++--------- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/unix/async_io.ml b/src/unix/async_io.ml index e0bae3d4..71f0aedc 100644 --- a/src/unix/async_io.ml +++ b/src/unix/async_io.ml @@ -163,8 +163,19 @@ module TCP_server = struct match Unix.accept sock with | csock, addr -> csock, addr | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - Ev_loop.wait_readable sock Cancel_handle.dummy ignore; - accept_ sock + (let cancel = Cancel_handle.create () in + let@ () = + Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel) + in + Tracing_.message "accept: suspend"; + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Ev_loop.wait_readable sock cancel (fun _cancel -> + resume sus @@ Ok ())); + }); + (accept_ [@tailcall]) sock class base_server ?(listen = 32) ?(buf_pool = Buf_pool.dummy) ?(buf_size = 4096) ~runner ~(handle : conn_handler) (addr : Sockaddr.t) : @@ -268,7 +279,13 @@ module TCP_client = struct | exception Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) -> - Ev_loop.wait_writable sock Cancel_handle.dummy ignore; + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Ev_loop.wait_writable sock Cancel_handle.dummy (fun _cancel -> + resume sus @@ Ok ())); + }; connect_ sock addr let with_connect' addr (f : Fd.t -> 'a) : 'a = diff --git a/src/unix/ev_loop.ml b/src/unix/ev_loop.ml index 274b3508..c25a746d 100644 --- a/src/unix/ev_loop.ml +++ b/src/unix/ev_loop.ml @@ -81,6 +81,7 @@ module IO_tbl = struct per_fd let add_io_wait (self : t) fd mode (ev : IO_wait.t) = + Tracing_.message "add io wait"; let per_fd = get_or_create self fd in match mode with | Read -> @@ -154,12 +155,13 @@ module Ev_loop = struct pipe_read: Unix.file_descr; (** Main thread only *) pipe_write: Unix.file_descr; (** Wakeup main thread *) b4: bytes; (** small buffer *) + b1: bytes; (** small buffer *) } let create () : t = let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in Unix.set_nonblock pipe_read; - Unix.set_nonblock pipe_write; + (* pipe_write remains blocking *) { timer = Timer.create (); io_tbl = IO_tbl.create (); @@ -168,6 +170,7 @@ module Ev_loop = struct pipe_read; pipe_write; b4 = Bytes.create 4; + b1 = Bytes.create 1; } (** Perform the action from within the ev loop thread *) @@ -193,11 +196,13 @@ module Ev_loop = struct try let continue = ref true in while !continue do - match Unix.read self.pipe_read self.b4 0 (Bytes.length self.b4) with - | n -> if n = 0 then continue := false - | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> () + let n = Unix.read self.pipe_read self.b4 0 (Bytes.length self.b4) in + if n = 0 then + continue := false + else + Tracing_.message (spf "drained %dB from pipe" n) done - with Unix.Unix_error (Unix.EWOULDBLOCK, _, _) -> () + with Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> () let run_step_ (self : t) : unit = perform_pending_actions self; @@ -212,6 +217,7 @@ module Ev_loop = struct let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in Unix.select (self.pipe_read :: reads) writes [] delay in + A.set self.in_blocking_section false; drain_pipe_ self; @@ -255,10 +261,8 @@ let[@inline] get_current_ () = | None -> start_background_loop () let interrupt_if_in_blocking_section_ (self : Ev_loop.t) = - if A.get self.in_blocking_section then ( - let b = Bytes.create 1 in - ignore (Unix.write self.pipe_write b 0 1 : int) - ) + if A.get self.in_blocking_section then + ignore (Unix.write self.pipe_write self.b1 0 1 : int) let wait_readable fd cancel f : unit = let ev_loop = get_current_ () in