Compare commits

...

13 commits

Author SHA1 Message Date
Simon Cruanes
ac466a8fcb
Merge a56dd0ec65 into 8a8aadfbb0 2025-06-28 19:08:46 +00:00
Simon Cruanes
8a8aadfbb0
doc
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
2025-06-24 21:13:18 -04:00
Simon Cruanes
9a1343aef7
remove global withlock builder, pass it as argument instead
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
2025-06-23 10:08:07 -04:00
Simon Cruanes
f10992ec32
feat WS: abstraction for critical section
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
can be replaced with a proper cooperative lock
2025-06-20 18:03:40 -04:00
Simon Cruanes
0f917ddf72
format
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
2025-06-06 22:25:48 -04:00
Simon Cruanes
03c3e09f12
feat route: add to_url, to produce a URL path from a route
provide arguments and get the corresponding path, which makes
it easy to build a full URL if needed.
2025-06-06 22:25:01 -04:00
Simon Cruanes
023805232f
fix warnings in C stubs 2025-06-06 22:24:52 -04:00
Simon Cruanes
022a495de3
fix warnings 2025-06-06 22:24:39 -04:00
Simon Cruanes
a56dd0ec65
add echo_mio.sh 2024-09-06 17:19:56 -04:00
Simon Cruanes
7f9fae1fc8
test: echo_mio: add a heavier endpoint 2024-09-03 15:51:15 -04:00
Simon Cruanes
e199162e1f
fix: also make server socket nonblocking 2024-09-03 15:40:26 -04:00
Simon Cruanes
cf9c14b1c2
basic test for moonpool-io 2024-09-03 15:17:25 -04:00
Simon Cruanes
c43ffb5ff4
feat: add tiny_httpd_moonpool library 2024-09-03 15:16:33 -04:00
20 changed files with 804 additions and 61 deletions

View file

@ -1,4 +1,4 @@
version = 0.26.2 version = 0.27.0
profile=conventional profile=conventional
margin=80 margin=80
if-then-else=k-r if-then-else=k-r

View file

@ -39,3 +39,14 @@
(iostream-camlzip (>= 0.2.1)) (iostream-camlzip (>= 0.2.1))
(logs :with-test) (logs :with-test)
(odoc :with-doc))) (odoc :with-doc)))
(package
(name tiny_httpd_moonpool)
(synopsis "Moonpool+picos_stdio backend for Tiny_httpd")
(depends
seq
(tiny_httpd (= :version))
(moonpool (>= 0.7))
(moonpool-io (>= 0.7))
(ocaml (>= 5.0))
(odoc :with-doc)))

2
echo_mio.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec dune exec --display=quiet --profile=release "examples/echo_mio.exe" -- $@

View file

@ -14,6 +14,12 @@
(modules echo vfs) (modules echo vfs)
(libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data)) (libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data))
(executable
(name echo_mio)
(flags :standard -warn-error -a+8)
(modules echo_mio)
(libraries tiny_httpd tiny_httpd_moonpool logs))
(executable (executable
(name writer) (name writer)
(flags :standard -warn-error -a+8) (flags :standard -warn-error -a+8)

294
examples/echo_mio.ml Normal file
View file

@ -0,0 +1,294 @@
open Tiny_httpd_core
module Log = Tiny_httpd.Log
let now_ = Unix.gettimeofday
let alice_text =
"CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \
sitting by her sister on the bank, and of having nothing to do: once or \
twice she had peeped into the book her sister was reading, but it had no \
pictures or conversations in it, <and what is the use of a book,> thought \
Alice <without pictures or conversations?> So she was considering in her \
own mind (as well as she could, for the hot day made her feel very sleepy \
and stupid), whether the pleasure of making a daisy-chain would be worth \
the trouble of getting up and picking the daisies, when suddenly a White \
Rabbit with pink eyes ran close by her. There was nothing so very \
remarkable in that; nor did Alice think it so very much out of the way to \
hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when \
she thought it over afterwards, it occurred to her that she ought to have \
wondered at this, but at the time it all seemed quite natural); but when \
the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \
it, and then hurried on, Alice started to her feet, for it flashed across \
her mind that she had never before seen a rabbit with either a \
waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \
she ran across the field after it, and fortunately was just in time to see \
it pop down a large rabbit-hole under the hedge. In another moment down \
went Alice after it, never once considering how in the world she was to get \
out again. The rabbit-hole went straight on like a tunnel for some way, and \
then dipped suddenly down, so suddenly that Alice had not a moment to think \
about stopping herself before she found herself falling down a very deep \
well. Either the well was very deep, or she fell very slowly, for she had \
plenty of time as she went down to look about her and to wonder what was \
going to happen next. First, she tried to look down and make out what she \
was coming to, but it was too dark to see anything; then she looked at the \
sides of the well, and noticed that they were filled with cupboards......"
(* util: a little middleware collecting statistics *)
let middleware_stat () : Server.Middleware.t * (unit -> string) =
let n_req = ref 0 in
let total_time_ = ref 0. in
let parse_time_ = ref 0. in
let build_time_ = ref 0. in
let write_time_ = ref 0. in
let m h req ~resp =
incr n_req;
let t1 = Request.start_time req in
let t2 = now_ () in
h req ~resp:(fun response ->
let t3 = now_ () in
resp response;
let t4 = now_ () in
total_time_ := !total_time_ +. (t4 -. t1);
parse_time_ := !parse_time_ +. (t2 -. t1);
build_time_ := !build_time_ +. (t3 -. t2);
write_time_ := !write_time_ +. (t4 -. t3))
and get_stat () =
Printf.sprintf
"%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)"
!n_req
(!total_time_ /. float !n_req *. 1e3)
(!parse_time_ /. float !n_req *. 1e3)
(!build_time_ /. float !n_req *. 1e3)
(!write_time_ /. float !n_req *. 1e3)
in
m, get_stat
(* ugly AF *)
let base64 x =
let ic, oc = Unix.open_process "base64" in
output_string oc x;
flush oc;
close_out oc;
let r = input_line ic in
ignore (Unix.close_process (ic, oc));
r
let setup_logging () =
Logs.set_reporter @@ Logs.format_reporter ();
Logs.set_level ~all:true (Some Logs.Debug)
let () =
Moonpool_fib.main @@ fun _ ->
let port_ = ref 8080 in
let max_conn = ref 800 in
let j = ref 16 in
Arg.parse
(Arg.align
[
"--port", Arg.Set_int port_, " set port";
"-p", Arg.Set_int port_, " set port";
"--debug", Arg.Unit setup_logging, " enable debug";
( "--max-connections",
Arg.Set_int max_conn,
" maximum number of connections" );
"-j", Arg.Set_int j, " Size of thread pool";
])
(fun _ -> raise (Arg.Bad ""))
"echo [option]*";
let runner = Moonpool.Ws_pool.create ~num_threads:!j () in
let server : Server.t =
Tiny_httpd_moonpool.create ~runner ~port:!port_ ~max_connections:!max_conn
()
in
let m_stats, get_stats = middleware_stat () in
Server.add_middleware server ~stage:(`Stage 1) m_stats;
(* say hello *)
Server.add_route_handler ~meth:`GET server
Route.(exact "hello" @/ string @/ return)
(fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n")));
(* compressed file access *)
Server.add_route_handler ~meth:`GET server
Route.(exact "zcat" @/ string_urlencoded @/ return)
(fun path _req ->
let ic = open_in path in
let str = IO.Input.of_in_channel ic in
let mime_type =
try
let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in
try
let s = [ "Content-Type", String.trim (input_line p) ] in
ignore @@ Unix.close_process_in p;
s
with _ ->
ignore @@ Unix.close_process_in p;
[]
with _ -> []
in
Response.make_stream ~headers:mime_type (Ok str));
(* echo request *)
Server.add_route_handler server
Route.(exact "echo" @/ return)
(fun req ->
let q =
Request.query req
|> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v)
|> String.concat ";"
in
Response.make_string
(Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q)));
(* file upload *)
Server.add_route_handler_stream ~meth:`PUT server
Route.(exact "upload" @/ string @/ return)
(fun path req ->
Log.debug (fun k ->
k "start upload %S, headers:\n%s\n\n%!" path
(Format.asprintf "%a" Headers.pp (Request.headers req)));
try
let oc = open_out @@ "/tmp/" ^ path in
IO.Input.to_chan oc req.Request.body;
flush oc;
Response.make_string (Ok "uploaded file")
with e ->
Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e));
(* protected by login *)
Server.add_route_handler server
Route.(exact "protected" @/ return)
(fun req ->
let ok =
match Request.get_header req "authorization" with
| Some v ->
Log.debug (fun k -> k "authenticate with %S" v);
v = "Basic " ^ base64 "user:foobar"
| None -> false
in
if ok then (
(* FIXME: a logout link *)
let s =
"<p>hello, this is super secret!</p><a href=\"/logout\">log out</a>"
in
Response.make_string (Ok s)
) else (
let headers =
Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"")
in
Response.fail ~code:401 ~headers "invalid"
));
(* logout *)
Server.add_route_handler server
Route.(exact "logout" @/ return)
(fun _req -> Response.fail ~code:401 "logged out");
(* stats *)
Server.add_route_handler server
Route.(exact "stats" @/ return)
(fun _req ->
let stats = get_stats () in
Response.make_string @@ Ok stats);
Server.add_route_handler server
Route.(exact "alice" @/ return)
(fun _req -> Response.make_string (Ok alice_text));
Server.add_route_handler server
Route.(exact "alice10" @/ return)
(fun _req ->
let writer =
IO.Writer.make () ~write:(fun oc ->
for _i = 1 to 10 do
IO.Output.output_string oc alice_text;
IO.Output.flush oc
done)
in
Response.make_writer (Ok writer));
(* main page *)
Server.add_route_handler server
Route.(return)
(fun _req ->
let open Tiny_httpd_html in
let h =
html []
[
head [] [ title [] [ txt "index of echo" ] ];
body []
[
h3 [] [ txt "welcome!" ];
p [] [ b [] [ txt "endpoints are:" ] ];
ul []
[
li [] [ pre [] [ txt "/hello/:name (GET)" ] ];
li []
[
pre []
[
a [ A.href "/echo/" ] [ txt "echo" ];
txt " echo back query";
];
];
li []
[ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ];
li []
[
pre []
[
txt
"/zcat/:path (GET) to download a file (deflate \
transfer-encoding)";
];
];
li []
[
pre []
[
a [ A.href "/stats/" ] [ txt "/stats/" ];
txt " (GET) to access statistics";
];
];
li []
[
pre []
[
a [ A.href "/vfs/" ] [ txt "/vfs" ];
txt " (GET) to access a VFS embedded in the binary";
];
];
li []
[
pre []
[
a [ A.href "/protected" ] [ txt "/protected" ];
txt
" (GET) to see a protected page (login: user, \
password: foobar)";
];
];
li []
[
pre []
[
a [ A.href "/logout" ] [ txt "/logout" ];
txt " (POST) to log out";
];
];
];
];
]
in
let s = to_string_top h in
Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s);
Printf.printf "listening on http://%s:%d\n%!" (Server.addr server)
(Server.port server);
match Server.run server with
| Ok () -> ()
| Error e -> raise e

View file

@ -26,11 +26,6 @@ let atomic_before_412 =
let atomic_after_412 = {|include Atomic|} let atomic_after_412 = {|include Atomic|}
let write_file file s =
let oc = open_out file in
output_string oc s;
close_out oc
let () = let () =
let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in
print_endline print_endline

View file

@ -73,9 +73,9 @@ let rec pp_ : type a b. Buffer.t -> (a, b) t -> unit =
| Rest { url_encoded } -> | Rest { url_encoded } ->
bpf out "<rest_of_url%s>" bpf out "<rest_of_url%s>"
(if url_encoded then (if url_encoded then
"_urlencoded" "_urlencoded"
else else
"") "")
| Compose (Exact s, tl) -> bpf out "%s/%a" s pp_ tl | Compose (Exact s, tl) -> bpf out "%s/%a" s pp_ tl
| Compose (Int, tl) -> bpf out "<int>/%a" pp_ tl | Compose (Int, tl) -> bpf out "<int>/%a" pp_ tl
| Compose (String, tl) -> bpf out "<str>/%a" pp_ tl | Compose (String, tl) -> bpf out "<str>/%a" pp_ tl
@ -91,3 +91,34 @@ module Private_ = struct
end end
let pp out x = Format.pp_print_string out (to_string x) let pp out x = Format.pp_print_string out (to_string x)
let rec to_url_rec : type b. Buffer.t -> (b, string) t -> b =
fun buf route ->
match route with
| Fire -> Buffer.contents buf
| Rest { url_encoded = _ } ->
fun str ->
Buffer.add_string buf str;
Buffer.contents buf
| Compose (comp, rest) ->
(match comp with
| Exact s ->
Buffer.add_string buf s;
Buffer.add_char buf '/';
to_url_rec buf rest
| Int ->
fun i ->
Printf.bprintf buf "%d/" i;
to_url_rec buf rest
| String ->
fun s ->
Printf.bprintf buf "%s/" s;
to_url_rec buf rest
| String_urlencoded ->
fun s ->
Printf.bprintf buf "%s/" (Util.percent_encode s);
to_url_rec buf rest)
let to_url (h : ('a, string) t) : 'a =
let buf = Buffer.create 16 in
to_url_rec buf h

View file

@ -1,8 +1,8 @@
(** Routing (** Routing
Basic type-safe routing of handlers based on URL paths. This is optional, Basic type-safe routing of handlers based on URL paths. This is optional, it
it is possible to only define the root handler with something like is possible to only define the root handler with something like
{{: https://github.com/anuragsoni/routes/} Routes}. {{:https://github.com/anuragsoni/routes/} Routes}.
@since 0.6 *) @since 0.6 *)
type ('a, 'b) comp type ('a, 'b) comp
@ -27,31 +27,35 @@ val return : ('a, 'a) t
(** Matches the empty path. *) (** Matches the empty path. *)
val rest_of_path : (string -> 'a, 'a) t val rest_of_path : (string -> 'a, 'a) t
(** Matches a string, even containing ['/']. This will match (** Matches a string, even containing ['/']. This will match the entirety of the
the entirety of the remaining route. remaining route.
@since 0.7 *) @since 0.7 *)
val rest_of_path_urlencoded : (string -> 'a, 'a) t val rest_of_path_urlencoded : (string -> 'a, 'a) t
(** Matches a string, even containing ['/'], and URL-decode it (piecewise). (** Matches a string, even containing ['/'], and URL-decode it (piecewise). This
This will match the entirety of the remaining route. will match the entirety of the remaining route.
@since 0.7 *) @since 0.7 *)
val ( @/ ) : ('a, 'b) comp -> ('b, 'c) t -> ('a, 'c) t val ( @/ ) : ('a, 'b) comp -> ('b, 'c) t -> ('a, 'c) t
(** [comp / route] matches ["foo/bar/…"] iff [comp] matches ["foo"], (** [comp / route] matches ["foo/bar/…"] iff [comp] matches ["foo"], and [route]
and [route] matches ["bar/…"]. *) matches ["bar/…"]. *)
val exact_path : string -> ('a, 'b) t -> ('a, 'b) t val exact_path : string -> ('a, 'b) t -> ('a, 'b) t
(** [exact_path "foo/bar/..." r] is equivalent to (** [exact_path "foo/bar/..." r] is equivalent to
[exact "foo" @/ exact "bar" @/ ... @/ r] [exact "foo" @/ exact "bar" @/ ... @/ r]
@since 0.11 **) @since 0.11 **)
val pp : Format.formatter -> _ t -> unit val pp : Format.formatter -> _ t -> unit
(** Print the route. (** Print the route.
@since 0.7 *) @since 0.7 *)
val to_string : _ t -> string val to_string : _ t -> string
(** Print the route. (** Print the route.
@since 0.7 *) @since 0.7 *)
val to_url : ('a, string) t -> 'a
(** [to_url route args] takes a route, and turns it into a URL path.
@since NEXT_RELEASE *)
module Private_ : sig module Private_ : sig
val eval : string list -> ('a, 'b) t -> 'a -> 'b option val eval : string list -> ('a, 'b) t -> 'a -> 'b option

View file

@ -15,7 +15,6 @@ module Head_middleware = struct
type t = { handle: 'a. 'a Request.t -> 'a Request.t } type t = { handle: 'a. 'a Request.t -> 'a Request.t }
let trivial = { handle = Fun.id } let trivial = { handle = Fun.id }
let[@inline] apply (self : t) req = self.handle req
let[@inline] apply' req (self : t) = self.handle req let[@inline] apply' req (self : t) = self.handle req
let to_middleware (self : t) : Middleware.t = let to_middleware (self : t) : Middleware.t =

View file

@ -1,7 +1,6 @@
(* adapted from https://github.com/sindresorhus/html-tags (MIT licensed) *) (* adapted from https://github.com/sindresorhus/html-tags (MIT licensed) *)
let pf = Printf.printf let pf = Printf.printf
let spf = Printf.sprintf
let void = let void =
[ [

6
src/moonpool-io/dune Normal file
View file

@ -0,0 +1,6 @@
(library
(name tiny_httpd_moonpool)
(public_name tiny_httpd_moonpool)
(libraries tiny_httpd moonpool moonpool.sync moonpool.fib moonpool-io))

View file

@ -0,0 +1,220 @@
open Tiny_httpd_core
module A = Atomic
module MIO = Moonpool_io
module Sem = Moonpool_sync.Semaphore.Counting
module Fd = Moonpool_io.Fd
module IO_helper = struct
module Slice = Iostream.Slice
module Output = struct
include IO.Output
class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
t =
object
inherit t_from_output ~bytes:buf.bytes ()
method private output_underlying bs i len0 =
let i = ref i in
let len = ref len0 in
while !len > 0 do
match MIO.Unix.write fd bs !i !len with
| 0 -> failwith "write failed"
| n ->
i := !i + n;
len := !len - n
done
method private close_underlying () =
if not !closed then (
closed := true;
if close_noerr then (
try MIO.Unix.close fd with _ -> ()
) else
MIO.Unix.close fd
)
end
end
module Input = struct
include IO.Input
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
t =
let eof = ref false in
object
inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes ()
method private refill (slice : Slice.t) =
if not !eof then (
slice.off <- 0;
let continue = ref true in
while !continue do
match
MIO.Unix.read fd slice.bytes 0 (Bytes.length slice.bytes)
with
| n ->
slice.len <- n;
continue := false
done;
(* Printf.eprintf "read returned %d B\n%!" !n; *)
if slice.len = 0 then eof := true
)
method close () =
if not !closed then (
closed := true;
eof := true;
if close_noerr then (
try MIO.Unix.close fd with _ -> ()
) else
MIO.Unix.close fd
)
end
end
end
open struct
let get_addr_ (fd : Fd.t) =
match Unix.getsockname (Fd.unsafe_get fd) with
| Unix.ADDR_INET (addr, port) -> addr, port
| _ -> invalid_arg "httpd: address is not INET"
let shutdown_silent_ (fd : Fd.t) : unit =
try MIO.Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
let close_silent_ (fd : Fd.t) : unit = try MIO.Unix.close fd with _ -> ()
end
type t = {
addr: string;
port: int;
buf_pool: Buf.t Pool.t;
slice_pool: IO.Slice.t Pool.t;
max_connections: int;
sem_max_connections: Sem.t;
(** semaphore to restrict the number of active concurrent connections *)
mutable sock: Fd.t option; (** Socket *)
new_thread: (unit -> unit) -> unit;
timeout: float;
running: bool A.t; (* TODO: use an atomic? *)
}
let to_tcp_server (self : t) : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
let sock, should_bind =
match self.sock with
| Some s ->
(* Because we're getting a socket from the caller (e.g. systemd) *)
s, false
| None ->
let sock =
Unix.socket
(if Util.is_ipv6_str self.addr then
Unix.PF_INET6
else
Unix.PF_INET)
Unix.SOCK_STREAM 0
in
let fd = Fd.create sock in
fd, true (* Because we're creating the socket ourselves *)
in
MIO.Unix.set_nonblock sock;
MIO.Unix.setsockopt_optint sock Unix.SO_LINGER None;
if should_bind then (
let inet_addr = Unix.inet_addr_of_string self.addr in
MIO.Unix.setsockopt sock Unix.SO_REUSEADDR true;
MIO.Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
let n_listen = 2 * self.max_connections in
MIO.Unix.listen sock n_listen
);
self.sock <- Some sock;
let tcp_server =
{
IO.TCP_server.stop = (fun () -> Atomic.set self.running false);
running = (fun () -> Atomic.get self.running);
active_connections =
(fun () ->
self.max_connections - Sem.get_value self.sem_max_connections);
endpoint =
(fun () ->
let addr, port = get_addr_ sock in
Unix.string_of_inet_addr addr, port);
}
in
after_init tcp_server;
(* how to handle a single client *)
let handle_client_ (client_sock : Fd.t) (client_addr : Unix.sockaddr) :
unit =
Log.debug (fun k ->
k "t[%d]: serving new client on %s"
(Thread.id @@ Thread.self ())
(Util.show_sockaddr client_addr));
MIO.Unix.set_nonblock client_sock;
MIO.Unix.setsockopt client_sock Unix.TCP_NODELAY true;
MIO.Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
MIO.Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
Pool.with_resource self.slice_pool @@ fun ic_buf ->
Pool.with_resource self.slice_pool @@ fun oc_buf ->
let closed = ref false in
let oc =
new IO_helper.Output.of_unix_fd
~close_noerr:true ~closed ~buf:oc_buf client_sock
in
let ic =
IO_helper.Input.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf
client_sock
in
handle.handle ~client_addr ic oc
in
MIO.Unix.set_nonblock sock;
while Atomic.get self.running do
match MIO.Unix.accept sock with
| client_sock, client_addr ->
(* limit concurrency *)
Sem.acquire self.sem_max_connections;
self.new_thread (fun () ->
try
handle_client_ client_sock client_addr;
Log.debug (fun k ->
k "t[%d]: done with client on %s, exiting"
(Thread.id @@ Thread.self ())
@@ Util.show_sockaddr client_addr);
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem.release self.sem_max_connections
with e ->
let bt = Printexc.get_raw_backtrace () in
shutdown_silent_ client_sock;
close_silent_ client_sock;
Sem.release self.sem_max_connections;
Log.error (fun k ->
k
"@[<v>Handler: uncaught exception for client %s:@ %s@ \
%s@]"
(Util.show_sockaddr client_addr)
(Printexc.to_string e)
(Printexc.raw_backtrace_to_string bt)))
| exception e ->
Log.error (fun k ->
k "Unix.accept raised an exception: %s" (Printexc.to_string e));
Atomic.set self.running false
done;
(* Wait for all threads to be done: this only works if all threads are done. *)
MIO.Unix.close sock;
while Sem.get_value self.sem_max_connections < self.max_connections do
Sem.acquire self.sem_max_connections
done;
());
}

View file

@ -0,0 +1,52 @@
include Tiny_httpd
module Fd = Io_server.Fd
open struct
let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let clear_slice (slice : IO.Slice.t) =
Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00';
slice.off <- 0;
slice.len <- 0
end
let create ?max_connections ?(timeout = 0.0) ?buf_size
?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080)
?(sock : Fd.t option) ?middlewares ~(runner : Moonpool.Runner.t) () : t =
let new_thread f =
ignore (Moonpool_fib.spawn_top ~on:runner f : _ Moonpool_fib.t)
in
let max_connections = get_max_connection_ ?max_connections () in
let server =
{
Io_server.addr;
new_thread;
buf_pool =
Pool.create ~clear:Buf.clear_and_zero
~mk_item:(fun () -> Buf.create ?size:buf_size ())
();
slice_pool =
Pool.create ~clear:clear_slice
~mk_item:
(let buf_size = Option.value buf_size ~default:4096 in
fun () -> IO.Slice.create buf_size)
();
running = Atomic.make true;
port;
sock;
max_connections;
sem_max_connections = Io_server.Sem.make max_connections;
timeout;
}
in
let tcp_server_builder = Io_server.to_tcp_server server in
let module B = struct
let init_addr () = addr
let init_port () = port
let get_time_s = get_time_s
let tcp_server () = tcp_server_builder
end in
let backend = (module B : IO_BACKEND) in
Server.create_from ?buf_size ?middlewares ~backend ()

View file

@ -1,5 +1,26 @@
open Common_ws_ open Common_ws_
module With_lock = struct
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
type builder = unit -> t
let default_builder : builder =
fun () ->
let mutex = Mutex.create () in
{
with_lock =
(fun f ->
Mutex.lock mutex;
try
let x = f () in
Mutex.unlock mutex;
x
with e ->
Mutex.unlock mutex;
raise e);
}
end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
module Frame_type = struct module Frame_type = struct
@ -52,10 +73,10 @@ module Writer = struct
mutable offset: int; (** number of bytes already in [buf] *) mutable offset: int; (** number of bytes already in [buf] *)
oc: IO.Output.t; oc: IO.Output.t;
mutable closed: bool; mutable closed: bool;
mutex: Mutex.t; mutex: With_lock.t;
} }
let create ?(buf_size = 16 * 1024) ~oc () : t = let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t =
{ {
header = Header.create (); header = Header.create ();
header_buf = Bytes.create 16; header_buf = Bytes.create 16;
@ -63,19 +84,9 @@ module Writer = struct
offset = 0; offset = 0;
oc; oc;
closed = false; closed = false;
mutex = Mutex.create (); mutex = with_lock;
} }
let[@inline] with_mutex_ (self : t) f =
Mutex.lock self.mutex;
try
let x = f () in
Mutex.unlock self.mutex;
x
with e ->
Mutex.unlock self.mutex;
raise e
let[@inline] close self = self.closed <- true let[@inline] close self = self.closed <- true
let int_of_bool : bool -> int = Obj.magic let int_of_bool : bool -> int = Obj.magic
@ -142,7 +153,7 @@ module Writer = struct
if self.offset = Bytes.length self.buf then really_output_buf_ self if self.offset = Bytes.length self.buf then really_output_buf_ self
let send_pong (self : t) : unit = let send_pong (self : t) : unit =
let@ () = with_mutex_ self in let@ () = self.mutex.with_lock in
self.header.fin <- true; self.header.fin <- true;
self.header.ty <- Frame_type.pong; self.header.ty <- Frame_type.pong;
self.header.payload_len <- 0; self.header.payload_len <- 0;
@ -151,7 +162,7 @@ module Writer = struct
write_header_ self write_header_ self
let output_char (self : t) c : unit = let output_char (self : t) c : unit =
let@ () = with_mutex_ self in let@ () = self.mutex.with_lock in
let cap = Bytes.length self.buf - self.offset in let cap = Bytes.length self.buf - self.offset in
(* make room for [c] *) (* make room for [c] *)
if cap = 0 then really_output_buf_ self; if cap = 0 then really_output_buf_ self;
@ -161,7 +172,7 @@ module Writer = struct
if cap = 1 then really_output_buf_ self if cap = 1 then really_output_buf_ self
let output (self : t) buf i len : unit = let output (self : t) buf i len : unit =
let@ () = with_mutex_ self in let@ () = self.mutex.with_lock in
let i = ref i in let i = ref i in
let len = ref len in let len = ref len in
while !len > 0 do while !len > 0 do
@ -179,7 +190,7 @@ module Writer = struct
flush_if_full self flush_if_full self
let flush self : unit = let flush self : unit =
let@ () = with_mutex_ self in let@ () = self.mutex.with_lock in
flush_ self flush_ self
end end
@ -187,8 +198,8 @@ module Reader = struct
type state = type state =
| Begin (** At the beginning of a frame *) | Begin (** At the beginning of a frame *)
| Reading_frame of { mutable remaining_bytes: int; mutable num_read: int } | Reading_frame of { mutable remaining_bytes: int; mutable num_read: int }
(** Currently reading the payload of a frame with [remaining_bytes] (** Currently reading the payload of a frame with [remaining_bytes] left
left to read from the underlying [ic] *) to read from the underlying [ic] *)
| Close | Close
type t = { type t = {
@ -266,7 +277,7 @@ module Reader = struct
external apply_masking_ : external apply_masking_ :
key:bytes -> key_offset:int -> buf:bytes -> int -> int -> unit key:bytes -> key_offset:int -> buf:bytes -> int -> int -> unit
= "tiny_httpd_ws_apply_masking" = "tiny_httpd_ws_apply_masking"
[@@noalloc] [@@noalloc]
(** Apply masking to the parsed data *) (** Apply masking to the parsed data *)
let[@inline] apply_masking ~mask_key ~mask_offset (buf : bytes) off len : unit let[@inline] apply_masking ~mask_key ~mask_offset (buf : bytes) off len : unit
@ -390,8 +401,8 @@ module Reader = struct
) )
end end
let upgrade ic oc : _ * _ = let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ =
let writer = Writer.create ~oc () in let writer = Writer.create ~with_lock ~oc () in
let reader = Reader.create ~ic ~writer () in let reader = Reader.create ~ic ~writer () in
let ws_ic : IO.Input.t = let ws_ic : IO.Input.t =
object object
@ -414,9 +425,11 @@ let upgrade ic oc : _ * _ =
in in
ws_ic, ws_oc ws_ic, ws_oc
(** Turn a regular connection handler (provided by the user) into a websocket upgrade handler *) (** Turn a regular connection handler (provided by the user) into a websocket
upgrade handler *)
module Make_upgrade_handler (X : sig module Make_upgrade_handler (X : sig
val accept_ws_protocol : string -> bool val accept_ws_protocol : string -> bool
val with_lock : With_lock.builder
val handler : handler val handler : handler
end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t = end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t =
struct struct
@ -461,7 +474,8 @@ struct
try Ok (handshake_ req) with Bad_req s -> Error s try Ok (handshake_ req) with Bad_req s -> Error s
let handle_connection req ic oc = let handle_connection req ic oc =
let ws_ic, ws_oc = upgrade ic oc in let with_lock = X.with_lock () in
let ws_ic, ws_oc = upgrade ~with_lock ic oc in
try X.handler req ws_ic ws_oc try X.handler req ws_ic ws_oc
with Close_connection -> with Close_connection ->
Log.debug (fun k -> k "websocket: requested to close the connection"); Log.debug (fun k -> k "websocket: requested to close the connection");
@ -469,9 +483,11 @@ struct
end end
let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares
(server : Server.t) route (f : handler) : unit = ?(with_lock = With_lock.default_builder) (server : Server.t) route
(f : handler) : unit =
let module M = Make_upgrade_handler (struct let module M = Make_upgrade_handler (struct
let handler = f let handler = f
let with_lock = with_lock
let accept_ws_protocol = accept_ws_protocol let accept_ws_protocol = accept_ws_protocol
end) in end) in
let up : Server.upgrade_handler = (module M) in let up : Server.upgrade_handler = (module M) in

View file

@ -1,30 +1,60 @@
(** Websockets for Tiny_httpd. (** Websockets for Tiny_httpd.
This sub-library ([tiny_httpd.ws]) exports a small implementation This sub-library ([tiny_httpd.ws]) exports a small implementation for a
for a websocket server. It has no additional dependencies. websocket server. It has no additional dependencies. *)
*)
(** Synchronization primitive used to allow both the reader to reply to "ping",
and the handler to send messages, without stepping on each other's toes.
@since NEXT_RELEASE *)
module With_lock : sig
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
(** A primitive to run the callback in a critical section where others cannot
run at the same time.
The default is a mutex, but that works poorly with thread pools so it's
possible to use a semaphore or a cooperative mutex instead. *)
type builder = unit -> t
val default_builder : builder
(** Lock using [Mutex]. *)
end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
(** Websocket handler *) (** Websocket handler *)
val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t val upgrade :
(** Upgrade a byte stream to the websocket framing protocol. *) ?with_lock:With_lock.t ->
IO.Input.t ->
IO.Output.t ->
IO.Input.t * IO.Output.t
(** Upgrade a byte stream to the websocket framing protocol.
@param with_lock
if provided, use this to prevent reader and writer to compete on sending
frames. since NEXT_RELEASE. *)
exception Close_connection exception Close_connection
(** Exception that can be raised from IOs inside the handler, (** Exception that can be raised from IOs inside the handler, when the
when the connection is closed from underneath. *) connection is closed from underneath. *)
val add_route_handler : val add_route_handler :
?accept:(unit Request.t -> (unit, int * string) result) -> ?accept:(unit Request.t -> (unit, int * string) result) ->
?accept_ws_protocol:(string -> bool) -> ?accept_ws_protocol:(string -> bool) ->
?middlewares:Server.Head_middleware.t list -> ?middlewares:Server.Head_middleware.t list ->
?with_lock:With_lock.builder ->
Server.t -> Server.t ->
(Server.upgrade_handler, Server.upgrade_handler) Route.t -> (Server.upgrade_handler, Server.upgrade_handler) Route.t ->
handler -> handler ->
unit unit
(** Add a route handler for a websocket endpoint. (** Add a route handler for a websocket endpoint.
@param accept_ws_protocol decides whether this endpoint accepts the websocket protocol @param accept_ws_protocol
sent by the client. Default accepts everything. *) decides whether this endpoint accepts the websocket protocol sent by the
client. Default accepts everything.
@param with_lock
if provided, use this to synchronize writes between the frame reader
(replies "pong" to "ping") and the handler emitting writes. since
NEXT_RELEASE. *)
(**/**) (**/**)

View file

@ -8,7 +8,7 @@ CAMLprim value tiny_httpd_ws_apply_masking(value _mask_key, value _mask_offset,
CAMLparam5(_mask_key, _mask_offset, _buf, _offset, _len); CAMLparam5(_mask_key, _mask_offset, _buf, _offset, _len);
char const *mask_key = String_val(_mask_key); char const *mask_key = String_val(_mask_key);
char *buf = Bytes_val(_buf); unsigned char *buf = Bytes_val(_buf);
intnat mask_offset = Int_val(_mask_offset); intnat mask_offset = Int_val(_mask_offset);
intnat offset = Int_val(_offset); intnat offset = Int_val(_offset);
intnat len = Int_val(_len); intnat len = Int_val(_len);

View file

@ -19,6 +19,27 @@
(action (action
(diff echo1.expect echo1.out))) (diff echo1.expect echo1.out)))
(rule
(targets echo_mio1.out)
(deps
(:bin ../examples/echo_mio.exe))
(locks /port)
(enabled_if
(= %{system} "linux"))
(package tiny_httpd_moonpool)
(action
(with-stdout-to
%{targets}
(run ./echo_mio1.sh %{bin}))))
(rule
(alias runtest)
(package tiny_httpd_moonpool)
(enabled_if
(= %{system} "linux"))
(action
(diff echo_mio1.expect echo_mio1.out)))
(rule (rule
(targets sse_count.out) (targets sse_count.out)
(deps (deps

10
tests/echo_mio1.expect Normal file
View file

@ -0,0 +1,10 @@
listening on http://127.0.0.1:8085
test moonpool_io
echo:
{meth=GET; host=localhost:8085;
headers=[user-agent: test
accept: */*
host: localhost:8085];
path="/echo/?a=b&c=d"; body=""; path_components=["echo"];
query=["c","d";"a","b"]}
(query: "c" = "d";"a" = "b")

12
tests/echo_mio1.sh Executable file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env sh
ECHO=$1
PORT=8085
"$ECHO" -p $PORT &
PID=$!
sleep 0.1
echo "test moonpool_io"
curl -N "http://localhost:${PORT}/echo/?a=b&c=d" -H user-agent:test --max-time 5
kill $PID

35
tiny_httpd_moonpool.opam Normal file
View file

@ -0,0 +1,35 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.17"
synopsis: "Moonpool+picos_stdio backend for Tiny_httpd"
maintainer: ["c-cube"]
authors: ["c-cube"]
license: "MIT"
homepage: "https://github.com/c-cube/tiny_httpd/"
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
depends: [
"dune" {>= "2.9"}
"seq"
"tiny_httpd" {= version}
"moonpool" {>= "0.7"}
"moonpool-io" {>= "0.7"}
"ocaml" {>= "5.0"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files=false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"