mirror of
https://github.com/c-cube/nanoev.git
synced 2025-12-05 19:00:35 -05:00
wip: nanoev-posix, using mtime and iomux
This commit is contained in:
parent
00229d652f
commit
295b3b5c24
7 changed files with 387 additions and 0 deletions
|
|
@ -26,6 +26,13 @@
|
|||
(tags
|
||||
(unix select async)))
|
||||
|
||||
(package
|
||||
(name nanoev-posix)
|
||||
(synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev")
|
||||
(depends ocaml dune base-unix iomux (mtime (>= 2.0)))
|
||||
(tags
|
||||
(unix select async iomux nanoev)))
|
||||
|
||||
(package
|
||||
(name nanoev_tiny_httpd)
|
||||
(synopsis "Use nanoev as a basis for tiny_httpd")
|
||||
|
|
|
|||
6
src/posix/dune
Normal file
6
src/posix/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
(library
|
||||
(name nanoev_posix)
|
||||
(public_name nanoev-posix)
|
||||
(synopsis "posix backend (poll/ppoll+mtime)")
|
||||
(private_modules heap)
|
||||
(libraries nanoev unix iomux mtime mtime.clock.os))
|
||||
61
src/posix/heap.ml
Normal file
61
src/posix/heap.ml
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
type 'a tree =
|
||||
| E
|
||||
| N of int * 'a * 'a tree * 'a tree
|
||||
|
||||
type 'a t = {
|
||||
leq: 'a -> 'a -> bool;
|
||||
mutable t: 'a tree;
|
||||
}
|
||||
|
||||
let create ~leq () : _ t = { leq; t = E }
|
||||
|
||||
let[@inline] is_empty (self : _ t) =
|
||||
match self.t with
|
||||
| E -> true
|
||||
| N _ -> false
|
||||
|
||||
exception Empty
|
||||
|
||||
open struct
|
||||
(** Rank of the tree *)
|
||||
let[@inline] rank_ = function
|
||||
| E -> 0
|
||||
| N (r, _, _, _) -> r
|
||||
|
||||
(** Make a balanced node labelled with [x], and subtrees [a] and [b].
|
||||
We ensure that the right child's rank is ≤ to the rank of the
|
||||
left child (leftist property). The rank of the resulting node
|
||||
is the length of the rightmost path. *)
|
||||
let[@inline] mk_node_ x a b =
|
||||
if rank_ a >= rank_ b then
|
||||
N (rank_ b + 1, x, a, b)
|
||||
else
|
||||
N (rank_ a + 1, x, b, a)
|
||||
|
||||
let rec merge ~leq t1 t2 =
|
||||
match t1, t2 with
|
||||
| t, E -> t
|
||||
| E, t -> t
|
||||
| N (_, x, a1, b1), N (_, y, a2, b2) ->
|
||||
if leq x y then
|
||||
mk_node_ x a1 (merge ~leq b1 t2)
|
||||
else
|
||||
mk_node_ y a2 (merge ~leq t1 b2)
|
||||
end
|
||||
|
||||
let clear self = self.t <- E
|
||||
|
||||
let[@inline] insert (self : _ t) x : unit =
|
||||
self.t <- merge ~leq:self.leq self.t (N (1, x, E, E))
|
||||
|
||||
let[@inline] peek_min_exn (self : _ t) =
|
||||
match self.t with
|
||||
| E -> raise Empty
|
||||
| N (_, x, _, _) -> x
|
||||
|
||||
let[@inline] pop_min_exn (self : _ t) =
|
||||
match self.t with
|
||||
| E -> raise Empty
|
||||
| N (_, x, l, r) ->
|
||||
self.t <- merge ~leq:self.leq l r;
|
||||
x
|
||||
13
src/posix/heap.mli
Normal file
13
src/posix/heap.mli
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
type 'a t
|
||||
|
||||
val create : leq:('a -> 'a -> bool) -> unit -> 'a t
|
||||
|
||||
val is_empty : _ t -> bool
|
||||
(** [is_empty h] returns [true] if the heap [h] is empty. *)
|
||||
|
||||
exception Empty
|
||||
|
||||
val clear : _ t -> unit
|
||||
val insert : 'a t -> 'a -> unit
|
||||
val peek_min_exn : 'a t -> 'a
|
||||
val pop_min_exn : 'a t -> 'a
|
||||
291
src/posix/nanoev_posix.ml
Normal file
291
src/posix/nanoev_posix.ml
Normal file
|
|
@ -0,0 +1,291 @@
|
|||
open struct
|
||||
module Trace_ = Nanoev.Trace_
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
let now_ns : unit -> int64 = Mtime_clock.now_ns
|
||||
end
|
||||
|
||||
module Fd_tbl = Hashtbl.Make (struct
|
||||
open Iomux.Util
|
||||
|
||||
type t = Unix.file_descr
|
||||
|
||||
let equal a b = Int.equal (fd_of_unix a) (fd_of_unix b)
|
||||
let hash a = Hashtbl.hash (fd_of_unix a)
|
||||
end)
|
||||
|
||||
module P = Iomux.Poll
|
||||
module Flags = P.Flags
|
||||
open Iomux.Util
|
||||
|
||||
(** Callback list *)
|
||||
type cbs =
|
||||
| Nil
|
||||
| Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs
|
||||
|
||||
let[@inline] cb_is_empty = function
|
||||
| Nil -> true
|
||||
| Sub _ -> false
|
||||
|
||||
type timer_ev =
|
||||
| Timer : {
|
||||
deadline: float;
|
||||
x: 'a;
|
||||
y: 'b;
|
||||
f: 'a -> 'b -> unit;
|
||||
}
|
||||
-> timer_ev
|
||||
|
||||
type per_fd = {
|
||||
fd: Unix.file_descr;
|
||||
idx: int; (** Index in the buffer *)
|
||||
mutable r: cbs;
|
||||
mutable w: cbs;
|
||||
}
|
||||
|
||||
type st = {
|
||||
timer: timer_ev Heap.t;
|
||||
fds: per_fd Fd_tbl.t;
|
||||
poll: P.t;
|
||||
mutable cur_idx: int; (** index in the [poll] buffer *)
|
||||
mutable idx_freelist: int list;
|
||||
(** previously used indices that are recycled *)
|
||||
wakeup_rd: Unix.file_descr;
|
||||
wakeup_wr: Unix.file_descr;
|
||||
wakeup_triggered: bool Atomic.t;
|
||||
(** Make [wakeup_from_outside] idempotent within an iteration of [step] *)
|
||||
in_poll: bool Atomic.t;
|
||||
(** Are we currently inside a call to [poll]? Useful for other threads to
|
||||
know whether to wake us up via the pipe *)
|
||||
lock: Mutex.t;
|
||||
}
|
||||
|
||||
let rec perform_cbs ~closed = function
|
||||
| Nil -> ()
|
||||
| Sub (x, y, f, tail) ->
|
||||
f ~closed x y;
|
||||
perform_cbs ~closed tail
|
||||
|
||||
let rec perform_cbs_closed ~closed = function
|
||||
| Nil -> ()
|
||||
| Sub (x, y, f, tail) ->
|
||||
f ~closed x y;
|
||||
perform_cbs_closed ~closed tail
|
||||
|
||||
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
|
||||
|
||||
let create_st () : st =
|
||||
let wakeup_rd, wakeup_wr = Unix.pipe () in
|
||||
Unix.set_nonblock wakeup_rd;
|
||||
{
|
||||
timer = Heap.create ~leq:leq_timer ();
|
||||
fds = Fd_tbl.create 16;
|
||||
poll = P.create ();
|
||||
cur_idx = 0;
|
||||
idx_freelist = [];
|
||||
wakeup_rd;
|
||||
wakeup_wr;
|
||||
wakeup_triggered = Atomic.make false;
|
||||
in_poll = Atomic.make false;
|
||||
lock = Mutex.create ();
|
||||
}
|
||||
|
||||
let[@inline] with_lock_ (self : st) f =
|
||||
Mutex.lock self.lock;
|
||||
match f self with
|
||||
| exception e ->
|
||||
Mutex.unlock self.lock;
|
||||
raise e
|
||||
| res ->
|
||||
Mutex.unlock self.lock;
|
||||
res
|
||||
|
||||
let clear (self : st) =
|
||||
let@ self = with_lock_ self in
|
||||
Heap.clear self.timer;
|
||||
Fd_tbl.clear self.fds;
|
||||
for i = 0 to P.maxfds self.poll - 1 do
|
||||
P.set_index self.poll i P.invalid_fd Flags.empty
|
||||
done;
|
||||
self.cur_idx <- 0;
|
||||
self.idx_freelist <- [];
|
||||
()
|
||||
|
||||
let wakeup_from_outside (self : st) : unit =
|
||||
if not (Atomic.exchange self.wakeup_triggered true) then
|
||||
let@ _sp =
|
||||
Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside"
|
||||
in
|
||||
let b = Bytes.make 1 '!' in
|
||||
ignore (Unix.write self.wakeup_wr b 0 1 : int)
|
||||
|
||||
let get_fd_ (self : st) fd : per_fd =
|
||||
match Fd_tbl.find self.fds fd with
|
||||
| per_fd -> per_fd
|
||||
| exception Not_found ->
|
||||
let idx =
|
||||
match self.idx_freelist with
|
||||
| i :: tl ->
|
||||
self.idx_freelist <- tl;
|
||||
i
|
||||
| [] ->
|
||||
if self.cur_idx = P.maxfds self.poll then
|
||||
invalid_arg "No available slot in poll";
|
||||
let n = self.cur_idx in
|
||||
self.cur_idx <- self.cur_idx + 1;
|
||||
n
|
||||
in
|
||||
let per_fd = { idx; fd; r = Nil; w = Nil } in
|
||||
Fd_tbl.add self.fds fd per_fd;
|
||||
per_fd
|
||||
|
||||
let close self fd : unit =
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
|
||||
let r, w =
|
||||
let@ self = with_lock_ self in
|
||||
match Fd_tbl.find self.fds fd with
|
||||
| per_fd ->
|
||||
Fd_tbl.remove self.fds fd;
|
||||
self.idx_freelist <- per_fd.idx :: self.idx_freelist;
|
||||
if Atomic.get self.in_poll then wakeup_from_outside self;
|
||||
per_fd.r, per_fd.w
|
||||
| exception Not_found ->
|
||||
invalid_arg "File descriptor is not known to Nanoev"
|
||||
in
|
||||
|
||||
(* call callbacks outside of the lock *)
|
||||
perform_cbs_closed ~closed:true r;
|
||||
perform_cbs_closed ~closed:true w;
|
||||
()
|
||||
|
||||
let on_readable self fd x y f : unit =
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
|
||||
let@ self = with_lock_ self in
|
||||
let per_fd = get_fd_ self fd in
|
||||
per_fd.r <- Sub (x, y, f, per_fd.r);
|
||||
(* FIXME: P.set_index *)
|
||||
self.sub_up_to_date <- false;
|
||||
if Atomic.get self.in_select then wakeup_from_outside self
|
||||
|
||||
let on_writable self fd x y f : unit =
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
|
||||
let@ self = with_lock_ self in
|
||||
let per_fd = get_fd_ self fd in
|
||||
per_fd.w <- Sub (x, y, f, per_fd.w);
|
||||
(* FIXME: P.set_index *)
|
||||
self.sub_up_to_date <- false;
|
||||
if Atomic.get self.in_select then wakeup_from_outside self
|
||||
|
||||
let run_after_s self time x y f : unit =
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in
|
||||
let@ self = with_lock_ self in
|
||||
let deadline = now_ () +. time in
|
||||
Heap.insert self.timer (Timer { deadline; x; y; f });
|
||||
if Atomic.get self.in_select then wakeup_from_outside self
|
||||
|
||||
let recompute_if_needed (self : st) =
|
||||
if not self.sub_up_to_date then (
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in
|
||||
self.sub_up_to_date <- true;
|
||||
self.sub_r <- [];
|
||||
self.sub_w <- [];
|
||||
Hashtbl.iter
|
||||
(fun fd per_fd ->
|
||||
if cb_is_empty per_fd.r && cb_is_empty per_fd.w then
|
||||
Hashtbl.remove self.fds fd;
|
||||
if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r;
|
||||
if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w)
|
||||
self.fds
|
||||
)
|
||||
|
||||
let next_deadline_ (self : st) : float option =
|
||||
match Heap.peek_min_exn self.timer with
|
||||
| exception Heap.Empty -> None
|
||||
| Timer t -> Some t.deadline
|
||||
|
||||
let step (self : st) : unit =
|
||||
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
|
||||
(* gather the subscriptions and timeout *)
|
||||
let timeout, sub_r, sub_w =
|
||||
let@ self = with_lock_ self in
|
||||
recompute_if_needed self;
|
||||
let timeout =
|
||||
match next_deadline_ self with
|
||||
| None -> 30.
|
||||
| Some d -> max 0. (d -. now_ ())
|
||||
in
|
||||
timeout, self.sub_r, self.sub_w
|
||||
in
|
||||
|
||||
(* enter [select] *)
|
||||
Atomic.set self.in_select true;
|
||||
let r_reads, r_writes, _ =
|
||||
let@ _sp =
|
||||
Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () ->
|
||||
[
|
||||
"timeout", `Float timeout;
|
||||
"reads", `Int (List.length sub_r);
|
||||
"writes", `Int (List.length sub_w);
|
||||
])
|
||||
in
|
||||
Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout
|
||||
in
|
||||
Atomic.set self.in_select false;
|
||||
|
||||
(* drain pipe *)
|
||||
if Atomic.exchange self.wakeup_triggered false then (
|
||||
let b1 = Bytes.create 1 in
|
||||
while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do
|
||||
()
|
||||
done
|
||||
);
|
||||
|
||||
(* gather the [per_fd] that are ready *)
|
||||
let ready_r = ref [] in
|
||||
let ready_w = ref [] in
|
||||
|
||||
(* gather the [per_fd] that have updates *)
|
||||
(let@ self = with_lock_ self in
|
||||
if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false;
|
||||
|
||||
List.iter
|
||||
(fun fd ->
|
||||
if fd != self.wakeup_rd then (
|
||||
let per_fd = Hashtbl.find self.fds fd in
|
||||
ready_r := per_fd :: !ready_r
|
||||
))
|
||||
r_reads;
|
||||
List.iter
|
||||
(fun fd ->
|
||||
let per_fd = Hashtbl.find self.fds fd in
|
||||
ready_w := per_fd :: !ready_w)
|
||||
r_writes);
|
||||
|
||||
(* call callbacks *)
|
||||
List.iter
|
||||
(fun fd ->
|
||||
perform_cbs ~closed:false fd.r;
|
||||
fd.r <- Nil)
|
||||
!ready_r;
|
||||
List.iter
|
||||
(fun fd ->
|
||||
perform_cbs ~closed:false fd.w;
|
||||
fd.w <- Nil)
|
||||
!ready_w;
|
||||
|
||||
()
|
||||
|
||||
let ops : st Nanoev.Impl.ops =
|
||||
{
|
||||
step;
|
||||
close;
|
||||
on_readable;
|
||||
on_writable;
|
||||
run_after_s;
|
||||
wakeup_from_outside;
|
||||
clear;
|
||||
}
|
||||
|
||||
include Nanoev
|
||||
|
||||
let create () : t = Impl.build ops (create_st ())
|
||||
8
src/posix/nanoev_posix.mli
Normal file
8
src/posix/nanoev_posix.mli
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
(** Nano event loop using Poll/Ppoll *)
|
||||
|
||||
include module type of struct
|
||||
include Nanoev
|
||||
end
|
||||
|
||||
val create : unit -> t
|
||||
val create_with : Iomux.Poll.t -> t
|
||||
|
|
@ -2,4 +2,5 @@
|
|||
(name nanoev_unix)
|
||||
(public_name nanoev.unix)
|
||||
(synopsis "Unix/select backend")
|
||||
(private_modules heap)
|
||||
(libraries nanoev unix))
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue