mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 11:15:35 -05:00
use a resource pool to recycle buffers
This commit is contained in:
parent
e59978ca3c
commit
45026eca59
7 changed files with 74 additions and 35 deletions
|
|
@ -5,4 +5,4 @@ module Util = Tiny_httpd_util
|
||||||
module Dir = Tiny_httpd_dir
|
module Dir = Tiny_httpd_dir
|
||||||
module Html = Tiny_httpd_html
|
module Html = Tiny_httpd_html
|
||||||
module IO = Tiny_httpd_io
|
module IO = Tiny_httpd_io
|
||||||
module Pool = Pool
|
module Pool = Tiny_httpd_pool
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,7 @@ module Util = Tiny_httpd_util
|
||||||
|
|
||||||
(** {2 Resource pool} *)
|
(** {2 Resource pool} *)
|
||||||
|
|
||||||
module Pool = Pool
|
module Pool = Tiny_httpd_pool
|
||||||
|
|
||||||
(** {2 Static directory serving} *)
|
(** {2 Static directory serving} *)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,13 @@ type 'a list_ = Nil | Cons of int * 'a * 'a list_
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mk_item: unit -> 'a;
|
mk_item: unit -> 'a;
|
||||||
|
clear: 'a -> unit;
|
||||||
max_size: int; (** Max number of items *)
|
max_size: int; (** Max number of items *)
|
||||||
items: 'a list_ A.t;
|
items: 'a list_ A.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let create ~mk_item ?(max_size = 128) () : _ t =
|
let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t =
|
||||||
{ mk_item; max_size; items = A.make Nil }
|
{ mk_item; clear; max_size; items = A.make Nil }
|
||||||
|
|
||||||
let rec acquire_ self =
|
let rec acquire_ self =
|
||||||
match A.get self.items with
|
match A.get self.items with
|
||||||
|
|
@ -24,14 +25,19 @@ let[@inline] size_ = function
|
||||||
| Cons (sz, _, _) -> sz
|
| Cons (sz, _, _) -> sz
|
||||||
| Nil -> 0
|
| Nil -> 0
|
||||||
|
|
||||||
let rec release_ self x : unit =
|
let release_ self x : unit =
|
||||||
match A.get self.items with
|
let rec loop () =
|
||||||
| Cons (sz, _, _) when sz >= self.max_size ->
|
match A.get self.items with
|
||||||
(* forget the item *)
|
| Cons (sz, _, _) when sz >= self.max_size ->
|
||||||
()
|
(* forget the item *)
|
||||||
| l ->
|
()
|
||||||
if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then
|
| l ->
|
||||||
release_ self x
|
if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then
|
||||||
|
loop ()
|
||||||
|
in
|
||||||
|
|
||||||
|
self.clear x;
|
||||||
|
loop ()
|
||||||
|
|
||||||
let with_resource (self : _ t) f =
|
let with_resource (self : _ t) f =
|
||||||
let x = acquire_ self in
|
let x = acquire_ self in
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,14 @@
|
||||||
type 'a t
|
type 'a t
|
||||||
(** Pool of values of type ['a] *)
|
(** Pool of values of type ['a] *)
|
||||||
|
|
||||||
val create : mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t
|
val create :
|
||||||
(** Create a new pool. *)
|
?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t
|
||||||
|
(** Create a new pool.
|
||||||
|
@param mk_item produce a new item in case the pool is empty
|
||||||
|
@param max_size maximum number of item in the pool before we start
|
||||||
|
dropping resources on the floor. This controls resource consumption.
|
||||||
|
@param clear a function called on items before recycling them.
|
||||||
|
*)
|
||||||
|
|
||||||
val with_resource : 'a t -> ('a -> 'b) -> 'b
|
val with_resource : 'a t -> ('a -> 'b) -> 'b
|
||||||
(** [with_resource pool f] runs [f x] with [x] a resource;
|
(** [with_resource pool f] runs [f x] with [x] a resource;
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ let _debug k =
|
||||||
module Buf = Tiny_httpd_buf
|
module Buf = Tiny_httpd_buf
|
||||||
module Byte_stream = Tiny_httpd_stream
|
module Byte_stream = Tiny_httpd_stream
|
||||||
module IO = Tiny_httpd_io
|
module IO = Tiny_httpd_io
|
||||||
|
module Pool = Tiny_httpd_pool
|
||||||
|
|
||||||
exception Bad_req of int * string
|
exception Bad_req of int * string
|
||||||
|
|
||||||
|
|
@ -325,9 +326,13 @@ module Request = struct
|
||||||
| Bad_req (c, s) -> Error (c, s)
|
| Bad_req (c, s) -> Error (c, s)
|
||||||
| e -> Error (400, Printexc.to_string e)
|
| e -> Error (400, Printexc.to_string e)
|
||||||
|
|
||||||
let read_body_full ?buf_size (self : byte_stream t) : string t =
|
let read_body_full ?buf ?buf_size (self : byte_stream t) : string t =
|
||||||
try
|
try
|
||||||
let buf = Buf.create ?size:buf_size () in
|
let buf =
|
||||||
|
match buf with
|
||||||
|
| Some b -> b
|
||||||
|
| None -> Buf.create ?size:buf_size ()
|
||||||
|
in
|
||||||
let body = Byte_stream.read_all ~buf self.body in
|
let body = Byte_stream.read_all ~buf self.body in
|
||||||
{ self with body }
|
{ self with body }
|
||||||
with
|
with
|
||||||
|
|
@ -424,12 +429,12 @@ module Response = struct
|
||||||
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
|
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
|
||||||
Headers.pp self.headers pp_body self.body
|
Headers.pp self.headers pp_body self.body
|
||||||
|
|
||||||
let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t)
|
let output_ ~buf (oc : IO.Out_channel.t) (self : t) : unit =
|
||||||
(self : t) : unit =
|
|
||||||
(* double indirection:
|
(* double indirection:
|
||||||
- print into [buffer] using [bprintf]
|
- print into [buffer] using [bprintf]
|
||||||
- transfer to [buf_] so we can output from there *)
|
- transfer to [buf_] so we can output from there *)
|
||||||
let tmp_buffer = Buffer.create 32 in
|
let tmp_buffer = Buffer.create 32 in
|
||||||
|
Buf.clear buf;
|
||||||
|
|
||||||
(* write start of reply *)
|
(* write start of reply *)
|
||||||
Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code
|
Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code
|
||||||
|
|
@ -650,6 +655,7 @@ type t = {
|
||||||
mutable path_handlers:
|
mutable path_handlers:
|
||||||
(unit Request.t -> cb_path_handler resp_result option) list;
|
(unit Request.t -> cb_path_handler resp_result option) list;
|
||||||
(** path handlers *)
|
(** path handlers *)
|
||||||
|
buf_pool: Buf.t Pool.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let get_addr_ sock =
|
let get_addr_ sock =
|
||||||
|
|
@ -741,7 +747,11 @@ let add_route_handler_ ?(accept = fun _req -> Ok ()) ?(middlewares = []) ?meth
|
||||||
let add_route_handler (type a) ?accept ?middlewares ?meth self
|
let add_route_handler (type a) ?accept ?middlewares ?meth self
|
||||||
(route : (a, _) Route.t) (f : _) : unit =
|
(route : (a, _) Route.t) (f : _) : unit =
|
||||||
let tr_req _oc req ~resp f =
|
let tr_req _oc req ~resp f =
|
||||||
resp (f (Request.read_body_full ~buf_size:self.buf_size req))
|
let req =
|
||||||
|
Pool.with_resource self.buf_pool @@ fun buf ->
|
||||||
|
Request.read_body_full ~buf req
|
||||||
|
in
|
||||||
|
resp (f req)
|
||||||
in
|
in
|
||||||
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
|
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
|
||||||
|
|
||||||
|
|
@ -758,7 +768,10 @@ exception Exit_SSE
|
||||||
|
|
||||||
let add_route_server_sent_handler ?accept self route f =
|
let add_route_server_sent_handler ?accept self route f =
|
||||||
let tr_req (oc : IO.Out_channel.t) req ~resp f =
|
let tr_req (oc : IO.Out_channel.t) req ~resp f =
|
||||||
let req = Request.read_body_full ~buf_size:self.buf_size req in
|
let req =
|
||||||
|
Pool.with_resource self.buf_pool @@ fun buf ->
|
||||||
|
Request.read_body_full ~buf req
|
||||||
|
in
|
||||||
let headers =
|
let headers =
|
||||||
ref Headers.(empty |> set "content-type" "text/event-stream")
|
ref Headers.(empty |> set "content-type" "text/event-stream")
|
||||||
in
|
in
|
||||||
|
|
@ -821,6 +834,10 @@ let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t =
|
||||||
path_handlers = [];
|
path_handlers = [];
|
||||||
middlewares = [];
|
middlewares = [];
|
||||||
middlewares_sorted = lazy [];
|
middlewares_sorted = lazy [];
|
||||||
|
buf_pool =
|
||||||
|
Pool.create ~clear:Buf.clear
|
||||||
|
~mk_item:(fun () -> Buf.create ~size:buf_size ())
|
||||||
|
();
|
||||||
}
|
}
|
||||||
in
|
in
|
||||||
List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares;
|
List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares;
|
||||||
|
|
@ -981,7 +998,8 @@ let find_map f l =
|
||||||
|
|
||||||
(* handle client on [ic] and [oc] *)
|
(* handle client on [ic] and [oc] *)
|
||||||
let client_handle_for (self : t) ic oc : unit =
|
let client_handle_for (self : t) ic oc : unit =
|
||||||
let buf = Buf.create ~size:self.buf_size () in
|
Pool.with_resource self.buf_pool @@ fun buf ->
|
||||||
|
Pool.with_resource self.buf_pool @@ fun buf_res ->
|
||||||
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
|
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
|
||||||
let continue = ref true in
|
let continue = ref true in
|
||||||
while !continue && running self do
|
while !continue && running self do
|
||||||
|
|
@ -992,7 +1010,7 @@ let client_handle_for (self : t) ic oc : unit =
|
||||||
| Error (c, s) ->
|
| Error (c, s) ->
|
||||||
(* connection error, close *)
|
(* connection error, close *)
|
||||||
let res = Response.make_raw ~code:c s in
|
let res = Response.make_raw ~code:c s in
|
||||||
(try Response.output_ oc res with Sys_error _ -> ());
|
(try Response.output_ ~buf:buf_res oc res with Sys_error _ -> ());
|
||||||
continue := false
|
continue := false
|
||||||
| Ok (Some req) ->
|
| Ok (Some req) ->
|
||||||
_debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req));
|
_debug (fun k -> k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req));
|
||||||
|
|
@ -1007,7 +1025,8 @@ let client_handle_for (self : t) ic oc : unit =
|
||||||
| None ->
|
| None ->
|
||||||
fun _oc req ~resp ->
|
fun _oc req ~resp ->
|
||||||
let body_str =
|
let body_str =
|
||||||
Request.read_body_full ~buf_size:self.buf_size req
|
Pool.with_resource self.buf_pool @@ fun buf ->
|
||||||
|
Request.read_body_full ~buf req
|
||||||
in
|
in
|
||||||
resp (self.handler body_str)
|
resp (self.handler body_str)
|
||||||
in
|
in
|
||||||
|
|
@ -1016,7 +1035,7 @@ let client_handle_for (self : t) ic oc : unit =
|
||||||
(match Request.get_header ~f:String.trim req "Expect" with
|
(match Request.get_header ~f:String.trim req "Expect" with
|
||||||
| Some "100-continue" ->
|
| Some "100-continue" ->
|
||||||
_debug (fun k -> k "send back: 100 CONTINUE");
|
_debug (fun k -> k "send back: 100 CONTINUE");
|
||||||
Response.output_ oc (Response.make_raw ~code:100 "")
|
Response.output_ ~buf:buf_res oc (Response.make_raw ~code:100 "")
|
||||||
| Some s -> bad_reqf 417 "unknown expectation %s" s
|
| Some s -> bad_reqf 417 "unknown expectation %s" s
|
||||||
| None -> ());
|
| None -> ());
|
||||||
|
|
||||||
|
|
@ -1041,7 +1060,7 @@ let client_handle_for (self : t) ic oc : unit =
|
||||||
try
|
try
|
||||||
if Headers.get "connection" r.Response.headers = Some "close" then
|
if Headers.get "connection" r.Response.headers = Some "close" then
|
||||||
continue := false;
|
continue := false;
|
||||||
Response.output_ oc r
|
Response.output_ ~buf:buf_res oc r
|
||||||
with Sys_error _ -> continue := false
|
with Sys_error _ -> continue := false
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
@ -1052,10 +1071,10 @@ let client_handle_for (self : t) ic oc : unit =
|
||||||
(* connection broken somehow *)
|
(* connection broken somehow *)
|
||||||
| Bad_req (code, s) ->
|
| Bad_req (code, s) ->
|
||||||
continue := false;
|
continue := false;
|
||||||
Response.output_ oc @@ Response.make_raw ~code s
|
Response.output_ ~buf:buf_res oc @@ Response.make_raw ~code s
|
||||||
| e ->
|
| e ->
|
||||||
continue := false;
|
continue := false;
|
||||||
Response.output_ oc
|
Response.output_ ~buf:buf_res oc
|
||||||
@@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e))
|
@@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e))
|
||||||
done
|
done
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -148,10 +148,13 @@ module Request : sig
|
||||||
@since 0.3
|
@since 0.3
|
||||||
*)
|
*)
|
||||||
|
|
||||||
val read_body_full : ?buf_size:int -> byte_stream t -> string t
|
val read_body_full :
|
||||||
|
?buf:Tiny_httpd_buf.t -> ?buf_size:int -> byte_stream t -> string t
|
||||||
(** Read the whole body into a string. Potentially blocking.
|
(** Read the whole body into a string. Potentially blocking.
|
||||||
|
|
||||||
@param buf_size initial size of underlying buffer (since 0.11) *)
|
@param buf_size initial size of underlying buffer (since 0.11)
|
||||||
|
@param buf the initial buffer (since NEXT_RELEASE)
|
||||||
|
*)
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
module IO = Tiny_httpd_io
|
module IO = Tiny_httpd_io
|
||||||
module H = Tiny_httpd_server
|
module H = Tiny_httpd_server
|
||||||
|
module Pool = Tiny_httpd_pool
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
|
@ -17,8 +18,9 @@ let get_max_connection_ ?(max_connections = 64) () : int =
|
||||||
|
|
||||||
let buf_size = 16 * 1024
|
let buf_size = 16 * 1024
|
||||||
|
|
||||||
let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t =
|
let ic_of_flow ~buf_pool:ic_pool (flow : Eio.Net.stream_socket) :
|
||||||
let cstruct = Cstruct.create buf_size in
|
IO.In_channel.t =
|
||||||
|
Pool.with_resource ic_pool @@ fun cstruct ->
|
||||||
let len_slice = ref 0 in
|
let len_slice = ref 0 in
|
||||||
let offset = ref 0 in
|
let offset = ref 0 in
|
||||||
|
|
||||||
|
|
@ -46,9 +48,10 @@ let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t =
|
||||||
let close () = flow#shutdown `Receive in
|
let close () = flow#shutdown `Receive in
|
||||||
{ IO.In_channel.input; close }
|
{ IO.In_channel.input; close }
|
||||||
|
|
||||||
let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t =
|
let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) :
|
||||||
|
IO.Out_channel.t =
|
||||||
(* write buffer *)
|
(* write buffer *)
|
||||||
let wbuf = Bytes.create buf_size in
|
Pool.with_resource oc_pool @@ fun wbuf ->
|
||||||
let offset = ref 0 in
|
let offset = ref 0 in
|
||||||
|
|
||||||
let flush () =
|
let flush () =
|
||||||
|
|
@ -100,6 +103,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
|
||||||
let init_addr () = addr
|
let init_addr () = addr
|
||||||
let init_port () = port
|
let init_port () = port
|
||||||
let get_time_s () = Unix.gettimeofday ()
|
let get_time_s () = Unix.gettimeofday ()
|
||||||
|
let ic_pool = Pool.create ~mk_item:(fun () -> Cstruct.create buf_size) ()
|
||||||
|
let oc_pool = Pool.create ~mk_item:(fun () -> Bytes.create buf_size) ()
|
||||||
|
|
||||||
let tcp_server () : IO.TCP_server.builder =
|
let tcp_server () : IO.TCP_server.builder =
|
||||||
{
|
{
|
||||||
|
|
@ -151,8 +156,8 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
|
||||||
k "Tiny_httpd_eio: client handler returned");
|
k "Tiny_httpd_eio: client handler returned");
|
||||||
Atomic.decr active_conns)
|
Atomic.decr active_conns)
|
||||||
in
|
in
|
||||||
let ic = ic_of_flow flow in
|
let ic = ic_of_flow ~buf_pool:ic_pool flow in
|
||||||
let oc = oc_of_flow flow in
|
let oc = oc_of_flow ~buf_pool:oc_pool flow in
|
||||||
handle.handle ic oc)
|
handle.handle ic oc)
|
||||||
done);
|
done);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue