Merge pull request #30 from craff/master

release semaphore in case of exception in accept
This commit is contained in:
Simon Cruanes 2021-12-12 12:08:01 -05:00 committed by GitHub
commit 267d29d6c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 31 additions and 8 deletions

View file

@ -3,8 +3,10 @@
PORT=8082 PORT=8082
./sse_server.exe -p $PORT & ./sse_server.exe -p $PORT &
PID=$!
sleep 0.1 sleep 0.1
./sse_client.exe -p $PORT --alarm=1 /count | tr -d '\r' || true ./sse_client.exe -p $PORT --alarm=1 /count | tr -d '\r' || true
kill %1 kill $PID
echo "success" echo "success"

View file

@ -676,13 +676,14 @@ end
module Sem_ = struct module Sem_ = struct
type t = { type t = {
mutable n : int; mutable n : int;
max : int;
mutex : Mutex.t; mutex : Mutex.t;
cond : Condition.t; cond : Condition.t;
} }
let create n = let create n =
if n <= 0 then invalid_arg "Semaphore.create"; if n <= 0 then invalid_arg "Semaphore.create";
{ n; mutex=Mutex.create(); cond=Condition.create(); } { n; max=n; mutex=Mutex.create(); cond=Condition.create(); }
let acquire m t = let acquire m t =
Mutex.lock t.mutex; Mutex.lock t.mutex;
@ -699,6 +700,8 @@ module Sem_ = struct
t.n <- t.n + m; t.n <- t.n + m;
Condition.broadcast t.cond; Condition.broadcast t.cond;
Mutex.unlock t.mutex Mutex.unlock t.mutex
let num_acquired t = t.max - t.n
end end
module Route = struct module Route = struct
@ -812,6 +815,8 @@ type t = {
sock: Unix.file_descr option; sock: Unix.file_descr option;
timeout: float;
sem_max_connections: Sem_.t; sem_max_connections: Sem_.t;
(* semaphore to restrict the number of active concurrent connections *) (* semaphore to restrict the number of active concurrent connections *)
@ -841,6 +846,8 @@ type t = {
let addr self = self.addr let addr self = self.addr
let port self = self.port let port self = self.port
let active_connections self = Sem_.num_acquired self.sem_max_connections - 1
let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req
let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp
let set_top_handler self f = self.handler <- f let set_top_handler self f = self.handler <- f
@ -951,13 +958,14 @@ let add_route_server_sent_handler ?accept self route f =
let create let create
?(masksigpipe=true) ?(masksigpipe=true)
?(max_connections=32) ?(max_connections=32)
?(timeout=0.0)
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
?(addr="127.0.0.1") ?(port=8080) ?sock () : t = ?(addr="127.0.0.1") ?(port=8080) ?sock () : t =
let handler _req = Response.fail ~code:404 "no top handler" in let handler _req = Response.fail ~code:404 "no top handler" in
let max_connections = max 4 max_connections in let max_connections = max 4 max_connections in
{ new_thread; addr; port; sock; masksigpipe; handler; { new_thread; addr; port; sock; masksigpipe; handler;
running= true; sem_max_connections=Sem_.create max_connections; running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; path_handlers=[]; timeout;
cb_encode_resp=[]; cb_decode_req=[]; cb_encode_resp=[]; cb_decode_req=[];
} }
@ -973,6 +981,8 @@ let find_map f l =
in aux f l in aux f l
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let _ = Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout) in
let _ = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout) in
let ic = Unix.in_channel_of_descr client_sock in let ic = Unix.in_channel_of_descr client_sock in
let oc = Unix.out_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf_.create() in let buf = Buf_.create() in
@ -1104,6 +1114,7 @@ let run (self:t) : (unit,_) result =
raise e raise e
); );
with e -> with e ->
Sem_.release 1 self.sem_max_connections;
_debug (fun k -> k _debug (fun k -> k
"Unix.accept or Thread.create raised an exception: %s" "Unix.accept or Thread.create raised an exception: %s"
(Printexc.to_string e)) (Printexc.to_string e))

View file

@ -434,6 +434,7 @@ type t
val create : val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int -> ?max_connections:int ->
?timeout:float ->
?new_thread:((unit -> unit) -> unit) -> ?new_thread:((unit -> unit) -> unit) ->
?addr:string -> ?addr:string ->
?port:int -> ?port:int ->
@ -454,6 +455,9 @@ val create :
could use a thread pool instead. could use a thread pool instead.
@param max_connections maximum number of simultaneous connections. @param max_connections maximum number of simultaneous connections.
@param timeout connection is closed if the socket does not do read or
write for the amount of second. Default: 0.0 which means no timeout.
timeout is not recommended when using proxy.
@param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"]. @param addr address (IPv4 or IPv6) to listen on. Default ["127.0.0.1"].
@param port to listen on. Default [8080]. @param port to listen on. Default [8080].
@param sock an existing socket given to the server to listen on, e.g. by @param sock an existing socket given to the server to listen on, e.g. by
@ -472,6 +476,9 @@ val is_ipv6 : t -> bool
val port : t -> int val port : t -> int
(** Port on which the server listens. *) (** Port on which the server listens. *)
val active_connections : t -> int
(** Number of active connections *)
val add_decode_request_cb : val add_decode_request_cb :
t -> t ->
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit
@ -643,4 +650,3 @@ val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit)
val _enable_debug: bool -> unit val _enable_debug: bool -> unit
(**/**) (**/**)

View file

@ -3,11 +3,12 @@
SERVER=$1 SERVER=$1
PORT=8084 PORT=8084
"$SERVER" . -p $PORT & "$SERVER" . -p $PORT &
PID=$!
sleep 0.1 sleep 0.1
curl -N "http://localhost:${PORT}/foo_50" -o data2 \ curl -N "http://localhost:${PORT}/foo_50" -o data2 \
-H 'Tranfer-encoding: chunked' -H 'Tranfer-encoding: chunked'
kill %1 kill $PID
wc data2 wc data2

View file

@ -4,6 +4,7 @@ ECHO=$1
PORT=8085 PORT=8085
"$ECHO" -p $PORT & "$ECHO" -p $PORT &
PID=$!
sleep 0.1 sleep 0.1
curl -N "http://localhost:${PORT}/echo/?a=b&c=d" -H user-agent:test curl -N "http://localhost:${PORT}/echo/?a=b&c=d" -H user-agent:test
kill %1 kill $PID

View file

@ -4,7 +4,8 @@ SSE_SERVER=$1
PORT=8086 PORT=8086
"$SSE_SERVER" -p $PORT & "$SSE_SERVER" -p $PORT &
PID=$!
sleep 0.1 sleep 0.1
curl -N "http://localhost:${PORT}/count/10" -H user-agent:test curl -N "http://localhost:${PORT}/count/10" -H user-agent:test
kill %1 kill $PID

View file

@ -6,10 +6,11 @@ SERVER=$1
PORT=8087 PORT=8087
"$SERVER" . -p $PORT --upload --max-upload 100000000000 & "$SERVER" . -p $PORT --upload --max-upload 100000000000 &
PID=$!
sleep 0.1 sleep 0.1
cat foo_50 | curl -N -X PUT http://localhost:$PORT/data --data-binary @- -H 'Transfer-Encoding: chunked' cat foo_50 | curl -N -X PUT http://localhost:$PORT/data --data-binary @- -H 'Transfer-Encoding: chunked'
kill %1 kill $PID
wc data wc data