diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index f0f95b6f..f6cc707b 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -83,6 +83,27 @@ module Byte_stream = struct let of_chan = of_chan_ ~close:close_in let of_chan_close_noerr = of_chan_ ~close:close_in_noerr + exception Timeout + + let of_descr_ ?(timeout=(-1.0)) ~close ic : t = + let i = ref 0 in + let len = ref 0 in + let buf = Bytes.make 4096 ' ' in + { bs_fill_buf=(fun () -> + if !i >= !len then ( + i := 0; + let (to_read,_,_) = Unix.select [ic] [] [] timeout in + if to_read = [] then raise Timeout; + try len := Unix.read ic buf 0 (Bytes.length buf) + with Unix.Unix_error ((EAGAIN | EWOULDBLOCK), _, _) -> (); + ); + buf, !i,!len - !i); + bs_consume=(fun n -> i := !i + n); + bs_close=(fun () -> close ic) + } + + let of_descr = of_descr_ ~close:Unix.close + let rec iter f (self:t) : unit = let s, i, len = self.bs_fill_buf () in if len=0 then ( @@ -511,6 +532,9 @@ module Request = struct headers; body=()}) with | End_of_file | Sys_error _ -> Ok None + | Byte_stream.Timeout -> + _debug (fun k -> k"Timeout"); + Ok None | Bad_req (c,s) -> Error (c,s) | e -> Error (400, Printexc.to_string e) @@ -699,6 +723,10 @@ module Sem_ = struct t.n <- t.n + m; Condition.broadcast t.cond; Mutex.unlock t.mutex + + (* +1 because we decrease the semaphore before Unix.accept *) + let available_connections t = t.n + 1 + end module Route = struct @@ -814,6 +842,9 @@ type t = { sem_max_connections: Sem_.t; (* semaphore to restrict the number of active concurrent connections *) + max_keep_alive: float; + (* maximum time in second before closing the client connections *) + new_thread: (unit -> unit) -> unit; (* a function to run the given callback in a separate thread (or thread pool) *) @@ -840,6 +871,9 @@ type t = { let addr self = self.addr let port self = self.port +let available_connections self = + Sem_.available_connections self.sem_max_connections + let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp let set_top_handler self f = self.handler <- f @@ -948,13 +982,14 @@ let add_route_server_sent_handler ?accept self route f = let create ?(masksigpipe=true) ?(max_connections=32) + ?(max_keep_alive=(-1.0)) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(addr="127.0.0.1") ?(port=8080) ?sock () : t = let handler _req = Response.fail ~code:404 "no top handler" in let max_connections = max 4 max_connections in { new_thread; addr; port; sock; masksigpipe; handler; running= true; sem_max_connections=Sem_.create max_connections; - path_handlers=[]; + path_handlers=[]; max_keep_alive; cb_encode_resp=[]; cb_decode_req=[]; } @@ -970,10 +1005,11 @@ let find_map f l = in aux f l let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = - let ic = Unix.in_channel_of_descr client_sock in - let oc = Unix.out_channel_of_descr client_sock in + let write_sock = Unix.dup client_sock in + let _ = Unix.set_nonblock client_sock in + let oc = Unix.out_channel_of_descr write_sock in let buf = Buf_.create() in - let is = Byte_stream.of_chan ic in + let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in let continue = ref true in while !continue && self.running do _debug (fun k->k "read next request"); @@ -1087,17 +1123,24 @@ let run (self:t) : (unit,_) result = end; while self.running do (* limit concurrency *) - Sem_.acquire 1 self.sem_max_connections; try + Sem_.acquire 1 self.sem_max_connections; let client_sock, _ = Unix.accept sock in self.new_thread (fun () -> try handle_client_ self client_sock; Sem_.release 1 self.sem_max_connections; - with e -> + _debug (fun k -> k + "closing inactive connections (%d connections available)" + (Sem_.available_connections self.sem_max_connections)) + with + | e -> (try Unix.close client_sock with _ -> ()); Sem_.release 1 self.sem_max_connections; + _debug (fun k -> k + "closing connections on error (%d connections available)" + (Sem_.available_connections self.sem_max_connections)); raise e ); with e -> diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index 77610de7..1b55a15a 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -434,6 +434,7 @@ type t val create : ?masksigpipe:bool -> ?max_connections:int -> + ?max_keep_alive:float -> ?new_thread:((unit -> unit) -> unit) -> ?addr:string -> ?port:int -> @@ -453,7 +454,11 @@ val create : new client connection. By default it is {!Thread.create} but one could use a thread pool instead. - @param max_connections maximum number of simultaneous connections. + @param max_connections maximum number of simultaneous connections. Default 32. + @param max_keep_alive maximum number of seconds a thread in maintened for + a client with nothing to read. Default: -1.0, meaning threads are maintened + until client close the socket. + This parameter exists since 0.11. @param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"]. @param port to listen on. Default [8080]. @param sock an existing socket given to the server to listen on, e.g. by @@ -472,6 +477,9 @@ val is_ipv6 : t -> bool val port : t -> int (** Port on which the server listens. *) +val available_connections : t -> int +(** number of available connections on the server. *) + val add_decode_request_cb : t -> (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit @@ -639,4 +647,3 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit) val _enable_debug: bool -> unit (**/**) -