Merge pull request #2 from c-cube/simon/posix-iomux

poll/ppoll backend using iomux+mtime
This commit is contained in:
Simon Cruanes 2025-05-02 13:52:51 -04:00 committed by GitHub
commit a14280c1a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 1718 additions and 250 deletions

View file

@ -56,6 +56,6 @@ jobs:
dune-cache: true dune-cache: true
allow-prerelease-opam: true allow-prerelease-opam: true
- run: opam install ocamlformat.0.26.2 - run: opam install ocamlformat.0.27.0
- run: opam exec -- make format-check - run: opam exec -- make format-check

View file

@ -1,4 +1,4 @@
version = 0.26.2 version = 0.27.0
profile=conventional profile=conventional
margin=80 margin=80
if-then-else=k-r if-then-else=k-r

View file

@ -20,21 +20,64 @@
(synopsis "Tiny event loop abstraction") (synopsis "Tiny event loop abstraction")
(depends ocaml dune base-unix) (depends ocaml dune base-unix)
(depopts (depopts
(trace (>= 0.7)) (trace
(picos (>= 0.7)))
(and (>= 0.5) (< 0.7))))
(tags (tags
(unix select async))) (unix select async)))
(package
(name nanoev-picos)
(synopsis "Use nanoev from picos")
(depends
ocaml
dune
base-unix
(nanoev
(= :version))
(iostream
(>= 0.3))
(picos
(and
(>= 0.5)
(< 0.7)))
(picos_std
(and
(>= 0.5)
(< 0.7))))
(tags
(unix select async)))
(package
(name nanoev-posix)
(synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev")
(depends
ocaml
dune
base-unix
iomux
(nanoev (= :version))
(nanoev-picos (= :version))
(mtime
(>= 2.0))
(moonpool :with-test)
(trace :with-test)
(trace-tef :with-test))
(tags
(unix select async iomux nanoev)))
(package (package
(name nanoev_tiny_httpd) (name nanoev_tiny_httpd)
(synopsis "Use nanoev as a basis for tiny_httpd") (synopsis "Use nanoev as a basis for tiny_httpd")
(depends (depends
ocaml ocaml
dune dune
nanoev (nanoev (= :version))
(nanoev-picos (= :version))
picos picos
(tiny_httpd (>= 0.17))) picos_std
(tags (nanoev http))) (tiny_httpd
(>= 0.17)))
(tags
(nanoev http)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html

View file

@ -1,4 +1,4 @@
(executable (executable
(name echo) (name echo)
(libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef (libraries nanoev nanoev.unix nanoev-posix moonpool moonpool.fib trace
nanoev_tiny_httpd)) trace-tef nanoev_tiny_httpd))

View file

@ -79,33 +79,71 @@ let setup_logging () =
Logs.set_reporter @@ Logs.format_reporter (); Logs.set_reporter @@ Logs.format_reporter ();
Logs.set_level ~all:true (Some Logs.Debug) Logs.set_level ~all:true (Some Logs.Debug)
let emit_metrics_ pool server () =
while true do
Trace.counter_int ~level:Info "pool.tasks" (Moonpool.Runner.num_tasks pool);
Trace.counter_int ~level:Info "http.active-conns"
(Server.active_connections server);
Thread.delay 0.3
done
let () = let () =
Trace.set_current_level Info;
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
let port_ = ref 8080 in let port_ = ref 8080 in
let max_conn = ref 1024 in let max_conn = ref 1024 in
let j = ref 8 in let j = ref 8 in
let backend = ref `Posix in
let buf_size = ref 4096 in
let max_buf_pool_size = ref None in
let set_backend = function
| "posix" | "poll" | "default" -> backend := `Posix
| "unix" | "select" -> backend := `Unix
| s -> failwith @@ Printf.sprintf "unknown backend %S" s
in
Arg.parse Arg.parse
(Arg.align (Arg.align
[ [
"--port", Arg.Set_int port_, " set port"; "--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port"; "-p", Arg.Set_int port_, " set port";
"-j", Arg.Set_int j, " number of threads"; "-j", Arg.Set_int j, " number of threads";
( "--max-buf-pool-size",
Arg.Int (fun i -> max_buf_pool_size := Some i),
" max buffer pool size" );
"--buf-size", Arg.Set_int buf_size, " buffer size";
"--debug", Arg.Unit setup_logging, " enable debug"; "--debug", Arg.Unit setup_logging, " enable debug";
"--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections";
( "--backend",
Arg.Symbol
([ "posix"; "default"; "unix"; "select"; "poll" ], set_backend),
" event loop backend" );
]) ])
(fun _ -> raise (Arg.Bad "")) (fun _ -> raise (Arg.Bad ""))
"echo [option]*"; "echo [option]*";
let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in let@ pool =
let@ _runner = Moonpool_fib.main in fun yield ->
if !j > 1 then
let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
let@ _runner = Moonpool_fib.main in
yield pool
else
Moonpool_fib.main yield
in
let ev = Nanoev_unix.create () in let ev =
Nanoev_picos.setup_bg_thread ev; match !backend with
| `Posix -> Nanoev_posix.create ()
| `Unix -> Nanoev_unix.create ()
in
let@ () = Nanoev_picos.Background_thread.with_setup ev in
let server = let server =
Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_
?max_buf_pool_size:!max_buf_pool_size ~buf_size:!buf_size
~max_connections:!max_conn () ~max_connections:!max_conn ()
in in
@ -273,8 +311,14 @@ let () =
let s = to_string_top h in let s = to_string_top h in
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server) Printf.printf
(Server.port server); "listening on http://%s:%d with %d threads, %d max connections, %d max fds\n\
%!"
(Server.addr server) (Server.port server) !j !max_conn (Nanoev.max_fds ev);
if Trace.enabled () then
ignore (Thread.create (emit_metrics_ pool server) () : Thread.t);
match Server.run server with match Server.run server with
| Ok () -> () | Ok () -> ()
| Error e -> raise e | Error e -> raise e

34
nanoev-picos.opam Normal file
View file

@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Use nanoev from picos"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["unix" "select" "async"]
homepage: "https://github.com/c-cube/nanoev"
bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"base-unix"
"nanoev" {= version}
"iostream" {>= "0.3"}
"picos" {>= "0.5" & < "0.7"}
"picos_std" {>= "0.5" & < "0.7"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/nanoev.git"

37
nanoev-posix.opam Normal file
View file

@ -0,0 +1,37 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Use mtime+iomux (posix compliant) as a backend for nanoev"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["unix" "select" "async" "iomux" "nanoev"]
homepage: "https://github.com/c-cube/nanoev"
bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"base-unix"
"iomux"
"nanoev" {= version}
"nanoev-picos" {= version}
"mtime" {>= "2.0"}
"moonpool" {with-test}
"trace" {with-test}
"trace-tef" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/nanoev.git"

View file

@ -15,7 +15,6 @@ depends: [
] ]
depopts: [ depopts: [
"trace" {>= "0.7"} "trace" {>= "0.7"}
"picos" {>= "0.5" & < "0.7"}
] ]
build: [ build: [
["dune" "subst"] {dev} ["dune" "subst"] {dev}

View file

@ -10,8 +10,10 @@ bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [ depends: [
"ocaml" "ocaml"
"dune" {>= "2.7"} "dune" {>= "2.7"}
"nanoev" "nanoev" {= version}
"nanoev-picos" {= version}
"picos" "picos"
"picos_std"
"tiny_httpd" {>= "0.17"} "tiny_httpd" {>= "0.17"}
"odoc" {with-doc} "odoc" {with-doc}
] ]

View file

@ -7,6 +7,7 @@ module Impl = struct
clear: 'st -> unit; clear: 'st -> unit;
wakeup_from_outside: 'st -> unit; wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit; close: 'st -> Unix.file_descr -> unit;
max_fds: 'st -> int;
on_readable: on_readable:
'a 'b. 'a 'b.
'st -> 'st ->
@ -39,6 +40,7 @@ type t = Impl.t
let[@inline] clear (Ev (ops, st)) = ops.clear st let[@inline] clear (Ev (ops, st)) = ops.clear st
let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st
let[@inline] close (Ev (ops, st)) fd = ops.close st fd let[@inline] close (Ev (ops, st)) fd = ops.close st fd
let[@inline] max_fds (Ev (ops, st)) = ops.max_fds st
let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = let[@inline] on_readable (Ev (ops, st)) fd x y f : unit =
ops.on_readable st fd x y f ops.on_readable st fd x y f

View file

@ -9,6 +9,7 @@ module Impl : sig
clear: 'st -> unit; clear: 'st -> unit;
wakeup_from_outside: 'st -> unit; wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit; close: 'st -> Unix.file_descr -> unit;
max_fds: 'st -> int;
on_readable: on_readable:
'a 'b. 'a 'b.
'st -> 'st ->
@ -43,6 +44,9 @@ val step : t -> unit
val close : t -> Unix.file_descr -> unit val close : t -> Unix.file_descr -> unit
(** Close the file descriptor and clean it up *) (** Close the file descriptor and clean it up *)
val max_fds : t -> int
(** Maximum number of file descriptors that can be observed at once. *)
val on_readable : val on_readable :
t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit

152
src/picos/IO_in.ml Normal file
View file

@ -0,0 +1,152 @@
open Common_
class type t = object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)
end
let create ?(close = ignore) ~input () : t =
object
method close = close
method input = input
end
let empty : t =
object
method close () = ()
method input _ _ _ = 0
end
let of_bytes ?(off = 0) ?len (b : bytes) : t =
(* i: current position in [b] *)
let i = ref off in
let len =
match len with
| Some n ->
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
n
| None -> Bytes.length b - off
in
let end_ = off + len in
object
method input b_out i_out len_out =
let n = min (end_ - !i) len_out in
Bytes.blit b !i b_out i_out n;
i := !i + n;
n
method close () = i := end_
end
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
let rec really_input (self : #t) buf i len =
if len > 0 then (
let n = input self buf i len in
if n = 0 then raise End_of_file;
(really_input [@tailrec]) self buf (i + n) (len - n)
)
let really_input_string self n : string =
let buf = Bytes.create n in
really_input self buf 0 n;
Bytes.unsafe_to_string buf
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
: unit =
let continue = ref true in
while !continue do
let len = input ic buf 0 (Bytes.length buf) in
if len = 0 then
continue := false
else
IO_out.output oc buf 0 len
done
let concat (l0 : t list) : t =
let l = ref l0 in
let rec input b i len : int =
match !l with
| [] -> 0
| ic :: tl ->
let n = ic#input b i len in
if n > 0 then
n
else (
l := tl;
input b i len
)
in
let close () = List.iter close l0 in
create ~close ~input ()
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
let buf = ref buf in
let i = ref 0 in
let[@inline] full_ () = !i = Bytes.length !buf in
let grow_ () =
let old_size = Bytes.length !buf in
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
if old_size = new_size then
failwith "input_all: maximum input size exceeded";
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
buf := new_buf
in
let rec loop () =
if full_ () then grow_ ();
let available = Bytes.length !buf - !i in
let n = input self !buf !i available in
if n > 0 then (
i := !i + n;
(loop [@tailrec]) ()
)
in
loop ();
if full_ () then
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := Base.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end

118
src/picos/IO_out.ml Normal file
View file

@ -0,0 +1,118 @@
open Common_
class type t = object
method output_char : char -> unit
method output : bytes -> int -> int -> unit
method flush : unit -> unit
method close : unit -> unit
end
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
object
method flush () = flush ()
method close () = close ()
method output_char c = output_char c
method output bs i len = output bs i len
end
let dummy : t =
object
method flush () = ()
method close () = ()
method output_char _ = ()
method output _ _ _ = ()
end
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
: t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
Base.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
let of_buffer (buf : Buffer.t) : t =
object
method close () = ()
method flush () = ()
method output_char c = Buffer.add_char buf c
method output bs i len = Buffer.add_subbytes buf bs i len
end
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : #t) c : unit = self#output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
let[@inline] output_string (self : #t) (str : string) : unit =
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
let output_line (self : #t) (str : string) : unit =
output_string self str;
output_char self '\n'
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self#flush ()
let output_int self i =
let s = string_of_int i in
output_string self s
let output_lines self seq = Seq.iter (output_line self) seq
let tee (l : t list) : t =
match l with
| [] -> dummy
| [ oc ] -> oc
| _ ->
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
let output_char c = List.iter (fun oc -> output_char oc c) l in
let close () = List.iter close l in
let flush () = List.iter flush l in
create ~flush ~close ~output ~output_char ()

View file

@ -0,0 +1,7 @@
let is_setup = Global_.has_bg_thread
let setup = Global_.setup_bg_thread
let shutdown = Global_.shutdown_bg_thread
let with_setup ev f =
setup ev;
Fun.protect ~finally:shutdown f

View file

@ -0,0 +1,10 @@
val setup : Nanoev.t -> unit
(** Install this event loop in a background thread *)
val shutdown : unit -> unit
(** Shutdown background thread, assuming {! is_setup} returns [true] *)
val with_setup : Nanoev.t -> (unit -> 'a) -> 'a
val is_setup : unit -> bool
(** [is_setup()] is [true] iff a background thread is running a nanoev loop *)

104
src/picos/base.ml Normal file
View file

@ -0,0 +1,104 @@
open Common_
let[@inline] get_loop_exn_ () : Nanoev.t =
match Atomic.get Global_.st with
| None -> failwith "No nanoev loop installed."
| Some st -> st.nanoev
let[@inline] unwrap_ = function
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
let[@unroll 1] rec retry_read_ fd f =
match f () with
| res -> res
| exception
Unix.Unix_error
( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS
| Unix.ECONNRESET ),
_,
_ ) ->
(* Trace_.message "read must wait"; *)
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
let ev = get_loop_exn_ () in
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
retry_read_ fd f
let[@unroll 1] rec retry_write_ fd f =
match f () with
| res -> res
| exception
Unix.Unix_error
( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS
| Unix.ECONNRESET ),
_,
_ ) ->
(* Trace_.message "write must wait"; *)
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
retry_write_ fd f
let read fd buf i len : int =
try
retry_read_ fd (fun () ->
(* Trace_.message "read"; *)
Unix.read fd buf i len)
with Closed -> 0
let close fd =
Unix.close fd;
let ev = get_loop_exn_ () in
Nanoev.close ev fd
let accept fd =
try
retry_read_ fd (fun () ->
(* Trace_.message "accept"; *)
Unix.accept fd)
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
raise Closed
let write_once fd buf i len : int =
try
retry_write_ fd (fun () ->
(* Trace_.message "write"; *)
Unix.write fd buf i len)
with Closed -> 0
let rec write fd buf i len =
if len > 0 then (
let n = write_once fd buf i len in
if n < len then write fd buf (i + n) (len - n)
)
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
let[@inline] max_fds () =
match Atomic.get Global_.st with
| None -> 1024
| Some st -> Nanoev.max_fds st.nanoev
let sleep t =
if t > 0. then (
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in
Nanoev.run_after_s ev t trigger () (fun trigger () ->
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_
)
module Raw = struct
let retry_read = retry_read_
let retry_write = retry_write_
end

37
src/picos/base.mli Normal file
View file

@ -0,0 +1,37 @@
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val write_once : Unix.file_descr -> bytes -> int -> int -> int
(** Write into the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val write : Unix.file_descr -> bytes -> int -> int -> unit
val close : Unix.file_descr -> unit
(** Close the file descriptor
@raise Unix.Unix_error when it fails *)
val connect : Unix.file_descr -> Unix.sockaddr -> unit
(** Connect this FD to the remote address.
@raise Nanoev.Closed if the FD is closed.
@raise Unix.Unix_error for other errors *)
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
(** Accept a connection on this fd.
@raise Nanoev.Closed if the FD is closed.
@raise Unix.Unix_error for other errors *)
val max_fds : unit -> int
(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds}
*)
val sleep : float -> unit
(** Suspend current fiber for [n] seconds *)
module Raw : sig
val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a
val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a
end

6
src/picos/common_.ml Normal file
View file

@ -0,0 +1,6 @@
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
let _default_buf_size = 4 * 1024
exception Closed = Nanoev.Closed

View file

@ -1,5 +1,4 @@
(library (library
(name nanoev_picos) (name nanoev_picos)
(public_name nanoev.picos) (public_name nanoev-picos)
(optional) ; picos (libraries threads picos picos_std.sync iostream nanoev))
(libraries threads picos nanoev))

59
src/picos/global_.ml Normal file
View file

@ -0,0 +1,59 @@
open Common_
type st =
| None
| Some of {
active: bool Atomic.t;
nanoev: Nanoev.t;
th: Thread.t;
}
let st : st Atomic.t = Atomic.make None
let lock = Mutex.create ()
let with_lock lock f =
Mutex.lock lock;
match f () with
| exception e ->
Mutex.unlock lock;
raise e
| x ->
Mutex.unlock lock;
x
let bg_thread_ ~active ~evloop () : unit =
Trace_.set_thread_name "nanoev.picos.bg-thread";
while Atomic.get active do
Nanoev.step evloop
done
let[@inline] has_bg_thread () = Atomic.get st <> None
let setup_bg_thread (ev : Nanoev.t) : unit =
let@ () = with_lock lock in
(* shutdown existing thread, if any *)
(match Atomic.get st with
| Some st ->
Atomic.set st.active false;
Nanoev.wakeup_from_outside st.nanoev;
Thread.join st.th
| None -> ());
(* start new bg thread *)
let active = Atomic.make true in
Atomic.set st
@@ Some
{
active;
nanoev = ev;
th = Thread.create (bg_thread_ ~active ~evloop:ev) ();
}
let shutdown_bg_thread () =
let@ () = with_lock lock in
match Atomic.exchange st None with
| None -> ()
| Some st ->
Atomic.set st.active false;
Nanoev.wakeup_from_outside st.nanoev;
Thread.join st.th

View file

@ -1,146 +1,7 @@
open struct module Background_thread = Background_thread
module Trace_ = Nanoev.Trace_ module Base = Base
include Base
let ( let@ ) = ( @@ ) module IO_in = IO_in
end module IO_out = IO_out
module Net_client = Net_client
exception Closed = Nanoev.Closed module Net_server = Net_server
module Global_ = struct
type st =
| None
| Some of {
active: bool Atomic.t;
nanoev: Nanoev.t;
th: Thread.t;
}
let st : st Atomic.t = Atomic.make None
let lock = Mutex.create ()
let with_lock lock f =
Mutex.lock lock;
match f () with
| exception e ->
Mutex.unlock lock;
raise e
| x ->
Mutex.unlock lock;
x
let bg_thread_ ~active ~evloop () : unit =
Trace_.set_thread_name "nanoev.picos.bg-thread";
while Atomic.get active do
Nanoev.step evloop
done
let[@inline] has_bg_thread () = Atomic.get st <> None
let setup_bg_thread (ev : Nanoev.t) : unit =
let@ () = with_lock lock in
(* shutdown existing thread, if any *)
(match Atomic.get st with
| Some st ->
Atomic.set st.active false;
Nanoev.wakeup_from_outside st.nanoev;
Thread.join st.th
| None -> ());
(* start new bg thread *)
let active = Atomic.make true in
Atomic.set st
@@ Some
{
active;
nanoev = ev;
th = Thread.create (bg_thread_ ~active ~evloop:ev) ();
}
end
let has_bg_thread = Global_.has_bg_thread
let setup_bg_thread = Global_.setup_bg_thread
let[@inline] get_loop_exn_ () : Nanoev.t =
match Atomic.get Global_.st with
| None -> failwith "No nanoev loop installed."
| Some st -> st.nanoev
let[@inline] unwrap_ = function
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
let retry_read_ fd f =
let ev = get_loop_exn_ () in
let[@unroll 1] rec loop () =
match f () with
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "read must wait";
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
let retry_write_ fd f =
let ev = get_loop_exn_ () in
let rec loop () =
match f () with
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "write must wait";
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
let read fd buf i len : int =
try
retry_read_ fd (fun () ->
Trace_.message "read";
Unix.read fd buf i len)
with Closed -> 0
let close fd =
Unix.close fd;
let ev = get_loop_exn_ () in
Nanoev.close ev fd
let accept fd =
try
retry_read_ fd (fun () ->
Trace_.message "accept";
Unix.accept fd)
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
raise Closed
let write fd buf i len : int =
try
retry_write_ fd (fun () ->
Trace_.message "write";
Unix.write fd buf i len)
with Closed -> 0
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
let sleep t =
if t > 0. then (
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in
Nanoev.run_after_s ev t trigger () (fun trigger () ->
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_
)

View file

@ -1,32 +1,18 @@
(** Basic interface with picos *) (** Basic interface with picos *)
val setup_bg_thread : Nanoev.t -> unit module Background_thread = Background_thread
(** Install this event loop in a background thread *)
val has_bg_thread : unit -> bool
(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev loop *)
(** {2 Non blocking IO primitives} *) (** {2 Non blocking IO primitives} *)
val read : Unix.file_descr -> bytes -> int -> int -> int module Base = Base
(** Read from the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val write : Unix.file_descr -> bytes -> int -> int -> int include module type of struct
(** Write into the non blocking FD. include Base
@raise Nanoev.Closed if the FD is closed end
@raise Unix.Unix_error for other errors *)
val close : Unix.file_descr -> unit (** {2 Building blocks on top of {!Base}} *)
(** Close the file descriptor
@raise Unix.Unix_error when it fails *)
val connect : Unix.file_descr -> Unix.sockaddr -> unit module IO_in = IO_in
module IO_out = IO_out
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr module Net_client = Net_client
(** Accept a connection on this fd. module Net_server = Net_server
@raise Nanoev.Closed if the FD is closed.
@raise Unix.Unix_error for other errors *)
val sleep : float -> unit

23
src/picos/net_client.ml Normal file
View file

@ -0,0 +1,23 @@
open Common_
let connect addr : Unix.file_descr =
let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
(* connect asynchronously *)
Base.connect sock addr;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = connect addr in
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () =
(try Unix.shutdown sock Unix.SHUTDOWN_ALL with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

111
src/picos/net_server.ml Normal file
View file

@ -0,0 +1,111 @@
module Sem = Picos_std_sync.Semaphore.Counting
type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
type t = {
active: bool Atomic.t;
sock: Unix.file_descr;
client_handler: client_handler;
spawn: (unit -> unit) -> unit Picos.Computation.t;
max_conns: int;
sem: Sem.t;
mutable running: unit Picos.Computation.t option;
exn_handler: exn -> Printexc.raw_backtrace -> unit;
}
let[@inline] join (self : t) : unit =
Option.iter Picos.Computation.await self.running
let[@inline] max_connections self = self.max_conns
let[@inline] n_active_connections (self : t) : int =
self.max_conns - Sem.get_value self.sem
let[@inline] running (self : t) : bool = Atomic.get self.active
let shutdown (self : t) = if Atomic.exchange self.active false then ()
open struct
let default_exn_handler exn bt =
Printf.eprintf "uncaught exception in network server: %s\n%s%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)
let run (self : t) () : unit =
while Atomic.get self.active do
let client_sock, client_addr = Base.accept self.sock in
Sem.acquire self.sem;
let cleanup () =
(try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ());
(* TODO: close in nanoev too *)
(try Unix.close client_sock with _ -> ());
Sem.release self.sem
in
let comp : _ Picos.Computation.t =
self.spawn (fun () ->
let ic = IO_in.of_unix_fd client_sock in
let oc = IO_out.of_unix_fd client_sock in
try
self.client_handler client_addr ic oc;
cleanup ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ();
self.exn_handler exn bt)
in
ignore (comp : _ Picos.Computation.t)
done
end
let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler)
~spawn ~(client_handler : client_handler) addr : t =
let ev =
match Atomic.get Global_.st with
| Some { nanoev = ev; _ } -> ev
| None -> invalid_arg "Nanoev_picos.Net_server: no event loop installed"
in
let max_connections =
match max_connections with
| None -> Nanoev.max_fds ev
| Some n -> min (Nanoev.max_fds ev) n
in
let sem = Sem.make max_connections in
let backlog =
match backlog with
| Some n -> max 4 n
| None -> max 4 max_connections
in
let domain = Unix.domain_of_sockaddr addr in
let sock = Unix.socket domain Unix.SOCK_STREAM 0 in
Unix.bind sock addr;
Unix.listen sock backlog;
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.SO_REUSEADDR true;
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
let server =
{
active = Atomic.make true;
max_conns = max_connections;
sem;
spawn;
sock;
client_handler;
running = None;
exn_handler;
}
in
server.running <- Some (spawn (run server));
server
let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f =
let server =
establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr
in
Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server)

37
src/picos/net_server.mli Normal file
View file

@ -0,0 +1,37 @@
type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
type t
val join : t -> unit
(** Wait for server to shutdown *)
val shutdown : t -> unit
(** Ask the server to stop *)
val running : t -> bool
val max_connections : t -> int
val n_active_connections : t -> int
val establish :
?backlog:int ->
?max_connections:int ->
?exn_handler:(exn -> Printexc.raw_backtrace -> unit) ->
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
client_handler:client_handler ->
Unix.sockaddr ->
t
(** Create and start a new server on the given socket address.
@param spawn used to spawn a new computation per client
@param client_handler
the logic for talking to a client, will run in its own computation
@param backlog number of connections waiting in the listening socket
@param max_connections max number of simultaneous connections *)
val with_ :
?backlog:int ->
?max_connections:int ->
?exn_handler:(exn -> Printexc.raw_backtrace -> unit) ->
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
client_handler:client_handler ->
Unix.sockaddr ->
(t -> 'a) ->
'a

6
src/posix/dune Normal file
View file

@ -0,0 +1,6 @@
(library
(name nanoev_posix)
(public_name nanoev-posix)
(synopsis "posix backend (poll/ppoll+mtime)")
(private_modules heap)
(libraries threads nanoev unix iomux mtime mtime.clock.os))

61
src/posix/heap.ml Normal file
View file

@ -0,0 +1,61 @@
type 'a tree =
| E
| N of int * 'a * 'a tree * 'a tree
type 'a t = {
leq: 'a -> 'a -> bool;
mutable t: 'a tree;
}
let create ~leq () : _ t = { leq; t = E }
let[@inline] is_empty (self : _ t) =
match self.t with
| E -> true
| N _ -> false
exception Empty
open struct
(** Rank of the tree *)
let[@inline] rank_ = function
| E -> 0
| N (r, _, _, _) -> r
(** Make a balanced node labelled with [x], and subtrees [a] and [b]. We
ensure that the right child's rank is to the rank of the left child
(leftist property). The rank of the resulting node is the length of the
rightmost path. *)
let[@inline] mk_node_ x a b =
if rank_ a >= rank_ b then
N (rank_ b + 1, x, a, b)
else
N (rank_ a + 1, x, b, a)
let rec merge ~leq t1 t2 =
match t1, t2 with
| t, E -> t
| E, t -> t
| N (_, x, a1, b1), N (_, y, a2, b2) ->
if leq x y then
mk_node_ x a1 (merge ~leq b1 t2)
else
mk_node_ y a2 (merge ~leq t1 b2)
end
let clear self = self.t <- E
let[@inline] insert (self : _ t) x : unit =
self.t <- merge ~leq:self.leq self.t (N (1, x, E, E))
let[@inline] peek_min_exn (self : _ t) =
match self.t with
| E -> raise Empty
| N (_, x, _, _) -> x
let[@inline] pop_min_exn (self : _ t) =
match self.t with
| E -> raise Empty
| N (_, x, l, r) ->
self.t <- merge ~leq:self.leq l r;
x

13
src/posix/heap.mli Normal file
View file

@ -0,0 +1,13 @@
type 'a t
val create : leq:('a -> 'a -> bool) -> unit -> 'a t
val is_empty : _ t -> bool
(** [is_empty h] returns [true] if the heap [h] is empty. *)
exception Empty
val clear : _ t -> unit
val insert : 'a t -> 'a -> unit
val peek_min_exn : 'a t -> 'a
val pop_min_exn : 'a t -> 'a

403
src/posix/nanoev_posix.ml Normal file
View file

@ -0,0 +1,403 @@
open struct
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
let now_ns : unit -> int64 = Mtime_clock.now_ns
let[@inline] ns_of_s (t : float) : int64 = Int64.of_float (t *. 1e9)
let[@inline] ns_to_s (t : int64) : float = Int64.to_float t /. 1e9
end
module Fd_tbl = Hashtbl.Make (struct
open Iomux.Util
type t = Unix.file_descr
let equal a b = Int.equal (fd_of_unix a) (fd_of_unix b)
let hash a = Hashtbl.hash (fd_of_unix a)
end)
module P = Iomux.Poll
module Flags = P.Flags
module Sync_queue = struct
type 'a t = {
q: 'a Queue.t;
mutex: Mutex.t;
}
let create () : _ t = { q = Queue.create (); mutex = Mutex.create () }
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
Queue.push x self.q;
Mutex.unlock self.mutex
let transfer (self : _ t) q : unit =
Mutex.lock self.mutex;
Queue.transfer self.q q;
Mutex.unlock self.mutex
end
(** Callback list *)
type cbs =
| Nil
| Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs
type timer_ev =
| Timer : {
deadline: int64;
x: 'a;
y: 'b;
f: 'a -> 'b -> unit;
}
-> timer_ev
type fd_data = {
fd: Unix.file_descr;
mutable idx: int;
(** Index in the poll buffer. Mutable because we might change it when we
swap FDs to remove items. *)
mutable r: cbs;
mutable w: cbs;
}
(** Data associated to a given FD *)
let[@inline] fd_flags (self : fd_data) : Flags.t =
let fl = ref Flags.empty in
(if self.r != Nil then fl := Flags.(!fl + pollin));
(if self.w != Nil then fl := Flags.(!fl + pollout));
!fl
type queued_task =
| Q_run_after of timer_ev
| Q_on_readable :
Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit)
-> queued_task
| Q_on_writable :
Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit)
-> queued_task
| Q_clear
| Q_close of Unix.file_descr
type st = {
timer: timer_ev Heap.t;
fds: fd_data Fd_tbl.t;
poll: P.t;
mutable len: int; (** length of the active prefix of the [poll] buffer *)
wakeup_rd: Unix.file_descr;
wakeup_wr: Unix.file_descr;
wakeup_triggered: bool Atomic.t;
(** Make [wakeup_from_outside] idempotent within an iteration of [step] *)
in_poll: bool Atomic.t;
(** Are we currently inside a call to [poll], and in which thread? Useful
for other threads to know whether to wake us up via the pipe *)
mutable owner_thread: int;
(** Thread allowed to perform operations on this poll instance. Starts at
[-1]. *)
queued_tasks: queued_task Sync_queue.t;
(** While in [poll()], changes get queued, so we don't invalidate the poll
buffer before the syscall returns *)
}
let[@inline] queue_task_ (self : st) t : unit =
Sync_queue.push self.queued_tasks t
(** [true] if called from the owner thread *)
let[@inline] in_owner_thread (self : st) : bool =
self.owner_thread != -1 && self.owner_thread == Thread.(id (self ()))
let[@inline] in_poll (self : st) : bool = Atomic.get self.in_poll
let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st =
let wakeup_rd, wakeup_wr = Unix.pipe () in
(* reading end must be non blocking so it's not always immediately
ready; writing end is blocking to make it simpler to wakeup from other
threads *)
Unix.set_nonblock wakeup_rd;
let self =
{
timer = Heap.create ~leq:leq_timer ();
fds = Fd_tbl.create 16;
poll = P.create ();
len = 0;
wakeup_rd;
wakeup_wr;
wakeup_triggered = Atomic.make false;
in_poll = Atomic.make false;
owner_thread = -1;
queued_tasks = Sync_queue.create ();
}
in
(* always watch for the pipe being readable *)
P.set_index self.poll 0 self.wakeup_rd Flags.pollin;
self.len <- 1;
self
let max_fds (self : st) : int = P.maxfds self.poll
let[@inline never] wakeup_real_ (self : st) : unit =
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside"
in
let b = Bytes.make 1 '!' in
ignore (Unix.write self.wakeup_wr b 0 1 : int)
let[@inline] wakeup_ (self : st) : unit =
if not (Atomic.exchange self.wakeup_triggered true) then wakeup_real_ self
let wakeup_from_outside (self : st) : unit =
let already_awake =
(* to avoid race conditions we only take the shortcut if
this is called from the owner thread *)
in_owner_thread self && not (Atomic.get self.in_poll)
in
if not already_awake then wakeup_ self
let rec perform_cbs ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs ~closed tail
(** Change the event loop right now. This must be called only from the owner
thread and outside of [poll]. *)
module Run_now_ = struct
let rec perform_cbs_closed ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs_closed ~closed tail
let clear_ (self : st) : unit =
Heap.clear self.timer;
Fd_tbl.clear self.fds;
for i = 0 to P.maxfds self.poll - 1 do
P.set_index self.poll i P.invalid_fd Flags.empty
done;
Atomic.set self.wakeup_triggered false;
self.len <- 0;
()
let get_fd_ (self : st) fd : fd_data =
(* assert (in_owner_thread self && not (in_poll self)); *)
match Fd_tbl.find self.fds fd with
| per_fd -> per_fd
| exception Not_found ->
let idx =
if self.len = P.maxfds self.poll then
invalid_arg "No available slot in poll";
let n = self.len in
self.len <- self.len + 1;
n
in
let per_fd = { idx; fd; r = Nil; w = Nil } in
Fd_tbl.add self.fds fd per_fd;
per_fd
let remove_fd_ (self : st) (fd_data : fd_data) : unit =
Fd_tbl.remove self.fds fd_data.fd;
P.set_index self.poll fd_data.idx P.invalid_fd Flags.empty;
(* assert (in_owner_thread self && not (in_poll self)); *)
if fd_data.idx > 0 && fd_data.idx + 1 < self.len then (
(* not the last element nor the first (pipe_rd), move the last element
here to keep the buffer non sparse *)
let last_fd = P.get_fd self.poll (self.len - 1) in
assert (last_fd <> fd_data.fd);
match Fd_tbl.find_opt self.fds last_fd with
| None -> assert false
| Some last_fd_data ->
(* move the last FD to [idx] *)
last_fd_data.idx <- fd_data.idx;
P.set_index self.poll fd_data.idx last_fd (fd_flags last_fd_data)
);
self.len <- self.len - 1;
()
let close_ (self : st) fd : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
let r, w =
match Fd_tbl.find self.fds fd with
| fd_data ->
remove_fd_ self fd_data;
fd_data.r, fd_data.w
| exception Not_found -> Nil, Nil
in
perform_cbs_closed ~closed:true r;
perform_cbs_closed ~closed:true w;
()
let on_readable_ self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
let fd_data = get_fd_ self fd in
fd_data.r <- Sub (x, y, f, fd_data.r);
P.set_index self.poll fd_data.idx fd (fd_flags fd_data)
let on_writable_ self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
let fd_data = get_fd_ self fd in
fd_data.w <- Sub (x, y, f, fd_data.w);
P.set_index self.poll fd_data.idx fd (fd_flags fd_data)
let run_after_s_ self ev : unit = Heap.insert self.timer ev
let perform_task_ self (t : queued_task) : unit =
match t with
| Q_run_after t -> run_after_s_ self t
| Q_on_readable (fd, x, y, f) -> on_readable_ self fd x y f
| Q_on_writable (fd, x, y, f) -> on_writable_ self fd x y f
| Q_clear -> clear_ self
| Q_close fd -> close_ self fd
end
let clear (self : st) =
if in_owner_thread self && not (in_poll self) then
Run_now_.clear_ self
else (
queue_task_ self @@ Q_clear;
wakeup_from_outside self
)
let close (self : st) fd : unit =
if in_owner_thread self && not (in_poll self) then
Run_now_.close_ self fd
else (
queue_task_ self @@ Q_close fd;
wakeup_from_outside self
)
let on_readable self fd x y f : unit =
if in_owner_thread self && not (in_poll self) then
Run_now_.on_readable_ self fd x y f
else (
queue_task_ self @@ Q_on_readable (fd, x, y, f);
wakeup_from_outside self
)
let on_writable self fd x y f : unit =
if in_owner_thread self && not (in_poll self) then
Run_now_.on_writable_ self fd x y f
else (
queue_task_ self @@ Q_on_writable (fd, x, y, f);
wakeup_from_outside self
)
let run_after_s self (time : float) x y f : unit =
let deadline = Int64.add (now_ns ()) (ns_of_s time) in
let ev = Timer { deadline; x; y; f } in
if in_owner_thread self && not (in_poll self) then
Run_now_.run_after_s_ self ev
else (
queue_task_ self @@ Q_run_after ev;
wakeup_from_outside self
)
let next_deadline_ (self : st) : int64 option =
match Heap.peek_min_exn self.timer with
| exception Heap.Empty -> None
| Timer t -> Some t.deadline
let step (self : st) : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in
self.owner_thread <- Thread.(id (self ()));
let now = now_ns () in
let timeout_ns : int64 =
match next_deadline_ self with
| None -> 30_000_000_000L
| Some d -> Int64.max 0L (Int64.sub d now)
in
(* run timers *)
while
if Heap.is_empty self.timer then
false
else (
let (Timer t) = Heap.peek_min_exn self.timer in
if t.deadline <= now then (
ignore (Heap.pop_min_exn self.timer : timer_ev);
t.f t.x t.y;
true
) else
false
)
do
()
done;
(* process all queued tasks.
NOTE: race condition: if another thread queues tasks after we do
the transfer, it will call [wakeup_from_outside] and make the pipe_rd FD
readable. So as soon as we call [poll], it will return and we will find
the queued tasks waiting for us. *)
let local_q = Queue.create () in
Sync_queue.transfer self.queued_tasks local_q;
while not (Queue.is_empty local_q) do
let t = Queue.pop local_q in
Run_now_.perform_task_ self t
done;
Atomic.set self.in_poll true;
(* enter [poll] *)
let num_ready_fds =
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "poll" ~data:(fun () ->
[ "timeout", `Float (ns_to_s timeout_ns); "len", `Int self.len ])
in
P.ppoll_or_poll self.poll self.len (Nanoseconds timeout_ns)
in
Atomic.set self.in_poll false;
(* drain notification pipe *)
if Atomic.exchange self.wakeup_triggered false then (
let b1 = Bytes.create 8 in
while try Unix.read self.wakeup_rd b1 0 8 > 0 with _ -> false do
()
done
);
(* call callbacks *)
P.iter_ready self.poll num_ready_fds (fun _idx fd flags ->
if fd <> self.wakeup_rd then (
let fd_data =
try Fd_tbl.find self.fds fd with Not_found -> assert false
in
if Flags.mem Flags.pollin flags then (
let r = fd_data.r in
fd_data.r <- Nil;
perform_cbs ~closed:false r
);
if Flags.mem Flags.pollout flags then (
let w = fd_data.w in
fd_data.w <- Nil;
perform_cbs ~closed:false w
);
if Flags.empty = fd_flags fd_data then Run_now_.remove_fd_ self fd_data
));
()
let ops : st Nanoev.Impl.ops =
{
step;
close;
on_readable;
on_writable;
run_after_s;
max_fds;
wakeup_from_outside;
clear;
}
include Nanoev
let create () : t = Impl.build ops (create_st ())

View file

@ -0,0 +1,8 @@
(** Nano event loop using Poll/Ppoll *)
include module type of struct
include Nanoev
end
val create : unit -> t
(** Create a new nanoev loop using [Iomux] (poll/ppoll). *)

View file

@ -5,6 +5,7 @@
threads threads
picos picos
(re_export nanoev) (re_export nanoev)
nanoev.picos (re_export nanoev-picos)
picos_std.sync
(re_export iostream) (re_export iostream)
(re_export tiny_httpd))) (re_export tiny_httpd)))

View file

@ -5,45 +5,8 @@ module Slice = Iostream.Slice
module Pool = TH.Pool module Pool = TH.Pool
module Buf = TH.Buf module Buf = TH.Buf
let unwrap_ = function module Sem_ = Picos_std_sync.Semaphore.Counting
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
(** Non blocking semaphore *) (** Non blocking semaphore *)
module Sem_ = struct
type t = {
mutable n: int;
max: int;
waiting: Picos.Trigger.t Queue.t;
mutex: Mutex.t;
}
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; max = n; mutex = Mutex.create (); waiting = Queue.create () }
let acquire self =
Mutex.lock self.mutex;
while self.n = 0 do
let tr = Picos.Trigger.create () in
Queue.push tr self.waiting;
Mutex.unlock self.mutex;
let res = Picos.Trigger.await tr in
unwrap_ res;
Mutex.lock self.mutex
done;
assert (self.n > 0);
self.n <- self.n - 1;
Mutex.unlock self.mutex
let release self =
Mutex.lock self.mutex;
self.n <- self.n + 1;
Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting);
Mutex.unlock self.mutex
let num_acquired self = self.max - self.n
end
module Out = struct module Out = struct
open Iostream open Iostream
@ -60,7 +23,7 @@ module Out = struct
let i = ref i in let i = ref i in
let len = ref len0 in let len = ref len0 in
while !len > 0 do while !len > 0 do
match EV.write fd bs !i !len with match EV.write_once fd bs !i !len with
| 0 -> failwith "write failed" | 0 -> failwith "write failed"
| n -> | n ->
i := !i + n; i := !i + n;
@ -147,7 +110,7 @@ module Unix_tcp_server_ = struct
new_thread: (unit -> unit) -> unit; new_thread: (unit -> unit) -> unit;
timeout: float; timeout: float;
masksigpipe: bool; masksigpipe: bool;
mutable running: bool; (* TODO: use an atomic? *) running: bool Atomic.t;
} }
let shutdown_silent_ fd = let shutdown_silent_ fd =
@ -183,7 +146,7 @@ module Unix_tcp_server_ = struct
let inet_addr = Unix.inet_addr_of_string self.addr in let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in let n_listen = self.max_connections in
Unix.listen sock n_listen Unix.listen sock n_listen
); );
@ -191,10 +154,16 @@ module Unix_tcp_server_ = struct
let tcp_server = let tcp_server =
{ {
TH.IO.TCP_server.stop = (fun () -> self.running <- false); TH.IO.TCP_server.stop =
running = (fun () -> self.running); (fun () ->
Atomic.set self.running false;
(* close accept socket so the main loop will return *)
try Unix.close sock with _ -> ());
running = (fun () -> Atomic.get self.running);
active_connections = active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections); (fun () ->
self.max_connections - Sem_.get_value self.sem_max_connections);
endpoint = endpoint =
(fun () -> (fun () ->
let addr, port = get_addr_ sock in let addr, port = get_addr_ sock in
@ -233,7 +202,7 @@ module Unix_tcp_server_ = struct
in in
Unix.set_nonblock sock; Unix.set_nonblock sock;
while self.running do while Atomic.get self.running do
match EV.accept sock with match EV.accept sock with
| client_sock, client_addr -> | client_sock, client_addr ->
(* limit concurrency *) (* limit concurrency *)
@ -272,7 +241,7 @@ module Unix_tcp_server_ = struct
done; done;
(* Wait for all threads to be done: this only works if all threads are done. *) (* Wait for all threads to be done: this only works if all threads are done. *)
Unix.close sock; (try Unix.close sock with _ -> ());
(* TODO? *) (* TODO? *)
(* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *)
()); ());
@ -281,7 +250,7 @@ end
open struct open struct
let get_max_connection_ ?(max_connections = 2048) () : int = let get_max_connection_ ?(max_connections = 2048) () : int =
let max_connections = max 4 max_connections in let max_connections = min (max 4 @@ EV.max_fds ()) max_connections in
max_connections max_connections
let clear_slice (slice : Slice.t) = let clear_slice (slice : Slice.t) =
@ -290,16 +259,22 @@ open struct
slice.len <- 0 slice.len <- 0
end end
let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) let create ?(masksigpipe = not Sys.win32) ?max_connections ?max_buf_pool_size
?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday)
?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () :
TH.Server.t =
let max_connections = get_max_connection_ ?max_connections () in let max_connections = get_max_connection_ ?max_connections () in
let max_pool_size =
match max_buf_pool_size with
| None -> min 4096 max_connections * 2
| Some m -> m
in
let server = let server =
{ {
Unix_tcp_server_.addr; Unix_tcp_server_.addr;
new_thread; new_thread;
buf_pool = buf_pool =
Pool.create ~clear:Buf.clear_and_zero Pool.create ~clear:Buf.clear_and_zero ~max_size:max_pool_size
~mk_item:(fun () -> Buf.create ?size:buf_size ()) ~mk_item:(fun () -> Buf.create ?size:buf_size ())
(); ();
slice_pool = slice_pool =
@ -308,11 +283,11 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0)
(let buf_size = Option.value buf_size ~default:4096 in (let buf_size = Option.value buf_size ~default:4096 in
fun () -> Slice.create buf_size) fun () -> Slice.create buf_size)
(); ();
running = true; running = Atomic.make true;
port; port;
sock; sock;
max_connections; max_connections;
sem_max_connections = Sem_.create max_connections; sem_max_connections = Sem_.make max_connections;
masksigpipe; masksigpipe;
timeout; timeout;
} }

View file

@ -3,6 +3,7 @@ module TH = Tiny_httpd_core
val create : val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int -> ?max_connections:int ->
?max_buf_pool_size:int ->
?timeout:float -> ?timeout:float ->
?buf_size:int -> ?buf_size:int ->
?get_time_s:(unit -> float) -> ?get_time_s:(unit -> float) ->

View file

@ -2,4 +2,5 @@
(name nanoev_unix) (name nanoev_unix)
(public_name nanoev.unix) (public_name nanoev.unix)
(synopsis "Unix/select backend") (synopsis "Unix/select backend")
(private_modules heap)
(libraries nanoev unix)) (libraries nanoev unix))

View file

@ -22,10 +22,10 @@ open struct
| E -> 0 | E -> 0
| N (r, _, _, _) -> r | N (r, _, _, _) -> r
(** Make a balanced node labelled with [x], and subtrees [a] and [b]. (** Make a balanced node labelled with [x], and subtrees [a] and [b]. We
We ensure that the right child's rank is to the rank of the ensure that the right child's rank is to the rank of the left child
left child (leftist property). The rank of the resulting node (leftist property). The rank of the resulting node is the length of the
is the length of the rightmost path. *) rightmost path. *)
let[@inline] mk_node_ x a b = let[@inline] mk_node_ x a b =
if rank_ a >= rank_ b then if rank_ a >= rank_ b then
N (rank_ b + 1, x, a, b) N (rank_ b + 1, x, a, b)

View file

@ -174,17 +174,35 @@ let next_deadline_ (self : st) : float option =
let step (self : st) : unit = let step (self : st) : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
(* gather the subscriptions and timeout *) (* gather the subscriptions and timeout *)
let now = now_ () in
let timeout, sub_r, sub_w = let timeout, sub_r, sub_w =
let@ self = with_lock_ self in let@ self = with_lock_ self in
recompute_if_needed self; recompute_if_needed self;
let timeout = let timeout =
match next_deadline_ self with match next_deadline_ self with
| None -> 30. | None -> 30.
| Some d -> max 0. (d -. now_ ()) | Some d -> max 0. (d -. now)
in in
timeout, self.sub_r, self.sub_w timeout, self.sub_r, self.sub_w
in in
(* run timers *)
while
if Heap.is_empty self.timer then
false
else (
let (Timer t) = Heap.peek_min_exn self.timer in
if t.deadline <= now then (
ignore (Heap.pop_min_exn self.timer : timer_ev);
t.f t.x t.y;
true
) else
false
)
do
()
done;
(* enter [select] *) (* enter [select] *)
Atomic.set self.in_select true; Atomic.set self.in_select true;
let r_reads, r_writes, _ = let r_reads, r_writes, _ =
@ -243,12 +261,16 @@ let step (self : st) : unit =
() ()
(* limit for select is fixed and known *)
let max_fds _ = 1024
let ops : st Nanoev.Impl.ops = let ops : st Nanoev.Impl.ops =
{ {
step; step;
close; close;
on_readable; on_readable;
on_writable; on_writable;
max_fds;
run_after_s; run_after_s;
wakeup_from_outside; wakeup_from_outside;
clear; clear;

3
tests/posix/dune Normal file
View file

@ -0,0 +1,3 @@
(tests
(names t1)
(libraries nanoev nanoev-posix threads))

View file

@ -0,0 +1,6 @@
notes about system limits in Linux:
- `ulimit -n 100000` will raise the max number of FDs for a process to 100000
- `/proc/sys/net/core/netdev_max_backlog` controls the kernel backlog size, raise it (default is 1000)
- `/proc/sys/net/core/somaxconn` is the max size of a socket backlog (as given to `listen()`), raise it (default is 4096)

4
tests/posix/echo/dune Normal file
View file

@ -0,0 +1,4 @@
(executables
(names echo_server echo_client)
(libraries moonpool moonpool.fib nanoev-picos nanoev-posix iostream
trace.core trace-tef))

View file

@ -0,0 +1,139 @@
module Trace = Trace_core
module F = Moonpool_fib
module IO = Nanoev_picos
module Sem = Picos_std_sync.Semaphore.Counting
[@@@ocaml.alert "-deprecated"]
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let pf = Printf.printf
let verbose = ref false
let reset_line = "\x1b[2K\r"
let n_loops_per_task = 100
let main ~runner:_ ~port ~unix_sock ~n ~n_conn () =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
pf "connect on %s n=%d n_conn=%d\n%!"
(if unix_sock = "" then
spf "localhost:%d" port
else
spf "unix:%S" unix_sock)
n n_conn;
let addr =
if unix_sock = "" then
Unix.ADDR_INET (Unix.inet_addr_loopback, port)
else
Unix.ADDR_UNIX unix_sock
in
Printf.printf "connecting to port %d\n%!" port;
let all_done = Atomic.make false in
let n_queries = Atomic.make 0 in
(* limit simultaneous number of connections *)
let sem = Sem.make n_conn in
let n_active_conns = Atomic.make 0 in
let progress_loop () =
while not (Atomic.get all_done) do
let n_queries = Atomic.get n_queries in
let n_conns = Atomic.get n_active_conns in
(* progress *)
Printf.printf "%sdone %d queries, %d active connections%!" reset_line
n_queries n_conns;
Trace.counter_int ~level:Info "n-conns" n_conns;
Trace.counter_int ~level:Info "n-queries" n_queries;
let gc = Gc.quick_stat () in
Trace.counter_int ~level:Info "gc.major" gc.major_collections;
Trace.counter_int ~level:Info "gc.minor" gc.minor_collections;
Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64);
Thread.delay 0.2
done
in
ignore (Thread.create progress_loop () : Thread.t);
let run_task () =
let _task_sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task"
in
Sem.acquire sem;
( IO.Net_client.with_connect addr @@ fun ic oc ->
Atomic.incr n_active_conns;
let buf = Bytes.create 32 in
for _j = 1 to n_loops_per_task do
(*let _sp =
Trace.enter_manual_sub_span ~parent:_task_sp ~__FILE__ ~__LINE__
"write.loop" ~data:(fun () -> [ "iter", `Int _j ])
in*)
Atomic.incr n_queries;
Iostream.Out.output_string oc "hello";
Iostream.Out_buf.flush oc;
(* read back what we wrote *)
Iostream.In.really_input ic buf 0 (String.length "hello");
(* Trace.exit_manual_span _sp; *)
F.yield ()
done;
Atomic.decr n_active_conns;
Sem.release sem );
Trace.exit_manual_span _task_sp
in
let t_start = Mtime_clock.now () in
(* start the first [n_conn] tasks *)
let fibers = List.init (n * n_conn) (fun _ -> F.spawn run_task) in
List.iter F.await fibers;
Atomic.set all_done true;
let t_stop = Mtime_clock.now () in
let elapsed_s =
(Mtime.span t_start t_stop |> Mtime.Span.to_uint64_ns |> Int64.to_float)
*. 1e-9
in
(* exit when [fut_exit] is resolved *)
Printf.printf
"%sdone with main (time=%.4fs, n queries=%d, expect=%d, %.3f req/s)\n%!"
reset_line elapsed_s (Atomic.get n_queries)
(n * n_conn * n_loops_per_task)
(float (Atomic.get n_queries) /. elapsed_s)
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_current_level Info;
Trace.set_thread_name "main";
let port = ref 1234 in
let unix_sock = ref "" in
let n = ref 1000 in
let n_conn = ref 20 in
let opts =
[
"-p", Arg.Set_int port, " port";
"-v", Arg.Set verbose, " verbose";
"-n", Arg.Set_int n, " number of iterations";
"--unix", Arg.Set_string unix_sock, " unix socket";
"--n-conn", Arg.Set_int n_conn, " number of simultaneous connections";
]
|> Arg.align
in
Arg.parse opts ignore "echo_client";
let@ () =
Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ())
in
F.main @@ fun runner ->
main ~runner ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn ()

View file

@ -0,0 +1,118 @@
module F = Moonpool_fib
module IO = Nanoev_picos
module Trace = Trace_core
[@@@ocaml.alert "-deprecated"]
let ( let@ ) = ( @@ )
let pf = Printf.printf
let spf = Printf.sprintf
let verbose = ref false
let n_reply_response = Atomic.make 0
let str_of_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~unix_sock ~max_conns ~runner () =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
pf "serve on %s\n%!"
(if unix_sock = "" then
spf "localhost:%d" port
else
spf "unix:%S" unix_sock);
let addr =
if unix_sock = "" then
Unix.ADDR_INET (Unix.inet_addr_loopback, port)
else (
(* remove leftover unix socket file, if any *)
(try Sys.remove unix_sock with _ -> ());
Unix.ADDR_UNIX unix_sock
)
in
let server =
IO.Net_server.establish ?max_connections:max_conns addr
~spawn:(fun f -> Moonpool.spawn ~on:runner f)
~client_handler:(fun client_addr ic oc ->
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "serve"
in
if !verbose then
pf "handle client on %s\n%!" (str_of_sockaddr client_addr);
let buf = Bytes.create 256 in
let continue = ref true in
while !continue do
match Iostream.In.input ic buf 0 (Bytes.length buf) with
| exception exn ->
continue := false;
Printf.eprintf "error in client handler: %s\n%!"
(Printexc.to_string exn)
| 0 -> continue := false
| n ->
Atomic.incr n_reply_response;
Iostream.Out.output oc buf 0 n;
Iostream.Out_buf.flush oc;
Picos.Fiber.yield ()
done;
Trace.exit_manual_span _sp;
if !verbose then
pf "done with client on %s\n%!" (str_of_sockaddr client_addr))
in
Printf.printf "max number of connections: %d\n%!"
(IO.Net_server.max_connections server);
if Trace.enabled () then
ignore
(Thread.create
(fun () ->
while IO.Net_server.running server do
Trace.counter_int ~level:Info "n-conns"
(IO.Net_server.n_active_connections server);
let gc = Gc.quick_stat () in
Trace.counter_int ~level:Info "gc.major" gc.major_collections;
Trace.counter_int ~level:Info "gc.minor" gc.minor_collections;
Trace.counter_int ~level:Info "n-reply-response"
(Atomic.get n_reply_response);
Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64);
Thread.delay 0.2
done)
()
: Thread.t);
IO.Net_server.join server;
IO.Net_server.shutdown server;
print_endline "exit"
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_current_level Info;
let port = ref 1234 in
let unix_sock = ref "" in
let max_conns = ref None in
let opts =
[
"-p", Arg.Set_int port, " port";
"--unix", Arg.Set_string unix_sock, " unix socket";
( "--max-conns",
Arg.Int (fun i -> max_conns := Some i),
" max number of connections" );
"-v", Arg.Set verbose, " verbose";
]
|> Arg.align
in
Arg.parse opts ignore "echo_server";
let@ () =
Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ())
in
F.main @@ fun runner ->
main ~port:!port ~unix_sock:!unix_sock ~max_conns:!max_conns ~runner ()

3
tests/posix/t1.expected Normal file
View file

@ -0,0 +1,3 @@
writing
can read
done writing

29
tests/posix/t1.ml Normal file
View file

@ -0,0 +1,29 @@
module E = Nanoev_posix
let mkpipe () : Unix.file_descr * Unix.file_descr =
let f1, f2 = Unix.pipe () in
Unix.set_nonblock f1;
Unix.set_nonblock f2;
f1, f2
let loop (e : E.t) =
while true do
E.step e
done
let () =
let ev = E.create () in
ignore (Thread.create loop ev : Thread.t);
let rd, wr = mkpipe () in
E.on_readable ev rd () () (fun ~closed () () ->
if closed then
print_endline "closed!"
else
print_endline "can read");
Thread.delay 0.05;
print_endline "writing";
ignore
(Unix.write wr (Bytes.unsafe_of_string "hello") 0 (String.length "hello")
: int);
Thread.delay 0.1;
print_endline "done writing"