use a resource pool to recycle buffers

This commit is contained in:
Simon Cruanes 2023-06-05 23:38:02 -04:00
parent 5ba1ff0de4
commit b9faebffe4
7 changed files with 74 additions and 35 deletions

View file

@ -5,4 +5,4 @@ module Util = Tiny_httpd_util
module Dir = Tiny_httpd_dir module Dir = Tiny_httpd_dir
module Html = Tiny_httpd_html module Html = Tiny_httpd_html
module IO = Tiny_httpd_io module IO = Tiny_httpd_io
module Pool = Pool module Pool = Tiny_httpd_pool

View file

@ -102,7 +102,7 @@ module Util = Tiny_httpd_util
(** {2 Resource pool} *) (** {2 Resource pool} *)
module Pool = Pool module Pool = Tiny_httpd_pool
(** {2 Static directory serving} *) (** {2 Static directory serving} *)

View file

@ -4,12 +4,13 @@ type 'a list_ = Nil | Cons of int * 'a * 'a list_
type 'a t = { type 'a t = {
mk_item: unit -> 'a; mk_item: unit -> 'a;
clear: 'a -> unit;
max_size: int; (** Max number of items *) max_size: int; (** Max number of items *)
items: 'a list_ A.t; items: 'a list_ A.t;
} }
let create ~mk_item ?(max_size = 128) () : _ t = let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t =
{ mk_item; max_size; items = A.make Nil } { mk_item; clear; max_size; items = A.make Nil }
let rec acquire_ self = let rec acquire_ self =
match A.get self.items with match A.get self.items with
@ -24,14 +25,19 @@ let[@inline] size_ = function
| Cons (sz, _, _) -> sz | Cons (sz, _, _) -> sz
| Nil -> 0 | Nil -> 0
let rec release_ self x : unit = let release_ self x : unit =
match A.get self.items with let rec loop () =
| Cons (sz, _, _) when sz >= self.max_size -> match A.get self.items with
(* forget the item *) | Cons (sz, _, _) when sz >= self.max_size ->
() (* forget the item *)
| l -> ()
if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then | l ->
release_ self x if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then
loop ()
in
self.clear x;
loop ()
let with_resource (self : _ t) f = let with_resource (self : _ t) f =
let x = acquire_ self in let x = acquire_ self in

View file

@ -3,8 +3,14 @@
type 'a t type 'a t
(** Pool of values of type ['a] *) (** Pool of values of type ['a] *)
val create : mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t val create :
(** Create a new pool. *) ?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t
(** Create a new pool.
@param mk_item produce a new item in case the pool is empty
@param max_size maximum number of item in the pool before we start
dropping resources on the floor. This controls resource consumption.
@param clear a function called on items before recycling them.
*)
val with_resource : 'a t -> ('a -> 'b) -> 'b val with_resource : 'a t -> ('a -> 'b) -> 'b
(** [with_resource pool f] runs [f x] with [x] a resource; (** [with_resource pool f] runs [f x] with [x] a resource;

View file

@ -19,6 +19,7 @@ let _debug k =
module Buf = Tiny_httpd_buf module Buf = Tiny_httpd_buf
module Byte_stream = Tiny_httpd_stream module Byte_stream = Tiny_httpd_stream
module IO = Tiny_httpd_io module IO = Tiny_httpd_io
module Pool = Tiny_httpd_pool
exception Bad_req of int * string exception Bad_req of int * string
@ -325,9 +326,13 @@ module Request = struct
| Bad_req (c, s) -> Error (c, s) | Bad_req (c, s) -> Error (c, s)
| e -> Error (400, Printexc.to_string e) | e -> Error (400, Printexc.to_string e)
let read_body_full ?buf_size (self : byte_stream t) : string t = let read_body_full ?buf ?buf_size (self : byte_stream t) : string t =
try try
let buf = Buf.create ?size:buf_size () in let buf =
match buf with
| Some b -> b
| None -> Buf.create ?size:buf_size ()
in
let body = Byte_stream.read_all ~buf self.body in let body = Byte_stream.read_all ~buf self.body in
{ self with body } { self with body }
with with
@ -424,12 +429,12 @@ module Response = struct
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
Headers.pp self.headers pp_body self.body Headers.pp self.headers pp_body self.body
let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t) let output_ ~buf (oc : IO.Out_channel.t) (self : t) : unit =
(self : t) : unit =
(* double indirection: (* double indirection:
- print into [buffer] using [bprintf] - print into [buffer] using [bprintf]
- transfer to [buf_] so we can output from there *) - transfer to [buf_] so we can output from there *)
let tmp_buffer = Buffer.create 32 in let tmp_buffer = Buffer.create 32 in
Buf.clear buf;
(* write start of reply *) (* write start of reply *)
Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code
@ -650,6 +655,7 @@ type t = {
mutable path_handlers: mutable path_handlers:
(unit Request.t -> cb_path_handler resp_result option) list; (unit Request.t -> cb_path_handler resp_result option) list;
(** path handlers *) (** path handlers *)
buf_pool: Buf.t Pool.t;
} }
let get_addr_ sock = let get_addr_ sock =
@ -741,7 +747,11 @@ let add_route_handler_ ?(accept = fun _req -> Ok ()) ?(middlewares = []) ?meth
let add_route_handler (type a) ?accept ?middlewares ?meth self let add_route_handler (type a) ?accept ?middlewares ?meth self
(route : (a, _) Route.t) (f : _) : unit = (route : (a, _) Route.t) (f : _) : unit =
let tr_req _oc req ~resp f = let tr_req _oc req ~resp f =
resp (f (Request.read_body_full ~buf_size:self.buf_size req)) let req =
Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in
resp (f req)
in in
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
@ -758,7 +768,10 @@ exception Exit_SSE
let add_route_server_sent_handler ?accept self route f = let add_route_server_sent_handler ?accept self route f =
let tr_req (oc : IO.Out_channel.t) req ~resp f = let tr_req (oc : IO.Out_channel.t) req ~resp f =
let req = Request.read_body_full ~buf_size:self.buf_size req in let req =
Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in
let headers = let headers =
ref Headers.(empty |> set "content-type" "text/event-stream") ref Headers.(empty |> set "content-type" "text/event-stream")
in in
@ -821,6 +834,10 @@ let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t =
path_handlers = []; path_handlers = [];
middlewares = []; middlewares = [];
middlewares_sorted = lazy []; middlewares_sorted = lazy [];
buf_pool =
Pool.create ~clear:Buf.clear
~mk_item:(fun () -> Buf.create ~size:buf_size ())
();
} }
in in
List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares; List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares;
@ -981,7 +998,8 @@ let find_map f l =
(* handle client on [ic] and [oc] *) (* handle client on [ic] and [oc] *)
let client_handle_for (self : t) ic oc : unit = let client_handle_for (self : t) ic oc : unit =
let buf = Buf.create ~size:self.buf_size () in Pool.with_resource self.buf_pool @@ fun buf ->
Pool.with_resource self.buf_pool @@ fun buf_res ->
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
let continue = ref true in let continue = ref true in
while !continue && running self do while !continue && running self do
@ -992,7 +1010,7 @@ let client_handle_for (self : t) ic oc : unit =
| Error (c, s) -> | Error (c, s) ->
(* connection error, close *) (* connection error, close *)
let res = Response.make_raw ~code:c s in let res = Response.make_raw ~code:c s in
(try Response.output_ oc res with Sys_error _ -> ()); (try Response.output_ ~buf:buf_res oc res with Sys_error _ -> ());
continue := false continue := false
| Ok (Some req) -> | Ok (Some req) ->
_debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req)); _debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req));
@ -1007,7 +1025,8 @@ let client_handle_for (self : t) ic oc : unit =
| None -> | None ->
fun _oc req ~resp -> fun _oc req ~resp ->
let body_str = let body_str =
Request.read_body_full ~buf_size:self.buf_size req Pool.with_resource self.buf_pool @@ fun buf ->
Request.read_body_full ~buf req
in in
resp (self.handler body_str) resp (self.handler body_str)
in in
@ -1016,7 +1035,7 @@ let client_handle_for (self : t) ic oc : unit =
(match Request.get_header ~f:String.trim req "Expect" with (match Request.get_header ~f:String.trim req "Expect" with
| Some "100-continue" -> | Some "100-continue" ->
_debug (fun k -> k "send back: 100 CONTINUE"); _debug (fun k -> k "send back: 100 CONTINUE");
Response.output_ oc (Response.make_raw ~code:100 "") Response.output_ ~buf:buf_res oc (Response.make_raw ~code:100 "")
| Some s -> bad_reqf 417 "unknown expectation %s" s | Some s -> bad_reqf 417 "unknown expectation %s" s
| None -> ()); | None -> ());
@ -1041,7 +1060,7 @@ let client_handle_for (self : t) ic oc : unit =
try try
if Headers.get "connection" r.Response.headers = Some "close" then if Headers.get "connection" r.Response.headers = Some "close" then
continue := false; continue := false;
Response.output_ oc r Response.output_ ~buf:buf_res oc r
with Sys_error _ -> continue := false with Sys_error _ -> continue := false
in in
@ -1052,10 +1071,10 @@ let client_handle_for (self : t) ic oc : unit =
(* connection broken somehow *) (* connection broken somehow *)
| Bad_req (code, s) -> | Bad_req (code, s) ->
continue := false; continue := false;
Response.output_ oc @@ Response.make_raw ~code s Response.output_ ~buf:buf_res oc @@ Response.make_raw ~code s
| e -> | e ->
continue := false; continue := false;
Response.output_ oc Response.output_ ~buf:buf_res oc
@@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e)) @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e))
done done

View file

@ -148,10 +148,13 @@ module Request : sig
@since 0.3 @since 0.3
*) *)
val read_body_full : ?buf_size:int -> byte_stream t -> string t val read_body_full :
?buf:Tiny_httpd_buf.t -> ?buf_size:int -> byte_stream t -> string t
(** Read the whole body into a string. Potentially blocking. (** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since 0.11) *) @param buf_size initial size of underlying buffer (since 0.11)
@param buf the initial buffer (since NEXT_RELEASE)
*)
(**/**) (**/**)

View file

@ -1,5 +1,6 @@
module IO = Tiny_httpd_io module IO = Tiny_httpd_io
module H = Tiny_httpd_server module H = Tiny_httpd_server
module Pool = Tiny_httpd_pool
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -17,8 +18,9 @@ let get_max_connection_ ?(max_connections = 64) () : int =
let buf_size = 16 * 1024 let buf_size = 16 * 1024
let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t = let ic_of_flow ~buf_pool:ic_pool (flow : Eio.Net.stream_socket) :
let cstruct = Cstruct.create buf_size in IO.In_channel.t =
Pool.with_resource ic_pool @@ fun cstruct ->
let len_slice = ref 0 in let len_slice = ref 0 in
let offset = ref 0 in let offset = ref 0 in
@ -46,9 +48,10 @@ let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t =
let close () = flow#shutdown `Receive in let close () = flow#shutdown `Receive in
{ IO.In_channel.input; close } { IO.In_channel.input; close }
let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t = let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) :
IO.Out_channel.t =
(* write buffer *) (* write buffer *)
let wbuf = Bytes.create buf_size in Pool.with_resource oc_pool @@ fun wbuf ->
let offset = ref 0 in let offset = ref 0 in
let flush () = let flush () =
@ -100,6 +103,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
let init_addr () = addr let init_addr () = addr
let init_port () = port let init_port () = port
let get_time_s () = Unix.gettimeofday () let get_time_s () = Unix.gettimeofday ()
let ic_pool = Pool.create ~mk_item:(fun () -> Cstruct.create buf_size) ()
let oc_pool = Pool.create ~mk_item:(fun () -> Bytes.create buf_size) ()
let tcp_server () : IO.TCP_server.builder = let tcp_server () : IO.TCP_server.builder =
{ {
@ -151,8 +156,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
k "Tiny_httpd_eio: client handler returned"); k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns) Atomic.decr active_conns)
in in
let ic = ic_of_flow flow in let ic = ic_of_flow ~buf_pool:ic_pool flow in
let oc = oc_of_flow flow in let oc = oc_of_flow ~buf_pool:oc_pool flow in
handle.handle ic oc) handle.handle ic oc)
done); done);
} }