From dc72dad5db5e33564e89cd946f6950bcc84006e5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 11 Dec 2021 09:30:41 -0500 Subject: [PATCH] revert: remove timeout handling for now This reverts commit 6536bfeeb3506db4ec32798a9a813670b5214fb0. --- src/Tiny_httpd.ml | 67 +++++----------------------------------------- src/Tiny_httpd.mli | 12 ++------- 2 files changed, 9 insertions(+), 70 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 508bff26..f0f95b6f 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -83,37 +83,6 @@ module Byte_stream = struct let of_chan = of_chan_ ~close:close_in let of_chan_close_noerr = of_chan_ ~close:close_in_noerr - exception Timeout - - let of_descr_ ?(timeout=(-1.0)) ~close ic : t = - let i = ref 0 in - let len = ref 0 in - let buf = Bytes.make 4096 ' ' in - { bs_fill_buf=(fun () -> - if !i >= !len then ( - i := 0; - - let rec wait() = - let to_read,_,_ = Unix.select [ic] [] [] timeout in - if to_read = [] then raise Timeout; - read() - and read() = - try len := Unix.read ic buf 0 (Bytes.length buf) - with - | Unix.Unix_error (EAGAIN, _, _) -> read() - | Unix.Unix_error (EWOULDBLOCK, _, _) -> - (* FIXME: we should decrease the timeout by however long was spent in [select] *) - wait() - in - wait() - ); - buf, !i,!len - !i); - bs_consume=(fun n -> i := !i + n); - bs_close=(fun () -> close ic) - } - - let of_descr = of_descr_ ~close:Unix.close - let rec iter f (self:t) : unit = let s, i, len = self.bs_fill_buf () in if len=0 then ( @@ -542,9 +511,6 @@ module Request = struct headers; body=()}) with | End_of_file | Sys_error _ -> Ok None - | Byte_stream.Timeout -> - _debug (fun k -> k"Timeout"); - Ok None | Bad_req (c,s) -> Error (c,s) | e -> Error (400, Printexc.to_string e) @@ -710,14 +676,13 @@ end module Sem_ = struct type t = { mutable n : int; - max: int; mutex : Mutex.t; cond : Condition.t; } let create n = if n <= 0 then invalid_arg "Semaphore.create"; - { n; max=n; mutex=Mutex.create(); cond=Condition.create(); } + { n; mutex=Mutex.create(); cond=Condition.create(); } let acquire m t = Mutex.lock t.mutex; @@ -734,8 +699,6 @@ module Sem_ = struct t.n <- t.n + m; Condition.broadcast t.cond; Mutex.unlock t.mutex - - let num_acquired self = self.max - self.n end module Route = struct @@ -851,9 +814,6 @@ type t = { sem_max_connections: Sem_.t; (* semaphore to restrict the number of active concurrent connections *) - max_keep_alive: float; - (* maximum time in second before closing the client connections *) - new_thread: (unit -> unit) -> unit; (* a function to run the given callback in a separate thread (or thread pool) *) @@ -880,10 +840,6 @@ type t = { let addr self = self.addr let port self = self.port -let active_connections self = - (* -1 because we decrease the semaphore before Unix.accept *) - Sem_.num_acquired self.sem_max_connections - 1 - let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp let set_top_handler self f = self.handler <- f @@ -992,14 +948,13 @@ let add_route_server_sent_handler ?accept self route f = let create ?(masksigpipe=true) ?(max_connections=32) - ?(max_keep_alive=(-1.0)) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(addr="127.0.0.1") ?(port=8080) ?sock () : t = let handler _req = Response.fail ~code:404 "no top handler" in let max_connections = max 4 max_connections in { new_thread; addr; port; sock; masksigpipe; handler; running= true; sem_max_connections=Sem_.create max_connections; - path_handlers=[]; max_keep_alive; + path_handlers=[]; cb_encode_resp=[]; cb_decode_req=[]; } @@ -1015,11 +970,10 @@ let find_map f l = in aux f l let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = - let write_sock = Unix.dup client_sock in - let _ = Unix.set_nonblock client_sock in - let oc = Unix.out_channel_of_descr write_sock in + let ic = Unix.in_channel_of_descr client_sock in + let oc = Unix.out_channel_of_descr client_sock in let buf = Buf_.create() in - let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in + let is = Byte_stream.of_chan ic in let continue = ref true in while !continue && self.running do _debug (fun k->k "read next request"); @@ -1133,24 +1087,17 @@ let run (self:t) : (unit,_) result = end; while self.running do (* limit concurrency *) + Sem_.acquire 1 self.sem_max_connections; try - Sem_.acquire 1 self.sem_max_connections; let client_sock, _ = Unix.accept sock in self.new_thread (fun () -> try handle_client_ self client_sock; Sem_.release 1 self.sem_max_connections; - _debug (fun k -> k - "closing inactive connections (%d connections active)" - (active_connections self)) - with - | e -> + with e -> (try Unix.close client_sock with _ -> ()); Sem_.release 1 self.sem_max_connections; - _debug (fun k -> k - "closing connections on error (%d connections active)" - (active_connections self)); raise e ); with e -> diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index 1e7069bf..77610de7 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -434,7 +434,6 @@ type t val create : ?masksigpipe:bool -> ?max_connections:int -> - ?max_keep_alive:float -> ?new_thread:((unit -> unit) -> unit) -> ?addr:string -> ?port:int -> @@ -454,11 +453,7 @@ val create : new client connection. By default it is {!Thread.create} but one could use a thread pool instead. - @param max_connections maximum number of simultaneous connections. Default 32. - @param max_keep_alive maximum number of seconds a thread in maintened for - a client with nothing to read. Default: -1.0, meaning threads are maintened - until client close the socket. - This parameter exists since 0.11. + @param max_connections maximum number of simultaneous connections. @param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"]. @param port to listen on. Default [8080]. @param sock an existing socket given to the server to listen on, e.g. by @@ -477,10 +472,6 @@ val is_ipv6 : t -> bool val port : t -> int (** Port on which the server listens. *) -val active_connections : t -> int -(** number of currently opened connections with a client. - @since 0.11 *) - val add_decode_request_cb : t -> (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit @@ -648,3 +639,4 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit) val _enable_debug: bool -> unit (**/**) +