Merge branch 'eio-fixes' into simon/use-eio-round2

* eio-fixes:
  eio: add 60s shutdown backstop, protect Flow.close from raising
  eio: fix semaphore acquisition, graceful stop, and time source
This commit is contained in:
Simon Cruanes 2026-02-15 16:28:54 -05:00
commit 8cf59ffed1
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -133,7 +133,8 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let module M = struct let module M = struct
let init_addr () = addr let init_addr () = addr
let init_port () = port let init_port () = port
let get_time_s () = Unix.gettimeofday () let clock = Eio.Stdenv.clock stdenv
let get_time_s () = Eio.Time.now clock
let max_connections = get_max_connection_ ?max_connections () let max_connections = get_max_connection_ ?max_connections ()
let pool_size = let pool_size =
@ -177,7 +178,12 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
stop = stop =
(fun () -> (fun () ->
Atomic.set running false; Atomic.set running false;
Eio.Switch.fail sw Exit); (* Backstop: fail the switch after 60s if handlers don't complete *)
Eio.Fiber.fork_daemon ~sw (fun () ->
Eio.Time.sleep clock 60.0;
if Eio.Switch.get_error sw |> Option.is_none then
Eio.Switch.fail sw Exit;
`Stop_daemon));
endpoint = (fun () -> actual_addr, actual_port); endpoint = (fun () -> actual_addr, actual_port);
active_connections = (fun () -> Atomic.get active_conns); active_connections = (fun () -> Atomic.get active_conns);
} }
@ -186,43 +192,50 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
after_init tcp_server; after_init tcp_server;
while Atomic.get running do while Atomic.get running do
Eio.Net.accept_fork ~sw match Eio.Net.accept ~sw sock with
~on_error:(fun exn -> | exception (Eio.Cancel.Cancelled _ | Eio.Io _)
Log.error (fun k -> when not (Atomic.get running) ->
k "error in client handler: %s" (Printexc.to_string exn))) (* Socket closed or switch cancelled during shutdown; exit loop *)
sock ()
(fun flow client_addr -> | conn, client_addr ->
Eio.Semaphore.acquire sem; (* Acquire semaphore BEFORE spawning a fiber so we
Eio_unix.Fd.use_exn "setsockopt" (Eio_unix.Net.fd flow) bound the number of in-flight fibers. *)
(fun fd -> Unix.setsockopt fd Unix.TCP_NODELAY true); Eio.Semaphore.acquire sem;
Atomic.incr active_conns; Eio.Fiber.fork ~sw (fun () ->
let@ () = let@ () =
Fun.protect ~finally:(fun () -> Fun.protect ~finally:(fun () ->
Log.debug (fun k -> Log.debug (fun k ->
k "Tiny_httpd_eio: client handler returned"); k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns; Atomic.decr active_conns;
Eio.Semaphore.release sem) Eio.Semaphore.release sem;
in try Eio.Flow.close conn with Eio.Io _ -> ())
let ic_closed = ref false in in
let oc_closed = ref false in (try
let ic = Eio_unix.Fd.use_exn "setsockopt" (Eio_unix.Net.fd conn)
ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool flow (fun fd -> Unix.setsockopt fd Unix.TCP_NODELAY true)
in with Unix.Unix_error _ -> ());
let oc = Atomic.incr active_conns;
oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool flow let ic_closed = ref false in
in let oc_closed = ref false in
let ic =
ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool conn
in
let oc =
oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool conn
in
Log.debug (fun k -> Log.debug (fun k ->
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr); k "handling client on %a…" Eio.Net.Sockaddr.pp
let client_addr_unix = eio_sock_addr_to_unix client_addr in client_addr);
try handle.handle ~client_addr:client_addr_unix ic oc let client_addr_unix = eio_sock_addr_to_unix client_addr in
with exn -> try handle.handle ~client_addr:client_addr_unix ic oc
let bt = Printexc.get_raw_backtrace () in with exn ->
Log.error (fun k -> let bt = Printexc.get_raw_backtrace () in
k "Client handler for %a failed with %s\n%s" Log.error (fun k ->
Eio.Net.Sockaddr.pp client_addr k "Client handler for %a failed with %s\n%s"
(Printexc.to_string exn) Eio.Net.Sockaddr.pp client_addr
(Printexc.raw_backtrace_to_string bt))) (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)))
done); done);
} }
end in end in