revert: remove timeout handling for now

This reverts commit 6536bfeeb3.
This commit is contained in:
Simon Cruanes 2021-12-11 09:30:41 -05:00
parent 6536bfeeb3
commit dc72dad5db
No known key found for this signature in database
GPG key ID: 4AC01D0849AA62B6
2 changed files with 9 additions and 70 deletions

View file

@ -83,37 +83,6 @@ module Byte_stream = struct
let of_chan = of_chan_ ~close:close_in
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
exception Timeout
let of_descr_ ?(timeout=(-1.0)) ~close ic : t =
let i = ref 0 in
let len = ref 0 in
let buf = Bytes.make 4096 ' ' in
{ bs_fill_buf=(fun () ->
if !i >= !len then (
i := 0;
let rec wait() =
let to_read,_,_ = Unix.select [ic] [] [] timeout in
if to_read = [] then raise Timeout;
read()
and read() =
try len := Unix.read ic buf 0 (Bytes.length buf)
with
| Unix.Unix_error (EAGAIN, _, _) -> read()
| Unix.Unix_error (EWOULDBLOCK, _, _) ->
(* FIXME: we should decrease the timeout by however long was spent in [select] *)
wait()
in
wait()
);
buf, !i,!len - !i);
bs_consume=(fun n -> i := !i + n);
bs_close=(fun () -> close ic)
}
let of_descr = of_descr_ ~close:Unix.close
let rec iter f (self:t) : unit =
let s, i, len = self.bs_fill_buf () in
if len=0 then (
@ -542,9 +511,6 @@ module Request = struct
headers; body=()})
with
| End_of_file | Sys_error _ -> Ok None
| Byte_stream.Timeout ->
_debug (fun k -> k"Timeout");
Ok None
| Bad_req (c,s) -> Error (c,s)
| e ->
Error (400, Printexc.to_string e)
@ -710,14 +676,13 @@ end
module Sem_ = struct
type t = {
mutable n : int;
max: int;
mutex : Mutex.t;
cond : Condition.t;
}
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; max=n; mutex=Mutex.create(); cond=Condition.create(); }
{ n; mutex=Mutex.create(); cond=Condition.create(); }
let acquire m t =
Mutex.lock t.mutex;
@ -734,8 +699,6 @@ module Sem_ = struct
t.n <- t.n + m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let num_acquired self = self.max - self.n
end
module Route = struct
@ -851,9 +814,6 @@ type t = {
sem_max_connections: Sem_.t;
(* semaphore to restrict the number of active concurrent connections *)
max_keep_alive: float;
(* maximum time in second before closing the client connections *)
new_thread: (unit -> unit) -> unit;
(* a function to run the given callback in a separate thread (or thread pool) *)
@ -880,10 +840,6 @@ type t = {
let addr self = self.addr
let port self = self.port
let active_connections self =
(* -1 because we decrease the semaphore before Unix.accept *)
Sem_.num_acquired self.sem_max_connections - 1
let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req
let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp
let set_top_handler self f = self.handler <- f
@ -992,14 +948,13 @@ let add_route_server_sent_handler ?accept self route f =
let create
?(masksigpipe=true)
?(max_connections=32)
?(max_keep_alive=(-1.0))
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
?(addr="127.0.0.1") ?(port=8080) ?sock () : t =
let handler _req = Response.fail ~code:404 "no top handler" in
let max_connections = max 4 max_connections in
{ new_thread; addr; port; sock; masksigpipe; handler;
running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; max_keep_alive;
path_handlers=[];
cb_encode_resp=[]; cb_decode_req=[];
}
@ -1015,11 +970,10 @@ let find_map f l =
in aux f l
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let write_sock = Unix.dup client_sock in
let _ = Unix.set_nonblock client_sock in
let oc = Unix.out_channel_of_descr write_sock in
let ic = Unix.in_channel_of_descr client_sock in
let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf_.create() in
let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in
let is = Byte_stream.of_chan ic in
let continue = ref true in
while !continue && self.running do
_debug (fun k->k "read next request");
@ -1133,24 +1087,17 @@ let run (self:t) : (unit,_) result =
end;
while self.running do
(* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections;
try
Sem_.acquire 1 self.sem_max_connections;
let client_sock, _ = Unix.accept sock in
self.new_thread
(fun () ->
try
handle_client_ self client_sock;
Sem_.release 1 self.sem_max_connections;
_debug (fun k -> k
"closing inactive connections (%d connections active)"
(active_connections self))
with
| e ->
with e ->
(try Unix.close client_sock with _ -> ());
Sem_.release 1 self.sem_max_connections;
_debug (fun k -> k
"closing connections on error (%d connections active)"
(active_connections self));
raise e
);
with e ->

View file

@ -434,7 +434,6 @@ type t
val create :
?masksigpipe:bool ->
?max_connections:int ->
?max_keep_alive:float ->
?new_thread:((unit -> unit) -> unit) ->
?addr:string ->
?port:int ->
@ -454,11 +453,7 @@ val create :
new client connection. By default it is {!Thread.create} but one
could use a thread pool instead.
@param max_connections maximum number of simultaneous connections. Default 32.
@param max_keep_alive maximum number of seconds a thread in maintened for
a client with nothing to read. Default: -1.0, meaning threads are maintened
until client close the socket.
This parameter exists since 0.11.
@param max_connections maximum number of simultaneous connections.
@param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"].
@param port to listen on. Default [8080].
@param sock an existing socket given to the server to listen on, e.g. by
@ -477,10 +472,6 @@ val is_ipv6 : t -> bool
val port : t -> int
(** Port on which the server listens. *)
val active_connections : t -> int
(** number of currently opened connections with a client.
@since 0.11 *)
val add_decode_request_cb :
t ->
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit
@ -648,3 +639,4 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit)
val _enable_debug: bool -> unit
(**/**)