From 0193a2c0d190a192666e48606cfc3194f78b86ee Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 22:44:57 -0400 Subject: [PATCH] wip: tiny_httpd_lwt --- src/lwt/task.ml | 4 +- src/lwt/tiny_httpd_lwt.ml | 85 ++++++++++++++++++++++++++------------ src/lwt/tiny_httpd_lwt.mli | 4 +- 3 files changed, 64 insertions(+), 29 deletions(-) diff --git a/src/lwt/task.ml b/src/lwt/task.ml index d1e615f8..2902022f 100644 --- a/src/lwt/task.ml +++ b/src/lwt/task.ml @@ -62,7 +62,7 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = let r = f () in res := Ok r with exn -> res := Error exn); - Lwt.wakeup_later_result promise !res + Lwt.wakeup_result promise !res in ED.try_with run_f_and_set_res () handler @@ -72,3 +72,5 @@ let run f : _ Lwt.t = lwt let run_async f : unit = ignore (run f : unit Lwt.t) + +(* TODO: yield, use that in loops? *) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index 909c177c..a5dbe7b7 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -3,6 +3,7 @@ module H = Tiny_httpd.Server module Pool = Tiny_httpd.Pool module Slice = IO.Slice module Log = Tiny_httpd.Log +module Task = Task let spf = Printf.sprintf let ( let@ ) = ( @@ ) @@ -20,41 +21,57 @@ let get_max_connection_ ?(max_connections = 64) () : int = let max_connections = max 4 max_connections in max_connections -let buf_size = 16 * 1024 +let default_buf_size = 4 * 1024 let show_sockaddr = function | Unix.ADDR_UNIX s -> s | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let ic_of_channel (ic : Lwt_io.input_channel) : IO.Input.t = +let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Input.t = + let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object - inherit Iostream.In_buf.t_from_refill () + inherit Iostream.In_buf.t_from_refill ~bytes () method private refill (sl : Slice.t) = assert (sl.len = 0); let n = - Lwt_io.read_into ic sl.bytes 0 (Bytes.length sl.bytes) |> Task.await + Lwt_bytes.read fd lwt_bytes 0 (Lwt_bytes.length lwt_bytes) |> Task.await in + Lwt_bytes.blit_to_bytes lwt_bytes 0 bytes 0 n; sl.len <- n - method close () = Lwt_io.close ic |> Task.await + method close () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Task.await end -let oc_of_channel (oc : Lwt_io.output_channel) : IO.Output.t = +let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Output.t = + let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object - method flush () : unit = Lwt_io.flush oc |> Task.await + inherit IO.Output.t_from_output ~bytes () + (* method flush () : unit = Lwt_io.flush oc |> Task.await *) - method output buf i len = - Lwt_io.write_from_exactly oc buf i len |> Task.await + method private output_underlying buf i len = + Lwt_bytes.blit_from_bytes buf i lwt_bytes 0 len; + let i = ref 0 in + let len = ref len in + while !len > 0 do + let n = Lwt_bytes.write fd lwt_bytes !i !len |> Task.await in + i := !i + n; + len := !len - n + done - method output_char c = Lwt_io.write_char oc c |> Task.await - method close () = Lwt_io.close oc |> Task.await + method private close_underlying () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Task.await end let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size - ?(buf_size = buf_size) () : (module H.IO_BACKEND) = - let buf_pool = + ?(buf_size = default_buf_size) () : (module H.IO_BACKEND) = + let _buf_pool = Pool.create ?max_size:max_buf_pool_size ~mk_item:(fun () -> Lwt_bytes.create buf_size) () @@ -93,6 +110,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size { IO.TCP_server.serve = (fun ~after_init ~handle () : unit -> + let server_done, set_server_done = Lwt.wait () in let running = Atomic.make true in let active_conns = Atomic.make 0 in @@ -107,6 +125,10 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt sock Unix.TCP_NODELAY true; + Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None; + Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true; + Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true; Lwt_unix.bind sock sockaddr |> Task.await; Lwt_unix.listen sock backlog; @@ -118,24 +140,30 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; let@ () = Task.run_async in - let@ () = - Fun.protect ~finally:(fun () -> - Log.debug (fun k -> - k "Tiny_httpd_lwt: client handler returned"); - Atomic.decr active_conns) + + let cleanup () = + Log.debug (fun k -> + k "Tiny_httpd_lwt: client handler returned"); + Atomic.decr active_conns in + let buf_ic = Bytes.create buf_size in + let buf_oc = Bytes.create buf_size in + (* let@ buf_ic = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in - let ic = - ic_of_channel @@ Lwt_io.of_fd ~mode:Input ~buffer:buf_ic fd - in - let oc = - oc_of_channel @@ Lwt_io.of_fd ~mode:Output ~buffer:buf_ic fd - in - try handle.handle ~client_addr ic oc +*) + + (* close FD when both ends are closed *) + let num_open = ref 2 in + let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in + let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in + try + handle.handle ~client_addr ic oc; + cleanup () with exn -> let bt = Printexc.get_raw_backtrace () in + cleanup (); Log.error (fun k -> k "Client handler for %s failed with %s\n%s" (show_sockaddr client_addr) @@ -155,19 +183,22 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size stop = (fun () -> Atomic.set running false; + Lwt.wakeup_later set_server_done (); Task.await server_loop); endpoint = (fun () -> addr, !port); active_connections = (fun () -> Atomic.get active_conns); } in - after_init tcp_server); + after_init tcp_server; + Task.await server_done); } end in (module M) let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size - ?middlewares () : H.t = + ?middlewares () : H.t Lwt.t = + let@ () = Task.run in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli index 56bd6868..4b0fabf5 100644 --- a/src/lwt/tiny_httpd_lwt.mli +++ b/src/lwt/tiny_httpd_lwt.mli @@ -6,6 +6,8 @@ {b NOTE}: this is very experimental and will absolutely change over time, @since NEXT_RELEASE *) +module Task = Task + type 'a with_args = ?addr:string -> ?port:int -> @@ -21,6 +23,6 @@ val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args val create : (?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list -> unit -> - Tiny_httpd.Server.t) + Tiny_httpd.Server.t Lwt.t) with_args (** Create a server *)