mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2026-03-07 21:37:57 -05:00
eio: fix semaphore acquisition, graceful stop, and time source
- Acquire semaphore BEFORE spawning handler fiber: replace Eio.Net.accept_fork with manual accept + Semaphore.acquire + Fiber.fork so we bound the number of in-flight fibers rather than spawning unlimited fibers that all block on the semaphore. - Graceful stop: remove Eio.Switch.fail sw Exit from stop(), just set running to false so existing handlers can complete naturally instead of being cancelled immediately. - Replace Unix.gettimeofday with Eio.Time.now clock to use the Eio clock abstraction instead of direct Unix calls.
This commit is contained in:
parent
97c4e4dc08
commit
3b631b7e4c
1 changed files with 22 additions and 17 deletions
|
|
@ -129,7 +129,8 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
|||
let module M = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let get_time_s () = Unix.gettimeofday ()
|
||||
let clock = Eio.Stdenv.clock stdenv
|
||||
let get_time_s () = Eio.Time.now clock
|
||||
let max_connections = get_max_connection_ ?max_connections ()
|
||||
|
||||
let pool_size =
|
||||
|
|
@ -172,8 +173,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
|||
running = (fun () -> Atomic.get running);
|
||||
stop =
|
||||
(fun () ->
|
||||
Atomic.set running false;
|
||||
Eio.Switch.fail sw Exit);
|
||||
Atomic.set running false);
|
||||
endpoint = (fun () -> actual_addr, actual_port);
|
||||
active_connections = (fun () -> Atomic.get active_conns);
|
||||
}
|
||||
|
|
@ -182,28 +182,33 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
|||
after_init tcp_server;
|
||||
|
||||
while Atomic.get running do
|
||||
Eio.Net.accept_fork ~sw
|
||||
~on_error:(fun exn ->
|
||||
Log.error (fun k ->
|
||||
k "error in client handler: %s" (Printexc.to_string exn)))
|
||||
sock
|
||||
(fun flow client_addr ->
|
||||
match Eio.Net.accept ~sw sock with
|
||||
| exception (Eio.Cancel.Cancelled _ | Eio.Io _) when not (Atomic.get running) ->
|
||||
(* Socket closed or switch cancelled during shutdown; exit loop *)
|
||||
()
|
||||
| conn, client_addr ->
|
||||
(* Acquire semaphore BEFORE spawning a fiber so we
|
||||
bound the number of in-flight fibers. *)
|
||||
Eio.Semaphore.acquire sem;
|
||||
Eio_unix.Fd.use_exn "setsockopt"
|
||||
(Eio_unix.Net.fd flow) (fun fd ->
|
||||
Unix.setsockopt fd Unix.TCP_NODELAY true);
|
||||
Atomic.incr active_conns;
|
||||
Eio.Fiber.fork ~sw (fun () ->
|
||||
let@ () =
|
||||
Fun.protect ~finally:(fun () ->
|
||||
Log.debug (fun k ->
|
||||
k "Tiny_httpd_eio: client handler returned");
|
||||
Atomic.decr active_conns;
|
||||
Eio.Semaphore.release sem)
|
||||
Eio.Semaphore.release sem;
|
||||
Eio.Flow.close conn)
|
||||
in
|
||||
(try
|
||||
Eio_unix.Fd.use_exn "setsockopt"
|
||||
(Eio_unix.Net.fd conn) (fun fd ->
|
||||
Unix.setsockopt fd Unix.TCP_NODELAY true)
|
||||
with Unix.Unix_error _ -> ());
|
||||
Atomic.incr active_conns;
|
||||
let ic_closed = ref false in
|
||||
let oc_closed = ref false in
|
||||
let ic = ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool flow in
|
||||
let oc = oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool flow in
|
||||
let ic = ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool conn in
|
||||
let oc = oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool conn in
|
||||
|
||||
Log.debug (fun k ->
|
||||
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue