breaking: drop connections when maximum number of active conns is reached

also provide the statistics for that
This commit is contained in:
Simon Cruanes 2023-06-20 23:57:40 -04:00
parent b927f98490
commit 9d536e1716
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 41 additions and 45 deletions

View file

@ -88,6 +88,8 @@ module TCP_server = struct
(** Endpoint we listen on. This can only be called from within [serve]. *)
active_connections: unit -> int;
(** Number of connections currently active *)
dropped_connections: unit -> int;
(** Number of connections dropped so far *)
running: unit -> bool; (** Is the server currently running? *)
stop: unit -> unit;
(** Ask the server to stop. This might not take effect immediately. *)

View file

@ -1,3 +1,5 @@
module Atomic = Tiny_httpd_atomic_
type buf = Tiny_httpd_buf.t
type byte_stream = Tiny_httpd_stream.t
@ -488,33 +490,6 @@ module Response = struct
IO.Out_channel.flush oc
end
(* semaphore, for limiting concurrency. *)
module Sem_ = struct
type t = { mutable n: int; max: int; mutex: Mutex.t; cond: Condition.t }
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; max = n; mutex = Mutex.create (); cond = Condition.create () }
let acquire m t =
Mutex.lock t.mutex;
while t.n < m do
Condition.wait t.cond t.mutex
done;
assert (t.n >= m);
t.n <- t.n - m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let release m t =
Mutex.lock t.mutex;
t.n <- t.n + m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let num_acquired t = t.max - t.n
end
module Route = struct
type path = string list (* split on '/' *)
@ -682,6 +657,11 @@ let active_connections (self : t) =
| None -> 0
| Some s -> s.active_connections ()
let dropped_connections (self : t) =
match self.tcp_server with
| None -> 0
| Some s -> s.dropped_connections ()
let add_middleware ~stage self m =
let stage =
match stage with
@ -850,8 +830,8 @@ module Unix_tcp_server_ = struct
addr: string;
port: int;
max_connections: int;
sem_max_connections: Sem_.t;
(** semaphore to restrict the number of active concurrent connections *)
active_conns: int Atomic.t; (** Number of currently active connections *)
dropped_conns: int Atomic.t;
mutable sock: Unix.file_descr option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
@ -897,8 +877,8 @@ module Unix_tcp_server_ = struct
{
IO.TCP_server.stop = (fun () -> self.running <- false);
running = (fun () -> self.running);
active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections - 1);
active_connections = (fun () -> Atomic.get self.active_conns);
dropped_connections = (fun () -> Atomic.get self.dropped_conns);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
@ -927,19 +907,25 @@ module Unix_tcp_server_ = struct
while self.running do
(* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections;
try
let client_sock, _ = Unix.accept sock in
self.new_thread (fun () ->
try
handle_client_unix_ client_sock;
Sem_.release 1 self.sem_max_connections
with e ->
(try Unix.close client_sock with _ -> ());
Sem_.release 1 self.sem_max_connections;
raise e)
with e ->
Sem_.release 1 self.sem_max_connections;
match Unix.accept sock with
| client_sock, _ ->
if
Atomic.fetch_and_add self.active_conns 1 >= self.max_connections
then (
(* drop connection *)
Atomic.decr self.active_conns;
Atomic.incr self.dropped_conns;
try Unix.close client_sock with _ -> ()
) else
self.new_thread (fun () ->
try
handle_client_unix_ client_sock;
Atomic.decr self.active_conns
with e ->
(try Unix.close client_sock with _ -> ());
Atomic.decr self.active_conns;
raise e)
| exception e ->
_debug (fun k ->
k "Unix.accept or Thread.create raised an exception: %s"
(Printexc.to_string e))
@ -961,7 +947,8 @@ let create ?(masksigpipe = true) ?max_connections ?(timeout = 0.0) ?buf_size
port;
sock;
max_connections;
sem_max_connections = Sem_.create max_connections;
active_conns = Atomic.make 0;
dropped_conns = Atomic.make 0;
masksigpipe;
timeout;
}

View file

@ -449,6 +449,12 @@ val port : t -> int
val active_connections : t -> int
(** Number of currently active connections. *)
val dropped_connections : t -> int
(** Number of connections that were dropped because the maximum number of
active connections was reached.
See {!create} with the [max_connections] parameter.
@since NEXT_RELEASE *)
val add_decode_request_cb :
t ->
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) ->

View file

@ -137,6 +137,7 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
(* TODO: find the real port *)
addr, port);
active_connections = (fun () -> Atomic.get active_conns);
dropped_connections = (fun () -> 0);
}
in