feat: add semaphore to limit number of simultaneous connections

This commit is contained in:
Simon Cruanes 2019-11-18 22:44:08 -06:00
parent 046ac43bac
commit 3ac031b367
4 changed files with 58 additions and 4 deletions

View file

@ -478,11 +478,41 @@ module Response = struct
flush oc flush oc
end end
(* semaphore, for limiting concurrency. *)
module Sem_ = struct
type t = {
mutable n : int;
mutex : Mutex.t;
cond : Condition.t;
}
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; mutex=Mutex.create(); cond=Condition.create(); }
let acquire m t =
Mutex.lock t.mutex;
while t.n < m do
Condition.wait t.cond t.mutex;
done;
assert (t.n >= m);
t.n <- t.n - m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let release m t =
Mutex.lock t.mutex;
t.n <- t.n + m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
end
type cb_path_handler = string Request.t -> Response.t type cb_path_handler = string Request.t -> Response.t
type t = { type t = {
addr: string; addr: string;
port: int; port: int;
sem_max_connections: Sem_.t;
new_thread: (unit -> unit) -> unit; new_thread: (unit -> unit) -> unit;
masksigpipe: bool; masksigpipe: bool;
mutable handler: (string Request.t -> Response.t); mutable handler: (string Request.t -> Response.t);
@ -521,10 +551,13 @@ let add_path_handler
let create let create
?(masksigpipe=true) ?(masksigpipe=true)
?(max_connections=32)
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
?(addr="127.0.0.1") ?(port=8080) () : t = ?(addr="127.0.0.1") ?(port=8080) () : t =
let handler _req = Response.fail ~code:404 "no top handler" in let handler _req = Response.fail ~code:404 "no top handler" in
{ new_thread; addr; port; masksigpipe; handler; running= true; let max_connections = max 4 max_connections in
{ new_thread; addr; port; masksigpipe; handler;
running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; path_handlers=[];
cb_encode_resp=[]; cb_decode_req=[]; cb_encode_resp=[]; cb_decode_req=[];
} }
@ -611,6 +644,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
Unix.close client_sock; Unix.close client_sock;
done; done;
_debug (fun k->k "done with client, exiting"); _debug (fun k->k "done with client, exiting");
(try Unix.close client_sock with _ -> ());
() ()
let run (self:t) : (unit,_) result = let run (self:t) : (unit,_) result =
@ -626,9 +660,19 @@ let run (self:t) : (unit,_) result =
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
Unix.listen sock 10; Unix.listen sock 10;
while self.running do while self.running do
(* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections;
let client_sock, _ = Unix.accept sock in let client_sock, _ = Unix.accept sock in
self.new_thread self.new_thread
(fun () -> handle_client_ self client_sock); (fun () ->
try
handle_client_ self client_sock;
Sem_.release 1 self.sem_max_connections;
with e ->
(try Unix.close client_sock with _ -> ());
Sem_.release 1 self.sem_max_connections;
raise e
);
done; done;
Ok () Ok ()
with e -> Error e with e -> Error e

View file

@ -306,6 +306,7 @@ type t
val create : val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int ->
?new_thread:((unit -> unit) -> unit) -> ?new_thread:((unit -> unit) -> unit) ->
?addr:string -> ?addr:string ->
?port:int -> ?port:int ->
@ -324,6 +325,7 @@ val create :
new client connection. By default it is {!Thread.create} but one new client connection. By default it is {!Thread.create} but one
could use a thread pool instead. could use a thread pool instead.
@param max_connections maximum number of simultaneous connections.
@param addr the address (IPv4) to listen on. Default ["127.0.0.1"]. @param addr the address (IPv4) to listen on. Default ["127.0.0.1"].
@param port to listen on. Default [8080]. @param port to listen on. Default [8080].
*) *)

View file

@ -8,6 +8,7 @@ type config = {
mutable upload: bool; mutable upload: bool;
mutable max_upload_size: int; mutable max_upload_size: int;
mutable delete: bool; mutable delete: bool;
mutable j: int;
} }
let default_config () : config = { let default_config () : config = {
@ -16,6 +17,7 @@ let default_config () : config = {
delete=false; delete=false;
upload=true; upload=true;
max_upload_size = 10 * 1024 * 1024; max_upload_size = 10 * 1024 * 1024;
j=32;
} }
let contains_dot_dot s = let contains_dot_dot s =
@ -65,7 +67,7 @@ let date_of_time (f:float) : string =
let serve ~config (dir:string) : _ result = let serve ~config (dir:string) : _ result =
Printf.printf "serve directory %s on http://%s:%d\n%!" dir config.addr config.port; Printf.printf "serve directory %s on http://%s:%d\n%!" dir config.addr config.port;
let server = S.create ~addr:config.addr ~port:config.port () in let server = S.create ~max_connections:config.j ~addr:config.addr ~port:config.port () in
if config.delete then ( if config.delete then (
S.add_path_handler server ~meth:`DELETE "/%s" S.add_path_handler server ~meth:`DELETE "/%s"
(fun path _req -> (fun path _req ->
@ -167,6 +169,7 @@ let main () =
"--debug", Unit (fun () -> S._enable_debug true), " debug mode"; "--debug", Unit (fun () -> S._enable_debug true), " debug mode";
"--delete", Unit (fun () -> config.delete <- true), " enable `delete` on files"; "--delete", Unit (fun () -> config.delete <- true), " enable `delete` on files";
"--no-delete", Unit (fun () -> config.delete <- false), " disable `delete` on files"; "--no-delete", Unit (fun () -> config.delete <- false), " disable `delete` on files";
"-j", Int (fun j->config.j <- j), " maximum number of simultaneous connections";
]) (fun s -> dir_ := s) "http_of_dir [options] [dir]"; ]) (fun s -> dir_ := s) "http_of_dir [options] [dir]";
match serve ~config !dir_ with match serve ~config !dir_ with
| Ok () -> () | Ok () -> ()

View file

@ -7,7 +7,12 @@ let debug_ k =
) )
let () = let () =
let server = S.create () in let j = ref 32 in
Arg.parse (Arg.align [
"--debug", Arg.Unit (fun () -> S._enable_debug true), " enable debug";
"-j", Arg.Set_int j, " maximum number of connections";
]) (fun _ -> raise (Arg.Bad "")) "echo [option]*";
let server = S.create ~max_connections:!j () in
(* say hello *) (* say hello *)
S.add_path_handler ~meth:`GET server S.add_path_handler ~meth:`GET server
"/hello/%s@/" (fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n"))); "/hello/%s@/" (fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n")));