From dc0a1f08ac00addf9c2bc52fb5c6fbb0f519fa7e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 1 Jan 2022 23:11:08 -0500 Subject: [PATCH] perf: add a buffer pool to reuse buffers, make `~buf` mandatory no more allocation of new buffers for each small thing, new request, etc. Instead we keep a pool of buffers and use a weak form of RAII to make sure we recycle them once done. - add `with_alloc_buf` for the user to do so - add `alloc_buf_for_stream` to tie buffer's lifetime to a byte stream, typically for a streaming response. --- examples/echo.ml | 4 +- src/Tiny_httpd.mli | 3 +- src/Tiny_httpd_buf.ml | 36 ++++++++++++++++++ src/Tiny_httpd_buf.mli | 16 ++++++++ src/Tiny_httpd_server.ml | 78 ++++++++++++++++++++++++++++++++------- src/Tiny_httpd_server.mli | 21 ++++++++++- src/Tiny_httpd_stream.ml | 16 +++++--- src/Tiny_httpd_stream.mli | 10 +++-- 8 files changed, 158 insertions(+), 26 deletions(-) diff --git a/examples/echo.ml b/examples/echo.ml index 0a5e4237..e025d540 100644 --- a/examples/echo.ml +++ b/examples/echo.ml @@ -73,7 +73,9 @@ let () = S.Route.(exact "zcat" @/ string_urlencoded @/ return) (fun path _req -> let ic = open_in path in - let str = S.Byte_stream.of_chan ic in + let str = + S.alloc_buf_for_stream server @@ fun buf -> + S.Byte_stream.of_chan ~buf ic in let mime_type = try let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index c6e79d9a..950ffb2d 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -75,10 +75,11 @@ echo: *) -(** {2 Tiny buffer implementation} +(** Buffers These buffers are used to avoid allocating too many byte arrays when processing streams and parsing requests. + They should be allocated through the main server, see {!with_alloc_buf}. 
*) module Buf = Tiny_httpd_buf diff --git a/src/Tiny_httpd_buf.ml b/src/Tiny_httpd_buf.ml index dda9b653..39b161ff 100644 --- a/src/Tiny_httpd_buf.ml +++ b/src/Tiny_httpd_buf.ml @@ -3,6 +3,7 @@ type t = { mutable bytes: bytes; mutable i: int; } +type buf = t let create ?(size=4_096) () : t = { bytes=Bytes.make size ' '; i=0 } @@ -33,3 +34,38 @@ let contents_and_clear (self:t) : string = let x = contents self in clear self; x + +module Pool = struct + let b_create_ = create + + type t = { + buf_size: int; + mutable n: int; + mutable bufs: buf list; + } + + let create ?(buf_size=16 * 1024) () : t = + { buf_size; + n=0; bufs=[]; + } + + let alloc self = + match self.bufs with + | [] -> b_create_ ~size:self.buf_size () + | b :: tl -> + self.bufs <- tl; + self.n <- self.n - 1; + b + + let max_bufs_ = 64 (* do not recycle buffers if we already have that many *) + + let dealloc self b = + if self.n < max_bufs_ && + Bytes.length b.bytes >= self.buf_size + then ( + clear b; + self.n <- self.n + 1; + self.bufs <- b :: self.bufs + ) + +end diff --git a/src/Tiny_httpd_buf.mli b/src/Tiny_httpd_buf.mli index aa93551f..ab60e72b 100644 --- a/src/Tiny_httpd_buf.mli +++ b/src/Tiny_httpd_buf.mli @@ -8,9 +8,15 @@ *) type t + +type buf = t + val size : t -> int + val clear : t -> unit + val create : ?size:int -> unit -> t + val contents : t -> string val bytes_slice : t -> bytes @@ -25,3 +31,13 @@ val add_bytes : t -> bytes -> int -> int -> unit (** Append given bytes slice to the buffer. @since 0.5 *) +(** A pool of buffers, to reuse memory. 
*) +module Pool : sig + type t + + val create : ?buf_size:int -> unit -> t + + val alloc : t -> buf + + val dealloc : t -> buf -> unit +end diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index 7547ee5e..1ee56ffd 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -196,9 +196,9 @@ module Request = struct self.path self.body pp_comp_ self.path_components pp_query self.query (* decode a "chunked" stream into a normal stream *) - let read_stream_chunked_ ?buf (bs:byte_stream) : byte_stream = + let read_stream_chunked_ ?(buf=Buf.create()) (bs:byte_stream) : byte_stream = _debug (fun k->k "body: start reading chunked stream..."); - Byte_stream.read_chunked ?buf + Byte_stream.read_chunked ~buf ~fail:(fun s -> Bad_req (400, s)) bs @@ -290,9 +290,12 @@ module Request = struct | e -> Error (400, Printexc.to_string e) - let read_body_full ?buf_size (self:byte_stream t) : string t = + let read_body_full ?buf (self:byte_stream t) : string t = + let buf = match buf with + | Some b -> b + | None -> Buf.create () + in try - let buf = Buf.create ?size:buf_size () in let body = Byte_stream.read_all ~buf self.body in { self with body } with @@ -409,6 +412,13 @@ module Response = struct | `Stream str -> Byte_stream.output_chunked oc str; end; flush oc + + (** Dispose of the stream. *) + let close_ (self:t) = + match self.body with + | `Stream bs -> Byte_stream.close bs + | `String _ | `Void -> () + end (* semaphore, for limiting concurrency. 
*) @@ -580,6 +590,7 @@ type t = { masksigpipe: bool; buf_size: int; + buf_pool : Buf.Pool.t; get_time_s : unit -> float; @@ -605,6 +616,23 @@ let port self = self.port let active_connections self = Sem_.num_acquired self.sem_max_connections - 1 +let with_alloc_buf self f = + let buf = Buf.Pool.alloc self.buf_pool in + try + let res = f buf in + Buf.Pool.dealloc self.buf_pool buf; + res + with e -> + Buf.Pool.dealloc self.buf_pool buf; + raise e + +let alloc_buf_for_stream self f = + let buf = Buf.Pool.alloc self.buf_pool in + let stream = f buf in + let close = stream.Byte_stream.close in + Byte_stream.set_close stream (fun () -> Buf.Pool.dealloc self.buf_pool buf; close()); + stream + let add_middleware ~stage self m = let stage = match stage with | `Encoding -> 0 @@ -670,7 +698,13 @@ let add_route_handler_ let add_route_handler (type a) ?accept ?middlewares ?meth self (route:(a,_) Route.t) (f:_) : unit = - let tr_req _oc req ~resp f = resp (f (Request.read_body_full ~buf_size:self.buf_size req)) in + let tr_req _oc req ~resp f = + let body = + with_alloc_buf self @@ fun buf -> + Request.read_body_full ~buf req + in + resp (f body) + in add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f let add_route_handler_stream ?accept ?middlewares ?meth self route f = @@ -683,7 +717,10 @@ let[@inline] _opt_iter ~f o = match o with let add_route_server_sent_handler ?accept self route f = let tr_req oc req ~resp f = - let req = Request.read_body_full ~buf_size:self.buf_size req in + let req = + with_alloc_buf self @@ fun buf -> + Request.read_body_full ~buf req + in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in (* send response once *) @@ -733,8 +770,10 @@ let create () : t = let handler _req = Response.fail ~code:404 "no top handler" in let max_connections = max 4 max_connections in + let buf_pool = Buf.Pool.create ~buf_size () in let self = { - new_thread; addr; port; sock; masksigpipe; handler; buf_size; + new_thread; addr; 
port; sock; masksigpipe; handler; + buf_size; buf_pool; running= true; sem_max_connections=Sem_.create max_connections; path_handlers=[]; timeout; get_time_s; middlewares=[]; middlewares_sorted=lazy []; @@ -758,12 +797,16 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); let ic = Unix.in_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in - let buf = Buf.create ~size:self.buf_size () in - let is = Byte_stream.of_chan ~buf_size:self.buf_size ic in + let buf_q = Buf.Pool.alloc self.buf_pool in + let buf_is = Buf.Pool.alloc self.buf_pool in + let buf_body = Buf.Pool.alloc self.buf_pool in + let is = Byte_stream.of_chan ~buf:buf_is ic in + let continue = ref true in while !continue && self.running do _debug (fun k->k "read next request"); - match Request.parse_req_start ~get_time_s:self.get_time_s ~buf is with + Buf.clear buf_q; + match Request.parse_req_start ~get_time_s:self.get_time_s ~buf:buf_q is with | Ok None -> continue := false (* client is done *) @@ -788,7 +831,9 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = | Some f -> unwrap_resp_result f | None -> (fun _oc req ~resp -> - let body_str = Request.read_body_full ~buf_size:self.buf_size req in + let buf = Buf.Pool.alloc self.buf_pool in + let body_str = Request.read_body_full ~buf req in + Buf.Pool.dealloc self.buf_pool buf; resp (self.handler body_str)) in @@ -809,8 +854,9 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = in (* now actually read request's body into a stream *) + Buf.clear buf_body; let req = - Request.parse_body_ ~tr_stream:(fun s->s) ~buf {req with body=is} + Request.parse_body_ ~tr_stream:(fun s->s) ~buf:buf_body {req with body=is} |> unwrap_resp_result in @@ -819,7 +865,8 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = try if Headers.get "connection" r.Response.headers = Some"close" then continue := false; - 
Response.output_ oc r + Response.output_ oc r; + Response.close_ r; with Sys_error _ -> continue := false in @@ -838,6 +885,11 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = continue := false; Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e) done; + + Buf.Pool.dealloc self.buf_pool buf_q; + Buf.Pool.dealloc self.buf_pool buf_is; + Buf.Pool.dealloc self.buf_pool buf_body; + _debug (fun k->k "done with client, exiting"); (try Unix.close client_sock with e -> _debug (fun k->k "error when closing sock: %s" (Printexc.to_string e))); diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 075e14f7..2ff7092b 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -145,7 +145,7 @@ module Request : sig @since 0.3 *) - val read_body_full : ?buf_size:int -> byte_stream t -> string t + val read_body_full : ?buf:buf -> byte_stream t -> string t (** Read the whole body into a string. Potentially blocking. @param buf_size initial size of underlying buffer (since 0.11) *) @@ -396,6 +396,25 @@ val create : This parameter exists since 0.11. *) +val with_alloc_buf : t -> (buf -> 'a) -> 'a +(** [with_alloc_buf server f] calls [f buf] with a buffer [buf]. + It behaves like [f buf]. + + Make sure that the [buf] argument doesn't escape the scope of the call to + [f], as the buffer might be recycled internally. + @since NEXT_RELEASE +*) + +val alloc_buf_for_stream : + t -> (buf -> byte_stream) -> byte_stream +(** Similar to {!with_alloc_buf}, except the buffer can live as long as the returned + byte stream. + This is handy along with {!Response.make_stream}, to request a buffer to + process the stream, and ensure the buffer will be recycled when + the stream is closed. + @since NEXT_RELEASE +*) + val addr : t -> string (** Address on which the server listens. 
*) diff --git a/src/Tiny_httpd_stream.ml b/src/Tiny_httpd_stream.ml index af057500..8408de9e 100644 --- a/src/Tiny_httpd_stream.ml +++ b/src/Tiny_httpd_stream.ml @@ -10,11 +10,12 @@ type t = { mutable len : int; fill_buf: unit -> unit; consume: int -> unit; - close: unit -> unit; + mutable close: unit -> unit; _rest: hidden; } let[@inline] close self = self.close() +let[@inline] set_close self f = self.close <- f let empty = { bs=Bytes.empty; @@ -26,7 +27,9 @@ let empty = { _rest=(); } -let make ?(bs=Bytes.create @@ 16 * 1024) ?(close=ignore) ~consume ~fill () : t = +let make + ?(bs=Bytes.create @@ 16 * 1024) + ?(close=ignore) ~consume ~fill () : t = let rec self = { bs; off=0; @@ -43,9 +46,10 @@ let make ?(bs=Bytes.create @@ 16 * 1024) ?(close=ignore) ~consume ~fill () : t = } in self -let of_chan_ ?(buf_size=16 * 1024) ~close ic : t = +let of_chan_ ?(buf=Buf.create ~size:(16*1024) ()) ~close ic : t = + let bs = Buf.bytes_slice buf in (* we just reuse the bytes of [buf] *) make - ~bs:(Bytes.create buf_size) + ~bs ~close:(fun _ -> close ic) ~consume:(fun self n -> self.off <- self.off + n; @@ -101,10 +105,10 @@ let of_bytes ?(i=0) ?len (bs:bytes) : t = let of_string s : t = of_bytes (Bytes.unsafe_of_string s) -let with_file ?buf_size file f = +let with_file ?buf file f = let ic = open_in file in try - let x = f (of_chan ?buf_size ic) in + let x = f (of_chan ?buf ic) in close_in ic; x with e -> diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index fbb2d287..20bbf70d 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -28,7 +28,7 @@ type t = { (** Consume [n] bytes from the buffer. This should only be called with [n <= len]. *) - close: unit -> unit; + mutable close: unit -> unit; (** Close the stream. 
*) _rest: hidden; @@ -41,13 +41,15 @@ type t = { val close : t -> unit (** Close stream *) +val set_close : t -> (unit -> unit) -> unit + val empty : t (** Stream with 0 bytes inside *) -val of_chan : ?buf_size:int -> in_channel -> t +val of_chan : ?buf:Tiny_httpd_buf.t -> in_channel -> t (** Make a buffered stream from the given channel. *) -val of_chan_close_noerr : ?buf_size:int -> in_channel -> t +val of_chan_close_noerr : ?buf:Tiny_httpd_buf.t -> in_channel -> t (** Same as {!of_chan} but the [close] method will never fail. *) val of_bytes : ?i:int -> ?len:int -> bytes -> t @@ -76,7 +78,7 @@ val make : @param init_size size of the buffer. *) -val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a +val with_file : ?buf:Tiny_httpd_buf.t -> string -> (t -> 'a) -> 'a (** Open a file with given name, and obtain an input stream on its content. When the function returns, the stream (and file) are closed. *)