tiny_httpd: use picos semaphore; tweak pool size, buf size

This commit is contained in:
Simon Cruanes 2025-05-01 13:22:15 -04:00
parent ff870e7fa7
commit 34a1cc1769
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 31 additions and 53 deletions

View file

@ -40,7 +40,8 @@
ocaml ocaml
dune dune
nanoev nanoev
picos (picos (>= 0.6))
picos_std
(tiny_httpd (>= 0.17))) (tiny_httpd (>= 0.17)))
(tags (nanoev http))) (tags (nanoev http)))

View file

@ -11,7 +11,8 @@ depends: [
"ocaml" "ocaml"
"dune" {>= "2.7"} "dune" {>= "2.7"}
"nanoev" "nanoev"
"picos" "picos" {>= "0.6"}
"picos_std"
"tiny_httpd" {>= "0.17"} "tiny_httpd" {>= "0.17"}
"odoc" {with-doc} "odoc" {with-doc}
] ]

View file

@ -6,5 +6,6 @@
picos picos
(re_export nanoev) (re_export nanoev)
nanoev.picos nanoev.picos
picos_std.sync
(re_export iostream) (re_export iostream)
(re_export tiny_httpd))) (re_export tiny_httpd)))

View file

@ -5,45 +5,8 @@ module Slice = Iostream.Slice
module Pool = TH.Pool module Pool = TH.Pool
module Buf = TH.Buf module Buf = TH.Buf
let unwrap_ = function module Sem_ = Picos_std_sync.Semaphore.Counting
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
(** Non blocking semaphore *) (** Non blocking semaphore *)
module Sem_ = struct
type t = {
mutable n: int;
max: int;
waiting: Picos.Trigger.t Queue.t;
mutex: Mutex.t;
}
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; max = n; mutex = Mutex.create (); waiting = Queue.create () }
let acquire self =
Mutex.lock self.mutex;
while self.n = 0 do
let tr = Picos.Trigger.create () in
Queue.push tr self.waiting;
Mutex.unlock self.mutex;
let res = Picos.Trigger.await tr in
unwrap_ res;
Mutex.lock self.mutex
done;
assert (self.n > 0);
self.n <- self.n - 1;
Mutex.unlock self.mutex
let release self =
Mutex.lock self.mutex;
self.n <- self.n + 1;
Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting);
Mutex.unlock self.mutex
let num_acquired self = self.max - self.n
end
module Out = struct module Out = struct
open Iostream open Iostream
@ -147,7 +110,7 @@ module Unix_tcp_server_ = struct
new_thread: (unit -> unit) -> unit; new_thread: (unit -> unit) -> unit;
timeout: float; timeout: float;
masksigpipe: bool; masksigpipe: bool;
mutable running: bool; (* TODO: use an atomic? *) running: bool Atomic.t;
} }
let shutdown_silent_ fd = let shutdown_silent_ fd =
@ -183,7 +146,7 @@ module Unix_tcp_server_ = struct
let inet_addr = Unix.inet_addr_of_string self.addr in let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in let n_listen = self.max_connections in
Unix.listen sock n_listen Unix.listen sock n_listen
); );
@ -191,10 +154,16 @@ module Unix_tcp_server_ = struct
let tcp_server = let tcp_server =
{ {
TH.IO.TCP_server.stop = (fun () -> self.running <- false); TH.IO.TCP_server.stop =
running = (fun () -> self.running); (fun () ->
Atomic.set self.running false;
(* close accept socket so the main loop will return *)
try Unix.close sock with _ -> ());
running = (fun () -> Atomic.get self.running);
active_connections = active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections); (fun () ->
self.max_connections - Sem_.get_value self.sem_max_connections);
endpoint = endpoint =
(fun () -> (fun () ->
let addr, port = get_addr_ sock in let addr, port = get_addr_ sock in
@ -233,7 +202,7 @@ module Unix_tcp_server_ = struct
in in
Unix.set_nonblock sock; Unix.set_nonblock sock;
while self.running do while Atomic.get self.running do
match EV.accept sock with match EV.accept sock with
| client_sock, client_addr -> | client_sock, client_addr ->
(* limit concurrency *) (* limit concurrency *)
@ -272,7 +241,7 @@ module Unix_tcp_server_ = struct
done; done;
(* Wait for all threads to be done: this only works if all threads are done. *) (* Wait for all threads to be done: this only works if all threads are done. *)
Unix.close sock; (try Unix.close sock with _ -> ());
(* TODO? *) (* TODO? *)
(* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *)
()); ());
@ -290,11 +259,16 @@ open struct
slice.len <- 0 slice.len <- 0
end end
let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) let create ?(masksigpipe = not Sys.win32) ?max_connections ?max_buf_pool_size
?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday)
?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () :
TH.Server.t =
let max_connections = get_max_connection_ ?max_connections () in let max_connections = get_max_connection_ ?max_connections () in
let max_pool_size = min 4096 max_connections * 2 in let max_pool_size =
match max_buf_pool_size with
| None -> min 4096 max_connections * 2
| Some m -> m
in
let server = let server =
{ {
Unix_tcp_server_.addr; Unix_tcp_server_.addr;
@ -309,11 +283,11 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0)
(let buf_size = Option.value buf_size ~default:4096 in (let buf_size = Option.value buf_size ~default:4096 in
fun () -> Slice.create buf_size) fun () -> Slice.create buf_size)
(); ();
running = true; running = Atomic.make true;
port; port;
sock; sock;
max_connections; max_connections;
sem_max_connections = Sem_.create max_connections; sem_max_connections = Sem_.make max_connections;
masksigpipe; masksigpipe;
timeout; timeout;
} }

View file

@ -3,6 +3,7 @@ module TH = Tiny_httpd_core
val create : val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int -> ?max_connections:int ->
?max_buf_pool_size:int ->
?timeout:float -> ?timeout:float ->
?buf_size:int -> ?buf_size:int ->
?get_time_s:(unit -> float) -> ?get_time_s:(unit -> float) ->