diff --git a/src/core/nanoev.ml b/src/core/nanoev.ml index f0ea5c7..c15935e 100644 --- a/src/core/nanoev.ml +++ b/src/core/nanoev.ml @@ -1,13 +1,28 @@ module Trace_ = Trace_ +exception Closed + module Impl = struct type 'st ops = { clear: 'st -> unit; wakeup_from_outside: 'st -> unit; + close: 'st -> Unix.file_descr -> unit; on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; step: 'st -> unit; } @@ -23,6 +38,7 @@ type t = Impl.t let[@inline] clear (Ev (ops, st)) = ops.clear st let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st +let[@inline] close (Ev (ops, st)) fd = ops.close st fd let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = ops.on_readable st fd x y f @@ -34,11 +50,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit = ops.run_after_s st time x y f let[@inline] step (Ev (ops, st)) : unit = ops.step st - -(* -let rec read (self:t) fd buf i len : int = - match Unix.read fd buf i len with - | n -> n - | exception Unix.Unix_error (Unix, _, _) -> - read self fd buf i len -*) diff --git a/src/core/nanoev.mli b/src/core/nanoev.mli index 2224efa..4792cd6 100644 --- a/src/core/nanoev.mli +++ b/src/core/nanoev.mli @@ -2,14 +2,29 @@ type t +exception Closed + module Impl : sig type 'st ops = { clear: 'st -> unit; wakeup_from_outside: 'st -> unit; + close: 'st -> Unix.file_descr -> unit; on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; step: 'st -> unit; } @@ -25,8 +40,15 @@ val wakeup_from_outside : t -> unit val step : t -> unit (** Run one step of the event loop until something happens *) -val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit -val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit +val close : t -> Unix.file_descr -> unit +(** Close the file descriptor and clean it up *) + +val on_readable : + t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + +val on_writable : + t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit (**/**) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 45f5cc6..9920345 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -4,6 +4,8 @@ open struct let ( let@ ) = ( @@ ) end +exception Closed = Nanoev.Closed + module Global_ = struct type st = | None @@ -76,9 +78,12 @@ let retry_read_ fd f = Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> Trace_.message "read must wait"; let trigger = Picos.Trigger.create () in - Nanoev.on_readable ev fd trigger () (fun trigger () -> + let closed_r = ref false in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; Picos.Trigger.signal trigger); Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; loop () in loop () @@ -92,27 +97,42 @@ let retry_write_ fd f = Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> Trace_.message "write must wait"; let trigger = Picos.Trigger.create () in - Nanoev.on_writable ev fd trigger () (fun trigger () -> + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; Picos.Trigger.signal trigger); Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; loop () in loop () let read fd buf i len : int = - retry_read_ fd (fun () -> - Trace_.message "read"; - Unix.read fd buf i len) + try + retry_read_ fd (fun () -> + Trace_.message "read"; + Unix.read fd buf i len) + with Closed -> 0 + +let close fd = + Unix.close fd; + let ev = get_loop_exn_ () in + Nanoev.close ev fd let accept fd = - retry_read_ fd (fun () -> - Trace_.message "accept"; - Unix.accept fd) + try + retry_read_ fd (fun () -> + Trace_.message "accept"; + Unix.accept fd) + with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> + raise Closed let write fd buf i len : int = - retry_write_ fd (fun () -> - Trace_.message "write"; - Unix.write fd buf i len) + try + retry_write_ fd (fun () -> + Trace_.message "write"; + Unix.write fd buf i len) + with Closed -> 0 let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 290e655..ef795bb 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -9,7 +9,24 @@ val has_bg_thread : unit -> bool (** {2 Non blocking IO primitives} *) val read : Unix.file_descr -> bytes -> int -> int -> int -val connect : Unix.file_descr -> Unix.sockaddr -> unit +(** Read from the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + val write : Unix.file_descr -> bytes -> int -> int -> int +(** Write into the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val close : Unix.file_descr -> unit +(** Close the file descriptor + @raise Unix.Unix_error when it fails *) + +val connect : Unix.file_descr -> Unix.sockaddr -> unit + val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +(** Accept a connection on this fd. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + val sleep : float -> unit diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index ffcb7d4..3542c7b 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -77,13 +77,11 @@ module Out = struct done method private close_underlying () = - if not !closed then ( - closed := true; + if not (Atomic.exchange closed true) then if close_noerr then ( - try Unix.close fd with _ -> () + try EV.close fd with _ -> () ) else - Unix.close fd - ) + EV.close fd end end @@ -92,7 +90,6 @@ module In = struct class type t = In_buf.t - (* FIXME: closed should be atomic *) let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Unix.file_descr) : t = let eof = ref false in @@ -122,13 +119,12 @@ module In = struct ) method close () = - if not !closed then ( - closed := true; + if not (Atomic.exchange closed true) then ( eof := true; if close_noerr then ( - try Unix.close fd with _ -> () + try EV.close fd with _ -> () ) else - Unix.close fd + EV.close fd ) end end @@ -224,7 +220,7 @@ module Unix_tcp_server_ = struct Pool.with_resource self.slice_pool @@ fun ic_buf -> Pool.with_resource self.slice_pool @@ fun oc_buf -> - let closed = ref false in + let closed = Atomic.make false in let oc = new Out.of_unix_fd diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 769c590..7d656c1 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -8,7 +8,7 @@ end (** Callback list *) type cbs = | Nil - | Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs + | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs let[@inline] cb_is_empty = function | Nil -> true @@ -44,6 +44,18 @@ type st = { lock: Mutex.t; } +let rec perform_cbs ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs ~closed tail + +let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail + let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let create_st () : st = @@ -97,6 +109,25 @@ let get_fd_ (self : st) fd : per_fd = Hashtbl.add self.fds fd per_fd; per_fd +let close self fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + let@ self = with_lock_ self in + match Hashtbl.find self.fds fd with + | per_fd -> + Hashtbl.remove self.fds fd; + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self; + per_fd.r, per_fd.w + | exception Not_found -> + invalid_arg "File descriptor is not known to Nanoev" + in + + (* call callbacks outside of the lock *) + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + let on_readable self fd x y f : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in let@ self = with_lock_ self in @@ -140,12 +171,6 @@ let next_deadline_ (self : st) : float option = | exception Heap.Empty -> None | Timer t -> Some t.deadline -let rec perform_cbs = function - | Nil -> () - | Sub (x, y, f, tail) -> - f x y; - perform_cbs tail - let step (self : st) : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in (* gather the subscriptions and timeout *) @@ -207,19 +232,27 @@ let step (self : st) : unit = (* call callbacks *) List.iter (fun fd -> - perform_cbs fd.r; + perform_cbs ~closed:false fd.r; fd.r <- Nil) !ready_r; List.iter (fun fd -> - perform_cbs fd.w; + perform_cbs ~closed:false fd.w; fd.w <- Nil) !ready_w; () let ops : st Nanoev.Impl.ops = - { step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear } + { + step; + close; + on_readable; + on_writable; + run_after_s; + wakeup_from_outside; + clear; + } include Nanoev diff --git a/tests/unix/t1.ml b/tests/unix/t1.ml index 79e76ff..810087d 100644 --- a/tests/unix/t1.ml +++ b/tests/unix/t1.ml @@ -15,7 +15,11 @@ let () = let ev = E.create () in ignore (Thread.create loop ev : Thread.t); let rd, wr = mkpipe () in - E.on_readable ev rd () () (fun () () -> print_endline "can read"); + E.on_readable ev rd () () (fun ~closed () () -> + if closed then + print_endline "closed!" + else + print_endline "can read"); Thread.delay 0.05; print_endline "writing"; ignore