wip: posix

This commit is contained in:
Simon Cruanes 2025-04-30 22:16:19 -04:00
parent 7a0c3e1279
commit 3125f3274b
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 113 additions and 41 deletions

32
nanoev-posix.opam Normal file
View file

@ -0,0 +1,32 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Use mtime+iomux (posix compliant) as a backend for nanoev"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["unix" "select" "async" "iomux" "nanoev"]
homepage: "https://github.com/c-cube/nanoev"
bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"base-unix"
"iomux"
"mtime" {>= "2.0"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/nanoev.git"

View file

@ -3,4 +3,4 @@
(public_name nanoev-posix) (public_name nanoev-posix)
(synopsis "posix backend (poll/ppoll+mtime)") (synopsis "posix backend (poll/ppoll+mtime)")
(private_modules heap) (private_modules heap)
(libraries nanoev unix iomux mtime mtime.clock.os)) (libraries threads nanoev unix iomux mtime mtime.clock.os))

View file

@ -18,11 +18,30 @@ module P = Iomux.Poll
module Flags = P.Flags module Flags = P.Flags
open Iomux.Util open Iomux.Util
(* TODO: remove
module Sync_queue = struct
type 'a t = {
q: 'a Queue.t;
mutex: Mutex.t;
}
let create () : _ t = { q = Queue.create (); mutex = Mutex.create () }
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
Queue.push x self.q;
Mutex.unlock self.mutex
end
*)
(** Callback list *) (** Callback list *)
type cbs = type cbs =
| Nil | Nil
| Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs
(** Single callback *)
type cb = Cb : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) -> cb
let[@inline] cb_is_empty = function let[@inline] cb_is_empty = function
| Nil -> true | Nil -> true
| Sub _ -> false | Sub _ -> false
@ -38,25 +57,40 @@ type timer_ev =
type per_fd = { type per_fd = {
fd: Unix.file_descr; fd: Unix.file_descr;
idx: int; (** Index in the buffer *) mutable idx: int;
(** Index in the buffer. Can change because we swap FDs sometimes to
remove items. *)
mutable r: cbs; mutable r: cbs;
mutable w: cbs; mutable w: cbs;
} }
let[@inline] per_fd_flags (self : per_fd) : Flags.t =
let fl = ref Flags.empty in
(if self.r <> Nil then fl := Flags.(!fl + pollin));
(if self.w <> Nil then fl := Flags.(!fl + pollout));
!fl
type queued_task =
| Q_timer of timer_ev
| Q_on_readable of Unix.file_descr * cb
| Q_on_writable of Unix.file_descr * cb
| Q_clear
type st = { type st = {
timer: timer_ev Heap.t; timer: timer_ev Heap.t;
fds: per_fd Fd_tbl.t; fds: per_fd Fd_tbl.t;
poll: P.t; poll: P.t;
mutable cur_idx: int; (** index in the [poll] buffer *) mutable len: int; (** length of the active prefix of the [poll] buffer *)
mutable idx_freelist: int list;
(** previously used indices that are recycled *)
wakeup_rd: Unix.file_descr; wakeup_rd: Unix.file_descr;
wakeup_wr: Unix.file_descr; wakeup_wr: Unix.file_descr;
wakeup_triggered: bool Atomic.t; wakeup_triggered: bool Atomic.t;
(** Make [wakeup_from_outside] idempotent within an iteration of [step] *) (** Make [wakeup_from_outside] idempotent within an iteration of [step] *)
in_poll: bool Atomic.t; in_poll: Thread.t option Atomic.t;
(** Are we currently inside a call to [poll]? Useful for other threads to (** Are we currently inside a call to [poll], and in which thread? Useful
know whether to wake us up via the pipe *) for other threads to know whether to wake us up via the pipe *)
queued_tasks: queued_task Queue.t;
(** While in [poll()], changes get queued, so we don't invalidate the poll
buffer before the syscall returns *)
lock: Mutex.t; lock: Mutex.t;
} }
@ -72,23 +106,7 @@ let rec perform_cbs_closed ~closed = function
f ~closed x y; f ~closed x y;
perform_cbs_closed ~closed tail perform_cbs_closed ~closed tail
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st =
let wakeup_rd, wakeup_wr = Unix.pipe () in
Unix.set_nonblock wakeup_rd;
{
timer = Heap.create ~leq:leq_timer ();
fds = Fd_tbl.create 16;
poll = P.create ();
cur_idx = 0;
idx_freelist = [];
wakeup_rd;
wakeup_wr;
wakeup_triggered = Atomic.make false;
in_poll = Atomic.make false;
lock = Mutex.create ();
}
let[@inline] with_lock_ (self : st) f = let[@inline] with_lock_ (self : st) f =
Mutex.lock self.lock; Mutex.lock self.lock;
@ -100,6 +118,24 @@ let[@inline] with_lock_ (self : st) f =
Mutex.unlock self.lock; Mutex.unlock self.lock;
res res
let create_st () : st =
let wakeup_rd, wakeup_wr = Unix.pipe () in
Unix.set_nonblock wakeup_rd;
{
timer = Heap.create ~leq:leq_timer ();
fds = Fd_tbl.create 16;
poll = P.create ();
len = 0;
wakeup_rd;
wakeup_wr;
wakeup_triggered = Atomic.make false;
in_poll = Atomic.make None;
queued_tasks = Queue.create ();
lock = Mutex.create ();
}
let max_fds (self : st) : int = P.maxfds self.poll
let clear (self : st) = let clear (self : st) =
let@ self = with_lock_ self in let@ self = with_lock_ self in
Heap.clear self.timer; Heap.clear self.timer;
@ -107,8 +143,7 @@ let clear (self : st) =
for i = 0 to P.maxfds self.poll - 1 do for i = 0 to P.maxfds self.poll - 1 do
P.set_index self.poll i P.invalid_fd Flags.empty P.set_index self.poll i P.invalid_fd Flags.empty
done; done;
self.cur_idx <- 0; self.len <- 0;
self.idx_freelist <- [];
() ()
let wakeup_from_outside (self : st) : unit = let wakeup_from_outside (self : st) : unit =
@ -124,36 +159,40 @@ let get_fd_ (self : st) fd : per_fd =
| per_fd -> per_fd | per_fd -> per_fd
| exception Not_found -> | exception Not_found ->
let idx = let idx =
match self.idx_freelist with if self.len = P.maxfds self.poll then
| i :: tl ->
self.idx_freelist <- tl;
i
| [] ->
if self.cur_idx = P.maxfds self.poll then
invalid_arg "No available slot in poll"; invalid_arg "No available slot in poll";
let n = self.cur_idx in let n = self.len in
self.cur_idx <- self.cur_idx + 1; self.len <- self.len + 1;
n n
in in
let per_fd = { idx; fd; r = Nil; w = Nil } in let per_fd = { idx; fd; r = Nil; w = Nil } in
Fd_tbl.add self.fds fd per_fd; Fd_tbl.add self.fds fd per_fd;
per_fd per_fd
let close self fd : unit = let close_ (self : st) fd : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
let r, w = let r, w =
let@ self = with_lock_ self in let@ self = with_lock_ self in
match Fd_tbl.find self.fds fd with match Fd_tbl.find self.fds fd with
| per_fd -> | per_fd ->
Fd_tbl.remove self.fds fd; Fd_tbl.remove self.fds fd;
self.idx_freelist <- per_fd.idx :: self.idx_freelist;
if Atomic.get self.in_poll then wakeup_from_outside self; (* not the last element, move the last element here *)
if per_fd.idx + 1 < self.len then (
let last_fd = P.get_fd self.poll (self.len - 1) in
match Fd_tbl.find_opt self.fds last_fd with
| None -> ()
| Some last_per_fd ->
last_per_fd.idx <- per_fd.idx;
P.set_index self.poll per_fd.idx last_fd (per_fd_flags last_per_fd)
);
self.len <- self.len - 1;
per_fd.r, per_fd.w per_fd.r, per_fd.w
| exception Not_found -> | exception Not_found ->
invalid_arg "File descriptor is not known to Nanoev" invalid_arg "File descriptor is not known to Nanoev"
in in
(* call callbacks outside of the lock *) (* call callbacks outside of the lock's critical section *)
perform_cbs_closed ~closed:true r; perform_cbs_closed ~closed:true r;
perform_cbs_closed ~closed:true w; perform_cbs_closed ~closed:true w;
() ()
@ -282,6 +321,7 @@ let ops : st Nanoev.Impl.ops =
on_readable; on_readable;
on_writable; on_writable;
run_after_s; run_after_s;
max_fds;
wakeup_from_outside; wakeup_from_outside;
clear; clear;
} }