diff --git a/src/nanoev.ml b/src/nanoev.ml index 2da6ab2..4bf425a 100644 --- a/src/nanoev.ml +++ b/src/nanoev.ml @@ -1,182 +1,34 @@ -(* module type BACKEND = Intf.BACKEND *) - -let ( let@ ) = ( @@ ) -let now_ : unit -> float = Unix.gettimeofday - -(** Callback list *) -type cbs = - | Nil - | Sub : 'a * ('a -> unit) * cbs -> cbs - -let[@inline] cb_is_empty = function - | Nil -> true - | Sub _ -> false - -type timer_ev = - | Timer : { - deadline: float; - x: 'a; - f: 'a -> unit; - } - -> timer_ev - -type per_fd = { - fd: Unix.file_descr; - mutable r: cbs; - mutable w: cbs; -} - -type t = { - timer: timer_ev Heap.t; - fds: (Unix.file_descr, per_fd) Hashtbl.t; - mutable sub_r: Unix.file_descr list; - mutable sub_w: Unix.file_descr list; - mutable sub_up_to_date: bool; - (** are [sub_r] and [sub_w] faithful reflections of [fds]? *) - lock: Mutex.t; -} - -let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline - -let create () : t = - { - timer = Heap.create ~leq:leq_timer (); - fds = Hashtbl.create 16; - sub_r = []; - sub_w = []; - sub_up_to_date = true; - lock = Mutex.create (); +module Impl = struct + type 'st ops = { + clear: 'st -> unit; + wakeup_from_outside: 'st -> unit; + on_readable: + 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_writable: + 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + step: 'st -> unit; } -let[@inline] with_lock_ (self : t) f = - Mutex.lock self.lock; - match f self with - | exception e -> - Mutex.unlock self.lock; - raise e - | res -> - Mutex.unlock self.lock; - res + type t = Ev : 'a ops * 'a -> t -let clear (self : t) = - let@ self = with_lock_ self in - Heap.clear self.timer; - Hashtbl.clear self.fds; - self.sub_r <- []; - self.sub_w <- []; - self.sub_up_to_date <- true; - () + let[@inline] build ops st : t = Ev (ops, st) +end -(* TODO: *) -let wakeup_from_outside _self : unit = () +open Impl -let get_fd_ (self : t) fd : per_fd = - match Hashtbl.find self.fds fd with - | per_fd -> per_fd - | exception Not_found -> - let per_fd = { fd; r = Nil; w = Nil } in - Hashtbl.add self.fds fd per_fd; - per_fd +type t = Impl.t -let on_readable self fd x f : unit = - let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in - per_fd.r <- Sub (x, f, per_fd.r) +let[@inline] clear (Ev (ops, st)) = ops.clear st +let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st -let on_writable self fd x f : unit = - let@ self = with_lock_ self in - let per_fd = get_fd_ self fd in - per_fd.w <- Sub (x, f, per_fd.w) +let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = + ops.on_readable st fd x y f -let run_after_s self time x f : unit = - let@ self = with_lock_ self in - let deadline = now_ () +. time in - Heap.insert self.timer (Timer { deadline; x; f }) +let[@inline] on_writable (Ev (ops, st)) fd x y f : unit = + ops.on_writable st fd x y f -let recompute_if_needed (self : t) = - if not self.sub_up_to_date then ( - self.sub_up_to_date <- true; - self.sub_r <- []; - self.sub_w <- []; - Hashtbl.iter - (fun fd per_fd -> - if cb_is_empty per_fd.r && cb_is_empty per_fd.w then - Hashtbl.remove self.fds fd; - if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r; - if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w) - self.fds - ) +let[@inline] run_after_s (Ev (ops, st)) time x y f : unit = + ops.run_after_s st time x y f -(* - let set fd (ev : Event.t) : unit = - needs_recompute := true; - match Hashtbl.find subs fd with - | exception Not_found -> Hashtbl.add subs fd (ref ev) - | old_ev -> old_ev := Event.(!old_ev lor ev) - - let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds -*) - -let next_deadline_ (self : t) : float option = - match Heap.peek_min_exn self.timer with - | exception Heap.Empty -> None - | Timer t -> Some t.deadline - -let rec perform_cbs = function - | Nil -> () - | Sub (x, f, tail) -> - f x; - perform_cbs tail - -let step (self : t) : unit = - (* gather the subscriptions and timeout *) - let timeout, sub_r, sub_w = - let@ self = with_lock_ self in - recompute_if_needed self; - let timeout = - match next_deadline_ self with - | None -> - let has_waiters = self.sub_r != [] || self.sub_w != [] in - if has_waiters then - 1e9 - else - 0. - | Some d -> max 0. (d -. now_ ()) - in - timeout, self.sub_r, self.sub_w - in - - let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in - - (* gather the [per_fd] that are ready *) - let ready_r = ref [] in - let ready_w = ref [] in - - (* gather the [per_fd] that have updates *) - (let@ self = with_lock_ self in - if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false; - - List.iter - (fun fd -> - let per_fd = Hashtbl.find self.fds fd in - ready_r := per_fd :: !ready_r) - r_reads; - List.iter - (fun fd -> - let per_fd = Hashtbl.find self.fds fd in - ready_w := per_fd :: !ready_w) - r_writes); - - (* call callbacks *) - List.iter - (fun fd -> - perform_cbs fd.r; - fd.r <- Nil) - !ready_r; - List.iter - (fun fd -> - perform_cbs fd.w; - fd.w <- Nil) - !ready_w; - - () +let[@inline] step (Ev (ops, st)) : unit = ops.step st diff --git a/src/nanoev.mli b/src/nanoev.mli index 8e28c97..fcbd0b8 100644 --- a/src/nanoev.mli +++ b/src/nanoev.mli @@ -1,15 +1,21 @@ (** Nano event loop *) -(* -module type BACKEND = Intf.BACKEND - -val unix : unit -> (module BACKEND) -val create : ?backend:(module BACKEND) -> unit -> t -*) - type t -val create : unit -> t +module Impl : sig + type 'st ops = { + clear: 'st -> unit; + wakeup_from_outside: 'st -> unit; + on_readable: + 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + on_writable: + 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + step: 'st -> unit; + } + + val build : 'a ops -> 'a -> t +end val clear : t -> unit (** Reset the state *) @@ -19,6 +25,6 @@ val wakeup_from_outside : t -> unit val step : t -> unit (** Run one step of the event loop until something happens *) -val on_readable : t -> Unix.file_descr -> 'a -> ('a -> unit) -> unit -val on_writable : t -> Unix.file_descr -> 'a -> ('a -> unit) -> unit -val run_after_s : t -> float -> 'a -> ('a -> unit) -> unit +val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit +val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit +val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit diff --git a/src/unix/dune b/src/unix/dune new file mode 100644 index 0000000..4f14854 --- /dev/null +++ b/src/unix/dune @@ -0,0 +1,5 @@ +(library + (name nanoev_unix) + (public_name nanoev.unix) + (synopsis "Unix/select backend") + (libraries nanoev unix)) diff --git a/src/heap.ml b/src/unix/heap.ml similarity index 100% rename from src/heap.ml rename to src/unix/heap.ml diff --git a/src/heap.mli b/src/unix/heap.mli similarity index 100% rename from src/heap.mli rename to src/unix/heap.mli diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml new file mode 100644 index 0000000..3d54a5a --- /dev/null +++ b/src/unix/nanoev_unix.ml @@ -0,0 +1,190 @@ +(* module type BACKEND = Intf.BACKEND *) + +let ( let@ ) = ( @@ ) +let now_ : unit -> float = Unix.gettimeofday + +(** Callback list *) +type cbs = + | Nil + | Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs + +let[@inline] cb_is_empty = function + | Nil -> true + | Sub _ -> false + +type timer_ev = + | Timer : { + deadline: float; + x: 'a; + y: 'b; + f: 'a -> 'b -> unit; + } + -> timer_ev + +type per_fd = { + fd: Unix.file_descr; + mutable r: cbs; + mutable w: cbs; +} + +type st = { + timer: timer_ev Heap.t; + fds: (Unix.file_descr, per_fd) Hashtbl.t; + mutable sub_r: Unix.file_descr list; + mutable sub_w: Unix.file_descr list; + mutable sub_up_to_date: bool; + (** are [sub_r] and [sub_w] faithful reflections of [fds]? *) + lock: Mutex.t; +} + +let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + +let create_st () : st = + { + timer = Heap.create ~leq:leq_timer (); + fds = Hashtbl.create 16; + sub_r = []; + sub_w = []; + sub_up_to_date = true; + lock = Mutex.create (); + } + +let[@inline] with_lock_ (self : st) f = + Mutex.lock self.lock; + match f self with + | exception e -> + Mutex.unlock self.lock; + raise e + | res -> + Mutex.unlock self.lock; + res + +let clear (self : st) = + let@ self = with_lock_ self in + Heap.clear self.timer; + Hashtbl.clear self.fds; + self.sub_r <- []; + self.sub_w <- []; + self.sub_up_to_date <- true; + () + +(* TODO: *) +let wakeup_from_outside _self : unit = () + +let get_fd_ (self : st) fd : per_fd = + match Hashtbl.find self.fds fd with + | per_fd -> per_fd + | exception Not_found -> + let per_fd = { fd; r = Nil; w = Nil } in + Hashtbl.add self.fds fd per_fd; + per_fd + +let on_readable self fd x y f : unit = + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.r <- Sub (x, y, f, per_fd.r) + +let on_writable self fd x y f : unit = + let@ self = with_lock_ self in + let per_fd = get_fd_ self fd in + per_fd.w <- Sub (x, y, f, per_fd.w) + +let run_after_s self time x y f : unit = + let@ self = with_lock_ self in + let deadline = now_ () +. time in + Heap.insert self.timer (Timer { deadline; x; y; f }) + +let recompute_if_needed (self : st) = + if not self.sub_up_to_date then ( + self.sub_up_to_date <- true; + self.sub_r <- []; + self.sub_w <- []; + Hashtbl.iter + (fun fd per_fd -> + if cb_is_empty per_fd.r && cb_is_empty per_fd.w then + Hashtbl.remove self.fds fd; + if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r; + if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w) + self.fds + ) + +(* + let set fd (ev : Event.t) : unit = + needs_recompute := true; + match Hashtbl.find subs fd with + | exception Not_found -> Hashtbl.add subs fd (ref ev) + | old_ev -> old_ev := Event.(!old_ev lor ev) + + let iter_ready f : unit = List.iter (fun (fd, ev) -> f fd ev) !ready_fds +*) + +let next_deadline_ (self : st) : float option = + match Heap.peek_min_exn self.timer with + | exception Heap.Empty -> None + | Timer t -> Some t.deadline + +let rec perform_cbs = function + | Nil -> () + | Sub (x, y, f, tail) -> + f x y; + perform_cbs tail + +let step (self : st) : unit = + (* gather the subscriptions and timeout *) + let timeout, sub_r, sub_w = + let@ self = with_lock_ self in + recompute_if_needed self; + let timeout = + match next_deadline_ self with + | None -> + let has_waiters = self.sub_r != [] || self.sub_w != [] in + if has_waiters then + 1e9 + else + 0. + | Some d -> max 0. (d -. now_ ()) + in + timeout, self.sub_r, self.sub_w + in + + let r_reads, r_writes, _ = Unix.select sub_r sub_w [] timeout in + + (* gather the [per_fd] that are ready *) + let ready_r = ref [] in + let ready_w = ref [] in + + (* gather the [per_fd] that have updates *) + (let@ self = with_lock_ self in + if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false; + + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.fds fd in + ready_r := per_fd :: !ready_r) + r_reads; + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.fds fd in + ready_w := per_fd :: !ready_w) + r_writes); + + (* call callbacks *) + List.iter + (fun fd -> + perform_cbs fd.r; + fd.r <- Nil) + !ready_r; + List.iter + (fun fd -> + perform_cbs fd.w; + fd.w <- Nil) + !ready_w; + + () + +let ops : st Nanoev.Impl.ops = + { step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear } + +include Nanoev + +let create () : t = Impl.build ops (create_st ()) diff --git a/src/unix/nanoev_unix.mli b/src/unix/nanoev_unix.mli new file mode 100644 index 0000000..5aba925 --- /dev/null +++ b/src/unix/nanoev_unix.mli @@ -0,0 +1,7 @@ +(** Nano event loop *) + +include module type of struct + include Nanoev +end + +val create : unit -> t