wip: tiny_httpd_lwt

This commit is contained in:
Simon Cruanes 2025-07-02 22:44:57 -04:00
parent cd0407973f
commit 0193a2c0d1
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 64 additions and 29 deletions

View file

@ -62,7 +62,7 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit =
let r = f () in
res := Ok r
with exn -> res := Error exn);
Lwt.wakeup_later_result promise !res
Lwt.wakeup_result promise !res
in
ED.try_with run_f_and_set_res () handler
@ -72,3 +72,5 @@ let run f : _ Lwt.t =
lwt
let run_async f : unit = ignore (run f : unit Lwt.t)
(* TODO: yield, use that in loops? *)

View file

@ -3,6 +3,7 @@ module H = Tiny_httpd.Server
module Pool = Tiny_httpd.Pool
module Slice = IO.Slice
module Log = Tiny_httpd.Log
module Task = Task
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
@ -20,41 +21,57 @@ let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let buf_size = 16 * 1024
let default_buf_size = 4 * 1024
let show_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let ic_of_channel (ic : Lwt_io.input_channel) : IO.Input.t =
let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
IO.Input.t =
let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in
object
inherit Iostream.In_buf.t_from_refill ()
inherit Iostream.In_buf.t_from_refill ~bytes ()
method private refill (sl : Slice.t) =
assert (sl.len = 0);
let n =
Lwt_io.read_into ic sl.bytes 0 (Bytes.length sl.bytes) |> Task.await
Lwt_bytes.read fd lwt_bytes 0 (Lwt_bytes.length lwt_bytes) |> Task.await
in
Lwt_bytes.blit_to_bytes lwt_bytes 0 bytes 0 n;
sl.len <- n
method close () = Lwt_io.close ic |> Task.await
method close () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
end
let oc_of_channel (oc : Lwt_io.output_channel) : IO.Output.t =
let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
IO.Output.t =
let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in
object
method flush () : unit = Lwt_io.flush oc |> Task.await
inherit IO.Output.t_from_output ~bytes ()
(* method flush () : unit = Lwt_io.flush oc |> Task.await *)
method output buf i len =
Lwt_io.write_from_exactly oc buf i len |> Task.await
method private output_underlying buf i len =
Lwt_bytes.blit_from_bytes buf i lwt_bytes 0 len;
let i = ref 0 in
let len = ref len in
while !len > 0 do
let n = Lwt_bytes.write fd lwt_bytes !i !len |> Task.await in
i := !i + n;
len := !len - n
done
method output_char c = Lwt_io.write_char oc c |> Task.await
method close () = Lwt_io.close oc |> Task.await
method private close_underlying () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
?(buf_size = buf_size) () : (module H.IO_BACKEND) =
let buf_pool =
?(buf_size = default_buf_size) () : (module H.IO_BACKEND) =
let _buf_pool =
Pool.create ?max_size:max_buf_pool_size
~mk_item:(fun () -> Lwt_bytes.create buf_size)
()
@ -93,6 +110,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
let server_done, set_server_done = Lwt.wait () in
let running = Atomic.make true in
let active_conns = Atomic.make 0 in
@ -107,6 +125,10 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(Unix.domain_of_sockaddr sockaddr)
Unix.SOCK_STREAM 0
in
Lwt_unix.setsockopt sock Unix.TCP_NODELAY true;
Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None;
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true;
Lwt_unix.bind sock sockaddr |> Task.await;
Lwt_unix.listen sock backlog;
@ -118,24 +140,30 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let handle_client client_addr fd : unit =
Atomic.incr active_conns;
let@ () = Task.run_async in
let@ () =
Fun.protect ~finally:(fun () ->
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
Atomic.decr active_conns)
let cleanup () =
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
Atomic.decr active_conns
in
let buf_ic = Bytes.create buf_size in
let buf_oc = Bytes.create buf_size in
(*
let@ buf_ic = Pool.with_resource buf_pool in
let@ buf_oc = Pool.with_resource buf_pool in
let ic =
ic_of_channel @@ Lwt_io.of_fd ~mode:Input ~buffer:buf_ic fd
in
let oc =
oc_of_channel @@ Lwt_io.of_fd ~mode:Output ~buffer:buf_ic fd
in
try handle.handle ~client_addr ic oc
*)
(* close FD when both ends are closed *)
let num_open = ref 2 in
let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in
let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in
try
handle.handle ~client_addr ic oc;
cleanup ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ();
Log.error (fun k ->
k "Client handler for %s failed with %s\n%s"
(show_sockaddr client_addr)
@ -155,19 +183,22 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
stop =
(fun () ->
Atomic.set running false;
Lwt.wakeup_later set_server_done ();
Task.await server_loop);
endpoint = (fun () -> addr, !port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
after_init tcp_server);
after_init tcp_server;
Task.await server_done);
}
end in
(module M)
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
?middlewares () : H.t =
?middlewares () : H.t Lwt.t =
let@ () = Task.run in
let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
?buf_size ()

View file

@ -6,6 +6,8 @@
{b NOTE}: this is very experimental and will absolutely change over time,
@since NEXT_RELEASE *)
module Task = Task
type 'a with_args =
?addr:string ->
?port:int ->
@ -21,6 +23,6 @@ val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
val create :
(?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
unit ->
Tiny_httpd.Server.t)
Tiny_httpd.Server.t Lwt.t)
with_args
(** Create a server *)