Merge pull request #64 from c-cube/wip-eio

support running on Eio
This commit is contained in:
Simon Cruanes 2023-07-05 21:53:07 -04:00 committed by GitHub
commit b927f98490
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 747 additions and 36 deletions

View file

@ -13,12 +13,12 @@ jobs:
fail-fast: true
matrix:
os:
- macos-latest
- ubuntu-latest
- windows-latest
#- macos-latest
#- windows-latest
ocaml-compiler:
- 4.04.x
- 4.14.x
- 4.05
- 4.14
runs-on: ${{ matrix.os }}
@ -30,9 +30,12 @@ jobs:
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
opam-local-packages: |
./tiny_httpd.opam
./tiny_httpd_camlzip.opam
opam-depext-flags: --with-test
- run: opam install . --deps-only --with-test
- run: opam install ./tiny_httpd.opam ./tiny_httpd_camlzip.opam --deps-only --with-test
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
@ -43,3 +46,4 @@ jobs:
- run: opam exec -- dune build @src/runtest @examples/runtest @tests/runtest -p tiny_httpd_camlzip
if: ${{ matrix.os == 'ubuntu-latest' }}

45
.github/workflows/main5.yml vendored Normal file
View file

@ -0,0 +1,45 @@
name: build (ocaml 5)
on:
pull_request:
push:
schedule:
# Prime the caches every Monday
- cron: 0 1 * * MON
jobs:
build:
strategy:
fail-fast: true
matrix:
os:
- ubuntu-latest
#- macos-latest
#- windows-latest
ocaml-compiler:
- 5.0.x
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
opam-depext-flags: --with-test
- run: opam install . --deps-only --with-test
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
- run: opam exec -- dune build @src/runtest @examples/runtest @tests/runtest -p tiny_httpd
if: ${{ matrix.os == 'ubuntu-latest' }}
- run: opam install tiny_httpd
- run: opam exec -- dune build @src/runtest @examples/runtest @tests/runtest -p tiny_httpd_camlzip
if: ${{ matrix.os == 'ubuntu-latest' }}

View file

@ -15,7 +15,7 @@ clean:
doc:
@dune build @doc
WATCH?= "@install @runtest"
WATCH?= @install @runtest
watch:
@dune build $(OPTS) $(WATCH) -w

2
echo_eio.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec dune exec --display=quiet --profile=release "examples/echo_eio.exe" -- $@

View file

@ -14,6 +14,13 @@
(modules echo vfs)
(libraries tiny_httpd tiny_httpd_camlzip))
(executable
(name echo_eio)
(flags :standard -warn-error -a+8)
(modules echo_eio)
(libraries tiny_httpd tiny_httpd_camlzip
tiny_httpd_eio eio eio_posix))
(rule
(targets test_output.txt)
(deps

View file

@ -2,6 +2,36 @@ module S = Tiny_httpd
let now_ = Unix.gettimeofday
let alice_text =
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
sitting by her sister on the bank, and of having nothing to do: once or \
twice she had peeped into the book her sister was reading, but it had no \
pictures or conversations in it, <and what is the use of a book,> thought \
Alice <without pictures or conversations?> So she was considering in her \
own mind (as well as she could, for the hot day made her feel very sleepy \
and stupid), whether the pleasure of making a daisy-chain would be worth \
the trouble of getting up and picking the daisies, when suddenly a White \
Rabbit with pink eyes ran close by her. There was nothing so very \
remarkable in that; nor did Alice think it so very much out of the way to \
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
she thought it over afterwards, it occurred to her that she ought to have \
wondered at this, but at the time it all seemed quite natural); but when \
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
it, and then hurried on, Alice started to her feet, for it flashed across \
her mind that she had never before seen a rabbit with either a \
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
she ran across the field after it, and fortunately was just in time to see \
it pop down a large rabbit-hole under the hedge. In another moment down \
went Alice after it, never once considering how in the world she was to get \
out again. The rabbit-hole went straight on like a tunnel for some way, and \
then dipped suddenly down, so suddenly that Alice had not a moment to think \
about stopping herself before she found herself falling down a very deep \
well. Either the well was very deep, or she fell very slowly, for she had \
plenty of time as she went down to look about her and to wonder what was \
going to happen next. First, she tried to look down and make out what she \
was coming to, but it was too dark to see anything; then she looked at the \
sides of the well, and noticed that they were filled with cupboards......"
(* util: a little middleware collecting statistics *)
let middleware_stat () : S.Middleware.t * (unit -> string) =
let n_req = ref 0 in
@ -152,6 +182,10 @@ let () =
let stats = get_stats () in
S.Response.make_string @@ Ok stats);
S.add_route_handler server
S.Route.(exact "alice" @/ return)
(fun _req -> S.Response.make_string (Ok alice_text));
(* VFS *)
Tiny_httpd_dir.add_vfs server
~config:

246
examples/echo_eio.ml Normal file
View file

@ -0,0 +1,246 @@
module S = Tiny_httpd
module S_eio = Tiny_httpd_eio
let now_ = Unix.gettimeofday
let ( let@ ) = ( @@ )
(* util: a little middleware collecting statistics *)
let middleware_stat () : S.Middleware.t * (unit -> string) =
let n_req = ref 0 in
let total_time_ = ref 0. in
let parse_time_ = ref 0. in
let build_time_ = ref 0. in
let write_time_ = ref 0. in
let m h req ~resp =
incr n_req;
let t1 = S.Request.start_time req in
let t2 = now_ () in
h req ~resp:(fun response ->
let t3 = now_ () in
resp response;
let t4 = now_ () in
total_time_ := !total_time_ +. (t4 -. t1);
parse_time_ := !parse_time_ +. (t2 -. t1);
build_time_ := !build_time_ +. (t3 -. t2);
write_time_ := !write_time_ +. (t4 -. t3))
and get_stat () =
Printf.sprintf
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
!n_req
(!total_time_ /. float !n_req *. 1e3)
(!parse_time_ /. float !n_req *. 1e3)
(!build_time_ /. float !n_req *. 1e3)
(!write_time_ /. float !n_req *. 1e3)
in
m, get_stat
let alice_text =
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
sitting by her sister on the bank, and of having nothing to do: once or \
twice she had peeped into the book her sister was reading, but it had no \
pictures or conversations in it, <and what is the use of a book,> thought \
Alice <without pictures or conversations?> So she was considering in her \
own mind (as well as she could, for the hot day made her feel very sleepy \
and stupid), whether the pleasure of making a daisy-chain would be worth \
the trouble of getting up and picking the daisies, when suddenly a White \
Rabbit with pink eyes ran close by her. There was nothing so very \
remarkable in that; nor did Alice think it so very much out of the way to \
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
she thought it over afterwards, it occurred to her that she ought to have \
wondered at this, but at the time it all seemed quite natural); but when \
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
it, and then hurried on, Alice started to her feet, for it flashed across \
her mind that she had never before seen a rabbit with either a \
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
she ran across the field after it, and fortunately was just in time to see \
it pop down a large rabbit-hole under the hedge. In another moment down \
went Alice after it, never once considering how in the world she was to get \
out again. The rabbit-hole went straight on like a tunnel for some way, and \
then dipped suddenly down, so suddenly that Alice had not a moment to think \
about stopping herself before she found herself falling down a very deep \
well. Either the well was very deep, or she fell very slowly, for she had \
plenty of time as she went down to look about her and to wonder what was \
going to happen next. First, she tried to look down and make out what she \
was coming to, but it was too dark to see anything; then she looked at the \
sides of the well, and noticed that they were filled with cupboards......"
let () =
let port_ = ref 8080 in
let j = ref 32 in
Arg.parse
(Arg.align
[
"--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port";
"--debug", Arg.Unit (fun () -> S._enable_debug true), " enable debug";
"-j", Arg.Set_int j, " maximum number of connections";
])
(fun _ -> raise (Arg.Bad ""))
"echo [option]*";
(* use eio *)
let@ stdenv = Eio_posix.run in
let@ sw = Eio.Switch.run in
(* create server *)
let server : S.t =
S_eio.create ~port:!port_ ~max_connections:!j
~stdenv:(stdenv :> Eio_unix.Stdenv.base)
~sw ()
in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
let m_stats, get_stats = middleware_stat () in
S.add_middleware server ~stage:(`Stage 1) m_stats;
(* say hello *)
S.add_route_handler ~meth:`GET server
S.Route.(exact "hello" @/ string @/ return)
(fun name _req -> S.Response.make_string (Ok ("hello " ^ name ^ "!\n")));
(* compressed file access *)
S.add_route_handler ~meth:`GET server
S.Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req ->
let ic = open_in path in
let str = S.Byte_stream.of_chan ic in
let mime_type =
try
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
try
let s = [ "Content-Type", String.trim (input_line p) ] in
ignore @@ Unix.close_process_in p;
s
with _ ->
ignore @@ Unix.close_process_in p;
[]
with _ -> []
in
S.Response.make_stream ~headers:mime_type (Ok str));
(* echo request *)
S.add_route_handler server
S.Route.(exact "echo" @/ return)
(fun req ->
let q =
S.Request.query req
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|> String.concat ";"
in
S.Response.make_string
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." S.Request.pp req q)));
(* file upload *)
S.add_route_handler_stream ~meth:`PUT server
S.Route.(exact "upload" @/ string @/ return)
(fun path req ->
S._debug (fun k ->
k "start upload %S, headers:\n%s\n\n%!" path
(Format.asprintf "%a" S.Headers.pp (S.Request.headers req)));
try
let oc = open_out @@ "/tmp/" ^ path in
S.Byte_stream.to_chan oc req.S.Request.body;
flush oc;
S.Response.make_string (Ok "uploaded file")
with e ->
S.Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e));
(* stats *)
S.add_route_handler server
S.Route.(exact "stats" @/ return)
(fun _req ->
let stats = get_stats () in
S.Response.make_string @@ Ok stats);
S.add_route_handler server ~meth:`POST
S.Route.(exact "quit" @/ return)
(fun _req ->
S.stop server;
S.Response.make_string (Ok "quitting"));
S.add_route_handler server
S.Route.(exact "alice" @/ return)
(fun _req -> S.Response.make_string (Ok alice_text));
(* main page *)
S.add_route_handler server
S.Route.(return)
(fun _req ->
let open Tiny_httpd_html in
let h =
html []
[
head [] [ title [] [ txt "index of echo" ] ];
body []
[
h3 [] [ txt "welcome!" ];
p [] [ b [] [ txt "endpoints are:" ] ];
ul []
[
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
li []
[
pre []
[
a [ A.href "/echo/" ] [ txt "echo" ];
txt " echo back query";
];
];
li []
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
li []
[
pre []
[
txt
"/zcat/:path (GET) to download a file (deflate \
transfer-encoding)";
];
];
li []
[
pre []
[
a [ A.href "/stats/" ] [ txt "/stats/" ];
txt " (GET) to access statistics";
];
];
li []
[
pre []
[
a [ A.href "/vfs/" ] [ txt "/vfs" ];
txt " (GET) to access a VFS embedded in the binary";
];
];
li []
[
pre
[ A.style "display: inline" ]
[
a [ A.href "/quit" ] [ txt "/quit" ];
txt " (POST) to stop server";
];
form
[
A.style "display: inline";
A.action "/quit";
A.method_ "POST";
]
[ button [ A.type_ "submit" ] [ txt "quit" ] ];
];
];
];
]
in
let s = to_string_top h in
S.Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server);
let res = S.run server in
Gc.print_stat stdout;
match res with
| Ok () -> ()
| Error e -> raise e

View file

@ -5,3 +5,4 @@ module Util = Tiny_httpd_util
module Dir = Tiny_httpd_dir
module Html = Tiny_httpd_html
module IO = Tiny_httpd_io
module Pool = Tiny_httpd_pool

View file

@ -100,6 +100,10 @@ end
module Util = Tiny_httpd_util
(** {2 Resource pool} *)
module Pool = Tiny_httpd_pool
(** {2 Static directory serving} *)
module Dir = Tiny_httpd_dir

51
src/Tiny_httpd_pool.ml Normal file
View file

@ -0,0 +1,51 @@
module A = Tiny_httpd_atomic_
type 'a list_ = Nil | Cons of int * 'a * 'a list_
type 'a t = {
mk_item: unit -> 'a;
clear: 'a -> unit;
max_size: int; (** Max number of items *)
items: 'a list_ A.t;
}
let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t =
{ mk_item; clear; max_size; items = A.make Nil }
let rec acquire_ self =
match A.get self.items with
| Nil -> self.mk_item ()
| Cons (_, x, tl) as l ->
if A.compare_and_set self.items l tl then
x
else
acquire_ self
let[@inline] size_ = function
| Cons (sz, _, _) -> sz
| Nil -> 0
let release_ self x : unit =
let rec loop () =
match A.get self.items with
| Cons (sz, _, _) when sz >= self.max_size ->
(* forget the item *)
()
| l ->
if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then
loop ()
in
self.clear x;
loop ()
let with_resource (self : _ t) f =
let x = acquire_ self in
try
let res = f x in
release_ self x;
res
with e ->
let bt = Printexc.get_raw_backtrace () in
release_ self x;
Printexc.raise_with_backtrace e bt

25
src/Tiny_httpd_pool.mli Normal file
View file

@ -0,0 +1,25 @@
(** Resource pool.
This pool is used for buffers. It can be used for other resources
but do note that it assumes resources are still reasonably
cheap to produce and discard, and will never block waiting for
a resource it's not a good pool for DB connections.
@since NEXT_RELEASE. *)
type 'a t
(** Pool of values of type ['a] *)
val create :
?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t
(** Create a new pool.
@param mk_item produce a new item in case the pool is empty
@param max_size maximum number of item in the pool before we start
dropping resources on the floor. This controls resource consumption.
@param clear a function called on items before recycling them.
*)
val with_resource : 'a t -> ('a -> 'b) -> 'b
(** [with_resource pool f] runs [f x] with [x] a resource;
when [f] fails or returns, [x] is returned to the pool for
future reuse. *)

View file

@ -19,6 +19,7 @@ let _debug k =
module Buf = Tiny_httpd_buf
module Byte_stream = Tiny_httpd_stream
module IO = Tiny_httpd_io
module Pool = Tiny_httpd_pool
exception Bad_req of int * string
@ -325,9 +326,13 @@ module Request = struct
| Bad_req (c, s) -> Error (c, s)
| e -> Error (400, Printexc.to_string e)
let read_body_full ?buf_size (self : byte_stream t) : string t =
let read_body_full ?buf ?buf_size (self : byte_stream t) : string t =
try
let buf = Buf.create ?size:buf_size () in
let buf =
match buf with
| Some b -> b
| None -> Buf.create ?size:buf_size ()
in
let body = Byte_stream.read_all ~buf self.body in
{ self with body }
with
@ -424,12 +429,12 @@ module Response = struct
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
Headers.pp self.headers pp_body self.body
let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t)
(self : t) : unit =
let output_ ~buf (oc : IO.Out_channel.t) (self : t) : unit =
(* double indirection:
- print into [buffer] using [bprintf]
- transfer to [buf_] so we can output from there *)
let tmp_buffer = Buffer.create 32 in
Buf.clear buf;
(* write start of reply *)
Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code
@ -631,11 +636,6 @@ module type IO_BACKEND = sig
val init_addr : unit -> string
val init_port : unit -> int
val spawn : (unit -> unit) -> unit
(** function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one
could use a thread pool instead.*)
val get_time_s : unit -> float
(** obtain the current timestamp in seconds. *)
@ -655,6 +655,7 @@ type t = {
mutable path_handlers:
(unit Request.t -> cb_path_handler resp_result option) list;
(** path handlers *)
buf_pool: Buf.t Pool.t;
}
let get_addr_ sock =
@ -746,7 +747,11 @@ let add_route_handler_ ?(accept = fun _req -> Ok ()) ?(middlewares = []) ?meth
let add_route_handler (type a) ?accept ?middlewares ?meth self
(route : (a, _) Route.t) (f : _) : unit =
let tr_req _oc req ~resp f =
resp (f (Request.read_body_full ~buf_size:self.buf_size req))
let req =
Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in
resp (f req)
in
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
@ -763,7 +768,10 @@ exception Exit_SSE
let add_route_server_sent_handler ?accept self route f =
let tr_req (oc : IO.Out_channel.t) req ~resp f =
let req = Request.read_body_full ~buf_size:self.buf_size req in
let req =
Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in
let headers =
ref Headers.(empty |> set "content-type" "text/event-stream")
in
@ -826,6 +834,10 @@ let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t =
path_handlers = [];
middlewares = [];
middlewares_sorted = lazy [];
buf_pool =
Pool.create ~clear:Buf.clear
~mk_item:(fun () -> Buf.create ~size:buf_size ())
();
}
in
List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares;
@ -959,7 +971,6 @@ let create ?(masksigpipe = true) ?max_connections ?(timeout = 0.0) ?buf_size
let init_addr () = addr
let init_port () = port
let get_time_s = get_time_s
let spawn f = new_thread f
let tcp_server () = tcp_server_builder
end in
let backend = (module B : IO_BACKEND) in
@ -987,7 +998,8 @@ let find_map f l =
(* handle client on [ic] and [oc] *)
let client_handle_for (self : t) ic oc : unit =
let buf = Buf.create ~size:self.buf_size () in
Pool.with_resource self.buf_pool @@ fun buf ->
Pool.with_resource self.buf_pool @@ fun buf_res ->
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
let continue = ref true in
while !continue && running self do
@ -998,7 +1010,7 @@ let client_handle_for (self : t) ic oc : unit =
| Error (c, s) ->
(* connection error, close *)
let res = Response.make_raw ~code:c s in
(try Response.output_ oc res with Sys_error _ -> ());
(try Response.output_ ~buf:buf_res oc res with Sys_error _ -> ());
continue := false
| Ok (Some req) ->
_debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req));
@ -1013,7 +1025,8 @@ let client_handle_for (self : t) ic oc : unit =
| None ->
fun _oc req ~resp ->
let body_str =
Request.read_body_full ~buf_size:self.buf_size req
Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in
resp (self.handler body_str)
in
@ -1022,7 +1035,7 @@ let client_handle_for (self : t) ic oc : unit =
(match Request.get_header ~f:String.trim req "Expect" with
| Some "100-continue" ->
_debug (fun k -> k "send back: 100 CONTINUE");
Response.output_ oc (Response.make_raw ~code:100 "")
Response.output_ ~buf:buf_res oc (Response.make_raw ~code:100 "")
| Some s -> bad_reqf 417 "unknown expectation %s" s
| None -> ());
@ -1047,7 +1060,7 @@ let client_handle_for (self : t) ic oc : unit =
try
if Headers.get "connection" r.Response.headers = Some "close" then
continue := false;
Response.output_ oc r
Response.output_ ~buf:buf_res oc r
with Sys_error _ -> continue := false
in
@ -1058,10 +1071,10 @@ let client_handle_for (self : t) ic oc : unit =
(* connection broken somehow *)
| Bad_req (code, s) ->
continue := false;
Response.output_ oc @@ Response.make_raw ~code s
Response.output_ ~buf:buf_res oc @@ Response.make_raw ~code s
| e ->
continue := false;
Response.output_ oc
Response.output_ ~buf:buf_res oc
@@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e))
done

View file

@ -148,10 +148,13 @@ module Request : sig
@since 0.3
*)
val read_body_full : ?buf_size:int -> byte_stream t -> string t
val read_body_full :
?buf:Tiny_httpd_buf.t -> ?buf_size:int -> byte_stream t -> string t
(** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since 0.11) *)
@param buf_size initial size of underlying buffer (since 0.11)
@param buf the initial buffer (since NEXT_RELEASE)
*)
(**/**)
@ -406,11 +409,6 @@ module type IO_BACKEND = sig
val init_addr : unit -> string
val init_port : unit -> int
val spawn : (unit -> unit) -> unit
(** function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one
could use a thread pool instead.*)
val get_time_s : unit -> float
(** obtain the current timestamp in seconds. *)

View file

@ -18,3 +18,12 @@
(with-stdout-to
%{targets}
(run %{bin}))))
(rule
(targets Tiny_httpd_atomic_.ml)
(deps
(:bin ./gen/mkshims.exe))
(action
(with-stdout-to
%{targets}
(run %{bin}))))

8
src/eio/dune Normal file
View file

@ -0,0 +1,8 @@
(library
(name tiny_httpd_eio)
(public_name tiny_httpd_eio)
(synopsis "An EIO-based backend for Tiny_httpd")
(flags :standard -safe-string -warn-error -a+8)
(libraries tiny_httpd eio eio.unix))

170
src/eio/tiny_httpd_eio.ml Normal file
View file

@ -0,0 +1,170 @@
module IO = Tiny_httpd_io
module H = Tiny_httpd_server
module Pool = Tiny_httpd_pool
let ( let@ ) = ( @@ )
type 'a with_args =
?addr:string ->
?port:int ->
?max_connections:int ->
stdenv:Eio_unix.Stdenv.base ->
sw:Eio.Switch.t ->
'a
let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let buf_size = 16 * 1024
let ic_of_flow ~buf_pool:ic_pool (flow : Eio.Net.stream_socket) :
IO.In_channel.t =
Pool.with_resource ic_pool @@ fun cstruct ->
let len_slice = ref 0 in
let offset = ref 0 in
let input buf i len =
if len = 0 then
0
else (
let available = ref (!len_slice - !offset) in
if !available = 0 then (
let n = flow#read_into cstruct in
offset := 0;
len_slice := n;
available := n
);
let n = min !available len in
if n > 0 then (
Cstruct.blit_to_bytes cstruct !offset buf i n;
offset := !offset + n;
n
) else
0
)
in
let close () = flow#shutdown `Receive in
{ IO.In_channel.input; close }
let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) :
IO.Out_channel.t =
(* write buffer *)
Pool.with_resource oc_pool @@ fun wbuf ->
let offset = ref 0 in
let flush () =
if !offset > 0 then (
let i = ref 0 in
let len = ref !offset in
let src =
object
inherit Eio.Flow.source
method read_into (cstruct : Cstruct.t) : int =
if !len = 0 then raise End_of_file;
let n = min !len (Cstruct.length cstruct) in
Cstruct.blit_from_bytes wbuf !i cstruct 0 n;
i := !i + n;
len := !len - n;
n
end
in
flow#copy src;
offset := 0
)
in
let output buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let available = Bytes.length wbuf - !offset in
let n = min !len available in
Bytes.blit buf !i wbuf !offset n;
offset := !offset + n;
i := !i + n;
len := !len - n;
if !offset = Bytes.length wbuf then flush ()
done
in
let close () = flow#shutdown `Send in
{ IO.Out_channel.close; flush; output }
let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () :
(module H.IO_BACKEND) =
let module M = struct
let init_addr () = addr
let init_port () = port
let get_time_s () = Unix.gettimeofday ()
let ic_pool = Pool.create ~mk_item:(fun () -> Cstruct.create buf_size) ()
let oc_pool = Pool.create ~mk_item:(fun () -> Bytes.create buf_size) ()
let tcp_server () : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
(* FIXME: parse *)
let ip_addr = Eio.Net.Ipaddr.V4.any in
let running = Atomic.make true in
let active_conns = Atomic.make 0 in
Eio.Switch.on_release sw (fun () -> Atomic.set running false);
let net = Eio.Stdenv.net stdenv in
(* main server socket *)
let sock =
let backlog = get_max_connection_ ?max_connections () in
Eio.Net.listen ~reuse_addr:true ~reuse_port:true ~backlog ~sw net
(`Tcp (ip_addr, port))
in
let tcp_server : IO.TCP_server.t =
{
running = (fun () -> Atomic.get running);
stop =
(fun () ->
Atomic.set running false;
Eio.Switch.fail sw Exit);
endpoint =
(fun () ->
(* TODO: find the real port *)
addr, port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
after_init tcp_server;
while Atomic.get running do
Eio.Net.accept_fork ~sw
~on_error:(fun exn ->
H._debug (fun k ->
k "error in client handler: %s" (Printexc.to_string exn)))
sock
(fun flow _client_addr ->
Atomic.incr active_conns;
let@ () =
Fun.protect ~finally:(fun () ->
H._debug (fun k ->
k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns)
in
let ic = ic_of_flow ~buf_pool:ic_pool flow in
let oc = oc_of_flow ~buf_pool:oc_pool flow in
handle.handle ic oc)
done);
}
end in
(module M)
let create ?addr ?port ?max_connections ~stdenv ~sw ?buf_size ?middlewares () :
H.t =
let backend = io_backend ?addr ?port ?max_connections ~stdenv ~sw () in
H.create_from ?buf_size ?middlewares ~backend ()

View file

@ -0,0 +1,30 @@
(** Tiny httpd EIO backend.
This replaces the threads + Unix blocking syscalls of {!Tiny_httpd_server}
with an Eio-based cooperative system.
{b NOTE}: this is very experimental and will absolutely change over time,
especially since Eio itself is also subject to change.
@since NEXT_RELEASE *)
(* TODO: pass in a switch *)
type 'a with_args =
?addr:string ->
?port:int ->
?max_connections:int ->
stdenv:Eio_unix.Stdenv.base ->
sw:Eio.Switch.t ->
'a
val io_backend : (unit -> (module Tiny_httpd_server.IO_BACKEND)) with_args
(** Create a server *)
val create :
(?buf_size:int ->
?middlewares:
([ `Encoding | `Stage of int ] * Tiny_httpd_server.Middleware.t) list ->
unit ->
Tiny_httpd_server.t)
with_args
(** Create a server *)

View file

@ -1,2 +1,2 @@
(executable
(name gentags))
(executables
(names gentags mkshims))

41
src/gen/mkshims.ml Normal file
View file

@ -0,0 +1,41 @@
let atomic_before_412 =
{|
type 'a t = {mutable x: 'a}
let[@inline] make x = {x}
let[@inline] get {x} = x
let[@inline] set r x = r.x <- x
let[@inline] exchange r x =
let y = r.x in
r.x <- x;
y
let[@inline] compare_and_set r seen v =
if r.x == seen then (
r.x <- v;
true
) else false
let[@inline] fetch_and_add r x =
let v = r.x in
r.x <- x + r.x;
v
let[@inline] incr r = r.x <- 1 + r.x
let[@inline] decr r = r.x <- r.x - 1
|}
let atomic_after_412 = {|include Atomic|}
let write_file file s =
let oc = open_out file in
output_string oc s;
close_out oc
let () =
let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in
print_endline
(if version >= (4, 12) then
atomic_after_412
else
atomic_before_412);
()

View file

@ -14,9 +14,10 @@ depends: [
"base-threads"
"result"
"seq"
"ocaml" { >= "4.04.0" }
"ocaml" { >= "4.05.0" }
"odoc" {with-doc}
"conf-libcurl" {with-test}
"ptime" {with-test}
"qcheck-core" {with-test & >= "0.9" }
"ptime" {with-test}
]

View file

@ -13,7 +13,7 @@ depends: [
"dune" { >= "2.0" }
"camlzip" {>= "1.06"}
"tiny_httpd" { = version }
"ocaml" { >= "4.04.0" }
"ocaml" { >= "4.05.0" }
"odoc" {with-doc}
]
tags: [ "http" "thread" "server" "gzip" "camlzip" ]

22
tiny_httpd_eio.opam Normal file
View file

@ -0,0 +1,22 @@
opam-version: "2.0"
version: "0.13"
authors: ["Simon Cruanes"]
maintainer: "simon.cruanes.2007@m4x.org"
license: "MIT"
synopsis: "Run tiny_httpd on Eio"
build: [
["dune" "build" "@install" "-p" name "-j" jobs]
["dune" "build" "@doc" "-p" name] {with-doc}
["dune" "runtest" "-p" name] {with-test}
]
depends: [
"dune" { >= "2.0" }
"eio" {>= "0.9"}
"tiny_httpd" { = version }
"odoc" {with-doc}
]
tags: [ "http" "server" "eio" ]
homepage: "https://github.com/c-cube/tiny_httpd/"
doc: "https://c-cube.github.io/tiny_httpd/"
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"