From a147eeb0ab9a871dfedc6fc459298fe43185a1ca Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 26 Jun 2024 12:14:52 -0400 Subject: [PATCH] feat: move to `poll` for IOs --- src/io/async_io.ml | 16 +++--- src/io/ev_loop.ml | 121 +++++++++++++++++++++++++++------------------ src/io/ev_loop.mli | 8 +-- 3 files changed, 84 insertions(+), 61 deletions(-) diff --git a/src/io/async_io.ml b/src/io/async_io.ml index 71f0aedc..9b71b9b5 100644 --- a/src/io/async_io.ml +++ b/src/io/async_io.ml @@ -17,7 +17,7 @@ let rec read (fd : Fd.t) buf i len : int = { handle = (fun ~run:_ ~resume sus -> - Ev_loop.wait_readable fd.fd cancel (fun cancel -> + Ev_loop.wait_readable fd cancel (fun cancel -> resume sus @@ Ok (); Cancel_handle.cancel cancel)); }; @@ -40,7 +40,7 @@ let rec write_once (fd : Fd.t) buf i len : int = { handle = (fun ~run:_ ~resume sus -> - Ev_loop.wait_writable fd.fd cancel (fun cancel -> + Ev_loop.wait_writable fd cancel (fun cancel -> resume sus @@ Ok (); Cancel_handle.cancel cancel)); }; @@ -159,8 +159,8 @@ module TCP_server = struct | Running | Stopped - let rec accept_ (sock : Unix.file_descr) = - match Unix.accept sock with + let rec accept_ (sock : Fd.t) = + match Unix.accept sock.fd with | csock, addr -> csock, addr | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> (let cancel = Cancel_handle.create () in @@ -219,7 +219,7 @@ module TCP_server = struct Unix.set_nonblock sock; Unix.bind sock addr; Unix.listen sock listen; - sock + Fd.create sock with e -> let bt = Printexc.get_raw_backtrace () in A.set st Stopped; @@ -273,8 +273,8 @@ end module TCP_client = struct (** connect asynchronously *) - let rec connect_ sock addr = - match Unix.connect sock addr with + let rec connect_ (sock : Fd.t) addr = + match Unix.connect sock.fd addr with | () -> () | exception Unix.Unix_error @@ -292,9 +292,9 @@ module TCP_client = struct let sock = Unix.socket (Sockaddr.domain addr) Unix.SOCK_STREAM 0 in Unix.set_nonblock sock; Unix.setsockopt sock Unix.TCP_NODELAY true; + let sock = Fd.create sock in connect_ sock addr; - let sock = Fd.create sock in let finally () = Fd.close_noerr sock in let@ () = Fun.protect ~finally in diff --git a/src/io/ev_loop.ml b/src/io/ev_loop.ml index c25a746d..7cff644f 100644 --- a/src/io/ev_loop.ml +++ b/src/io/ev_loop.ml @@ -6,8 +6,8 @@ module Action = struct (** Action that we ask the lwt loop to perform, from the outside *) type t = - | Wait_readable of Unix.file_descr * cb * Cancel_handle.t - | Wait_writable of Unix.file_descr * cb * Cancel_handle.t + | Wait_readable of Fd.t * cb * Cancel_handle.t + | Wait_writable of Fd.t * cb * Cancel_handle.t | Run_after_s of float * cb * Cancel_handle.t | Run_every_s of float * cb * Cancel_handle.t end @@ -55,12 +55,23 @@ end module Per_fd = struct type t = { - fd: Unix.file_descr; + fd: Fd.t; mutable reads: IO_wait.t list; mutable writes: IO_wait.t list; } let[@inline] is_empty self = self.reads = [] && self.writes = [] + + let update_event (self : t) ~(poll : Poll.t) : unit = + let ev = + match self.reads, self.writes with + | _ when Fd.closed self.fd -> Poll.Event.none + | [], [] -> Poll.Event.none + | _ :: _, [] -> Poll.Event.read + | [], _ :: _ -> Poll.Event.write + | _ :: _, _ :: _ -> Poll.Event.read_write + in + Poll.set poll self.fd.fd ev end (** Keep track of the subscriptions to channels *) @@ -68,16 +79,18 @@ module IO_tbl = struct type t = { mutable n_read: int; mutable n_write: int; + poll: Poll.t; tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t; } - let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 } + let create ~poll () : t = + { tbl = Hashtbl.create 32; n_read = 0; n_write = 0; poll } - let get_or_create (self : t) fd : Per_fd.t = - try Hashtbl.find self.tbl fd + let get_or_create (self : t) (fd : Fd.t) : Per_fd.t = + try Hashtbl.find self.tbl fd.fd with Not_found -> let per_fd = { Per_fd.fd; reads = []; writes = [] } in - Hashtbl.add self.tbl fd per_fd; + Hashtbl.add self.tbl fd.fd per_fd; per_fd let add_io_wait (self : t) fd mode (ev : IO_wait.t) = @@ -86,48 +99,51 @@ module IO_tbl = struct match mode with | Read -> self.n_read <- 1 + self.n_read; - per_fd.reads <- ev :: per_fd.reads + per_fd.reads <- ev :: per_fd.reads; + if self.n_read = 0 then Per_fd.update_event per_fd ~poll:self.poll | Write -> self.n_write <- 1 + self.n_write; - per_fd.writes <- ev :: per_fd.writes + per_fd.writes <- ev :: per_fd.writes; + if self.n_write = 0 then Per_fd.update_event per_fd ~poll:self.poll - let prepare_select (self : t) = - let reads = ref [] in - let writes = ref [] in - Hashtbl.iter - (fun _ (per_fd : Per_fd.t) -> - if Per_fd.is_empty per_fd then - Hashtbl.remove self.tbl per_fd.fd - else ( - if per_fd.reads <> [] then reads := per_fd.fd :: !reads; - if per_fd.writes <> [] then writes := per_fd.fd :: !writes - )) - self.tbl; - !reads, !writes - - let trigger_waiter (io : IO_wait.t) = + let[@inline] trigger_waiter (io : IO_wait.t) = if io.active then io.f io.as_cancel_handle - let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list) - (writes : Unix.file_descr list) : unit = - List.iter - (fun fd -> - if fd <> ignore_read then ( - let per_fd = Hashtbl.find self.tbl fd in + (** Wake up waiters on FDs who received events *) + let handle_ready ~ignore_fd (self : t) : unit = + let update_per_fd (per_fd : Per_fd.t) (event : Poll.Event.t) = + if Fd.closed per_fd.fd then + (* cleanup *) + Hashtbl.remove self.tbl per_fd.fd.fd + else ( + if event.readable then ( List.iter trigger_waiter per_fd.reads; self.n_read <- self.n_read - List.length per_fd.reads; per_fd.reads <- [] - )) - reads; + ); - List.iter - (fun fd -> - let per_fd = Hashtbl.find self.tbl fd in - List.iter trigger_waiter per_fd.writes; - self.n_write <- self.n_write - List.length per_fd.writes; - per_fd.writes <- []) - writes; + if event.writable then ( + List.iter trigger_waiter per_fd.writes; + self.n_write <- self.n_write - List.length per_fd.writes; + per_fd.writes <- [] + ); + Per_fd.update_event per_fd ~poll:self.poll + ) + in + + Poll.iter_ready self.poll ~f:(fun fd event -> + if fd <> ignore_fd then ( + let per_fd = Hashtbl.find self.tbl fd in + update_per_fd per_fd event + )); () + + (** Remove closed FDs *) + let regular_cleanup (self : t) : unit = + Hashtbl.iter + (fun key (per_fd : Per_fd.t) -> + if Fd.closed per_fd.fd then Hashtbl.remove self.tbl key) + self.tbl end let run_timer_ (t : Timer.t) = @@ -154,19 +170,25 @@ module Ev_loop = struct (** Is the ev loop thread currently waiting? *) pipe_read: Unix.file_descr; (** Main thread only *) pipe_write: Unix.file_descr; (** Wakeup main thread *) + poll: Poll.t; b4: bytes; (** small buffer *) b1: bytes; (** small buffer *) } let create () : t = let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in - Unix.set_nonblock pipe_read; (* pipe_write remains blocking *) + Unix.set_nonblock pipe_read; + + let poll = Poll.create () in + Poll.set poll pipe_read Poll.Event.read; + { timer = Timer.create (); - io_tbl = IO_tbl.create (); + io_tbl = IO_tbl.create ~poll (); in_blocking_section = A.make false; actions = Action_queue.create (); + poll; pipe_read; pipe_write; b4 = Bytes.create 4; @@ -207,21 +229,26 @@ module Ev_loop = struct let run_step_ (self : t) : unit = perform_pending_actions self; - let delay = run_timer_ self.timer in - let delay = Option.value delay ~default:10. in + let delay_s = run_timer_ self.timer in + let delay_s = Option.value delay_s ~default:10. in + let timeout = Poll.Timeout.after Int64.(of_float (delay_s *. 1e9)) in (* run [select] *) - let reads, writes = IO_tbl.prepare_select self.io_tbl in A.set self.in_blocking_section true; - let reads, writes, _ = + let has_events = let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in - Unix.select (self.pipe_read :: reads) writes [] delay + match Poll.wait self.poll timeout with + | `Timeout -> false + | `Ok -> true in A.set self.in_blocking_section false; drain_pipe_ self; - IO_tbl.handle_ready ~ignore_read:self.pipe_read self.io_tbl reads writes; + if has_events then + IO_tbl.handle_ready ~ignore_fd:self.pipe_read self.io_tbl + else + IO_tbl.regular_cleanup self.io_tbl; perform_pending_actions self; () diff --git a/src/io/ev_loop.mli b/src/io/ev_loop.mli index 4f0e1ebe..a58b13ab 100644 --- a/src/io/ev_loop.mli +++ b/src/io/ev_loop.mli @@ -1,10 +1,6 @@ (** Event loop *) -val wait_readable : - Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit - -val wait_writable : - Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit - +val wait_readable : Fd.t -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit +val wait_writable : Fd.t -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit val run_after_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit val run_every_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit