refactor: make Nanoev.t an interface type

This commit is contained in:
Simon Cruanes 2024-10-23 23:00:48 -04:00
parent bb84d9d685
commit c346420401
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 243 additions and 183 deletions

View file

@ -1,182 +1,34 @@
(* module type BACKEND = Intf.BACKEND *) module Impl = struct
type 'st ops = {
let ( let@ ) = ( @@ ) clear: 'st -> unit;
let now_ : unit -> float = Unix.gettimeofday wakeup_from_outside: 'st -> unit;
on_readable:
(** Callback list *) 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
type cbs = on_writable:
| Nil 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
| Sub : 'a * ('a -> unit) * cbs -> cbs run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
let[@inline] cb_is_empty = function
| Nil -> true
| Sub _ -> false
type timer_ev =
| Timer : {
deadline: float;
x: 'a;
f: 'a -> unit;
}
-> timer_ev
type per_fd = {
fd: Unix.file_descr;
mutable r: cbs;
mutable w: cbs;
} }
type t = { type t = Ev : 'a ops * 'a -> t
timer: timer_ev Heap.t;
fds: (Unix.file_descr, per_fd) Hashtbl.t;
mutable sub_r: Unix.file_descr list;
mutable sub_w: Unix.file_descr list;
mutable sub_up_to_date: bool;
(** are [sub_r] and [sub_w] faithful reflections of [fds]? *)
lock: Mutex.t;
}
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let[@inline] build ops st : t = Ev (ops, st)
end
let create () : t = open Impl
{
timer = Heap.create ~leq:leq_timer ();
fds = Hashtbl.create 16;
sub_r = [];
sub_w = [];
sub_up_to_date = true;
lock = Mutex.create ();
}
let[@inline] with_lock_ (self : t) f = type t = Impl.t
Mutex.lock self.lock;
match f self with
| exception e ->
Mutex.unlock self.lock;
raise e
| res ->
Mutex.unlock self.lock;
res
let clear (self : t) = let[@inline] clear (Ev (ops, st)) = ops.clear st
let@ self = with_lock_ self in let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st
Heap.clear self.timer;
Hashtbl.clear self.fds;
self.sub_r <- [];
self.sub_w <- [];
self.sub_up_to_date <- true;
()
(* TODO: *) let[@inline] on_readable (Ev (ops, st)) fd x y f : unit =
let wakeup_from_outside _self : unit = () ops.on_readable st fd x y f
let get_fd_ (self : t) fd : per_fd = let[@inline] on_writable (Ev (ops, st)) fd x y f : unit =
match Hashtbl.find self.fds fd with ops.on_writable st fd x y f
| per_fd -> per_fd
| exception Not_found ->
let per_fd = { fd; r = Nil; w = Nil } in
Hashtbl.add self.fds fd per_fd;
per_fd
let on_readable self fd x f : unit = let[@inline] run_after_s (Ev (ops, st)) time x y f : unit =
let@ self = with_lock_ self in ops.run_after_s st time x y f
let per_fd = get_fd_ self fd in
per_fd.r <- Sub (x, f, per_fd.r)
let on_writable self fd x f : unit = let[@inline] step (Ev (ops, st)) : unit = ops.step st
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.w <- Sub (x, f, per_fd.w)
let run_after_s self time x f : unit =
let@ self = with_lock_ self in
let deadline = now_ () +. time in
Heap.insert self.timer (Timer { deadline; x; f })
let recompute_if_needed (self : t) =
if not self.sub_up_to_date then (
self.sub_up_to_date <- true;
self.sub_r <- [];
self.sub_w <- [];
Hashtbl.iter
(fun fd per_fd ->
if cb_is_empty per_fd.r && cb_is_empty per_fd.w then
Hashtbl.remove self.fds fd;
if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r;
if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w)
self.fds
)
(*
let set fd (ev : Event.t) : unit =
needs_recompute := true;
match Hashtbl.find subs fd with
| exception Not_found -> Hashtbl.add subs fd (ref ev)
| old_ev -> old_ev := Event.(!old_ev lor ev)
let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds
*)
let next_deadline_ (self : t) : float option =
match Heap.peek_min_exn self.timer with
| exception Heap.Empty -> None
| Timer t -> Some t.deadline
let rec perform_cbs = function
| Nil -> ()
| Sub (x, f, tail) ->
f x;
perform_cbs tail
let step (self : t) : unit =
(* gather the subscriptions and timeout *)
let timeout, sub_r, sub_w =
let@ self = with_lock_ self in
recompute_if_needed self;
let timeout =
match next_deadline_ self with
| None ->
let has_waiters = self.sub_r != [] || self.sub_w != [] in
if has_waiters then
1e9
else
0.
| Some d -> max 0. (d -. now_ ())
in
timeout, self.sub_r, self.sub_w
in
let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in
(* gather the [per_fd] that are ready *)
let ready_r = ref [] in
let ready_w = ref [] in
(* gather the [per_fd] that have updates *)
(let@ self = with_lock_ self in
if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.fds fd in
ready_r := per_fd :: !ready_r)
r_reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.fds fd in
ready_w := per_fd :: !ready_w)
r_writes);
(* call callbacks *)
List.iter
(fun fd ->
perform_cbs fd.r;
fd.r <- Nil)
!ready_r;
List.iter
(fun fd ->
perform_cbs fd.w;
fd.w <- Nil)
!ready_w;
()

View file

@ -1,15 +1,21 @@
(** Nano event loop *) (** Nano event loop *)
(*
module type BACKEND = Intf.BACKEND
val unix : unit -> (module BACKEND)
val create : ?backend:(module BACKEND) -> unit -> t
*)
type t type t
val create : unit -> t module Impl : sig
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
val build : 'a ops -> 'a -> t
end
val clear : t -> unit val clear : t -> unit
(** Reset the state *) (** Reset the state *)
@ -19,6 +25,6 @@ val wakeup_from_outside : t -> unit
val step : t -> unit val step : t -> unit
(** Run one step of the event loop until something happens *) (** Run one step of the event loop until something happens *)
val on_readable : t -> Unix.file_descr -> 'a -> ('a -> unit) -> unit val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable : t -> Unix.file_descr -> 'a -> ('a -> unit) -> unit val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val run_after_s : t -> float -> 'a -> ('a -> unit) -> unit val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit

5
src/unix/dune Normal file
View file

@ -0,0 +1,5 @@
(library
(name nanoev_unix)
(public_name nanoev.unix)
(synopsis "Unix/select backend")
(libraries nanoev unix))

190
src/unix/nanoev_unix.ml Normal file
View file

@ -0,0 +1,190 @@
(* module type BACKEND = Intf.BACKEND *)
let ( let@ ) = ( @@ )
let now_ : unit -> float = Unix.gettimeofday
(** Callback list *)
type cbs =
| Nil
| Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs
let[@inline] cb_is_empty = function
| Nil -> true
| Sub _ -> false
type timer_ev =
| Timer : {
deadline: float;
x: 'a;
y: 'b;
f: 'a -> 'b -> unit;
}
-> timer_ev
type per_fd = {
fd: Unix.file_descr;
mutable r: cbs;
mutable w: cbs;
}
type st = {
timer: timer_ev Heap.t;
fds: (Unix.file_descr, per_fd) Hashtbl.t;
mutable sub_r: Unix.file_descr list;
mutable sub_w: Unix.file_descr list;
mutable sub_up_to_date: bool;
(** are [sub_r] and [sub_w] faithful reflections of [fds]? *)
lock: Mutex.t;
}
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st =
{
timer = Heap.create ~leq:leq_timer ();
fds = Hashtbl.create 16;
sub_r = [];
sub_w = [];
sub_up_to_date = true;
lock = Mutex.create ();
}
let[@inline] with_lock_ (self : st) f =
Mutex.lock self.lock;
match f self with
| exception e ->
Mutex.unlock self.lock;
raise e
| res ->
Mutex.unlock self.lock;
res
let clear (self : st) =
let@ self = with_lock_ self in
Heap.clear self.timer;
Hashtbl.clear self.fds;
self.sub_r <- [];
self.sub_w <- [];
self.sub_up_to_date <- true;
()
(* TODO: *)
let wakeup_from_outside _self : unit = ()
let get_fd_ (self : st) fd : per_fd =
match Hashtbl.find self.fds fd with
| per_fd -> per_fd
| exception Not_found ->
let per_fd = { fd; r = Nil; w = Nil } in
Hashtbl.add self.fds fd per_fd;
per_fd
let on_readable self fd x y f : unit =
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.r <- Sub (x, y, f, per_fd.r)
let on_writable self fd x y f : unit =
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.w <- Sub (x, y, f, per_fd.w)
let run_after_s self time x y f : unit =
let@ self = with_lock_ self in
let deadline = now_ () +. time in
Heap.insert self.timer (Timer { deadline; x; y; f })
let recompute_if_needed (self : st) =
if not self.sub_up_to_date then (
self.sub_up_to_date <- true;
self.sub_r <- [];
self.sub_w <- [];
Hashtbl.iter
(fun fd per_fd ->
if cb_is_empty per_fd.r && cb_is_empty per_fd.w then
Hashtbl.remove self.fds fd;
if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r;
if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w)
self.fds
)
(*
let set fd (ev : Event.t) : unit =
needs_recompute := true;
match Hashtbl.find subs fd with
| exception Not_found -> Hashtbl.add subs fd (ref ev)
| old_ev -> old_ev := Event.(!old_ev lor ev)
let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds
*)
let next_deadline_ (self : st) : float option =
match Heap.peek_min_exn self.timer with
| exception Heap.Empty -> None
| Timer t -> Some t.deadline
let rec perform_cbs = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f x y;
perform_cbs tail
let step (self : st) : unit =
(* gather the subscriptions and timeout *)
let timeout, sub_r, sub_w =
let@ self = with_lock_ self in
recompute_if_needed self;
let timeout =
match next_deadline_ self with
| None ->
let has_waiters = self.sub_r != [] || self.sub_w != [] in
if has_waiters then
1e9
else
0.
| Some d -> max 0. (d -. now_ ())
in
timeout, self.sub_r, self.sub_w
in
let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in
(* gather the [per_fd] that are ready *)
let ready_r = ref [] in
let ready_w = ref [] in
(* gather the [per_fd] that have updates *)
(let@ self = with_lock_ self in
if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.fds fd in
ready_r := per_fd :: !ready_r)
r_reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.fds fd in
ready_w := per_fd :: !ready_w)
r_writes);
(* call callbacks *)
List.iter
(fun fd ->
perform_cbs fd.r;
fd.r <- Nil)
!ready_r;
List.iter
(fun fd ->
perform_cbs fd.w;
fd.w <- Nil)
!ready_w;
()
let ops : st Nanoev.Impl.ops =
{ step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear }
include Nanoev
let create () : t = Impl.build ops (create_st ())

7
src/unix/nanoev_unix.mli Normal file
View file

@ -0,0 +1,7 @@
(** Nano event loop *)
include module type of struct
include Nanoev
end
val create : unit -> t