From 9d536e1716355e94f0b3cc490edc8daa1c2b92c8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 20 Jun 2023 23:57:40 -0400 Subject: [PATCH] breaking: drop connections when maximum number of active conns is reached also provide the statistics for that --- src/Tiny_httpd_io.ml | 2 + src/Tiny_httpd_server.ml | 77 ++++++++++++++++----------------------- src/Tiny_httpd_server.mli | 6 +++ src/eio/tiny_httpd_eio.ml | 1 + 4 files changed, 41 insertions(+), 45 deletions(-) diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml index 749f53d9..96796b6a 100644 --- a/src/Tiny_httpd_io.ml +++ b/src/Tiny_httpd_io.ml @@ -88,6 +88,8 @@ module TCP_server = struct (** Endpoint we listen on. This can only be called from within [serve]. *) active_connections: unit -> int; (** Number of connections currently active *) + dropped_connections: unit -> int; + (** Number of connections dropped so far *) running: unit -> bool; (** Is the server currently running? *) stop: unit -> unit; (** Ask the server to stop. This might not take effect immediately. *) diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index f4f1f07a..71697d4f 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -1,3 +1,5 @@ +module Atomic = Tiny_httpd_atomic_ + type buf = Tiny_httpd_buf.t type byte_stream = Tiny_httpd_stream.t @@ -488,33 +490,6 @@ module Response = struct IO.Out_channel.flush oc end -(* semaphore, for limiting concurrency. *) -module Sem_ = struct - type t = { mutable n: int; max: int; mutex: Mutex.t; cond: Condition.t } - - let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; max = n; mutex = Mutex.create (); cond = Condition.create () } - - let acquire m t = - Mutex.lock t.mutex; - while t.n < m do - Condition.wait t.cond t.mutex - done; - assert (t.n >= m); - t.n <- t.n - m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - - let release m t = - Mutex.lock t.mutex; - t.n <- t.n + m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - - let num_acquired t = t.max - t.n -end - module Route = struct type path = string list (* split on '/' *) @@ -682,6 +657,11 @@ let active_connections (self : t) = | None -> 0 | Some s -> s.active_connections () +let dropped_connections (self : t) = + match self.tcp_server with + | None -> 0 + | Some s -> s.dropped_connections () + let add_middleware ~stage self m = let stage = match stage with @@ -850,8 +830,8 @@ module Unix_tcp_server_ = struct addr: string; port: int; max_connections: int; - sem_max_connections: Sem_.t; - (** semaphore to restrict the number of active concurrent connections *) + active_conns: int Atomic.t; (** Number of currently active connections *) + dropped_conns: int Atomic.t; mutable sock: Unix.file_descr option; (** Socket *) new_thread: (unit -> unit) -> unit; timeout: float; @@ -897,8 +877,8 @@ module Unix_tcp_server_ = struct { IO.TCP_server.stop = (fun () -> self.running <- false); running = (fun () -> self.running); - active_connections = - (fun () -> Sem_.num_acquired self.sem_max_connections - 1); + active_connections = (fun () -> Atomic.get self.active_conns); + dropped_connections = (fun () -> Atomic.get self.dropped_conns); endpoint = (fun () -> let addr, port = get_addr_ sock in @@ -927,19 +907,25 @@ module Unix_tcp_server_ = struct while self.running do (* limit concurrency *) - Sem_.acquire 1 self.sem_max_connections; - try - let client_sock, _ = Unix.accept sock in - self.new_thread (fun () -> - try - handle_client_unix_ client_sock; - Sem_.release 1 self.sem_max_connections - with e -> - (try Unix.close client_sock with _ -> ()); - Sem_.release 1 self.sem_max_connections; - raise e) - with e -> - Sem_.release 1 self.sem_max_connections; + match Unix.accept sock with + | client_sock, _ -> + if + Atomic.fetch_and_add self.active_conns 1 >= self.max_connections + then ( + (* drop connection *) + Atomic.decr self.active_conns; + Atomic.incr self.dropped_conns; + try Unix.close client_sock with _ -> () + ) else + self.new_thread (fun () -> + try + handle_client_unix_ client_sock; + Atomic.decr self.active_conns + with e -> + (try Unix.close client_sock with _ -> ()); + Atomic.decr self.active_conns; + raise e) + | exception e -> _debug (fun k -> k "Unix.accept or Thread.create raised an exception: %s" (Printexc.to_string e)) @@ -961,7 +947,8 @@ let create ?(masksigpipe = true) ?max_connections ?(timeout = 0.0) ?buf_size port; sock; max_connections; - sem_max_connections = Sem_.create max_connections; + active_conns = Atomic.make 0; + dropped_conns = Atomic.make 0; masksigpipe; timeout; } diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 6a0e0157..8c24b840 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -449,6 +449,12 @@ val port : t -> int val active_connections : t -> int (** Number of currently active connections. *) +val dropped_connections : t -> int +(** Number of connections that were dropped because the maximum number of + active connections was reached. + See {!create} with the [max_connections] parameter. + @since NEXT_RELEASE *) + val add_decode_request_cb : t -> (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> diff --git a/src/eio/tiny_httpd_eio.ml b/src/eio/tiny_httpd_eio.ml index d8c89008..b3f50955 100644 --- a/src/eio/tiny_httpd_eio.ml +++ b/src/eio/tiny_httpd_eio.ml @@ -137,6 +137,7 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections (* TODO: find the real port *) addr, port); active_connections = (fun () -> Atomic.get active_conns); + dropped_connections = (fun () -> 0); } in