fix(eio): address review feedback

- Add closed flag to ic_of_flow/oc_of_flow to prevent double-release of
  pool cstructs and double-shutdown
- Enforce max_connections with Eio.Semaphore to limit concurrent connections
- Fix port 0 detection using Eio.Net.listening_addr to return actual port
- Use pool_size for cstruct pool max_size (was computed but unused)
- Set TCP_NODELAY on accepted connections for low latency
This commit is contained in:
Simon Cruanes 2026-02-15 20:30:13 +00:00
parent f0aadc0307
commit 28f7ddd74f

View file

@ -31,8 +31,10 @@ let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr =
| `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p) | `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p)
| `Unix s -> Unix.ADDR_UNIX s | `Unix s -> Unix.ADDR_UNIX s
let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t = let ic_of_flow ~closed ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) :
IO.Input.t =
let cstruct = Pool.Raw.acquire ic_pool in let cstruct = Pool.Raw.acquire ic_pool in
let sent_shutdown = ref false in
object object
inherit Iostream.In_buf.t_from_refill () inherit Iostream.In_buf.t_from_refill ()
@ -52,15 +54,20 @@ let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t =
sl.len <- n sl.len <- n
method close () = method close () =
Pool.Raw.release ic_pool cstruct; if not !closed then (
Eio.Flow.shutdown flow `Receive closed := true;
Pool.Raw.release ic_pool cstruct);
if not !sent_shutdown then (
sent_shutdown := true;
Eio.Flow.shutdown flow `Receive)
end end
let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t let oc_of_flow ~closed ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) :
= IO.Output.t =
(* write buffer *) (* write buffer *)
let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in
let offset = ref 0 in let offset = ref 0 in
let sent_shutdown = ref false in
object (self) object (self)
method flush () : unit = method flush () : unit =
@ -91,8 +98,12 @@ let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t
if !offset = Cstruct.length wbuf then self#flush () if !offset = Cstruct.length wbuf then self#flush ()
method close () = method close () =
Pool.Raw.release oc_pool wbuf; if not !closed then (
Eio.Flow.shutdown flow `Send closed := true;
Pool.Raw.release oc_pool wbuf);
if not !sent_shutdown then (
sent_shutdown := true;
Eio.Flow.shutdown flow `Send)
end end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
@ -127,7 +138,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
| None -> min 4096 (max_connections * 2) | None -> min 4096 (max_connections * 2)
let cstruct_pool = let cstruct_pool =
Pool.create ~max_size:max_connections Pool.create ~max_size:pool_size
~mk_item:(fun () -> Cstruct.create buf_size) ~mk_item:(fun () -> Cstruct.create buf_size)
() ()
@ -137,6 +148,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(fun ~after_init ~handle () : unit -> (fun ~after_init ~handle () : unit ->
let running = Atomic.make true in let running = Atomic.make true in
let active_conns = Atomic.make 0 in let active_conns = Atomic.make 0 in
let sem = Eio.Semaphore.make max_connections in
Eio.Switch.on_release sw (fun () -> Atomic.set running false); Eio.Switch.on_release sw (fun () -> Atomic.set running false);
let net = Eio.Stdenv.net stdenv in let net = Eio.Stdenv.net stdenv in
@ -148,6 +160,13 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
sockaddr sockaddr
in in
(* Resolve actual address/port (important for port 0) *)
let actual_addr, actual_port =
match Eio.Net.listening_addr sock with
| `Tcp (_, p) -> addr, p
| `Unix s -> Printf.sprintf "unix:%s" s, 0
in
let tcp_server : IO.TCP_server.t = let tcp_server : IO.TCP_server.t =
{ {
running = (fun () -> Atomic.get running); running = (fun () -> Atomic.get running);
@ -155,10 +174,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(fun () -> (fun () ->
Atomic.set running false; Atomic.set running false;
Eio.Switch.fail sw Exit); Eio.Switch.fail sw Exit);
endpoint = endpoint = (fun () -> actual_addr, actual_port);
(fun () ->
(* TODO: find the real port *)
addr, port);
active_connections = (fun () -> Atomic.get active_conns); active_connections = (fun () -> Atomic.get active_conns);
} }
in in
@ -172,15 +188,22 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
k "error in client handler: %s" (Printexc.to_string exn))) k "error in client handler: %s" (Printexc.to_string exn)))
sock sock
(fun flow client_addr -> (fun flow client_addr ->
Eio.Semaphore.acquire sem;
Eio_unix.Fd.use_exn "setsockopt"
(Eio_unix.Net.fd flow) (fun fd ->
Unix.setsockopt fd Unix.TCP_NODELAY true);
Atomic.incr active_conns; Atomic.incr active_conns;
let@ () = let@ () =
Fun.protect ~finally:(fun () -> Fun.protect ~finally:(fun () ->
Log.debug (fun k -> Log.debug (fun k ->
k "Tiny_httpd_eio: client handler returned"); k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns) Atomic.decr active_conns;
Eio.Semaphore.release sem)
in in
let ic = ic_of_flow ~buf_pool:cstruct_pool flow in let ic_closed = ref false in
let oc = oc_of_flow ~buf_pool:cstruct_pool flow in let oc_closed = ref false in
let ic = ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool flow in
let oc = oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool flow in
Log.debug (fun k -> Log.debug (fun k ->
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr); k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);