mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 03:05:29 -05:00
Merge 5caef14945 into 8a8aadfbb0
This commit is contained in:
commit
78552c9c04
10 changed files with 818 additions and 2 deletions
10
dune-project
10
dune-project
|
|
@ -39,3 +39,13 @@
|
|||
(iostream-camlzip (>= 0.2.1))
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_lwt)
|
||||
(synopsis "Tiny_httpd backend based on lwt.unix for OCaml 5")
|
||||
(depends
|
||||
(tiny_httpd (= :version))
|
||||
(lwt (>= 5.0))
|
||||
base-unix
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
|
|
|||
2
echo_lwt.sh
Executable file
2
echo_lwt.sh
Executable file
|
|
@ -0,0 +1,2 @@
|
|||
#!/bin/sh
|
||||
exec dune exec --display=quiet --profile=release "examples/echo_lwt.exe" -- $@
|
||||
|
|
@ -8,11 +8,23 @@
|
|||
(modules sse_client)
|
||||
(libraries unix))
|
||||
|
||||
(library
|
||||
(name example_vfs)
|
||||
(wrapped false)
|
||||
(modules vfs)
|
||||
(libraries tiny_httpd))
|
||||
|
||||
(executable
|
||||
(name echo)
|
||||
(flags :standard -warn-error -a+8)
|
||||
(modules echo vfs)
|
||||
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data))
|
||||
(modules echo)
|
||||
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs))
|
||||
|
||||
(executable
|
||||
(name echo_lwt)
|
||||
(flags :standard -warn-error -a+8)
|
||||
(modules echo_lwt)
|
||||
(libraries tiny_httpd tiny_httpd_lwt logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs))
|
||||
|
||||
(executable
|
||||
(name writer)
|
||||
|
|
|
|||
380
examples/echo_lwt.ml
Normal file
380
examples/echo_lwt.ml
Normal file
|
|
@ -0,0 +1,380 @@
|
|||
open Tiny_httpd_core
|
||||
module Log = Tiny_httpd.Log
|
||||
module MFD = Tiny_httpd_multipart_form_data
|
||||
module Lwt_direct = Tiny_httpd_lwt.Lwt_direct
|
||||
|
||||
let now_ = Unix.gettimeofday
|
||||
|
||||
let alice_text =
|
||||
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
|
||||
sitting by her sister on the bank, and of having nothing to do: once or \
|
||||
twice she had peeped into the book her sister was reading, but it had no \
|
||||
pictures or conversations in it, <and what is the use of a book,> thought \
|
||||
Alice <without pictures or conversations?> So she was considering in her \
|
||||
own mind (as well as she could, for the hot day made her feel very sleepy \
|
||||
and stupid), whether the pleasure of making a daisy-chain would be worth \
|
||||
the trouble of getting up and picking the daisies, when suddenly a White \
|
||||
Rabbit with pink eyes ran close by her. There was nothing so very \
|
||||
remarkable in that; nor did Alice think it so very much out of the way to \
|
||||
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
|
||||
she thought it over afterwards, it occurred to her that she ought to have \
|
||||
wondered at this, but at the time it all seemed quite natural); but when \
|
||||
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
|
||||
it, and then hurried on, Alice started to her feet, for it flashed across \
|
||||
her mind that she had never before seen a rabbit with either a \
|
||||
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
|
||||
she ran across the field after it, and fortunately was just in time to see \
|
||||
it pop down a large rabbit-hole under the hedge. In another moment down \
|
||||
went Alice after it, never once considering how in the world she was to get \
|
||||
out again. The rabbit-hole went straight on like a tunnel for some way, and \
|
||||
then dipped suddenly down, so suddenly that Alice had not a moment to think \
|
||||
about stopping herself before she found herself falling down a very deep \
|
||||
well. Either the well was very deep, or she fell very slowly, for she had \
|
||||
plenty of time as she went down to look about her and to wonder what was \
|
||||
going to happen next. First, she tried to look down and make out what she \
|
||||
was coming to, but it was too dark to see anything; then she looked at the \
|
||||
sides of the well, and noticed that they were filled with cupboards......"
|
||||
|
||||
(* util: a little middleware collecting statistics *)
|
||||
let middleware_stat () : Server.Middleware.t * (unit -> string) =
|
||||
let n_req = ref 0 in
|
||||
let total_time_ = ref 0. in
|
||||
let parse_time_ = ref 0. in
|
||||
let build_time_ = ref 0. in
|
||||
let write_time_ = ref 0. in
|
||||
|
||||
let m h req ~resp =
|
||||
incr n_req;
|
||||
let t1 = Request.start_time req in
|
||||
let t2 = now_ () in
|
||||
h req ~resp:(fun response ->
|
||||
let t3 = now_ () in
|
||||
resp response;
|
||||
let t4 = now_ () in
|
||||
total_time_ := !total_time_ +. (t4 -. t1);
|
||||
parse_time_ := !parse_time_ +. (t2 -. t1);
|
||||
build_time_ := !build_time_ +. (t3 -. t2);
|
||||
write_time_ := !write_time_ +. (t4 -. t3))
|
||||
and get_stat () =
|
||||
Printf.sprintf
|
||||
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
|
||||
!n_req
|
||||
(!total_time_ /. float !n_req *. 1e3)
|
||||
(!parse_time_ /. float !n_req *. 1e3)
|
||||
(!build_time_ /. float !n_req *. 1e3)
|
||||
(!write_time_ /. float !n_req *. 1e3)
|
||||
in
|
||||
m, get_stat
|
||||
|
||||
(* ugly AF *)
|
||||
let base64 x =
|
||||
let ic, oc = Unix.open_process "base64" in
|
||||
output_string oc x;
|
||||
flush oc;
|
||||
close_out oc;
|
||||
let r = input_line ic in
|
||||
ignore (Unix.close_process (ic, oc));
|
||||
r
|
||||
|
||||
let setup_logging () =
|
||||
Logs.set_reporter @@ Logs.format_reporter ();
|
||||
Logs.set_level ~all:true (Some Logs.Debug)
|
||||
|
||||
let setup_upload server : unit =
|
||||
Server.add_route_handler_stream ~meth:`POST server
|
||||
Route.(exact "upload" @/ return)
|
||||
(fun req ->
|
||||
let (`boundary boundary) =
|
||||
match MFD.parse_content_type req.headers with
|
||||
| Some b -> b
|
||||
| None -> Response.fail_raise ~code:400 "no boundary found"
|
||||
in
|
||||
|
||||
let st = MFD.create ~boundary req.body in
|
||||
let tbl = Hashtbl.create 16 in
|
||||
let cur = ref "" in
|
||||
let cur_kind = ref "" in
|
||||
let buf = Buffer.create 16 in
|
||||
let rec loop () =
|
||||
match MFD.next st with
|
||||
| End_of_input ->
|
||||
if !cur <> "" then
|
||||
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf)
|
||||
| Part headers ->
|
||||
if !cur <> "" then
|
||||
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf);
|
||||
(match MFD.Content_disposition.parse headers with
|
||||
| Some { kind; name = Some name; filename = _ } ->
|
||||
cur := name;
|
||||
cur_kind := kind;
|
||||
Buffer.clear buf;
|
||||
loop ()
|
||||
| _ -> Response.fail_raise ~code:400 "content disposition missing")
|
||||
| Read sl ->
|
||||
Buffer.add_subbytes buf sl.bytes sl.off sl.len;
|
||||
loop ()
|
||||
in
|
||||
loop ();
|
||||
|
||||
let open Tiny_httpd_html in
|
||||
let data =
|
||||
Hashtbl.fold
|
||||
(fun name (kind, data) acc ->
|
||||
Printf.sprintf "%S (kind: %S): %S" name kind data :: acc)
|
||||
tbl []
|
||||
in
|
||||
let html =
|
||||
body []
|
||||
[
|
||||
pre []
|
||||
[ txt (Printf.sprintf "{\n%s\n}" @@ String.concat "\n" data) ];
|
||||
]
|
||||
in
|
||||
Response.make_string ~code:201 @@ Ok (to_string_top html))
|
||||
|
||||
let () =
|
||||
let port_ = ref 8080 in
|
||||
let j = ref 32 in
|
||||
let addr = ref "127.0.0.1" in
|
||||
Arg.parse
|
||||
(Arg.align
|
||||
[
|
||||
"--port", Arg.Set_int port_, " set port";
|
||||
"-p", Arg.Set_int port_, " set port";
|
||||
"--debug", Arg.Unit setup_logging, " enable debug";
|
||||
"-j", Arg.Set_int j, " maximum number of connections";
|
||||
"--addr", Arg.Set_string addr, " binding address";
|
||||
])
|
||||
(fun _ -> raise (Arg.Bad ""))
|
||||
"echo [option]*";
|
||||
|
||||
let ev = new Lwt_engine.libev () in
|
||||
Lwt_engine.set ev;
|
||||
|
||||
Lwt_main.run @@ Lwt_direct.run
|
||||
@@ fun () ->
|
||||
let server =
|
||||
Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j ()
|
||||
|> Lwt_direct.await
|
||||
in
|
||||
|
||||
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
|
||||
let m_stats, get_stats = middleware_stat () in
|
||||
Server.add_middleware server ~stage:(`Stage 1) m_stats;
|
||||
|
||||
(* say hello *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "hello" @/ string @/ return)
|
||||
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
|
||||
|
||||
(* compressed file access *)
|
||||
Server.add_route_handler ~meth:`GET server
|
||||
Route.(exact "zcat" @/ string_urlencoded @/ return)
|
||||
(fun path _req ->
|
||||
let ic = open_in path in
|
||||
let str = IO.Input.of_in_channel ic in
|
||||
let mime_type =
|
||||
try
|
||||
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
|
||||
try
|
||||
let s = [ "Content-Type", String.trim (input_line p) ] in
|
||||
ignore @@ Unix.close_process_in p;
|
||||
s
|
||||
with _ ->
|
||||
ignore @@ Unix.close_process_in p;
|
||||
[]
|
||||
with _ -> []
|
||||
in
|
||||
Response.make_stream ~headers:mime_type (Ok str));
|
||||
|
||||
(* echo request *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "echo" @/ return)
|
||||
(fun req ->
|
||||
let q =
|
||||
Request.query req
|
||||
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|
||||
|> String.concat ";"
|
||||
in
|
||||
Response.make_string
|
||||
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
|
||||
|
||||
(* file upload *)
|
||||
Server.add_route_handler_stream ~meth:`PUT server
|
||||
Route.(exact "upload" @/ string @/ return)
|
||||
(fun path req ->
|
||||
Log.debug (fun k ->
|
||||
k "start upload %S, headers:\n%s\n\n%!" path
|
||||
(Format.asprintf "%a" Headers.pp (Request.headers req)));
|
||||
try
|
||||
let oc = open_out @@ "/tmp/" ^ path in
|
||||
IO.Input.to_chan oc req.Request.body;
|
||||
flush oc;
|
||||
Response.make_string (Ok "uploaded file")
|
||||
with e ->
|
||||
Response.fail ~code:500 "couldn't upload file: %s"
|
||||
(Printexc.to_string e));
|
||||
|
||||
(* protected by login *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "protected" @/ return)
|
||||
(fun req ->
|
||||
let ok =
|
||||
match Request.get_header req "authorization" with
|
||||
| Some v ->
|
||||
Log.debug (fun k -> k "authenticate with %S" v);
|
||||
v = "Basic " ^ base64 "user:foobar"
|
||||
| None -> false
|
||||
in
|
||||
if ok then (
|
||||
(* FIXME: a logout link *)
|
||||
let s =
|
||||
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
|
||||
in
|
||||
Response.make_string (Ok s)
|
||||
) else (
|
||||
let headers =
|
||||
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
|
||||
in
|
||||
Response.fail ~code:401 ~headers "invalid"
|
||||
));
|
||||
|
||||
(* logout *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "logout" @/ return)
|
||||
(fun _req -> Response.fail ~code:401 "logged out");
|
||||
|
||||
(* stats *)
|
||||
Server.add_route_handler server
|
||||
Route.(exact "stats" @/ return)
|
||||
(fun _req ->
|
||||
let stats = get_stats () in
|
||||
Response.make_string @@ Ok stats);
|
||||
|
||||
Server.add_route_handler server
|
||||
Route.(exact "alice" @/ return)
|
||||
(fun _req -> Response.make_string (Ok alice_text));
|
||||
|
||||
Server.add_route_handler ~meth:`HEAD server
|
||||
Route.(exact "head" @/ return)
|
||||
(fun _req ->
|
||||
Response.make_void ~code:200 ~headers:[ "x-hello", "world" ] ());
|
||||
|
||||
(* VFS *)
|
||||
Tiny_httpd.Dir.add_vfs server
|
||||
~config:
|
||||
(Tiny_httpd.Dir.config ~download:true
|
||||
~dir_behavior:Tiny_httpd.Dir.Index_or_lists ())
|
||||
~vfs:Vfs.vfs ~prefix:"vfs";
|
||||
|
||||
setup_upload server;
|
||||
|
||||
(* main page *)
|
||||
Server.add_route_handler server
|
||||
Route.(return)
|
||||
(fun _req ->
|
||||
let open Tiny_httpd_html in
|
||||
let h =
|
||||
html []
|
||||
[
|
||||
head [] [ title [] [ txt "index of echo" ] ];
|
||||
body []
|
||||
[
|
||||
h3 [] [ txt "welcome!" ];
|
||||
p [] [ b [] [ txt "endpoints are:" ] ];
|
||||
ul []
|
||||
[
|
||||
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/echo/" ] [ txt "echo" ];
|
||||
txt " echo back query";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
txt
|
||||
"/zcat/:path (GET) to download a file (deflate \
|
||||
transfer-encoding)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/stats/" ] [ txt "/stats/" ];
|
||||
txt " (GET) to access statistics";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/vfs/" ] [ txt "/vfs" ];
|
||||
txt " (GET) to access a VFS embedded in the binary";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/protected" ] [ txt "/protected" ];
|
||||
txt
|
||||
" (GET) to see a protected page (login: user, \
|
||||
password: foobar)";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
pre []
|
||||
[
|
||||
a [ A.href "/logout" ] [ txt "/logout" ];
|
||||
txt " (POST) to log out";
|
||||
];
|
||||
];
|
||||
li []
|
||||
[
|
||||
form
|
||||
[
|
||||
A.action "/upload";
|
||||
A.enctype "multipart/form-data";
|
||||
A.target "_self";
|
||||
A.method_ "POST";
|
||||
]
|
||||
[
|
||||
label [] [ txt "my beautiful form" ];
|
||||
input [ A.type_ "file"; A.name "file1" ];
|
||||
input [ A.type_ "file"; A.name "file2" ];
|
||||
input
|
||||
[
|
||||
A.type_ "text";
|
||||
A.name "a";
|
||||
A.placeholder "text A";
|
||||
];
|
||||
input
|
||||
[
|
||||
A.type_ "text";
|
||||
A.name "b";
|
||||
A.placeholder "text B";
|
||||
];
|
||||
input [ A.type_ "submit" ];
|
||||
];
|
||||
];
|
||||
];
|
||||
];
|
||||
]
|
||||
in
|
||||
let s = to_string_top h in
|
||||
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
|
||||
|
||||
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
|
||||
(Server.port server);
|
||||
match Server.run server with
|
||||
| Ok () -> ()
|
||||
| Error e -> raise e
|
||||
6
src/lwt/dune
Normal file
6
src/lwt/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
|
||||
(library
|
||||
(name tiny_httpd_lwt)
|
||||
(public_name tiny_httpd_lwt)
|
||||
(enabled_if (>= %{ocaml_version} 5.0))
|
||||
(libraries tiny_httpd lwt lwt.unix))
|
||||
93
src/lwt/lwt_direct.ml
Normal file
93
src/lwt/lwt_direct.ml
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
module ED = Effect.Deep
|
||||
|
||||
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
|
||||
|
||||
(** Queue of microtasks that are ready *)
|
||||
let tasks : (unit -> unit) Queue.t = Queue.create ()
|
||||
|
||||
let[@inline] push_task f : unit = Queue.push f tasks
|
||||
|
||||
let default_on_uncaught_exn exn bt =
|
||||
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
|
||||
(Printexc.to_string exn)
|
||||
(Printexc.raw_backtrace_to_string bt)
|
||||
|
||||
let run_all_tasks () : unit =
|
||||
let n_processed = ref 0 in
|
||||
let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in
|
||||
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
|
||||
let t = Queue.pop tasks in
|
||||
incr n_processed;
|
||||
try t ()
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
default_on_uncaught_exn exn bt
|
||||
done;
|
||||
(* make sure we don't sleep forever if there's no lwt promise
|
||||
ready but [tasks] contains ready tasks *)
|
||||
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t)
|
||||
|
||||
let setup_hooks =
|
||||
let already_done = ref false in
|
||||
fun () ->
|
||||
if not !already_done then (
|
||||
already_done := true;
|
||||
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
|
||||
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
|
||||
()
|
||||
)
|
||||
|
||||
let await (fut : 'a Lwt.t) : 'a =
|
||||
match Lwt.state fut with
|
||||
| Lwt.Return x -> x
|
||||
| Lwt.Fail exn -> raise exn
|
||||
| Lwt.Sleep -> Effect.perform (Await fut)
|
||||
|
||||
let yield () : unit = Effect.perform Yield
|
||||
|
||||
(** the main effect handler *)
|
||||
let handler : _ ED.effect_handler =
|
||||
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option =
|
||||
function
|
||||
| Yield -> Some (fun k -> push_task (fun () -> ED.continue k ()))
|
||||
| Await fut ->
|
||||
Some
|
||||
(fun k ->
|
||||
Lwt.on_any fut
|
||||
(fun res -> push_task (fun () -> ED.continue k res))
|
||||
(fun exn -> push_task (fun () -> ED.discontinue k exn)))
|
||||
| _ -> None
|
||||
in
|
||||
{ effc }
|
||||
|
||||
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
|
||||
unit =
|
||||
let res = ref (Error (Failure "not resolved")) in
|
||||
let run_f_and_set_res () =
|
||||
(try
|
||||
let r = f () in
|
||||
res := Ok r
|
||||
with exn -> res := Error exn);
|
||||
Lwt.wakeup_result promise !res
|
||||
in
|
||||
ED.try_with run_f_and_set_res () handler
|
||||
|
||||
let run f : _ Lwt.t =
|
||||
setup_hooks ();
|
||||
let lwt, resolve = Lwt.wait () in
|
||||
push_task (run_inside_effect_handler_and_resolve_ resolve f);
|
||||
lwt
|
||||
|
||||
let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit =
|
||||
let run_f () : unit =
|
||||
try f ()
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
on_uncaught_exn exn bt
|
||||
in
|
||||
ED.try_with run_f () handler
|
||||
|
||||
let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit
|
||||
=
|
||||
setup_hooks ();
|
||||
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f)
|
||||
33
src/lwt/lwt_direct.mli
Normal file
33
src/lwt/lwt_direct.mli
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
(** Direct style control flow for Lwt. *)
|
||||
|
||||
val run : (unit -> 'a) -> 'a Lwt.t
|
||||
(** [run f] runs the function [f ()] in a task within
|
||||
the [Lwt_unix] event loop. [f ()] can create [Lwt]
|
||||
promises and use {!await} to wait for them. Like any promise
|
||||
in Lwt, [f ()] can starve the event loop if it runs long computations
|
||||
without yielding to the event loop.
|
||||
|
||||
When [f ()] terminates (successfully or not), the promise
|
||||
[run f] is resolved with [f ()]'s result, or the exception
|
||||
raised by [f ()]. *)
|
||||
|
||||
val run_in_the_background :
|
||||
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) ->
|
||||
(unit -> unit) ->
|
||||
unit
|
||||
(** [run_in_the_background f] is similar to [ignore (run f)].
|
||||
The computation [f()] runs in the background in the event loop
|
||||
and returns no result.
|
||||
@param on_uncaught_exn if provided, this is called when [f()]
|
||||
raises an exception. *)
|
||||
|
||||
val yield : unit -> unit
|
||||
(** Yield to the event loop.
|
||||
Can only be used inside {!run} or {!run_in_the_background}. *)
|
||||
|
||||
val await : 'a Lwt.t -> 'a
|
||||
(** [await prom] returns the result of [prom], or re-raises the
|
||||
exception with which [prom] failed if it failed.
|
||||
If [prom] is not resolved yet, [await prom] will suspend the
|
||||
current task and resume it when [prom] is resolved.
|
||||
Can only be used inside {!run} or {!run_in_the_background}. *)
|
||||
220
src/lwt/tiny_httpd_lwt.ml
Normal file
220
src/lwt/tiny_httpd_lwt.ml
Normal file
|
|
@ -0,0 +1,220 @@
|
|||
module IO = Tiny_httpd.IO
|
||||
module H = Tiny_httpd.Server
|
||||
module Pool = Tiny_httpd.Pool
|
||||
module Slice = IO.Slice
|
||||
module Log = Tiny_httpd.Log
|
||||
module Lwt_direct = Lwt_direct
|
||||
|
||||
let spf = Printf.sprintf
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
?buf_size:int ->
|
||||
'a
|
||||
|
||||
let get_max_connection_ ?(max_connections = 64) () : int =
|
||||
let max_connections = max 4 max_connections in
|
||||
max_connections
|
||||
|
||||
let default_buf_size = 16 * 1024
|
||||
|
||||
let show_sockaddr = function
|
||||
| Unix.ADDR_UNIX s -> s
|
||||
| Unix.ADDR_INET (addr, port) ->
|
||||
spf "%s:%d" (Unix.string_of_inet_addr addr) port
|
||||
|
||||
let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
|
||||
IO.Input.t =
|
||||
object
|
||||
inherit Iostream.In_buf.t_from_refill ~bytes ()
|
||||
|
||||
method private refill (sl : Slice.t) =
|
||||
assert (sl.len = 0);
|
||||
sl.off <- 0;
|
||||
let n =
|
||||
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await
|
||||
in
|
||||
sl.len <- n
|
||||
|
||||
method close () =
|
||||
decr num_open;
|
||||
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
|
||||
end
|
||||
|
||||
let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
|
||||
IO.Output.t =
|
||||
object
|
||||
inherit IO.Output.t_from_output ~bytes ()
|
||||
(* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *)
|
||||
|
||||
method private output_underlying buf i len =
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
while !len > 0 do
|
||||
let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in
|
||||
i := !i + n;
|
||||
len := !len - n
|
||||
done
|
||||
|
||||
method private close_underlying () =
|
||||
decr num_open;
|
||||
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
|
||||
end
|
||||
|
||||
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
||||
?(buf_size = default_buf_size) () : (module H.IO_BACKEND) =
|
||||
let _buf_pool =
|
||||
Pool.create ?max_size:max_buf_pool_size
|
||||
~mk_item:(fun () -> Lwt_bytes.create buf_size)
|
||||
()
|
||||
in
|
||||
|
||||
let addr, port, (sockaddr : Unix.sockaddr) =
|
||||
match addr, port, unix_sock with
|
||||
| _, _, Some s -> Printf.sprintf "unix:%s" s, 0, Unix.ADDR_UNIX s
|
||||
| addr, port, None ->
|
||||
let addr = Option.value ~default:"127.0.0.1" addr in
|
||||
let sockaddr, port =
|
||||
match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with
|
||||
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None ->
|
||||
let p = 8080 in
|
||||
Unix.ADDR_INET (h, p), p
|
||||
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, Some p ->
|
||||
Unix.ADDR_INET (h, p), p
|
||||
| _ ->
|
||||
failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr
|
||||
in
|
||||
addr, port, sockaddr
|
||||
in
|
||||
|
||||
let module M = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let get_time_s () = Unix.gettimeofday ()
|
||||
let max_connections = get_max_connection_ ?max_connections ()
|
||||
|
||||
let pool_size =
|
||||
match max_buf_pool_size with
|
||||
| Some n -> n
|
||||
| None -> min 4096 (max_connections * 2)
|
||||
|
||||
let tcp_server () : IO.TCP_server.builder =
|
||||
{
|
||||
IO.TCP_server.serve =
|
||||
(fun ~after_init ~handle () : unit ->
|
||||
let server_done, set_server_done = Lwt.wait () in
|
||||
let running = Atomic.make true in
|
||||
let active_conns = Atomic.make 0 in
|
||||
|
||||
(* a pool of buffers, to reduce allocations *)
|
||||
let buf_pool =
|
||||
Pool.create ~max_size:pool_size
|
||||
~clear:(fun buf -> Bytes.fill buf 0 (Bytes.length buf) '\x00')
|
||||
~mk_item:(fun () -> Bytes.create buf_size)
|
||||
()
|
||||
in
|
||||
|
||||
(* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *)
|
||||
let port = ref port in
|
||||
|
||||
let server_loop : unit Lwt.t =
|
||||
let@ () = Lwt_direct.run in
|
||||
let backlog = max_connections in
|
||||
let sock =
|
||||
Lwt_unix.socket ~cloexec:true
|
||||
(Unix.domain_of_sockaddr sockaddr)
|
||||
Unix.SOCK_STREAM 0
|
||||
in
|
||||
Lwt_unix.setsockopt sock Unix.TCP_NODELAY true;
|
||||
Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None;
|
||||
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||
Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true;
|
||||
Lwt_unix.bind sock sockaddr |> Lwt_direct.await;
|
||||
Lwt_unix.listen sock backlog;
|
||||
|
||||
let cleanup () = Lwt_unix.close sock |> Lwt_direct.await in
|
||||
|
||||
(* recover real port, if any *)
|
||||
(match Unix.getsockname (Lwt_unix.unix_file_descr sock) with
|
||||
| Unix.ADDR_INET (_, p) -> port := p
|
||||
| _ -> ());
|
||||
|
||||
let handle_client client_addr fd : unit =
|
||||
Atomic.incr active_conns;
|
||||
Lwt_direct.run_in_the_background @@ fun () ->
|
||||
let@ buf_ic = Pool.with_resource buf_pool in
|
||||
let@ buf_oc = Pool.with_resource buf_pool in
|
||||
|
||||
(* close FD when both ends are closed *)
|
||||
let num_open = ref 2 in
|
||||
let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in
|
||||
let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in
|
||||
|
||||
let cleanup ~shutdown () =
|
||||
Log.debug (fun k ->
|
||||
k "Tiny_httpd_lwt: client handler returned");
|
||||
Atomic.decr active_conns;
|
||||
if shutdown then (
|
||||
try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> ()
|
||||
);
|
||||
ic#close ();
|
||||
oc#close ()
|
||||
in
|
||||
|
||||
try
|
||||
handle.handle ~client_addr ic oc;
|
||||
cleanup ~shutdown:true ()
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
cleanup ~shutdown:false ();
|
||||
Log.error (fun k ->
|
||||
k "Client handler for %s failed with %s\n%s"
|
||||
(show_sockaddr client_addr)
|
||||
(Printexc.to_string exn)
|
||||
(Printexc.raw_backtrace_to_string bt))
|
||||
in
|
||||
|
||||
try
|
||||
while Atomic.get running do
|
||||
let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in
|
||||
handle_client addr fd
|
||||
done;
|
||||
cleanup ()
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
cleanup ();
|
||||
Printexc.raise_with_backtrace exn bt
|
||||
in
|
||||
|
||||
let tcp_server : IO.TCP_server.t =
|
||||
{
|
||||
running = (fun () -> Atomic.get running);
|
||||
stop =
|
||||
(fun () ->
|
||||
Atomic.set running false;
|
||||
Lwt.wakeup_later set_server_done ();
|
||||
Lwt_direct.await server_loop);
|
||||
endpoint = (fun () -> addr, !port);
|
||||
active_connections = (fun () -> Atomic.get active_conns);
|
||||
}
|
||||
in
|
||||
|
||||
after_init tcp_server;
|
||||
Lwt_direct.await server_done);
|
||||
}
|
||||
end in
|
||||
(module M)
|
||||
|
||||
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
|
||||
?middlewares () : H.t Lwt.t =
|
||||
let@ () = Lwt_direct.run in
|
||||
let backend =
|
||||
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
|
||||
?buf_size ()
|
||||
in
|
||||
H.create_from ?buf_size ?middlewares ~backend ()
|
||||
28
src/lwt/tiny_httpd_lwt.mli
Normal file
28
src/lwt/tiny_httpd_lwt.mli
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
(** Lwt backend for Tiny_httpd.
|
||||
|
||||
This only works on OCaml 5 because it uses effect handlers to use Lwt in
|
||||
direct style.
|
||||
|
||||
{b NOTE}: this is very experimental and will absolutely change over time,
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
module Lwt_direct = Lwt_direct
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
?buf_size:int ->
|
||||
'a
|
||||
|
||||
val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
|
||||
(** Create a server *)
|
||||
|
||||
val create :
|
||||
(?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
|
||||
unit ->
|
||||
Tiny_httpd.Server.t Lwt.t)
|
||||
with_args
|
||||
(** Create a server *)
|
||||
32
tiny_httpd_lwt.opam
Normal file
32
tiny_httpd_lwt.opam
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.19"
|
||||
synopsis: "Tiny_httpd backend based on lwt.unix for OCaml 5"
|
||||
maintainer: ["c-cube"]
|
||||
authors: ["c-cube"]
|
||||
license: "MIT"
|
||||
homepage: "https://github.com/c-cube/tiny_httpd/"
|
||||
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
|
||||
depends: [
|
||||
"dune" {>= "3.2"}
|
||||
"tiny_httpd" {= version}
|
||||
"lwt" {>= "5.0"}
|
||||
"base-unix"
|
||||
"logs" {with-test}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
"dune"
|
||||
"build"
|
||||
"-p"
|
||||
name
|
||||
"-j"
|
||||
jobs
|
||||
"@install"
|
||||
"@runtest" {with-test}
|
||||
"@doc" {with-doc}
|
||||
]
|
||||
]
|
||||
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"
|
||||
Loading…
Add table
Reference in a new issue