Merge pull request #65 from c-cube/wip-cleanup-2023-06-20

cleanup and IO backend
This commit is contained in:
Simon Cruanes 2023-06-21 00:12:42 -04:00 committed by GitHub
commit 80ed51576b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 595 additions and 173 deletions

View file

@ -2,11 +2,12 @@
all: build test
OPTS?=--profile=release
build:
@dune build @install
@dune build @install $(OPTS)
test:
@dune runtest --no-buffer --force
@dune runtest --no-buffer --force $(OPTS)
clean:
@dune clean
@ -16,7 +17,7 @@ doc:
WATCH?= "@install @runtest"
watch:
@dune build $(WATCH) -w
@dune build $(OPTS) $(WATCH) -w
.PHONY: benchs tests build watch

View file

@ -1,2 +1,2 @@
#!/bin/sh
exec dune exec --profile=release "examples/echo.exe" -- $@
exec dune exec --display=quiet --profile=release "examples/echo.exe" -- $@

View file

@ -33,6 +33,16 @@ let middleware_stat () : S.Middleware.t * (unit -> string) =
in
m, get_stat
(* ugly AF *)
let base64 x =
let ic, oc = Unix.open_process "base64" in
output_string oc x;
flush oc;
close_out oc;
let r = input_line ic in
ignore (Unix.close_process (ic, oc));
r
let () =
let port_ = ref 8080 in
let j = ref 32 in
@ -106,6 +116,35 @@ let () =
S.Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e));
(* protected by login *)
S.add_route_handler server
S.Route.(exact "protected" @/ return)
(fun req ->
let ok =
match S.Request.get_header req "authorization" with
| Some v ->
S._debug (fun k -> k "authenticate with %S" v);
v = "Basic " ^ base64 "user:foobar"
| None -> false
in
if ok then (
(* FIXME: a logout link *)
let s =
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
in
S.Response.make_string (Ok s)
) else (
let headers =
S.Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
in
S.Response.fail ~code:401 ~headers "invalid"
));
(* logout *)
S.add_route_handler server
S.Route.(exact "logout" @/ return)
(fun _req -> S.Response.fail ~code:401 "logged out");
(* stats *)
S.add_route_handler server
S.Route.(exact "stats" @/ return)
@ -171,6 +210,24 @@ let () =
txt " (GET) to access a VFS embedded in the binary";
];
];
li []
[
pre []
[
a [ A.href "/protected" ] [ txt "/protected" ];
txt
" (GET) to see a protected page (login: user, \
password: foobar)";
];
];
li []
[
pre []
[
a [ A.href "/logout" ] [ txt "/logout" ];
txt " (POST) to log out";
];
];
];
];
]

View file

@ -4,3 +4,4 @@ include Tiny_httpd_server
module Util = Tiny_httpd_util
module Dir = Tiny_httpd_dir
module Html = Tiny_httpd_html
module IO = Tiny_httpd_io

View file

@ -85,6 +85,10 @@ module Buf = Tiny_httpd_buf
module Byte_stream = Tiny_httpd_stream
(** {2 IO Abstraction} *)
module IO = Tiny_httpd_io
(** {2 Main Server Type} *)
(** @inline *)

View file

@ -20,6 +20,16 @@ let add_bytes (self : t) s i len : unit =
Bytes.blit s i self.bytes self.i len;
self.i <- self.i + len
let[@inline] add_string self str : unit =
add_bytes self (Bytes.unsafe_of_string str) 0 (String.length str)
let add_buffer (self : t) (buf : Buffer.t) : unit =
let len = Buffer.length buf in
if self.i + len >= Bytes.length self.bytes then
resize self (self.i + (self.i / 2) + len + 10);
Buffer.blit buf 0 self.bytes self.i len;
self.i <- self.i + len
let contents (self : t) : string = Bytes.sub_string self.bytes 0 self.i
let contents_and_clear (self : t) : string =

View file

@ -24,3 +24,11 @@ val contents_and_clear : t -> string
val add_bytes : t -> bytes -> int -> int -> unit
(** Append given bytes slice to the buffer.
@since 0.5 *)
val add_string : t -> string -> unit
(** Add string.
@since NEXT_RELEASE *)
val add_buffer : t -> Buffer.t -> unit
(** Append bytes from buffer.
@since NEXT_RELEASE *)

103
src/Tiny_httpd_io.ml Normal file
View file

@ -0,0 +1,103 @@
(** IO abstraction.
We abstract IO so we can support classic unix blocking IOs
with threads, and modern async IO with Eio.
{b NOTE}: experimental.
@since NEXT_RELEASE
*)
module Buf = Tiny_httpd_buf
module In_channel = struct
type t = {
input: bytes -> int -> int -> int;
(** Read into the slice. Returns [0] only if the
channel is closed. *)
close: unit -> unit;
}
let of_in_channel ?(close_noerr = false) (ic : in_channel) : t =
{
input = (fun buf i len -> input ic buf i len);
close =
(fun () ->
if close_noerr then
close_in_noerr ic
else
close_in ic);
}
let of_unix_fd ?(close_noerr = false) (fd : Unix.file_descr) : t =
{
input = (fun buf i len -> Unix.read fd buf i len);
close =
(fun () ->
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd);
}
let[@inline] input (self : t) buf i len = self.input buf i len
let[@inline] close self : unit = self.close ()
end
module Out_channel = struct
type t = {
output: bytes -> int -> int -> unit; (** Output slice *)
flush: unit -> unit; (** Flush underlying buffer *)
close: unit -> unit;
}
let of_out_channel ?(close_noerr = false) (oc : out_channel) : t =
{
output = (fun buf i len -> output oc buf i len);
flush = (fun () -> flush oc);
close =
(fun () ->
if close_noerr then
close_out_noerr oc
else
close_out oc);
}
let[@inline] output (self : t) buf i len : unit = self.output buf i len
let[@inline] output_string (self : t) (str : string) : unit =
self.output (Bytes.unsafe_of_string str) 0 (String.length str)
let[@inline] close self : unit = self.close ()
let[@inline] flush self : unit = self.flush ()
let output_buf (self : t) (buf : Buf.t) : unit =
let b = Buf.bytes_slice buf in
output self b 0 (Buf.size buf)
end
(** A TCP server abstraction *)
module TCP_server = struct
type conn_handler = {
handle: In_channel.t -> Out_channel.t -> unit;
(** Handle client connection *)
}
type t = {
endpoint: unit -> string * int;
(** Endpoint we listen on. This can only be called from within [serve]. *)
active_connections: unit -> int;
(** Number of connections currently active *)
running: unit -> bool; (** Is the server currently running? *)
stop: unit -> unit;
(** Ask the server to stop. This might not take effect immediately. *)
}
(** Running server. *)
type builder = {
serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit;
(** Blocking call to listen for incoming connections and handle them.
Uses the connection handler to handle individual client connections. *)
}
(** A TCP server implementation. *)
end

View file

@ -18,6 +18,7 @@ let _debug k =
module Buf = Tiny_httpd_buf
module Byte_stream = Tiny_httpd_stream
module IO = Tiny_httpd_io
exception Bad_req of int * string
@ -40,6 +41,7 @@ module Response_code = struct
| 302 -> "Found"
| 304 -> "Not Modified"
| 400 -> "Bad request"
| 401 -> "Unauthorized"
| 403 -> "Forbidden"
| 404 -> "Not found"
| 405 -> "Method not allowed"
@ -422,9 +424,19 @@ module Response = struct
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
Headers.pp self.headers pp_body self.body
let output_ (oc : out_channel) (self : t) : unit =
Printf.fprintf oc "HTTP/1.1 %d %s\r\n" self.code
let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t)
(self : t) : unit =
(* double indirection:
- print into [buffer] using [bprintf]
- transfer to [buf_] so we can output from there *)
let tmp_buffer = Buffer.create 32 in
(* write start of reply *)
Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code
(Response_code.descr self.code);
Buf.add_buffer buf tmp_buffer;
Buffer.clear tmp_buffer;
let body, is_chunked =
match self.body with
| `String s when String.length s > 1024 * 500 ->
@ -446,19 +458,29 @@ module Response = struct
_debug (fun k ->
k "output response: %s"
(Format.asprintf "%a" pp { self with body = `String "<…>" }));
List.iter (fun (k, v) -> Printf.fprintf oc "%s: %s\r\n" k v) headers;
output_string oc "\r\n";
(* write headers *)
List.iter
(fun (k, v) ->
Printf.bprintf tmp_buffer "%s: %s\r\n" k v;
Buf.add_buffer buf tmp_buffer;
Buffer.clear tmp_buffer)
headers;
IO.Out_channel.output_buf oc buf;
IO.Out_channel.output_string oc "\r\n";
(match body with
| `String "" | `Void -> ()
| `String s -> output_string oc s
| `String s -> IO.Out_channel.output_string oc s
| `Stream str ->
(try
Byte_stream.output_chunked oc str;
Byte_stream.output_chunked' oc str;
Byte_stream.close str
with e ->
Byte_stream.close str;
raise e));
flush oc
IO.Out_channel.flush oc
end
(* semaphore, for limiting concurrency. *)
@ -592,7 +614,7 @@ module Middleware = struct
end
(* a request handler. handles a single request. *)
type cb_path_handler = out_channel -> Middleware.handler
type cb_path_handler = IO.Out_channel.t -> Middleware.handler
module type SERVER_SENT_GENERATOR = sig
val set_headers : Headers.t -> unit
@ -605,18 +627,26 @@ end
type server_sent_generator = (module SERVER_SENT_GENERATOR)
module type IO_BACKEND = sig
val init_addr : unit -> string
val init_port : unit -> int
val spawn : (unit -> unit) -> unit
(** function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one
could use a thread pool instead.*)
val get_time_s : unit -> float
(** obtain the current timestamp in seconds. *)
val tcp_server : unit -> Tiny_httpd_io.TCP_server.builder
(** Server that can listen on a port and handle clients. *)
end
type t = {
addr: string; (** Address at creation *)
port: int; (** Port at creation *)
mutable sock: Unix.file_descr option; (** Socket *)
timeout: float;
sem_max_connections: Sem_.t;
(* semaphore to restrict the number of active concurrent connections *)
new_thread: (unit -> unit) -> unit;
(* a function to run the given callback in a separate thread (or thread pool) *)
masksigpipe: bool;
backend: (module IO_BACKEND);
mutable tcp_server: IO.TCP_server.t option;
buf_size: int;
get_time_s: unit -> float;
mutable handler: string Request.t -> Response.t;
(** toplevel handler, if any *)
mutable middlewares: (int * Middleware.t) list; (** Global middlewares *)
@ -625,9 +655,6 @@ type t = {
mutable path_handlers:
(unit Request.t -> cb_path_handler resp_result option) list;
(** path handlers *)
mutable running: bool;
(** true while the server is running. no need to protect with a mutex,
writes should be atomic enough. *)
}
let get_addr_ sock =
@ -635,17 +662,24 @@ let get_addr_ sock =
| Unix.ADDR_INET (addr, port) -> addr, port
| _ -> invalid_arg "httpd: address is not INET"
let addr self =
match self.sock with
| None -> self.addr
| Some s -> Unix.string_of_inet_addr (fst @@ get_addr_ s)
let addr (self : t) =
match self.tcp_server with
| None ->
let (module B) = self.backend in
B.init_addr ()
| Some s -> fst @@ s.endpoint ()
let port self =
match self.sock with
| None -> self.port
| Some sock -> snd @@ get_addr_ sock
let port (self : t) =
match self.tcp_server with
| None ->
let (module B) = self.backend in
B.init_port ()
| Some s -> snd @@ s.endpoint ()
let active_connections self = Sem_.num_acquired self.sem_max_connections - 1
let active_connections (self : t) =
match self.tcp_server with
| None -> 0
| Some s -> s.active_connections ()
let add_middleware ~stage self m =
let stage =
@ -725,8 +759,10 @@ let[@inline] _opt_iter ~f o =
| None -> ()
| Some x -> f x
exception Exit_SSE
let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f =
let tr_req (oc : IO.Out_channel.t) req ~resp f =
let req = Request.read_body_full ~buf_size:self.buf_size req in
let headers =
ref Headers.(empty |> set "content-type" "text/event-stream")
@ -745,16 +781,20 @@ let add_route_server_sent_handler ?accept self route f =
)
in
let[@inline] writef fmt =
Printf.ksprintf (IO.Out_channel.output_string oc) fmt
in
let send_event ?event ?id ?retry ~data () : unit =
send_response_idempotent_ ();
_opt_iter event ~f:(fun e -> Printf.fprintf oc "event: %s\n" e);
_opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e);
_opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e);
_opt_iter event ~f:(fun e -> writef "event: %s\n" e);
_opt_iter id ~f:(fun e -> writef "id: %s\n" e);
_opt_iter retry ~f:(fun e -> writef "retry: %s\n" e);
let l = String.split_on_char '\n' data in
List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l;
output_string oc "\n";
List.iter (fun s -> writef "data: %s\n" s) l;
IO.Out_channel.output_string oc "\n";
(* finish group *)
flush oc
IO.Out_channel.flush oc
in
let module SSG = struct
let set_headers h =
@ -764,32 +804,26 @@ let add_route_server_sent_handler ?accept self route f =
)
let send_event = send_event
let close () = raise Exit
let close () = raise Exit_SSE
end in
try f req (module SSG : SERVER_SENT_GENERATOR) with Exit -> close_out oc
try f req (module SSG : SERVER_SENT_GENERATOR)
with Exit_SSE -> IO.Out_channel.close oc
in
add_route_handler_ self ?accept ~meth:`GET route ~tr_req f
let create ?(masksigpipe = true) ?(max_connections = 32) ?(timeout = 0.0)
?(buf_size = 16 * 1_024) ?(get_time_s = Unix.gettimeofday)
?(new_thread = fun f -> ignore (Thread.create f () : Thread.t))
?(addr = "127.0.0.1") ?(port = 8080) ?sock ?(middlewares = []) () : t =
let handler _req = Response.fail ~code:404 "no top handler" in
let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t =
let handler _req = Response.fail ~code:404 "no top handler" in
let self =
{
new_thread;
addr;
port;
sock;
masksigpipe;
backend;
tcp_server = None;
handler;
buf_size;
running = true;
sem_max_connections = Sem_.create max_connections;
path_handlers = [];
timeout;
get_time_s;
middlewares = [];
middlewares_sorted = lazy [];
}
@ -797,7 +831,149 @@ let create ?(masksigpipe = true) ?(max_connections = 32) ?(timeout = 0.0)
List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares;
self
let stop s = s.running <- false
let is_ipv6_str addr : bool = String.contains addr ':'
module Unix_tcp_server_ = struct
type t = {
addr: string;
port: int;
max_connections: int;
sem_max_connections: Sem_.t;
(** semaphore to restrict the number of active concurrent connections *)
mutable sock: Unix.file_descr option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
masksigpipe: bool;
mutable running: bool; (* TODO: use an atomic? *)
}
let to_tcp_server (self : t) : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
if self.masksigpipe then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
let sock, should_bind =
match self.sock with
| Some s ->
( s,
false
(* Because we're getting a socket from the caller (e.g. systemd) *)
)
| None ->
( Unix.socket
(if is_ipv6_str self.addr then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0,
true (* Because we're creating the socket ourselves *) )
in
Unix.clear_nonblock sock;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in
Unix.listen sock n_listen
);
self.sock <- Some sock;
let tcp_server =
{
IO.TCP_server.stop = (fun () -> self.running <- false);
running = (fun () -> self.running);
active_connections =
(fun () -> Sem_.num_acquired self.sem_max_connections - 1);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
Unix.string_of_inet_addr addr, port);
}
in
after_init tcp_server;
(* how to handle a single client *)
let handle_client_unix_ (client_sock : Unix.file_descr) : unit =
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
let oc =
IO.Out_channel.of_out_channel
@@ Unix.out_channel_of_descr client_sock
in
let ic = IO.In_channel.of_unix_fd client_sock in
handle.handle ic oc;
_debug (fun k -> k "done with client, exiting");
(try Unix.close client_sock
with e ->
_debug (fun k ->
k "error when closing sock: %s" (Printexc.to_string e)));
()
in
while self.running do
(* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections;
try
let client_sock, _ = Unix.accept sock in
self.new_thread (fun () ->
try
handle_client_unix_ client_sock;
Sem_.release 1 self.sem_max_connections
with e ->
(try Unix.close client_sock with _ -> ());
Sem_.release 1 self.sem_max_connections;
raise e)
with e ->
Sem_.release 1 self.sem_max_connections;
_debug (fun k ->
k "Unix.accept or Thread.create raised an exception: %s"
(Printexc.to_string e))
done;
());
}
end
let create ?(masksigpipe = true) ?max_connections ?(timeout = 0.0) ?buf_size
?(get_time_s = Unix.gettimeofday)
?(new_thread = fun f -> ignore (Thread.create f () : Thread.t))
?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares () : t =
let max_connections = get_max_connection_ ?max_connections () in
let server =
{
Unix_tcp_server_.addr;
new_thread;
running = true;
port;
sock;
max_connections;
sem_max_connections = Sem_.create max_connections;
masksigpipe;
timeout;
}
in
let tcp_server_builder = Unix_tcp_server_.to_tcp_server server in
let module B = struct
let init_addr () = addr
let init_port () = port
let get_time_s = get_time_s
let spawn f = new_thread f
let tcp_server () = tcp_server_builder
end in
let backend = (module B : IO_BACKEND) in
create_from ?buf_size ?middlewares ~backend ()
let stop (self : t) =
match self.tcp_server with
| None -> ()
| Some s -> s.stop ()
let running (self : t) =
match self.tcp_server with
| None -> false
| Some s -> s.running ()
let find_map f l =
let rec aux f = function
@ -809,16 +985,15 @@ let find_map f l =
in
aux f l
let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit =
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
let oc = Unix.out_channel_of_descr client_sock in
(* handle client on [ic] and [oc] *)
let client_handle_for (self : t) ic oc : unit =
let buf = Buf.create ~size:self.buf_size () in
let is = Byte_stream.of_fd ~buf_size:self.buf_size client_sock in
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
let continue = ref true in
while !continue && self.running do
while !continue && running self do
_debug (fun k -> k "read next request");
match Request.parse_req_start ~get_time_s:self.get_time_s ~buf is with
let (module B) = self.backend in
match Request.parse_req_start ~get_time_s:B.get_time_s ~buf is with
| Ok None -> continue := false (* client is done *)
| Error (c, s) ->
(* connection error, close *)
@ -832,7 +1007,7 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit =
(try
(* is there a handler for this path? *)
let handler =
let base_handler =
match find_map (fun ph -> ph req) self.path_handlers with
| Some f -> unwrap_resp_result f
| None ->
@ -856,7 +1031,7 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit =
List.fold_right
(fun (_, m) h -> m h)
(Lazy.force self.middlewares_sorted)
(handler oc)
(base_handler oc)
in
(* now actually read request's body into a stream *)
@ -888,62 +1063,24 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit =
continue := false;
Response.output_ oc
@@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e))
done;
_debug (fun k -> k "done with client, exiting");
(try Unix.close client_sock
with e ->
_debug (fun k -> k "error when closing sock: %s" (Printexc.to_string e)));
()
done
let is_ipv6 self = String.contains self.addr ':'
let client_handler (self : t) : IO.TCP_server.conn_handler =
{ IO.TCP_server.handle = client_handle_for self }
let is_ipv6 (self : t) =
let (module B) = self.backend in
is_ipv6_str (B.init_addr ())
(* TODO: init TCP server *)
let run ?(after_init = ignore) (self : t) : (unit, _) result =
try
if self.masksigpipe then
ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list);
let sock, should_bind =
match self.sock with
| Some s ->
( s,
false
(* Because we're getting a socket from the caller (e.g. systemd) *) )
| None ->
( Unix.socket
(if is_ipv6 self then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0,
true (* Because we're creating the socket ourselves *) )
in
Unix.clear_nonblock sock;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
Unix.listen sock (2 * self.sem_max_connections.Sem_.n)
);
self.sock <- Some sock;
after_init ();
while self.running do
(* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections;
try
let client_sock, _ = Unix.accept sock in
self.new_thread (fun () ->
try
handle_client_ self client_sock;
Sem_.release 1 self.sem_max_connections
with e ->
(try Unix.close client_sock with _ -> ());
Sem_.release 1 self.sem_max_connections;
raise e)
with e ->
Sem_.release 1 self.sem_max_connections;
_debug (fun k ->
k "Unix.accept or Thread.create raised an exception: %s"
(Printexc.to_string e))
done;
let (module B) = self.backend in
let server = B.tcp_server () in
server.serve
~after_init:(fun tcp_server ->
self.tcp_server <- Some tcp_server;
after_init ())
~handle:(client_handler self) ();
Ok ()
with e -> Error e

View file

@ -369,7 +369,7 @@ val create :
?middlewares:([ `Encoding | `Stage of int ] * Middleware.t) list ->
unit ->
t
(** Create a new webserver.
(** Create a new webserver using UNIX abstractions.
The server will not do anything until {!run} is called on it.
Before starting the server, one can use {!add_path_handler} and
@ -401,6 +401,41 @@ val create :
This parameter exists since 0.11.
*)
(** A backend that provides IO operations, network operations, etc. *)
module type IO_BACKEND = sig
val init_addr : unit -> string
val init_port : unit -> int
val spawn : (unit -> unit) -> unit
(** function used to spawn a new thread to handle a
new client connection. By default it is {!Thread.create} but one
could use a thread pool instead.*)
val get_time_s : unit -> float
(** obtain the current timestamp in seconds. *)
val tcp_server : unit -> Tiny_httpd_io.TCP_server.builder
(** Server that can listen on a port and handle clients. *)
end
val create_from :
?buf_size:int ->
?middlewares:([ `Encoding | `Stage of int ] * Middleware.t) list ->
backend:(module IO_BACKEND) ->
unit ->
t
(** Create a new webserver using provided backend.
The server will not do anything until {!run} is called on it.
Before starting the server, one can use {!add_path_handler} and
{!set_top_handler} to specify how to handle incoming requests.
@param buf_size size for buffers (since 0.11)
@param middlewares see {!add_middleware} for more details.
@since NEXT_RELEASE
*)
val addr : t -> string
(** Address on which the server listens. *)
@ -556,6 +591,10 @@ val add_route_server_sent_handler :
(** {2 Run the server} *)
val running : t -> bool
(** Is the server running?
@since NEXT_RELEASE *)
val stop : t -> unit
(** Ask the server to stop. This might not have an immediate effect
as {!run} might currently be waiting on IO. *)

View file

@ -1,4 +1,5 @@
module Buf = Tiny_httpd_buf
module IO = Tiny_httpd_io
let spf = Printf.sprintf
@ -45,37 +46,32 @@ let make ?(bs = Bytes.create @@ (16 * 1024)) ?(close = ignore) ~consume ~fill ()
in
self
let of_chan_ ?(buf_size = 16 * 1024) ~close ic : t =
let of_input ?(buf_size = 16 * 1024) (ic : IO.In_channel.t) : t =
make ~bs:(Bytes.create buf_size)
~close:(fun _ -> close ic)
~close:(fun _ -> IO.In_channel.close ic)
~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~fill:(fun self ->
if self.off >= self.len then (
self.off <- 0;
self.len <- input ic self.bs 0 (Bytes.length self.bs)
self.len <- IO.In_channel.input ic self.bs 0 (Bytes.length self.bs)
))
()
let of_chan = of_chan_ ~close:close_in
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
let of_chan_ ?buf_size ic ~close_noerr : t =
let inc = IO.In_channel.of_in_channel ~close_noerr ic in
of_input ?buf_size inc
let of_fd_ ?(buf_size = 16 * 1024) ~close ic : t =
make ~bs:(Bytes.create buf_size)
~close:(fun _ -> close ic)
~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~fill:(fun self ->
if self.off >= self.len then (
self.off <- 0;
self.len <- Unix.read ic self.bs 0 (Bytes.length self.bs)
))
()
let of_chan ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:false
let of_chan_close_noerr ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:true
let of_fd = of_fd_ ~close:Unix.close
let of_fd_close_noerr = of_fd_ ~close:(fun f -> try Unix.close f with _ -> ())
let of_fd_ ?buf_size ~close_noerr ic : t =
let inc = IO.In_channel.of_unix_fd ~close_noerr ic in
of_input ?buf_size inc
let of_fd ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:false fd
let of_fd_close_noerr ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:true fd
let rec iter f (self : t) : unit =
self.fill_buf ();
@ -90,6 +86,9 @@ let rec iter f (self : t) : unit =
let to_chan (oc : out_channel) (self : t) =
iter (fun s i len -> output oc s i len) self
let to_chan' (oc : IO.Out_channel.t) (self : t) =
iter (fun s i len -> IO.Out_channel.output oc s i len) self
let of_bytes ?(i = 0) ?len (bs : bytes) : t =
(* invariant: !i+!len is constant *)
let len =
@ -298,19 +297,22 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t =
refill := false)
()
(* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit =
let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit =
let continue = ref true in
while !continue do
(* next chunk *)
self.fill_buf ();
let n = self.len in
Printf.fprintf oc "%x\r\n" n;
output oc self.bs self.off n;
IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n);
IO.Out_channel.output oc self.bs self.off n;
self.consume n;
if n = 0 then continue := false;
output_string oc "\r\n"
IO.Out_channel.output_string oc "\r\n"
done;
(* write another crlf after the stream (see #56) *)
output_string oc "\r\n";
IO.Out_channel.output_string oc "\r\n";
()
(* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit =
output_chunked' (IO.Out_channel.of_out_channel oc) self

View file

@ -27,8 +27,7 @@ type t = {
_rest: hidden; (** Use {!make} to build a stream. *)
}
(** A buffered stream, with a view into the current buffer (or refill if empty),
and a function to consume [n] bytes.
See {!Byte_stream} for more details. *)
and a function to consume [n] bytes. *)
val close : t -> unit
(** Close stream *)
@ -36,6 +35,10 @@ val close : t -> unit
val empty : t
(** Stream with 0 bytes inside *)
val of_input : ?buf_size:int -> Tiny_httpd_io.In_channel.t -> t
(** Make a buffered stream from the given channel.
@since NEXT_RELEASE *)
val of_chan : ?buf_size:int -> in_channel -> t
(** Make a buffered stream from the given channel. *)
@ -62,6 +65,10 @@ val to_chan : out_channel -> t -> unit
(** Write the stream to the channel.
@since 0.3 *)
val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write to the IO channel.
@since NEXT_RELEASE *)
val make :
?bs:bytes ->
?close:(t -> unit) ->
@ -111,3 +118,7 @@ val read_exactly :
val output_chunked : out_channel -> t -> unit
(** Write the stream into the channel, using the chunked encoding. *)
val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write the stream into the channel, using the chunked encoding.
@since NEXT_RELEASE *)

View file

@ -1,8 +1,13 @@
(env
(_
(flags :standard -warn-error -a+8 -w +a-4-32-40-42-44-70 -color always -safe-string
-strict-sequence)))
(library
(name tiny_httpd)
(public_name tiny_httpd)
(libraries threads seq)
(flags :standard -safe-string -strict-sequence -w +a-4-40-42 -warn-error -a+8)
(wrapped false))
(rule

View file

@ -1,5 +1,5 @@
(test
(name t_util)
(tests
(names t_util t_buf)
(package tiny_httpd)
(libraries tiny_httpd qcheck-core qcheck-core.runner))
(libraries tiny_httpd qcheck-core qcheck-core.runner test_util))

27
tests/unit/t_buf.ml Normal file
View file

@ -0,0 +1,27 @@
open Test_util
open Tiny_httpd_buf
let spf = Printf.sprintf
let () =
let b = create ~size:4 () in
add_string b "hello";
assert_eq ~to_string:(spf "%S") "hello" (contents b);
add_string b " world";
assert_eq ~to_string:(spf "%S") "hello world" (contents b);
()
let buffer_of_string str =
let buf = Buffer.create 32 in
Buffer.add_string buf str;
buf
let () =
let b = create ~size:4 () in
add_buffer b (buffer_of_string "hello");
assert_eq ~to_string:(spf "%S") "hello" (contents b);
add_buffer b (buffer_of_string " world");
assert_eq ~to_string:(spf "%S") "hello world" (contents b);
()

View file

@ -1,25 +1,6 @@
module Q = QCheck
(* test utils *)
let pp_res f = function
| Ok x -> f x
| Error e -> e
let pp_res_query = Q.Print.(pp_res (list (pair string string)))
let err_map f = function
| Ok x -> Ok (f x)
| Error e -> Error e
let sort_l l = List.sort compare l
let eq_sorted a b = err_map sort_l a = err_map sort_l b
let is_ascii_char c = Char.code c < 128
let assert_eq ?(cmp = ( = )) a b = assert (cmp a b)
open Test_util
open Tiny_httpd_util
let qchecks = ref []
let add_qcheck f = qchecks := f :: !qchecks
let () = assert_eq "hello%20world" (percent_encode "hello world")
let () = assert_eq "%23%25^%24%40^%40" (percent_encode "#%^$@^@")
@ -67,4 +48,4 @@ let () =
in
eq_sorted (Ok l) (parse_query s))
let () = exit @@ QCheck_base_runner.run_tests ~colors:false !qchecks
let () = run_qcheck_and_exit ()

5
tests/unit/util/dune Normal file
View file

@ -0,0 +1,5 @@
(library
(name test_util)
(modules test_util)
(libraries qcheck-core qcheck-core.runner))

View file

@ -0,0 +1,31 @@
module Q = QCheck
(* test utils *)
let pp_res f = function
| Ok x -> f x
| Error e -> e
let pp_res_query = Q.Print.(pp_res (list (pair string string)))
let err_map f = function
| Ok x -> Ok (f x)
| Error e -> Error e
let sort_l l = List.sort compare l
let eq_sorted a b = err_map sort_l a = err_map sort_l b
let is_ascii_char c = Char.code c < 128
let assert_eq ?to_string ?(cmp = ( = )) a b =
let ok = cmp a b in
if not ok then (
(match to_string with
| Some f -> Printf.eprintf "failed: %s != %s\n%!" (f a) (f b)
| None -> ());
failwith "test failed"
)
let qchecks = ref []
let add_qcheck f = qchecks := f :: !qchecks
let run_qcheck_and_exit () : 'a =
exit @@ QCheck_base_runner.run_tests ~colors:false !qchecks