From 532ec3657992c4e574baf92d29c7c8290fd9de36 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 12 Mar 2024 22:14:18 -0400 Subject: [PATCH] feat io: add `Fd` wrapper, ressuscitate Net --- src/io/IO_unix.ml.tmp | 142 ------------------------------- src/io/IO_unix.mli.tmp | 21 ----- src/io/dune | 2 +- src/io/ev_loop.mli | 2 + src/io/fd.ml | 146 ++++++++++++++++++++++++++++++++ src/io/fd.mli | 41 +++++++++ src/io/moonpool_io.ml | 4 +- src/io/moonpool_io.mli | 9 +- src/io/{net.ml.tmp => net.ml} | 63 ++++++-------- src/io/{net.mli.tmp => net.mli} | 12 ++- 10 files changed, 231 insertions(+), 211 deletions(-) delete mode 100644 src/io/IO_unix.ml.tmp delete mode 100644 src/io/IO_unix.mli.tmp create mode 100644 src/io/fd.ml create mode 100644 src/io/fd.mli rename src/io/{net.ml.tmp => net.ml} (63%) rename src/io/{net.mli.tmp => net.mli} (70%) diff --git a/src/io/IO_unix.ml.tmp b/src/io/IO_unix.ml.tmp deleted file mode 100644 index 0457ab31..00000000 --- a/src/io/IO_unix.ml.tmp +++ /dev/null @@ -1,142 +0,0 @@ -open struct - let _default_buf_size = 16 * 1024 -end - -type file_descr = Unix.file_descr - -let await_readable fd = - let loop = U_loop.cur () in - (* wait for FD to be ready *) - Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> - ignore - (loop#on_readable fd (fun ev -> - wakeup (); - Cancel_handle.cancel ev) - : Cancel_handle.t)) - -let rec read fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.read fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_readable fd; - read fd buf i len - | n -> n - ) - -let await_writable fd = - let loop = U_loop.cur () in - (* wait for FD to be ready *) - Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> - ignore - (loop#on_writable fd (fun ev -> - wakeup (); - Cancel_handle.cancel ev) - : Cancel_handle.t)) - -let rec write_once fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.write fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_writable fd; - write_once fd buf i len - | n -> n - ) - -let write fd buf i len : unit = - let i = ref i in - let len = ref len in - while !len > 0 do - let n = write_once fd buf !i !len in - i := !i + n; - len := !len - n - done - -module Out = struct - include Iostream.Out - - let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) - fd : t = - let buf_off = ref 0 in - - let[@inline] is_full () = !buf_off = Bytes.length buf in - - let flush () = - if !buf_off > 0 then ( - write fd buf 0 !buf_off; - buf_off := 0 - ) - in - - object - method output_char c = - if is_full () then flush (); - Bytes.set buf !buf_off c; - incr buf_off - - method output bs i len : unit = - let i = ref i in - let len = ref len in - - while !len > 0 do - (* make space *) - if is_full () then flush (); - - let n = min !len (Bytes.length buf - !buf_off) in - Bytes.blit bs !i buf !buf_off n; - buf_off := !buf_off + n; - i := !i + n; - len := !len - n - done; - (* if full, write eagerly *) - if is_full () then flush () - - method close () = - if close_noerr then ( - try - flush (); - Unix.close fd - with _ -> () - ) else ( - flush (); - Unix.close fd - ) - - method flush = flush - end -end - -module In = struct - include Iostream.In - - let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) - (fd : Unix.file_descr) : t = - let buf_len = ref 0 in - let buf_off = ref 0 in - - let refill () = - buf_off := 0; - buf_len := read fd buf 0 (Bytes.length buf) - in - - object - method input b i len : int = - if !buf_len = 0 then refill (); - let n = min len !buf_len in - if n > 0 then ( - Bytes.blit buf !buf_off b i n; - buf_off := !buf_off + n; - buf_len := !buf_len - n - ); - n - - method close () = - if close_noerr then ( - try Unix.close fd with _ -> () - ) else - Unix.close fd - end -end diff --git a/src/io/IO_unix.mli.tmp b/src/io/IO_unix.mli.tmp deleted file mode 100644 index 96010e2f..00000000 --- a/src/io/IO_unix.mli.tmp +++ /dev/null @@ -1,21 +0,0 @@ -(** Low level Unix IOs *) - -type file_descr = Unix.file_descr - -val read : file_descr -> bytes -> int -> int -> int -val write_once : file_descr -> bytes -> int -> int -> int -val write : file_descr -> bytes -> int -> int -> unit -val await_readable : file_descr -> unit -val await_writable : file_descr -> unit - -module In : sig - include module type of Iostream.In - - val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t -end - -module Out : sig - include module type of Iostream.Out - - val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t -end diff --git a/src/io/dune b/src/io/dune index 6d165d9e..c0b22fc0 100644 --- a/src/io/dune +++ b/src/io/dune @@ -1,7 +1,7 @@ (library (name moonpool_io) - (public_name moonpool.io) + (public_name moonpool-io) (synopsis "Event loop for Moonpool based on poll") (optional) ; dep on poll (flags :standard -open Moonpool) diff --git a/src/io/ev_loop.mli b/src/io/ev_loop.mli index 047bade6..a422585e 100644 --- a/src/io/ev_loop.mli +++ b/src/io/ev_loop.mli @@ -11,3 +11,5 @@ val on_readable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t val on_writable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t val run_after_s : t -> float -> (unit -> unit) -> Cancel_handle.t val run_every_s : t -> float -> (Cancel_handle.t -> unit) -> Cancel_handle.t + +(* FIXME: add a close function that closes a FD here and removes the subscription *) diff --git a/src/io/fd.ml b/src/io/fd.ml new file mode 100644 index 00000000..0bb2ea05 --- /dev/null +++ b/src/io/fd.ml @@ -0,0 +1,146 @@ +open Common_ + +open struct + let _default_buf_size = 16 * 1024 +end + +exception Closed + +type state = + | Closed + | Open of { + fd: Unix.file_descr; + close_noerr: bool; + } + +type t = { st: state A.t } [@@unboxed] + +let[@inline] st (self : t) : state = A.get self.st + +let[@inline] fd (self : t) = + match st self with + | Closed -> raise Closed + | Open { fd; _ } -> fd + +let create ?(close_noerr = true) fd : t = + Unix.set_nonblock fd; + { st = A.make @@ Open { close_noerr; fd } } + +let close (self : t) : unit = + while + let old = A.get self.st in + match old with + | Closed -> false + | Open { fd; close_noerr } -> + if A.compare_and_set self.st old Closed then ( + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd; + + false + ) else + true + do + () + done + +let await_readable (self : t) = + let loop = Ev_loop.get_or_create () in + match A.get self.st with + | Closed -> raise Closed + | Open { fd; _ } -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + ignore + (Ev_loop.on_readable loop fd (fun () -> resume sus @@ Ok ()) + : Cancel_handle.t)); + } + +let rec read (self : t) buf i len : int = + match st self with + | Closed -> 0 + | Open { fd; _ } -> + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (match await_readable self with + | () -> read self buf i len + | exception Closed -> 0) + | n -> n + ) + +let await_writable (self : t) = + let loop = Ev_loop.get_or_create () in + match st self with + | Closed -> raise Closed + | Open { fd; _ } -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + ignore + (Ev_loop.on_writable loop fd (fun () -> resume sus @@ Ok ()) + : Cancel_handle.t)); + } + +let rec write_once (self : t) buf i len : int = + match st self with + | Closed -> 0 + | Open { fd; _ } -> + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (match await_writable self with + | () -> write_once self buf i len + | exception Closed -> 0) + | n -> n + ) + +let write self buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once self buf !i !len in + if n = 0 then raise Closed; + i := !i + n; + len := !len - n + done + +class to_in (self : t) : Iostream.In.t = + object + method input bs i len = read self bs i len + method close () = close self + end + +class to_out (self : t) : Iostream.Out.t = + object + method output bs i len = write self bs i len + method close () = close self + end + +class to_in_buf ?bytes (self : t) : Iostream.In_buf.t = + object + inherit Iostream.In_buf.t_from_refill ?bytes () + + method private refill (slice : Iostream.Slice.t) = + slice.off <- 0; + slice.len <- read self slice.bytes 0 (Bytes.length slice.bytes) + + method close () = close self + end + +class to_out_buf ?bytes (self : t) : Iostream.Out_buf.t = + object + inherit Iostream.Out_buf.t_from_output ?bytes () + method private close_underlying () = close self + method private output_underlying bs i len = write self bs i len + end diff --git a/src/io/fd.mli b/src/io/fd.mli new file mode 100644 index 00000000..9c19c121 --- /dev/null +++ b/src/io/fd.mli @@ -0,0 +1,41 @@ +type t +(** A wrapper around a unix file descriptor. + This makes closing idempotent and ensures that the + Unix FD is only accessible while this is open. *) + +exception Closed + +val create : ?close_noerr:bool -> Unix.file_descr -> t +(** Create a new file descriptor from a unix one. *) + +val fd : t -> Unix.file_descr +(** Access the underlying file descriptor. + @raise Closed if the FD was closed. *) + +val close : t -> unit +(** Close the FD. Idempotent. *) + +val read : t -> bytes -> int -> int -> int +(** Read from the FD. Suspends if the FD is not ready. + @return 0 if the FD is closed *) + +val write_once : t -> bytes -> int -> int -> int +(** Write into the FD. Suspends if the FD is not ready. + @return 0 if the FD is closed *) + +val write : t -> bytes -> int -> int -> unit +(** A loop around {!write_once}. + @raise Closed if the FD is closed before this completes. *) + +val await_readable : t -> unit +(** Wait for the FD to be readable. + @raise Closed if the FD is closed. *) + +val await_writable : t -> unit +(** Wait for the FD to be writable. + @raise Closed if the FD is closed. *) + +class to_in : t -> Iostream.In.t +class to_out : t -> Iostream.Out.t +class to_in_buf : ?bytes:bytes -> t -> Iostream.In_buf.t +class to_out_buf : ?bytes:bytes -> t -> Iostream.Out_buf.t diff --git a/src/io/moonpool_io.ml b/src/io/moonpool_io.ml index d6958c13..ed565218 100644 --- a/src/io/moonpool_io.ml +++ b/src/io/moonpool_io.ml @@ -1,5 +1,5 @@ open Common_ +module Fd = Fd module Timer = Timer module Ev_loop = Ev_loop -(* TODO: module IO_unix = IO_unix *) -(* TODO: module Net = Net *) +module Net = Net diff --git a/src/io/moonpool_io.mli b/src/io/moonpool_io.mli index 1fae812e..352dd2c6 100644 --- a/src/io/moonpool_io.mli +++ b/src/io/moonpool_io.mli @@ -1,4 +1,9 @@ -(* module IO_unix = IO_unix *) -(* module Net = Net *) +(** IO loop. + + This event loop runs in a background thread and provides + non-blocking IOs to moonpool. *) + +module Fd = Fd module Timer = Timer module Ev_loop = Ev_loop +module Net = Net diff --git a/src/io/net.ml.tmp b/src/io/net.ml similarity index 63% rename from src/io/net.ml.tmp rename to src/io/net.ml index 8db14797..fec855cd 100644 --- a/src/io/net.ml.tmp +++ b/src/io/net.ml @@ -54,18 +54,16 @@ module TCP_server = struct Unix.set_nonblock sock; Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.listen sock 32; + let fd_sock = Fd.create sock in - let fut, _ = Fut.make () in + let fut, promise = Fut.make () in let self = { fut } in let loop_client client_sock client_addr : unit = - Unix.set_nonblock client_sock; Unix.setsockopt client_sock Unix.TCP_NODELAY true; + let client_sock = Fd.create ~close_noerr:true client_sock in - let@ () = - Fun.protect ~finally:(fun () -> - try Unix.close client_sock with _ -> ()) - in + let@ () = Fun.protect ~finally:(fun () -> Fd.close client_sock) in handle_client client_addr client_sock in @@ -78,74 +76,61 @@ module TCP_server = struct loop_client client_sock client_addr) : _ Fiber.t) | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* FIXME: possible race condition: the socket became readable + in the mid-time and we won't get notified. We need to call + [accept] after subscribing to [on_readable]. *) (* suspend *) - let loop = U_loop.cur () in - Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> - (* FIXME: possible race condition: the socket became readable - in the mid-time and we won't get notified. We need to call - [accept] after subscribing to [on_readable]. *) - ignore - (loop#on_readable sock (fun _ev -> - wakeup (); - Cancel_handle.cancel _ev) - : Cancel_handle.t)) + Fd.await_readable fd_sock done in - let loop_fiber = - let sched = Fuseau.get_scheduler () in - Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop - in + let _loop_fiber : unit Fiber.t = Fiber.spawn ~protect:false loop in let finally () = - stop_ loop_fiber; - Unix.close sock + Fut.fulfill_idempotent promise @@ Ok (); + Fd.close fd_sock in let@ () = Fun.protect ~finally in f self let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = with_serve' addr - (fun client_addr client_sock -> - let ic = IO_unix.In.of_unix_fd client_sock in - let oc = IO_unix.Out.of_unix_fd client_sock in + (fun client_addr (client_sock : Fd.t) -> + let ic = new Fd.to_in_buf client_sock in + let oc = new Fd.to_out_buf client_sock in handle_client client_addr ic oc) f end module TCP_client = struct - let with_connect' addr (f : Unix.file_descr -> 'a) : 'a = + let with_connect' addr (f : Fd.t -> 'a) : 'a = let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - Unix.set_nonblock sock; Unix.setsockopt sock Unix.TCP_NODELAY true; + let sock = Fd.create ~close_noerr:true sock in (* connect asynchronously *) while try - Unix.connect sock addr; + let fd = Fd.fd sock in + Unix.connect fd addr; false with | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) -> - Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> - let loop = U_loop.cur () in - ignore - (loop#on_writable sock (fun _ev -> - wakeup (); - Cancel_handle.cancel _ev) - : Cancel_handle.t)); + Fd.await_writable sock; true do () done; - let finally () = try Unix.close sock with _ -> () in + let finally () = Fd.close sock in let@ () = Fun.protect ~finally in f sock - let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a = + let with_connect addr (f : Iostream.In_buf.t -> Iostream.Out_buf.t -> 'a) : 'a + = with_connect' addr (fun sock -> - let ic = IO_unix.In.of_unix_fd sock in - let oc = IO_unix.Out.of_unix_fd sock in + let ic = new Fd.to_in_buf sock in + let oc = new Fd.to_out_buf sock in f ic oc) end diff --git a/src/io/net.mli.tmp b/src/io/net.mli similarity index 70% rename from src/io/net.mli.tmp rename to src/io/net.mli index f6289844..f32a6492 100644 --- a/src/io/net.mli.tmp +++ b/src/io/net.mli @@ -32,18 +32,22 @@ module TCP_server : sig val join : t -> unit val with_serve' : - Sockaddr.t -> (Sockaddr.t -> Unix.file_descr -> unit) -> (t -> 'a) -> 'a + Sockaddr.t -> (Sockaddr.t -> Fd.t -> unit) -> (t -> 'a) -> 'a + + (* TODO: bytes pool *) val with_serve : Sockaddr.t -> - (Sockaddr.t -> Iostream.In.t -> Iostream.Out.t -> unit) -> + (Sockaddr.t -> Iostream.In_buf.t -> Iostream.Out_buf.t -> unit) -> (t -> 'a) -> 'a end module TCP_client : sig - val with_connect' : Unix.sockaddr -> (Unix.file_descr -> 'a) -> 'a + val with_connect' : Unix.sockaddr -> (Fd.t -> 'a) -> 'a + + (* TODO: bytes pool *) val with_connect : - Unix.sockaddr -> (Iostream.In.t -> Iostream.Out.t -> 'a) -> 'a + Unix.sockaddr -> (Iostream.In_buf.t -> Iostream.Out_buf.t -> 'a) -> 'a end