perf: add a buffer pool to reuse buffers, make ~buf mandatory

no more allocation of new buffers for each small thing, new request,
etc. Instead we keep a pool of buffers and use a weak form of RAII to
make sure we recycle them once done.

- add `with_alloc_buf` for the user to do so
- add `alloc_buf_for_stream` to tie buffer's lifetime to a byte
  stream, typically for a streaming response.
This commit is contained in:
Simon Cruanes 2022-01-01 23:11:08 -05:00
parent 0b3af5cd6e
commit dc0a1f08ac
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
8 changed files with 158 additions and 26 deletions

View file

@ -73,7 +73,9 @@ let () =
S.Route.(exact "zcat" @/ string_urlencoded @/ return) S.Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req -> (fun path _req ->
let ic = open_in path in let ic = open_in path in
let str = S.Byte_stream.of_chan ic in let str =
S.alloc_buf_for_stream server @@ fun buf ->
S.Byte_stream.of_chan ~buf ic in
let mime_type = let mime_type =
try try
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in

View file

@ -75,10 +75,11 @@ echo:
*) *)
(** {2 Tiny buffer implementation} (** Buffers
These buffers are used to avoid allocating too many byte arrays when These buffers are used to avoid allocating too many byte arrays when
processing streams and parsing requests. processing streams and parsing requests.
They should be allocated through the main server, see {!with_alloc_buf}.
*) *)
module Buf = Tiny_httpd_buf module Buf = Tiny_httpd_buf

View file

@ -3,6 +3,7 @@ type t = {
mutable bytes: bytes; mutable bytes: bytes;
mutable i: int; mutable i: int;
} }
(* Alias of [t]: lets the [Pool] submodule, which declares its own [t],
   still name the buffer type. *)
type buf = t
let create ?(size=4_096) () : t = let create ?(size=4_096) () : t =
{ bytes=Bytes.make size ' '; i=0 } { bytes=Bytes.make size ' '; i=0 }
@ -33,3 +34,38 @@ let contents_and_clear (self:t) : string =
let x = contents self in let x = contents self in
clear self; clear self;
x x
module Pool = struct
  (* [create] is shadowed by [Pool.create] below, so keep a handle on the
     buffer constructor of the enclosing module. *)
  let b_create_ = create

  (** A pool of buffers that can be reused instead of allocating new ones.

      NOTE(review): [alloc] and [dealloc] mutate the pool without any
      locking; confirm that callers sharing one pool between threads
      serialise their accesses. *)
  type t = {
    buf_size: int; (** size of fresh buffers; smaller buffers are not recycled *)
    mutable n: int; (** number of buffers currently stored in [bufs] *)
    mutable bufs: buf list; (** buffers available for reuse *)
  }

  (** [create ?buf_size ()] makes an empty pool whose buffers have an initial
      size of [buf_size] bytes (default 16kB). *)
  let create ?(buf_size=16 * 1024) () : t =
    { buf_size;
      n=0; bufs=[];
    }

  (** [alloc pool] takes a buffer out of [pool], or creates a new one of size
      [pool.buf_size] if the pool is empty. *)
  let alloc self =
    match self.bufs with
    | [] -> b_create_ ~size:self.buf_size ()
    | b :: tl ->
      self.bufs <- tl;
      self.n <- self.n - 1;
      b

  let max_bufs_ = 64 (* do not recycle buffers if we already have that many *)

  (** [dealloc pool b] clears [b] and stores it in [pool] for later reuse.
      The buffer is simply dropped if the pool already holds [max_bufs_]
      buffers, or if [b] is smaller than [pool.buf_size].

      Releasing a buffer that is already in the pool is a no-op: storing it
      twice would make two later calls to [alloc] return the very same
      buffer. The scan is bounded by [max_bufs_]. *)
  let dealloc self b =
    if self.n < max_bufs_
    && Bytes.length b.bytes >= self.buf_size
    && not (List.memq b self.bufs)
    then (
      clear b;
      self.n <- self.n + 1;
      self.bufs <- b :: self.bufs
    )
end

View file

@ -8,9 +8,15 @@
*) *)
type t type t
type buf = t
(** Alias of {!t}, usable where [t] is shadowed (e.g. inside {!Pool}). *)
val size : t -> int val size : t -> int
val clear : t -> unit val clear : t -> unit
val create : ?size:int -> unit -> t val create : ?size:int -> unit -> t
val contents : t -> string val contents : t -> string
val bytes_slice : t -> bytes val bytes_slice : t -> bytes
@ -25,3 +31,13 @@ val add_bytes : t -> bytes -> int -> int -> unit
(** Append given bytes slice to the buffer. (** Append given bytes slice to the buffer.
@since 0.5 *) @since 0.5 *)
(** A pool of buffers, to reuse memory. *)
module Pool : sig
  type t
  (** A pool of buffers. *)

  val create : ?buf_size:int -> unit -> t
  (** [create ?buf_size ()] makes a new, empty pool.
      @param buf_size size of the buffers created by {!alloc} when the pool
      is empty (default 16kB). *)

  val alloc : t -> buf
  (** [alloc pool] returns a buffer taken from [pool], or a freshly created
      one if [pool] has none available. *)

  val dealloc : t -> buf -> unit
  (** [dealloc pool buf] clears [buf] and gives it back to [pool] so that a
      later {!alloc} can return it. The buffer is not kept if the pool
      already stores enough buffers, or if [buf] is smaller than the pool's
      [buf_size]. [buf] must not be used after this call. *)
end

View file

@ -196,9 +196,9 @@ module Request = struct
self.path self.body pp_comp_ self.path_components pp_query self.query self.path self.body pp_comp_ self.path_components pp_query self.query
(* decode a "chunked" stream into a normal stream *) (* decode a "chunked" stream into a normal stream *)
let read_stream_chunked_ ?buf (bs:byte_stream) : byte_stream = let read_stream_chunked_ ?(buf=Buf.create()) (bs:byte_stream) : byte_stream =
_debug (fun k->k "body: start reading chunked stream..."); _debug (fun k->k "body: start reading chunked stream...");
Byte_stream.read_chunked ?buf Byte_stream.read_chunked ~buf
~fail:(fun s -> Bad_req (400, s)) ~fail:(fun s -> Bad_req (400, s))
bs bs
@ -290,9 +290,12 @@ module Request = struct
| e -> | e ->
Error (400, Printexc.to_string e) Error (400, Printexc.to_string e)
let read_body_full ?buf_size (self:byte_stream t) : string t = let read_body_full ?buf (self:byte_stream t) : string t =
let buf = match buf with
| Some b -> b
| None -> Buf.create ()
in
try try
let buf = Buf.create ?size:buf_size () in
let body = Byte_stream.read_all ~buf self.body in let body = Byte_stream.read_all ~buf self.body in
{ self with body } { self with body }
with with
@ -409,6 +412,13 @@ module Response = struct
| `Stream str -> Byte_stream.output_chunked oc str; | `Stream str -> Byte_stream.output_chunked oc str;
end; end;
flush oc flush oc
(** Release the body of the response: a [`Stream] body gets its byte stream
    closed, other kinds of body hold nothing to release. *)
let close_ ({body; _} : t) : unit =
  match body with
  | `String _ | `Void -> ()
  | `Stream stream -> Byte_stream.close stream
end end
(* semaphore, for limiting concurrency. *) (* semaphore, for limiting concurrency. *)
@ -580,6 +590,7 @@ type t = {
masksigpipe: bool; masksigpipe: bool;
buf_size: int; buf_size: int;
buf_pool : Buf.Pool.t;
get_time_s : unit -> float; get_time_s : unit -> float;
@ -605,6 +616,23 @@ let port self = self.port
let active_connections self = Sem_.num_acquired self.sem_max_connections - 1 let active_connections self = Sem_.num_acquired self.sem_max_connections - 1
(** [with_alloc_buf self f] borrows a buffer from the server's pool for the
    duration of the call [f buf]. The buffer goes back to the pool whether
    [f] returns normally or raises; the result or exception is passed on. *)
let with_alloc_buf self f =
  let pool = self.buf_pool in
  let buf = Buf.Pool.alloc pool in
  let release () = Buf.Pool.dealloc pool buf in
  match f buf with
  | res ->
    release ();
    res
  | exception e ->
    release ();
    raise e
(** [alloc_buf_for_stream self f] takes a buffer from the server's pool,
    builds a stream with [f buf], and ties the buffer to that stream: the
    buffer is given back to the pool when the stream is closed.

    If [f] raises, the buffer is given back immediately and the exception
    is re-raised. *)
let alloc_buf_for_stream self f =
  let buf = Buf.Pool.alloc self.buf_pool in
  let stream =
    try f buf
    with e ->
      (* no stream will ever be closed for this buffer: recycle it now *)
      Buf.Pool.dealloc self.buf_pool buf;
      raise e
  in
  let close = stream.Byte_stream.close in
  (* A stream may be closed more than once; the buffer must be recycled only
     once, or the pool could hand it to two different users. *)
  let released = ref false in
  Byte_stream.set_close stream (fun () ->
    if not !released then (
      released := true;
      Buf.Pool.dealloc self.buf_pool buf
    );
    close ());
  stream
let add_middleware ~stage self m = let add_middleware ~stage self m =
let stage = match stage with let stage = match stage with
| `Encoding -> 0 | `Encoding -> 0
@ -670,7 +698,13 @@ let add_route_handler_
let add_route_handler (type a) ?accept ?middlewares ?meth let add_route_handler (type a) ?accept ?middlewares ?meth
self (route:(a,_) Route.t) (f:_) : unit = self (route:(a,_) Route.t) (f:_) : unit =
let tr_req _oc req ~resp f = resp (f (Request.read_body_full ~buf_size:self.buf_size req)) in let tr_req _oc req ~resp f =
let body =
with_alloc_buf self @@ fun buf ->
Request.read_body_full ~buf req
in
resp (f body)
in
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
let add_route_handler_stream ?accept ?middlewares ?meth self route f = let add_route_handler_stream ?accept ?middlewares ?meth self route f =
@ -683,7 +717,10 @@ let[@inline] _opt_iter ~f o = match o with
let add_route_server_sent_handler ?accept self route f = let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f = let tr_req oc req ~resp f =
let req = Request.read_body_full ~buf_size:self.buf_size req in let req =
with_alloc_buf self @@ fun buf ->
Request.read_body_full ~buf req
in
let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in
(* send response once *) (* send response once *)
@ -733,8 +770,10 @@ let create
() : t = () : t =
let handler _req = Response.fail ~code:404 "no top handler" in let handler _req = Response.fail ~code:404 "no top handler" in
let max_connections = max 4 max_connections in let max_connections = max 4 max_connections in
let buf_pool = Buf.Pool.create ~buf_size () in
let self = { let self = {
new_thread; addr; port; sock; masksigpipe; handler; buf_size; new_thread; addr; port; sock; masksigpipe; handler;
buf_size; buf_pool;
running= true; sem_max_connections=Sem_.create max_connections; running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; timeout; get_time_s; path_handlers=[]; timeout; get_time_s;
middlewares=[]; middlewares_sorted=lazy []; middlewares=[]; middlewares_sorted=lazy [];
@ -758,12 +797,16 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
let ic = Unix.in_channel_of_descr client_sock in let ic = Unix.in_channel_of_descr client_sock in
let oc = Unix.out_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf.create ~size:self.buf_size () in let buf_q = Buf.Pool.alloc self.buf_pool in
let is = Byte_stream.of_chan ~buf_size:self.buf_size ic in let buf_is = Buf.Pool.alloc self.buf_pool in
let buf_body = Buf.Pool.alloc self.buf_pool in
let is = Byte_stream.of_chan ~buf:buf_is ic in
let continue = ref true in let continue = ref true in
while !continue && self.running do while !continue && self.running do
_debug (fun k->k "read next request"); _debug (fun k->k "read next request");
match Request.parse_req_start ~get_time_s:self.get_time_s ~buf is with Buf.clear buf_q;
match Request.parse_req_start ~get_time_s:self.get_time_s ~buf:buf_q is with
| Ok None -> | Ok None ->
continue := false (* client is done *) continue := false (* client is done *)
@ -788,7 +831,9 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
| Some f -> unwrap_resp_result f | Some f -> unwrap_resp_result f
| None -> | None ->
(fun _oc req ~resp -> (fun _oc req ~resp ->
let body_str = Request.read_body_full ~buf_size:self.buf_size req in let buf = Buf.Pool.alloc self.buf_pool in
let body_str = Request.read_body_full ~buf req in
Buf.Pool.dealloc self.buf_pool buf;
resp (self.handler body_str)) resp (self.handler body_str))
in in
@ -809,8 +854,9 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
in in
(* now actually read request's body into a stream *) (* now actually read request's body into a stream *)
Buf.clear buf_body;
let req = let req =
Request.parse_body_ ~tr_stream:(fun s->s) ~buf {req with body=is} Request.parse_body_ ~tr_stream:(fun s->s) ~buf:buf_body {req with body=is}
|> unwrap_resp_result |> unwrap_resp_result
in in
@ -819,7 +865,8 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
try try
if Headers.get "connection" r.Response.headers = Some"close" then if Headers.get "connection" r.Response.headers = Some"close" then
continue := false; continue := false;
Response.output_ oc r Response.output_ oc r;
Response.close_ r;
with Sys_error _ -> continue := false with Sys_error _ -> continue := false
in in
@ -838,6 +885,11 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
continue := false; continue := false;
Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e) Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e)
done; done;
Buf.Pool.dealloc self.buf_pool buf_q;
Buf.Pool.dealloc self.buf_pool buf_is;
Buf.Pool.dealloc self.buf_pool buf_body;
_debug (fun k->k "done with client, exiting"); _debug (fun k->k "done with client, exiting");
(try Unix.close client_sock (try Unix.close client_sock
with e -> _debug (fun k->k "error when closing sock: %s" (Printexc.to_string e))); with e -> _debug (fun k->k "error when closing sock: %s" (Printexc.to_string e)));

View file

@ -145,7 +145,7 @@ module Request : sig
@since 0.3 @since 0.3
*) *)
val read_body_full : ?buf_size:int -> byte_stream t -> string t val read_body_full : ?buf:buf -> byte_stream t -> string t
(** Read the whole body into a string. Potentially blocking. (** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since 0.11) *) @param buf buffer used to read the body; a fresh one is created if omitted *)
@ -396,6 +396,25 @@ val create :
This parameter exists since 0.11. This parameter exists since 0.11.
*) *)
val with_alloc_buf : t -> (buf -> 'a) -> 'a
(** [with_alloc_buf server f] calls [f buf] with a buffer [buf] obtained from
    the buffer pool of [server].
    It behaves like [f buf]: the result of [f], or the exception it raises,
    is passed on to the caller. In both cases [buf] is given back to the
    pool once [f] is done.
    Make sure that the [buf] argument doesn't escape the scope of the call to
    [f], as the buffer might be recycled internally.
    @since NEXT_RELEASE
*)
val alloc_buf_for_stream :
  t -> (buf -> byte_stream) -> byte_stream
(** [alloc_buf_for_stream server f] obtains a buffer [buf] from the buffer
    pool of [server] and returns the stream [f buf].
    Similar to {!with_alloc_buf}, except the buffer can live as long as the returned
    byte stream: it is given back to the pool when that stream is closed.
    This is handy along with {!Response.make_stream}, to request a buffer to
    process the stream, and ensure the buffer will be recycled when
    the stream is closed.
    @since NEXT_RELEASE
*)
val addr : t -> string val addr : t -> string
(** Address on which the server listens. *) (** Address on which the server listens. *)

View file

@ -10,11 +10,12 @@ type t = {
mutable len : int; mutable len : int;
fill_buf: unit -> unit; fill_buf: unit -> unit;
consume: int -> unit; consume: int -> unit;
close: unit -> unit; mutable close: unit -> unit;
_rest: hidden; _rest: hidden;
} }
let[@inline] close self = self.close() let[@inline] close self = self.close()
(* Replace the function that [close] invokes on this stream. *)
let[@inline] set_close stream on_close = stream.close <- on_close
let empty = { let empty = {
bs=Bytes.empty; bs=Bytes.empty;
@ -26,7 +27,9 @@ let empty = {
_rest=(); _rest=();
} }
let make ?(bs=Bytes.create @@ 16 * 1024) ?(close=ignore) ~consume ~fill () : t = let make
?(bs=Bytes.create @@ 16 * 1024)
?(close=ignore) ~consume ~fill () : t =
let rec self = { let rec self = {
bs; bs;
off=0; off=0;
@ -43,9 +46,10 @@ let make ?(bs=Bytes.create @@ 16 * 1024) ?(close=ignore) ~consume ~fill () : t =
} in } in
self self
let of_chan_ ?(buf_size=16 * 1024) ~close ic : t = let of_chan_ ?(buf=Buf.create ~size:(16*1024) ()) ~close ic : t =
let bs = Buf.bytes_slice buf in (* we just reuse the bytes of [buf] *)
make make
~bs:(Bytes.create buf_size) ~bs
~close:(fun _ -> close ic) ~close:(fun _ -> close ic)
~consume:(fun self n -> ~consume:(fun self n ->
self.off <- self.off + n; self.off <- self.off + n;
@ -101,10 +105,10 @@ let of_bytes ?(i=0) ?len (bs:bytes) : t =
let of_string s : t = let of_string s : t =
of_bytes (Bytes.unsafe_of_string s) of_bytes (Bytes.unsafe_of_string s)
let with_file ?buf_size file f = let with_file ?buf file f =
let ic = open_in file in let ic = open_in file in
try try
let x = f (of_chan ?buf_size ic) in let x = f (of_chan ?buf ic) in
close_in ic; close_in ic;
x x
with e -> with e ->

View file

@ -28,7 +28,7 @@ type t = {
(** Consume [n] bytes from the buffer. (** Consume [n] bytes from the buffer.
This should only be called with [n <= len]. *) This should only be called with [n <= len]. *)
close: unit -> unit; mutable close: unit -> unit;
(** Close the stream. *) (** Close the stream. *)
_rest: hidden; _rest: hidden;
@ -41,13 +41,15 @@ type t = {
val close : t -> unit val close : t -> unit
(** Close stream *) (** Close stream *)
val set_close : t -> (unit -> unit) -> unit
(** [set_close stream f] makes [f] the function that {!close} calls on
    [stream], replacing the previous one. *)
val empty : t val empty : t
(** Stream with 0 bytes inside *) (** Stream with 0 bytes inside *)
val of_chan : ?buf_size:int -> in_channel -> t val of_chan : ?buf:Tiny_httpd_buf.t -> in_channel -> t
(** Make a buffered stream from the given channel. *) (** Make a buffered stream from the given channel. *)
val of_chan_close_noerr : ?buf_size:int -> in_channel -> t val of_chan_close_noerr : ?buf:Tiny_httpd_buf.t -> in_channel -> t
(** Same as {!of_chan} but the [close] method will never fail. *) (** Same as {!of_chan} but the [close] method will never fail. *)
val of_bytes : ?i:int -> ?len:int -> bytes -> t val of_bytes : ?i:int -> ?len:int -> bytes -> t
@ -76,7 +78,7 @@ val make :
@param init_size size of the buffer. @param init_size size of the buffer.
*) *)
val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a val with_file : ?buf:Tiny_httpd_buf.t -> string -> (t -> 'a) -> 'a
(** Open a file with given name, and obtain an input stream (** Open a file with given name, and obtain an input stream
on its content. When the function returns, the stream (and file) are closed. *) on its content. When the function returns, the stream (and file) are closed. *)