Compare commits

..

3 commits

Author SHA1 Message Date
Simon Cruanes
78552c9c04
Merge 5caef14945 into 8a8aadfbb0 2025-07-17 02:58:40 +00:00
Simon Cruanes
5caef14945
buffer pool for lwt server 2025-07-16 22:58:21 -04:00
Simon Cruanes
76cefc0991
cleanup for lwt 2025-07-16 22:53:26 -04:00

View file

@ -111,6 +111,14 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let running = Atomic.make true in let running = Atomic.make true in
let active_conns = Atomic.make 0 in let active_conns = Atomic.make 0 in
(* a pool of buffers, to reduce allocations *)
let buf_pool =
Pool.create ~max_size:pool_size
~clear:(fun buf -> Bytes.fill buf 0 (Bytes.length buf) '\x00')
~mk_item:(fun () -> Bytes.create buf_size)
()
in
(* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *) (* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *)
let port = ref port in let port = ref port in
@ -129,6 +137,8 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
Lwt_unix.bind sock sockaddr |> Lwt_direct.await; Lwt_unix.bind sock sockaddr |> Lwt_direct.await;
Lwt_unix.listen sock backlog; Lwt_unix.listen sock backlog;
let cleanup () = Lwt_unix.close sock |> Lwt_direct.await in
(* recover real port, if any *) (* recover real port, if any *)
(match Unix.getsockname (Lwt_unix.unix_file_descr sock) with (match Unix.getsockname (Lwt_unix.unix_file_descr sock) with
| Unix.ADDR_INET (_, p) -> port := p | Unix.ADDR_INET (_, p) -> port := p
@ -137,29 +147,31 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let handle_client client_addr fd : unit = let handle_client client_addr fd : unit =
Atomic.incr active_conns; Atomic.incr active_conns;
Lwt_direct.run_in_the_background @@ fun () -> Lwt_direct.run_in_the_background @@ fun () ->
let cleanup () =
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
Atomic.decr active_conns
in
let buf_ic = Bytes.create buf_size in
let buf_oc = Bytes.create buf_size in
(*
let@ buf_ic = Pool.with_resource buf_pool in let@ buf_ic = Pool.with_resource buf_pool in
let@ buf_oc = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in
*)
(* close FD when both ends are closed *) (* close FD when both ends are closed *)
let num_open = ref 2 in let num_open = ref 2 in
let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in
let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in
let cleanup ~shutdown () =
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
Atomic.decr active_conns;
if shutdown then (
try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> ()
);
ic#close ();
oc#close ()
in
try try
handle.handle ~client_addr ic oc; handle.handle ~client_addr ic oc;
cleanup () cleanup ~shutdown:true ()
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
cleanup (); cleanup ~shutdown:false ();
Log.error (fun k -> Log.error (fun k ->
k "Client handler for %s failed with %s\n%s" k "Client handler for %s failed with %s\n%s"
(show_sockaddr client_addr) (show_sockaddr client_addr)
@ -167,10 +179,16 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(Printexc.raw_backtrace_to_string bt)) (Printexc.raw_backtrace_to_string bt))
in in
while Atomic.get running do try
let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in while Atomic.get running do
handle_client addr fd let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in
done handle_client addr fd
done;
cleanup ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ();
Printexc.raise_with_backtrace exn bt
in in
let tcp_server : IO.TCP_server.t = let tcp_server : IO.TCP_server.t =