mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 11:15:35 -05:00
Merge pull request #23 from craff/master
max_keep_alive and non blocking reading of request
This commit is contained in:
commit
6dcae7f996
2 changed files with 58 additions and 8 deletions
|
|
@ -83,6 +83,27 @@ module Byte_stream = struct
|
||||||
let of_chan = of_chan_ ~close:close_in
|
let of_chan = of_chan_ ~close:close_in
|
||||||
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
|
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
|
||||||
|
|
||||||
|
exception Timeout
|
||||||
|
|
||||||
|
let of_descr_ ?(timeout=(-1.0)) ~close ic : t =
|
||||||
|
let i = ref 0 in
|
||||||
|
let len = ref 0 in
|
||||||
|
let buf = Bytes.make 4096 ' ' in
|
||||||
|
{ bs_fill_buf=(fun () ->
|
||||||
|
if !i >= !len then (
|
||||||
|
i := 0;
|
||||||
|
let (to_read,_,_) = Unix.select [ic] [] [] timeout in
|
||||||
|
if to_read = [] then raise Timeout;
|
||||||
|
try len := Unix.read ic buf 0 (Bytes.length buf)
|
||||||
|
with Unix.Unix_error ((EAGAIN | EWOULDBLOCK), _, _) -> ();
|
||||||
|
);
|
||||||
|
buf, !i,!len - !i);
|
||||||
|
bs_consume=(fun n -> i := !i + n);
|
||||||
|
bs_close=(fun () -> close ic)
|
||||||
|
}
|
||||||
|
|
||||||
|
let of_descr = of_descr_ ~close:Unix.close
|
||||||
|
|
||||||
let rec iter f (self:t) : unit =
|
let rec iter f (self:t) : unit =
|
||||||
let s, i, len = self.bs_fill_buf () in
|
let s, i, len = self.bs_fill_buf () in
|
||||||
if len=0 then (
|
if len=0 then (
|
||||||
|
|
@ -511,6 +532,9 @@ module Request = struct
|
||||||
headers; body=()})
|
headers; body=()})
|
||||||
with
|
with
|
||||||
| End_of_file | Sys_error _ -> Ok None
|
| End_of_file | Sys_error _ -> Ok None
|
||||||
|
| Byte_stream.Timeout ->
|
||||||
|
_debug (fun k -> k"Timeout");
|
||||||
|
Ok None
|
||||||
| Bad_req (c,s) -> Error (c,s)
|
| Bad_req (c,s) -> Error (c,s)
|
||||||
| e ->
|
| e ->
|
||||||
Error (400, Printexc.to_string e)
|
Error (400, Printexc.to_string e)
|
||||||
|
|
@ -699,6 +723,10 @@ module Sem_ = struct
|
||||||
t.n <- t.n + m;
|
t.n <- t.n + m;
|
||||||
Condition.broadcast t.cond;
|
Condition.broadcast t.cond;
|
||||||
Mutex.unlock t.mutex
|
Mutex.unlock t.mutex
|
||||||
|
|
||||||
|
(* +1 because we decrease the semaphore before Unix.accept *)
|
||||||
|
let available_connections t = t.n + 1
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
module Route = struct
|
module Route = struct
|
||||||
|
|
@ -814,6 +842,9 @@ type t = {
|
||||||
sem_max_connections: Sem_.t;
|
sem_max_connections: Sem_.t;
|
||||||
(* semaphore to restrict the number of active concurrent connections *)
|
(* semaphore to restrict the number of active concurrent connections *)
|
||||||
|
|
||||||
|
max_keep_alive: float;
|
||||||
|
(* maximum time in second before closing the client connections *)
|
||||||
|
|
||||||
new_thread: (unit -> unit) -> unit;
|
new_thread: (unit -> unit) -> unit;
|
||||||
(* a function to run the given callback in a separate thread (or thread pool) *)
|
(* a function to run the given callback in a separate thread (or thread pool) *)
|
||||||
|
|
||||||
|
|
@ -840,6 +871,9 @@ type t = {
|
||||||
let addr self = self.addr
|
let addr self = self.addr
|
||||||
let port self = self.port
|
let port self = self.port
|
||||||
|
|
||||||
|
let available_connections self =
|
||||||
|
Sem_.available_connections self.sem_max_connections
|
||||||
|
|
||||||
let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req
|
let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req
|
||||||
let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp
|
let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp
|
||||||
let set_top_handler self f = self.handler <- f
|
let set_top_handler self f = self.handler <- f
|
||||||
|
|
@ -948,13 +982,14 @@ let add_route_server_sent_handler ?accept self route f =
|
||||||
let create
|
let create
|
||||||
?(masksigpipe=true)
|
?(masksigpipe=true)
|
||||||
?(max_connections=32)
|
?(max_connections=32)
|
||||||
|
?(max_keep_alive=(-1.0))
|
||||||
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
|
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
|
||||||
?(addr="127.0.0.1") ?(port=8080) ?sock () : t =
|
?(addr="127.0.0.1") ?(port=8080) ?sock () : t =
|
||||||
let handler _req = Response.fail ~code:404 "no top handler" in
|
let handler _req = Response.fail ~code:404 "no top handler" in
|
||||||
let max_connections = max 4 max_connections in
|
let max_connections = max 4 max_connections in
|
||||||
{ new_thread; addr; port; sock; masksigpipe; handler;
|
{ new_thread; addr; port; sock; masksigpipe; handler;
|
||||||
running= true; sem_max_connections=Sem_.create max_connections;
|
running= true; sem_max_connections=Sem_.create max_connections;
|
||||||
path_handlers=[];
|
path_handlers=[]; max_keep_alive;
|
||||||
cb_encode_resp=[]; cb_decode_req=[];
|
cb_encode_resp=[]; cb_decode_req=[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -970,10 +1005,11 @@ let find_map f l =
|
||||||
in aux f l
|
in aux f l
|
||||||
|
|
||||||
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
|
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
|
||||||
let ic = Unix.in_channel_of_descr client_sock in
|
let write_sock = Unix.dup client_sock in
|
||||||
let oc = Unix.out_channel_of_descr client_sock in
|
let _ = Unix.set_nonblock client_sock in
|
||||||
|
let oc = Unix.out_channel_of_descr write_sock in
|
||||||
let buf = Buf_.create() in
|
let buf = Buf_.create() in
|
||||||
let is = Byte_stream.of_chan ic in
|
let is = Byte_stream.of_descr ~timeout:self.max_keep_alive client_sock in
|
||||||
let continue = ref true in
|
let continue = ref true in
|
||||||
while !continue && self.running do
|
while !continue && self.running do
|
||||||
_debug (fun k->k "read next request");
|
_debug (fun k->k "read next request");
|
||||||
|
|
@ -1087,17 +1123,24 @@ let run (self:t) : (unit,_) result =
|
||||||
end;
|
end;
|
||||||
while self.running do
|
while self.running do
|
||||||
(* limit concurrency *)
|
(* limit concurrency *)
|
||||||
Sem_.acquire 1 self.sem_max_connections;
|
|
||||||
try
|
try
|
||||||
|
Sem_.acquire 1 self.sem_max_connections;
|
||||||
let client_sock, _ = Unix.accept sock in
|
let client_sock, _ = Unix.accept sock in
|
||||||
self.new_thread
|
self.new_thread
|
||||||
(fun () ->
|
(fun () ->
|
||||||
try
|
try
|
||||||
handle_client_ self client_sock;
|
handle_client_ self client_sock;
|
||||||
Sem_.release 1 self.sem_max_connections;
|
Sem_.release 1 self.sem_max_connections;
|
||||||
with e ->
|
_debug (fun k -> k
|
||||||
|
"closing inactive connections (%d connections available)"
|
||||||
|
(Sem_.available_connections self.sem_max_connections))
|
||||||
|
with
|
||||||
|
| e ->
|
||||||
(try Unix.close client_sock with _ -> ());
|
(try Unix.close client_sock with _ -> ());
|
||||||
Sem_.release 1 self.sem_max_connections;
|
Sem_.release 1 self.sem_max_connections;
|
||||||
|
_debug (fun k -> k
|
||||||
|
"closing connections on error (%d connections available)"
|
||||||
|
(Sem_.available_connections self.sem_max_connections));
|
||||||
raise e
|
raise e
|
||||||
);
|
);
|
||||||
with e ->
|
with e ->
|
||||||
|
|
|
||||||
|
|
@ -434,6 +434,7 @@ type t
|
||||||
val create :
|
val create :
|
||||||
?masksigpipe:bool ->
|
?masksigpipe:bool ->
|
||||||
?max_connections:int ->
|
?max_connections:int ->
|
||||||
|
?max_keep_alive:float ->
|
||||||
?new_thread:((unit -> unit) -> unit) ->
|
?new_thread:((unit -> unit) -> unit) ->
|
||||||
?addr:string ->
|
?addr:string ->
|
||||||
?port:int ->
|
?port:int ->
|
||||||
|
|
@ -453,7 +454,11 @@ val create :
|
||||||
new client connection. By default it is {!Thread.create} but one
|
new client connection. By default it is {!Thread.create} but one
|
||||||
could use a thread pool instead.
|
could use a thread pool instead.
|
||||||
|
|
||||||
@param max_connections maximum number of simultaneous connections.
|
@param max_connections maximum number of simultaneous connections. Default 32.
|
||||||
|
@param max_keep_alive maximum number of seconds a thread in maintened for
|
||||||
|
a client with nothing to read. Default: -1.0, meaning threads are maintened
|
||||||
|
until client close the socket.
|
||||||
|
This parameter exists since 0.11.
|
||||||
@param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"].
|
@param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"].
|
||||||
@param port to listen on. Default [8080].
|
@param port to listen on. Default [8080].
|
||||||
@param sock an existing socket given to the server to listen on, e.g. by
|
@param sock an existing socket given to the server to listen on, e.g. by
|
||||||
|
|
@ -472,6 +477,9 @@ val is_ipv6 : t -> bool
|
||||||
val port : t -> int
|
val port : t -> int
|
||||||
(** Port on which the server listens. *)
|
(** Port on which the server listens. *)
|
||||||
|
|
||||||
|
val available_connections : t -> int
|
||||||
|
(** number of available connections on the server. *)
|
||||||
|
|
||||||
val add_decode_request_cb :
|
val add_decode_request_cb :
|
||||||
t ->
|
t ->
|
||||||
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit
|
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit
|
||||||
|
|
@ -639,4 +647,3 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit)
|
||||||
val _enable_debug: bool -> unit
|
val _enable_debug: bool -> unit
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue