Merge pull request #1 from c-cube/simon/tiny_httpd

add support for tiny_httpd
This commit is contained in:
Simon Cruanes 2025-02-16 22:21:55 -05:00 committed by GitHub
commit 14d365547d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1178 additions and 71 deletions

35
.github/workflows/gh-pages.yml vendored Normal file
View file

@ -0,0 +1,35 @@
name: github pages
on:
push:
branches:
- main # Set a branch name to trigger deployment
jobs:
deploy:
name: Deploy doc
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@main
- name: Use OCaml
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: '5.2'
dune-cache: true
allow-prerelease-opam: true
# temporary until it's in a release
- run: opam pin picos 0.6.0 -y -n
- run: opam install picos moonpool trace
- run: opam exec -- odig odoc --cache-dir=_doc/ nanoev
- name: Deploy
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./_doc/html
destination_dir: .
enable_jekyll: false

61
.github/workflows/main.yml vendored Normal file
View file

@ -0,0 +1,61 @@
name: Build and Test
on:
push:
branches:
- main
pull_request:
jobs:
run:
name: build # build+test on various versions of OCaml, on linux
timeout-minutes: 15
strategy:
fail-fast: true
matrix:
os:
- ubuntu-latest
ocaml-compiler:
- '5.0'
- '5.3'
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
allow-prerelease-opam: true
- run: opam pin picos 0.6.0 -y -n
- run: opam install picos
- run: opam install -t --deps-only .
- run: opam exec -- dune build @install
# install some depopts and test deps
- run: opam install -t trace
- run: opam exec -- dune build --profile=release --force @install @runtest
format:
name: format
strategy:
matrix:
ocaml-compiler:
- '5.2'
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
allow-prerelease-opam: true
- run: opam install ocamlformat.0.26.2
- run: opam exec -- make format-check

31
README.md Normal file
View file

@ -0,0 +1,31 @@
# nanoev
A minimalistic but modular abstraction for IO event loops.
The goal of this library is to provide a uniform abstraction over multiple
system event loops, in a way that plays well with Picos.
## Usage
Very basic usage would look like this:
```ocaml
module EV = Nanoev_picos
let () =
(* use a backend, eg. select *)
let ev = Nanoev_unix.create () in
(* install the backend *)
Nanoev_picos.setup_bg_thread ev;
(* setup a picos scheduler and use EV.read, EV.write, etc. *)
```
## Backends
- [x] select
- [ ] uring

View file

@ -1,6 +1,7 @@
(lang dune 2.7)
(name nanoev)
(generate_opam_files true)
(source
@ -16,8 +17,24 @@
(package
(name nanoev)
(synopsis "Tiny event loop around `select`")
(synopsis "Tiny event loop abstraction")
(depends ocaml dune base-unix)
(tags (unix select async)))
(depopts
(trace (>= 0.7))
(picos
(and (>= 0.5) (< 0.7))))
(tags
(unix select async)))
(package
(name nanoev_tiny_httpd)
(synopsis "Use nanoev as a basis for tiny_httpd")
(depends
ocaml
dune
nanoev
picos
(tiny_httpd (>= 0.17)))
(tags (nanoev http)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html

2
echo.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec dune exec --display=quiet -- examples/echo/echo.exe $@

4
examples/echo/dune Normal file
View file

@ -0,0 +1,4 @@
(executable
(name echo)
(libraries nanoev nanoev.unix moonpool moonpool.fib trace trace-tef
nanoev_tiny_httpd))

280
examples/echo/echo.ml Normal file
View file

@ -0,0 +1,280 @@
open Tiny_httpd_core
module Log = Tiny_httpd.Log
let ( let@ ) = ( @@ )
let now_ = Unix.gettimeofday
let alice_text =
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
sitting by her sister on the bank, and of having nothing to do: once or \
twice she had peeped into the book her sister was reading, but it had no \
pictures or conversations in it, <and what is the use of a book,> thought \
Alice <without pictures or conversations?> So she was considering in her \
own mind (as well as she could, for the hot day made her feel very sleepy \
and stupid), whether the pleasure of making a daisy-chain would be worth \
the trouble of getting up and picking the daisies, when suddenly a White \
Rabbit with pink eyes ran close by her. There was nothing so very \
remarkable in that; nor did Alice think it so very much out of the way to \
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
she thought it over afterwards, it occurred to her that she ought to have \
wondered at this, but at the time it all seemed quite natural); but when \
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
it, and then hurried on, Alice started to her feet, for it flashed across \
her mind that she had never before seen a rabbit with either a \
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
she ran across the field after it, and fortunately was just in time to see \
it pop down a large rabbit-hole under the hedge. In another moment down \
went Alice after it, never once considering how in the world she was to get \
out again. The rabbit-hole went straight on like a tunnel for some way, and \
then dipped suddenly down, so suddenly that Alice had not a moment to think \
about stopping herself before she found herself falling down a very deep \
well. Either the well was very deep, or she fell very slowly, for she had \
plenty of time as she went down to look about her and to wonder what was \
going to happen next. First, she tried to look down and make out what she \
was coming to, but it was too dark to see anything; then she looked at the \
sides of the well, and noticed that they were filled with cupboards......"
(* util: a little middleware collecting statistics *)
let middleware_stat () : Server.Middleware.t * (unit -> string) =
let n_req = ref 0 in
let total_time_ = ref 0. in
let parse_time_ = ref 0. in
let build_time_ = ref 0. in
let write_time_ = ref 0. in
let m h req ~resp =
incr n_req;
let t1 = Request.start_time req in
let t2 = now_ () in
h req ~resp:(fun response ->
let t3 = now_ () in
resp response;
let t4 = now_ () in
total_time_ := !total_time_ +. (t4 -. t1);
parse_time_ := !parse_time_ +. (t2 -. t1);
build_time_ := !build_time_ +. (t3 -. t2);
write_time_ := !write_time_ +. (t4 -. t3))
and get_stat () =
Printf.sprintf
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
!n_req
(!total_time_ /. float !n_req *. 1e3)
(!parse_time_ /. float !n_req *. 1e3)
(!build_time_ /. float !n_req *. 1e3)
(!write_time_ /. float !n_req *. 1e3)
in
m, get_stat
(* ugly AF *)
let base64 x =
let ic, oc = Unix.open_process "base64" in
output_string oc x;
flush oc;
close_out oc;
let r = input_line ic in
ignore (Unix.close_process (ic, oc));
r
let setup_logging () =
Logs.set_reporter @@ Logs.format_reporter ();
Logs.set_level ~all:true (Some Logs.Debug)
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port_ = ref 8080 in
let max_conn = ref 1024 in
let j = ref 8 in
Arg.parse
(Arg.align
[
"--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port";
"-j", Arg.Set_int j, " number of threads";
"--debug", Arg.Unit setup_logging, " enable debug";
"--max-conns", Arg.Set_int max_conn, " maximum concurrent connections";
])
(fun _ -> raise (Arg.Bad ""))
"echo [option]*";
let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
let@ _runner = Moonpool_fib.main in
let ev = Nanoev_unix.create () in
Nanoev_picos.setup_bg_thread ev;
let server =
Nanoev_tiny_httpd.create ~new_thread:(Moonpool.run_async pool) ~port:!port_
~max_connections:!max_conn ()
in
let m_stats, get_stats = middleware_stat () in
Server.add_middleware server ~stage:(`Stage 1) m_stats;
(* say hello *)
Server.add_route_handler ~meth:`GET server
Route.(exact "hello" @/ string @/ return)
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
(* compressed file access *)
Server.add_route_handler ~meth:`GET server
Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req ->
let ic = open_in path in
let str = IO.Input.of_in_channel ic in
let mime_type =
try
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
try
let s = [ "Content-Type", String.trim (input_line p) ] in
ignore @@ Unix.close_process_in p;
s
with _ ->
ignore @@ Unix.close_process_in p;
[]
with _ -> []
in
Response.make_stream ~headers:mime_type (Ok str));
(* echo request *)
Server.add_route_handler server
Route.(exact "echo" @/ return)
(fun req ->
let q =
Request.query req
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|> String.concat ";"
in
Response.make_string
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
(* file upload *)
Server.add_route_handler_stream ~meth:`PUT server
Route.(exact "upload" @/ string @/ return)
(fun path req ->
Log.debug (fun k ->
k "start upload %S, headers:\n%s\n\n%!" path
(Format.asprintf "%a" Headers.pp (Request.headers req)));
try
let oc = open_out @@ "/tmp/" ^ path in
IO.Input.to_chan oc req.Request.body;
flush oc;
Response.make_string (Ok "uploaded file")
with e ->
Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e));
(* protected by login *)
Server.add_route_handler server
Route.(exact "protected" @/ return)
(fun req ->
let ok =
match Request.get_header req "authorization" with
| Some v ->
Log.debug (fun k -> k "authenticate with %S" v);
v = "Basic " ^ base64 "user:foobar"
| None -> false
in
if ok then (
(* FIXME: a logout link *)
let s =
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
in
Response.make_string (Ok s)
) else (
let headers =
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
in
Response.fail ~code:401 ~headers "invalid"
));
(* logout *)
Server.add_route_handler server
Route.(exact "logout" @/ return)
(fun _req -> Response.fail ~code:401 "logged out");
(* stats *)
Server.add_route_handler server
Route.(exact "stats" @/ return)
(fun _req ->
let stats = get_stats () in
Response.make_string @@ Ok stats);
Server.add_route_handler server
Route.(exact "alice" @/ return)
(fun _req -> Response.make_string (Ok alice_text));
(* main page *)
Server.add_route_handler server
Route.(return)
(fun _req ->
let open Tiny_httpd_html in
let h =
html []
[
head [] [ title [] [ txt "index of echo" ] ];
body []
[
h3 [] [ txt "welcome!" ];
p [] [ b [] [ txt "endpoints are:" ] ];
ul []
[
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
li []
[
pre []
[
a [ A.href "/echo/" ] [ txt "echo" ];
txt " echo back query";
];
];
li []
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
li []
[
pre []
[
txt
"/zcat/:path (GET) to download a file (deflate \
transfer-encoding)";
];
];
li []
[
pre []
[
a [ A.href "/stats/" ] [ txt "/stats/" ];
txt " (GET) to access statistics";
];
];
li []
[
pre []
[
a [ A.href "/protected" ] [ txt "/protected" ];
txt
" (GET) to see a protected page (login: user, \
password: foobar)";
];
];
li []
[
pre []
[
a [ A.href "/logout" ] [ txt "/logout" ];
txt " (POST) to log out";
];
];
];
];
]
in
let s = to_string_top h in
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
(Server.port server);
match Server.run server with
| Ok () -> ()
| Error e -> raise e

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Tiny event loop around `select`"
synopsis: "Tiny event loop abstraction"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
@ -13,6 +13,10 @@ depends: [
"base-unix"
"odoc" {with-doc}
]
depopts: [
"trace" {>= "0.7"}
"picos" {>= "0.5" & < "0.7"}
]
build: [
["dune" "subst"] {dev}
[

32
nanoev_tiny_httpd.opam Normal file
View file

@ -0,0 +1,32 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Use nanoev as a basis for tiny_httpd"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["nanoev" "http"]
homepage: "https://github.com/c-cube/nanoev"
bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"nanoev"
"picos"
"tiny_httpd" {>= "0.17"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/nanoev.git"

11
src/core/dune Normal file
View file

@ -0,0 +1,11 @@
(library
(name nanoev)
(public_name nanoev)
(synopsis "Nano ev loop")
(libraries
unix
(select
trace_.ml
from
(trace.core -> trace_.real.ml)
(-> trace_.dummy.ml))))

View file

@ -1,11 +1,28 @@
module Trace_ = Trace_
exception Closed
module Impl = struct
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
@ -21,6 +38,7 @@ type t = Impl.t
let[@inline] clear (Ev (ops, st)) = ops.clear st
let[@inline] wakeup_from_outside (Ev (ops, st)) = ops.wakeup_from_outside st
let[@inline] close (Ev (ops, st)) fd = ops.close st fd
let[@inline] on_readable (Ev (ops, st)) fd x y f : unit =
ops.on_readable st fd x y f
@ -32,13 +50,3 @@ let[@inline] run_after_s (Ev (ops, st)) time x y f : unit =
ops.run_after_s st time x y f
let[@inline] step (Ev (ops, st)) : unit = ops.step st
(*
let rec read (self:t) fd buf i len : int =
match Unix.read fd buf i len with
| n -> n
| exception Unix.Unix_error (Unix, _, _) ->
read self fd buf i len
*)

58
src/core/nanoev.mli Normal file
View file

@ -0,0 +1,58 @@
(** Nano event loop *)
type t
exception Closed
module Impl : sig
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
close: 'st -> Unix.file_descr -> unit;
on_readable:
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
on_writable:
'a 'b.
'st ->
Unix.file_descr ->
'a ->
'b ->
(closed:bool -> 'a -> 'b -> unit) ->
unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
val build : 'a ops -> 'a -> t
end
val clear : t -> unit
(** Reset the state *)
val wakeup_from_outside : t -> unit
val step : t -> unit
(** Run one step of the event loop until something happens *)
val close : t -> Unix.file_descr -> unit
(** Close the file descriptor and clean it up *)
val on_readable :
t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val on_writable :
t -> Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
(**/**)
module Trace_ = Trace_
(**/**)

3
src/core/trace_.dummy.ml Normal file
View file

@ -0,0 +1,3 @@
let[@inline] with_span ?data:_ ~__FILE__:_ ~__LINE__:_ _name f = f 0L
let[@inline] message ?data:_ _ = ()
let set_thread_name (_ : string) = ()

5
src/core/trace_.real.ml Normal file
View file

@ -0,0 +1,5 @@
let[@inline] with_span ?data ~__FILE__ ~__LINE__ name f =
Trace_core.with_span ?data ~__FILE__ ~__LINE__ name f
let[@inline] message ?data m = Trace_core.message ?data m
let set_thread_name name = Trace_core.set_thread_name name

View file

@ -1,5 +0,0 @@
(library
(name nanoev)
(public_name nanoev)
(synopsis "Nano ev loop")
(libraries unix))

View file

@ -1,30 +0,0 @@
(** Nano event loop *)
type t
module Impl : sig
type 'st ops = {
clear: 'st -> unit;
wakeup_from_outside: 'st -> unit;
on_readable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
on_writable:
'a 'b. 'st -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
run_after_s: 'a 'b. 'st -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit;
step: 'st -> unit;
}
val build : 'a ops -> 'a -> t
end
val clear : t -> unit
(** Reset the state *)
val wakeup_from_outside : t -> unit
val step : t -> unit
(** Run one step of the event loop until something happens *)
val on_readable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable : t -> Unix.file_descr -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val run_after_s : t -> float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit

5
src/picos/dune Normal file
View file

@ -0,0 +1,5 @@
(library
(name nanoev_picos)
(public_name nanoev.picos)
(optional) ; picos
(libraries threads picos nanoev))

146
src/picos/nanoev_picos.ml Normal file
View file

@ -0,0 +1,146 @@
open struct
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
end
exception Closed = Nanoev.Closed
module Global_ = struct
type st =
| None
| Some of {
active: bool Atomic.t;
nanoev: Nanoev.t;
th: Thread.t;
}
let st : st Atomic.t = Atomic.make None
let lock = Mutex.create ()
let with_lock lock f =
Mutex.lock lock;
match f () with
| exception e ->
Mutex.unlock lock;
raise e
| x ->
Mutex.unlock lock;
x
let bg_thread_ ~active ~evloop () : unit =
Trace_.set_thread_name "nanoev.picos.bg-thread";
while Atomic.get active do
Nanoev.step evloop
done
let[@inline] has_bg_thread () = Atomic.get st <> None
let setup_bg_thread (ev : Nanoev.t) : unit =
let@ () = with_lock lock in
(* shutdown existing thread, if any *)
(match Atomic.get st with
| Some st ->
Atomic.set st.active false;
Nanoev.wakeup_from_outside st.nanoev;
Thread.join st.th
| None -> ());
(* start new bg thread *)
let active = Atomic.make true in
Atomic.set st
@@ Some
{
active;
nanoev = ev;
th = Thread.create (bg_thread_ ~active ~evloop:ev) ();
}
end
let has_bg_thread = Global_.has_bg_thread
let setup_bg_thread = Global_.setup_bg_thread
let[@inline] get_loop_exn_ () : Nanoev.t =
match Atomic.get Global_.st with
| None -> failwith "No nanoev loop installed."
| Some st -> st.nanoev
let[@inline] unwrap_ = function
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
let retry_read_ fd f =
let ev = get_loop_exn_ () in
let[@unroll 1] rec loop () =
match f () with
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "read must wait";
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
let retry_write_ fd f =
let ev = get_loop_exn_ () in
let rec loop () =
match f () with
| res -> res
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
Trace_.message "write must wait";
let trigger = Picos.Trigger.create () in
let closed_r = ref false in
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed;
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed;
loop ()
in
loop ()
let read fd buf i len : int =
try
retry_read_ fd (fun () ->
Trace_.message "read";
Unix.read fd buf i len)
with Closed -> 0
let close fd =
Unix.close fd;
let ev = get_loop_exn_ () in
Nanoev.close ev fd
let accept fd =
try
retry_read_ fd (fun () ->
Trace_.message "accept";
Unix.accept fd)
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
raise Closed
let write fd buf i len : int =
try
retry_write_ fd (fun () ->
Trace_.message "write";
Unix.write fd buf i len)
with Closed -> 0
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
let sleep t =
if t > 0. then (
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in
Nanoev.run_after_s ev t trigger () (fun trigger () ->
Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_
)

View file

@ -0,0 +1,32 @@
(** Basic interface with picos *)
val setup_bg_thread : Nanoev.t -> unit
(** Install this event loop in a background thread *)
val has_bg_thread : unit -> bool
(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev loop *)
(** {2 Non blocking IO primitives} *)
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val write : Unix.file_descr -> bytes -> int -> int -> int
(** Write into the non blocking FD.
@raise Nanoev.Closed if the FD is closed
@raise Unix.Unix_error for other errors *)
val close : Unix.file_descr -> unit
(** Close the file descriptor
@raise Unix.Unix_error when it fails *)
val connect : Unix.file_descr -> Unix.sockaddr -> unit
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
(** Accept a connection on this fd.
@raise Nanoev.Closed if the FD is closed.
@raise Unix.Unix_error for other errors *)
val sleep : float -> unit

10
src/tiny_httpd/dune Normal file
View file

@ -0,0 +1,10 @@
(library
(name nanoev_tiny_httpd)
(public_name nanoev_tiny_httpd)
(libraries
threads
picos
(re_export nanoev)
nanoev.picos
(re_export iostream)
(re_export tiny_httpd)))

View file

@ -0,0 +1,328 @@
module TH = Tiny_httpd_core
module EV = Nanoev_picos
module Log = TH.Log
module Slice = Iostream.Slice
module Pool = TH.Pool
module Buf = TH.Buf
let unwrap_ = function
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
(** Non blocking semaphore *)
module Sem_ = struct
type t = {
mutable n: int;
max: int;
waiting: Picos.Trigger.t Queue.t;
mutex: Mutex.t;
}
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; max = n; mutex = Mutex.create (); waiting = Queue.create () }
let acquire self =
Mutex.lock self.mutex;
while self.n = 0 do
let tr = Picos.Trigger.create () in
Queue.push tr self.waiting;
Mutex.unlock self.mutex;
let res = Picos.Trigger.await tr in
unwrap_ res;
Mutex.lock self.mutex
done;
assert (self.n > 0);
self.n <- self.n - 1;
Mutex.unlock self.mutex
let release self =
Mutex.lock self.mutex;
self.n <- self.n + 1;
Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting);
Mutex.unlock self.mutex
let num_acquired self = self.max - self.n
end
module Out = struct
open Iostream
class type t = Out_buf.t
class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) :
t =
object
inherit Out_buf.t_from_output ~bytes:buf.bytes ()
method private output_underlying bs i len0 =
let i = ref i in
let len = ref len0 in
while !len > 0 do
match EV.write fd bs !i !len with
| 0 -> failwith "write failed"
| n ->
i := !i + n;
len := !len - n
| exception
Unix.Unix_error
( (( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN
| Unix.ECONNRESET | Unix.EPIPE ) as err),
fn,
_ ) ->
failwith
@@ Printf.sprintf "write failed in %s: %s" fn
(Unix.error_message err)
done
method private close_underlying () =
if not (Atomic.exchange closed true) then
if close_noerr then (
try EV.close fd with _ -> ()
) else
EV.close fd
end
end
module In = struct
open Iostream
class type t = In_buf.t
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t)
(fd : Unix.file_descr) : t =
let eof = ref false in
object
inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes ()
method private refill (slice : Slice.t) =
if not !eof then (
slice.off <- 0;
let continue = ref true in
while !continue do
match EV.read fd slice.bytes 0 (Bytes.length slice.bytes) with
| n ->
slice.len <- n;
continue := false
| exception
Unix.Unix_error
( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN
| Unix.ECONNRESET | Unix.EPIPE ),
_,
_ ) ->
eof := true;
continue := false
done;
(* Printf.eprintf "read returned %d B\n%!" !n; *)
if slice.len = 0 then eof := true
)
method close () =
if not (Atomic.exchange closed true) then (
eof := true;
if close_noerr then (
try EV.close fd with _ -> ()
) else
EV.close fd
)
end
end
module Unix_tcp_server_ = struct
let get_addr_ sock =
match Unix.getsockname sock with
| Unix.ADDR_INET (addr, port) -> addr, port
| _ -> invalid_arg "httpd: address is not INET"
type t = {
addr: string;
port: int;
buf_pool: Buf.t Pool.t;
slice_pool: Slice.t Pool.t;
max_connections: int;
sem_max_connections: Sem_.t;
(** semaphore to restrict the number of active concurrent connections *)
mutable sock: Unix.file_descr option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
masksigpipe: bool;
mutable running: bool; (* TODO: use an atomic? *)
}
let shutdown_silent_ fd =
try Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
let close_silent_ fd = try Unix.close fd with _ -> ()
let to_tcp_server (self : t) : TH.IO.TCP_server.builder =
{
TH.IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
let sock, should_bind =
match self.sock with
| Some s ->
s, false
(* Because we're getting a socket from the caller (e.g. systemd) *)
| None ->
let s =
Unix.socket
(if TH.Util.is_ipv6_str self.addr then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0
in
s, true (* Because we're creating the socket ourselves *)
in
Unix.set_nonblock sock;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in
Unix.listen sock n_listen
);
self.sock <- Some sock;
let tcp_server =
{
TH.IO.TCP_server.stop = (fun () -> self.running <- false);
running = (fun () -> self.running);
active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
Unix.string_of_inet_addr addr, port);
}
in
after_init tcp_server;
(* how to handle a single client *)
let handle_client_ (client_sock : Unix.file_descr)
(client_addr : Unix.sockaddr) : unit =
Log.debug (fun k ->
k "t[%d]: serving new client on %s"
(Thread.id @@ Thread.self ())
(TH.Util.show_sockaddr client_addr));
if self.masksigpipe && not Sys.win32 then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
Unix.set_nonblock client_sock;
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
Pool.with_resource self.slice_pool @@ fun ic_buf ->
Pool.with_resource self.slice_pool @@ fun oc_buf ->
let closed = Atomic.make false in
let oc =
new Out.of_unix_fd
~close_noerr:true ~closed ~buf:oc_buf client_sock
in
let ic =
In.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf client_sock
in
handle.handle ~client_addr ic oc
in
Unix.set_nonblock sock;
while self.running do
match EV.accept sock with
| client_sock, client_addr ->
(* limit concurrency *)
Sem_.acquire self.sem_max_connections;
(* Block INT/HUP while cloning to avoid children handling them.
When thread gets them, our Unix.accept raises neatly. *)
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]);
self.new_thread (fun () ->
try
handle_client_ client_sock client_addr;
Log.debug (fun k ->
k "t[%d]: done with client on %s, exiting"
(Thread.id @@ Thread.self ())
@@ TH.Util.show_sockaddr client_addr);
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem_.release self.sem_max_connections
with e ->
let bt = Printexc.get_raw_backtrace () in
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem_.release self.sem_max_connections;
Log.error (fun k ->
k
"@[<v>Handler: uncaught exception for client %s:@ \
%s@ %s@]"
(TH.Util.show_sockaddr client_addr)
(Printexc.to_string e)
(Printexc.raw_backtrace_to_string bt)));
if not Sys.win32 then
ignore Unix.(sigprocmask SIG_UNBLOCK Sys.[ sigint; sighup ])
| exception e ->
Log.error (fun k ->
k "Unix.accept raised an exception: %s" (Printexc.to_string e))
done;
(* Wait for all threads to be done: this only works if all threads are done. *)
Unix.close sock;
(* TODO? *)
(* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *)
());
}
end
open struct
let get_max_connection_ ?(max_connections = 2048) () : int =
let max_connections = max 4 max_connections in
max_connections
let clear_slice (slice : Slice.t) =
Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00';
slice.off <- 0;
slice.len <- 0
end
let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0)
?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1")
?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t =
let max_connections = get_max_connection_ ?max_connections () in
let server =
{
Unix_tcp_server_.addr;
new_thread;
buf_pool =
Pool.create ~clear:Buf.clear_and_zero
~mk_item:(fun () -> Buf.create ?size:buf_size ())
();
slice_pool =
Pool.create ~clear:clear_slice
~mk_item:
(let buf_size = Option.value buf_size ~default:4096 in
fun () -> Slice.create buf_size)
();
running = true;
port;
sock;
max_connections;
sem_max_connections = Sem_.create max_connections;
masksigpipe;
timeout;
}
in
let tcp_server_builder = Unix_tcp_server_.to_tcp_server server in
let module B = struct
let init_addr () = addr
let init_port () = port
let get_time_s = get_time_s
let tcp_server () = tcp_server_builder
end in
let backend = (module B : TH.Server.IO_BACKEND) in
TH.Server.create_from ?buf_size ?middlewares ~backend ()

View file

@ -0,0 +1,15 @@
module TH = Tiny_httpd_core
val create :
?masksigpipe:bool ->
?max_connections:int ->
?timeout:float ->
?buf_size:int ->
?get_time_s:(unit -> float) ->
?addr:string ->
?port:int ->
?sock:Unix.file_descr ->
?middlewares:([ `Encoding | `Stage of int ] * TH.Server.Middleware.t) list ->
new_thread:((unit -> unit) -> unit) ->
unit ->
TH.Server.t

View file

@ -1,12 +1,14 @@
(* module type BACKEND = Intf.BACKEND *)
open struct
module Trace_ = Nanoev.Trace_
let ( let@ ) = ( @@ )
let now_ : unit -> float = Unix.gettimeofday
end
(** Callback list *)
type cbs =
| Nil
| Sub : 'a * 'b * ('a -> 'b -> unit) * cbs -> cbs
| Sub : 'a * 'b * (closed:bool -> 'a -> 'b -> unit) * cbs -> cbs
let[@inline] cb_is_empty = function
| Nil -> true
@ -42,6 +44,18 @@ type st = {
lock: Mutex.t;
}
let rec perform_cbs ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs ~closed tail
let rec perform_cbs_closed ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
perform_cbs_closed ~closed tail
let leq_timer (Timer a) (Timer b) = a.deadline <= b.deadline
let create_st () : st =
@ -80,10 +94,12 @@ let clear (self : st) =
()
let wakeup_from_outside (self : st) : unit =
if not (Atomic.exchange self.wakeup_triggered true) then (
if not (Atomic.exchange self.wakeup_triggered true) then
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.wakeup-from-outside"
in
let b = Bytes.make 1 '!' in
ignore (Unix.write self.wakeup_wr b 0 1 : int)
)
let get_fd_ (self : st) fd : per_fd =
match Hashtbl.find self.fds fd with
@ -93,7 +109,27 @@ let get_fd_ (self : st) fd : per_fd =
Hashtbl.add self.fds fd per_fd;
per_fd
let close self fd : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.close" in
let r, w =
let@ self = with_lock_ self in
match Hashtbl.find self.fds fd with
| per_fd ->
Hashtbl.remove self.fds fd;
self.sub_up_to_date <- false;
if Atomic.get self.in_select then wakeup_from_outside self;
per_fd.r, per_fd.w
| exception Not_found ->
invalid_arg "File descriptor is not known to Nanoev"
in
(* call callbacks outside of the lock *)
perform_cbs_closed ~closed:true r;
perform_cbs_closed ~closed:true w;
()
let on_readable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-readable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.r <- Sub (x, y, f, per_fd.r);
@ -101,6 +137,7 @@ let on_readable self fd x y f : unit =
if Atomic.get self.in_select then wakeup_from_outside self
let on_writable self fd x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.on-writable" in
let@ self = with_lock_ self in
let per_fd = get_fd_ self fd in
per_fd.w <- Sub (x, y, f, per_fd.w);
@ -108,6 +145,7 @@ let on_writable self fd x y f : unit =
if Atomic.get self.in_select then wakeup_from_outside self
let run_after_s self time x y f : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.run-after-s" in
let@ self = with_lock_ self in
let deadline = now_ () +. time in
Heap.insert self.timer (Timer { deadline; x; y; f });
@ -115,6 +153,7 @@ let run_after_s self time x y f : unit =
let recompute_if_needed (self : st) =
if not self.sub_up_to_date then (
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "recompute-if-needed" in
self.sub_up_to_date <- true;
self.sub_r <- [];
self.sub_w <- [];
@ -132,13 +171,8 @@ let next_deadline_ (self : st) : float option =
| exception Heap.Empty -> None
| Timer t -> Some t.deadline
let rec perform_cbs = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f x y;
perform_cbs tail
let step (self : st) : unit =
let@ _sp = Trace_.with_span ~__FILE__ ~__LINE__ "nanoev.unix.step" in
(* gather the subscriptions and timeout *)
let timeout, sub_r, sub_w =
let@ self = with_lock_ self in
@ -154,6 +188,14 @@ let step (self : st) : unit =
(* enter [select] *)
Atomic.set self.in_select true;
let r_reads, r_writes, _ =
let@ _sp =
Trace_.with_span ~__FILE__ ~__LINE__ "select" ~data:(fun () ->
[
"timeout", `Float timeout;
"reads", `Int (List.length sub_r);
"writes", `Int (List.length sub_w);
])
in
Unix.select (self.wakeup_rd :: sub_r) sub_w [] timeout
in
Atomic.set self.in_select false;
@ -176,8 +218,10 @@ let step (self : st) : unit =
List.iter
(fun fd ->
if fd != self.wakeup_rd then (
let per_fd = Hashtbl.find self.fds fd in
ready_r := per_fd :: !ready_r)
ready_r := per_fd :: !ready_r
))
r_reads;
List.iter
(fun fd ->
@ -188,19 +232,27 @@ let step (self : st) : unit =
(* call callbacks *)
List.iter
(fun fd ->
perform_cbs fd.r;
perform_cbs ~closed:false fd.r;
fd.r <- Nil)
!ready_r;
List.iter
(fun fd ->
perform_cbs fd.w;
perform_cbs ~closed:false fd.w;
fd.w <- Nil)
!ready_w;
()
let ops : st Nanoev.Impl.ops =
{ step; on_readable; on_writable; run_after_s; wakeup_from_outside; clear }
{
step;
close;
on_readable;
on_writable;
run_after_s;
wakeup_from_outside;
clear;
}
include Nanoev

View file

@ -1,4 +1,3 @@
(tests
(names t1)
(libraries nanoev nanoev.unix threads))

View file

@ -15,7 +15,11 @@ let () =
let ev = E.create () in
ignore (Thread.create loop ev : Thread.t);
let rd, wr = mkpipe () in
E.on_readable ev rd () () (fun () () -> print_endline "can read");
E.on_readable ev rd () () (fun ~closed () () ->
if closed then
print_endline "closed!"
else
print_endline "can read");
Thread.delay 0.05;
print_endline "writing";
ignore