diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index fb8daa9..0a73e77 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -56,6 +56,6 @@ jobs: dune-cache: true allow-prerelease-opam: true - - run: opam install ocamlformat.0.26.2 + - run: opam install ocamlformat.0.27.0 - run: opam exec -- make format-check diff --git a/.ocamlformat b/.ocamlformat index 7818345..f33f722 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,4 +1,4 @@ -version = 0.26.2 +version = 0.27.0 profile=conventional margin=80 if-then-else=k-r diff --git a/dune-project b/dune-project index e0cd8bb..b9b1aca 100644 --- a/dune-project +++ b/dune-project @@ -20,21 +20,64 @@ (synopsis "Tiny event loop abstraction") (depends ocaml dune base-unix) (depopts - (trace (>= 0.7)) - (picos - (and (>= 0.5) (< 0.7)))) + (trace + (>= 0.7))) (tags (unix select async))) +(package + (name nanoev-picos) + (synopsis "Use nanoev from picos") + (depends + ocaml + dune + base-unix + (nanoev + (= :version)) + (iostream + (>= 0.3)) + (picos + (and + (>= 0.5) + (< 0.7))) + (picos_std + (and + (>= 0.5) + (< 0.7)))) + (tags + (unix select async))) + +(package + (name nanoev-posix) + (synopsis "Use mtime+iomux (posix compliant) as a backend for nanoev") + (depends + ocaml + dune + base-unix + iomux + (nanoev (= :version)) + (nanoev-picos (= :version)) + (mtime + (>= 2.0)) + (moonpool :with-test) + (trace :with-test) + (trace-tef :with-test)) + (tags + (unix select async iomux nanoev))) + (package (name nanoev_tiny_httpd) (synopsis "Use nanoev as a basis for tiny_httpd") (depends ocaml dune - nanoev + (nanoev (= :version)) + (nanoev-picos (= :version)) picos - (tiny_httpd (>= 0.17))) - (tags (nanoev http))) + picos_std + (tiny_httpd + (>= 0.17))) + (tags + (nanoev http))) ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html diff --git a/examples/echo/dune b/examples/echo/dune index 7fa6bea..99959c7 100644 --- a/examples/echo/dune +++ b/examples/echo/dune @@ -1,4 +1,4 @@ (executable (name echo) - (libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef - nanoev_tiny_httpd)) + (libraries nanoev nanoev.unix nanoev-posix moonpool moonpool.fib trace + trace-tef nanoev_tiny_httpd)) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml index a5543b7..0519801 100644 --- a/examples/echo/echo.ml +++ b/examples/echo/echo.ml @@ -79,33 +79,71 @@ let setup_logging () = Logs.set_reporter @@ Logs.format_reporter (); Logs.set_level ~all:true (Some Logs.Debug) +let emit_metrics_ pool server () = + while true do + Trace.counter_int ~level:Info "pool.tasks" (Moonpool.Runner.num_tasks pool); + Trace.counter_int ~level:Info "http.active-conns" + (Server.active_connections server); + Thread.delay 0.3 + done + let () = + Trace.set_current_level Info; let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; let port_ = ref 8080 in let max_conn = ref 1024 in let j = ref 8 in + let backend = ref `Posix in + let buf_size = ref 4096 in + let max_buf_pool_size = ref None in + + let set_backend = function + | "posix" | "poll" | "default" -> backend := `Posix + | "unix" | "select" -> backend := `Unix + | s -> failwith @@ Printf.sprintf "unknown backend %S" s + in Arg.parse (Arg.align [ "--port", Arg.Set_int port_, " set port"; "-p", Arg.Set_int port_, " set port"; "-j", Arg.Set_int j, " number of threads"; + ( "--max-buf-pool-size", + Arg.Int (fun i -> max_buf_pool_size := Some i), + " max buffer pool size" ); + "--buf-size", Arg.Set_int buf_size, " buffer size"; "--debug", Arg.Unit setup_logging, " enable debug"; "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; + ( "--backend", + Arg.Symbol + ([ "posix"; "default"; "unix"; "select"; "poll" ], set_backend), + " event loop backend" ); ]) (fun _ -> raise (Arg.Bad "")) "echo [option]*"; - let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in - let@ _runner = Moonpool_fib.main in + let@ pool = + fun yield -> + if !j > 1 then + let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + let@ _runner = Moonpool_fib.main in + yield pool + else + Moonpool_fib.main yield + in - let ev = Nanoev_unix.create () in - Nanoev_picos.setup_bg_thread ev; + let ev = + match !backend with + | `Posix -> Nanoev_posix.create () + | `Unix -> Nanoev_unix.create () + in + let@ () = Nanoev_picos.Background_thread.with_setup ev in let server = Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ + ?max_buf_pool_size:!max_buf_pool_size ~buf_size:!buf_size ~max_connections:!max_conn () in @@ -273,8 +311,14 @@ let () = let s = to_string_top h in Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); - Printf.printf "listening on http://%s:%d\n%!" (Server.addr server) - (Server.port server); + Printf.printf + "listening on http://%s:%d with %d threads, %d max connections, %d max fds\n\ + %!" + (Server.addr server) (Server.port server) !j !max_conn (Nanoev.max_fds ev); + + if Trace.enabled () then + ignore (Thread.create (emit_metrics_ pool server) () : Thread.t); + match Server.run server with | Ok () -> () | Error e -> raise e diff --git a/nanoev-picos.opam b/nanoev-picos.opam new file mode 100644 index 0000000..44e1230 --- /dev/null +++ b/nanoev-picos.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use nanoev from picos" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "nanoev" {= version} + "iostream" {>= "0.3"} + "picos" {>= "0.5" & < "0.7"} + "picos_std" {>= "0.5" & < "0.7"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/nanoev-posix.opam b/nanoev-posix.opam new file mode 100644 index 0000000..b9df267 --- /dev/null +++ b/nanoev-posix.opam @@ -0,0 +1,37 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use mtime+iomux (posix compliant) as a backend for nanoev" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async" "iomux" "nanoev"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "iomux" + "nanoev" {= version} + "nanoev-picos" {= version} + "mtime" {>= "2.0"} + "moonpool" {with-test} + "trace" {with-test} + "trace-tef" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/nanoev.opam b/nanoev.opam index 9fa7ba9..8268f40 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -15,7 +15,6 @@ depends: [ ] depopts: [ "trace" {>= "0.7"} - "picos" {>= "0.5" & < "0.7"} ] build: [ ["dune" "subst"] {dev} diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam index 9504331..08833e7 100644 --- a/nanoev_tiny_httpd.opam +++ b/nanoev_tiny_httpd.opam @@ -10,8 +10,10 @@ bug-reports: "https://github.com/c-cube/nanoev/issues" depends: [ "ocaml" "dune" {>= "2.7"} - "nanoev" + "nanoev" {= version} + "nanoev-picos" {= version} "picos" + "picos_std" "tiny_httpd" {>= "0.17"} "odoc" {with-doc} ] diff --git a/src/core/nanoev.ml b/src/core/nanoev.ml index c15935e..75d34b7 100644 --- a/src/core/nanoev.ml +++ b/src/core/nanoev.ml @@ -7,6 +7,7 @@ module Impl = struct clear: 'st -> unit; wakeup_from_outside: 'st -> unit; close: 'st -> Unix.file_descr -> unit; + max_fds: 'st -> int; on_readable: 'a 'b. 'st -> @@ -39,6 +40,7 @@ type t = Impl.t let[@inline] clear (Ev (ops, st)) = ops.clear st let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st let[@inline] close (Ev (ops, st)) fd = ops.close st fd +let[@inline] max_fds (Ev (ops, st)) = ops.max_fds st let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = ops.on_readable st fd x y f diff --git a/src/core/nanoev.mli b/src/core/nanoev.mli index 4792cd6..213b9de 100644 --- a/src/core/nanoev.mli +++ b/src/core/nanoev.mli @@ -9,6 +9,7 @@ module Impl : sig clear: 'st -> unit; wakeup_from_outside: 'st -> unit; close: 'st -> Unix.file_descr -> unit; + max_fds: 'st -> int; on_readable: 'a 'b. 'st -> @@ -43,6 +44,9 @@ val step : t -> unit val close : t -> Unix.file_descr -> unit (** Close the file descriptor and clean it up *) +val max_fds : t -> int +(** Maximum number of file descriptors that can be observed at once. *) + val on_readable : t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit diff --git a/src/picos/IO_in.ml b/src/picos/IO_in.ml new file mode 100644 index 0000000..6184eba --- /dev/null +++ b/src/picos/IO_in.ml @@ -0,0 +1,152 @@ +open Common_ + +class type t = object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) +end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := Base.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/picos/IO_out.ml b/src/picos/IO_out.ml new file mode 100644 index 0000000..b98525f --- /dev/null +++ b/src/picos/IO_out.ml @@ -0,0 +1,118 @@ +open Common_ + +class type t = object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit +end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + Base.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/picos/background_thread.ml b/src/picos/background_thread.ml new file mode 100644 index 0000000..261daad --- /dev/null +++ b/src/picos/background_thread.ml @@ -0,0 +1,7 @@ +let is_setup = Global_.has_bg_thread +let setup = Global_.setup_bg_thread +let shutdown = Global_.shutdown_bg_thread + +let with_setup ev f = + setup ev; + Fun.protect ~finally:shutdown f diff --git a/src/picos/background_thread.mli b/src/picos/background_thread.mli new file mode 100644 index 0000000..6e4a531 --- /dev/null +++ b/src/picos/background_thread.mli @@ -0,0 +1,10 @@ +val setup : Nanoev.t -> unit +(** Install this event loop in a background thread *) + +val shutdown : unit -> unit +(** Shutdown background thread, assuming {! is_setup} returns [true] *) + +val with_setup : Nanoev.t -> (unit -> 'a) -> 'a + +val is_setup : unit -> bool +(** [is_setup()] is [true] iff a background thread is running a nanoev loop *) diff --git a/src/picos/base.ml b/src/picos/base.ml new file mode 100644 index 0000000..142e467 --- /dev/null +++ b/src/picos/base.ml @@ -0,0 +1,104 @@ +open Common_ + +let[@inline] get_loop_exn_ () : Nanoev.t = + match Atomic.get Global_.st with + | None -> failwith "No nanoev loop installed." + | Some st -> st.nanoev + +let[@inline] unwrap_ = function + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + +let[@unroll 1] rec retry_read_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error + ( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS + | Unix.ECONNRESET ), + _, + _ ) -> + (* Trace_.message "read must wait"; *) + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + let ev = get_loop_exn_ () in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_read_ fd f + +let[@unroll 1] rec retry_write_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error + ( ( Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR | Unix.EINPROGRESS + | Unix.ECONNRESET ), + _, + _ ) -> + (* Trace_.message "write must wait"; *) + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_write_ fd f + +let read fd buf i len : int = + try + retry_read_ fd (fun () -> + (* Trace_.message "read"; *) + Unix.read fd buf i len) + with Closed -> 0 + +let close fd = + Unix.close fd; + let ev = get_loop_exn_ () in + Nanoev.close ev fd + +let accept fd = + try + retry_read_ fd (fun () -> + (* Trace_.message "accept"; *) + Unix.accept fd) + with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> + raise Closed + +let write_once fd buf i len : int = + try + retry_write_ fd (fun () -> + (* Trace_.message "write"; *) + Unix.write fd buf i len) + with Closed -> 0 + +let rec write fd buf i len = + if len > 0 then ( + let n = write_once fd buf i len in + if n < len then write fd buf (i + n) (len - n) + ) + +let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) + +let[@inline] max_fds () = + match Atomic.get Global_.st with + | None -> 1024 + | Some st -> Nanoev.max_fds st.nanoev + +let sleep t = + if t > 0. then ( + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + Nanoev.run_after_s ev t trigger () (fun trigger () -> + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_ + ) + +module Raw = struct + let retry_read = retry_read_ + let retry_write = retry_write_ +end diff --git a/src/picos/base.mli b/src/picos/base.mli new file mode 100644 index 0000000..7b949e9 --- /dev/null +++ b/src/picos/base.mli @@ -0,0 +1,37 @@ +val read : Unix.file_descr -> bytes -> int -> int -> int +(** Read from the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write_once : Unix.file_descr -> bytes -> int -> int -> int +(** Write into the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write : Unix.file_descr -> bytes -> int -> int -> unit + +val close : Unix.file_descr -> unit +(** Close the file descriptor + @raise Unix.Unix_error when it fails *) + +val connect : Unix.file_descr -> Unix.sockaddr -> unit +(** Connect this FD to the remote address. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +(** Accept a connection on this fd. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val max_fds : unit -> int +(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} +*) + +val sleep : float -> unit +(** Suspend current fiber for [n] seconds *) + +module Raw : sig + val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a + val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a +end diff --git a/src/picos/common_.ml b/src/picos/common_.ml new file mode 100644 index 0000000..d35dd22 --- /dev/null +++ b/src/picos/common_.ml @@ -0,0 +1,6 @@ +module Trace_ = Nanoev.Trace_ + +let ( let@ ) = ( @@ ) +let _default_buf_size = 4 * 1024 + +exception Closed = Nanoev.Closed diff --git a/src/picos/dune b/src/picos/dune index db9792d..de66efd 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -1,5 +1,4 @@ (library (name nanoev_picos) - (public_name nanoev.picos) - (optional) ; picos - (libraries threads picos nanoev)) + (public_name nanoev-picos) + (libraries threads picos picos_std.sync iostream nanoev)) diff --git a/src/picos/global_.ml b/src/picos/global_.ml new file mode 100644 index 0000000..2e590b8 --- /dev/null +++ b/src/picos/global_.ml @@ -0,0 +1,59 @@ +open Common_ + +type st = + | None + | Some of { + active: bool Atomic.t; + nanoev: Nanoev.t; + th: Thread.t; + } + +let st : st Atomic.t = Atomic.make None +let lock = Mutex.create () + +let with_lock lock f = + Mutex.lock lock; + match f () with + | exception e -> + Mutex.unlock lock; + raise e + | x -> + Mutex.unlock lock; + x + +let bg_thread_ ~active ~evloop () : unit = + Trace_.set_thread_name "nanoev.picos.bg-thread"; + while Atomic.get active do + Nanoev.step evloop + done + +let[@inline] has_bg_thread () = Atomic.get st <> None + +let setup_bg_thread (ev : Nanoev.t) : unit = + let@ () = with_lock lock in + (* shutdown existing thread, if any *) + (match Atomic.get st with + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th + | None -> ()); + + (* start new bg thread *) + let active = Atomic.make true in + Atomic.set st + @@ Some + { + active; + nanoev = ev; + th = Thread.create (bg_thread_ ~active ~evloop:ev) (); + } + +let shutdown_bg_thread () = + let@ () = with_lock lock in + match Atomic.exchange st None with + | None -> () + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 9920345..8029fda 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -1,146 +1,7 @@ -open struct - module Trace_ = Nanoev.Trace_ - - let ( let@ ) = ( @@ ) -end - -exception Closed = Nanoev.Closed - -module Global_ = struct - type st = - | None - | Some of { - active: bool Atomic.t; - nanoev: Nanoev.t; - th: Thread.t; - } - - let st : st Atomic.t = Atomic.make None - let lock = Mutex.create () - - let with_lock lock f = - Mutex.lock lock; - match f () with - | exception e -> - Mutex.unlock lock; - raise e - | x -> - Mutex.unlock lock; - x - - let bg_thread_ ~active ~evloop () : unit = - Trace_.set_thread_name "nanoev.picos.bg-thread"; - while Atomic.get active do - Nanoev.step evloop - done - - let[@inline] has_bg_thread () = Atomic.get st <> None - - let setup_bg_thread (ev : Nanoev.t) : unit = - let@ () = with_lock lock in - (* shutdown existing thread, if any *) - (match Atomic.get st with - | Some st -> - Atomic.set st.active false; - Nanoev.wakeup_from_outside st.nanoev; - Thread.join st.th - | None -> ()); - - (* start new bg thread *) - let active = Atomic.make true in - Atomic.set st - @@ Some - { - active; - nanoev = ev; - th = Thread.create (bg_thread_ ~active ~evloop:ev) (); - } -end - -let has_bg_thread = Global_.has_bg_thread -let setup_bg_thread = Global_.setup_bg_thread - -let[@inline] get_loop_exn_ () : Nanoev.t = - match Atomic.get Global_.st with - | None -> failwith "No nanoev loop installed." - | Some st -> st.nanoev - -let[@inline] unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - -let retry_read_ fd f = - let ev = get_loop_exn_ () in - let[@unroll 1] rec loop () = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - Trace_.message "read must wait"; - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - loop () - in - loop () - -let retry_write_ fd f = - let ev = get_loop_exn_ () in - let rec loop () = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - Trace_.message "write must wait"; - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - loop () - in - loop () - -let read fd buf i len : int = - try - retry_read_ fd (fun () -> - Trace_.message "read"; - Unix.read fd buf i len) - with Closed -> 0 - -let close fd = - Unix.close fd; - let ev = get_loop_exn_ () in - Nanoev.close ev fd - -let accept fd = - try - retry_read_ fd (fun () -> - Trace_.message "accept"; - Unix.accept fd) - with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> - raise Closed - -let write fd buf i len : int = - try - retry_write_ fd (fun () -> - Trace_.message "write"; - Unix.write fd buf i len) - with Closed -> 0 - -let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) - -let sleep t = - if t > 0. then ( - let ev = get_loop_exn_ () in - let trigger = Picos.Trigger.create () in - Nanoev.run_after_s ev t trigger () (fun trigger () -> - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_ - ) +module Background_thread = Background_thread +module Base = Base +include Base +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index ef795bb..c89e3a5 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,32 +1,18 @@ (** Basic interface with picos *) -val setup_bg_thread : Nanoev.t -> unit -(** Install this event loop in a background thread *) - -val has_bg_thread : unit -> bool -(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev loop *) +module Background_thread = Background_thread (** {2 Non blocking IO primitives} *) -val read : Unix.file_descr -> bytes -> int -> int -> int -(** Read from the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +module Base = Base -val write : Unix.file_descr -> bytes -> int -> int -> int -(** Write into the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +include module type of struct + include Base +end -val close : Unix.file_descr -> unit -(** Close the file descriptor - @raise Unix.Unix_error when it fails *) +(** {2 Building blocks on top of {!Base}} *) -val connect : Unix.file_descr -> Unix.sockaddr -> unit - -val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr -(** Accept a connection on this fd. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) - -val sleep : float -> unit +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/net_client.ml b/src/picos/net_client.ml new file mode 100644 index 0000000..20ec2c4 --- /dev/null +++ b/src/picos/net_client.ml @@ -0,0 +1,23 @@ +open Common_ + +let connect addr : Unix.file_descr = + let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + (* connect asynchronously *) + Base.connect sock addr; + sock + +let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = connect addr in + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = + (try Unix.shutdown sock Unix.SHUTDOWN_ALL with _ -> ()); + try Unix.close sock with _ -> () + in + let@ () = Fun.protect ~finally in + f ic oc diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml new file mode 100644 index 0000000..642ba48 --- /dev/null +++ b/src/picos/net_server.ml @@ -0,0 +1,111 @@ +module Sem = Picos_std_sync.Semaphore.Counting + +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit + +type t = { + active: bool Atomic.t; + sock: Unix.file_descr; + client_handler: client_handler; + spawn: (unit -> unit) -> unit Picos.Computation.t; + max_conns: int; + sem: Sem.t; + mutable running: unit Picos.Computation.t option; + exn_handler: exn -> Printexc.raw_backtrace -> unit; +} + +let[@inline] join (self : t) : unit = + Option.iter Picos.Computation.await self.running + +let[@inline] max_connections self = self.max_conns + +let[@inline] n_active_connections (self : t) : int = + self.max_conns - Sem.get_value self.sem + +let[@inline] running (self : t) : bool = Atomic.get self.active +let shutdown (self : t) = if Atomic.exchange self.active false then () + +open struct + let default_exn_handler exn bt = + Printf.eprintf "uncaught exception in network server: %s\n%s%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) + + let run (self : t) () : unit = + while Atomic.get self.active do + let client_sock, client_addr = Base.accept self.sock in + Sem.acquire self.sem; + + let cleanup () = + (try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ()); + (* TODO: close in nanoev too *) + (try Unix.close client_sock with _ -> ()); + Sem.release self.sem + in + + let comp : _ Picos.Computation.t = + self.spawn (fun () -> + let ic = IO_in.of_unix_fd client_sock in + let oc = IO_out.of_unix_fd client_sock in + try + self.client_handler client_addr ic oc; + cleanup () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + self.exn_handler exn bt) + in + ignore (comp : _ Picos.Computation.t) + done +end + +let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler) + ~spawn ~(client_handler : client_handler) addr : t = + let ev = + match Atomic.get Global_.st with + | Some { nanoev = ev; _ } -> ev + | None -> invalid_arg "Nanoev_picos.Net_server: no event loop installed" + in + + let max_connections = + match max_connections with + | None -> Nanoev.max_fds ev + | Some n -> min (Nanoev.max_fds ev) n + in + let sem = Sem.make max_connections in + + let backlog = + match backlog with + | Some n -> max 4 n + | None -> max 4 max_connections + in + + let domain = Unix.domain_of_sockaddr addr in + let sock = Unix.socket domain Unix.SOCK_STREAM 0 in + + Unix.bind sock addr; + Unix.listen sock backlog; + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.SO_REUSEADDR true; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + let server = + { + active = Atomic.make true; + max_conns = max_connections; + sem; + spawn; + sock; + client_handler; + running = None; + exn_handler; + } + in + + server.running <- Some (spawn (run server)); + server + +let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f = + let server = + establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr + in + Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server) diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli new file mode 100644 index 0000000..8f9cf62 --- /dev/null +++ b/src/picos/net_server.mli @@ -0,0 +1,37 @@ +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit +type t + +val join : t -> unit +(** Wait for server to shutdown *) + +val shutdown : t -> unit +(** Ask the server to stop *) + +val running : t -> bool +val max_connections : t -> int +val n_active_connections : t -> int + +val establish : + ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + t +(** Create and start a new server on the given socket address. + @param spawn used to spawn a new computation per client + @param client_handler + the logic for talking to a client, will run in its own computation + @param backlog number of connections waiting in the listening socket + @param max_connections max number of simultaneous connections *) + +val with_ : + ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + (t -> 'a) -> + 'a diff --git a/src/posix/dune b/src/posix/dune new file mode 100644 index 0000000..7387ac6 --- /dev/null +++ b/src/posix/dune @@ -0,0 +1,6 @@ +(library + (name nanoev_posix) + (public_name nanoev-posix) + (synopsis "posix backend (poll/ppoll+mtime)") + (private_modules heap) + (libraries threads nanoev unix iomux mtime mtime.clock.os)) diff --git a/src/posix/heap.ml b/src/posix/heap.ml new file mode 100644 index 0000000..1a553d3 --- /dev/null +++ b/src/posix/heap.ml @@ -0,0 +1,61 @@ +type 'a tree = + | E + | N of int * 'a * 'a tree * 'a tree + +type 'a t = { + leq: 'a -> 'a -> bool; + mutable t: 'a tree; +} + +let create ~leq () : _ t = { leq; t = E } + +let[@inline] is_empty (self : _ t) = + match self.t with + | E -> true + | N _ -> false + +exception Empty + +open struct + (** Rank of the tree *) + let[@inline] rank_ = function + | E -> 0 + | N (r, _, _, _) -> r + + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. We + ensure that the right child's rank is ≤ to the rank of the left child + (leftist property). The rank of the resulting node is the length of the + rightmost path. *) + let[@inline] mk_node_ x a b = + if rank_ a >= rank_ b then + N (rank_ b + 1, x, a, b) + else + N (rank_ a + 1, x, b, a) + + let rec merge ~leq t1 t2 = + match t1, t2 with + | t, E -> t + | E, t -> t + | N (_, x, a1, b1), N (_, y, a2, b2) -> + if leq x y then + mk_node_ x a1 (merge ~leq b1 t2) + else + mk_node_ y a2 (merge ~leq t1 b2) +end + +let clear self = self.t <- E + +let[@inline] insert (self : _ t) x : unit = + self.t <- merge ~leq:self.leq self.t (N (1, x, E, E)) + +let[@inline] peek_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, _, _) -> x + +let[@inline] pop_min_exn (self : _ t) = + match self.t with + | E -> raise Empty + | N (_, x, l, r) -> + self.t <- merge ~leq:self.leq l r; + x diff --git a/src/posix/heap.mli b/src/posix/heap.mli new file mode 100644 index 0000000..3efd4c4 --- /dev/null +++ b/src/posix/heap.mli @@ -0,0 +1,13 @@ +type 'a t + +val create : leq:('a -> 'a -> bool) -> unit -> 'a t + +val is_empty : _ t -> bool +(** [is_empty h] returns [true] if the heap [h] is empty. *) + +exception Empty + +val clear : _ t -> unit +val insert : 'a t -> 'a -> unit +val peek_min_exn : 'a t -> 'a +val pop_min_exn : 'a t -> 'a diff --git a/src/posix/nanoev_posix.ml b/src/posix/nanoev_posix.ml new file mode 100644 index 0000000..2e9a5ca --- /dev/null +++ b/src/posix/nanoev_posix.ml @@ -0,0 +1,403 @@ +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) + let now_ns : unit -> int64 = Mtime_clock.now_ns + let[@inline] ns_of_s (t : float) : int64 = Int64.of_float (t *. 1e9) + let[@inline] ns_to_s (t : int64) : float = Int64.to_float t /. 1e9 +end + +module Fd_tbl = Hashtbl.Make (struct + open Iomux.Util + + type t = Unix.file_descr + + let equal a b = Int.equal (fd_of_unix a) (fd_of_unix b) + let hash a = Hashtbl.hash (fd_of_unix a) +end) + +module P = Iomux.Poll +module Flags = P.Flags + +module Sync_queue = struct + type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; + } + + let create () : _ t = { q = Queue.create (); mutex = Mutex.create () } + + let push (self : _ t) x : unit = + Mutex.lock self.mutex; + Queue.push x self.q; + Mutex.unlock self.mutex + + let transfer (self : _ t) q : unit = + Mutex.lock self.mutex; + Queue.transfer self.q q; + Mutex.unlock self.mutex +end + +(** Callback list *) +type cbs = + | Nil + | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs + +type timer_ev = + | Timer : { + deadline: int64; + x: 'a; + y: 'b; + f: 'a -> 'b -> unit; + } + -> timer_ev + +type fd_data = { + fd: Unix.file_descr; + mutable idx: int; + (** Index in the poll buffer. Mutable because we might change it when we + swap FDs to remove items. *) + mutable r: cbs; + mutable w: cbs; +} +(** Data associated to a given FD *) + +let[@inline] fd_flags (self : fd_data) : Flags.t = + let fl = ref Flags.empty in + (if self.r != Nil then fl := Flags.(!fl + pollin)); + (if self.w != Nil then fl := Flags.(!fl + pollout)); + !fl + +type queued_task = + | Q_run_after of timer_ev + | Q_on_readable : + Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit) + -> queued_task + | Q_on_writable : + Unix.file_descr * 'a * 'b * (closed:bool -> 'a -> 'b -> unit) + -> queued_task + | Q_clear + | Q_close of Unix.file_descr + +type st = { + timer: timer_ev Heap.t; + fds: fd_data Fd_tbl.t; + poll: P.t; + mutable len: int; (** length of the active prefix of the [poll] buffer *) + wakeup_rd: Unix.file_descr; + wakeup_wr: Unix.file_descr; + wakeup_triggered: bool Atomic.t; + (** Make [wakeup_from_outside] idempotent within an iteration of [step] *) + in_poll: bool Atomic.t; + (** Are we currently inside a call to [poll], and in which thread? Useful + for other threads to know whether to wake us up via the pipe *) + mutable owner_thread: int; + (** Thread allowed to perform operations on this poll instance. Starts at + [-1]. *) + queued_tasks: queued_task Sync_queue.t; + (** While in [poll()], changes get queued, so we don't invalidate the poll + buffer before the syscall returns *) +} + +let[@inline] queue_task_ (self : st) t : unit = + Sync_queue.push self.queued_tasks t + +(** [true] if called from the owner thread *) +let[@inline] in_owner_thread (self : st) : bool = + self.owner_thread != -1 && self.owner_thread == Thread.(id (self ())) + +let[@inline] in_poll (self : st) : bool = Atomic.get self.in_poll +let[@inline] leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline + +let create_st () : st = + let wakeup_rd, wakeup_wr = Unix.pipe () in + (* reading end must be non blocking so it's not always immediately + ready; writing end is blocking to make it simpler to wakeup from other + threads *) + Unix.set_nonblock wakeup_rd; + let self = + { + timer = Heap.create ~leq:leq_timer (); + fds = Fd_tbl.create 16; + poll = P.create (); + len = 0; + wakeup_rd; + wakeup_wr; + wakeup_triggered = Atomic.make false; + in_poll = Atomic.make false; + owner_thread = -1; + queued_tasks = Sync_queue.create (); + } + in + + (* always watch for the pipe being readable *) + P.set_index self.poll 0 self.wakeup_rd Flags.pollin; + self.len <- 1; + + self + +let max_fds (self : st) : int = P.maxfds self.poll + +let[@inline never] wakeup_real_ (self : st) : unit = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in + let b = Bytes.make 1 '!' in + ignore (Unix.write self.wakeup_wr b 0 1 : int) + +let[@inline] wakeup_ (self : st) : unit = + if not (Atomic.exchange self.wakeup_triggered true) then wakeup_real_ self + +let wakeup_from_outside (self : st) : unit = + let already_awake = + (* to avoid race conditions we only take the shortcut if + this is called from the owner thread *) + in_owner_thread self && not (Atomic.get self.in_poll) + in + if not already_awake then wakeup_ self + +let rec perform_cbs ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs ~closed tail + +(** Change the event loop right now. This must be called only from the owner + thread and outside of [poll]. *) +module Run_now_ = struct + let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail + + let clear_ (self : st) : unit = + Heap.clear self.timer; + Fd_tbl.clear self.fds; + for i = 0 to P.maxfds self.poll - 1 do + P.set_index self.poll i P.invalid_fd Flags.empty + done; + Atomic.set self.wakeup_triggered false; + self.len <- 0; + () + + let get_fd_ (self : st) fd : fd_data = + (* assert (in_owner_thread self && not (in_poll self)); *) + match Fd_tbl.find self.fds fd with + | per_fd -> per_fd + | exception Not_found -> + let idx = + if self.len = P.maxfds self.poll then + invalid_arg "No available slot in poll"; + let n = self.len in + self.len <- self.len + 1; + n + in + let per_fd = { idx; fd; r = Nil; w = Nil } in + Fd_tbl.add self.fds fd per_fd; + per_fd + + let remove_fd_ (self : st) (fd_data : fd_data) : unit = + Fd_tbl.remove self.fds fd_data.fd; + P.set_index self.poll fd_data.idx P.invalid_fd Flags.empty; + + (* assert (in_owner_thread self && not (in_poll self)); *) + if fd_data.idx > 0 && fd_data.idx + 1 < self.len then ( + (* not the last element nor the first (pipe_rd), move the last element + here to keep the buffer non sparse *) + let last_fd = P.get_fd self.poll (self.len - 1) in + assert (last_fd <> fd_data.fd); + match Fd_tbl.find_opt self.fds last_fd with + | None -> assert false + | Some last_fd_data -> + (* move the last FD to [idx] *) + last_fd_data.idx <- fd_data.idx; + P.set_index self.poll fd_data.idx last_fd (fd_flags last_fd_data) + ); + + self.len <- self.len - 1; + () + + let close_ (self : st) fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + match Fd_tbl.find self.fds fd with + | fd_data -> + remove_fd_ self fd_data; + fd_data.r, fd_data.w + | exception Not_found -> Nil, Nil + in + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + + let on_readable_ self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in + let fd_data = get_fd_ self fd in + fd_data.r <- Sub (x, y, f, fd_data.r); + P.set_index self.poll fd_data.idx fd (fd_flags fd_data) + + let on_writable_ self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in + let fd_data = get_fd_ self fd in + fd_data.w <- Sub (x, y, f, fd_data.w); + P.set_index self.poll fd_data.idx fd (fd_flags fd_data) + + let run_after_s_ self ev : unit = Heap.insert self.timer ev + + let perform_task_ self (t : queued_task) : unit = + match t with + | Q_run_after t -> run_after_s_ self t + | Q_on_readable (fd, x, y, f) -> on_readable_ self fd x y f + | Q_on_writable (fd, x, y, f) -> on_writable_ self fd x y f + | Q_clear -> clear_ self + | Q_close fd -> close_ self fd +end + +let clear (self : st) = + if in_owner_thread self && not (in_poll self) then + Run_now_.clear_ self + else ( + queue_task_ self @@ Q_clear; + wakeup_from_outside self + ) + +let close (self : st) fd : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.close_ self fd + else ( + queue_task_ self @@ Q_close fd; + wakeup_from_outside self + ) + +let on_readable self fd x y f : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.on_readable_ self fd x y f + else ( + queue_task_ self @@ Q_on_readable (fd, x, y, f); + wakeup_from_outside self + ) + +let on_writable self fd x y f : unit = + if in_owner_thread self && not (in_poll self) then + Run_now_.on_writable_ self fd x y f + else ( + queue_task_ self @@ Q_on_writable (fd, x, y, f); + wakeup_from_outside self + ) + +let run_after_s self (time : float) x y f : unit = + let deadline = Int64.add (now_ns ()) (ns_of_s time) in + let ev = Timer { deadline; x; y; f } in + if in_owner_thread self && not (in_poll self) then + Run_now_.run_after_s_ self ev + else ( + queue_task_ self @@ Q_run_after ev; + wakeup_from_outside self + ) + +let next_deadline_ (self : st) : int64 option = + match Heap.peek_min_exn self.timer with + | exception Heap.Empty -> None + | Timer t -> Some t.deadline + +let step (self : st) : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.posix.step" in + + self.owner_thread <- Thread.(id (self ())); + let now = now_ns () in + let timeout_ns : int64 = + match next_deadline_ self with + | None -> 30_000_000_000L + | Some d -> Int64.max 0L (Int64.sub d now) + in + + (* run timers *) + while + if Heap.is_empty self.timer then + false + else ( + let (Timer t) = Heap.peek_min_exn self.timer in + if t.deadline <= now then ( + ignore (Heap.pop_min_exn self.timer : timer_ev); + t.f t.x t.y; + true + ) else + false + ) + do + () + done; + + (* process all queued tasks. + + NOTE: race condition: if another thread queues tasks after we do + the transfer, it will call [wakeup_from_outside] and make the pipe_rd FD + readable. So as soon as we call [poll], it will return and we will find + the queued tasks waiting for us. *) + let local_q = Queue.create () in + Sync_queue.transfer self.queued_tasks local_q; + while not (Queue.is_empty local_q) do + let t = Queue.pop local_q in + Run_now_.perform_task_ self t + done; + + Atomic.set self.in_poll true; + + (* enter [poll] *) + let num_ready_fds = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "poll" ~data:(fun () -> + [ "timeout", `Float (ns_to_s timeout_ns); "len", `Int self.len ]) + in + P.ppoll_or_poll self.poll self.len (Nanoseconds timeout_ns) + in + + Atomic.set self.in_poll false; + + (* drain notification pipe *) + if Atomic.exchange self.wakeup_triggered false then ( + let b1 = Bytes.create 8 in + while try Unix.read self.wakeup_rd b1 0 8 > 0 with _ -> false do + () + done + ); + + (* call callbacks *) + P.iter_ready self.poll num_ready_fds (fun _idx fd flags -> + if fd <> self.wakeup_rd then ( + let fd_data = + try Fd_tbl.find self.fds fd with Not_found -> assert false + in + + if Flags.mem Flags.pollin flags then ( + let r = fd_data.r in + fd_data.r <- Nil; + perform_cbs ~closed:false r + ); + if Flags.mem Flags.pollout flags then ( + let w = fd_data.w in + fd_data.w <- Nil; + perform_cbs ~closed:false w + ); + + if Flags.empty = fd_flags fd_data then Run_now_.remove_fd_ self fd_data + )); + + () + +let ops : st Nanoev.Impl.ops = + { + step; + close; + on_readable; + on_writable; + run_after_s; + max_fds; + wakeup_from_outside; + clear; + } + +include Nanoev + +let create () : t = Impl.build ops (create_st ()) diff --git a/src/posix/nanoev_posix.mli b/src/posix/nanoev_posix.mli new file mode 100644 index 0000000..60e88ee --- /dev/null +++ b/src/posix/nanoev_posix.mli @@ -0,0 +1,8 @@ +(** Nano event loop using Poll/Ppoll *) + +include module type of struct + include Nanoev +end + +val create : unit -> t +(** Create a new nanoev loop using [Iomux] (poll/ppoll). *) diff --git a/src/tiny_httpd/dune b/src/tiny_httpd/dune index a372e9d..12a3c11 100644 --- a/src/tiny_httpd/dune +++ b/src/tiny_httpd/dune @@ -5,6 +5,7 @@ threads picos (re_export nanoev) - nanoev.picos + (re_export nanoev-picos) + picos_std.sync (re_export iostream) (re_export tiny_httpd))) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 7369fde..b637122 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -5,45 +5,8 @@ module Slice = Iostream.Slice module Pool = TH.Pool module Buf = TH.Buf -let unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - +module Sem_ = Picos_std_sync.Semaphore.Counting (** Non blocking semaphore *) -module Sem_ = struct - type t = { - mutable n: int; - max: int; - waiting: Picos.Trigger.t Queue.t; - mutex: Mutex.t; - } - - let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; max = n; mutex = Mutex.create (); waiting = Queue.create () } - - let acquire self = - Mutex.lock self.mutex; - while self.n = 0 do - let tr = Picos.Trigger.create () in - Queue.push tr self.waiting; - Mutex.unlock self.mutex; - let res = Picos.Trigger.await tr in - unwrap_ res; - Mutex.lock self.mutex - done; - assert (self.n > 0); - self.n <- self.n - 1; - Mutex.unlock self.mutex - - let release self = - Mutex.lock self.mutex; - self.n <- self.n + 1; - Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting); - Mutex.unlock self.mutex - - let num_acquired self = self.max - self.n -end module Out = struct open Iostream @@ -60,7 +23,7 @@ module Out = struct let i = ref i in let len = ref len0 in while !len > 0 do - match EV.write fd bs !i !len with + match EV.write_once fd bs !i !len with | 0 -> failwith "write failed" | n -> i := !i + n; @@ -147,7 +110,7 @@ module Unix_tcp_server_ = struct new_thread: (unit -> unit) -> unit; timeout: float; masksigpipe: bool; - mutable running: bool; (* TODO: use an atomic? *) + running: bool Atomic.t; } let shutdown_silent_ fd = @@ -183,7 +146,7 @@ module Unix_tcp_server_ = struct let inet_addr = Unix.inet_addr_of_string self.addr in Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); - let n_listen = 2 * self.max_connections in + let n_listen = self.max_connections in Unix.listen sock n_listen ); @@ -191,10 +154,16 @@ module Unix_tcp_server_ = struct let tcp_server = { - TH.IO.TCP_server.stop = (fun () -> self.running <- false); - running = (fun () -> self.running); + TH.IO.TCP_server.stop = + (fun () -> + Atomic.set self.running false; + + (* close accept socket so the main loop will return *) + try Unix.close sock with _ -> ()); + running = (fun () -> Atomic.get self.running); active_connections = - (fun () -> Sem_.num_acquired self.sem_max_connections); + (fun () -> + self.max_connections - Sem_.get_value self.sem_max_connections); endpoint = (fun () -> let addr, port = get_addr_ sock in @@ -233,7 +202,7 @@ module Unix_tcp_server_ = struct in Unix.set_nonblock sock; - while self.running do + while Atomic.get self.running do match EV.accept sock with | client_sock, client_addr -> (* limit concurrency *) @@ -272,7 +241,7 @@ module Unix_tcp_server_ = struct done; (* Wait for all threads to be done: this only works if all threads are done. *) - Unix.close sock; + (try Unix.close sock with _ -> ()); (* TODO? *) (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) ()); @@ -281,7 +250,7 @@ end open struct let get_max_connection_ ?(max_connections = 2048) () : int = - let max_connections = max 4 max_connections in + let max_connections = min (max 4 @@ EV.max_fds ()) max_connections in max_connections let clear_slice (slice : Slice.t) = @@ -290,16 +259,22 @@ open struct slice.len <- 0 end -let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) - ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") - ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = +let create ?(masksigpipe = not Sys.win32) ?max_connections ?max_buf_pool_size + ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday) + ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () : + TH.Server.t = let max_connections = get_max_connection_ ?max_connections () in + let max_pool_size = + match max_buf_pool_size with + | None -> min 4096 max_connections * 2 + | Some m -> m + in let server = { Unix_tcp_server_.addr; new_thread; buf_pool = - Pool.create ~clear:Buf.clear_and_zero + Pool.create ~clear:Buf.clear_and_zero ~max_size:max_pool_size ~mk_item:(fun () -> Buf.create ?size:buf_size ()) (); slice_pool = @@ -308,11 +283,11 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) (let buf_size = Option.value buf_size ~default:4096 in fun () -> Slice.create buf_size) (); - running = true; + running = Atomic.make true; port; sock; max_connections; - sem_max_connections = Sem_.create max_connections; + sem_max_connections = Sem_.make max_connections; masksigpipe; timeout; } diff --git a/src/tiny_httpd/nanoev_tiny_httpd.mli b/src/tiny_httpd/nanoev_tiny_httpd.mli index f0cd9af..7c858e5 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.mli +++ b/src/tiny_httpd/nanoev_tiny_httpd.mli @@ -3,6 +3,7 @@ module TH = Tiny_httpd_core val create : ?masksigpipe:bool -> ?max_connections:int -> + ?max_buf_pool_size:int -> ?timeout:float -> ?buf_size:int -> ?get_time_s:(unit -> float) -> diff --git a/src/unix/dune b/src/unix/dune index 4f14854..3c0fe50 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -2,4 +2,5 @@ (name nanoev_unix) (public_name nanoev.unix) (synopsis "Unix/select backend") + (private_modules heap) (libraries nanoev unix)) diff --git a/src/unix/heap.ml b/src/unix/heap.ml index a9a9c9e..1a553d3 100644 --- a/src/unix/heap.ml +++ b/src/unix/heap.ml @@ -22,10 +22,10 @@ open struct | E -> 0 | N (r, _, _, _) -> r - (** Make a balanced node labelled with [x], and subtrees [a] and [b]. - We ensure that the right child's rank is ≤ to the rank of the - left child (leftist property). The rank of the resulting node - is the length of the rightmost path. *) + (** Make a balanced node labelled with [x], and subtrees [a] and [b]. We + ensure that the right child's rank is ≤ to the rank of the left child + (leftist property). The rank of the resulting node is the length of the + rightmost path. *) let[@inline] mk_node_ x a b = if rank_ a >= rank_ b then N (rank_ b + 1, x, a, b) diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 7d656c1..17b2088 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -174,17 +174,35 @@ let next_deadline_ (self : st) : float option = let step (self : st) : unit = let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in (* gather the subscriptions and timeout *) + let now = now_ () in let timeout, sub_r, sub_w = let@ self = with_lock_ self in recompute_if_needed self; let timeout = match next_deadline_ self with | None -> 30. - | Some d -> max 0. (d -. now_ ()) + | Some d -> max 0. (d -. now) in timeout, self.sub_r, self.sub_w in + (* run timers *) + while + if Heap.is_empty self.timer then + false + else ( + let (Timer t) = Heap.peek_min_exn self.timer in + if t.deadline <= now then ( + ignore (Heap.pop_min_exn self.timer : timer_ev); + t.f t.x t.y; + true + ) else + false + ) + do + () + done; + (* enter [select] *) Atomic.set self.in_select true; let r_reads, r_writes, _ = @@ -243,12 +261,16 @@ let step (self : st) : unit = () +(* limit for select is fixed and known *) +let max_fds _ = 1024 + let ops : st Nanoev.Impl.ops = { step; close; on_readable; on_writable; + max_fds; run_after_s; wakeup_from_outside; clear; diff --git a/tests/posix/dune b/tests/posix/dune new file mode 100644 index 0000000..08e8e1a --- /dev/null +++ b/tests/posix/dune @@ -0,0 +1,3 @@ +(tests + (names t1) + (libraries nanoev nanoev-posix threads)) diff --git a/tests/posix/echo/README.md b/tests/posix/echo/README.md new file mode 100644 index 0000000..0fdef34 --- /dev/null +++ b/tests/posix/echo/README.md @@ -0,0 +1,6 @@ + +notes about system limits in Linux: +- `ulimit -n 100000` will raise the max number of FDs for a process to 100000 +- `/proc/sys/net/core/netdev_max_backlog` controls the kernel backlog size, raise it (default is 1000) +- `/proc/sys/net/core/somaxconn` is the max size of a socket backlog (as given to `listen()`), raise it (default is 4096) + diff --git a/tests/posix/echo/dune b/tests/posix/echo/dune new file mode 100644 index 0000000..21b26c2 --- /dev/null +++ b/tests/posix/echo/dune @@ -0,0 +1,4 @@ +(executables + (names echo_server echo_client) + (libraries moonpool moonpool.fib nanoev-picos nanoev-posix iostream + trace.core trace-tef)) diff --git a/tests/posix/echo/echo_client.ml b/tests/posix/echo/echo_client.ml new file mode 100644 index 0000000..859dec1 --- /dev/null +++ b/tests/posix/echo/echo_client.ml @@ -0,0 +1,139 @@ +module Trace = Trace_core +module F = Moonpool_fib +module IO = Nanoev_picos +module Sem = Picos_std_sync.Semaphore.Counting + +[@@@ocaml.alert "-deprecated"] + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf +let pf = Printf.printf +let verbose = ref false +let reset_line = "\x1b[2K\r" +let n_loops_per_task = 100 + +let main ~runner:_ ~port ~unix_sock ~n ~n_conn () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + + pf "connect on %s n=%d n_conn=%d\n%!" + (if unix_sock = "" then + spf "localhost:%d" port + else + spf "unix:%S" unix_sock) + n n_conn; + + let addr = + if unix_sock = "" then + Unix.ADDR_INET (Unix.inet_addr_loopback, port) + else + Unix.ADDR_UNIX unix_sock + in + + Printf.printf "connecting to port %d\n%!" port; + + let all_done = Atomic.make false in + let n_queries = Atomic.make 0 in + + (* limit simultaneous number of connections *) + let sem = Sem.make n_conn in + let n_active_conns = Atomic.make 0 in + + let progress_loop () = + while not (Atomic.get all_done) do + let n_queries = Atomic.get n_queries in + let n_conns = Atomic.get n_active_conns in + + (* progress *) + Printf.printf "%sdone %d queries, %d active connections%!" reset_line + n_queries n_conns; + + Trace.counter_int ~level:Info "n-conns" n_conns; + Trace.counter_int ~level:Info "n-queries" n_queries; + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done + in + + ignore (Thread.create progress_loop () : Thread.t); + + let run_task () = + let _task_sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" + in + Sem.acquire sem; + ( IO.Net_client.with_connect addr @@ fun ic oc -> + Atomic.incr n_active_conns; + let buf = Bytes.create 32 in + + for _j = 1 to n_loops_per_task do + (*let _sp = + Trace.enter_manual_sub_span ~parent:_task_sp ~__FILE__ ~__LINE__ + "write.loop" ~data:(fun () -> [ "iter", `Int _j ]) + in*) + Atomic.incr n_queries; + + Iostream.Out.output_string oc "hello"; + Iostream.Out_buf.flush oc; + + (* read back what we wrote *) + Iostream.In.really_input ic buf 0 (String.length "hello"); + (* Trace.exit_manual_span _sp; *) + F.yield () + done; + + Atomic.decr n_active_conns; + Sem.release sem ); + + Trace.exit_manual_span _task_sp + in + + let t_start = Mtime_clock.now () in + + (* start the first [n_conn] tasks *) + let fibers = List.init (n * n_conn) (fun _ -> F.spawn run_task) in + List.iter F.await fibers; + Atomic.set all_done true; + + let t_stop = Mtime_clock.now () in + let elapsed_s = + (Mtime.span t_start t_stop |> Mtime.Span.to_uint64_ns |> Int64.to_float) + *. 1e-9 + in + + (* exit when [fut_exit] is resolved *) + Printf.printf + "%sdone with main (time=%.4fs, n queries=%d, expect=%d, %.3f req/s)\n%!" + reset_line elapsed_s (Atomic.get n_queries) + (n * n_conn * n_loops_per_task) + (float (Atomic.get n_queries) /. elapsed_s) + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; + Trace.set_thread_name "main"; + + let port = ref 1234 in + let unix_sock = ref "" in + let n = ref 1000 in + let n_conn = ref 20 in + let opts = + [ + "-p", Arg.Set_int port, " port"; + "-v", Arg.Set verbose, " verbose"; + "-n", Arg.Set_int n, " number of iterations"; + "--unix", Arg.Set_string unix_sock, " unix socket"; + "--n-conn", Arg.Set_int n_conn, " number of simultaneous connections"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo_client"; + + let@ () = + Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) + in + F.main @@ fun runner -> + main ~runner ~port:!port ~unix_sock:!unix_sock ~n:!n ~n_conn:!n_conn () diff --git a/tests/posix/echo/echo_server.ml b/tests/posix/echo/echo_server.ml new file mode 100644 index 0000000..7ed110b --- /dev/null +++ b/tests/posix/echo/echo_server.ml @@ -0,0 +1,118 @@ +module F = Moonpool_fib +module IO = Nanoev_picos +module Trace = Trace_core + +[@@@ocaml.alert "-deprecated"] + +let ( let@ ) = ( @@ ) +let pf = Printf.printf +let spf = Printf.sprintf +let verbose = ref false +let n_reply_response = Atomic.make 0 + +let str_of_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +let main ~port ~unix_sock ~max_conns ~runner () = + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + + pf "serve on %s\n%!" + (if unix_sock = "" then + spf "localhost:%d" port + else + spf "unix:%S" unix_sock); + + let addr = + if unix_sock = "" then + Unix.ADDR_INET (Unix.inet_addr_loopback, port) + else ( + (* remove leftover unix socket file, if any *) + (try Sys.remove unix_sock with _ -> ()); + Unix.ADDR_UNIX unix_sock + ) + in + + let server = + IO.Net_server.establish ?max_connections:max_conns addr + ~spawn:(fun f -> Moonpool.spawn ~on:runner f) + ~client_handler:(fun client_addr ic oc -> + let _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "serve" + in + + if !verbose then + pf "handle client on %s\n%!" (str_of_sockaddr client_addr); + + let buf = Bytes.create 256 in + let continue = ref true in + while !continue do + match Iostream.In.input ic buf 0 (Bytes.length buf) with + | exception exn -> + continue := false; + Printf.eprintf "error in client handler: %s\n%!" + (Printexc.to_string exn) + | 0 -> continue := false + | n -> + Atomic.incr n_reply_response; + Iostream.Out.output oc buf 0 n; + Iostream.Out_buf.flush oc; + Picos.Fiber.yield () + done; + + Trace.exit_manual_span _sp; + if !verbose then + pf "done with client on %s\n%!" (str_of_sockaddr client_addr)) + in + + Printf.printf "max number of connections: %d\n%!" + (IO.Net_server.max_connections server); + + if Trace.enabled () then + ignore + (Thread.create + (fun () -> + while IO.Net_server.running server do + Trace.counter_int ~level:Info "n-conns" + (IO.Net_server.n_active_connections server); + let gc = Gc.quick_stat () in + Trace.counter_int ~level:Info "gc.major" gc.major_collections; + Trace.counter_int ~level:Info "gc.minor" gc.minor_collections; + Trace.counter_int ~level:Info "n-reply-response" + (Atomic.get n_reply_response); + Trace.counter_int ~level:Info "gc.heap-size" (gc.heap_words * 64); + + Thread.delay 0.2 + done) + () + : Thread.t); + + IO.Net_server.join server; + IO.Net_server.shutdown server; + print_endline "exit" + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_current_level Info; + let port = ref 1234 in + let unix_sock = ref "" in + let max_conns = ref None in + let opts = + [ + "-p", Arg.Set_int port, " port"; + "--unix", Arg.Set_string unix_sock, " unix socket"; + ( "--max-conns", + Arg.Int (fun i -> max_conns := Some i), + " max number of connections" ); + "-v", Arg.Set verbose, " verbose"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo_server"; + + let@ () = + Nanoev_picos.Background_thread.with_setup (Nanoev_posix.create ()) + in + F.main @@ fun runner -> + main ~port:!port ~unix_sock:!unix_sock ~max_conns:!max_conns ~runner () diff --git a/tests/posix/t1.expected b/tests/posix/t1.expected new file mode 100644 index 0000000..a36183e --- /dev/null +++ b/tests/posix/t1.expected @@ -0,0 +1,3 @@ +writing +can read +done writing diff --git a/tests/posix/t1.ml b/tests/posix/t1.ml new file mode 100644 index 0000000..366a5c9 --- /dev/null +++ b/tests/posix/t1.ml @@ -0,0 +1,29 @@ +module E = Nanoev_posix + +let mkpipe () : Unix.file_descr * Unix.file_descr = + let f1, f2 = Unix.pipe () in + Unix.set_nonblock f1; + Unix.set_nonblock f2; + f1, f2 + +let loop (e : E.t) = + while true do + E.step e + done + +let () = + let ev = E.create () in + ignore (Thread.create loop ev : Thread.t); + let rd, wr = mkpipe () in + E.on_readable ev rd () () (fun ~closed () () -> + if closed then + print_endline "closed!" + else + print_endline "can read"); + Thread.delay 0.05; + print_endline "writing"; + ignore + (Unix.write wr (Bytes.unsafe_of_string "hello") 0 (String.length "hello") + : int); + Thread.delay 0.1; + print_endline "done writing"