Merge pull request #27 from c-cube/wip-middleware

add middlewares
This commit is contained in:
Simon Cruanes 2021-12-16 10:52:34 -05:00
commit b5e50fac59
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 324 additions and 97 deletions

View file

@ -1,6 +1,39 @@
module S = Tiny_httpd module S = Tiny_httpd
let now_ = Unix.gettimeofday
(* util: a little middleware collecting statistics *)
let middleware_stat () : S.Middleware.t * (unit -> string) =
let n_req = ref 0 in
let total_time_ = ref 0. in
let parse_time_ = ref 0. in
let build_time_ = ref 0. in
let write_time_ = ref 0. in
let m h req ~resp =
incr n_req;
let t1 = S.Request.start_time req in
let t2 = now_ () in
h req ~resp:(fun response ->
let t3 = now_ () in
resp response;
let t4 = now_ () in
total_time_ := !total_time_ +. (t4 -. t1);
parse_time_ := !parse_time_ +. (t2 -. t1);
build_time_ := !build_time_ +. (t3 -. t2);
write_time_ := !write_time_ +. (t4 -. t3);
)
and get_stat () =
Printf.sprintf "%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
!n_req (!total_time_ /. float !n_req *. 1e3)
(!parse_time_ /. float !n_req *. 1e3)
(!build_time_ /. float !n_req *. 1e3)
(!write_time_ /. float !n_req *. 1e3)
in
m, get_stat
let () = let () =
let port_ = ref 8080 in let port_ = ref 8080 in
let j = ref 32 in let j = ref 32 in
@ -10,12 +43,19 @@ let () =
"--debug", Arg.Unit (fun () -> S._enable_debug true), " enable debug"; "--debug", Arg.Unit (fun () -> S._enable_debug true), " enable debug";
"-j", Arg.Set_int j, " maximum number of connections"; "-j", Arg.Set_int j, " maximum number of connections";
]) (fun _ -> raise (Arg.Bad "")) "echo [option]*"; ]) (fun _ -> raise (Arg.Bad "")) "echo [option]*";
let server = S.create ~port:!port_ ~max_connections:!j () in let server = S.create ~port:!port_ ~max_connections:!j () in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(1024*1024) server; Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16*1024) server;
let m_stats, get_stats = middleware_stat () in
S.add_middleware server ~stage:(`Stage 1) m_stats;
(* say hello *) (* say hello *)
S.add_route_handler ~meth:`GET server S.add_route_handler ~meth:`GET server
S.Route.(exact "hello" @/ string @/ return) S.Route.(exact "hello" @/ string @/ return)
(fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n"))); (fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n")));
(* compressed file access *)
S.add_route_handler ~meth:`GET server S.add_route_handler ~meth:`GET server
S.Route.(exact "zcat" @/ string_urlencoded @/ return) S.Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req -> (fun path _req ->
@ -33,6 +73,7 @@ let () =
in in
S.Response.make_stream ~headers:mime_type (Ok str) S.Response.make_stream ~headers:mime_type (Ok str)
); );
(* echo request *) (* echo request *)
S.add_route_handler server S.add_route_handler server
S.Route.(exact "echo" @/ return) S.Route.(exact "echo" @/ return)
@ -43,6 +84,8 @@ let () =
in in
S.Response.make_string S.Response.make_string
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." S.Request.pp req q))); (Ok (Format.asprintf "echo:@ %a@ (query: %s)@." S.Request.pp req q)));
(* file upload *)
S.add_route_handler_stream ~meth:`PUT server S.add_route_handler_stream ~meth:`PUT server
S.Route.(exact "upload" @/ string @/ return) S.Route.(exact "upload" @/ string @/ return)
(fun path req -> (fun path req ->
@ -56,6 +99,28 @@ let () =
with e -> with e ->
S.Response.fail ~code:500 "couldn't upload file: %s" (Printexc.to_string e) S.Response.fail ~code:500 "couldn't upload file: %s" (Printexc.to_string e)
); );
(* stats *)
S.add_route_handler server S.Route.(exact "stats" @/ return)
(fun _req ->
let stats = get_stats() in
S.Response.make_string @@ Ok stats
);
(* main page *)
S.add_route_handler server S.Route.(return)
(fun _req ->
let s = "<head></head><body>\n\
<p><b>welcome!</b>\n<p>endpoints are:\n<ul>\
<li><pre>/hello/'name' (GET)</pre></li>\n\
<li><pre>/echo/ (GET) echoes back query</pre></li>\n\
<li><pre>/upload/'path' (PUT) to upload a file</pre></li>\n\
<li><pre>/zcat/'path' (GET) to download a file (compressed)</pre></li>\n\
<li><pre>/stats/ (GET) to access statistics</pre></li>\n\
</ul></body>"
in
S.Response.make_string ~headers:["content-type", "text/html"] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server); Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server);
match S.run server with match S.run server with
| Ok () -> () | Ok () -> ()

View file

@ -66,10 +66,10 @@ module Byte_stream = struct
bs_close=(fun () -> ()); bs_close=(fun () -> ());
} }
let of_chan_ ~close ic : t = let of_chan_ ?(buf_size=16 * 1024) ~close ic : t =
let i = ref 0 in let i = ref 0 in
let len = ref 0 in let len = ref 0 in
let buf = Bytes.make 4096 ' ' in let buf = Bytes.make buf_size ' ' in
{ bs_fill_buf=(fun () -> { bs_fill_buf=(fun () ->
if !i >= !len then ( if !i >= !len then (
i := 0; i := 0;
@ -116,10 +116,10 @@ module Byte_stream = struct
let of_string s : t = let of_string s : t =
of_bytes (Bytes.unsafe_of_string s) of_bytes (Bytes.unsafe_of_string s)
let with_file file f = let with_file ?buf_size file f =
let ic = open_in file in let ic = open_in file in
try try
let x = f (of_chan ic) in let x = f (of_chan ?buf_size ic) in
close_in ic; close_in ic;
x x
with e -> with e ->
@ -367,6 +367,7 @@ module Request = struct
path_components: string list; path_components: string list;
query: (string*string) list; query: (string*string) list;
body: 'body; body: 'body;
start_time: float;
} }
let headers self = self.headers let headers self = self.headers
@ -374,13 +375,16 @@ module Request = struct
let meth self = self.meth let meth self = self.meth
let path self = self.path let path self = self.path
let body self = self.body let body self = self.body
let start_time self = self.start_time
let query self = self.query let query self = self.query
let get_header ?f self h = Headers.get ?f h self.headers let get_header ?f self h = Headers.get ?f h self.headers
let get_header_int self h = match get_header self h with let get_header_int self h = match get_header self h with
| Some x -> (try Some (int_of_string x) with _ -> None) | Some x -> (try Some (int_of_string x) with _ -> None)
| None -> None | None -> None
let set_header self k v = {self with headers=Headers.set k v self.headers} let set_header k v self = {self with headers=Headers.set k v self.headers}
let update_headers f self = {self with headers=f self.headers}
let set_body b self = {self with body=b}
let pp_comp_ out comp = let pp_comp_ out comp =
Format.fprintf out "[%s]" Format.fprintf out "[%s]"
@ -481,6 +485,7 @@ module Request = struct
let parse_req_start ~buf (bs:byte_stream) : unit t option resp_result = let parse_req_start ~buf (bs:byte_stream) : unit t option resp_result =
try try
let line = Byte_stream.read_line ~buf bs in let line = Byte_stream.read_line ~buf bs in
let start_time = Unix.gettimeofday () in
let meth, path = let meth, path =
try try
let m, p, v = Scanf.sscanf line "%s %s HTTP/1.%d\r" (fun x y z->x,y,z) in let m, p, v = Scanf.sscanf line "%s %s HTTP/1.%d\r" (fun x y z->x,y,z) in
@ -506,7 +511,7 @@ module Request = struct
| Error e -> bad_reqf 400 "invalid query: %s" e | Error e -> bad_reqf 400 "invalid query: %s" e
in in
Ok (Some {meth; query; host; path; path_components; Ok (Some {meth; query; host; path; path_components;
headers; body=()}) headers; body=(); start_time; })
with with
| End_of_file | Sys_error _ -> Ok None | End_of_file | Sys_error _ -> Ok None
| Bad_req (c,s) -> Error (c,s) | Bad_req (c,s) -> Error (c,s)
@ -540,9 +545,10 @@ module Request = struct
| e -> | e ->
Error (400, Printexc.to_string e) Error (400, Printexc.to_string e)
let read_body_full (self:byte_stream t) : string t = let read_body_full ?buf_size (self:byte_stream t) : string t =
try try
let body = Byte_stream.read_all self.body in let buf = Buf_.create ?size:buf_size () in
let body = Byte_stream.read_all ~buf self.body in
{ self with body } { self with body }
with with
| Bad_req _ as e -> raise e | Bad_req _ as e -> raise e
@ -581,6 +587,12 @@ module Response = struct
body: body; body: body;
} }
let set_body body self = {self with body}
let set_headers headers self = {self with headers}
let update_headers f self = {self with headers=f self.headers}
let set_header k v self = {self with headers = Headers.set k v self.headers}
let set_code code self = {self with code}
let make_raw ?(headers=[]) ~code body : t = let make_raw ?(headers=[]) ~code body : t =
(* add content length to response *) (* add content length to response *)
let headers = let headers =
@ -787,12 +799,21 @@ module Route = struct
let pp out x = Format.pp_print_string out (to_string x) let pp out x = Format.pp_print_string out (to_string x)
end end
module Middleware = struct
type handler = byte_stream Request.t -> resp:(Response.t -> unit) -> unit
type t = handler -> handler
(** Apply a list of middlewares to [h] *)
let apply_l (l:t list) (h:handler) : handler =
List.fold_right (fun m h -> m h) l h
let[@inline] nil : t = fun h -> h
end
(* a request handler. handles a single request. *) (* a request handler. handles a single request. *)
type cb_path_handler = type cb_path_handler =
out_channel -> out_channel ->
byte_stream Request.t -> Middleware.handler
resp:(Response.t -> unit) ->
unit
module type SERVER_SENT_GENERATOR = sig module type SERVER_SENT_GENERATOR = sig
val set_headers : Headers.t -> unit val set_headers : Headers.t -> unit
@ -823,19 +844,20 @@ type t = {
masksigpipe: bool; masksigpipe: bool;
buf_size: int;
mutable handler: (string Request.t -> Response.t); mutable handler: (string Request.t -> Response.t);
(* toplevel handler, if any *) (* toplevel handler, if any *)
mutable middlewares : (int * Middleware.t) list;
(** Global middlewares *)
mutable middlewares_sorted : (int * Middleware.t) list lazy_t;
(* sorted version of {!middlewares} *)
mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list; mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list;
(* path handlers *) (* path handlers *)
mutable cb_decode_req:
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) list;
(* middleware to decode requests *)
mutable cb_encode_resp: (unit Request.t -> Response.t -> Response.t option) list;
(* middleware to encode responses *)
mutable running: bool; mutable running: bool;
(* true while the server is running. no need to protect with a mutex, (* true while the server is running. no need to protect with a mutex,
writes should be atomic enough. *) writes should be atomic enough. *)
@ -846,15 +868,48 @@ let port self = self.port
let active_connections self = Sem_.num_acquired self.sem_max_connections - 1 let active_connections self = Sem_.num_acquired self.sem_max_connections - 1
let add_decode_request_cb self f = self.cb_decode_req <- f :: self.cb_decode_req let add_middleware ~stage self m =
let add_encode_response_cb self f = self.cb_encode_resp <- f :: self.cb_encode_resp let stage = match stage with
| `Encoding -> 0
| `Stage n when n < 1 -> invalid_arg "add_middleware: bad stage"
| `Stage n -> n
in
self.middlewares <- (stage,m) :: self.middlewares;
self.middlewares_sorted <- lazy (
List.stable_sort (fun (s1,_) (s2,_) -> compare s1 s2) self.middlewares
)
let add_decode_request_cb self f =
(* turn it into a middleware *)
let m h req ~resp =
(* see if [f] modifies the stream *)
let req0 = {req with Request.body=()} in
match f req0 with
| None -> h req ~resp (* pass through *)
| Some (req1, tr_stream) ->
let req = {req1 with Request.body=tr_stream req.Request.body} in
h req ~resp
in
add_middleware self ~stage:`Encoding m
let add_encode_response_cb self f =
let m h req ~resp =
h req ~resp:(fun r ->
let req0 = {req with Request.body=()} in
(* now transform [r] if we want to *)
match f req0 r with
| None -> resp r
| Some r' -> resp r')
in
add_middleware self ~stage:`Encoding m
let set_top_handler self f = self.handler <- f let set_top_handler self f = self.handler <- f
(* route the given handler. (* route the given handler.
@param tr_req wraps the actual concrete function returned by the route @param tr_req wraps the actual concrete function returned by the route
and makes it into a handler. *) and makes it into a handler. *)
let add_route_handler_ let add_route_handler_
?(accept=fun _req -> Ok ()) ?(accept=fun _req -> Ok ()) ?(middlewares=[])
?meth ~tr_req self (route:_ Route.t) f = ?meth ~tr_req self (route:_ Route.t) f =
let ph req : cb_path_handler resp_result option = let ph req : cb_path_handler resp_result option =
match meth with match meth with
@ -864,7 +919,10 @@ let add_route_handler_
| Some handler -> | Some handler ->
(* we have a handler, do we accept the request based on its headers? *) (* we have a handler, do we accept the request based on its headers? *)
begin match accept req with begin match accept req with
| Ok () -> Some (Ok (fun oc req ~resp -> tr_req oc req ~resp handler)) | Ok () ->
Some (Ok (fun oc ->
Middleware.apply_l middlewares @@
fun req ~resp -> tr_req oc req ~resp handler))
| Error _ as e -> Some e | Error _ as e -> Some e
end end
| None -> | None ->
@ -873,13 +931,14 @@ let add_route_handler_
in in
self.path_handlers <- ph :: self.path_handlers self.path_handlers <- ph :: self.path_handlers
let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit = let add_route_handler (type a) ?accept ?middlewares ?meth
let tr_req _oc req ~resp f = resp (f (Request.read_body_full req)) in self (route:(a,_) Route.t) (f:_) : unit =
add_route_handler_ ?accept ?meth self route ~tr_req f let tr_req _oc req ~resp f = resp (f (Request.read_body_full ~buf_size:self.buf_size req)) in
add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
let add_route_handler_stream ?accept ?meth self route f = let add_route_handler_stream ?accept ?middlewares ?meth self route f =
let tr_req _oc req ~resp f = resp (f req) in let tr_req _oc req ~resp f = resp (f req) in
add_route_handler_ ?accept ?meth self route ~tr_req f add_route_handler_ ?accept ?middlewares ?meth self route ~tr_req f
let[@inline] _opt_iter ~f o = match o with let[@inline] _opt_iter ~f o = match o with
| None -> () | None -> ()
@ -887,7 +946,7 @@ let[@inline] _opt_iter ~f o = match o with
let add_route_server_sent_handler ?accept self route f = let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f = let tr_req oc req ~resp f =
let req = Request.read_body_full req in let req = Request.read_body_full ~buf_size:self.buf_size req in
let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in
(* send response once *) (* send response once *)
@ -929,15 +988,21 @@ let create
?(masksigpipe=true) ?(masksigpipe=true)
?(max_connections=32) ?(max_connections=32)
?(timeout=0.0) ?(timeout=0.0)
?(buf_size=16 * 1_024)
?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t))) ?(new_thread=(fun f -> ignore (Thread.create f () : Thread.t)))
?(addr="127.0.0.1") ?(port=8080) ?sock () : t = ?(addr="127.0.0.1") ?(port=8080) ?sock
?(middlewares=[])
() : t =
let handler _req = Response.fail ~code:404 "no top handler" in let handler _req = Response.fail ~code:404 "no top handler" in
let max_connections = max 4 max_connections in let max_connections = max 4 max_connections in
{ new_thread; addr; port; sock; masksigpipe; handler; let self = {
new_thread; addr; port; sock; masksigpipe; handler; buf_size;
running= true; sem_max_connections=Sem_.create max_connections; running= true; sem_max_connections=Sem_.create max_connections;
path_handlers=[]; timeout; path_handlers=[]; timeout;
cb_encode_resp=[]; cb_decode_req=[]; middlewares=[]; middlewares_sorted=lazy [];
} } in
List.iter (fun (stage,m) -> add_middleware self ~stage m) middlewares;
self
let stop s = s.running <- false let stop s = s.running <- false
@ -955,8 +1020,8 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let _ = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout) in let _ = Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout) in
let ic = Unix.in_channel_of_descr client_sock in let ic = Unix.in_channel_of_descr client_sock in
let oc = Unix.out_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf_.create() in let buf = Buf_.create ~size:self.buf_size () in
let is = Byte_stream.of_chan ic in let is = Byte_stream.of_chan ~buf_size:self.buf_size ic in
let continue = ref true in let continue = ref true in
while !continue && self.running do while !continue && self.running do
_debug (fun k->k "read next request"); _debug (fun k->k "read next request");
@ -981,7 +1046,10 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let handler = let handler =
match find_map (fun ph -> ph req) self.path_handlers with match find_map (fun ph -> ph req) self.path_handlers with
| Some f -> unwrap_resp_result f | Some f -> unwrap_resp_result f
| None -> (fun _oc req ~resp -> resp (self.handler (Request.read_body_full req))) | None ->
(fun _oc req ~resp ->
let body_str = Request.read_body_full ~buf_size:self.buf_size req in
resp (self.handler body_str))
in in
(* handle expect/continue *) (* handle expect/continue *)
@ -993,33 +1061,22 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
| None -> () | None -> ()
end; end;
(* preprocess request's input stream *) (* apply middlewares *)
let req0, tr_stream = let handler =
List.fold_left fun oc ->
(fun (req,tr) cb -> List.fold_right (fun (_, m) h -> m h)
match cb req with (Lazy.force self.middlewares_sorted) (handler oc)
| None -> req, tr
| Some (r',f) -> r', (fun is -> tr is |> f))
(req, (fun is->is)) self.cb_decode_req
in
(* now actually read request's body into a stream *)
let req =
Request.parse_body_ ~tr_stream ~buf {req0 with body=is}
|> unwrap_resp_result
in in
(* how to post-process response accordingly *) (* now actually read request's body into a stream *)
let post_process_resp resp = let req =
List.fold_left Request.parse_body_ ~tr_stream:(fun s->s) ~buf {req with body=is}
(fun resp cb -> match cb req0 resp with None -> resp | Some r' -> r') |> unwrap_resp_result
resp self.cb_encode_resp
in in
(* how to reply *) (* how to reply *)
let resp r = let resp r =
try try Response.output_ oc r
let r = post_process_resp r in
Response.output_ oc r
with Sys_error _ -> continue := false with Sys_error _ -> continue := false
in in

View file

@ -129,10 +129,10 @@ module Byte_stream : sig
val empty : t val empty : t
val of_chan : in_channel -> t val of_chan : ?buf_size:int -> in_channel -> t
(** Make a buffered stream from the given channel. *) (** Make a buffered stream from the given channel. *)
val of_chan_close_noerr : in_channel -> t val of_chan_close_noerr : ?buf_size:int -> in_channel -> t
(** Same as {!of_chan} but the [close] method will never fail. *) (** Same as {!of_chan} but the [close] method will never fail. *)
val of_bytes : ?i:int -> ?len:int -> bytes -> t val of_bytes : ?i:int -> ?len:int -> bytes -> t
@ -149,7 +149,7 @@ module Byte_stream : sig
(** Write the stream to the channel. (** Write the stream to the channel.
@since 0.3 *) @since 0.3 *)
val with_file : string -> (t -> 'a) -> 'a val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a
(** Open a file with given name, and obtain an input stream (** Open a file with given name, and obtain an input stream
on its content. When the function returns, the stream (and file) are closed. *) on its content. When the function returns, the stream (and file) are closed. *)
@ -227,6 +227,7 @@ module Request : sig
path_components: string list; path_components: string list;
query: (string*string) list; query: (string*string) list;
body: 'body; body: 'body;
start_time: float; (** @since NEXT_RELEASE *)
} }
(** A request with method, path, host, headers, and a body, sent by a client. (** A request with method, path, host, headers, and a body, sent by a client.
@ -253,7 +254,16 @@ module Request : sig
val get_header_int : _ t -> string -> int option val get_header_int : _ t -> string -> int option
val set_header : 'a t -> string -> string -> 'a t val set_header : string -> string -> 'a t -> 'a t
(** [set_header k v req] sets [k: v] in the request [req]'s headers. *)
val update_headers : (Headers.t -> Headers.t) -> 'a t -> 'a t
(** Modify headers
@since 0.11 *)
val set_body : 'a -> _ t -> 'a t
(** [set_body b req] returns a new query whose body is [b].
@since 0.11 *)
val host : _ t -> string val host : _ t -> string
(** Host field of the request. It also appears in the headers. *) (** Host field of the request. It also appears in the headers. *)
@ -271,14 +281,20 @@ module Request : sig
val body : 'b t -> 'b val body : 'b t -> 'b
(** Request body, possibly empty. *) (** Request body, possibly empty. *)
val start_time : _ t -> float
(** time stamp (from {!Unix.gettimeofday}) after parsing the first line of the request
@since NEXT_RELEASE *)
val limit_body_size : max_size:int -> byte_stream t -> byte_stream t val limit_body_size : max_size:int -> byte_stream t -> byte_stream t
(** Limit the body size to [max_size] bytes, or return (** Limit the body size to [max_size] bytes, or return
a [413] error. a [413] error.
@since 0.3 @since 0.3
*) *)
val read_body_full : byte_stream t -> string t val read_body_full : ?buf_size:int -> byte_stream t -> string t
(** Read the whole body into a string. Potentially blocking. *) (** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since NEXT_RELEASE) *)
(**/**) (**/**)
(* for testing purpose, do not use *) (* for testing purpose, do not use *)
@ -318,13 +334,33 @@ module Response : sig
(** Body of a response, either as a simple string, (** Body of a response, either as a simple string,
or a stream of bytes, or nothing (for server-sent events). *) or a stream of bytes, or nothing (for server-sent events). *)
type t = { type t = private {
code: Response_code.t; (** HTTP response code. See {!Response_code}. *) code: Response_code.t; (** HTTP response code. See {!Response_code}. *)
headers: Headers.t; (** Headers of the reply. Some will be set by [Tiny_httpd] automatically. *) headers: Headers.t; (** Headers of the reply. Some will be set by [Tiny_httpd] automatically. *)
body: body; (** Body of the response. Can be empty. *) body: body; (** Body of the response. Can be empty. *)
} }
(** A response to send back to a client. *) (** A response to send back to a client. *)
val set_body : body -> t -> t
(** Set the body of the response.
@since 0.11 *)
val set_header : string -> string -> t -> t
(** Set a header.
@since 0.11 *)
val update_headers : (Headers.t -> Headers.t) -> t -> t
(** Modify headers
@since 0.11 *)
val set_headers : Headers.t -> t -> t
(** Set all headers.
@since 0.11 *)
val set_code : Response_code.t -> t -> t
(** Set the response code.
@since 0.11 *)
val make_raw : val make_raw :
?headers:Headers.t -> ?headers:Headers.t ->
code:Response_code.t -> code:Response_code.t ->
@ -426,6 +462,31 @@ module Route : sig
@since 0.7 *) @since 0.7 *)
end end
(** {2 Middlewares}
A middleware can be inserted in a handler to modify or observe
its behavior.
@since NEXT_RELEASE
*)
module Middleware : sig
type handler = byte_stream Request.t -> resp:(Response.t -> unit) -> unit
(** Handlers are functions returning a response to a request.
The response can be delayed, hence the use of a continuation
as the [resp] parameter. *)
type t = handler -> handler
(** A middleware is a handler transformation.
It takes the existing handler [h],
and returns a new one which, given a query, modify it or log it
before passing it to [h], or fail. It can also log or modify or drop
the response. *)
val nil : t
(** Trivial middleware that does nothing. *)
end
(** {2 Main Server type} *) (** {2 Main Server type} *)
type t type t
@ -435,10 +496,12 @@ val create :
?masksigpipe:bool -> ?masksigpipe:bool ->
?max_connections:int -> ?max_connections:int ->
?timeout:float -> ?timeout:float ->
?buf_size:int ->
?new_thread:((unit -> unit) -> unit) -> ?new_thread:((unit -> unit) -> unit) ->
?addr:string -> ?addr:string ->
?port:int -> ?port:int ->
?sock:Unix.file_descr -> ?sock:Unix.file_descr ->
?middlewares:([`Encoding | `Stage of int] * Middleware.t) list ->
unit -> unit ->
t t
(** Create a new webserver. (** Create a new webserver.
@ -450,10 +513,14 @@ val create :
@param masksigpipe if true, block the signal {!Sys.sigpipe} which otherwise @param masksigpipe if true, block the signal {!Sys.sigpipe} which otherwise
tends to kill client threads when they try to write on broken sockets. Default: [true]. tends to kill client threads when they try to write on broken sockets. Default: [true].
@param buf_size size for buffers (since NEXT_RELEASE)
@param new_thread a function used to spawn a new thread to handle a @param new_thread a function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one new client connection. By default it is {!Thread.create} but one
could use a thread pool instead. could use a thread pool instead.
@param middlewares see {!add_middleware} for more details.
@param max_connections maximum number of simultaneous connections. @param max_connections maximum number of simultaneous connections.
@param timeout connection is closed if the socket does not do read or @param timeout connection is closed if the socket does not do read or
write for the amount of second. Default: 0.0 which means no timeout. write for the amount of second. Default: 0.0 which means no timeout.
@ -482,20 +549,36 @@ val active_connections : t -> int
val add_decode_request_cb : val add_decode_request_cb :
t -> t ->
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit
[@@deprecated "use add_middleware"]
(** Add a callback for every request. (** Add a callback for every request.
The callback can provide a stream transformer and a new request (with The callback can provide a stream transformer and a new request (with
modified headers, typically). modified headers, typically).
A possible use is to handle decompression by looking for a [Transfer-Encoding] A possible use is to handle decompression by looking for a [Transfer-Encoding]
header and returning a stream transformer that decompresses on the fly. header and returning a stream transformer that decompresses on the fly.
@deprecated use {!add_middleware} instead
*) *)
val add_encode_response_cb: val add_encode_response_cb:
t -> (unit Request.t -> Response.t -> Response.t option) -> unit t -> (unit Request.t -> Response.t -> Response.t option) -> unit
[@@deprecated "use add_middleware"]
(** Add a callback for every request/response pair. (** Add a callback for every request/response pair.
Similarly to {!add_encode_response_cb} the callback can return a new Similarly to {!add_encode_response_cb} the callback can return a new
response, for example to compress it. response, for example to compress it.
The callback is given the query with only its headers, The callback is given the query with only its headers,
as well as the current response. as well as the current response.
@deprecated use {!add_middleware} instead
*)
val add_middleware :
stage:[`Encoding | `Stage of int] ->
t -> Middleware.t -> unit
(** Add a middleware to every request/response pair.
@param stage specify when middleware applies.
Encoding comes first (outermost layer), then stages in increasing order.
@raise Invalid_argument if stage is [`Stage n] where [n < 1]
@since NEXT_RELEASE
*) *)
(** {2 Request handlers} *) (** {2 Request handlers} *)
@ -509,6 +592,7 @@ val set_top_handler : t -> (string Request.t -> Response.t) -> unit
val add_route_handler : val add_route_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> ?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?middlewares:Middleware.t list ->
?meth:Meth.t -> ?meth:Meth.t ->
t -> t ->
('a, string Request.t -> Response.t) Route.t -> 'a -> ('a, string Request.t -> Response.t) Route.t -> 'a ->
@ -534,6 +618,7 @@ val add_route_handler :
val add_route_handler_stream : val add_route_handler_stream :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> ?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?middlewares:Middleware.t list ->
?meth:Meth.t -> ?meth:Meth.t ->
t -> t ->
('a, byte_stream Request.t -> Response.t) Route.t -> 'a -> ('a, byte_stream Request.t -> Response.t) Route.t -> 'a ->

View file

@ -2,7 +2,7 @@
module S = Tiny_httpd module S = Tiny_httpd
module BS = Tiny_httpd.Byte_stream module BS = Tiny_httpd.Byte_stream
let mk_decode_deflate_stream_ ~buf_size () (is:S.byte_stream) : S.byte_stream = let decode_deflate_stream_ ~buf_size (is:S.byte_stream) : S.byte_stream =
S._debug (fun k->k "wrap stream with deflate.decode"); S._debug (fun k->k "wrap stream with deflate.decode");
let buf = Bytes.make buf_size ' ' in let buf = Bytes.make buf_size ' ' in
let buf_len = ref 0 in let buf_len = ref 0 in
@ -145,7 +145,8 @@ let has_deflate s =
try Scanf.sscanf s "deflate, %s" (fun _ -> true) try Scanf.sscanf s "deflate, %s" (fun _ -> true)
with _ -> false with _ -> false
let cb_decode_compressed_stream ~buf_size (req:unit S.Request.t) : _ option = (* decompress [req]'s body if needed *)
let decompress_req_stream_ ~buf_size (req:BS.t S.Request.t) : _ S.Request.t =
match S.Request.get_header ~f:String.trim req "Transfer-Encoding" with match S.Request.get_header ~f:String.trim req "Transfer-Encoding" with
(* TODO (* TODO
| Some "gzip" -> | Some "gzip" ->
@ -155,49 +156,63 @@ let cb_decode_compressed_stream ~buf_size (req:unit S.Request.t) : _ option =
| Some s when has_deflate s -> | Some s when has_deflate s ->
begin match Scanf.sscanf s "deflate, %s" (fun s -> s) with begin match Scanf.sscanf s "deflate, %s" (fun s -> s) with
| tr' -> | tr' ->
let req' = S.Request.set_header req "Transfer-Encoding" tr' in let body' = S.Request.body req |> decode_deflate_stream_ ~buf_size in
Some (req', mk_decode_deflate_stream_ ~buf_size ()) req
| exception _ -> None |> S.Request.set_header "Transfer-Encoding" tr'
|> S.Request.set_body body'
| exception _ -> req
end end
| _ -> None | _ -> req
let cb_encode_compressed_stream let compress_resp_stream_
~compress_above ~compress_above
~buf_size (req:_ S.Request.t) (resp:S.Response.t) : _ option = ~buf_size
if accept_deflate req then ( (req:_ S.Request.t) (resp:S.Response.t) : S.Response.t =
let set_headers h =
(* headers for compressed stream *)
let update_headers h =
h h
|> S.Headers.remove "Content-Length" |> S.Headers.remove "Content-Length"
|> S.Headers.set "Content-Encoding" "deflate" |> S.Headers.set "Content-Encoding" "deflate"
in in
if accept_deflate req then (
match resp.body with match resp.body with
| `String s when String.length s > compress_above -> | `String s when String.length s > compress_above ->
(* big string, we compress *)
S._debug S._debug
(fun k->k "encode str response with deflate (size %d, threshold %d)" (fun k->k "encode str response with deflate (size %d, threshold %d)"
(String.length s) compress_above); (String.length s) compress_above);
let body = let body =
encode_deflate_stream_ ~buf_size @@ S.Byte_stream.of_string s encode_deflate_stream_ ~buf_size @@ S.Byte_stream.of_string s
in in
Some { resp
resp with |> S.Response.update_headers update_headers
headers=set_headers resp.headers; body=`Stream body; |> S.Response.set_body (`Stream body)
}
| `Stream str -> | `Stream str ->
S._debug (fun k->k "encode stream response with deflate"); S._debug (fun k->k "encode stream response with deflate");
Some { resp
resp with |> S.Response.update_headers update_headers
headers= set_headers resp.headers; |> S.Response.set_body (`Stream (encode_deflate_stream_ ~buf_size str))
body=`Stream (encode_deflate_stream_ ~buf_size str);
} | `String _ | `Void -> resp
| `String _ | `Void -> None ) else resp
) else None
let middleware
?(compress_above=16 * 1024)
?(buf_size=16 * 1_024)
() : S.Middleware.t =
let buf_size = max buf_size 1_024 in
fun h req ~resp ->
let req = decompress_req_stream_ ~buf_size req in
h req
~resp:(fun response ->
resp @@ compress_resp_stream_ ~buf_size ~compress_above req response)
let setup let setup
?(compress_above=500*1024) ?compress_above ?buf_size server =
?(buf_size=48 * 1_024) (server:S.t) : unit = let m = middleware ?compress_above ?buf_size () in
let buf_size = max buf_size 1_024 in S._debug (fun k->k "setup gzip support");
S._debug (fun k->k "setup gzip support (buf-size %d)" buf_size); S.add_middleware ~stage:`Encoding server m
S.add_decode_request_cb server (cb_decode_compressed_stream ~buf_size);
S.add_encode_response_cb server (cb_encode_compressed_stream ~compress_above ~buf_size);
()

View file

@ -1,8 +1,13 @@
val middleware :
?compress_above:int ->
?buf_size:int -> unit ->
Tiny_httpd.Middleware.t
val setup : val setup :
?compress_above:int -> ?compress_above:int ->
?buf_size:int -> Tiny_httpd.t -> unit ?buf_size:int -> Tiny_httpd.t -> unit
(** Install callbacks for tiny_httpd to be able to encode/decode (** Install middleware for tiny_httpd to be able to encode/decode
compressed streams compressed streams
@param compress_above threshold above with string responses are compressed @param compress_above threshold above with string responses are compressed
@param buf_size size of the underlying buffer for compression/decompression *) @param buf_size size of the underlying buffer for compression/decompression *)

View file

@ -1,6 +1,6 @@
#!/usr/bin/env sh #!/usr/bin/env sh
rm data if [ -f data ]; then rm data ; fi
SERVER=$1 SERVER=$1
PORT=8087 PORT=8087