From 4c7d27dadd1e9efbf70afa4e25b6e2e8a1577696 Mon Sep 17 00:00:00 2001 From: craff Date: Wed, 8 Dec 2021 11:46:01 -1000 Subject: [PATCH 1/7] Added a max_keep_alive and moved to a non blocking reading of request not to loose connection slot. Default is max_keep_alive = -1.0 which keeps the original behaviour. --- src/Tiny_httpd.ml | 49 ++++++++++++++++++++++++++++++++++++++-------- src/Tiny_httpd.mli | 7 +++++-- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index f0f95b6f..ee3e1139 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -83,6 +83,27 @@ module Byte_stream = struct let of_chan = of_chan_ ~close:close_in let of_chan_close_noerr = of_chan_ ~close:close_in_noerr + exception Timeout + + let of_descr_ ?(timeout=(-1.0)) ~close ic : t = + let i = ref 0 in + let len = ref 0 in + let buf = Bytes.make 4096 ' ' in + { bs_fill_buf=(fun () -> + if !i >= !len then ( + i := 0; + let (to_read,_,_) = Unix.select [ic] [] [] timeout in + if to_read = [] then raise Timeout; + try len := Unix.read ic buf 0 (Bytes.length buf) + with Unix.Unix_error ((EAGAIN | EWOULDBLOCK), _, _) -> (); + ); + buf, !i,!len - !i); + bs_consume=(fun n -> i := !i + n); + bs_close=(fun () -> close ic) + } + + let of_descr = of_descr_ ~close:Unix.close + let rec iter f (self:t) : unit = let s, i, len = self.bs_fill_buf () in if len=0 then ( @@ -511,6 +532,9 @@ module Request = struct headers; body=()}) with | End_of_file | Sys_error _ -> Ok None + | Byte_stream.Timeout -> + _debug (fun k -> k"Timeout"); + Ok None | Bad_req (c,s) -> Error (c,s) | e -> Error (400, Printexc.to_string e) @@ -697,8 +721,10 @@ module Sem_ = struct let release m t = Mutex.lock t.mutex; t.n <- t.n + m; + let r = t.n in Condition.broadcast t.cond; - Mutex.unlock t.mutex + Mutex.unlock t.mutex; + r end module Route = struct @@ -814,6 +840,9 @@ type t = { sem_max_connections: Sem_.t; (* semaphore to restrict the number of active concurrent connections *) + max_keep_alive: float; + (* maximum time in second before closing the client connections *) + new_thread: (unit -> unit) -> unit; (* a function to run the given callback in a separate thread (or thread pool) *) @@ -948,13 +977,14 @@ let add_route_server_sent_handler ?accept self route f = let create ?(masksigpipe=true) ?(max_connections=32) + ?(max_keep_alive=(-1.0)) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(addr="127.0.0.1") ?(port=8080) ?sock () : t = let handler _req = Response.fail ~code:404 "no top handler" in let max_connections = max 4 max_connections in { new_thread; addr; port; sock; masksigpipe; handler; running= true; sem_max_connections=Sem_.create max_connections; - path_handlers=[]; + path_handlers=[]; max_keep_alive; cb_encode_resp=[]; cb_decode_req=[]; } @@ -970,10 +1000,10 @@ let find_map f l = in aux f l let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = - let ic = Unix.in_channel_of_descr client_sock in + let _ = Unix.set_nonblock client_sock in let oc = Unix.out_channel_of_descr client_sock in let buf = Buf_.create() in - let is = Byte_stream.of_chan ic in + let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in let continue = ref true in while !continue && self.running do _debug (fun k->k "read next request"); @@ -1087,17 +1117,20 @@ let run (self:t) : (unit,_) result = end; while self.running do (* limit concurrency *) - Sem_.acquire 1 self.sem_max_connections; try let client_sock, _ = Unix.accept sock in + Sem_.acquire 1 self.sem_max_connections; self.new_thread (fun () -> try handle_client_ self client_sock; - Sem_.release 1 self.sem_max_connections; - with e -> + let avail = Sem_.release 1 self.sem_max_connections in + _debug (fun k -> k"closing inactive connections (%d connections available)" avail) + with + | e -> (try Unix.close client_sock with _ -> ()); - Sem_.release 1 self.sem_max_connections; + let avail = Sem_.release 1 self.sem_max_connections in + _debug (fun k -> k"closing connections on error (%d connections available)" avail); raise e ); with e -> diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index 77610de7..7d8b86cd 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -434,6 +434,7 @@ type t val create : ?masksigpipe:bool -> ?max_connections:int -> + ?max_keep_alive:float -> ?new_thread:((unit -> unit) -> unit) -> ?addr:string -> ?port:int -> @@ -453,7 +454,10 @@ val create : new client connection. By default it is {!Thread.create} but one could use a thread pool instead. - @param max_connections maximum number of simultaneous connections. + @param max_connections maximum number of simultaneous connections. Default 32. + @param max_keep_alive maximum number of seconds a thread in maintened for + a client with nothing to read. Default: -1.0, meaning threads are maintened + until client close the socket. @param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"]. @param port to listen on. Default [8080]. @param sock an existing socket given to the server to listen on, e.g. by @@ -639,4 +643,3 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit) val _enable_debug: bool -> unit (**/**) - From 9e9fe3234768a1fe5b72dfc7594970b35a999ad0 Mon Sep 17 00:00:00 2001 From: craff Date: Wed, 8 Dec 2021 12:07:35 -1000 Subject: [PATCH 2/7] only set non blocking for reading request, not to write answer --- src/Tiny_httpd.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index ee3e1139..dd4244c7 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -1000,8 +1000,9 @@ let find_map f l = in aux f l let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = + let write_sock = Unix.dup client_sock in let _ = Unix.set_nonblock client_sock in - let oc = Unix.out_channel_of_descr client_sock in + let oc = Unix.out_channel_of_descr write_sock in let buf = Buf_.create() in let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in let continue = ref true in From 024a774f390a866c78fe886d73b5db44d58f4333 Mon Sep 17 00:00:00 2001 From: Christophe Raffalli Date: Wed, 8 Dec 2021 16:25:07 -1000 Subject: [PATCH 3/7] Update src/Tiny_httpd.mli Co-authored-by: Simon Cruanes --- src/Tiny_httpd.mli | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index 7d8b86cd..fd33d260 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -458,6 +458,7 @@ val create : @param max_keep_alive maximum number of seconds a thread in maintened for a client with nothing to read. Default: -1.0, meaning threads are maintened until client close the socket. + This parameter exists since 0.11. @param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"]. @param port to listen on. Default [8080]. @param sock an existing socket given to the server to listen on, e.g. by From e87428ae51a47d8a40d7f9cc17115017293066cd Mon Sep 17 00:00:00 2001 From: craff Date: Wed, 8 Dec 2021 16:38:59 -1000 Subject: [PATCH 4/7] release semaphore does not return anything anymore --- src/Tiny_httpd.ml | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index dd4244c7..eb50e830 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -721,8 +721,13 @@ module Sem_ = struct let release m t = Mutex.lock t.mutex; t.n <- t.n + m; - let r = t.n in Condition.broadcast t.cond; + Mutex.unlock t.mutex + + let available_connection t = + (* locking necessary unless we have atomic read/write for int.*) + Mutex.lock t.mutex; + let r = t.n in Mutex.unlock t.mutex; r end @@ -1125,13 +1130,17 @@ let run (self:t) : (unit,_) result = (fun () -> try handle_client_ self client_sock; - let avail = Sem_.release 1 self.sem_max_connections in - _debug (fun k -> k"closing inactive connections (%d connections available)" avail) + Sem_.release 1 self.sem_max_connections; + _debug (fun k -> k + "closing inactive connections (%d connections available)" + (Sem_.available_connection self.sem_max_connections)) with | e -> (try Unix.close client_sock with _ -> ()); - let avail = Sem_.release 1 self.sem_max_connections in - _debug (fun k -> k"closing connections on error (%d connections available)" avail); + Sem_.release 1 self.sem_max_connections; + _debug (fun k -> k + "closing connections on error (%d connections available)" + (Sem_.available_connection self.sem_max_connections)); raise e ); with e -> From 9b4a1b1197b1438da5cb657b027749aa7864ffd9 Mon Sep 17 00:00:00 2001 From: craff Date: Wed, 8 Dec 2021 17:47:53 -1000 Subject: [PATCH 5/7] remove mutex to get available_connections --- src/Tiny_httpd.ml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index eb50e830..c265e440 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -724,12 +724,8 @@ module Sem_ = struct Condition.broadcast t.cond; Mutex.unlock t.mutex - let available_connection t = - (* locking necessary unless we have atomic read/write for int.*) - Mutex.lock t.mutex; - let r = t.n in - Mutex.unlock t.mutex; - r + let available_connections t = t.n + end module Route = struct @@ -1133,14 +1129,14 @@ let run (self:t) : (unit,_) result = Sem_.release 1 self.sem_max_connections; _debug (fun k -> k "closing inactive connections (%d connections available)" - (Sem_.available_connection self.sem_max_connections)) + (Sem_.available_connections self.sem_max_connections)) with | e -> (try Unix.close client_sock with _ -> ()); Sem_.release 1 self.sem_max_connections; _debug (fun k -> k "closing connections on error (%d connections available)" - (Sem_.available_connection self.sem_max_connections)); + (Sem_.available_connections self.sem_max_connections)); raise e ); with e -> From 97eeaf96dc28914b8cd7420e8ddffb5dc67d700f Mon Sep 17 00:00:00 2001 From: craff Date: Wed, 8 Dec 2021 17:54:08 -1000 Subject: [PATCH 6/7] make accessible to the end user the number of available connections --- src/Tiny_httpd.ml | 3 +++ src/Tiny_httpd.mli | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index c265e440..16114465 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -870,6 +870,9 @@ type t = { let addr self = self.addr let port self = self.port +let available_connections self = + Sem_.available_connections self.sem_max_connections + let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp let set_top_handler self f = self.handler <- f diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index fd33d260..1b55a15a 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -477,6 +477,9 @@ val is_ipv6 : t -> bool val port : t -> int (** Port on which the server listens. *) +val available_connections : t -> int +(** number of available connections on the server. *) + val add_decode_request_cb : t -> (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit From c3265af08fe24c90a07697e9c37499067954edc3 Mon Sep 17 00:00:00 2001 From: craff Date: Fri, 10 Dec 2021 12:49:51 -1000 Subject: [PATCH 7/7] moved back acquire before accept but inside the try ... with ... one never knowns --- src/Tiny_httpd.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 16114465..f6cc707b 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -724,7 +724,8 @@ module Sem_ = struct Condition.broadcast t.cond; Mutex.unlock t.mutex - let available_connections t = t.n + (* +1 because we decrease the semaphore before Unix.accept *) + let available_connections t = t.n + 1 end @@ -1123,8 +1124,8 @@ let run (self:t) : (unit,_) result = while self.running do (* limit concurrency *) try - let client_sock, _ = Unix.accept sock in Sem_.acquire 1 self.sem_max_connections; + let client_sock, _ = Unix.accept sock in self.new_thread (fun () -> try