diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml new file mode 100644 index 0000000..46a599b --- /dev/null +++ b/.github/workflows/gh-pages.yml @@ -0,0 +1,35 @@ +name: github pages + +on: + push: + branches: + - main # Set a branch name to trigger deployment + +jobs: + deploy: + name: Deploy doc + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@main + + - name: Use OCaml + uses: ocaml/setup-ocaml@v3 + with: + ocaml-compiler: '5.2' + dune-cache: true + allow-prerelease-opam: true + + # temporary until it's in a release + - run: opam pin picos 0.6.0 -y -n + + - run: opam install picos moonpool trace + + - run: opam exec -- odig odoc --cache-dir=_doc/ nanoev + + - name: Deploy + uses: peaceiris/actions-gh-pages@v3 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: ./_doc/html + destination_dir: . + enable_jekyll: false diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..fb8daa9 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,61 @@ +name: Build and Test + +on: + push: + branches: + - main + pull_request: + +jobs: + run: + name: build # build+test on various versions of OCaml, on linux + timeout-minutes: 15 + strategy: + fail-fast: true + matrix: + os: + - ubuntu-latest + ocaml-compiler: + - '5.0' + - '5.3' + + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@main + - name: Use OCaml ${{ matrix.ocaml-compiler }} + uses: ocaml/setup-ocaml@v3 + with: + ocaml-compiler: ${{ matrix.ocaml-compiler }} + dune-cache: true + allow-prerelease-opam: true + + - run: opam pin picos 0.6.0 -y -n + - run: opam install picos + - run: opam install -t --deps-only . + + - run: opam exec -- dune build @install + + # install some depopts and test deps + - run: opam install -t trace + + - run: opam exec -- dune build --profile=release --force @install @runtest + + format: + name: format + strategy: + matrix: + ocaml-compiler: + - '5.2' + runs-on: 'ubuntu-latest' + steps: + - uses: actions/checkout@main + - name: Use OCaml ${{ matrix.ocaml-compiler }} + uses: ocaml/setup-ocaml@v3 + with: + ocaml-compiler: ${{ matrix.ocaml-compiler }} + dune-cache: true + allow-prerelease-opam: true + + - run: opam install ocamlformat.0.26.2 + - run: opam exec -- make format-check + diff --git a/README.md b/README.md new file mode 100644 index 0000000..1266d69 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# nanoev + +A minimalistic but modular abstraction for IO event loops. + +The goal of this library is to provide a uniform abstraction over multiple +system event loops, in a way that plays well with Picos. + +## Usage + +Very basic usage would look like this: + +```ocaml +module EV = Nanoev_picos + + +let () = + (* use a backend, eg. select *) + let ev = Nanoev_unix.create () in + + (* install the backend *) + Nanoev_picos.setup_bg_thread ev; + + (* setup a picos scheduler and use EV.read, EV.write, etc. *) + … +``` + + +## Backends + +- [x] select +- [ ] uring diff --git a/dune-project b/dune-project index e1d3ceb..e0cd8bb 100644 --- a/dune-project +++ b/dune-project @@ -1,6 +1,7 @@ (lang dune 2.7) (name nanoev) + (generate_opam_files true) (source @@ -16,8 +17,24 @@ (package (name nanoev) - (synopsis "Tiny event loop around `select`") + (synopsis "Tiny event loop abstraction") (depends ocaml dune base-unix) - (tags (unix select async))) + (depopts + (trace (>= 0.7)) + (picos + (and (>= 0.5) (< 0.7)))) + (tags + (unix select async))) + +(package + (name nanoev_tiny_httpd) + (synopsis "Use nanoev as a basis for tiny_httpd") + (depends + ocaml + dune + nanoev + picos + (tiny_httpd (>= 0.17))) + (tags (nanoev http))) ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html diff --git a/echo.sh b/echo.sh new file mode 100755 index 0000000..a3a0bcf --- /dev/null +++ b/echo.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec dune exec --display=quiet -- examples/echo/echo.exe $@ diff --git a/examples/echo/dune b/examples/echo/dune new file mode 100644 index 0000000..7fa6bea --- /dev/null +++ b/examples/echo/dune @@ -0,0 +1,4 @@ +(executable + (name echo) + (libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef + nanoev_tiny_httpd)) diff --git a/examples/echo/echo.ml b/examples/echo/echo.ml new file mode 100644 index 0000000..a5543b7 --- /dev/null +++ b/examples/echo/echo.ml @@ -0,0 +1,280 @@ +open Tiny_httpd_core +module Log = Tiny_httpd.Log + +let ( let@ ) = ( @@ ) +let now_ = Unix.gettimeofday + +let alice_text = + "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \ + sitting by her sister on the bank, and of having nothing to do: once or \ + twice she had peeped into the book her sister was reading, but it had no \ + pictures or conversations in it, thought \ + Alice So she was considering in her \ + own mind (as well as she could, for the hot day made her feel very sleepy \ + and stupid), whether the pleasure of making a daisy-chain would be worth \ + the trouble of getting up and picking the daisies, when suddenly a White \ + Rabbit with pink eyes ran close by her. There was nothing so very \ + remarkable in that; nor did Alice think it so very much out of the way to \ + hear the Rabbit say to itself, (when \ + she thought it over afterwards, it occurred to her that she ought to have \ + wondered at this, but at the time it all seemed quite natural); but when \ + the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \ + it, and then hurried on, Alice started to her feet, for it flashed across \ + her mind that she had never before seen a rabbit with either a \ + waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \ + she ran across the field after it, and fortunately was just in time to see \ + it pop down a large rabbit-hole under the hedge. In another moment down \ + went Alice after it, never once considering how in the world she was to get \ + out again. The rabbit-hole went straight on like a tunnel for some way, and \ + then dipped suddenly down, so suddenly that Alice had not a moment to think \ + about stopping herself before she found herself falling down a very deep \ + well. Either the well was very deep, or she fell very slowly, for she had \ + plenty of time as she went down to look about her and to wonder what was \ + going to happen next. First, she tried to look down and make out what she \ + was coming to, but it was too dark to see anything; then she looked at the \ + sides of the well, and noticed that they were filled with cupboards......" + +(* util: a little middleware collecting statistics *) +let middleware_stat () : Server.Middleware.t * (unit -> string) = + let n_req = ref 0 in + let total_time_ = ref 0. in + let parse_time_ = ref 0. in + let build_time_ = ref 0. in + let write_time_ = ref 0. in + + let m h req ~resp = + incr n_req; + let t1 = Request.start_time req in + let t2 = now_ () in + h req ~resp:(fun response -> + let t3 = now_ () in + resp response; + let t4 = now_ () in + total_time_ := !total_time_ +. (t4 -. t1); + parse_time_ := !parse_time_ +. (t2 -. t1); + build_time_ := !build_time_ +. (t3 -. t2); + write_time_ := !write_time_ +. (t4 -. t3)) + and get_stat () = + Printf.sprintf + "%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)" + !n_req + (!total_time_ /. float !n_req *. 1e3) + (!parse_time_ /. float !n_req *. 1e3) + (!build_time_ /. float !n_req *. 1e3) + (!write_time_ /. float !n_req *. 1e3) + in + m, get_stat + +(* ugly AF *) +let base64 x = + let ic, oc = Unix.open_process "base64" in + output_string oc x; + flush oc; + close_out oc; + let r = input_line ic in + ignore (Unix.close_process (ic, oc)); + r + +let setup_logging () = + Logs.set_reporter @@ Logs.format_reporter (); + Logs.set_level ~all:true (Some Logs.Debug) + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + + let port_ = ref 8080 in + let max_conn = ref 1024 in + let j = ref 8 in + Arg.parse + (Arg.align + [ + "--port", Arg.Set_int port_, " set port"; + "-p", Arg.Set_int port_, " set port"; + "-j", Arg.Set_int j, " number of threads"; + "--debug", Arg.Unit setup_logging, " enable debug"; + "--max-conns", Arg.Set_int max_conn, " maximum concurrent connections"; + ]) + (fun _ -> raise (Arg.Bad "")) + "echo [option]*"; + + let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + let@ _runner = Moonpool_fib.main in + + let ev = Nanoev_unix.create () in + Nanoev_picos.setup_bg_thread ev; + + let server = + Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_ + ~max_connections:!max_conn () + in + + let m_stats, get_stats = middleware_stat () in + Server.add_middleware server ~stage:(`Stage 1) m_stats; + + (* say hello *) + Server.add_route_handler ~meth:`GET server + Route.(exact "hello" @/ string @/ return) + (fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n"))); + + (* compressed file access *) + Server.add_route_handler ~meth:`GET server + Route.(exact "zcat" @/ string_urlencoded @/ return) + (fun path _req -> + let ic = open_in path in + let str = IO.Input.of_in_channel ic in + let mime_type = + try + let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in + try + let s = [ "Content-Type", String.trim (input_line p) ] in + ignore @@ Unix.close_process_in p; + s + with _ -> + ignore @@ Unix.close_process_in p; + [] + with _ -> [] + in + Response.make_stream ~headers:mime_type (Ok str)); + + (* echo request *) + Server.add_route_handler server + Route.(exact "echo" @/ return) + (fun req -> + let q = + Request.query req + |> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v) + |> String.concat ";" + in + Response.make_string + (Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q))); + + (* file upload *) + Server.add_route_handler_stream ~meth:`PUT server + Route.(exact "upload" @/ string @/ return) + (fun path req -> + Log.debug (fun k -> + k "start upload %S, headers:\n%s\n\n%!" path + (Format.asprintf "%a" Headers.pp (Request.headers req))); + try + let oc = open_out @@ "/tmp/" ^ path in + IO.Input.to_chan oc req.Request.body; + flush oc; + Response.make_string (Ok "uploaded file") + with e -> + Response.fail ~code:500 "couldn't upload file: %s" + (Printexc.to_string e)); + + (* protected by login *) + Server.add_route_handler server + Route.(exact "protected" @/ return) + (fun req -> + let ok = + match Request.get_header req "authorization" with + | Some v -> + Log.debug (fun k -> k "authenticate with %S" v); + v = "Basic " ^ base64 "user:foobar" + | None -> false + in + if ok then ( + (* FIXME: a logout link *) + let s = + "

hello, this is super secret!

log out" + in + Response.make_string (Ok s) + ) else ( + let headers = + Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"") + in + Response.fail ~code:401 ~headers "invalid" + )); + + (* logout *) + Server.add_route_handler server + Route.(exact "logout" @/ return) + (fun _req -> Response.fail ~code:401 "logged out"); + + (* stats *) + Server.add_route_handler server + Route.(exact "stats" @/ return) + (fun _req -> + let stats = get_stats () in + Response.make_string @@ Ok stats); + + Server.add_route_handler server + Route.(exact "alice" @/ return) + (fun _req -> Response.make_string (Ok alice_text)); + + (* main page *) + Server.add_route_handler server + Route.(return) + (fun _req -> + let open Tiny_httpd_html in + let h = + html [] + [ + head [] [ title [] [ txt "index of echo" ] ]; + body [] + [ + h3 [] [ txt "welcome!" ]; + p [] [ b [] [ txt "endpoints are:" ] ]; + ul [] + [ + li [] [ pre [] [ txt "/hello/:name (GET)" ] ]; + li [] + [ + pre [] + [ + a [ A.href "/echo/" ] [ txt "echo" ]; + txt " echo back query"; + ]; + ]; + li [] + [ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ]; + li [] + [ + pre [] + [ + txt + "/zcat/:path (GET) to download a file (deflate \ + transfer-encoding)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/stats/" ] [ txt "/stats/" ]; + txt " (GET) to access statistics"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/protected" ] [ txt "/protected" ]; + txt + " (GET) to see a protected page (login: user, \ + password: foobar)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/logout" ] [ txt "/logout" ]; + txt " (POST) to log out"; + ]; + ]; + ]; + ]; + ] + in + let s = to_string_top h in + Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); + + Printf.printf "listening on http://%s:%d\n%!" (Server.addr server) + (Server.port server); + match Server.run server with + | Ok () -> () + | Error e -> raise e diff --git a/nanoev.opam b/nanoev.opam index 2981f11..9fa7ba9 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -1,6 +1,6 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" -synopsis: "Tiny event loop around `select`" +synopsis: "Tiny event loop abstraction" maintainer: ["Simon Cruanes"] authors: ["Simon Cruanes"] license: "MIT" @@ -13,6 +13,10 @@ depends: [ "base-unix" "odoc" {with-doc} ] +depopts: [ + "trace" {>= "0.7"} + "picos" {>= "0.5" & < "0.7"} +] build: [ ["dune" "subst"] {dev} [ diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam new file mode 100644 index 0000000..9504331 --- /dev/null +++ b/nanoev_tiny_httpd.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use nanoev as a basis for tiny_httpd" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["nanoev" "http"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "nanoev" + "picos" + "tiny_httpd" {>= "0.17"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/src/core/dune b/src/core/dune new file mode 100644 index 0000000..07249b5 --- /dev/null +++ b/src/core/dune @@ -0,0 +1,11 @@ +(library + (name nanoev) + (public_name nanoev) + (synopsis "Nano ev loop") + (libraries + unix + (select + trace_.ml + from + (trace.core -> trace_.real.ml) + (-> trace_.dummy.ml)))) diff --git a/src/nanoev.ml b/src/core/nanoev.ml similarity index 67% rename from src/nanoev.ml rename to src/core/nanoev.ml index 00989e9..c15935e 100644 --- a/src/nanoev.ml +++ b/src/core/nanoev.ml @@ -1,11 +1,28 @@ +module Trace_ = Trace_ + +exception Closed + module Impl = struct type 'st ops = { clear: 'st -> unit; wakeup_from_outside: 'st -> unit; + close: 'st -> Unix.file_descr -> unit; on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; step: 'st -> unit; } @@ -21,6 +38,7 @@ type t = Impl.t let[@inline] clear (Ev (ops, st)) = ops.clear st let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st +let[@inline] close (Ev (ops, st)) fd = ops.close st fd let[@inline] on_readable (Ev (ops, st)) fd x y f : unit = ops.on_readable st fd x y f @@ -32,13 +50,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit = ops.run_after_s st time x y f let[@inline] step (Ev (ops, st)) : unit = ops.step st - -(* -let rec read (self:t) fd buf i len : int = - match Unix.read fd buf i len with - | n -> n - | exception Unix.Unix_error (Unix, _, _) -> - read self fd buf i len -*) - - diff --git a/src/core/nanoev.mli b/src/core/nanoev.mli new file mode 100644 index 0000000..4792cd6 --- /dev/null +++ b/src/core/nanoev.mli @@ -0,0 +1,58 @@ +(** Nano event loop *) + +type t + +exception Closed + +module Impl : sig + type 'st ops = { + clear: 'st -> unit; + wakeup_from_outside: 'st -> unit; + close: 'st -> Unix.file_descr -> unit; + on_readable: + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; + on_writable: + 'a 'b. + 'st -> + Unix.file_descr -> + 'a -> + 'b -> + (closed:bool -> 'a -> 'b -> unit) -> + unit; + run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; + step: 'st -> unit; + } + + val build : 'a ops -> 'a -> t +end + +val clear : t -> unit +(** Reset the state *) + +val wakeup_from_outside : t -> unit + +val step : t -> unit +(** Run one step of the event loop until something happens *) + +val close : t -> Unix.file_descr -> unit +(** Close the file descriptor and clean it up *) + +val on_readable : + t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + +val on_writable : + t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + +val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit + +(**/**) + +module Trace_ = Trace_ + +(**/**) diff --git a/src/core/trace_.dummy.ml b/src/core/trace_.dummy.ml new file mode 100644 index 0000000..b08b984 --- /dev/null +++ b/src/core/trace_.dummy.ml @@ -0,0 +1,3 @@ +let[@inline] with_span ?data:_ ~__FILE__:_ ~__LINE__:_ _name f = f 0L +let[@inline] message ?data:_ _ = () +let set_thread_name (_ : string) = () diff --git a/src/core/trace_.real.ml b/src/core/trace_.real.ml new file mode 100644 index 0000000..6c513a5 --- /dev/null +++ b/src/core/trace_.real.ml @@ -0,0 +1,5 @@ +let[@inline] with_span ?data ~__FILE__ ~__LINE__ name f = + Trace_core.with_span ?data ~__FILE__ ~__LINE__ name f + +let[@inline] message ?data m = Trace_core.message ?data m +let set_thread_name name = Trace_core.set_thread_name name diff --git a/src/dune b/src/dune deleted file mode 100644 index 8109e16..0000000 --- a/src/dune +++ /dev/null @@ -1,5 +0,0 @@ -(library - (name nanoev) - (public_name nanoev) - (synopsis "Nano ev loop") - (libraries unix)) diff --git a/src/nanoev.mli b/src/nanoev.mli deleted file mode 100644 index fcbd0b8..0000000 --- a/src/nanoev.mli +++ /dev/null @@ -1,30 +0,0 @@ -(** Nano event loop *) - -type t - -module Impl : sig - type 'st ops = { - clear: 'st -> unit; - wakeup_from_outside: 'st -> unit; - on_readable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; - on_writable: - 'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; - run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit; - step: 'st -> unit; - } - - val build : 'a ops -> 'a -> t -end - -val clear : t -> unit -(** Reset the state *) - -val wakeup_from_outside : t -> unit - -val step : t -> unit -(** Run one step of the event loop until something happens *) - -val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit -val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit -val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit diff --git a/src/picos/dune b/src/picos/dune new file mode 100644 index 0000000..db9792d --- /dev/null +++ b/src/picos/dune @@ -0,0 +1,5 @@ +(library + (name nanoev_picos) + (public_name nanoev.picos) + (optional) ; picos + (libraries threads picos nanoev)) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml new file mode 100644 index 0000000..9920345 --- /dev/null +++ b/src/picos/nanoev_picos.ml @@ -0,0 +1,146 @@ +open struct + module Trace_ = Nanoev.Trace_ + + let ( let@ ) = ( @@ ) +end + +exception Closed = Nanoev.Closed + +module Global_ = struct + type st = + | None + | Some of { + active: bool Atomic.t; + nanoev: Nanoev.t; + th: Thread.t; + } + + let st : st Atomic.t = Atomic.make None + let lock = Mutex.create () + + let with_lock lock f = + Mutex.lock lock; + match f () with + | exception e -> + Mutex.unlock lock; + raise e + | x -> + Mutex.unlock lock; + x + + let bg_thread_ ~active ~evloop () : unit = + Trace_.set_thread_name "nanoev.picos.bg-thread"; + while Atomic.get active do + Nanoev.step evloop + done + + let[@inline] has_bg_thread () = Atomic.get st <> None + + let setup_bg_thread (ev : Nanoev.t) : unit = + let@ () = with_lock lock in + (* shutdown existing thread, if any *) + (match Atomic.get st with + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th + | None -> ()); + + (* start new bg thread *) + let active = Atomic.make true in + Atomic.set st + @@ Some + { + active; + nanoev = ev; + th = Thread.create (bg_thread_ ~active ~evloop:ev) (); + } +end + +let has_bg_thread = Global_.has_bg_thread +let setup_bg_thread = Global_.setup_bg_thread + +let[@inline] get_loop_exn_ () : Nanoev.t = + match Atomic.get Global_.st with + | None -> failwith "No nanoev loop installed." + | Some st -> st.nanoev + +let[@inline] unwrap_ = function + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + +let retry_read_ fd f = + let ev = get_loop_exn_ () in + let[@unroll 1] rec loop () = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Trace_.message "read must wait"; + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + loop () + in + loop () + +let retry_write_ fd f = + let ev = get_loop_exn_ () in + let rec loop () = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + Trace_.message "write must wait"; + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + loop () + in + loop () + +let read fd buf i len : int = + try + retry_read_ fd (fun () -> + Trace_.message "read"; + Unix.read fd buf i len) + with Closed -> 0 + +let close fd = + Unix.close fd; + let ev = get_loop_exn_ () in + Nanoev.close ev fd + +let accept fd = + try + retry_read_ fd (fun () -> + Trace_.message "accept"; + Unix.accept fd) + with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> + raise Closed + +let write fd buf i len : int = + try + retry_write_ fd (fun () -> + Trace_.message "write"; + Unix.write fd buf i len) + with Closed -> 0 + +let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) + +let sleep t = + if t > 0. then ( + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + Nanoev.run_after_s ev t trigger () (fun trigger () -> + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_ + ) diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli new file mode 100644 index 0000000..ef795bb --- /dev/null +++ b/src/picos/nanoev_picos.mli @@ -0,0 +1,32 @@ +(** Basic interface with picos *) + +val setup_bg_thread : Nanoev.t -> unit +(** Install this event loop in a background thread *) + +val has_bg_thread : unit -> bool +(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev loop *) + +(** {2 Non blocking IO primitives} *) + +val read : Unix.file_descr -> bytes -> int -> int -> int +(** Read from the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write : Unix.file_descr -> bytes -> int -> int -> int +(** Write into the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val close : Unix.file_descr -> unit +(** Close the file descriptor + @raise Unix.Unix_error when it fails *) + +val connect : Unix.file_descr -> Unix.sockaddr -> unit + +val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +(** Accept a connection on this fd. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val sleep : float -> unit diff --git a/src/tiny_httpd/dune b/src/tiny_httpd/dune new file mode 100644 index 0000000..a372e9d --- /dev/null +++ b/src/tiny_httpd/dune @@ -0,0 +1,10 @@ +(library + (name nanoev_tiny_httpd) + (public_name nanoev_tiny_httpd) + (libraries + threads + picos + (re_export nanoev) + nanoev.picos + (re_export iostream) + (re_export tiny_httpd))) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml new file mode 100644 index 0000000..7369fde --- /dev/null +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -0,0 +1,328 @@ +module TH = Tiny_httpd_core +module EV = Nanoev_picos +module Log = TH.Log +module Slice = Iostream.Slice +module Pool = TH.Pool +module Buf = TH.Buf + +let unwrap_ = function + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + +(** Non blocking semaphore *) +module Sem_ = struct + type t = { + mutable n: int; + max: int; + waiting: Picos.Trigger.t Queue.t; + mutex: Mutex.t; + } + + let create n = + if n <= 0 then invalid_arg "Semaphore.create"; + { n; max = n; mutex = Mutex.create (); waiting = Queue.create () } + + let acquire self = + Mutex.lock self.mutex; + while self.n = 0 do + let tr = Picos.Trigger.create () in + Queue.push tr self.waiting; + Mutex.unlock self.mutex; + let res = Picos.Trigger.await tr in + unwrap_ res; + Mutex.lock self.mutex + done; + assert (self.n > 0); + self.n <- self.n - 1; + Mutex.unlock self.mutex + + let release self = + Mutex.lock self.mutex; + self.n <- self.n + 1; + Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting); + Mutex.unlock self.mutex + + let num_acquired self = self.max - self.n +end + +module Out = struct + open Iostream + + class type t = Out_buf.t + + class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) + (fd : Unix.file_descr) : + t = + object + inherit Out_buf.t_from_output ~bytes:buf.bytes () + + method private output_underlying bs i len0 = + let i = ref i in + let len = ref len0 in + while !len > 0 do + match EV.write fd bs !i !len with + | 0 -> failwith "write failed" + | n -> + i := !i + n; + len := !len - n + | exception + Unix.Unix_error + ( (( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN + | Unix.ECONNRESET | Unix.EPIPE ) as err), + fn, + _ ) -> + failwith + @@ Printf.sprintf "write failed in %s: %s" fn + (Unix.error_message err) + done + + method private close_underlying () = + if not (Atomic.exchange closed true) then + if close_noerr then ( + try EV.close fd with _ -> () + ) else + EV.close fd + end +end + +module In = struct + open Iostream + + class type t = In_buf.t + + let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) + (fd : Unix.file_descr) : t = + let eof = ref false in + object + inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes () + + method private refill (slice : Slice.t) = + if not !eof then ( + slice.off <- 0; + let continue = ref true in + while !continue do + match EV.read fd slice.bytes 0 (Bytes.length slice.bytes) with + | n -> + slice.len <- n; + continue := false + | exception + Unix.Unix_error + ( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN + | Unix.ECONNRESET | Unix.EPIPE ), + _, + _ ) -> + eof := true; + continue := false + done; + (* Printf.eprintf "read returned %d B\n%!" !n; *) + if slice.len = 0 then eof := true + ) + + method close () = + if not (Atomic.exchange closed true) then ( + eof := true; + if close_noerr then ( + try EV.close fd with _ -> () + ) else + EV.close fd + ) + end +end + +module Unix_tcp_server_ = struct + let get_addr_ sock = + match Unix.getsockname sock with + | Unix.ADDR_INET (addr, port) -> addr, port + | _ -> invalid_arg "httpd: address is not INET" + + type t = { + addr: string; + port: int; + buf_pool: Buf.t Pool.t; + slice_pool: Slice.t Pool.t; + max_connections: int; + sem_max_connections: Sem_.t; + (** semaphore to restrict the number of active concurrent connections *) + mutable sock: Unix.file_descr option; (** Socket *) + new_thread: (unit -> unit) -> unit; + timeout: float; + masksigpipe: bool; + mutable running: bool; (* TODO: use an atomic? *) + } + + let shutdown_silent_ fd = + try Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> () + + let close_silent_ fd = try Unix.close fd with _ -> () + + let to_tcp_server (self : t) : TH.IO.TCP_server.builder = + { + TH.IO.TCP_server.serve = + (fun ~after_init ~handle () : unit -> + if self.masksigpipe && not Sys.win32 then + ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list); + let sock, should_bind = + match self.sock with + | Some s -> + s, false + (* Because we're getting a socket from the caller (e.g. systemd) *) + | None -> + let s = + Unix.socket + (if TH.Util.is_ipv6_str self.addr then + Unix.PF_INET6 + else + Unix.PF_INET) + Unix.SOCK_STREAM 0 + in + s, true (* Because we're creating the socket ourselves *) + in + Unix.set_nonblock sock; + Unix.setsockopt_optint sock Unix.SO_LINGER None; + if should_bind then ( + let inet_addr = Unix.inet_addr_of_string self.addr in + Unix.setsockopt sock Unix.SO_REUSEADDR true; + Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); + let n_listen = 2 * self.max_connections in + Unix.listen sock n_listen + ); + + self.sock <- Some sock; + + let tcp_server = + { + TH.IO.TCP_server.stop = (fun () -> self.running <- false); + running = (fun () -> self.running); + active_connections = + (fun () -> Sem_.num_acquired self.sem_max_connections); + endpoint = + (fun () -> + let addr, port = get_addr_ sock in + Unix.string_of_inet_addr addr, port); + } + in + after_init tcp_server; + + (* how to handle a single client *) + let handle_client_ (client_sock : Unix.file_descr) + (client_addr : Unix.sockaddr) : unit = + Log.debug (fun k -> + k "t[%d]: serving new client on %s" + (Thread.id @@ Thread.self ()) + (TH.Util.show_sockaddr client_addr)); + + if self.masksigpipe && not Sys.win32 then + ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list); + Unix.set_nonblock client_sock; + Unix.setsockopt client_sock Unix.TCP_NODELAY true; + Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout); + Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); + + Pool.with_resource self.slice_pool @@ fun ic_buf -> + Pool.with_resource self.slice_pool @@ fun oc_buf -> + let closed = Atomic.make false in + + let oc = + new Out.of_unix_fd + ~close_noerr:true ~closed ~buf:oc_buf client_sock + in + let ic = + In.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf client_sock + in + handle.handle ~client_addr ic oc + in + + Unix.set_nonblock sock; + while self.running do + match EV.accept sock with + | client_sock, client_addr -> + (* limit concurrency *) + Sem_.acquire self.sem_max_connections; + (* Block INT/HUP while cloning to avoid children handling them. + When thread gets them, our Unix.accept raises neatly. *) + if not Sys.win32 then + ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]); + self.new_thread (fun () -> + try + handle_client_ client_sock client_addr; + Log.debug (fun k -> + k "t[%d]: done with client on %s, exiting" + (Thread.id @@ Thread.self ()) + @@ TH.Util.show_sockaddr client_addr); + shutdown_silent_ client_sock; + close_silent_ client_sock; + Sem_.release self.sem_max_connections + with e -> + let bt = Printexc.get_raw_backtrace () in + shutdown_silent_ client_sock; + close_silent_ client_sock; + Sem_.release self.sem_max_connections; + Log.error (fun k -> + k + "@[Handler: uncaught exception for client %s:@ \ + %s@ %s@]" + (TH.Util.show_sockaddr client_addr) + (Printexc.to_string e) + (Printexc.raw_backtrace_to_string bt))); + if not Sys.win32 then + ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ]) + | exception e -> + Log.error (fun k -> + k "Unix.accept raised an exception: %s" (Printexc.to_string e)) + done; + + (* Wait for all threads to be done: this only works if all threads are done. *) + Unix.close sock; + (* TODO? *) + (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) + ()); + } +end + +open struct + let get_max_connection_ ?(max_connections = 2048) () : int = + let max_connections = max 4 max_connections in + max_connections + + let clear_slice (slice : Slice.t) = + Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00'; + slice.off <- 0; + slice.len <- 0 +end + +let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) + ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") + ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = + let max_connections = get_max_connection_ ?max_connections () in + let server = + { + Unix_tcp_server_.addr; + new_thread; + buf_pool = + Pool.create ~clear:Buf.clear_and_zero + ~mk_item:(fun () -> Buf.create ?size:buf_size ()) + (); + slice_pool = + Pool.create ~clear:clear_slice + ~mk_item: + (let buf_size = Option.value buf_size ~default:4096 in + fun () -> Slice.create buf_size) + (); + running = true; + port; + sock; + max_connections; + sem_max_connections = Sem_.create max_connections; + masksigpipe; + timeout; + } + in + let tcp_server_builder = Unix_tcp_server_.to_tcp_server server in + let module B = struct + let init_addr () = addr + let init_port () = port + let get_time_s = get_time_s + let tcp_server () = tcp_server_builder + end in + let backend = (module B : TH.Server.IO_BACKEND) in + TH.Server.create_from ?buf_size ?middlewares ~backend () diff --git a/src/tiny_httpd/nanoev_tiny_httpd.mli b/src/tiny_httpd/nanoev_tiny_httpd.mli new file mode 100644 index 0000000..f0cd9af --- /dev/null +++ b/src/tiny_httpd/nanoev_tiny_httpd.mli @@ -0,0 +1,15 @@ +module TH = Tiny_httpd_core + +val create : + ?masksigpipe:bool -> + ?max_connections:int -> + ?timeout:float -> + ?buf_size:int -> + ?get_time_s:(unit -> float) -> + ?addr:string -> + ?port:int -> + ?sock:Unix.file_descr -> + ?middlewares:([ `Encoding | `Stage of int ] * TH.Server.Middleware.t) list -> + new_thread:((unit -> unit) -> unit) -> + unit -> + TH.Server.t diff --git a/src/unix/nanoev_unix.ml b/src/unix/nanoev_unix.ml index 5c29cf6..7d656c1 100644 --- a/src/unix/nanoev_unix.ml +++ b/src/unix/nanoev_unix.ml @@ -1,12 +1,14 @@ -(* module type BACKEND = Intf.BACKEND *) +open struct + module Trace_ = Nanoev.Trace_ -let ( let@ ) = ( @@ ) -let now_ : unit -> float = Unix.gettimeofday + let ( let@ ) = ( @@ ) + let now_ : unit -> float = Unix.gettimeofday +end (** Callback list *) type cbs = | Nil - | Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs + | Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs let[@inline] cb_is_empty = function | Nil -> true @@ -42,6 +44,18 @@ type st = { lock: Mutex.t; } +let rec perform_cbs ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs ~closed tail + +let rec perform_cbs_closed ~closed = function + | Nil -> () + | Sub (x, y, f, tail) -> + f ~closed x y; + perform_cbs_closed ~closed tail + let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline let create_st () : st = @@ -80,10 +94,12 @@ let clear (self : st) = () let wakeup_from_outside (self : st) : unit = - if not (Atomic.exchange self.wakeup_triggered true) then ( + if not (Atomic.exchange self.wakeup_triggered true) then + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside" + in let b = Bytes.make 1 '!' in ignore (Unix.write self.wakeup_wr b 0 1 : int) - ) let get_fd_ (self : st) fd : per_fd = match Hashtbl.find self.fds fd with @@ -93,7 +109,27 @@ let get_fd_ (self : st) fd : per_fd = Hashtbl.add self.fds fd per_fd; per_fd +let close self fd : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in + let r, w = + let@ self = with_lock_ self in + match Hashtbl.find self.fds fd with + | per_fd -> + Hashtbl.remove self.fds fd; + self.sub_up_to_date <- false; + if Atomic.get self.in_select then wakeup_from_outside self; + per_fd.r, per_fd.w + | exception Not_found -> + invalid_arg "File descriptor is not known to Nanoev" + in + + (* call callbacks outside of the lock *) + perform_cbs_closed ~closed:true r; + perform_cbs_closed ~closed:true w; + () + let on_readable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in let@ self = with_lock_ self in let per_fd = get_fd_ self fd in per_fd.r <- Sub (x, y, f, per_fd.r); @@ -101,6 +137,7 @@ let on_readable self fd x y f : unit = if Atomic.get self.in_select then wakeup_from_outside self let on_writable self fd x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in let@ self = with_lock_ self in let per_fd = get_fd_ self fd in per_fd.w <- Sub (x, y, f, per_fd.w); @@ -108,6 +145,7 @@ let on_writable self fd x y f : unit = if Atomic.get self.in_select then wakeup_from_outside self let run_after_s self time x y f : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in let@ self = with_lock_ self in let deadline = now_ () +. time in Heap.insert self.timer (Timer { deadline; x; y; f }); @@ -115,6 +153,7 @@ let run_after_s self time x y f : unit = let recompute_if_needed (self : st) = if not self.sub_up_to_date then ( + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in self.sub_up_to_date <- true; self.sub_r <- []; self.sub_w <- []; @@ -132,13 +171,8 @@ let next_deadline_ (self : st) : float option = | exception Heap.Empty -> None | Timer t -> Some t.deadline -let rec perform_cbs = function - | Nil -> () - | Sub (x, y, f, tail) -> - f x y; - perform_cbs tail - let step (self : st) : unit = + let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in (* gather the subscriptions and timeout *) let timeout, sub_r, sub_w = let@ self = with_lock_ self in @@ -154,6 +188,14 @@ let step (self : st) : unit = (* enter [select] *) Atomic.set self.in_select true; let r_reads, r_writes, _ = + let@ _sp = + Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () -> + [ + "timeout", `Float timeout; + "reads", `Int (List.length sub_r); + "writes", `Int (List.length sub_w); + ]) + in Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout in Atomic.set self.in_select false; @@ -176,8 +218,10 @@ let step (self : st) : unit = List.iter (fun fd -> - let per_fd = Hashtbl.find self.fds fd in - ready_r := per_fd :: !ready_r) + if fd != self.wakeup_rd then ( + let per_fd = Hashtbl.find self.fds fd in + ready_r := per_fd :: !ready_r + )) r_reads; List.iter (fun fd -> @@ -188,19 +232,27 @@ let step (self : st) : unit = (* call callbacks *) List.iter (fun fd -> - perform_cbs fd.r; + perform_cbs ~closed:false fd.r; fd.r <- Nil) !ready_r; List.iter (fun fd -> - perform_cbs fd.w; + perform_cbs ~closed:false fd.w; fd.w <- Nil) !ready_w; () let ops : st Nanoev.Impl.ops = - { step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear } + { + step; + close; + on_readable; + on_writable; + run_after_s; + wakeup_from_outside; + clear; + } include Nanoev diff --git a/tests/unix/dune b/tests/unix/dune index 8679c90..3ee6379 100644 --- a/tests/unix/dune +++ b/tests/unix/dune @@ -1,4 +1,3 @@ - (tests - (names t1) - (libraries nanoev nanoev.unix threads)) + (names t1) + (libraries nanoev nanoev.unix threads)) diff --git a/tests/unix/t1.ml b/tests/unix/t1.ml index 79e76ff..810087d 100644 --- a/tests/unix/t1.ml +++ b/tests/unix/t1.ml @@ -15,7 +15,11 @@ let () = let ev = E.create () in ignore (Thread.create loop ev : Thread.t); let rd, wr = mkpipe () in - E.on_readable ev rd () () (fun () () -> print_endline "can read"); + E.on_readable ev rd () () (fun ~closed () () -> + if closed then + print_endline "closed!" + else + print_endline "can read"); Thread.delay 0.05; print_endline "writing"; ignore