This commit is contained in:
Simon Cruanes 2025-02-15 16:14:53 -05:00
parent dfee834611
commit 2e8b5c2bdd
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 56 additions and 33 deletions

View file

@ -1,13 +1,22 @@
module Trace_ = Trace_
module FD = struct
type t = {
fd: Unix.file_descr;
closed: bool Atomic.t;
}
let close self =
if not (Atomic.exchange self.closed true) then Unix.close self.fd
end
module Impl = struct
type 'st ops = {
clear: 'st -> unit;
get_fd: 'st -> Unix.file_descr -> FD.t;
wakeup_from_outside: 'st -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_readable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
@ -22,6 +31,7 @@ open Impl
type t = Impl.t
let[@inline] clear (Ev (ops, st)) = ops.clear st
let[@inline] get_fd (Ev (ops, st)) fd = ops.get_fd st fd
let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st
let[@inline] on_readable (Ev (ops, st)) fd x y f : unit =
@ -34,11 +44,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit =
ops.run_after_s st time x y f
let[@inline] step (Ev (ops, st)) : unit = ops.step st
(*
let rec read (self:t) fd buf i len : int =
match Unix.read fd buf i len with
| n -> n
| exception Unix.Unix_error (Unix, _, _) ->
read self fd buf i len
*)

View file

@ -2,14 +2,22 @@
type t
module FD : sig
type t = private {
fd: Unix.file_descr;
closed: bool Atomic.t;
}
val close : t -> unit
end
module Impl : sig
type 'st ops = {
clear: 'st -> unit;
get_fd: 'st -> Unix.file_descr -> FD.t;
wakeup_from_outside: 'st -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_readable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable: 'a 'b. 'st -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
@ -21,12 +29,13 @@ val clear : t -> unit
(** Reset the state *)
val wakeup_from_outside : t -> unit
val get_fd : t -> Unix.file_descr -> FD.t
val step : t -> unit
(** Run one step of the event loop until something happens *)
val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_readable : t -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable : t -> FD.t -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
(**/**)

View file

@ -4,6 +4,8 @@ open struct
let ( let@ ) = ( @@ )
end
module FD = Nanoev.FD
module Global_ = struct
type st =
| None
@ -67,6 +69,10 @@ let[@inline] unwrap_ = function
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
let get_fd (fd : Unix.file_descr) : FD.t =
let ev = get_loop_exn_ () in
Nanoev.get_fd ev fd
let retry_read_ fd f =
let ev = get_loop_exn_ () in
let[@unroll 1] rec loop () =
@ -99,22 +105,24 @@ let retry_write_ fd f =
in
loop ()
let read fd buf i len : int =
let read (fd : FD.t) buf i len : int =
retry_read_ fd (fun () ->
Trace_.message "read";
Unix.read fd buf i len)
Unix.read fd.fd buf i len)
let accept fd =
let accept (fd : FD.t) =
retry_read_ fd (fun () ->
Trace_.message "accept";
Unix.accept fd)
let client_sock, client_addr = Unix.accept fd.fd in
get_fd client_sock, client_addr)
let write fd buf i len : int =
let write (fd : FD.t) buf i len : int =
retry_write_ fd (fun () ->
Trace_.message "write";
Unix.write fd buf i len)
Unix.write fd.fd buf i len)
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
let connect (fd : FD.t) addr =
retry_write_ fd (fun () -> Unix.connect fd.fd addr)
let sleep t =
if t > 0. then (

View file

@ -1,5 +1,7 @@
(** Basic interface with picos *)
module FD = Nanoev.FD
val setup_bg_thread : Nanoev.t -> unit
(** Install this event loop in a background thread *)
@ -8,8 +10,9 @@ val has_bg_thread : unit -> bool
(** {2 Non blocking IO primitives} *)
val read : Unix.file_descr -> bytes -> int -> int -> int
val connect : Unix.file_descr -> Unix.sockaddr -> unit
val write : Unix.file_descr -> bytes -> int -> int -> int
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
val get_fd : Unix.file_descr -> FD.t
val read : FD.t -> bytes -> int -> int -> int
val connect : FD.t -> Unix.sockaddr -> unit
val write : FD.t -> bytes -> int -> int -> int
val accept : FD.t -> FD.t * Unix.sockaddr
val sleep : float -> unit

View file

@ -1,5 +1,6 @@
open struct
module Trace_ = Nanoev.Trace_
module FD = Nanoev.FD
let ( let@ ) = ( @@ )
let now_ : unit -> float = Unix.gettimeofday
@ -89,7 +90,7 @@ let wakeup_from_outside (self : st) : unit =
let b = Bytes.make 1 '!' in
ignore (Unix.write self.wakeup_wr b 0 1 : int)
let get_fd_ (self : st) fd : per_fd =
let get_per_fd_ (self : st) fd : per_fd =
match Hashtbl.find self.fds fd with
| per_fd -> per_fd
| exception Not_found ->
@ -97,10 +98,10 @@ let get_fd_ (self : st) fd : per_fd =
Hashtbl.add self.fds fd per_fd;
per_fd
let on_readable self fd x y f : unit =
let on_readable self (fd : FD.t) x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
let per_fd = get_per_fd_ self fd in
per_fd.r <- Sub (x, y, f, per_fd.r);
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self
@ -108,7 +109,7 @@ let on_readable self fd x y f : unit =
let on_writable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
let per_fd = get_per_fd_ self fd in
per_fd.w <- Sub (x, y, f, per_fd.w);
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self