feat: handle close properly; read/write callbacks are informed about

closed FDs
This commit is contained in:
Simon Cruanes 2025-02-15 16:47:59 -05:00
parent dfee834611
commit 6007cf9392
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 148 additions and 48 deletions

View file

@ -1,13 +1,28 @@
module Trace_ = Trace_
exception Closed
module Impl = struct
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
@ -23,6 +38,7 @@ type t = Impl.t
let[@inline] clear (Ev (ops, st)) = ops.clear st
let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st
let[@inline] close (Ev (ops, st)) fd = ops.close st fd
let[@inline] on_readable (Ev (ops, st)) fd x y f : unit =
ops.on_readable st fd x y f
@ -34,11 +50,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit =
ops.run_after_s st time x y f
let[@inline] step (Ev (ops, st)) : unit = ops.step st
(*
let rec read (self:t) fd buf i len : int =
match Unix.read fd buf i len with
| n -> n
| exception Unix.Unix_error (Unix, _, _) ->
read self fd buf i len
*)

View file

@ -2,14 +2,29 @@
type t
exception Closed
module Impl : sig
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
@ -25,8 +40,15 @@ val wakeup_from_outside : t -> unit
val step : t -> unit
(** Run one step of the event loop until something happens *)
val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val close : t -> Unix.file_descr -> unit
(** Close the file descriptor and clean it up *)
val on_readable :
t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val on_writable :
t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
(**/**)

View file

@ -4,6 +4,8 @@ open struct
let ( let@ ) = ( @@ )
end
exception Closed = Nanoev.Closed
module Global_ = struct
type st =
| None
@ -76,9 +78,12 @@ let retry_read_ fd f =
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "read must wait";
let trigger = Picos.Trigger.create () in
Nanoev.on_readable ev fd trigger () (fun trigger () ->
let closed_r = ref false in
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
@ -92,27 +97,42 @@ let retry_write_ fd f =
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "write must wait";
let trigger = Picos.Trigger.create () in
Nanoev.on_writable ev fd trigger () (fun trigger () ->
let closed_r = ref false in
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
let read fd buf i len : int =
try
retry_read_ fd (fun () ->
Trace_.message "read";
Unix.read fd buf i len)
with Closed -> 0
let close fd =
Unix.close fd;
let ev = get_loop_exn_ () in
Nanoev.close ev fd
let accept fd =
try
retry_read_ fd (fun () ->
Trace_.message "accept";
Unix.accept fd)
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
raise Closed
let write fd buf i len : int =
try
retry_write_ fd (fun () ->
Trace_.message "write";
Unix.write fd buf i len)
with Closed -> 0
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)

View file

@ -9,7 +9,24 @@ val has_bg_thread : unit -> bool
(** {2 Non blocking IO primitives} *)
val read : Unix.file_descr -> bytes -> int -> int -> int
val connect : Unix.file_descr -> Unix.sockaddr -> unit
(** Read from the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val write : Unix.file_descr -> bytes -> int -> int -> int
(** Write into the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val close : Unix.file_descr -> unit
(** Close the file descriptor
@raise Unix.Unix_error when it fails *)
val connect : Unix.file_descr -> Unix.sockaddr -> unit
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
(** Accept a connection on this fd.
@raise Nanoev.Closed if the FD is closed.
@raise Unix.Unix_error for other errors *)
val sleep : float -> unit

View file

@ -77,13 +77,11 @@ module Out = struct
done
method private close_underlying () =
if not !closed then (
closed := true;
if not (Atomic.exchange closed true) then
if close_noerr then (
try Unix.close fd with _ -> ()
try EV.close fd with _ -> ()
) else
Unix.close fd
)
EV.close fd
end
end
@ -92,7 +90,6 @@ module In = struct
class type t = In_buf.t
(* FIXME: closed should be atomic *)
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) : t =
let eof = ref false in
@ -122,13 +119,12 @@ module In = struct
)
method close () =
if not !closed then (
closed := true;
if not (Atomic.exchange closed true) then (
eof := true;
if close_noerr then (
try Unix.close fd with _ -> ()
try EV.close fd with _ -> ()
) else
Unix.close fd
EV.close fd
)
end
end
@ -224,7 +220,7 @@ module Unix_tcp_server_ = struct
Pool.with_resource self.slice_pool @@ fun ic_buf ->
Pool.with_resource self.slice_pool @@ fun oc_buf ->
let closed = ref false in
let closed = Atomic.make false in
let oc =
new Out.of_unix_fd

View file

@ -8,7 +8,7 @@ end
(** Callback list *)
type cbs =
| Nil
| Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs
| Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs
let[@inline] cb_is_empty = function
| Nil -> true
@ -44,6 +44,18 @@ type st = {
lock: Mutex.t;
}
let rec perform_cbs ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs ~closed tail
let rec perform_cbs_closed ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs_closed ~closed tail
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st =
@ -97,6 +109,25 @@ let get_fd_ (self : st) fd : per_fd =
Hashtbl.add self.fds fd per_fd;
per_fd
let close self fd : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
let r, w =
let@ self = with_lock_ self in
match Hashtbl.find self.fds fd with
| per_fd ->
Hashtbl.remove self.fds fd;
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self;
per_fd.r, per_fd.w
| exception Not_found ->
invalid_arg "File descriptor is not known to Nanoev"
in
(* call callbacks outside of the lock *)
perform_cbs_closed ~closed:true r;
perform_cbs_closed ~closed:true w;
()
let on_readable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
let@ self = with_lock_ self in
@ -140,12 +171,6 @@ let next_deadline_ (self : st) : float option =
| exception Heap.Empty -> None
| Timer t -> Some t.deadline
let rec perform_cbs = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f x y;
perform_cbs tail
let step (self : st) : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
(* gather the subscriptions and timeout *)
@ -207,19 +232,27 @@ let step (self : st) : unit =
(* call callbacks *)
List.iter
(fun fd ->
perform_cbs fd.r;
perform_cbs ~closed:false fd.r;
fd.r <- Nil)
!ready_r;
List.iter
(fun fd ->
perform_cbs fd.w;
perform_cbs ~closed:false fd.w;
fd.w <- Nil)
!ready_w;
()
let ops : st Nanoev.Impl.ops =
{ step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear }
{
step;
close;
on_readable;
on_writable;
run_after_s;
wakeup_from_outside;
clear;
}
include Nanoev

View file

@ -15,7 +15,11 @@ let () =
let ev = E.create () in
ignore (Thread.create loop ev : Thread.t);
let rd, wr = mkpipe () in
E.on_readable ev rd () () (fun () () -> print_endline "can read");
E.on_readable ev rd () () (fun ~closed () () ->
if closed then
print_endline "closed!"
else
print_endline "can read");
Thread.delay 0.05;
print_endline "writing";
ignore