From 34a1cc17693f9ec206275a47527d713a4e20c97f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 13:22:15 -0400 Subject: [PATCH] tiny_httpd: use picos semaphore; tweak pool size, buf size --- dune-project | 3 +- nanoev_tiny_httpd.opam | 3 +- src/tiny_httpd/dune | 1 + src/tiny_httpd/nanoev_tiny_httpd.ml | 76 +++++++++------------------- src/tiny_httpd/nanoev_tiny_httpd.mli | 1 + 5 files changed, 31 insertions(+), 53 deletions(-) diff --git a/dune-project b/dune-project index b5f2f69..ff2150c 100644 --- a/dune-project +++ b/dune-project @@ -40,7 +40,8 @@ ocaml dune nanoev - picos + (picos (>= 0.6)) + picos_std (tiny_httpd (>= 0.17))) (tags (nanoev http))) diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam index 9504331..ea48089 100644 --- a/nanoev_tiny_httpd.opam +++ b/nanoev_tiny_httpd.opam @@ -11,7 +11,8 @@ depends: [ "ocaml" "dune" {>= "2.7"} "nanoev" - "picos" + "picos" {>= "0.6"} + "picos_std" "tiny_httpd" {>= "0.17"} "odoc" {with-doc} ] diff --git a/src/tiny_httpd/dune b/src/tiny_httpd/dune index a372e9d..d68f539 100644 --- a/src/tiny_httpd/dune +++ b/src/tiny_httpd/dune @@ -6,5 +6,6 @@ picos (re_export nanoev) nanoev.picos + picos_std.sync (re_export iostream) (re_export tiny_httpd))) diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 290b0ad..46314e8 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -5,45 +5,8 @@ module Slice = Iostream.Slice module Pool = TH.Pool module Buf = TH.Buf -let unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - +module Sem_ = Picos_std_sync.Semaphore.Counting (** Non blocking semaphore *) -module Sem_ = struct - type t = { - mutable n: int; - max: int; - waiting: Picos.Trigger.t Queue.t; - mutex: Mutex.t; - } - - let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; max = n; mutex = Mutex.create (); waiting = Queue.create () } - - let acquire self = - Mutex.lock self.mutex; - while self.n = 0 do - let tr = Picos.Trigger.create () in - Queue.push tr self.waiting; - Mutex.unlock self.mutex; - let res = Picos.Trigger.await tr in - unwrap_ res; - Mutex.lock self.mutex - done; - assert (self.n > 0); - self.n <- self.n - 1; - Mutex.unlock self.mutex - - let release self = - Mutex.lock self.mutex; - self.n <- self.n + 1; - Option.iter Picos.Trigger.signal (Queue.take_opt self.waiting); - Mutex.unlock self.mutex - - let num_acquired self = self.max - self.n -end module Out = struct open Iostream @@ -147,7 +110,7 @@ module Unix_tcp_server_ = struct new_thread: (unit -> unit) -> unit; timeout: float; masksigpipe: bool; - mutable running: bool; (* TODO: use an atomic? *) + running: bool Atomic.t; } let shutdown_silent_ fd = @@ -183,7 +146,7 @@ module Unix_tcp_server_ = struct let inet_addr = Unix.inet_addr_of_string self.addr in Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); - let n_listen = 2 * self.max_connections in + let n_listen = self.max_connections in Unix.listen sock n_listen ); @@ -191,10 +154,16 @@ module Unix_tcp_server_ = struct let tcp_server = { - TH.IO.TCP_server.stop = (fun () -> self.running <- false); - running = (fun () -> self.running); + TH.IO.TCP_server.stop = + (fun () -> + Atomic.set self.running false; + + (* close accept socket so the main loop will return *) + try Unix.close sock with _ -> ()); + running = (fun () -> Atomic.get self.running); active_connections = - (fun () -> Sem_.num_acquired self.sem_max_connections); + (fun () -> + self.max_connections - Sem_.get_value self.sem_max_connections); endpoint = (fun () -> let addr, port = get_addr_ sock in @@ -233,7 +202,7 @@ module Unix_tcp_server_ = struct in Unix.set_nonblock sock; - while self.running do + while Atomic.get self.running do match EV.accept sock with | client_sock, client_addr -> (* limit concurrency *) @@ -272,7 +241,7 @@ module Unix_tcp_server_ = struct done; (* Wait for all threads to be done: this only works if all threads are done. *) - Unix.close sock; + (try Unix.close sock with _ -> ()); (* TODO? *) (* Sem_.acquire self.sem_max_connections.max self.sem_max_connections; *) ()); @@ -290,11 +259,16 @@ open struct slice.len <- 0 end -let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) - ?buf_size ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") - ?(port = 8080) ?sock ?middlewares ~new_thread () : TH.Server.t = +let create ?(masksigpipe = not Sys.win32) ?max_connections ?max_buf_pool_size + ?(timeout = 0.0) ?buf_size ?(get_time_s = Unix.gettimeofday) + ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares ~new_thread () : + TH.Server.t = let max_connections = get_max_connection_ ?max_connections () in - let max_pool_size = min 4096 max_connections * 2 in + let max_pool_size = + match max_buf_pool_size with + | None -> min 4096 max_connections * 2 + | Some m -> m + in let server = { Unix_tcp_server_.addr; @@ -309,11 +283,11 @@ let create ?(masksigpipe = not Sys.win32) ?max_connections ?(timeout = 0.0) (let buf_size = Option.value buf_size ~default:4096 in fun () -> Slice.create buf_size) (); - running = true; + running = Atomic.make true; port; sock; max_connections; - sem_max_connections = Sem_.create max_connections; + sem_max_connections = Sem_.make max_connections; masksigpipe; timeout; } diff --git a/src/tiny_httpd/nanoev_tiny_httpd.mli b/src/tiny_httpd/nanoev_tiny_httpd.mli index f0cd9af..7c858e5 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.mli +++ b/src/tiny_httpd/nanoev_tiny_httpd.mli @@ -3,6 +3,7 @@ module TH = Tiny_httpd_core val create : ?masksigpipe:bool -> ?max_connections:int -> + ?max_buf_pool_size:int -> ?timeout:float -> ?buf_size:int -> ?get_time_s:(unit -> float) ->