From 45026eca59c6a449411e966184b687ac17e40c76 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 5 Jun 2023 23:38:02 -0400 Subject: [PATCH] use a resource pool to recycle buffers --- src/Tiny_httpd.ml | 2 +- src/Tiny_httpd.mli | 2 +- src/Tiny_httpd_pool.ml | 26 +++++++++++++--------- src/Tiny_httpd_pool.mli | 10 +++++++-- src/Tiny_httpd_server.ml | 45 ++++++++++++++++++++++++++++----------- src/Tiny_httpd_server.mli | 7 ++++-- src/eio/tiny_httpd_eio.ml | 17 +++++++++------ 7 files changed, 74 insertions(+), 35 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 4011fae2..cddbc0db 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -5,4 +5,4 @@ module Util = Tiny_httpd_util module Dir = Tiny_httpd_dir module Html = Tiny_httpd_html module IO = Tiny_httpd_io -module Pool = Pool +module Pool = Tiny_httpd_pool diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index be1a6f24..1ab90cc5 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -102,7 +102,7 @@ module Util = Tiny_httpd_util (** {2 Resource pool} *) -module Pool = Pool +module Pool = Tiny_httpd_pool (** {2 Static directory serving} *) diff --git a/src/Tiny_httpd_pool.ml b/src/Tiny_httpd_pool.ml index 97b8e1ea..0ef45d4a 100644 --- a/src/Tiny_httpd_pool.ml +++ b/src/Tiny_httpd_pool.ml @@ -4,12 +4,13 @@ type 'a list_ = Nil | Cons of int * 'a * 'a list_ type 'a t = { mk_item: unit -> 'a; + clear: 'a -> unit; max_size: int; (** Max number of items *) items: 'a list_ A.t; } -let create ~mk_item ?(max_size = 128) () : _ t = - { mk_item; max_size; items = A.make Nil } +let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t = + { mk_item; clear; max_size; items = A.make Nil } let rec acquire_ self = match A.get self.items with @@ -24,14 +25,19 @@ let[@inline] size_ = function | Cons (sz, _, _) -> sz | Nil -> 0 -let rec release_ self x : unit = - match A.get self.items with - | Cons (sz, _, _) when sz >= self.max_size -> - (* forget the item *) - () - | l -> - if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then - release_ self x +let release_ self x : unit = + let rec loop () = + match A.get self.items with + | Cons (sz, _, _) when sz >= self.max_size -> + (* forget the item *) + () + | l -> + if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then + loop () + in + + self.clear x; + loop () let with_resource (self : _ t) f = let x = acquire_ self in diff --git a/src/Tiny_httpd_pool.mli b/src/Tiny_httpd_pool.mli index dd702f44..117869ef 100644 --- a/src/Tiny_httpd_pool.mli +++ b/src/Tiny_httpd_pool.mli @@ -3,8 +3,14 @@ type 'a t (** Pool of values of type ['a] *) -val create : mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t -(** Create a new pool. *) +val create : + ?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t +(** Create a new pool. + @param mk_item produce a new item in case the pool is empty + @param max_size maximum number of item in the pool before we start + dropping resources on the floor. This controls resource consumption. + @param clear a function called on items before recycling them. + *) val with_resource : 'a t -> ('a -> 'b) -> 'b (** [with_resource pool f] runs [f x] with [x] a resource; diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index ef897fb9..f4f1f07a 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -19,6 +19,7 @@ let _debug k = module Buf = Tiny_httpd_buf module Byte_stream = Tiny_httpd_stream module IO = Tiny_httpd_io +module Pool = Tiny_httpd_pool exception Bad_req of int * string @@ -325,9 +326,13 @@ module Request = struct | Bad_req (c, s) -> Error (c, s) | e -> Error (400, Printexc.to_string e) - let read_body_full ?buf_size (self : byte_stream t) : string t = + let read_body_full ?buf ?buf_size (self : byte_stream t) : string t = try - let buf = Buf.create ?size:buf_size () in + let buf = + match buf with + | Some b -> b + | None -> Buf.create ?size:buf_size () + in let body = Byte_stream.read_all ~buf self.body in { self with body } with @@ -424,12 +429,12 @@ module Response = struct Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Headers.pp self.headers pp_body self.body - let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t) - (self : t) : unit = + let output_ ~buf (oc : IO.Out_channel.t) (self : t) : unit = (* double indirection: - print into [buffer] using [bprintf] - transfer to [buf_] so we can output from there *) let tmp_buffer = Buffer.create 32 in + Buf.clear buf; (* write start of reply *) Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code @@ -650,6 +655,7 @@ type t = { mutable path_handlers: (unit Request.t -> cb_path_handler resp_result option) list; (** path handlers *) + buf_pool: Buf.t Pool.t; } let get_addr_ sock = @@ -741,7 +747,11 @@ let add_route_handler_ ?(accept = fun _req -> Ok ()) ?(middlewares = []) ?meth let add_route_handler (type a) ?accept ?middlewares ?meth self (route : (a, _) Route.t) (f : _) : unit = let tr_req _oc req ~resp f = - resp (f (Request.read_body_full ~buf_size:self.buf_size req)) + let req = + Pool.with_resource self.buf_pool @@ fun buf -> + Request.read_body_full ~buf req + in + resp (f req) in add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f @@ -758,7 +768,10 @@ exception Exit_SSE let add_route_server_sent_handler ?accept self route f = let tr_req (oc : IO.Out_channel.t) req ~resp f = - let req = Request.read_body_full ~buf_size:self.buf_size req in + let req = + Pool.with_resource self.buf_pool @@ fun buf -> + Request.read_body_full ~buf req + in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in @@ -821,6 +834,10 @@ let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t = path_handlers = []; middlewares = []; middlewares_sorted = lazy []; + buf_pool = + Pool.create ~clear:Buf.clear + ~mk_item:(fun () -> Buf.create ~size:buf_size ()) + (); } in List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares; @@ -981,7 +998,8 @@ let find_map f l = (* handle client on [ic] and [oc] *) let client_handle_for (self : t) ic oc : unit = - let buf = Buf.create ~size:self.buf_size () in + Pool.with_resource self.buf_pool @@ fun buf -> + Pool.with_resource self.buf_pool @@ fun buf_res -> let is = Byte_stream.of_input ~buf_size:self.buf_size ic in let continue = ref true in while !continue && running self do @@ -992,7 +1010,7 @@ let client_handle_for (self : t) ic oc : unit = | Error (c, s) -> (* connection error, close *) let res = Response.make_raw ~code:c s in - (try Response.output_ oc res with Sys_error _ -> ()); + (try Response.output_ ~buf:buf_res oc res with Sys_error _ -> ()); continue := false | Ok (Some req) -> _debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req)); @@ -1007,7 +1025,8 @@ let client_handle_for (self : t) ic oc : unit = | None -> fun _oc req ~resp -> let body_str = - Request.read_body_full ~buf_size:self.buf_size req + Pool.with_resource self.buf_pool @@ fun buf -> + Request.read_body_full ~buf req in resp (self.handler body_str) in @@ -1016,7 +1035,7 @@ let client_handle_for (self : t) ic oc : unit = (match Request.get_header ~f:String.trim req "Expect" with | Some "100-continue" -> _debug (fun k -> k "send back: 100 CONTINUE"); - Response.output_ oc (Response.make_raw ~code:100 "") + Response.output_ ~buf:buf_res oc (Response.make_raw ~code:100 "") | Some s -> bad_reqf 417 "unknown expectation %s" s | None -> ()); @@ -1041,7 +1060,7 @@ let client_handle_for (self : t) ic oc : unit = try if Headers.get "connection" r.Response.headers = Some "close" then continue := false; - Response.output_ oc r + Response.output_ ~buf:buf_res oc r with Sys_error _ -> continue := false in @@ -1052,10 +1071,10 @@ let client_handle_for (self : t) ic oc : unit = (* connection broken somehow *) | Bad_req (code, s) -> continue := false; - Response.output_ oc @@ Response.make_raw ~code s + Response.output_ ~buf:buf_res oc @@ Response.make_raw ~code s | e -> continue := false; - Response.output_ oc + Response.output_ ~buf:buf_res oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e)) done diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 819fae14..6a0e0157 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -148,10 +148,13 @@ module Request : sig @since 0.3 *) - val read_body_full : ?buf_size:int -> byte_stream t -> string t + val read_body_full : + ?buf:Tiny_httpd_buf.t -> ?buf_size:int -> byte_stream t -> string t (** Read the whole body into a string. Potentially blocking. - @param buf_size initial size of underlying buffer (since 0.11) *) + @param buf_size initial size of underlying buffer (since 0.11) + @param buf the initial buffer (since NEXT_RELEASE) + *) (**/**) diff --git a/src/eio/tiny_httpd_eio.ml b/src/eio/tiny_httpd_eio.ml index bbcf7a9d..d8c89008 100644 --- a/src/eio/tiny_httpd_eio.ml +++ b/src/eio/tiny_httpd_eio.ml @@ -1,5 +1,6 @@ module IO = Tiny_httpd_io module H = Tiny_httpd_server +module Pool = Tiny_httpd_pool let ( let@ ) = ( @@ ) @@ -17,8 +18,9 @@ let get_max_connection_ ?(max_connections = 64) () : int = let buf_size = 16 * 1024 -let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t = - let cstruct = Cstruct.create buf_size in +let ic_of_flow ~buf_pool:ic_pool (flow : Eio.Net.stream_socket) : + IO.In_channel.t = + Pool.with_resource ic_pool @@ fun cstruct -> let len_slice = ref 0 in let offset = ref 0 in @@ -46,9 +48,10 @@ let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t = let close () = flow#shutdown `Receive in { IO.In_channel.input; close } -let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t = +let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) : + IO.Out_channel.t = (* write buffer *) - let wbuf = Bytes.create buf_size in + Pool.with_resource oc_pool @@ fun wbuf -> let offset = ref 0 in let flush () = @@ -100,6 +103,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections let init_addr () = addr let init_port () = port let get_time_s () = Unix.gettimeofday () + let ic_pool = Pool.create ~mk_item:(fun () -> Cstruct.create buf_size) () + let oc_pool = Pool.create ~mk_item:(fun () -> Bytes.create buf_size) () let tcp_server () : IO.TCP_server.builder = { @@ -151,8 +156,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections k "Tiny_httpd_eio: client handler returned"); Atomic.decr active_conns) in - let ic = ic_of_flow flow in - let oc = oc_of_flow flow in + let ic = ic_of_flow ~buf_pool:ic_pool flow in + let oc = oc_of_flow ~buf_pool:oc_pool flow in handle.handle ic oc) done); }