Mirror of https://github.com/c-cube/nanoev.git (synced 2025-12-06 03:05:32 -05:00)
feat(posix): first working version
Parent: 26bdb34cba
Commit: adc468b59d

2 changed files with 270 additions and 211 deletions
@@ -3,6 +3,8 @@ open struct

   let ( let@ ) = ( @@ )
   let now_ns : unit -> int64 = Mtime_clock.now_ns
+  let[@inline] ns_of_s (t : float) : int64 = Int64.of_float (t *. 1e9)
+  let[@inline] ns_to_s (t : int64) : float = Int64.to_float t /. 1e9
 end

 module Fd_tbl = Hashtbl.Make (struct
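The hunk above adds the nanosecond conversion helpers used by the new timer code. A minimal sketch of how they round-trip (the assertions and the top-level binding are my own illustration, not part of the diff):

let[@inline] ns_of_s (t : float) : int64 = Int64.of_float (t *. 1e9)
let[@inline] ns_to_s (t : int64) : float = Int64.to_float t /. 1e9

let () =
  (* a 1.5 s timeout becomes 1_500_000_000 ns, and converts back exactly *)
  assert (ns_of_s 1.5 = 1_500_000_000L);
  assert (ns_to_s 1_500_000_000L = 1.5)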
@@ -16,9 +18,7 @@ end)

 module P = Iomux.Poll
 module Flags = P.Flags
-open Iomux.Util

-(* TODO: remove
 module Sync_queue = struct
   type 'a t = {
     q: 'a Queue.t;
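For context: the [Sync_queue] that this hunk un-comments is a plain Queue guarded by a Mutex. The stand-alone sketch below mirrors the push/transfer bodies visible in the next hunk; the [create] function is not visible in the diff, so its shape here is an assumption.

module Sync_queue = struct
  type 'a t = {
    q: 'a Queue.t;
    mutex: Mutex.t;
  }

  (* not shown in the diff; assumed shape *)
  let create () : _ t = { q = Queue.create (); mutex = Mutex.create () }

  let push (self : _ t) x : unit =
    Mutex.lock self.mutex;
    Queue.push x self.q;
    Mutex.unlock self.mutex

  (* drain everything into [q2] while holding the lock *)
  let transfer (self : _ t) q2 : unit =
    Mutex.lock self.mutex;
    Queue.transfer self.q q2;
    Mutex.unlock self.mutex
end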
@@ -31,96 +31,94 @@ module Sync_queue
     Mutex.lock self.mutex;
     Queue.push x self.q;
     Mutex.unlock self.mutex

+  let transfer (self : _ t) q : unit =
+    Mutex.lock self.mutex;
+    Queue.transfer self.q q;
+    Mutex.unlock self.mutex
 end
-*)

 (** Callback list *)
 type cbs =
   | Nil
   | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs

-(** Single callback *)
-type cb = Cb : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) -> cb

-let[@inline] cb_is_empty = function
-  | Nil -> true
-  | Sub _ -> false

 type timer_ev =
   | Timer : {
-      deadline: float;
+      deadline: int64;
       x: 'a;
       y: 'b;
       f: 'a -> 'b -> unit;
     }
       -> timer_ev

-type per_fd = {
+type fd_data = {
   fd: Unix.file_descr;
   mutable idx: int;
-  (** Index in the buffer. Can change because we swap FDs sometimes to
-      remove items. *)
+  (** Index in the poll buffer. Mutable because we might change it when we
+      swap FDs to remove items. *)
   mutable r: cbs;
   mutable w: cbs;
 }
+(** Data associated to a given FD *)

-let[@inline] per_fd_flags (self : per_fd) : Flags.t =
+let[@inline] fd_flags (self : fd_data) : Flags.t =
   let fl = ref Flags.empty in
-  (if self.r <> Nil then fl := Flags.(!fl + pollin));
-  (if self.w <> Nil then fl := Flags.(!fl + pollout));
+  (if self.r != Nil then fl := Flags.(!fl + pollin));
+  (if self.w != Nil then fl := Flags.(!fl + pollout));
   !fl

 type queued_task =
-  | Q_timer of timer_ev
-  | Q_on_readable of Unix.file_descr * cb
-  | Q_on_writable of Unix.file_descr * cb
+  | Q_run_after of timer_ev
+  | Q_on_readable :
+      Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit)
+      -> queued_task
+  | Q_on_writable :
+      Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit)
+      -> queued_task
   | Q_clear
+  | Q_close of Unix.file_descr

 type st = {
   timer: timer_ev Heap.t;
-  fds: per_fd Fd_tbl.t;
+  fds: fd_data Fd_tbl.t;
   poll: P.t;
   mutable len: int;  (** length of the active prefix of the [poll] buffer *)
   wakeup_rd: Unix.file_descr;
   wakeup_wr: Unix.file_descr;
   wakeup_triggered: bool Atomic.t;
   (** Make [wakeup_from_outside] idempotent within an iteration of [step] *)
-  in_poll: Thread.t option Atomic.t;
+  in_poll: bool Atomic.t;
   (** Are we currently inside a call to [poll], and in which thread? Useful
       for other threads to know whether to wake us up via the pipe *)
-  queued_tasks: queued_task Queue.t;
+  mutable owner_thread: int;
+  (** Thread allowed to perform operations on this poll instance. Starts at
+      [-1]. *)
+  queued_tasks: queued_task Sync_queue.t;
   (** While in [poll()], changes get queued, so we don't invalidate the poll
       buffer before the syscall returns *)
-  lock: Mutex.t;
 }
+(* TODO: [Thread.t] field to remember the owner thread, and
+   thread-safe queue for externally queued tasks.
+   Only owner thread can call [step]. *)

-let rec perform_cbs ~closed = function
-  | Nil -> ()
-  | Sub (x, y, f, tail) ->
-    f ~closed x y;
-    perform_cbs ~closed tail
+let[@inline] queue_task_ (self : st) t : unit =
+  Sync_queue.push self.queued_tasks t

-let rec perform_cbs_closed ~closed = function
-  | Nil -> ()
-  | Sub (x, y, f, tail) ->
-    f ~closed x y;
-    perform_cbs_closed ~closed tail
+(** [true] if called from the owner thread *)
+let[@inline] in_owner_thread (self : st) : bool =
+  self.owner_thread != -1 && self.owner_thread == Thread.(id (self ()))

+let[@inline] in_poll (self : st) : bool = Atomic.get self.in_poll
 let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline

-let[@inline] with_lock_ (self : st) f =
-  Mutex.lock self.lock;
-  match f self with
-  | exception e ->
-    Mutex.unlock self.lock;
-    raise e
-  | res ->
-    Mutex.unlock self.lock;
-    res

 let create_st () : st =
   let wakeup_rd, wakeup_wr = Unix.pipe () in
+  (* reading end must be non blocking so it's not always immediately
+     ready; writing end is blocking to make it simpler to wakeup from other
+     threads *)
   Unix.set_nonblock wakeup_rd;
+  let self =
     {
       timer = Heap.create ~leq:leq_timer ();
       fds = Fd_tbl.create 16;
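The new [queued_task] type and the [owner_thread]/[in_poll] fields support the dispatch rule that the public functions later in this diff follow: mutate the loop directly when called from the owner thread while it is not polling, otherwise enqueue a task and wake the loop. A simplified, self-contained sketch of that rule (the names here are illustrative, not the diff's):

type 'task loop = {
  mutable owner_thread: int;   (* -1 until the loop has run [step] once *)
  in_poll: bool Atomic.t;      (* is the loop currently blocked in poll(2)? *)
  tasks: 'task Queue.t;        (* guarded by [mutex] *)
  mutex: Mutex.t;
  wakeup_wr: Unix.file_descr;  (* write end of the loop's self-pipe *)
}

let submit (self : _ loop) ~run_now (task : 'task) : unit =
  let is_owner =
    self.owner_thread <> -1
    && self.owner_thread = Thread.id (Thread.self ())
  in
  if is_owner && not (Atomic.get self.in_poll) then
    (* safe to touch the poll buffer right away *)
    run_now task
  else (
    (* defer: queue the task and make the self-pipe readable, so poll()
       returns promptly and the owner thread picks the task up *)
    Mutex.lock self.mutex;
    Queue.push task self.tasks;
    Mutex.unlock self.mutex;
    ignore (Unix.write self.wakeup_wr (Bytes.make 1 '!') 0 1 : int)
  )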
@@ -129,32 +127,65 @@ let create_st () : st =
       wakeup_rd;
       wakeup_wr;
       wakeup_triggered = Atomic.make false;
-      in_poll = Atomic.make None;
-      queued_tasks = Queue.create ();
-      lock = Mutex.create ();
+      in_poll = Atomic.make false;
+      owner_thread = -1;
+      queued_tasks = Sync_queue.create ();
     }
+  in

+  (* always watch for the pipe being readable *)
+  P.set_index self.poll 0 self.wakeup_rd Flags.pollin;
+  self.len <- 1;

+  self

 let max_fds (self : st) : int = P.maxfds self.poll

-let clear (self : st) =
-  let@ self = with_lock_ self in
-  Heap.clear self.timer;
-  Fd_tbl.clear self.fds;
-  for i = 0 to P.maxfds self.poll - 1 do
-    P.set_index self.poll i P.invalid_fd Flags.empty
-  done;
-  self.len <- 0;
-  ()

-let wakeup_from_outside (self : st) : unit =
-  if not (Atomic.exchange self.wakeup_triggered true) then
+let[@inline never] wakeup_real_ (self : st) : unit =
   let@ _sp =
     Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside"
   in
   let b = Bytes.make 1 '!' in
   ignore (Unix.write self.wakeup_wr b 0 1 : int)

-let get_fd_ (self : st) fd : per_fd =
+let[@inline] wakeup_ (self : st) : unit =
+  if not (Atomic.exchange self.wakeup_triggered true) then wakeup_real_ self

+let wakeup_from_outside (self : st) : unit =
+  let already_awake =
+    (* to avoid race conditions we only take the shortcut if
+       this is called from the owner thread *)
+    in_owner_thread self && not (Atomic.get self.in_poll)
+  in
+  if not already_awake then wakeup_ self

+let rec perform_cbs ~closed = function
+  | Nil -> ()
+  | Sub (x, y, f, tail) ->
+    f ~closed x y;
+    perform_cbs ~closed tail

+(** Change the event loop right now. This must be called only from the owner
+    thread and outside of [poll]. *)
+module Run_now_ = struct
+  let rec perform_cbs_closed ~closed = function
+    | Nil -> ()
+    | Sub (x, y, f, tail) ->
+      f ~closed x y;
+      perform_cbs_closed ~closed tail

+  let clear_ (self : st) : unit =
+    Heap.clear self.timer;
+    Fd_tbl.clear self.fds;
+    for i = 0 to P.maxfds self.poll - 1 do
+      P.set_index self.poll i P.invalid_fd Flags.empty
+    done;
+    Atomic.set self.wakeup_triggered false;
+    self.len <- 0;
+    ()

+  let get_fd_ (self : st) fd : fd_data =
+    (* assert (in_owner_thread self && not (in_poll self)); *)
     match Fd_tbl.find self.fds fd with
     | per_fd -> per_fd
     | exception Not_found ->
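The wakeup path above is split in two so the common case stays cheap: [wakeup_] flips an atomic flag, and only the caller that flips it actually writes to the pipe, which keeps [wakeup_from_outside] idempotent within one loop iteration. A compact stand-alone sketch of that idempotence (my own names, not the diff's):

(* At most one byte is written per iteration: Atomic.exchange returns the
   previous value, so only the caller that flips false -> true writes. *)
let wakeup ~(triggered : bool Atomic.t) ~(pipe_wr : Unix.file_descr) () : unit =
  if not (Atomic.exchange triggered true) then
    ignore (Unix.write pipe_wr (Bytes.make 1 '!') 0 1 : int)

(* The loop resets the flag (and drains the pipe) once per iteration: *)
let reset ~(triggered : bool Atomic.t) () : bool =
  Atomic.exchange triggered false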
@@ -169,148 +200,174 @@ let get_fd_ (self : st) fd : per_fd =
       Fd_tbl.add self.fds fd per_fd;
       per_fd

+  let remove_fd_ (self : st) (fd_data : fd_data) : unit =
+    Fd_tbl.remove self.fds fd_data.fd;
+    P.set_index self.poll fd_data.idx P.invalid_fd Flags.empty;

+    (* assert (in_owner_thread self && not (in_poll self)); *)
+    if fd_data.idx > 0 && fd_data.idx + 1 < self.len then (
+      (* not the last element nor the first (pipe_rd), move the last element
+         here to keep the buffer non sparse *)
+      let last_fd = P.get_fd self.poll (self.len - 1) in
+      assert (last_fd <> fd_data.fd);
+      match Fd_tbl.find_opt self.fds last_fd with
+      | None -> assert false
+      | Some last_fd_data ->
+        (* move the last FD to [idx] *)
+        last_fd_data.idx <- fd_data.idx;
+        P.set_index self.poll fd_data.idx last_fd (fd_flags last_fd_data)
+    );

+    self.len <- self.len - 1;
+    ()

   let close_ (self : st) fd : unit =
     let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
     let r, w =
-      let@ self = with_lock_ self in
       match Fd_tbl.find self.fds fd with
-      | per_fd ->
-        Fd_tbl.remove self.fds fd;
-        (* not the last element, move the last element here *)
-        if per_fd.idx + 1 < self.len then (
-          let last_fd = P.get_fd self.poll (self.len - 1) in
-          match Fd_tbl.find_opt self.fds last_fd with
-          | None -> ()
-          | Some last_per_fd ->
-            last_per_fd.idx <- per_fd.idx;
-            P.set_index self.poll per_fd.idx last_fd (per_fd_flags last_per_fd)
-        );
-        self.len <- self.len - 1;
-        per_fd.r, per_fd.w
-      | exception Not_found ->
-        invalid_arg "File descriptor is not known to Nanoev"
+      | fd_data ->
+        remove_fd_ self fd_data;
+        fd_data.r, fd_data.w
+      | exception Not_found -> Nil, Nil
     in

-    (* call callbacks outside of the lock's critical section *)
     perform_cbs_closed ~closed:true r;
     perform_cbs_closed ~closed:true w;
     ()

-let on_readable self fd x y f : unit =
+  let on_readable_ self fd x y f : unit =
     let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
-  let@ self = with_lock_ self in
-  let per_fd = get_fd_ self fd in
-  per_fd.r <- Sub (x, y, f, per_fd.r);
-  (* FIXME: P.set_index *)
-  self.sub_up_to_date <- false;
-  if Atomic.get self.in_select then wakeup_from_outside self
+    let fd_data = get_fd_ self fd in
+    fd_data.r <- Sub (x, y, f, fd_data.r);
+    P.set_index self.poll fd_data.idx fd (fd_flags fd_data)

-let on_writable self fd x y f : unit =
+  let on_writable_ self fd x y f : unit =
     let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
-  let@ self = with_lock_ self in
-  let per_fd = get_fd_ self fd in
-  per_fd.w <- Sub (x, y, f, per_fd.w);
-  (* FIXME: P.set_index *)
-  self.sub_up_to_date <- false;
-  if Atomic.get self.in_select then wakeup_from_outside self
+    let fd_data = get_fd_ self fd in
+    fd_data.w <- Sub (x, y, f, fd_data.w);
+    P.set_index self.poll fd_data.idx fd (fd_flags fd_data)

-let run_after_s self time x y f : unit =
-  let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in
-  let@ self = with_lock_ self in
-  let deadline = now_ () +. time in
-  Heap.insert self.timer (Timer { deadline; x; y; f });
-  if Atomic.get self.in_select then wakeup_from_outside self
+  let run_after_s_ self ev : unit = Heap.insert self.timer ev

-let recompute_if_needed (self : st) =
-  if not self.sub_up_to_date then (
-    let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in
-    self.sub_up_to_date <- true;
-    self.sub_r <- [];
-    self.sub_w <- [];
-    Hashtbl.iter
-      (fun fd per_fd ->
-        if cb_is_empty per_fd.r && cb_is_empty per_fd.w then
-          Hashtbl.remove self.fds fd;
-        if not (cb_is_empty per_fd.r) then self.sub_r <- fd :: self.sub_r;
-        if not (cb_is_empty per_fd.w) then self.sub_w <- fd :: self.sub_w)
-      self.fds
+  let perform_task_ self (t : queued_task) : unit =
+    match t with
+    | Q_run_after t -> run_after_s_ self t
+    | Q_on_readable (fd, x, y, f) -> on_readable_ self fd x y f
+    | Q_on_writable (fd, x, y, f) -> on_writable_ self fd x y f
+    | Q_clear -> clear_ self
+    | Q_close fd -> close_ self fd
+end

+let clear (self : st) =
+  if in_owner_thread self && not (in_poll self) then
+    Run_now_.clear_ self
+  else (
+    queue_task_ self @@ Q_clear;
+    wakeup_from_outside self
   )

-let next_deadline_ (self : st) : float option =
+let close (self : st) fd : unit =
+  if in_owner_thread self && not (in_poll self) then
+    Run_now_.close_ self fd
+  else (
+    queue_task_ self @@ Q_close fd;
+    wakeup_from_outside self
+  )

+let on_readable self fd x y f : unit =
+  if in_owner_thread self && not (in_poll self) then
+    Run_now_.on_readable_ self fd x y f
+  else (
+    queue_task_ self @@ Q_on_readable (fd, x, y, f);
+    wakeup_from_outside self
+  )

+let on_writable self fd x y f : unit =
+  if in_owner_thread self && not (in_poll self) then
+    Run_now_.on_writable_ self fd x y f
+  else (
+    queue_task_ self @@ Q_on_writable (fd, x, y, f);
+    wakeup_from_outside self
+  )

+let run_after_s self (time : float) x y f : unit =
+  let deadline = Int64.add (now_ns ()) (ns_of_s time) in
+  let ev = Timer { deadline; x; y; f } in
+  if in_owner_thread self && not (in_poll self) then
+    Run_now_.run_after_s_ self ev
+  else (
+    queue_task_ self @@ Q_run_after ev;
+    wakeup_from_outside self
+  )

+let next_deadline_ (self : st) : int64 option =
   match Heap.peek_min_exn self.timer with
   | exception Heap.Empty -> None
   | Timer t -> Some t.deadline

 let step (self : st) : unit =
-  let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
+  let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in
-  (* gather the subscriptions and timeout *)
-  let timeout, sub_r, sub_w =
-    let@ self = with_lock_ self in
-    recompute_if_needed self;
-    let timeout =
+  self.owner_thread <- Thread.(id (self ()));
+  let timeout_ns : int64 =
     match next_deadline_ self with
-      | None -> 30.
-      | Some d -> max 0. (d -. now_ ())
-    in
-    timeout, self.sub_r, self.sub_w
+    | None -> 30_000_000_000L
+    | Some d -> Int64.max 0L (Int64.sub d (now_ns ()))
   in

-  (* enter [select] *)
-  Atomic.set self.in_select true;
-  let r_reads, r_writes, _ =
+  (* process all queued tasks.
+
+     NOTE: race condition: if another thread queues tasks after we do
+     the transfer, it will call [wakeup_from_outside] and make the pipe_rd FD
+     readable. So as soon as we call [poll], it will return and we will find
+     the queued tasks waiting for us. *)
+  let local_q = Queue.create () in
+  Sync_queue.transfer self.queued_tasks local_q;
+  while not (Queue.is_empty local_q) do
+    let t = Queue.pop local_q in
+    Run_now_.perform_task_ self t
+  done;

+  Atomic.set self.in_poll true;

+  (* enter [poll] *)
+  let num_ready_fds =
     let@ _sp =
-      Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () ->
-          [
-            "timeout", `Float timeout;
-            "reads", `Int (List.length sub_r);
-            "writes", `Int (List.length sub_w);
-          ])
+      Trace_.with_span ~__FILE__ ~__LINE__ "poll" ~data:(fun () ->
+          [ "timeout", `Float (ns_to_s timeout_ns); "len", `Int self.len ])
     in
-    Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout
+    P.ppoll_or_poll self.poll self.len (Nanoseconds timeout_ns)
   in
-  Atomic.set self.in_select false;

-  (* drain pipe *)
+  Atomic.set self.in_poll false;

+  (* drain notification pipe *)
   if Atomic.exchange self.wakeup_triggered false then (
-    let b1 = Bytes.create 1 in
-    while try Unix.read self.wakeup_rd b1 0 1 > 0 with _ -> false do
+    let b1 = Bytes.create 8 in
+    while try Unix.read self.wakeup_rd b1 0 8 > 0 with _ -> false do
       ()
     done
   );

-  (* gather the [per_fd] that are ready *)
-  let ready_r = ref [] in
-  let ready_w = ref [] in

-  (* gather the [per_fd] that have updates *)
-  (let@ self = with_lock_ self in
-   if r_reads != [] || r_writes != [] then self.sub_up_to_date <- false;

-   List.iter
-     (fun fd ->
-       if fd != self.wakeup_rd then (
-         let per_fd = Hashtbl.find self.fds fd in
-         ready_r := per_fd :: !ready_r
-       ))
-     r_reads;
-   List.iter
-     (fun fd ->
-       let per_fd = Hashtbl.find self.fds fd in
-       ready_w := per_fd :: !ready_w)
-     r_writes);

   (* call callbacks *)
-  List.iter
-    (fun fd ->
-      perform_cbs ~closed:false fd.r;
-      fd.r <- Nil)
-    !ready_r;
-  List.iter
-    (fun fd ->
-      perform_cbs ~closed:false fd.w;
-      fd.w <- Nil)
-    !ready_w;
+  P.iter_ready self.poll num_ready_fds (fun _idx fd flags ->
+      if fd <> self.wakeup_rd then (
+        let fd_data =
+          try Fd_tbl.find self.fds fd with Not_found -> assert false
+        in
+        if Flags.mem Flags.pollin flags then (
+          let r = fd_data.r in
+          fd_data.r <- Nil;
+          perform_cbs ~closed:false r
+        );
+        if Flags.mem Flags.pollout flags then (
+          let w = fd_data.w in
+          fd_data.w <- Nil;
+          perform_cbs ~closed:false w
+        );

+        if Flags.empty = fd_flags fd_data then Run_now_.remove_fd_ self fd_data
+      ));

   ()
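One detail worth noting in the hunk above: [remove_fd_] keeps the poll buffer dense by moving the last live slot into the freed index, so [P.ppoll_or_poll] is always handed a contiguous prefix of length [len]. The same idea on a plain array (illustration only, not the diff's code):

(* Swap-remove on a dense prefix: O(1), order is not preserved.
   Index 0 is reserved for the wakeup pipe, so it is never removed this way. *)
let swap_remove (arr : 'a array) ~(len : int) ~(idx : int) : int =
  assert (idx > 0 && idx < len);
  if idx + 1 < len then arr.(idx) <- arr.(len - 1);
  (* the caller must also update the moved element's recorded index, as the
     diff does with [last_fd_data.idx <- fd_data.idx] *)
  len - 1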
The second changed file is the interface (.mli):

@@ -5,4 +5,6 @@ include module type of struct
 end

 val create : unit -> t
-val create_with : Iomux.Poll.t -> t
+(** Create a new nanoev loop using [Iomux] (poll/ppoll).
+
+    {b NOTE}: this is NOT thread-safe *)