This commit is contained in:
Simon Cruanes 2025-07-28 18:33:52 +00:00 committed by GitHub
commit c0c3c3bbe1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 863 additions and 2 deletions

View file

@ -39,3 +39,13 @@
(iostream-camlzip (>= 0.2.1))
(logs :with-test)
(odoc :with-doc)))
(package
(name tiny_httpd_lwt)
(synopsis "Tiny_httpd backend based on lwt.unix for OCaml 5")
(depends
(tiny_httpd (= :version))
(lwt (>= 5.0))
base-unix
(logs :with-test)
(odoc :with-doc)))

2
echo_lwt.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec dune exec --display=quiet --profile=release "examples/echo_lwt.exe" -- $@

View file

@ -8,11 +8,23 @@
(modules sse_client)
(libraries unix))
(library
(name example_vfs)
(wrapped false)
(modules vfs)
(libraries tiny_httpd))
(executable
(name echo)
(flags :standard -warn-error -a+8)
(modules echo vfs)
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data))
(modules echo)
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs))
(executable
(name echo_lwt)
(flags :standard -warn-error -a+8)
(modules echo_lwt)
(libraries tiny_httpd tiny_httpd_lwt logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs))
(executable
(name writer)

380
examples/echo_lwt.ml Normal file
View file

@ -0,0 +1,380 @@
open Tiny_httpd_core
module Log = Tiny_httpd.Log
module MFD = Tiny_httpd_multipart_form_data
module Lwt_direct = Tiny_httpd_lwt.Lwt_direct
let now_ = Unix.gettimeofday
let alice_text =
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
sitting by her sister on the bank, and of having nothing to do: once or \
twice she had peeped into the book her sister was reading, but it had no \
pictures or conversations in it, <and what is the use of a book,> thought \
Alice <without pictures or conversations?> So she was considering in her \
own mind (as well as she could, for the hot day made her feel very sleepy \
and stupid), whether the pleasure of making a daisy-chain would be worth \
the trouble of getting up and picking the daisies, when suddenly a White \
Rabbit with pink eyes ran close by her. There was nothing so very \
remarkable in that; nor did Alice think it so very much out of the way to \
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
she thought it over afterwards, it occurred to her that she ought to have \
wondered at this, but at the time it all seemed quite natural); but when \
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
it, and then hurried on, Alice started to her feet, for it flashed across \
her mind that she had never before seen a rabbit with either a \
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
she ran across the field after it, and fortunately was just in time to see \
it pop down a large rabbit-hole under the hedge. In another moment down \
went Alice after it, never once considering how in the world she was to get \
out again. The rabbit-hole went straight on like a tunnel for some way, and \
then dipped suddenly down, so suddenly that Alice had not a moment to think \
about stopping herself before she found herself falling down a very deep \
well. Either the well was very deep, or she fell very slowly, for she had \
plenty of time as she went down to look about her and to wonder what was \
going to happen next. First, she tried to look down and make out what she \
was coming to, but it was too dark to see anything; then she looked at the \
sides of the well, and noticed that they were filled with cupboards......"
(* util: a little middleware collecting statistics *)
let middleware_stat () : Server.Middleware.t * (unit -> string) =
let n_req = ref 0 in
let total_time_ = ref 0. in
let parse_time_ = ref 0. in
let build_time_ = ref 0. in
let write_time_ = ref 0. in
let m h req ~resp =
incr n_req;
let t1 = Request.start_time req in
let t2 = now_ () in
h req ~resp:(fun response ->
let t3 = now_ () in
resp response;
let t4 = now_ () in
total_time_ := !total_time_ +. (t4 -. t1);
parse_time_ := !parse_time_ +. (t2 -. t1);
build_time_ := !build_time_ +. (t3 -. t2);
write_time_ := !write_time_ +. (t4 -. t3))
and get_stat () =
Printf.sprintf
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
!n_req
(!total_time_ /. float !n_req *. 1e3)
(!parse_time_ /. float !n_req *. 1e3)
(!build_time_ /. float !n_req *. 1e3)
(!write_time_ /. float !n_req *. 1e3)
in
m, get_stat
(* ugly AF *)
let base64 x =
let ic, oc = Unix.open_process "base64" in
output_string oc x;
flush oc;
close_out oc;
let r = input_line ic in
ignore (Unix.close_process (ic, oc));
r
let setup_logging () =
Logs.set_reporter @@ Logs.format_reporter ();
Logs.set_level ~all:true (Some Logs.Debug)
let setup_upload server : unit =
Server.add_route_handler_stream ~meth:`POST server
Route.(exact "upload" @/ return)
(fun req ->
let (`boundary boundary) =
match MFD.parse_content_type req.headers with
| Some b -> b
| None -> Response.fail_raise ~code:400 "no boundary found"
in
let st = MFD.create ~boundary req.body in
let tbl = Hashtbl.create 16 in
let cur = ref "" in
let cur_kind = ref "" in
let buf = Buffer.create 16 in
let rec loop () =
match MFD.next st with
| End_of_input ->
if !cur <> "" then
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf)
| Part headers ->
if !cur <> "" then
Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf);
(match MFD.Content_disposition.parse headers with
| Some { kind; name = Some name; filename = _ } ->
cur := name;
cur_kind := kind;
Buffer.clear buf;
loop ()
| _ -> Response.fail_raise ~code:400 "content disposition missing")
| Read sl ->
Buffer.add_subbytes buf sl.bytes sl.off sl.len;
loop ()
in
loop ();
let open Tiny_httpd_html in
let data =
Hashtbl.fold
(fun name (kind, data) acc ->
Printf.sprintf "%S (kind: %S): %S" name kind data :: acc)
tbl []
in
let html =
body []
[
pre []
[ txt (Printf.sprintf "{\n%s\n}" @@ String.concat "\n" data) ];
]
in
Response.make_string ~code:201 @@ Ok (to_string_top html))
let () =
let port_ = ref 8080 in
let j = ref 32 in
let addr = ref "127.0.0.1" in
Arg.parse
(Arg.align
[
"--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port";
"--debug", Arg.Unit setup_logging, " enable debug";
"-j", Arg.Set_int j, " maximum number of connections";
"--addr", Arg.Set_string addr, " binding address";
])
(fun _ -> raise (Arg.Bad ""))
"echo [option]*";
let ev = new Lwt_engine.libev () in
Lwt_engine.set ev;
Lwt_main.run @@ Lwt_direct.run
@@ fun () ->
let server =
Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j ()
|> Lwt_direct.await
in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
let m_stats, get_stats = middleware_stat () in
Server.add_middleware server ~stage:(`Stage 1) m_stats;
(* say hello *)
Server.add_route_handler ~meth:`GET server
Route.(exact "hello" @/ string @/ return)
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
(* compressed file access *)
Server.add_route_handler ~meth:`GET server
Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req ->
let ic = open_in path in
let str = IO.Input.of_in_channel ic in
let mime_type =
try
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
try
let s = [ "Content-Type", String.trim (input_line p) ] in
ignore @@ Unix.close_process_in p;
s
with _ ->
ignore @@ Unix.close_process_in p;
[]
with _ -> []
in
Response.make_stream ~headers:mime_type (Ok str));
(* echo request *)
Server.add_route_handler server
Route.(exact "echo" @/ return)
(fun req ->
let q =
Request.query req
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|> String.concat ";"
in
Response.make_string
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
(* file upload *)
Server.add_route_handler_stream ~meth:`PUT server
Route.(exact "upload" @/ string @/ return)
(fun path req ->
Log.debug (fun k ->
k "start upload %S, headers:\n%s\n\n%!" path
(Format.asprintf "%a" Headers.pp (Request.headers req)));
try
let oc = open_out @@ "/tmp/" ^ path in
IO.Input.to_chan oc req.Request.body;
flush oc;
Response.make_string (Ok "uploaded file")
with e ->
Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e));
(* protected by login *)
Server.add_route_handler server
Route.(exact "protected" @/ return)
(fun req ->
let ok =
match Request.get_header req "authorization" with
| Some v ->
Log.debug (fun k -> k "authenticate with %S" v);
v = "Basic " ^ base64 "user:foobar"
| None -> false
in
if ok then (
(* FIXME: a logout link *)
let s =
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
in
Response.make_string (Ok s)
) else (
let headers =
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
in
Response.fail ~code:401 ~headers "invalid"
));
(* logout *)
Server.add_route_handler server
Route.(exact "logout" @/ return)
(fun _req -> Response.fail ~code:401 "logged out");
(* stats *)
Server.add_route_handler server
Route.(exact "stats" @/ return)
(fun _req ->
let stats = get_stats () in
Response.make_string @@ Ok stats);
Server.add_route_handler server
Route.(exact "alice" @/ return)
(fun _req -> Response.make_string (Ok alice_text));
Server.add_route_handler ~meth:`HEAD server
Route.(exact "head" @/ return)
(fun _req ->
Response.make_void ~code:200 ~headers:[ "x-hello", "world" ] ());
(* VFS *)
Tiny_httpd.Dir.add_vfs server
~config:
(Tiny_httpd.Dir.config ~download:true
~dir_behavior:Tiny_httpd.Dir.Index_or_lists ())
~vfs:Vfs.vfs ~prefix:"vfs";
setup_upload server;
(* main page *)
Server.add_route_handler server
Route.(return)
(fun _req ->
let open Tiny_httpd_html in
let h =
html []
[
head [] [ title [] [ txt "index of echo" ] ];
body []
[
h3 [] [ txt "welcome!" ];
p [] [ b [] [ txt "endpoints are:" ] ];
ul []
[
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
li []
[
pre []
[
a [ A.href "/echo/" ] [ txt "echo" ];
txt " echo back query";
];
];
li []
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
li []
[
pre []
[
txt
"/zcat/:path (GET) to download a file (deflate \
transfer-encoding)";
];
];
li []
[
pre []
[
a [ A.href "/stats/" ] [ txt "/stats/" ];
txt " (GET) to access statistics";
];
];
li []
[
pre []
[
a [ A.href "/vfs/" ] [ txt "/vfs" ];
txt " (GET) to access a VFS embedded in the binary";
];
];
li []
[
pre []
[
a [ A.href "/protected" ] [ txt "/protected" ];
txt
" (GET) to see a protected page (login: user, \
password: foobar)";
];
];
li []
[
pre []
[
a [ A.href "/logout" ] [ txt "/logout" ];
txt " (POST) to log out";
];
];
li []
[
form
[
A.action "/upload";
A.enctype "multipart/form-data";
A.target "_self";
A.method_ "POST";
]
[
label [] [ txt "my beautiful form" ];
input [ A.type_ "file"; A.name "file1" ];
input [ A.type_ "file"; A.name "file2" ];
input
[
A.type_ "text";
A.name "a";
A.placeholder "text A";
];
input
[
A.type_ "text";
A.name "b";
A.placeholder "text B";
];
input [ A.type_ "submit" ];
];
];
];
];
]
in
let s = to_string_top h in
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
(Server.port server);
match Server.run server with
| Ok () -> ()
| Error e -> raise e

6
src/lwt/dune Normal file
View file

@ -0,0 +1,6 @@
(library
(name tiny_httpd_lwt)
(public_name tiny_httpd_lwt)
(enabled_if (>= %{ocaml_version} 5.0))
(libraries tiny_httpd lwt lwt.unix))

104
src/lwt/lwt_direct.ml Normal file
View file

@ -0,0 +1,104 @@
(* Direct-style wrapper for Lwt code
The implementation of the direct-style wrapper relies on ocaml5's effect
system capturing continuations and adding them as a callback to some lwt
promises. *)
(* part 1: tasks, getting the scheduler to call them *)
let tasks : (unit -> unit) Queue.t = Queue.create ()
let[@inline] push_task f : unit = Queue.push f tasks
let absolute_max_number_of_steps =
(* TODO 6.0: what's a good number here? should it be customisable? *)
10_000
let run_all_tasks () : unit =
let n_processed = ref 0 in
let max_number_of_steps =
min absolute_max_number_of_steps (2 * Queue.length tasks)
in
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
let t = Queue.pop tasks in
incr n_processed;
try t ()
with exn ->
(* TODO 6.0: change async_exception handler to accept a backtrace, pass it
here and at the other use site. *)
(* TODO 6.0: this and other try-with: respect exception-filter *)
!Lwt.async_exception_hook exn
done;
(* In the case where there are no promises ready for wakeup, the scheduler's
engine will pause until some IO completes. There might never be completed
IO, depending on the program structure and the state of the world. If this
happens and the queue is not empty, we add a [pause] so that the engine has
something to wakeup for so that the rest of the queue can be processed. *)
if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then
ignore (Lwt.pause () : unit Lwt.t)
let setup_hooks =
let already_done = ref false in
fun () ->
if not !already_done then (
already_done := true;
(* TODO 6.0: assess whether we should have both hooks or just one (which
one). Tempted to say we should only have the enter hook. *)
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
)
(* part 2: effects, performing them *)
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with
| Lwt.Return x -> x
| Lwt.Fail exn -> raise exn
| Lwt.Sleep -> Effect.perform (Await fut)
let yield () : unit = Effect.perform Yield
(* part 3: handling effects *)
let handler : _ Effect.Deep.effect_handler =
let effc : type b.
b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function
| Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ()))
| Await fut ->
Some
(fun k ->
Lwt.on_any fut
(fun res -> push_task (fun () -> Effect.Deep.continue k res))
(fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn)))
| _ -> None
in
{ effc }
(* part 4: putting it all together: running tasks *)
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
unit =
let run_f_and_set_res () =
match f () with
| res -> Lwt.wakeup promise res
| exception exc -> Lwt.wakeup_exn promise exc
in
Effect.Deep.try_with run_f_and_set_res () handler
let spawn f : _ Lwt.t =
setup_hooks ();
let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt
(* part 4 (encore): running a task in the background *)
let run_inside_effect_handler_in_the_background_ f () : unit =
let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in
Effect.Deep.try_with run_f () handler
let spawn_in_the_background f : unit =
setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ f)

67
src/lwt/lwt_direct.mli Normal file
View file

@ -0,0 +1,67 @@
(** Direct style control flow for Lwt.
This module relies on OCaml 5's
{{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of
chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it
becomes possible to start lightweight "tasks" using
[Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in
direct-style code, using OCaml's standard control flow structures such as
loops, higher-order functions, exception handlers, [match], etc.
Interactions with the rest of lwt can be done using [await], for example:
{[
Lwt_direct.spawn (fun () ->
let continue = ref true in
while !continue do
match Lwt_io.read_line in_channel |> Lwt_direct.await with
| exception End_of_file -> continue := false
| line ->
let uppercase_line = String.uppercase_ascii line in
Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await
done)
]}
This code snippet contains a simple "task" that repeatedly reads a line from
a [Lwt_io] channel, uppercases it, and writes the uppercase version to
another channel.
This task is itself a [unit Lwt.t], which is resolved when the function
returns. It is possible to use {!Lwt_direct.run_in_the_background} to ignore
the result and let the task run in the background instead. *)
val spawn : (unit -> 'a) -> 'a Lwt.t
(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event
loop. [f ()] can create [Lwt] promises and use {!await} to wait for them.
Like any promise in Lwt, [f ()] can starve the event loop if it runs long
computations without yielding to the event loop.
When [f ()] terminates (successfully or not), the promise [spawn f] is
resolved with [f ()]'s result, or the exception raised by [f ()].
The promise returned by [spawn f] is not cancellable. Canceling it will have
no effect. *)
val spawn_in_the_background : (unit -> unit) -> unit
(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The
computation [f()] runs in the background in the event loop and returns no
result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called.
*)
val yield : unit -> unit
(** Yield to the event loop.
Calling [yield] outside of {!spawn} or {!run_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)
val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the exception with
which [prom] failed if it failed. If [prom] is not resolved yet,
[await prom] will suspend the current task and resume it when [prom] is
resolved.
Calling [await] outside of {!spawn} or {!run_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)

220
src/lwt/tiny_httpd_lwt.ml Normal file
View file

@ -0,0 +1,220 @@
module IO = Tiny_httpd.IO
module H = Tiny_httpd.Server
module Pool = Tiny_httpd.Pool
module Slice = IO.Slice
module Log = Tiny_httpd.Log
module Lwt_direct = Lwt_direct
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
type 'a with_args =
?addr:string ->
?port:int ->
?unix_sock:string ->
?max_connections:int ->
?max_buf_pool_size:int ->
?buf_size:int ->
'a
let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let default_buf_size = 16 * 1024
let show_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
IO.Input.t =
object
inherit Iostream.In_buf.t_from_refill ~bytes ()
method private refill (sl : Slice.t) =
assert (sl.len = 0);
sl.off <- 0;
let n =
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await
in
sl.len <- n
method close () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
end
let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
IO.Output.t =
object
inherit IO.Output.t_from_output ~bytes ()
(* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *)
method private output_underlying buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in
i := !i + n;
len := !len - n
done
method private close_underlying () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
?(buf_size = default_buf_size) () : (module H.IO_BACKEND) =
let _buf_pool =
Pool.create ?max_size:max_buf_pool_size
~mk_item:(fun () -> Lwt_bytes.create buf_size)
()
in
let addr, port, (sockaddr : Unix.sockaddr) =
match addr, port, unix_sock with
| _, _, Some s -> Printf.sprintf "unix:%s" s, 0, Unix.ADDR_UNIX s
| addr, port, None ->
let addr = Option.value ~default:"127.0.0.1" addr in
let sockaddr, port =
match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None ->
let p = 8080 in
Unix.ADDR_INET (h, p), p
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, Some p ->
Unix.ADDR_INET (h, p), p
| _ ->
failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr
in
addr, port, sockaddr
in
let module M = struct
let init_addr () = addr
let init_port () = port
let get_time_s () = Unix.gettimeofday ()
let max_connections = get_max_connection_ ?max_connections ()
let pool_size =
match max_buf_pool_size with
| Some n -> n
| None -> min 4096 (max_connections * 2)
let tcp_server () : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
let server_done, set_server_done = Lwt.wait () in
let running = Atomic.make true in
let active_conns = Atomic.make 0 in
(* a pool of buffers, to reduce allocations *)
let buf_pool =
Pool.create ~max_size:pool_size
~clear:(fun buf -> Bytes.fill buf 0 (Bytes.length buf) '\x00')
~mk_item:(fun () -> Bytes.create buf_size)
()
in
(* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *)
let port = ref port in
let server_loop : unit Lwt.t =
let@ () = Lwt_direct.spawn in
let backlog = max_connections in
let sock =
Lwt_unix.socket ~cloexec:true
(Unix.domain_of_sockaddr sockaddr)
Unix.SOCK_STREAM 0
in
Lwt_unix.setsockopt sock Unix.TCP_NODELAY true;
Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None;
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true;
Lwt_unix.bind sock sockaddr |> Lwt_direct.await;
Lwt_unix.listen sock backlog;
let cleanup () = Lwt_unix.close sock |> Lwt_direct.await in
(* recover real port, if any *)
(match Unix.getsockname (Lwt_unix.unix_file_descr sock) with
| Unix.ADDR_INET (_, p) -> port := p
| _ -> ());
let handle_client client_addr fd : unit =
Atomic.incr active_conns;
Lwt_direct.spawn_in_the_background @@ fun () ->
let@ buf_ic = Pool.with_resource buf_pool in
let@ buf_oc = Pool.with_resource buf_pool in
(* close FD when both ends are closed *)
let num_open = ref 2 in
let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in
let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in
let cleanup ~shutdown () =
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
Atomic.decr active_conns;
if shutdown then (
try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> ()
);
ic#close ();
oc#close ()
in
try
handle.handle ~client_addr ic oc;
cleanup ~shutdown:true ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ~shutdown:false ();
Log.error (fun k ->
k "Client handler for %s failed with %s\n%s"
(show_sockaddr client_addr)
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt))
in
try
while Atomic.get running do
let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in
handle_client addr fd
done;
cleanup ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ();
Printexc.raise_with_backtrace exn bt
in
let tcp_server : IO.TCP_server.t =
{
running = (fun () -> Atomic.get running);
stop =
(fun () ->
Atomic.set running false;
Lwt.wakeup_later set_server_done ();
Lwt_direct.await server_loop);
endpoint = (fun () -> addr, !port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
after_init tcp_server;
Lwt_direct.await server_done);
}
end in
(module M)
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
?middlewares () : H.t Lwt.t =
let@ () = Lwt_direct.spawn in
let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
?buf_size ()
in
H.create_from ?buf_size ?middlewares ~backend ()

View file

@ -0,0 +1,28 @@
(** Lwt backend for Tiny_httpd.
This only works on OCaml 5 because it uses effect handlers to use Lwt in
direct style.
{b NOTE}: this is very experimental and will absolutely change over time,
@since NEXT_RELEASE *)
module Lwt_direct = Lwt_direct
type 'a with_args =
?addr:string ->
?port:int ->
?unix_sock:string ->
?max_connections:int ->
?max_buf_pool_size:int ->
?buf_size:int ->
'a
val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
(** Create a server *)
val create :
(?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
unit ->
Tiny_httpd.Server.t Lwt.t)
with_args
(** Create a server *)

32
tiny_httpd_lwt.opam Normal file
View file

@ -0,0 +1,32 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.19"
synopsis: "Tiny_httpd backend based on lwt.unix for OCaml 5"
maintainer: ["c-cube"]
authors: ["c-cube"]
license: "MIT"
homepage: "https://github.com/c-cube/tiny_httpd/"
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
depends: [
"dune" {>= "3.2"}
"tiny_httpd" {= version}
"lwt" {>= "5.0"}
"base-unix"
"logs" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"