feat io: add Fd wrapper, ressuscitate Net

This commit is contained in:
Simon Cruanes 2024-03-12 22:14:18 -04:00
parent 3530527522
commit 532ec36579
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
10 changed files with 231 additions and 211 deletions

View file

@ -1,142 +0,0 @@
open struct
let _default_buf_size = 16 * 1024
end
type file_descr = Unix.file_descr
let await_readable fd =
let loop = U_loop.cur () in
(* wait for FD to be ready *)
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
ignore
(loop#on_readable fd (fun ev ->
wakeup ();
Cancel_handle.cancel ev)
: Cancel_handle.t))
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_readable fd;
read fd buf i len
| n -> n
)
let await_writable fd =
let loop = U_loop.cur () in
(* wait for FD to be ready *)
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
ignore
(loop#on_writable fd (fun ev ->
wakeup ();
Cancel_handle.cancel ev)
: Cancel_handle.t))
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_writable fd;
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
module Out = struct
include Iostream.Out
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
fd : t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
end
module In = struct
include Iostream.In
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end
end

View file

@ -1,21 +0,0 @@
(** Low level Unix IOs *)
type file_descr = Unix.file_descr
val read : file_descr -> bytes -> int -> int -> int
val write_once : file_descr -> bytes -> int -> int -> int
val write : file_descr -> bytes -> int -> int -> unit
val await_readable : file_descr -> unit
val await_writable : file_descr -> unit
module In : sig
include module type of Iostream.In
val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t
end
module Out : sig
include module type of Iostream.Out
val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t
end

View file

@ -1,7 +1,7 @@
(library (library
(name moonpool_io) (name moonpool_io)
(public_name moonpool.io) (public_name moonpool-io)
(synopsis "Event loop for Moonpool based on poll") (synopsis "Event loop for Moonpool based on poll")
(optional) ; dep on poll (optional) ; dep on poll
(flags :standard -open Moonpool) (flags :standard -open Moonpool)

View file

@ -11,3 +11,5 @@ val on_readable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t
val on_writable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t val on_writable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t
val run_after_s : t -> float -> (unit -> unit) -> Cancel_handle.t val run_after_s : t -> float -> (unit -> unit) -> Cancel_handle.t
val run_every_s : t -> float -> (Cancel_handle.t -> unit) -> Cancel_handle.t val run_every_s : t -> float -> (Cancel_handle.t -> unit) -> Cancel_handle.t
(* FIXME: add a close function that closes a FD here and removes the subscription *)

146
src/io/fd.ml Normal file
View file

@ -0,0 +1,146 @@
open Common_
open struct
let _default_buf_size = 16 * 1024
end
exception Closed
type state =
| Closed
| Open of {
fd: Unix.file_descr;
close_noerr: bool;
}
type t = { st: state A.t } [@@unboxed]
let[@inline] st (self : t) : state = A.get self.st
let[@inline] fd (self : t) =
match st self with
| Closed -> raise Closed
| Open { fd; _ } -> fd
let create ?(close_noerr = true) fd : t =
Unix.set_nonblock fd;
{ st = A.make @@ Open { close_noerr; fd } }
let close (self : t) : unit =
while
let old = A.get self.st in
match old with
| Closed -> false
| Open { fd; close_noerr } ->
if A.compare_and_set self.st old Closed then (
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd;
false
) else
true
do
()
done
let await_readable (self : t) =
let loop = Ev_loop.get_or_create () in
match A.get self.st with
| Closed -> raise Closed
| Open { fd; _ } ->
(* wait for FD to be ready *)
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
ignore
(Ev_loop.on_readable loop fd (fun () -> resume sus @@ Ok ())
: Cancel_handle.t));
}
let rec read (self : t) buf i len : int =
match st self with
| Closed -> 0
| Open { fd; _ } ->
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(match await_readable self with
| () -> read self buf i len
| exception Closed -> 0)
| n -> n
)
let await_writable (self : t) =
let loop = Ev_loop.get_or_create () in
match st self with
| Closed -> raise Closed
| Open { fd; _ } ->
(* wait for FD to be ready *)
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
ignore
(Ev_loop.on_writable loop fd (fun () -> resume sus @@ Ok ())
: Cancel_handle.t));
}
let rec write_once (self : t) buf i len : int =
match st self with
| Closed -> 0
| Open { fd; _ } ->
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(match await_writable self with
| () -> write_once self buf i len
| exception Closed -> 0)
| n -> n
)
let write self buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once self buf !i !len in
if n = 0 then raise Closed;
i := !i + n;
len := !len - n
done
class to_in (self : t) : Iostream.In.t =
object
method input bs i len = read self bs i len
method close () = close self
end
class to_out (self : t) : Iostream.Out.t =
object
method output bs i len = write self bs i len
method close () = close self
end
class to_in_buf ?bytes (self : t) : Iostream.In_buf.t =
object
inherit Iostream.In_buf.t_from_refill ?bytes ()
method private refill (slice : Iostream.Slice.t) =
slice.off <- 0;
slice.len <- read self slice.bytes 0 (Bytes.length slice.bytes)
method close () = close self
end
class to_out_buf ?bytes (self : t) : Iostream.Out_buf.t =
object
inherit Iostream.Out_buf.t_from_output ?bytes ()
method private close_underlying () = close self
method private output_underlying bs i len = write self bs i len
end

41
src/io/fd.mli Normal file
View file

@ -0,0 +1,41 @@
type t
(** A wrapper around a unix file descriptor.
This makes closing idempotent and ensures that the
Unix FD is only accessible while this is open. *)
exception Closed
val create : ?close_noerr:bool -> Unix.file_descr -> t
(** Create a new file descriptor from a unix one. *)
val fd : t -> Unix.file_descr
(** Access the underlying file descriptor.
@raise Closed if the FD was closed. *)
val close : t -> unit
(** Close the FD. Idempotent. *)
val read : t -> bytes -> int -> int -> int
(** Read from the FD. Suspends if the FD is not ready.
@return 0 if the FD is closed *)
val write_once : t -> bytes -> int -> int -> int
(** Write into the FD. Suspends if the FD is not ready.
@return 0 if the FD is closed *)
val write : t -> bytes -> int -> int -> unit
(** A loop around {!write_once}.
@raise Closed if the FD is closed before this completes. *)
val await_readable : t -> unit
(** Wait for the FD to be readable.
@raise Closed if the FD is closed. *)
val await_writable : t -> unit
(** Wait for the FD to be writable.
@raise Closed if the FD is closed. *)
class to_in : t -> Iostream.In.t
class to_out : t -> Iostream.Out.t
class to_in_buf : ?bytes:bytes -> t -> Iostream.In_buf.t
class to_out_buf : ?bytes:bytes -> t -> Iostream.Out_buf.t

View file

@ -1,5 +1,5 @@
open Common_ open Common_
module Fd = Fd
module Timer = Timer module Timer = Timer
module Ev_loop = Ev_loop module Ev_loop = Ev_loop
(* TODO: module IO_unix = IO_unix *) module Net = Net
(* TODO: module Net = Net *)

View file

@ -1,4 +1,9 @@
(* module IO_unix = IO_unix *) (** IO loop.
(* module Net = Net *)
This event loop runs in a background thread and provides
non-blocking IOs to moonpool. *)
module Fd = Fd
module Timer = Timer module Timer = Timer
module Ev_loop = Ev_loop module Ev_loop = Ev_loop
module Net = Net

View file

@ -54,18 +54,16 @@ module TCP_server = struct
Unix.set_nonblock sock; Unix.set_nonblock sock;
Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.listen sock 32; Unix.listen sock 32;
let fd_sock = Fd.create sock in
let fut, _ = Fut.make () in let fut, promise = Fut.make () in
let self = { fut } in let self = { fut } in
let loop_client client_sock client_addr : unit = let loop_client client_sock client_addr : unit =
Unix.set_nonblock client_sock;
Unix.setsockopt client_sock Unix.TCP_NODELAY true; Unix.setsockopt client_sock Unix.TCP_NODELAY true;
let client_sock = Fd.create ~close_noerr:true client_sock in
let@ () = let@ () = Fun.protect ~finally:(fun () -> Fd.close client_sock) in
Fun.protect ~finally:(fun () ->
try Unix.close client_sock with _ -> ())
in
handle_client client_addr client_sock handle_client client_addr client_sock
in in
@ -78,74 +76,61 @@ module TCP_server = struct
loop_client client_sock client_addr) loop_client client_sock client_addr)
: _ Fiber.t) : _ Fiber.t)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* FIXME: possible race condition: the socket became readable
in the mid-time and we won't get notified. We need to call
[accept] after subscribing to [on_readable]. *)
(* suspend *) (* suspend *)
let loop = U_loop.cur () in Fd.await_readable fd_sock
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
(* FIXME: possible race condition: the socket became readable
in the mid-time and we won't get notified. We need to call
[accept] after subscribing to [on_readable]. *)
ignore
(loop#on_readable sock (fun _ev ->
wakeup ();
Cancel_handle.cancel _ev)
: Cancel_handle.t))
done done
in in
let loop_fiber = let _loop_fiber : unit Fiber.t = Fiber.spawn ~protect:false loop in
let sched = Fuseau.get_scheduler () in
Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop
in
let finally () = let finally () =
stop_ loop_fiber; Fut.fulfill_idempotent promise @@ Ok ();
Unix.close sock Fd.close fd_sock
in in
let@ () = Fun.protect ~finally in let@ () = Fun.protect ~finally in
f self f self
let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
with_serve' addr with_serve' addr
(fun client_addr client_sock -> (fun client_addr (client_sock : Fd.t) ->
let ic = IO_unix.In.of_unix_fd client_sock in let ic = new Fd.to_in_buf client_sock in
let oc = IO_unix.Out.of_unix_fd client_sock in let oc = new Fd.to_out_buf client_sock in
handle_client client_addr ic oc) handle_client client_addr ic oc)
f f
end end
module TCP_client = struct module TCP_client = struct
let with_connect' addr (f : Unix.file_descr -> 'a) : 'a = let with_connect' addr (f : Fd.t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true; Unix.setsockopt sock Unix.TCP_NODELAY true;
let sock = Fd.create ~close_noerr:true sock in
(* connect asynchronously *) (* connect asynchronously *)
while while
try try
Unix.connect sock addr; let fd = Fd.fd sock in
Unix.connect fd addr;
false false
with with
| Unix.Unix_error | Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
-> ->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> Fd.await_writable sock;
let loop = U_loop.cur () in
ignore
(loop#on_writable sock (fun _ev ->
wakeup ();
Cancel_handle.cancel _ev)
: Cancel_handle.t));
true true
do do
() ()
done; done;
let finally () = try Unix.close sock with _ -> () in let finally () = Fd.close sock in
let@ () = Fun.protect ~finally in let@ () = Fun.protect ~finally in
f sock f sock
let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a = let with_connect addr (f : Iostream.In_buf.t -> Iostream.Out_buf.t -> 'a) : 'a
=
with_connect' addr (fun sock -> with_connect' addr (fun sock ->
let ic = IO_unix.In.of_unix_fd sock in let ic = new Fd.to_in_buf sock in
let oc = IO_unix.Out.of_unix_fd sock in let oc = new Fd.to_out_buf sock in
f ic oc) f ic oc)
end end

View file

@ -32,18 +32,22 @@ module TCP_server : sig
val join : t -> unit val join : t -> unit
val with_serve' : val with_serve' :
Sockaddr.t -> (Sockaddr.t -> Unix.file_descr -> unit) -> (t -> 'a) -> 'a Sockaddr.t -> (Sockaddr.t -> Fd.t -> unit) -> (t -> 'a) -> 'a
(* TODO: bytes pool *)
val with_serve : val with_serve :
Sockaddr.t -> Sockaddr.t ->
(Sockaddr.t -> Iostream.In.t -> Iostream.Out.t -> unit) -> (Sockaddr.t -> Iostream.In_buf.t -> Iostream.Out_buf.t -> unit) ->
(t -> 'a) -> (t -> 'a) ->
'a 'a
end end
module TCP_client : sig module TCP_client : sig
val with_connect' : Unix.sockaddr -> (Unix.file_descr -> 'a) -> 'a val with_connect' : Unix.sockaddr -> (Fd.t -> 'a) -> 'a
(* TODO: bytes pool *)
val with_connect : val with_connect :
Unix.sockaddr -> (Iostream.In.t -> Iostream.Out.t -> 'a) -> 'a Unix.sockaddr -> (Iostream.In_buf.t -> Iostream.Out_buf.t -> 'a) -> 'a
end end