mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-05 19:00:32 -05:00
Compare commits
13 commits
236c93ea4f
...
ac466a8fcb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac466a8fcb | ||
|
|
8a8aadfbb0 | ||
|
|
9a1343aef7 | ||
|
|
f10992ec32 | ||
|
|
0f917ddf72 | ||
|
|
03c3e09f12 | ||
|
|
023805232f | ||
|
|
022a495de3 | ||
|
|
a56dd0ec65 | ||
|
|
7f9fae1fc8 | ||
|
|
e199162e1f | ||
|
|
cf9c14b1c2 | ||
|
|
c43ffb5ff4 |
20 changed files with 804 additions and 61 deletions
|
|
@ -1,4 +1,4 @@
|
|||
version = 0.26.2
|
||||
version = 0.27.0
|
||||
profile=conventional
|
||||
margin=80
|
||||
if-then-else=k-r
|
||||
|
|
|
|||
11
dune-project
11
dune-project
|
|
@ -39,3 +39,14 @@
|
|||
(iostream-camlzip (>= 0.2.1))
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_moonpool)
|
||||
(synopsis "Moonpool+picos_stdio backend for Tiny_httpd")
|
||||
(depends
|
||||
seq
|
||||
(tiny_httpd (= :version))
|
||||
(moonpool (>= 0.7))
|
||||
(moonpool-io (>= 0.7))
|
||||
(ocaml (>= 5.0))
|
||||
(odoc :with-doc)))
|
||||
|
|
|
|||
2
echo_mio.sh
Executable file
2
echo_mio.sh
Executable file
|
|
@ -0,0 +1,2 @@
|
|||
#!/bin/sh
|
||||
exec dune exec --display=quiet --profile=release "examples/echo_mio.exe" -- $@
|
||||
|
|
@ -14,6 +14,12 @@
|
|||
(modules echo vfs)
|
||||
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data))
|
||||
|
||||
(executable
|
||||
(name echo_mio)
|
||||
(flags :standard -warn-error -a+8)
|
||||
(modules echo_mio)
|
||||
(libraries tiny_httpd tiny_httpd_moonpool logs))
|
||||
|
||||
(executable
|
||||
(name writer)
|
||||
(flags :standard -warn-error -a+8)
|
||||
|
|
|
|||
294
examples/echo_mio.ml
Normal file
294
examples/echo_mio.ml
Normal file
|
|
@ -0,0 +1,294 @@
|
|||
open Tiny_httpd_core
|
||||
module Log = Tiny_httpd.Log
|
||||
|
||||
let now_ = Unix.gettimeofday
|
||||
|
||||
let alice_text =
|
||||
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
|
||||
sitting by her sister on the bank, and of having nothing to do: once or \
|
||||
twice she had peeped into the book her sister was reading, but it had no \
|
||||
pictures or conversations in it, <and what is the use of a book,> thought \
|
||||
Alice <without pictures or conversations?> So she was considering in her \
|
||||
own mind (as well as she could, for the hot day made her feel very sleepy \
|
||||
and stupid), whether the pleasure of making a daisy-chain would be worth \
|
||||
the trouble of getting up and picking the daisies, when suddenly a White \
|
||||
Rabbit with pink eyes ran close by her. There was nothing so very \
|
||||
remarkable in that; nor did Alice think it so very much out of the way to \
|
||||
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
|
||||
she thought it over afterwards, it occurred to her that she ought to have \
|
||||
wondered at this, but at the time it all seemed quite natural); but when \
|
||||
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
|
||||
it, and then hurried on, Alice started to her feet, for it flashed across \
|
||||
her mind that she had never before seen a rabbit with either a \
|
||||
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
|
||||
she ran across the field after it, and fortunately was just in time to see \
|
||||
it pop down a large rabbit-hole under the hedge. In another moment down \
|
||||
went Alice after it, never once considering how in the world she was to get \
|
||||
out again. The rabbit-hole went straight on like a tunnel for some way, and \
|
||||
then dipped suddenly down, so suddenly that Alice had not a moment to think \
|
||||
about stopping herself before she found herself falling down a very deep \
|
||||
well. Either the well was very deep, or she fell very slowly, for she had \
|
||||
plenty of time as she went down to look about her and to wonder what was \
|
||||
going to happen next. First, she tried to look down and make out what she \
|
||||
was coming to, but it was too dark to see anything; then she looked at the \
|
||||
sides of the well, and noticed that they were filled with cupboards......"
|
||||
|
||||
(* util: a little middleware collecting statistics *)
|
||||
let middleware_stat () : Server.Middleware.t * (unit -> string) =
|
||||
let n_req = ref 0 in
|
||||
let total_time_ = ref 0. in
|
||||
let parse_time_ = ref 0. in
|
||||
let build_time_ = ref 0. in
|
||||
let write_time_ = ref 0. in
|
||||
|
||||
let m h req ~resp =
|
||||
incr n_req;
|
||||
let t1 = Request.start_time req in
|
||||
let t2 = now_ () in
|
||||
h req ~resp:(fun response ->
|
||||
let t3 = now_ () in
|
||||
resp response;
|
||||
let t4 = now_ () in
|
||||
total_time_ := !total_time_ +. (t4 -. t1);
|
||||
parse_time_ := !parse_time_ +. (t2 -. t1);
|
||||
build_time_ := !build_time_ +. (t3 -. t2);
|
||||
write_time_ := !write_time_ +. (t4 -. t3))
|
||||
and get_stat () =
|
||||
Printf.sprintf
|
||||
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
|
||||
!n_req
|
||||
(!total_time_ /. float !n_req *. 1e3)
|
||||
(!parse_time_ /. float !n_req *. 1e3)
|
||||
(!build_time_ /. float !n_req *. 1e3)
|
||||
(!write_time_ /. float !n_req *. 1e3)
|
||||
in
|
||||
m, get_stat
|
||||
|
||||
(* ugly AF *)
|
||||
let base64 x =
|
||||
let ic, oc = Unix.open_process "base64" in
|
||||
output_string oc x;
|
||||
flush oc;
|
||||
close_out oc;
|
||||
let r = input_line ic in
|
||||
ignore (Unix.close_process (ic, oc));
|
||||
r
|
||||
|
||||
let setup_logging () =
|
||||
Logs.set_reporter @@ Logs.format_reporter ();
|
||||
Logs.set_level ~all:true (Some Logs.Debug)
|
||||
|
||||
let () =
|
||||
Moonpool_fib.main @@ fun _ ->
|
||||
let port_ = ref 8080 in
|
||||
let max_conn = ref 800 in
|
||||
let j = ref 16 in
|
||||
Arg.parse
|
||||
(Arg.align
|
||||
[
|
||||
"--port", Arg.Set_int port_, " set port";
|
||||
"-p", Arg.Set_int port_, " set port";
|
||||
"--debug", Arg.Unit setup_logging, " enable debug";
|
||||
( "--max-connections",
|
||||
Arg.Set_int max_conn,
|
||||
" maximum number of connections" );
|
||||
"-j", Arg.Set_int j, " Size of thread pool";
|
||||
])
|
||||
(fun _ -> raise (Arg.Bad ""))
|
||||
"echo [option]*";
|
||||
|
||||
let runner = Moonpool.Ws_pool.create ~num_threads:!j () in
|
||||
let server : Server.t =
|
||||
Tiny_httpd_moonpool.create ~runner ~port:!port_ ~max_connections:!max_conn
|
||||
()
|
||||
in
|
||||
|
||||
let m_stats, get_stats = middleware_stat () in
|
||||
Server.add_middleware server ~stage:(`Stage 1) m_stats;
|
||||
|
||||
(* say hello *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "hello" @/ string @/ return)
|
||||
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
|
||||
|
||||
(* compressed file access *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "zcat" @/ string_urlencoded @/ return)
|
||||
(fun path _req ->
|
||||
let ic = open_in path in
|
||||
let str = IO.Input.of_in_channel ic in
|
||||
let mime_type =
|
||||
try
|
||||
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
|
||||
try
|
||||
let s = [ "Content-Type", String.trim (input_line p) ] in
|
||||
ignore @@ Unix.close_process_in p;
|
||||
s
|
||||
with _ ->
|
||||
ignore @@ Unix.close_process_in p;
|
||||
[]
|
||||
with _ -> []
|
||||
in
|
||||
Response.make_stream ~headers:mime_type (Ok str));
|
||||
|
||||
(* echo request *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "echo" @/ return)
|
||||
(fun req ->
|
||||
let q =
|
||||
Request.query req
|
||||
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|
||||
|> String.concat ";"
|
||||
in
|
||||
Response.make_string
|
||||
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
|
||||
|
||||
(* file upload *)
|
||||
Server.add_route_handler_stream ~meth:`PUT server
|
||||
Route.(exact "upload" @/ string @/ return)
|
||||
(fun path req ->
|
||||
Log.debug (fun k ->
|
||||
k "start upload %S, headers:\n%s\n\n%!" path
|
||||
(Format.asprintf "%a" Headers.pp (Request.headers req)));
|
||||
try
|
||||
let oc = open_out @@ "/tmp/" ^ path in
|
||||
IO.Input.to_chan oc req.Request.body;
|
||||
flush oc;
|
||||
Response.make_string (Ok "uploaded file")
|
||||
with e ->
|
||||
Response.fail ~code:500 "couldn't upload file: %s"
|
||||
(Printexc.to_string e));
|
||||
|
||||
(* protected by login *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "protected" @/ return)
|
||||
(fun req ->
|
||||
let ok =
|
||||
match Request.get_header req "authorization" with
|
||||
| Some v ->
|
||||
Log.debug (fun k -> k "authenticate with %S" v);
|
||||
v = "Basic " ^ base64 "user:foobar"
|
||||
| None -> false
|
||||
in
|
||||
if ok then (
|
||||
(* FIXME: a logout link *)
|
||||
let s =
|
||||
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
|
||||
in
|
||||
Response.make_string (Ok s)
|
||||
) else (
|
||||
let headers =
|
||||
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
|
||||
in
|
||||
Response.fail ~code:401 ~headers "invalid"
|
||||
));
|
||||
|
||||
(* logout *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "logout" @/ return)
|
||||
(fun _req -> Response.fail ~code:401 "logged out");
|
||||
|
||||
(* stats *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "stats" @/ return)
|
||||
(fun _req ->
|
||||
let stats = get_stats () in
|
||||
Response.make_string @@ Ok stats);
|
||||
|
||||
Server.add_route_handler server
|
||||
Route.(exact "alice" @/ return)
|
||||
(fun _req -> Response.make_string (Ok alice_text));
|
||||
|
||||
Server.add_route_handler server
|
||||
Route.(exact "alice10" @/ return)
|
||||
(fun _req ->
|
||||
let writer =
|
||||
IO.Writer.make () ~write:(fun oc ->
|
||||
for _i = 1 to 10 do
|
||||
IO.Output.output_string oc alice_text;
|
||||
IO.Output.flush oc
|
||||
done)
|
||||
in
|
||||
Response.make_writer (Ok writer));
|
||||
|
||||
(* main page *)
|
||||
Server.add_route_handler server
|
||||
Route.(return)
|
||||
(fun _req ->
|
||||
let open Tiny_httpd_html in
|
||||
let h =
|
||||
html []
|
||||
[
|
||||
head [] [ title [] [ txt "index of echo" ] ];
|
||||
body []
|
||||
[
|
||||
h3 [] [ txt "welcome!" ];
|
||||
p [] [ b [] [ txt "endpoints are:" ] ];
|
||||
ul []
|
||||
[
|
||||
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/echo/" ] [ txt "echo" ];
|
||||
txt " echo back query";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
txt
|
||||
"/zcat/:path (GET) to download a file (deflate \
|
||||
transfer-encoding)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/stats/" ] [ txt "/stats/" ];
|
||||
txt " (GET) to access statistics";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/vfs/" ] [ txt "/vfs" ];
|
||||
txt " (GET) to access a VFS embedded in the binary";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/protected" ] [ txt "/protected" ];
|
||||
txt
|
||||
" (GET) to see a protected page (login: user, \
|
||||
password: foobar)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/logout" ] [ txt "/logout" ];
|
||||
txt " (POST) to log out";
|
||||
];
|
||||
];
|
||||
];
|
||||
];
|
||||
]
|
||||
in
|
||||
let s = to_string_top h in
|
||||
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
|
||||
|
||||
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
|
||||
(Server.port server);
|
||||
match Server.run server with
|
||||
| Ok () -> ()
|
||||
| Error e -> raise e
|
||||
|
|
@ -26,11 +26,6 @@ let atomic_before_412 =
|
|||
|
||||
let atomic_after_412 = {|include Atomic|}
|
||||
|
||||
let write_file file s =
|
||||
let oc = open_out file in
|
||||
output_string oc s;
|
||||
close_out oc
|
||||
|
||||
let () =
|
||||
let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in
|
||||
print_endline
|
||||
|
|
|
|||
|
|
@ -73,9 +73,9 @@ let rec pp_ : type a b. Buffer.t -> (a, b) t -> unit =
|
|||
| Rest { url_encoded } ->
|
||||
bpf out "<rest_of_url%s>"
|
||||
(if url_encoded then
|
||||
"_urlencoded"
|
||||
else
|
||||
"")
|
||||
"_urlencoded"
|
||||
else
|
||||
"")
|
||||
| Compose (Exact s, tl) -> bpf out "%s/%a" s pp_ tl
|
||||
| Compose (Int, tl) -> bpf out "<int>/%a" pp_ tl
|
||||
| Compose (String, tl) -> bpf out "<str>/%a" pp_ tl
|
||||
|
|
@ -91,3 +91,34 @@ module Private_ = struct
|
|||
end
|
||||
|
||||
let pp out x = Format.pp_print_string out (to_string x)
|
||||
|
||||
let rec to_url_rec : type b. Buffer.t -> (b, string) t -> b =
|
||||
fun buf route ->
|
||||
match route with
|
||||
| Fire -> Buffer.contents buf
|
||||
| Rest { url_encoded = _ } ->
|
||||
fun str ->
|
||||
Buffer.add_string buf str;
|
||||
Buffer.contents buf
|
||||
| Compose (comp, rest) ->
|
||||
(match comp with
|
||||
| Exact s ->
|
||||
Buffer.add_string buf s;
|
||||
Buffer.add_char buf '/';
|
||||
to_url_rec buf rest
|
||||
| Int ->
|
||||
fun i ->
|
||||
Printf.bprintf buf "%d/" i;
|
||||
to_url_rec buf rest
|
||||
| String ->
|
||||
fun s ->
|
||||
Printf.bprintf buf "%s/" s;
|
||||
to_url_rec buf rest
|
||||
| String_urlencoded ->
|
||||
fun s ->
|
||||
Printf.bprintf buf "%s/" (Util.percent_encode s);
|
||||
to_url_rec buf rest)
|
||||
|
||||
let to_url (h : ('a, string) t) : 'a =
|
||||
let buf = Buffer.create 16 in
|
||||
to_url_rec buf h
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
(** Routing
|
||||
|
||||
Basic type-safe routing of handlers based on URL paths. This is optional,
|
||||
it is possible to only define the root handler with something like
|
||||
{{: https://github.com/anuragsoni/routes/} Routes}.
|
||||
Basic type-safe routing of handlers based on URL paths. This is optional, it
|
||||
is possible to only define the root handler with something like
|
||||
{{:https://github.com/anuragsoni/routes/} Routes}.
|
||||
@since 0.6 *)
|
||||
|
||||
type ('a, 'b) comp
|
||||
|
|
@ -27,31 +27,35 @@ val return : ('a, 'a) t
|
|||
(** Matches the empty path. *)
|
||||
|
||||
val rest_of_path : (string -> 'a, 'a) t
|
||||
(** Matches a string, even containing ['/']. This will match
|
||||
the entirety of the remaining route.
|
||||
@since 0.7 *)
|
||||
(** Matches a string, even containing ['/']. This will match the entirety of the
|
||||
remaining route.
|
||||
@since 0.7 *)
|
||||
|
||||
val rest_of_path_urlencoded : (string -> 'a, 'a) t
|
||||
(** Matches a string, even containing ['/'], and URL-decode it (piecewise).
|
||||
This will match the entirety of the remaining route.
|
||||
@since 0.7 *)
|
||||
(** Matches a string, even containing ['/'], and URL-decode it (piecewise). This
|
||||
will match the entirety of the remaining route.
|
||||
@since 0.7 *)
|
||||
|
||||
val ( @/ ) : ('a, 'b) comp -> ('b, 'c) t -> ('a, 'c) t
|
||||
(** [comp / route] matches ["foo/bar/…"] iff [comp] matches ["foo"],
|
||||
and [route] matches ["bar/…"]. *)
|
||||
(** [comp / route] matches ["foo/bar/…"] iff [comp] matches ["foo"], and [route]
|
||||
matches ["bar/…"]. *)
|
||||
|
||||
val exact_path : string -> ('a, 'b) t -> ('a, 'b) t
|
||||
(** [exact_path "foo/bar/..." r] is equivalent to
|
||||
[exact "foo" @/ exact "bar" @/ ... @/ r]
|
||||
@since 0.11 **)
|
||||
[exact "foo" @/ exact "bar" @/ ... @/ r]
|
||||
@since 0.11 **)
|
||||
|
||||
val pp : Format.formatter -> _ t -> unit
|
||||
(** Print the route.
|
||||
@since 0.7 *)
|
||||
@since 0.7 *)
|
||||
|
||||
val to_string : _ t -> string
|
||||
(** Print the route.
|
||||
@since 0.7 *)
|
||||
@since 0.7 *)
|
||||
|
||||
val to_url : ('a, string) t -> 'a
|
||||
(** [to_url route args] takes a route, and turns it into a URL path.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
module Private_ : sig
|
||||
val eval : string list -> ('a, 'b) t -> 'a -> 'b option
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ module Head_middleware = struct
|
|||
type t = { handle: 'a. 'a Request.t -> 'a Request.t }
|
||||
|
||||
let trivial = { handle = Fun.id }
|
||||
let[@inline] apply (self : t) req = self.handle req
|
||||
let[@inline] apply' req (self : t) = self.handle req
|
||||
|
||||
let to_middleware (self : t) : Middleware.t =
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
(* adapted from https://github.com/sindresorhus/html-tags (MIT licensed) *)
|
||||
|
||||
let pf = Printf.printf
|
||||
let spf = Printf.sprintf
|
||||
|
||||
let void =
|
||||
[
|
||||
|
|
|
|||
6
src/moonpool-io/dune
Normal file
6
src/moonpool-io/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
|
||||
|
||||
(library
|
||||
(name tiny_httpd_moonpool)
|
||||
(public_name tiny_httpd_moonpool)
|
||||
(libraries tiny_httpd moonpool moonpool.sync moonpool.fib moonpool-io))
|
||||
220
src/moonpool-io/io_server.ml
Normal file
220
src/moonpool-io/io_server.ml
Normal file
|
|
@ -0,0 +1,220 @@
|
|||
open Tiny_httpd_core
|
||||
module A = Atomic
|
||||
module MIO = Moonpool_io
|
||||
module Sem = Moonpool_sync.Semaphore.Counting
|
||||
module Fd = Moonpool_io.Fd
|
||||
|
||||
module IO_helper = struct
|
||||
module Slice = Iostream.Slice
|
||||
|
||||
module Output = struct
|
||||
include IO.Output
|
||||
|
||||
class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
|
||||
t =
|
||||
object
|
||||
inherit t_from_output ~bytes:buf.bytes ()
|
||||
|
||||
method private output_underlying bs i len0 =
|
||||
let i = ref i in
|
||||
let len = ref len0 in
|
||||
while !len > 0 do
|
||||
match MIO.Unix.write fd bs !i !len with
|
||||
| 0 -> failwith "write failed"
|
||||
| n ->
|
||||
i := !i + n;
|
||||
len := !len - n
|
||||
done
|
||||
|
||||
method private close_underlying () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
if close_noerr then (
|
||||
try MIO.Unix.close fd with _ -> ()
|
||||
) else
|
||||
MIO.Unix.close fd
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
module Input = struct
|
||||
include IO.Input
|
||||
|
||||
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
|
||||
t =
|
||||
let eof = ref false in
|
||||
object
|
||||
inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes ()
|
||||
|
||||
method private refill (slice : Slice.t) =
|
||||
if not !eof then (
|
||||
slice.off <- 0;
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
match
|
||||
MIO.Unix.read fd slice.bytes 0 (Bytes.length slice.bytes)
|
||||
with
|
||||
| n ->
|
||||
slice.len <- n;
|
||||
continue := false
|
||||
done;
|
||||
(* Printf.eprintf "read returned %d B\n%!" !n; *)
|
||||
if slice.len = 0 then eof := true
|
||||
)
|
||||
|
||||
method close () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
eof := true;
|
||||
if close_noerr then (
|
||||
try MIO.Unix.close fd with _ -> ()
|
||||
) else
|
||||
MIO.Unix.close fd
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
open struct
|
||||
let get_addr_ (fd : Fd.t) =
|
||||
match Unix.getsockname (Fd.unsafe_get fd) with
|
||||
| Unix.ADDR_INET (addr, port) -> addr, port
|
||||
| _ -> invalid_arg "httpd: address is not INET"
|
||||
|
||||
let shutdown_silent_ (fd : Fd.t) : unit =
|
||||
try MIO.Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
|
||||
|
||||
let close_silent_ (fd : Fd.t) : unit = try MIO.Unix.close fd with _ -> ()
|
||||
end
|
||||
|
||||
type t = {
|
||||
addr: string;
|
||||
port: int;
|
||||
buf_pool: Buf.t Pool.t;
|
||||
slice_pool: IO.Slice.t Pool.t;
|
||||
max_connections: int;
|
||||
sem_max_connections: Sem.t;
|
||||
(** semaphore to restrict the number of active concurrent connections *)
|
||||
mutable sock: Fd.t option; (** Socket *)
|
||||
new_thread: (unit -> unit) -> unit;
|
||||
timeout: float;
|
||||
running: bool A.t; (* TODO: use an atomic? *)
|
||||
}
|
||||
|
||||
let to_tcp_server (self : t) : IO.TCP_server.builder =
|
||||
{
|
||||
IO.TCP_server.serve =
|
||||
(fun ~after_init ~handle () : unit ->
|
||||
let sock, should_bind =
|
||||
match self.sock with
|
||||
| Some s ->
|
||||
(* Because we're getting a socket from the caller (e.g. systemd) *)
|
||||
s, false
|
||||
| None ->
|
||||
let sock =
|
||||
Unix.socket
|
||||
(if Util.is_ipv6_str self.addr then
|
||||
Unix.PF_INET6
|
||||
else
|
||||
Unix.PF_INET)
|
||||
Unix.SOCK_STREAM 0
|
||||
in
|
||||
let fd = Fd.create sock in
|
||||
fd, true (* Because we're creating the socket ourselves *)
|
||||
in
|
||||
MIO.Unix.set_nonblock sock;
|
||||
MIO.Unix.setsockopt_optint sock Unix.SO_LINGER None;
|
||||
if should_bind then (
|
||||
let inet_addr = Unix.inet_addr_of_string self.addr in
|
||||
MIO.Unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||
MIO.Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
|
||||
let n_listen = 2 * self.max_connections in
|
||||
MIO.Unix.listen sock n_listen
|
||||
);
|
||||
|
||||
self.sock <- Some sock;
|
||||
|
||||
let tcp_server =
|
||||
{
|
||||
IO.TCP_server.stop = (fun () -> Atomic.set self.running false);
|
||||
running = (fun () -> Atomic.get self.running);
|
||||
active_connections =
|
||||
(fun () ->
|
||||
self.max_connections - Sem.get_value self.sem_max_connections);
|
||||
endpoint =
|
||||
(fun () ->
|
||||
let addr, port = get_addr_ sock in
|
||||
Unix.string_of_inet_addr addr, port);
|
||||
}
|
||||
in
|
||||
after_init tcp_server;
|
||||
|
||||
(* how to handle a single client *)
|
||||
let handle_client_ (client_sock : Fd.t) (client_addr : Unix.sockaddr) :
|
||||
unit =
|
||||
Log.debug (fun k ->
|
||||
k "t[%d]: serving new client on %s"
|
||||
(Thread.id @@ Thread.self ())
|
||||
(Util.show_sockaddr client_addr));
|
||||
|
||||
MIO.Unix.set_nonblock client_sock;
|
||||
MIO.Unix.setsockopt client_sock Unix.TCP_NODELAY true;
|
||||
MIO.Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
|
||||
MIO.Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
|
||||
|
||||
Pool.with_resource self.slice_pool @@ fun ic_buf ->
|
||||
Pool.with_resource self.slice_pool @@ fun oc_buf ->
|
||||
let closed = ref false in
|
||||
|
||||
let oc =
|
||||
new IO_helper.Output.of_unix_fd
|
||||
~close_noerr:true ~closed ~buf:oc_buf client_sock
|
||||
in
|
||||
let ic =
|
||||
IO_helper.Input.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf
|
||||
client_sock
|
||||
in
|
||||
handle.handle ~client_addr ic oc
|
||||
in
|
||||
|
||||
MIO.Unix.set_nonblock sock;
|
||||
while Atomic.get self.running do
|
||||
match MIO.Unix.accept sock with
|
||||
| client_sock, client_addr ->
|
||||
(* limit concurrency *)
|
||||
Sem.acquire self.sem_max_connections;
|
||||
self.new_thread (fun () ->
|
||||
try
|
||||
handle_client_ client_sock client_addr;
|
||||
Log.debug (fun k ->
|
||||
k "t[%d]: done with client on %s, exiting"
|
||||
(Thread.id @@ Thread.self ())
|
||||
@@ Util.show_sockaddr client_addr);
|
||||
shutdown_silent_ client_sock;
|
||||
close_silent_ client_sock;
|
||||
Sem.release self.sem_max_connections
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
shutdown_silent_ client_sock;
|
||||
close_silent_ client_sock;
|
||||
Sem.release self.sem_max_connections;
|
||||
Log.error (fun k ->
|
||||
k
|
||||
"@[<v>Handler: uncaught exception for client %s:@ %s@ \
|
||||
%s@]"
|
||||
(Util.show_sockaddr client_addr)
|
||||
(Printexc.to_string e)
|
||||
(Printexc.raw_backtrace_to_string bt)))
|
||||
| exception e ->
|
||||
Log.error (fun k ->
|
||||
k "Unix.accept raised an exception: %s" (Printexc.to_string e));
|
||||
Atomic.set self.running false
|
||||
done;
|
||||
|
||||
(* Wait for all threads to be done: this only works if all threads are done. *)
|
||||
MIO.Unix.close sock;
|
||||
while Sem.get_value self.sem_max_connections < self.max_connections do
|
||||
Sem.acquire self.sem_max_connections
|
||||
done;
|
||||
());
|
||||
}
|
||||
52
src/moonpool-io/tiny_httpd_moonpool.ml
Normal file
52
src/moonpool-io/tiny_httpd_moonpool.ml
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
include Tiny_httpd
|
||||
module Fd = Io_server.Fd
|
||||
|
||||
open struct
|
||||
let get_max_connection_ ?(max_connections = 64) () : int =
|
||||
let max_connections = max 4 max_connections in
|
||||
max_connections
|
||||
|
||||
let clear_slice (slice : IO.Slice.t) =
|
||||
Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00';
|
||||
slice.off <- 0;
|
||||
slice.len <- 0
|
||||
end
|
||||
|
||||
let create ?max_connections ?(timeout = 0.0) ?buf_size
|
||||
?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080)
|
||||
?(sock : Fd.t option) ?middlewares ~(runner : Moonpool.Runner.t) () : t =
|
||||
let new_thread f =
|
||||
ignore (Moonpool_fib.spawn_top ~on:runner f : _ Moonpool_fib.t)
|
||||
in
|
||||
let max_connections = get_max_connection_ ?max_connections () in
|
||||
let server =
|
||||
{
|
||||
Io_server.addr;
|
||||
new_thread;
|
||||
buf_pool =
|
||||
Pool.create ~clear:Buf.clear_and_zero
|
||||
~mk_item:(fun () -> Buf.create ?size:buf_size ())
|
||||
();
|
||||
slice_pool =
|
||||
Pool.create ~clear:clear_slice
|
||||
~mk_item:
|
||||
(let buf_size = Option.value buf_size ~default:4096 in
|
||||
fun () -> IO.Slice.create buf_size)
|
||||
();
|
||||
running = Atomic.make true;
|
||||
port;
|
||||
sock;
|
||||
max_connections;
|
||||
sem_max_connections = Io_server.Sem.make max_connections;
|
||||
timeout;
|
||||
}
|
||||
in
|
||||
let tcp_server_builder = Io_server.to_tcp_server server in
|
||||
let module B = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let get_time_s = get_time_s
|
||||
let tcp_server () = tcp_server_builder
|
||||
end in
|
||||
let backend = (module B : IO_BACKEND) in
|
||||
Server.create_from ?buf_size ?middlewares ~backend ()
|
||||
|
|
@ -1,5 +1,26 @@
|
|||
open Common_ws_
|
||||
|
||||
module With_lock = struct
|
||||
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
|
||||
type builder = unit -> t
|
||||
|
||||
let default_builder : builder =
|
||||
fun () ->
|
||||
let mutex = Mutex.create () in
|
||||
{
|
||||
with_lock =
|
||||
(fun f ->
|
||||
Mutex.lock mutex;
|
||||
try
|
||||
let x = f () in
|
||||
Mutex.unlock mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock mutex;
|
||||
raise e);
|
||||
}
|
||||
end
|
||||
|
||||
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
|
||||
|
||||
module Frame_type = struct
|
||||
|
|
@ -52,10 +73,10 @@ module Writer = struct
|
|||
mutable offset: int; (** number of bytes already in [buf] *)
|
||||
oc: IO.Output.t;
|
||||
mutable closed: bool;
|
||||
mutex: Mutex.t;
|
||||
mutex: With_lock.t;
|
||||
}
|
||||
|
||||
let create ?(buf_size = 16 * 1024) ~oc () : t =
|
||||
let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t =
|
||||
{
|
||||
header = Header.create ();
|
||||
header_buf = Bytes.create 16;
|
||||
|
|
@ -63,19 +84,9 @@ module Writer = struct
|
|||
offset = 0;
|
||||
oc;
|
||||
closed = false;
|
||||
mutex = Mutex.create ();
|
||||
mutex = with_lock;
|
||||
}
|
||||
|
||||
let[@inline] with_mutex_ (self : t) f =
|
||||
Mutex.lock self.mutex;
|
||||
try
|
||||
let x = f () in
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock self.mutex;
|
||||
raise e
|
||||
|
||||
let[@inline] close self = self.closed <- true
|
||||
let int_of_bool : bool -> int = Obj.magic
|
||||
|
||||
|
|
@ -142,7 +153,7 @@ module Writer = struct
|
|||
if self.offset = Bytes.length self.buf then really_output_buf_ self
|
||||
|
||||
let send_pong (self : t) : unit =
|
||||
let@ () = with_mutex_ self in
|
||||
let@ () = self.mutex.with_lock in
|
||||
self.header.fin <- true;
|
||||
self.header.ty <- Frame_type.pong;
|
||||
self.header.payload_len <- 0;
|
||||
|
|
@ -151,7 +162,7 @@ module Writer = struct
|
|||
write_header_ self
|
||||
|
||||
let output_char (self : t) c : unit =
|
||||
let@ () = with_mutex_ self in
|
||||
let@ () = self.mutex.with_lock in
|
||||
let cap = Bytes.length self.buf - self.offset in
|
||||
(* make room for [c] *)
|
||||
if cap = 0 then really_output_buf_ self;
|
||||
|
|
@ -161,7 +172,7 @@ module Writer = struct
|
|||
if cap = 1 then really_output_buf_ self
|
||||
|
||||
let output (self : t) buf i len : unit =
|
||||
let@ () = with_mutex_ self in
|
||||
let@ () = self.mutex.with_lock in
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
while !len > 0 do
|
||||
|
|
@ -179,7 +190,7 @@ module Writer = struct
|
|||
flush_if_full self
|
||||
|
||||
let flush self : unit =
|
||||
let@ () = with_mutex_ self in
|
||||
let@ () = self.mutex.with_lock in
|
||||
flush_ self
|
||||
end
|
||||
|
||||
|
|
@ -187,8 +198,8 @@ module Reader = struct
|
|||
type state =
|
||||
| Begin (** At the beginning of a frame *)
|
||||
| Reading_frame of { mutable remaining_bytes: int; mutable num_read: int }
|
||||
(** Currently reading the payload of a frame with [remaining_bytes]
|
||||
left to read from the underlying [ic] *)
|
||||
(** Currently reading the payload of a frame with [remaining_bytes] left
|
||||
to read from the underlying [ic] *)
|
||||
| Close
|
||||
|
||||
type t = {
|
||||
|
|
@ -266,7 +277,7 @@ module Reader = struct
|
|||
external apply_masking_ :
|
||||
key:bytes -> key_offset:int -> buf:bytes -> int -> int -> unit
|
||||
= "tiny_httpd_ws_apply_masking"
|
||||
[@@noalloc]
|
||||
[@@noalloc]
|
||||
(** Apply masking to the parsed data *)
|
||||
|
||||
let[@inline] apply_masking ~mask_key ~mask_offset (buf : bytes) off len : unit
|
||||
|
|
@ -390,8 +401,8 @@ module Reader = struct
|
|||
)
|
||||
end
|
||||
|
||||
let upgrade ic oc : _ * _ =
|
||||
let writer = Writer.create ~oc () in
|
||||
let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ =
|
||||
let writer = Writer.create ~with_lock ~oc () in
|
||||
let reader = Reader.create ~ic ~writer () in
|
||||
let ws_ic : IO.Input.t =
|
||||
object
|
||||
|
|
@ -414,9 +425,11 @@ let upgrade ic oc : _ * _ =
|
|||
in
|
||||
ws_ic, ws_oc
|
||||
|
||||
(** Turn a regular connection handler (provided by the user) into a websocket upgrade handler *)
|
||||
(** Turn a regular connection handler (provided by the user) into a websocket
|
||||
upgrade handler *)
|
||||
module Make_upgrade_handler (X : sig
|
||||
val accept_ws_protocol : string -> bool
|
||||
val with_lock : With_lock.builder
|
||||
val handler : handler
|
||||
end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t =
|
||||
struct
|
||||
|
|
@ -461,7 +474,8 @@ struct
|
|||
try Ok (handshake_ req) with Bad_req s -> Error s
|
||||
|
||||
let handle_connection req ic oc =
|
||||
let ws_ic, ws_oc = upgrade ic oc in
|
||||
let with_lock = X.with_lock () in
|
||||
let ws_ic, ws_oc = upgrade ~with_lock ic oc in
|
||||
try X.handler req ws_ic ws_oc
|
||||
with Close_connection ->
|
||||
Log.debug (fun k -> k "websocket: requested to close the connection");
|
||||
|
|
@ -469,9 +483,11 @@ struct
|
|||
end
|
||||
|
||||
let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares
|
||||
(server : Server.t) route (f : handler) : unit =
|
||||
?(with_lock = With_lock.default_builder) (server : Server.t) route
|
||||
(f : handler) : unit =
|
||||
let module M = Make_upgrade_handler (struct
|
||||
let handler = f
|
||||
let with_lock = with_lock
|
||||
let accept_ws_protocol = accept_ws_protocol
|
||||
end) in
|
||||
let up : Server.upgrade_handler = (module M) in
|
||||
|
|
|
|||
|
|
@ -1,30 +1,60 @@
|
|||
(** Websockets for Tiny_httpd.
|
||||
|
||||
This sub-library ([tiny_httpd.ws]) exports a small implementation
|
||||
for a websocket server. It has no additional dependencies.
|
||||
*)
|
||||
This sub-library ([tiny_httpd.ws]) exports a small implementation for a
|
||||
websocket server. It has no additional dependencies. *)
|
||||
|
||||
(** Synchronization primitive used to allow both the reader to reply to "ping",
|
||||
and the handler to send messages, without stepping on each other's toes.
|
||||
|
||||
@since NEXT_RELEASE *)
|
||||
module With_lock : sig
|
||||
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
|
||||
(** A primitive to run the callback in a critical section where others cannot
|
||||
run at the same time.
|
||||
|
||||
The default is a mutex, but that works poorly with thread pools so it's
|
||||
possible to use a semaphore or a cooperative mutex instead. *)
|
||||
|
||||
type builder = unit -> t
|
||||
|
||||
val default_builder : builder
|
||||
(** Lock using [Mutex]. *)
|
||||
end
|
||||
|
||||
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
|
||||
(** Websocket handler *)
|
||||
|
||||
val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t
|
||||
(** Upgrade a byte stream to the websocket framing protocol. *)
|
||||
val upgrade :
|
||||
?with_lock:With_lock.t ->
|
||||
IO.Input.t ->
|
||||
IO.Output.t ->
|
||||
IO.Input.t * IO.Output.t
|
||||
(** Upgrade a byte stream to the websocket framing protocol.
|
||||
@param with_lock
|
||||
if provided, use this to prevent reader and writer to compete on sending
|
||||
frames. since NEXT_RELEASE. *)
|
||||
|
||||
exception Close_connection
|
||||
(** Exception that can be raised from IOs inside the handler,
|
||||
when the connection is closed from underneath. *)
|
||||
(** Exception that can be raised from IOs inside the handler, when the
|
||||
connection is closed from underneath. *)
|
||||
|
||||
val add_route_handler :
|
||||
?accept:(unit Request.t -> (unit, int * string) result) ->
|
||||
?accept_ws_protocol:(string -> bool) ->
|
||||
?middlewares:Server.Head_middleware.t list ->
|
||||
?with_lock:With_lock.builder ->
|
||||
Server.t ->
|
||||
(Server.upgrade_handler, Server.upgrade_handler) Route.t ->
|
||||
handler ->
|
||||
unit
|
||||
(** Add a route handler for a websocket endpoint.
|
||||
@param accept_ws_protocol decides whether this endpoint accepts the websocket protocol
|
||||
sent by the client. Default accepts everything. *)
|
||||
@param accept_ws_protocol
|
||||
decides whether this endpoint accepts the websocket protocol sent by the
|
||||
client. Default accepts everything.
|
||||
@param with_lock
|
||||
if provided, use this to synchronize writes between the frame reader
|
||||
(replies "pong" to "ping") and the handler emitting writes. since
|
||||
NEXT_RELEASE. *)
|
||||
|
||||
(**/**)
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ CAMLprim value tiny_httpd_ws_apply_masking(value _mask_key, value _mask_offset,
|
|||
CAMLparam5(_mask_key, _mask_offset, _buf, _offset, _len);
|
||||
|
||||
char const *mask_key = String_val(_mask_key);
|
||||
char *buf = Bytes_val(_buf);
|
||||
unsigned char *buf = Bytes_val(_buf);
|
||||
intnat mask_offset = Int_val(_mask_offset);
|
||||
intnat offset = Int_val(_offset);
|
||||
intnat len = Int_val(_len);
|
||||
|
|
|
|||
21
tests/dune
21
tests/dune
|
|
@ -19,6 +19,27 @@
|
|||
(action
|
||||
(diff echo1.expect echo1.out)))
|
||||
|
||||
(rule
|
||||
(targets echo_mio1.out)
|
||||
(deps
|
||||
(:bin ../examples/echo_mio.exe))
|
||||
(locks /port)
|
||||
(enabled_if
|
||||
(= %{system} "linux"))
|
||||
(package tiny_httpd_moonpool)
|
||||
(action
|
||||
(with-stdout-to
|
||||
%{targets}
|
||||
(run ./echo_mio1.sh %{bin}))))
|
||||
|
||||
(rule
|
||||
(alias runtest)
|
||||
(package tiny_httpd_moonpool)
|
||||
(enabled_if
|
||||
(= %{system} "linux"))
|
||||
(action
|
||||
(diff echo_mio1.expect echo_mio1.out)))
|
||||
|
||||
(rule
|
||||
(targets sse_count.out)
|
||||
(deps
|
||||
|
|
|
|||
10
tests/echo_mio1.expect
Normal file
10
tests/echo_mio1.expect
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
listening on http://127.0.0.1:8085
|
||||
test moonpool_io
|
||||
echo:
|
||||
{meth=GET; host=localhost:8085;
|
||||
headers=[user-agent: test
|
||||
accept: */*
|
||||
host: localhost:8085];
|
||||
path="/echo/?a=b&c=d"; body=""; path_components=["echo"];
|
||||
query=["c","d";"a","b"]}
|
||||
(query: "c" = "d";"a" = "b")
|
||||
12
tests/echo_mio1.sh
Executable file
12
tests/echo_mio1.sh
Executable file
|
|
@ -0,0 +1,12 @@
|
|||
#!/usr/bin/env sh
|
||||
|
||||
ECHO=$1
|
||||
PORT=8085
|
||||
|
||||
"$ECHO" -p $PORT &
|
||||
PID=$!
|
||||
sleep 0.1
|
||||
echo "test moonpool_io"
|
||||
curl -N "http://localhost:${PORT}/echo/?a=b&c=d" -H user-agent:test --max-time 5
|
||||
|
||||
kill $PID
|
||||
35
tiny_httpd_moonpool.opam
Normal file
35
tiny_httpd_moonpool.opam
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.17"
|
||||
synopsis: "Moonpool+picos_stdio backend for Tiny_httpd"
|
||||
maintainer: ["c-cube"]
|
||||
authors: ["c-cube"]
|
||||
license: "MIT"
|
||||
homepage: "https://github.com/c-cube/tiny_httpd/"
|
||||
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
|
||||
depends: [
|
||||
"dune" {>= "2.9"}
|
||||
"seq"
|
||||
"tiny_httpd" {= version}
|
||||
"moonpool" {>= "0.7"}
|
||||
"moonpool-io" {>= "0.7"}
|
||||
"ocaml" {>= "5.0"}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
"dune"
|
||||
"build"
|
||||
"-p"
|
||||
name
|
||||
"-j"
|
||||
jobs
|
||||
"--promote-install-files=false"
|
||||
"@install"
|
||||
"@runtest" {with-test}
|
||||
"@doc" {with-doc}
|
||||
]
|
||||
["dune" "install" "-p" name "--create-install-files" name]
|
||||
]
|
||||
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"
|
||||
Loading…
Add table
Reference in a new issue