mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2026-03-07 21:37:57 -05:00
eio backend, second attempt (#95)
feat: `tiny_httpd_eio` library provides a tiny_httpd server that relies on Eio for non-blocking sockets and for concurrency using eio fibers.
This commit is contained in:
parent
ba19880d75
commit
a07936dac4
16 changed files with 794 additions and 51 deletions
1
.github/workflows/format.yml
vendored
1
.github/workflows/format.yml
vendored
|
|
@ -6,6 +6,7 @@ on:
|
|||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
format:
|
||||
name: format
|
||||
strategy:
|
||||
|
|
|
|||
36
.github/workflows/gh-pages.yml
vendored
36
.github/workflows/gh-pages.yml
vendored
|
|
@ -1,36 +0,0 @@
|
|||
name: github pages
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Use OCaml
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: 5.03.x
|
||||
dune-cache: true
|
||||
allow-prerelease-opam: true
|
||||
|
||||
- name: Deps
|
||||
run: opam install odig tiny_httpd tiny_httpd_camlzip
|
||||
|
||||
- name: Build
|
||||
run: opam exec -- odig odoc --cache-dir=_doc/ tiny_httpd tiny_httpd_camlzip
|
||||
|
||||
- name: Deploy
|
||||
uses: peaceiris/actions-gh-pages@v3
|
||||
with:
|
||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
publish_dir: ./_doc/html
|
||||
destination_dir: .
|
||||
enable_jekyll: false
|
||||
#keep_files: true
|
||||
|
||||
18
.github/workflows/main.yml
vendored
18
.github/workflows/main.yml
vendored
|
|
@ -38,7 +38,15 @@ jobs:
|
|||
|
||||
- run: opam install ./tiny_httpd.opam ./tiny_httpd_camlzip.opam --deps-only --with-test
|
||||
|
||||
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
|
||||
- name: Build (OCaml 4.x)
|
||||
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
|
||||
if: ${{ !startsWith(matrix.ocaml-compiler, '5.') }}
|
||||
|
||||
- name: Build (OCaml 5.x, includes eio)
|
||||
run: |
|
||||
opam install ./tiny_httpd.opam ./tiny_httpd_eio.opam --deps-only --with-test
|
||||
opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
|
||||
if: ${{ startsWith(matrix.ocaml-compiler, '5.') }}
|
||||
|
||||
- run: opam exec -- dune build @src/runtest @examples/runtest @tests/runtest -p tiny_httpd
|
||||
if: ${{ matrix.os == 'ubuntu-latest' }}
|
||||
|
|
@ -50,4 +58,10 @@ jobs:
|
|||
|
||||
- run: opam install logs magic-mime -y
|
||||
|
||||
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
|
||||
- name: Final build (OCaml 4.x)
|
||||
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
|
||||
if: ${{ !startsWith(matrix.ocaml-compiler, '5.') }}
|
||||
|
||||
- name: Final build (OCaml 5.x, includes eio)
|
||||
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
|
||||
if: ${{ startsWith(matrix.ocaml-compiler, '5.') }}
|
||||
|
|
|
|||
12
dune-project
12
dune-project
|
|
@ -28,7 +28,7 @@
|
|||
(logs :with-test)
|
||||
(conf-libcurl :with-test)
|
||||
(ptime :with-test)
|
||||
(qcheck-core (and (>= 0.9) :with-test))))
|
||||
(qcheck-core (and (>= 0.91) :with-test))))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_camlzip)
|
||||
|
|
@ -39,3 +39,13 @@
|
|||
(iostream-camlzip (>= 0.2.1))
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_eio)
|
||||
(synopsis "Use eio for tiny_httpd")
|
||||
(depends
|
||||
(tiny_httpd (= :version))
|
||||
(eio (and (>= 1.0) (< 2.0)))
|
||||
base-unix
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
|
|
|||
2
echo_eio.sh
Executable file
2
echo_eio.sh
Executable file
|
|
@ -0,0 +1,2 @@
|
|||
#!/bin/sh
|
||||
exec dune exec --display=quiet --profile=release "examples/echo_eio.exe" -- $@
|
||||
|
|
@ -11,10 +11,27 @@
|
|||
(executable
|
||||
(name echo)
|
||||
(flags :standard -warn-error -a+8)
|
||||
(modules echo vfs)
|
||||
(modules echo)
|
||||
(libraries
|
||||
tiny_httpd
|
||||
logs
|
||||
echo_vfs
|
||||
tiny_httpd_camlzip
|
||||
tiny_httpd.multipart-form-data))
|
||||
|
||||
(executable
|
||||
(name echo_eio)
|
||||
(flags :standard -warn-error -a+8)
|
||||
(modules echo_eio)
|
||||
(libraries
|
||||
tiny_httpd
|
||||
tiny_httpd_eio
|
||||
eio
|
||||
eio_main
|
||||
logs
|
||||
echo_vfs
|
||||
trace.core
|
||||
trace-tef
|
||||
tiny_httpd_camlzip
|
||||
tiny_httpd.multipart-form-data))
|
||||
|
||||
|
|
@ -55,6 +72,12 @@
|
|||
|
||||
; produce an embedded FS
|
||||
|
||||
(library
|
||||
(name echo_vfs)
|
||||
(modules vfs)
|
||||
(wrapped false)
|
||||
(libraries tiny_httpd))
|
||||
|
||||
(rule
|
||||
(targets vfs.ml)
|
||||
(deps
|
||||
|
|
|
|||
410
examples/echo_eio.ml
Normal file
410
examples/echo_eio.ml
Normal file
|
|
@ -0,0 +1,410 @@
|
|||
open Tiny_httpd_core
|
||||
module Trace = Trace_core
|
||||
module Log = Tiny_httpd.Log
|
||||
module MFD = Tiny_httpd_multipart_form_data
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
let now_ = Unix.gettimeofday
|
||||
|
||||
let alice_text =
|
||||
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
|
||||
sitting by her sister on the bank, and of having nothing to do: once or \
|
||||
twice she had peeped into the book her sister was reading, but it had no \
|
||||
pictures or conversations in it, <and what is the use of a book,> thought \
|
||||
Alice <without pictures or conversations?> So she was considering in her \
|
||||
own mind (as well as she could, for the hot day made her feel very sleepy \
|
||||
and stupid), whether the pleasure of making a daisy-chain would be worth \
|
||||
the trouble of getting up and picking the daisies, when suddenly a White \
|
||||
Rabbit with pink eyes ran close by her. There was nothing so very \
|
||||
remarkable in that; nor did Alice think it so very much out of the way to \
|
||||
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
|
||||
she thought it over afterwards, it occurred to her that she ought to have \
|
||||
wondered at this, but at the time it all seemed quite natural); but when \
|
||||
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
|
||||
it, and then hurried on, Alice started to her feet, for it flashed across \
|
||||
her mind that she had never before seen a rabbit with either a \
|
||||
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
|
||||
she ran across the field after it, and fortunately was just in time to see \
|
||||
it pop down a large rabbit-hole under the hedge. In another moment down \
|
||||
went Alice after it, never once considering how in the world she was to get \
|
||||
out again. The rabbit-hole went straight on like a tunnel for some way, and \
|
||||
then dipped suddenly down, so suddenly that Alice had not a moment to think \
|
||||
about stopping herself before she found herself falling down a very deep \
|
||||
well. Either the well was very deep, or she fell very slowly, for she had \
|
||||
plenty of time as she went down to look about her and to wonder what was \
|
||||
going to happen next. First, she tried to look down and make out what she \
|
||||
was coming to, but it was too dark to see anything; then she looked at the \
|
||||
sides of the well, and noticed that they were filled with cupboards......"
|
||||
|
||||
(* util: a little middleware collecting statistics *)
|
||||
let middleware_stat () : Server.Middleware.t * (unit -> string) =
|
||||
let n_req = ref 0 in
|
||||
let total_time_ = ref 0. in
|
||||
let parse_time_ = ref 0. in
|
||||
let build_time_ = ref 0. in
|
||||
let write_time_ = ref 0. in
|
||||
|
||||
let m h req ~resp =
|
||||
incr n_req;
|
||||
let t1 = Request.start_time req in
|
||||
let t2 = now_ () in
|
||||
h req ~resp:(fun response ->
|
||||
let t3 = now_ () in
|
||||
resp response;
|
||||
let t4 = now_ () in
|
||||
total_time_ := !total_time_ +. (t4 -. t1);
|
||||
parse_time_ := !parse_time_ +. (t2 -. t1);
|
||||
build_time_ := !build_time_ +. (t3 -. t2);
|
||||
write_time_ := !write_time_ +. (t4 -. t3))
|
||||
and get_stat () =
|
||||
Printf.sprintf
|
||||
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
|
||||
!n_req
|
||||
(!total_time_ /. float !n_req *. 1e3)
|
||||
(!parse_time_ /. float !n_req *. 1e3)
|
||||
(!build_time_ /. float !n_req *. 1e3)
|
||||
(!write_time_ /. float !n_req *. 1e3)
|
||||
in
|
||||
m, get_stat
|
||||
|
||||
let middleware_trace : Server.Middleware.t =
|
||||
fun (h : Server.Middleware.handler) req ~resp ->
|
||||
let _sp = Trace.enter_span ~__FILE__ ~__LINE__ "http.handle" in
|
||||
let new_resp (r : Response.t) =
|
||||
Trace.add_data_to_span _sp [ "http.code", `Int r.code ];
|
||||
Trace.exit_span _sp;
|
||||
resp r
|
||||
in
|
||||
h req ~resp:new_resp
|
||||
|
||||
(* ugly AF *)
|
||||
let base64 x =
|
||||
let ic, oc = Unix.open_process "base64" in
|
||||
output_string oc x;
|
||||
flush oc;
|
||||
close_out oc;
|
||||
let r = input_line ic in
|
||||
ignore (Unix.close_process (ic, oc));
|
||||
r
|
||||
|
||||
let setup_logging () =
|
||||
Logs.set_reporter @@ Logs.format_reporter ();
|
||||
Logs.set_level ~all:true (Some Logs.Debug)
|
||||
|
||||
let setup_upload server : unit =
|
||||
Server.add_route_handler_stream ~meth:`POST server
|
||||
Route.(exact "upload" @/ return)
|
||||
(fun req ->
|
||||
let (`boundary boundary) =
|
||||
match MFD.parse_content_type req.headers with
|
||||
| Some b -> b
|
||||
| None -> Response.fail_raise ~code:400 "no boundary found"
|
||||
in
|
||||
|
||||
let st = MFD.create ~boundary req.body in
|
||||
let tbl = Hashtbl.create 16 in
|
||||
let cur = ref "" in
|
||||
let cur_kind = ref "" in
|
||||
let buf = Buffer.create 16 in
|
||||
let rec loop () =
|
||||
match MFD.next st with
|
||||
| End_of_input ->
|
||||
if !cur <> "" then
|
||||
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf)
|
||||
| Part headers ->
|
||||
if !cur <> "" then
|
||||
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf);
|
||||
(match MFD.Content_disposition.parse headers with
|
||||
| Some { kind; name = Some name; filename = _ } ->
|
||||
cur := name;
|
||||
cur_kind := kind;
|
||||
Buffer.clear buf;
|
||||
loop ()
|
||||
| _ -> Response.fail_raise ~code:400 "content disposition missing")
|
||||
| Read sl ->
|
||||
Buffer.add_subbytes buf sl.bytes sl.off sl.len;
|
||||
loop ()
|
||||
in
|
||||
loop ();
|
||||
|
||||
let open Tiny_httpd_html in
|
||||
let data =
|
||||
Hashtbl.fold
|
||||
(fun name (kind, data) acc ->
|
||||
Printf.sprintf "%S (kind: %S): %S" name kind data :: acc)
|
||||
tbl []
|
||||
in
|
||||
let html =
|
||||
body []
|
||||
[
|
||||
pre []
|
||||
[ txt (Printf.sprintf "{\n%s\n}" @@ String.concat "\n" data) ];
|
||||
]
|
||||
in
|
||||
Response.make_string ~code:201 @@ Ok (to_string_top html))
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
let port_ = ref 8080 in
|
||||
let max_conns = ref 16_000 in
|
||||
let pool_buf_size = ref None in
|
||||
let buf_size = ref 4096 in
|
||||
let unix_sock = ref "" in
|
||||
let addr = ref "127.0.0.1" in
|
||||
Arg.parse
|
||||
(Arg.align
|
||||
[
|
||||
"--port", Arg.Set_int port_, " set port";
|
||||
"-p", Arg.Set_int port_, " set port";
|
||||
"--unix", Arg.Set_string unix_sock, " set unix socket";
|
||||
"--debug", Arg.Unit setup_logging, " enable debug";
|
||||
( "--max-buf-pool-size",
|
||||
Arg.Int (fun i -> pool_buf_size := Some i),
|
||||
" maximum buffer pool size" );
|
||||
"--buf-size", Arg.Set_int buf_size, " buffer size";
|
||||
"--max-conns", Arg.Set_int max_conns, " maximum number of connections";
|
||||
"--addr", Arg.Set_string addr, " binding address";
|
||||
])
|
||||
(fun _ -> raise (Arg.Bad ""))
|
||||
"echo [option]*";
|
||||
|
||||
let@ stdenv = Eio_main.run in
|
||||
let@ sw = Eio.Switch.run ~name:"main" in
|
||||
let server =
|
||||
Tiny_httpd_eio.create ~addr:!addr ~port:!port_ ~max_connections:!max_conns
|
||||
~buf_size:!buf_size ?max_buf_pool_size:!pool_buf_size ~stdenv ~sw ()
|
||||
in
|
||||
|
||||
if Trace.enabled () then (
|
||||
Tiny_httpd.Server.add_middleware server ~stage:(`Stage 1) middleware_trace;
|
||||
|
||||
(* fiber that emits metrics *)
|
||||
Eio.Fiber.fork_daemon ~sw (fun () ->
|
||||
while Eio.Switch.get_error sw |> Option.is_none do
|
||||
Trace.counter_int "http.active-conns"
|
||||
(Server.active_connections server);
|
||||
Eio_unix.sleep 0.5
|
||||
done;
|
||||
`Stop_daemon)
|
||||
);
|
||||
|
||||
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
|
||||
let m_stats, get_stats = middleware_stat () in
|
||||
Server.add_middleware server ~stage:(`Stage 1) m_stats;
|
||||
|
||||
(* say hello *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "hello" @/ string @/ return)
|
||||
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
|
||||
|
||||
(* compressed file access *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "zcat" @/ string_urlencoded @/ return)
|
||||
(fun path _req ->
|
||||
let ic = open_in path in
|
||||
let str = IO.Input.of_in_channel ic in
|
||||
let mime_type =
|
||||
try
|
||||
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
|
||||
try
|
||||
let s = [ "Content-Type", String.trim (input_line p) ] in
|
||||
ignore @@ Unix.close_process_in p;
|
||||
s
|
||||
with _ ->
|
||||
ignore @@ Unix.close_process_in p;
|
||||
[]
|
||||
with _ -> []
|
||||
in
|
||||
Response.make_stream ~headers:mime_type (Ok str));
|
||||
|
||||
(* echo request *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "echo" @/ return)
|
||||
(fun req ->
|
||||
let q =
|
||||
Request.query req
|
||||
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|
||||
|> String.concat ";"
|
||||
in
|
||||
Response.make_string
|
||||
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
|
||||
|
||||
(* file upload *)
|
||||
Server.add_route_handler_stream ~meth:`PUT server
|
||||
Route.(exact "upload" @/ string @/ return)
|
||||
(fun path req ->
|
||||
Log.debug (fun k ->
|
||||
k "start upload %S, headers:\n%s\n\n%!" path
|
||||
(Format.asprintf "%a" Headers.pp (Request.headers req)));
|
||||
try
|
||||
let oc = open_out @@ "/tmp/" ^ path in
|
||||
IO.Input.to_chan oc req.Request.body;
|
||||
flush oc;
|
||||
Response.make_string (Ok "uploaded file")
|
||||
with e ->
|
||||
Response.fail ~code:500 "couldn't upload file: %s"
|
||||
(Printexc.to_string e));
|
||||
|
||||
(* protected by login *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "protected" @/ return)
|
||||
(fun req ->
|
||||
let ok =
|
||||
match Request.get_header req "authorization" with
|
||||
| Some v ->
|
||||
Log.debug (fun k -> k "authenticate with %S" v);
|
||||
v = "Basic " ^ base64 "user:foobar"
|
||||
| None -> false
|
||||
in
|
||||
if ok then (
|
||||
(* FIXME: a logout link *)
|
||||
let s =
|
||||
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
|
||||
in
|
||||
Response.make_string (Ok s)
|
||||
) else (
|
||||
let headers =
|
||||
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
|
||||
in
|
||||
Response.fail ~code:401 ~headers "invalid"
|
||||
));
|
||||
|
||||
(* logout *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "logout" @/ return)
|
||||
(fun _req -> Response.fail ~code:401 "logged out");
|
||||
|
||||
(* stats *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "stats" @/ return)
|
||||
(fun _req ->
|
||||
let stats = get_stats () in
|
||||
Response.make_string @@ Ok stats);
|
||||
|
||||
Server.add_route_handler server
|
||||
Route.(exact "alice" @/ return)
|
||||
(fun _req -> Response.make_string (Ok alice_text));
|
||||
|
||||
Server.add_route_handler ~meth:`HEAD server
|
||||
Route.(exact "head" @/ return)
|
||||
(fun _req ->
|
||||
Response.make_void ~code:200 ~headers:[ "x-hello", "world" ] ());
|
||||
|
||||
(* VFS *)
|
||||
Tiny_httpd.Dir.add_vfs server
|
||||
~config:
|
||||
(Tiny_httpd.Dir.config ~download:true
|
||||
~dir_behavior:Tiny_httpd.Dir.Index_or_lists ())
|
||||
~vfs:Vfs.vfs ~prefix:"vfs";
|
||||
|
||||
setup_upload server;
|
||||
|
||||
(* main page *)
|
||||
Server.add_route_handler server
|
||||
Route.(return)
|
||||
(fun _req ->
|
||||
let open Tiny_httpd_html in
|
||||
let h =
|
||||
html []
|
||||
[
|
||||
head [] [ title [] [ txt "index of echo" ] ];
|
||||
body []
|
||||
[
|
||||
h3 [] [ txt "welcome!" ];
|
||||
p [] [ b [] [ txt "endpoints are:" ] ];
|
||||
ul []
|
||||
[
|
||||
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/echo/" ] [ txt "echo" ];
|
||||
txt " echo back query";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
txt
|
||||
"/zcat/:path (GET) to download a file (deflate \
|
||||
transfer-encoding)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/stats/" ] [ txt "/stats/" ];
|
||||
txt " (GET) to access statistics";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/vfs/" ] [ txt "/vfs" ];
|
||||
txt " (GET) to access a VFS embedded in the binary";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/protected" ] [ txt "/protected" ];
|
||||
txt
|
||||
" (GET) to see a protected page (login: user, \
|
||||
password: foobar)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/logout" ] [ txt "/logout" ];
|
||||
txt " (POST) to log out";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
form
|
||||
[
|
||||
A.action "/upload";
|
||||
A.enctype "multipart/form-data";
|
||||
A.target "_self";
|
||||
A.method_ "POST";
|
||||
]
|
||||
[
|
||||
label [] [ txt "my beautiful form" ];
|
||||
input [ A.type_ "file"; A.name "file1" ];
|
||||
input [ A.type_ "file"; A.name "file2" ];
|
||||
input
|
||||
[
|
||||
A.type_ "text";
|
||||
A.name "a";
|
||||
A.placeholder "text A";
|
||||
];
|
||||
input
|
||||
[
|
||||
A.type_ "text";
|
||||
A.name "b";
|
||||
A.placeholder "text B";
|
||||
];
|
||||
input [ A.type_ "submit" ];
|
||||
];
|
||||
];
|
||||
];
|
||||
];
|
||||
]
|
||||
in
|
||||
let s = to_string_top h in
|
||||
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
|
||||
|
||||
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
|
||||
(Server.port server);
|
||||
match Server.run server with
|
||||
| Ok () -> ()
|
||||
| Error e -> raise e
|
||||
6
src/eio/dune
Normal file
6
src/eio/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
(library
|
||||
(name tiny_httpd_eio)
|
||||
(public_name tiny_httpd_eio)
|
||||
(synopsis "An EIO-based backend for Tiny_httpd")
|
||||
(flags :standard -safe-string -warn-error -a+8)
|
||||
(libraries tiny_httpd eio eio.unix))
|
||||
250
src/eio/tiny_httpd_eio.ml
Normal file
250
src/eio/tiny_httpd_eio.ml
Normal file
|
|
@ -0,0 +1,250 @@
|
|||
module IO = Tiny_httpd.IO
|
||||
module H = Tiny_httpd.Server
|
||||
module Pool = Tiny_httpd.Pool
|
||||
module Slice = IO.Slice
|
||||
module Log = Tiny_httpd.Log
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
stdenv:Eio_unix.Stdenv.base ->
|
||||
sw:Eio.Switch.t ->
|
||||
'a
|
||||
|
||||
let get_max_connection_ ?(max_connections = 64) () : int =
|
||||
let max_connections = max 4 max_connections in
|
||||
max_connections
|
||||
|
||||
let buf_size = 16 * 1024
|
||||
|
||||
let eio_ipaddr_to_unix (a : _ Eio.Net.Ipaddr.t) : Unix.inet_addr =
|
||||
(* TODO: for ipv4 we really could do it faster via sprintf 🙄 *)
|
||||
Unix.inet_addr_of_string (Format.asprintf "%a" Eio.Net.Ipaddr.pp a)
|
||||
|
||||
let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr =
|
||||
match a with
|
||||
| `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p)
|
||||
| `Unix s -> Unix.ADDR_UNIX s
|
||||
|
||||
let ic_of_flow ~closed ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) :
|
||||
IO.Input.t =
|
||||
let cstruct = Pool.Raw.acquire ic_pool in
|
||||
let sent_shutdown = ref false in
|
||||
|
||||
object
|
||||
inherit Iostream.In_buf.t_from_refill ()
|
||||
|
||||
method private refill (sl : Slice.t) =
|
||||
assert (sl.len = 0);
|
||||
let cap = min (Bytes.length sl.bytes) (Cstruct.length cstruct) in
|
||||
|
||||
match Eio.Flow.single_read flow (Cstruct.sub cstruct 0 cap) with
|
||||
| exception End_of_file ->
|
||||
Log.debug (fun k -> k "read: eof");
|
||||
()
|
||||
| n ->
|
||||
Log.debug (fun k -> k "read %d bytes..." n);
|
||||
Cstruct.blit_to_bytes cstruct 0 sl.bytes 0 n;
|
||||
sl.off <- 0;
|
||||
sl.len <- n
|
||||
|
||||
method close () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
Pool.Raw.release ic_pool cstruct
|
||||
);
|
||||
if not !sent_shutdown then (
|
||||
sent_shutdown := true;
|
||||
Eio.Flow.shutdown flow `Receive
|
||||
)
|
||||
end
|
||||
|
||||
let oc_of_flow ~closed ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) :
|
||||
IO.Output.t =
|
||||
(* write buffer *)
|
||||
let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in
|
||||
let offset = ref 0 in
|
||||
let sent_shutdown = ref false in
|
||||
|
||||
object (self)
|
||||
method flush () : unit =
|
||||
if !offset > 0 then (
|
||||
Eio.Flow.write flow [ Cstruct.sub wbuf 0 !offset ];
|
||||
offset := 0
|
||||
)
|
||||
|
||||
method output buf i len =
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
|
||||
while !len > 0 do
|
||||
let available = Cstruct.length wbuf - !offset in
|
||||
let n = min !len available in
|
||||
Cstruct.blit_from_bytes buf !i wbuf !offset n;
|
||||
offset := !offset + n;
|
||||
i := !i + n;
|
||||
len := !len - n;
|
||||
|
||||
if !offset = Cstruct.length wbuf then self#flush ()
|
||||
done
|
||||
|
||||
method output_char c =
|
||||
if !offset = Cstruct.length wbuf then self#flush ();
|
||||
Cstruct.set_char wbuf !offset c;
|
||||
incr offset;
|
||||
if !offset = Cstruct.length wbuf then self#flush ()
|
||||
|
||||
method close () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
Pool.Raw.release oc_pool wbuf
|
||||
);
|
||||
if not !sent_shutdown then (
|
||||
sent_shutdown := true;
|
||||
Eio.Flow.shutdown flow `Send
|
||||
)
|
||||
end
|
||||
|
||||
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
||||
~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () :
|
||||
(module H.IO_BACKEND) =
|
||||
let addr, port, (sockaddr : Eio.Net.Sockaddr.stream) =
|
||||
match addr, port, unix_sock with
|
||||
| _, _, Some s -> Printf.sprintf "unix:%s" s, 0, `Unix s
|
||||
| addr, port, None ->
|
||||
let addr = Option.value ~default:"127.0.0.1" addr in
|
||||
let sockaddr, port =
|
||||
match Eio.Net.getaddrinfo stdenv#net addr, port with
|
||||
| `Tcp (h, _) :: _, None ->
|
||||
let p = 8080 in
|
||||
`Tcp (h, p), p
|
||||
| `Tcp (h, _) :: _, Some p -> `Tcp (h, p), p
|
||||
| _ ->
|
||||
failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr
|
||||
in
|
||||
addr, port, sockaddr
|
||||
in
|
||||
|
||||
let module M = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let clock = Eio.Stdenv.clock stdenv
|
||||
let get_time_s () = Eio.Time.now clock
|
||||
let max_connections = get_max_connection_ ?max_connections ()
|
||||
|
||||
let pool_size =
|
||||
match max_buf_pool_size with
|
||||
| Some n -> n
|
||||
| None -> min 4096 (max_connections * 2)
|
||||
|
||||
let cstruct_pool =
|
||||
Pool.create ~max_size:pool_size
|
||||
~mk_item:(fun () -> Cstruct.create buf_size)
|
||||
()
|
||||
|
||||
let tcp_server () : IO.TCP_server.builder =
|
||||
{
|
||||
IO.TCP_server.serve =
|
||||
(fun ~after_init ~handle () : unit ->
|
||||
let running = Atomic.make true in
|
||||
let active_conns = Atomic.make 0 in
|
||||
let sem = Eio.Semaphore.make max_connections in
|
||||
|
||||
Eio.Switch.on_release sw (fun () -> Atomic.set running false);
|
||||
let net = Eio.Stdenv.net stdenv in
|
||||
|
||||
(* main server socket *)
|
||||
let sock =
|
||||
let backlog = max_connections in
|
||||
Eio.Net.listen ~reuse_addr:true ~reuse_port:true ~backlog ~sw net
|
||||
sockaddr
|
||||
in
|
||||
|
||||
(* Resolve actual address/port (important for port 0) *)
|
||||
let actual_addr, actual_port =
|
||||
match Eio.Net.listening_addr sock with
|
||||
| `Tcp (_, p) -> addr, p
|
||||
| `Unix s -> Printf.sprintf "unix:%s" s, 0
|
||||
in
|
||||
|
||||
let tcp_server : IO.TCP_server.t =
|
||||
{
|
||||
running = (fun () -> Atomic.get running);
|
||||
stop =
|
||||
(fun () ->
|
||||
Atomic.set running false;
|
||||
(* Backstop: fail the switch after 60s if handlers don't complete *)
|
||||
Eio.Fiber.fork_daemon ~sw (fun () ->
|
||||
Eio.Time.sleep clock 60.0;
|
||||
if Eio.Switch.get_error sw |> Option.is_none then
|
||||
Eio.Switch.fail sw Exit;
|
||||
`Stop_daemon));
|
||||
endpoint = (fun () -> actual_addr, actual_port);
|
||||
active_connections = (fun () -> Atomic.get active_conns);
|
||||
}
|
||||
in
|
||||
|
||||
after_init tcp_server;
|
||||
|
||||
while Atomic.get running do
|
||||
match Eio.Net.accept ~sw sock with
|
||||
| exception (Eio.Cancel.Cancelled _ | Eio.Io _)
|
||||
when not (Atomic.get running) ->
|
||||
(* Socket closed or switch cancelled during shutdown; exit loop *)
|
||||
()
|
||||
| conn, client_addr ->
|
||||
(* Acquire semaphore BEFORE spawning a fiber so we
|
||||
bound the number of in-flight fibers. *)
|
||||
Eio.Semaphore.acquire sem;
|
||||
Eio.Fiber.fork ~sw (fun () ->
|
||||
let@ () =
|
||||
Fun.protect ~finally:(fun () ->
|
||||
Log.debug (fun k ->
|
||||
k "Tiny_httpd_eio: client handler returned");
|
||||
Atomic.decr active_conns;
|
||||
Eio.Semaphore.release sem;
|
||||
try Eio.Flow.close conn with Eio.Io _ -> ())
|
||||
in
|
||||
(try
|
||||
Eio_unix.Fd.use_exn "setsockopt" (Eio_unix.Net.fd conn)
|
||||
(fun fd -> Unix.setsockopt fd Unix.TCP_NODELAY true)
|
||||
with Unix.Unix_error _ -> ());
|
||||
Atomic.incr active_conns;
|
||||
let ic_closed = ref false in
|
||||
let oc_closed = ref false in
|
||||
let ic =
|
||||
ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool conn
|
||||
in
|
||||
let oc =
|
||||
oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool conn
|
||||
in
|
||||
|
||||
Log.debug (fun k ->
|
||||
k "handling client on %a…" Eio.Net.Sockaddr.pp
|
||||
client_addr);
|
||||
let client_addr_unix = eio_sock_addr_to_unix client_addr in
|
||||
try handle.handle ~client_addr:client_addr_unix ic oc
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Log.error (fun k ->
|
||||
k "Client handler for %a failed with %s\n%s"
|
||||
Eio.Net.Sockaddr.pp client_addr
|
||||
(Printexc.to_string exn)
|
||||
(Printexc.raw_backtrace_to_string bt)))
|
||||
done);
|
||||
}
|
||||
end in
|
||||
(module M)
|
||||
|
||||
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ~stdenv
|
||||
~sw ?buf_size ?middlewares () : H.t =
|
||||
let backend =
|
||||
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
|
||||
~stdenv ~sw ()
|
||||
in
|
||||
H.create_from ?buf_size ?middlewares ~backend ()
|
||||
31
src/eio/tiny_httpd_eio.mli
Normal file
31
src/eio/tiny_httpd_eio.mli
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
(** Tiny httpd EIO backend.
|
||||
|
||||
This replaces the threads + Unix blocking syscalls of {!Tiny_httpd_server}
|
||||
with an Eio-based cooperative system.
|
||||
|
||||
{b NOTE}: this is very experimental and will absolutely change over time,
|
||||
especially since Eio itself is also subject to change.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
(* TODO: pass in a switch *)
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
stdenv:Eio_unix.Stdenv.base ->
|
||||
sw:Eio.Switch.t ->
|
||||
'a
|
||||
|
||||
val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
|
||||
(** Create a server *)
|
||||
|
||||
val create :
|
||||
(?buf_size:int ->
|
||||
?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
|
||||
unit ->
|
||||
Tiny_httpd.Server.t)
|
||||
with_args
|
||||
(** Create a server *)
|
||||
|
|
@ -51,7 +51,7 @@
|
|||
(public_name tiny_httpd.ws)
|
||||
(synopsis "Websockets for tiny_httpd")
|
||||
(private_modules common_ws_ utils_)
|
||||
(flags :standard -open Tiny_httpd_core)
|
||||
(flags :standard -w -32 -open Tiny_httpd_core)
|
||||
(foreign_stubs
|
||||
(language c)
|
||||
(names tiny_httpd_ws_stubs)
|
||||
|
|
|
|||
|
|
@ -26,11 +26,11 @@ type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
|
|||
module Frame_type = struct
|
||||
type t = int
|
||||
|
||||
let continuation : t = 0
|
||||
let text : t = 1
|
||||
let _continuation : t = 0
|
||||
let _text : t = 1
|
||||
let binary : t = 2
|
||||
let close : t = 8
|
||||
let ping : t = 9
|
||||
let _close : t = 8
|
||||
let _ping : t = 9
|
||||
let pong : t = 10
|
||||
|
||||
let show = function
|
||||
|
|
@ -132,7 +132,7 @@ module Writer = struct
|
|||
()
|
||||
|
||||
(** Max fragment size: send 16 kB at a time *)
|
||||
let max_fragment_size = 16 * 1024
|
||||
let _max_fragment_size = 16 * 1024
|
||||
|
||||
let[@inline never] really_output_buf_ (self : t) =
|
||||
self.header.fin <- true;
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ let () = assert_eq (Ok [ "foo", "bar" ]) (U.parse_query "yolo#foo=bar")
|
|||
let () =
|
||||
add_qcheck
|
||||
@@ QCheck.Test.make ~name:__LOC__ ~long_factor:20 ~count:1_000
|
||||
Q.(small_list (pair string string))
|
||||
Q.(list_small (pair string string))
|
||||
(fun l ->
|
||||
List.iter
|
||||
(fun (a, b) ->
|
||||
|
|
|
|||
|
|
@ -14,9 +14,9 @@ let () =
|
|||
@@ QCheck.Test.make ~count:10_000
|
||||
Q.(
|
||||
triple
|
||||
(bytes_of_size (Gen.return 4))
|
||||
(option small_nat)
|
||||
(bytes_of_size Gen.(0 -- 6000))
|
||||
(bytes_size (Gen.return 4))
|
||||
(option nat_small)
|
||||
(bytes_size Gen.(0 -- 6000))
|
||||
(* |> Q.add_stat ("b.size", fun (_k, b) -> Bytes.length b) *)
|
||||
|> Q.add_shrink_invariant (fun (k, _, _) -> Bytes.length k = 4))
|
||||
(fun (key, mask_offset, b) ->
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ depends: [
|
|||
"logs" {with-test}
|
||||
"conf-libcurl" {with-test}
|
||||
"ptime" {with-test}
|
||||
"qcheck-core" {>= "0.9" & with-test}
|
||||
"qcheck-core" {>= "0.91" & with-test}
|
||||
]
|
||||
depopts: [
|
||||
"logs"
|
||||
|
|
|
|||
32
tiny_httpd_eio.opam
Normal file
32
tiny_httpd_eio.opam
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.19"
|
||||
synopsis: "Use eio for tiny_httpd"
|
||||
maintainer: ["c-cube"]
|
||||
authors: ["c-cube"]
|
||||
license: "MIT"
|
||||
homepage: "https://github.com/c-cube/tiny_httpd/"
|
||||
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
|
||||
depends: [
|
||||
"dune" {>= "3.2"}
|
||||
"tiny_httpd" {= version}
|
||||
"eio" {>= "1.0" & < "2.0"}
|
||||
"base-unix"
|
||||
"logs" {with-test}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
"dune"
|
||||
"build"
|
||||
"-p"
|
||||
name
|
||||
"-j"
|
||||
jobs
|
||||
"@install"
|
||||
"@runtest" {with-test}
|
||||
"@doc" {with-doc}
|
||||
]
|
||||
]
|
||||
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"
|
||||
Loading…
Add table
Reference in a new issue