feat: pass buf_size in many places, set default buf_size to 16kb

This commit is contained in:
Simon Cruanes 2021-12-10 11:43:03 -05:00
parent 0ae9ec4426
commit ba31534551
No known key found for this signature in database
GPG key ID: 4AC01D0849AA62B6
3 changed files with 27 additions and 18 deletions

View file

@ -11,7 +11,7 @@ let () =
"-j", Arg.Set_int j, " maximum number of connections"; "-j", Arg.Set_int j, " maximum number of connections";
]) (fun _ -> raise (Arg.Bad "")) "echo [option]*"; ]) (fun _ -> raise (Arg.Bad "")) "echo [option]*";
let server = S.create ~port:!port_ ~max_connections:!j () in let server = S.create ~port:!port_ ~max_connections:!j () in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(1024*1024) server; Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16*1024) server;
(* say hello *) (* say hello *)
S.add_route_handler ~meth:`GET server S.add_route_handler ~meth:`GET server
S.Route.(exact "hello" @/ string @/ return) S.Route.(exact "hello" @/ string @/ return)

View file

@ -66,10 +66,10 @@ module Byte_stream = struct
bs_close=(fun () -> ()); bs_close=(fun () -> ());
} }
let of_chan_ ~close ic : t = let of_chan_ ?(buf_size=16 * 1024) ~close ic : t =
let i = ref 0 in let i = ref 0 in
let len = ref 0 in let len = ref 0 in
let buf = Bytes.make 4096 ' ' in let buf = Bytes.make buf_size ' ' in
{ bs_fill_buf=(fun () -> { bs_fill_buf=(fun () ->
if !i >= !len then ( if !i >= !len then (
i := 0; i := 0;
@ -116,10 +116,10 @@ module Byte_stream = struct
let of_string s : t = let of_string s : t =
of_bytes (Bytes.unsafe_of_string s) of_bytes (Bytes.unsafe_of_string s)
let with_file file f = let with_file ?buf_size file f =
let ic = open_in file in let ic = open_in file in
try try
let x = f (of_chan ic) in let x = f (of_chan ?buf_size ic) in
close_in ic; close_in ic;
x x
with e -> with e ->
@ -542,9 +542,10 @@ module Request = struct
| e -> | e ->
Error (400, Printexc.to_string e) Error (400, Printexc.to_string e)
let read_body_full (self:byte_stream t) : string t = let read_body_full ?buf_size (self:byte_stream t) : string t =
try try
let body = Byte_stream.read_all self.body in let buf = Buf_.create ?size:buf_size () in
let body = Byte_stream.read_all ~buf self.body in
{ self with body } { self with body }
with with
| Bad_req _ as e -> raise e | Bad_req _ as e -> raise e
@ -834,6 +835,8 @@ type t = {
masksigpipe: bool; masksigpipe: bool;
buf_size: int;
mutable handler: (string Request.t -> Response.t); mutable handler: (string Request.t -> Response.t);
(* toplevel handler, if any *) (* toplevel handler, if any *)
@ -921,7 +924,7 @@ let add_route_handler_
let add_route_handler (type a) ?accept ?middlewares ?meth let add_route_handler (type a) ?accept ?middlewares ?meth
self (route:(a,_) Route.t) (f:_) : unit = self (route:(a,_) Route.t) (f:_) : unit =
let tr_req _oc req ~resp f = resp (f (Request.read_body_full req)) in let tr_req _oc req ~resp f = resp (f (Request.read_body_full ~buf_size:self.buf_size req)) in
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
let add_route_handler_stream ?accept ?middlewares ?meth self route f = let add_route_handler_stream ?accept ?middlewares ?meth self route f =
@ -934,7 +937,7 @@ let[@inline] _opt_iter ~f o = match o with
let add_route_server_sent_handler ?accept self route f = let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f = let tr_req oc req ~resp f =
let req = Request.read_body_full req in let req = Request.read_body_full ~buf_size:self.buf_size req in
let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in
(* send response once *) (* send response once *)
@ -976,11 +979,12 @@ let create
?(masksigpipe=true) ?(masksigpipe=true)
?(max_connections=32) ?(max_connections=32)
?(timeout=0.0) ?(timeout=0.0)
?(buf_size=16 * 1_024)
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
?(addr="127.0.0.1") ?(port=8080) ?sock () : t = ?(addr="127.0.0.1") ?(port=8080) ?sock () : t =
let handler _req = Response.fail ~code:404 "no top handler" in let handler _req = Response.fail ~code:404 "no top handler" in
let max_connections = max 4 max_connections in let max_connections = max 4 max_connections in
{ new_thread; addr; port; sock; masksigpipe; handler; { new_thread; addr; port; sock; masksigpipe; handler; buf_size;
running= true; sem_max_connections=Sem_.create max_connections; running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; timeout; path_handlers=[]; timeout;
middlewares=[]; middlewares_sorted=lazy []; middlewares=[]; middlewares_sorted=lazy [];
@ -1002,8 +1006,8 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let _ = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout) in let _ = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout) in
let ic = Unix.in_channel_of_descr client_sock in let ic = Unix.in_channel_of_descr client_sock in
let oc = Unix.out_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf_.create() in let buf = Buf_.create ~size:self.buf_size () in
let is = Byte_stream.of_chan ic in let is = Byte_stream.of_chan ~buf_size:self.buf_size ic in
let continue = ref true in let continue = ref true in
while !continue && self.running do while !continue && self.running do
_debug (fun k->k "read next request"); _debug (fun k->k "read next request");
@ -1030,7 +1034,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
| Some f -> unwrap_resp_result f | Some f -> unwrap_resp_result f
| None -> | None ->
(fun _oc req ~resp -> (fun _oc req ~resp ->
let body_str = Request.read_body_full req in let body_str = Request.read_body_full ~buf_size:self.buf_size req in
resp (self.handler body_str)) resp (self.handler body_str))
in in

View file

@ -129,10 +129,10 @@ module Byte_stream : sig
val empty : t val empty : t
val of_chan : in_channel -> t val of_chan : ?buf_size:int -> in_channel -> t
(** Make a buffered stream from the given channel. *) (** Make a buffered stream from the given channel. *)
val of_chan_close_noerr : in_channel -> t val of_chan_close_noerr : ?buf_size:int -> in_channel -> t
(** Same as {!of_chan} but the [close] method will never fail. *) (** Same as {!of_chan} but the [close] method will never fail. *)
val of_bytes : ?i:int -> ?len:int -> bytes -> t val of_bytes : ?i:int -> ?len:int -> bytes -> t
@ -149,7 +149,7 @@ module Byte_stream : sig
(** Write the stream to the channel. (** Write the stream to the channel.
@since 0.3 *) @since 0.3 *)
val with_file : string -> (t -> 'a) -> 'a val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a
(** Open a file with given name, and obtain an input stream (** Open a file with given name, and obtain an input stream
on its content. When the function returns, the stream (and file) are closed. *) on its content. When the function returns, the stream (and file) are closed. *)
@ -277,8 +277,10 @@ module Request : sig
@since 0.3 @since 0.3
*) *)
val read_body_full : byte_stream t -> string t val read_body_full : ?buf_size:int -> byte_stream t -> string t
(** Read the whole body into a string. Potentially blocking. *) (** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since NEXT_RELEASE) *)
(**/**) (**/**)
(* for testing purpose, do not use *) (* for testing purpose, do not use *)
@ -460,6 +462,7 @@ val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int -> ?max_connections:int ->
?timeout:float -> ?timeout:float ->
?buf_size:int ->
?new_thread:((unit -> unit) -> unit) -> ?new_thread:((unit -> unit) -> unit) ->
?addr:string -> ?addr:string ->
?port:int -> ?port:int ->
@ -475,6 +478,8 @@ val create :
@param masksigpipe if true, block the signal {!Sys.sigpipe} which otherwise @param masksigpipe if true, block the signal {!Sys.sigpipe} which otherwise
tends to kill client threads when they try to write on broken sockets. Default: [true]. tends to kill client threads when they try to write on broken sockets. Default: [true].
@param buf_size size for buffers (since NEXT_RELEASE)
@param new_thread a function used to spawn a new thread to handle a @param new_thread a function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one new client connection. By default it is {!Thread.create} but one
could use a thread pool instead. could use a thread pool instead.