From 28f7ddd74fdc1fea356407b08d2a8a2ce3cee75a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 15 Feb 2026 20:30:13 +0000 Subject: [PATCH] fix(eio): address review feedback - Add closed flag to ic_of_flow/oc_of_flow to prevent double-release of pool cstructs and double-shutdown - Enforce max_connections with Eio.Semaphore to limit concurrent connections - Fix port 0 detection using Eio.Net.listening_addr to return actual port - Use pool_size for cstruct pool max_size (was computed but unused) - Set TCP_NODELAY on accepted connections for low latency --- src/eio/tiny_httpd_eio.ml | 53 ++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/src/eio/tiny_httpd_eio.ml b/src/eio/tiny_httpd_eio.ml index e58d1b6b..61060a97 100644 --- a/src/eio/tiny_httpd_eio.ml +++ b/src/eio/tiny_httpd_eio.ml @@ -31,8 +31,10 @@ let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr = | `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p) | `Unix s -> Unix.ADDR_UNIX s -let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t = +let ic_of_flow ~closed ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : + IO.Input.t = let cstruct = Pool.Raw.acquire ic_pool in + let sent_shutdown = ref false in object inherit Iostream.In_buf.t_from_refill () @@ -52,15 +54,20 @@ let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t = sl.len <- n method close () = - Pool.Raw.release ic_pool cstruct; - Eio.Flow.shutdown flow `Receive + if not !closed then ( + closed := true; + Pool.Raw.release ic_pool cstruct); + if not !sent_shutdown then ( + sent_shutdown := true; + Eio.Flow.shutdown flow `Receive) end -let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t - = +let oc_of_flow ~closed ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : + IO.Output.t = (* write buffer *) let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in let offset = ref 0 in + let sent_shutdown = ref false in object (self) method flush () : unit = @@ -91,8 +98,12 @@ let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t if !offset = Cstruct.length wbuf then self#flush () method close () = - Pool.Raw.release oc_pool wbuf; - Eio.Flow.shutdown flow `Send + if not !closed then ( + closed := true; + Pool.Raw.release oc_pool wbuf); + if not !sent_shutdown then ( + sent_shutdown := true; + Eio.Flow.shutdown flow `Send) end let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size @@ -127,7 +138,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size | None -> min 4096 (max_connections * 2) let cstruct_pool = - Pool.create ~max_size:max_connections + Pool.create ~max_size:pool_size ~mk_item:(fun () -> Cstruct.create buf_size) () @@ -137,6 +148,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (fun ~after_init ~handle () : unit -> let running = Atomic.make true in let active_conns = Atomic.make 0 in + let sem = Eio.Semaphore.make max_connections in Eio.Switch.on_release sw (fun () -> Atomic.set running false); let net = Eio.Stdenv.net stdenv in @@ -148,6 +160,13 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size sockaddr in + (* Resolve actual address/port (important for port 0) *) + let actual_addr, actual_port = + match Eio.Net.listening_addr sock with + | `Tcp (_, p) -> addr, p + | `Unix s -> Printf.sprintf "unix:%s" s, 0 + in + let tcp_server : IO.TCP_server.t = { running = (fun () -> Atomic.get running); @@ -155,10 +174,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (fun () -> Atomic.set running false; Eio.Switch.fail sw Exit); - endpoint = - (fun () -> - (* TODO: find the real port *) - addr, port); + endpoint = (fun () -> actual_addr, actual_port); active_connections = (fun () -> Atomic.get active_conns); } in @@ -172,15 +188,22 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size k "error in client handler: %s" (Printexc.to_string exn))) sock (fun flow client_addr -> + Eio.Semaphore.acquire sem; + Eio_unix.Fd.use_exn "setsockopt" + (Eio_unix.Net.fd flow) (fun fd -> + Unix.setsockopt fd Unix.TCP_NODELAY true); Atomic.incr active_conns; let@ () = Fun.protect ~finally:(fun () -> Log.debug (fun k -> k "Tiny_httpd_eio: client handler returned"); - Atomic.decr active_conns) + Atomic.decr active_conns; + Eio.Semaphore.release sem) in - let ic = ic_of_flow ~buf_pool:cstruct_pool flow in - let oc = oc_of_flow ~buf_pool:cstruct_pool flow in + let ic_closed = ref false in + let oc_closed = ref false in + let ic = ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool flow in + let oc = oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool flow in Log.debug (fun k -> k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);