diff --git a/src/core/nanoev.ml b/src/core/nanoev.ml index f0ea5c7..d574367 100644 --- a/src/core/nanoev.ml +++ b/src/core/nanoev.ml @@ -1,13 +1,22 @@ module Trace_ = Trace_ +module FD = struct + type t = { + fd: Unix.file_descr; + closed: bool Atomic.t; + } + + let close self = + if not (Atomic.exchange self.closed true) then Unix.close self.fd +end + module Impl = struct type 'st ops = { clear: 'st -> unit; + get_fd: 'st -> Unix.file_descr -> FD.t; wakeup_from_outside: 'st -> unit; - on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; - on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_readable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_writable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; step: 'st -> unit; } @@ -22,6 +31,7 @@ open Impl type t = Impl.t let[@inline] clear (Ev (ops, st)) = ops.clear st +let[@inline] get_fd (Ev (ops, st)) fd = ops.get_fd st fd let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = @@ -34,11 +44,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit = ops.run_after_s st time x y f let[@inline] step (Ev (ops, st)) : unit = ops.step st - -(* -let rec read (self:t) fd buf i len : int = - match Unix.read fd buf i len with - | n -> n - | exception Unix.Unix_error (Unix, _, _) -> - read self fd buf i len -*) diff --git a/src/core/nanoev.mli b/src/core/nanoev.mli index 2224efa..fb32320 100644 --- a/src/core/nanoev.mli +++ b/src/core/nanoev.mli @@ -2,14 +2,22 @@ type t +module FD : sig + type t = private { + fd: Unix.file_descr; + closed: bool Atomic.t; + } + + val close : t -> unit +end + module Impl : sig type 'st ops = { clear: 'st -> unit; + get_fd: 'st -> Unix.file_descr -> FD.t; wakeup_from_outside: 'st -> unit; - on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; - on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_readable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_writable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; step: 'st -> unit; } @@ -21,12 +29,13 @@ val clear : t -> unit (** Reset the state *) val wakeup_from_outside : t -> unit +val get_fd : t -> Unix.file_descr -> FD.t val step : t -> unit (** Run one step of the event loop until something happens *) -val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit -val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit +val on_readable : t -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit +val on_writable : t -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit (**/**) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 45f5cc6..54b7011 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -4,6 +4,8 @@ open struct let ( let@ ) = ( @@ ) end +module FD = Nanoev.FD + module Global_ = struct type st = | None @@ -67,6 +69,10 @@ let[@inline] unwrap_ = function | None -> () | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt +let get_fd (fd : Unix.file_descr) : FD.t = + let ev = get_loop_exn_ () in + Nanoev.get_fd ev fd + let retry_read_ fd f = let ev = get_loop_exn_ () in let[@unroll 1] rec loop () = @@ -99,22 +105,24 @@ let retry_write_ fd f = in loop () -let read fd buf i len : int = +let read (fd : FD.t) buf i len : int = retry_read_ fd (fun () -> Trace_.message "read"; - Unix.read fd buf i len) + Unix.read fd.fd buf i len) -let accept fd = +let accept (fd : FD.t) = retry_read_ fd (fun () -> Trace_.message "accept"; - Unix.accept fd) + let client_sock, client_addr = Unix.accept fd.fd in + get_fd client_sock, client_addr) -let write fd buf i len : int = +let write (fd : FD.t) buf i len : int = retry_write_ fd (fun () -> Trace_.message "write"; - Unix.write fd buf i len) + Unix.write fd.fd buf i len) -let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) +let connect (fd : FD.t) addr = + retry_write_ fd (fun () -> Unix.connect fd.fd addr) let sleep t = if t > 0. then ( diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 290e655..9736a7f 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,5 +1,7 @@ (** Basic interface with picos *) +module FD = Nanoev.FD + val setup_bg_thread : Nanoev.t -> unit (** Install this event loop in a background thread *) @@ -8,8 +10,9 @@ val has_bg_thread : unit -> bool (** {2 Non blocking IO primitives} *) -val read : Unix.file_descr -> bytes -> int -> int -> int -val connect : Unix.file_descr -> Unix.sockaddr -> unit -val write : Unix.file_descr -> bytes -> int -> int -> int -val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +val get_fd : Unix.file_descr -> FD.t +val read : FD.t -> bytes -> int -> int -> int +val connect : FD.t -> Unix.sockaddr -> unit +val write : FD.t -> bytes -> int -> int -> int +val accept : FD.t -> FD.t * Unix.sockaddr val sleep : float -> unit diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 769c590..3be11a2 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -1,5 +1,6 @@ open struct module Trace_ = Nanoev.Trace_ + module FD = Nanoev.FD let ( let@ ) = ( @@ ) let now_ : unit -> float = Unix.gettimeofday @@ -89,7 +90,7 @@ let wakeup_from_outside (self : st) : unit = let b = Bytes.make 1 '!' in ignore (Unix.write self.wakeup_wr b 0 1 : int) -let get_fd_ (self : st) fd : per_fd = +let get_per_fd_ (self : st) fd : per_fd = match Hashtbl.find self.fds fd with | per_fd -> per_fd | exception Not_found -> @@ -97,10 +98,10 @@ let get_fd_ (self : st) fd : per_fd = Hashtbl.add self.fds fd per_fd; per_fd -let on_readable self fd x y f : unit = +let on_readable self (fd : FD.t) x y f : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in + let per_fd = get_per_fd_ self fd in per_fd.r <- Sub (x, y, f, per_fd.r); self.sub_up_to_date <- false; if Atomic.get self.in_select then wakeup_from_outside self @@ -108,7 +109,7 @@ let on_readable self fd x y f : unit = let on_writable self fd x y f : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in + let per_fd = get_per_fd_ self fd in per_fd.w <- Sub (x, y, f, per_fd.w); self.sub_up_to_date <- false; if Atomic.get self.in_select then wakeup_from_outside self