Compare commits

..

23 commits

Author SHA1 Message Date
Simon Cruanes
8cf59ffed1
Merge branch 'eio-fixes' into simon/use-eio-round2
* eio-fixes:
  eio: add 60s shutdown backstop, protect Flow.close from raising
  eio: fix semaphore acquisition, graceful stop, and time source
2026-02-15 16:28:54 -05:00
Simon Cruanes
c6468dced8 eio: add 60s shutdown backstop, protect Flow.close from raising 2026-02-15 21:26:25 +00:00
Simon Cruanes
3b631b7e4c eio: fix semaphore acquisition, graceful stop, and time source
- Acquire semaphore BEFORE spawning handler fiber: replace
  Eio.Net.accept_fork with manual accept + Semaphore.acquire + Fiber.fork
  so we bound the number of in-flight fibers rather than spawning
  unlimited fibers that all block on the semaphore.

- Graceful stop: remove Eio.Switch.fail sw Exit from stop(), just set
  running to false so existing handlers can complete naturally instead
  of being cancelled immediately.

- Replace Unix.gettimeofday with Eio.Time.now clock to use the Eio
  clock abstraction instead of direct Unix calls.
2026-02-15 21:22:39 +00:00
Simon Cruanes
2a3cfa015a
CI 2026-02-15 16:14:49 -05:00
Simon Cruanes
ffdcc1139c
format 2026-02-15 16:13:24 -05:00
Simon Cruanes
32421a26bc
go back to eio.unix 2026-02-15 16:08:59 -05:00
Simon Cruanes
97c4e4dc08 chore: upgrade qcheck to 0.91, fix deprecation warnings
- small_list -> list_small
- bytes_of_size -> bytes_size
- small_nat -> nat_small
- prefix unused Frame_type constants with underscore
2026-02-15 21:00:48 +00:00
Simon Cruanes
6c3e705df5 ci: only build/test eio on OCaml 5.x 2026-02-15 20:59:50 +00:00
Simon Cruanes
28f7ddd74f fix(eio): address review feedback
- Add closed flag to ic_of_flow/oc_of_flow to prevent double-release of
  pool cstructs and double-shutdown
- Enforce max_connections with Eio.Semaphore to limit concurrent connections
- Fix port 0 detection using Eio.Net.listening_addr to return actual port
- Use pool_size for cstruct pool max_size (was computed but unused)
- Set TCP_NODELAY on accepted connections for low latency
2026-02-15 20:59:50 +00:00
Simon Cruanes
f0aadc0307
disable warning 2026-02-15 15:54:03 -05:00
Simon Cruanes
5e32ce7bcc
fix dune 2026-02-15 15:46:31 -05:00
Simon Cruanes
98385b43a9
format 2026-02-15 15:39:56 -05:00
Simon Cruanes
93c08944bf
fix CI 2026-02-15 15:23:22 -05:00
Simon Cruanes
46d30392b9
delete gh-pages action 2026-02-15 15:23:18 -05:00
Simon Cruanes
94ed68c30c
fix warnings 2026-02-15 15:13:25 -05:00
Simon Cruanes
a11ed88522
chore: bounds on eio 2026-02-15 15:12:30 -05:00
Simon Cruanes
55dac0fa0b
CI 2026-02-15 15:12:30 -05:00
Simon Cruanes
0e35e38c09
example with eio 2026-02-15 15:12:30 -05:00
Simon Cruanes
3e7c73093e
feat: tiny_httpd_eio library
provides a tiny_httpd server that relies on Eio for non-blocking
sockets and for concurrency using eio fibers.
2026-02-15 15:12:30 -05:00
Simon Cruanes
ba19880d75
hardening bugfixes
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.13.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
* fix: use realpath to validate filesystem paths against traversal

- add string_prefix helper to check path containment
- compute root_canonical once per add_vfs_ call
- use realpath only for filesystem (on_fs=true), keeping simple
  contains_dot_dot check for VFS
- paths are already URL-decoded by Route.rest_of_path_urlencoded

* fix: add header size limits to prevent memory exhaustion

add optional limits to Headers.parse_:
- max_headers: 100 (default)
- max_header_size: 16KiB per header (default)
- max_total_size: 256KiB total (default)

returns 431 status code when limits exceeded per RFC 6585.
2026-02-10 19:57:21 -05:00
Simon Cruanes
8a8aadfbb0
doc
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
2025-06-24 21:13:18 -04:00
Simon Cruanes
9a1343aef7
remove global withlock builder, pass it as argument instead
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
2025-06-23 10:08:07 -04:00
Simon Cruanes
f10992ec32
feat WS: abstraction for critical section
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled
can be replaced with a proper cooperative lock
2025-06-20 18:03:40 -04:00
23 changed files with 310 additions and 142 deletions

28
.github/workflows/format.yml vendored Normal file
View file

@ -0,0 +1,28 @@
name: format
on:
pull_request:
push:
branches:
- main
jobs:
format:
name: format
strategy:
matrix:
ocaml-compiler:
- '5.3'
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
allow-prerelease-opam: true
- run: opam install ocamlformat.0.27.0
- run: opam exec -- make format-check

View file

@ -1,36 +0,0 @@
name: github pages
on:
push:
branches:
- main
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Use OCaml
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: 5.03.x
dune-cache: true
allow-prerelease-opam: true
- name: Deps
run: opam install odig tiny_httpd tiny_httpd_camlzip tiny_httpd_eio
- name: Build
run: opam exec -- odig odoc --cache-dir=_doc/ tiny_httpd tiny_httpd_camlzip tiny_httpd_eio
- name: Deploy
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./_doc/html
destination_dir: .
enable_jekyll: false
#keep_files: true

View file

@ -16,7 +16,7 @@ jobs:
#- macos-latest
#- windows-latest
ocaml-compiler:
- 4.08.x
- 4.13.x
- 4.14.x
- 5.03.x
@ -38,7 +38,15 @@ jobs:
- run: opam install ./tiny_httpd.opam ./tiny_httpd_camlzip.opam --deps-only --with-test
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
- name: Build (OCaml 4.x)
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
if: ${{ !startsWith(matrix.ocaml-compiler, '5.') }}
- name: Build (OCaml 5.x, includes eio)
run: |
opam install ./tiny_httpd.opam ./tiny_httpd_eio.opam --deps-only --with-test
opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
if: ${{ startsWith(matrix.ocaml-compiler, '5.') }}
- run: opam exec -- dune build @src/runtest @examples/runtest @tests/runtest -p tiny_httpd
if: ${{ matrix.os == 'ubuntu-latest' }}
@ -50,4 +58,10 @@ jobs:
- run: opam install logs magic-mime -y
- run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
- name: Final build (OCaml 4.x)
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip
if: ${{ !startsWith(matrix.ocaml-compiler, '5.') }}
- name: Final build (OCaml 5.x, includes eio)
run: opam exec -- dune build @install -p tiny_httpd,tiny_httpd_camlzip,tiny_httpd_eio
if: ${{ startsWith(matrix.ocaml-compiler, '5.') }}

2
.gitignore vendored
View file

@ -3,3 +3,5 @@ _build
_opam
*.install
.merlin
todo.md
*.tmp

View file

@ -23,12 +23,12 @@
result
hmap
(iostream (>= 0.2))
(ocaml (>= 4.08))
(ocaml (>= 4.13))
(odoc :with-doc)
(logs :with-test)
(conf-libcurl :with-test)
(ptime :with-test)
(qcheck-core (and (>= 0.9) :with-test))))
(qcheck-core (and (>= 0.91) :with-test))))
(package
(name tiny_httpd_camlzip)
@ -46,5 +46,6 @@
(depends
(tiny_httpd (= :version))
(eio (and (>= 1.0) (< 2.0)))
base-unix
(logs :with-test)
(odoc :with-doc)))

View file

@ -73,10 +73,10 @@
; produce an embedded FS
(library
(name echo_vfs)
(modules vfs)
(wrapped false)
(libraries tiny_httpd))
(name echo_vfs)
(modules vfs)
(wrapped false)
(libraries tiny_httpd))
(rule
(targets vfs.ml)

View file

@ -69,12 +69,10 @@ let middleware_stat () : Server.Middleware.t * (unit -> string) =
let middleware_trace : Server.Middleware.t =
fun (h : Server.Middleware.handler) req ~resp ->
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "http.handle"
in
let _sp = Trace.enter_span ~__FILE__ ~__LINE__ "http.handle" in
let new_resp (r : Response.t) =
Trace.add_data_to_manual_span _sp [ "http.code", `Int r.code ];
Trace.exit_manual_span _sp;
Trace.add_data_to_span _sp [ "http.code", `Int r.code ];
Trace.exit_span _sp;
resp r
in
h req ~resp:new_resp

View file

@ -83,8 +83,13 @@ let parse_line_ (line : string) : _ result =
Ok (k, v)
with Failure msg -> Error msg
let parse_ ~(buf : Buf.t) (bs : IO.Input.t) : t =
let rec loop acc =
let parse_ ~(buf : Buf.t) ?(max_headers = 100) ?(max_header_size = 16 * 1024)
?(max_total_size = 256 * 1024) (bs : IO.Input.t) : t =
let rec loop acc count total_size =
if count >= max_headers then
bad_reqf 431 "too many headers (max: %d)" max_headers;
if total_size >= max_total_size then
bad_reqf 431 "headers too large (max: %d bytes)" max_total_size;
match IO.Input.read_line_using_opt ~buf bs with
| None -> raise End_of_file
| Some "" -> assert false
@ -92,12 +97,15 @@ let parse_ ~(buf : Buf.t) (bs : IO.Input.t) : t =
| Some line when line.[String.length line - 1] <> '\r' ->
bad_reqf 400 "bad header line, not ended in CRLF"
| Some line ->
let line_len = String.length line in
if line_len > max_header_size then
bad_reqf 431 "header too large (max: %d bytes)" max_header_size;
let k, v =
match parse_line_ line with
| Ok r -> r
| Error msg ->
bad_reqf 400 "invalid header line: %s\nline is: %S" msg line
in
loop ((k, v) :: acc)
loop ((k, v) :: acc) (count + 1) (total_size + line_len)
in
loop []
loop [] 0 0

View file

@ -34,7 +34,14 @@ val pp : Format.formatter -> t -> unit
(**/*)
val parse_ : buf:Buf.t -> IO.Input.t -> t
val parse_ :
buf:Buf.t ->
?max_headers:int ->
?max_header_size:int ->
?max_total_size:int ->
IO.Input.t ->
t
val parse_line_ : string -> (string * string, string) result
(**/*)

View file

@ -25,6 +25,7 @@ let descr = function
| 411 -> "Length required"
| 413 -> "Payload too large"
| 417 -> "Expectation failed"
| 431 -> "Request Header Fields Too Large"
| 500 -> "Internal server error"
| 501 -> "Not implemented"
| 503 -> "Service unavailable"

View file

@ -55,6 +55,9 @@ val to_string : _ t -> string
val to_url : ('a, string) t -> 'a
(** [to_url route args] takes a route, and turns it into a URL path.
@since NEXT_RELEASE *)
module Private_ : sig
val eval : string list -> ('a, 'b) t -> 'a -> 'b option
end

View file

@ -1,8 +1,6 @@
(library
(name tiny_httpd_eio)
(public_name tiny_httpd_eio)
(synopsis "An EIO-based backend for Tiny_httpd")
(flags :standard -safe-string -warn-error -a+8)
(libraries tiny_httpd eio eio.unix))
(name tiny_httpd_eio)
(public_name tiny_httpd_eio)
(synopsis "An EIO-based backend for Tiny_httpd")
(flags :standard -safe-string -warn-error -a+8)
(libraries tiny_httpd eio eio.unix))

View file

@ -31,8 +31,10 @@ let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr =
| `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p)
| `Unix s -> Unix.ADDR_UNIX s
let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t =
let ic_of_flow ~closed ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) :
IO.Input.t =
let cstruct = Pool.Raw.acquire ic_pool in
let sent_shutdown = ref false in
object
inherit Iostream.In_buf.t_from_refill ()
@ -52,15 +54,22 @@ let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t =
sl.len <- n
method close () =
Pool.Raw.release ic_pool cstruct;
Eio.Flow.shutdown flow `Receive
if not !closed then (
closed := true;
Pool.Raw.release ic_pool cstruct
);
if not !sent_shutdown then (
sent_shutdown := true;
Eio.Flow.shutdown flow `Receive
)
end
let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t
=
let oc_of_flow ~closed ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) :
IO.Output.t =
(* write buffer *)
let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in
let offset = ref 0 in
let sent_shutdown = ref false in
object (self)
method flush () : unit =
@ -91,8 +100,14 @@ let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t
if !offset = Cstruct.length wbuf then self#flush ()
method close () =
Pool.Raw.release oc_pool wbuf;
Eio.Flow.shutdown flow `Send
if not !closed then (
closed := true;
Pool.Raw.release oc_pool wbuf
);
if not !sent_shutdown then (
sent_shutdown := true;
Eio.Flow.shutdown flow `Send
)
end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
@ -118,7 +133,8 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let module M = struct
let init_addr () = addr
let init_port () = port
let get_time_s () = Unix.gettimeofday ()
let clock = Eio.Stdenv.clock stdenv
let get_time_s () = Eio.Time.now clock
let max_connections = get_max_connection_ ?max_connections ()
let pool_size =
@ -127,7 +143,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
| None -> min 4096 (max_connections * 2)
let cstruct_pool =
Pool.create ~max_size:max_connections
Pool.create ~max_size:pool_size
~mk_item:(fun () -> Cstruct.create buf_size)
()
@ -137,6 +153,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(fun ~after_init ~handle () : unit ->
let running = Atomic.make true in
let active_conns = Atomic.make 0 in
let sem = Eio.Semaphore.make max_connections in
Eio.Switch.on_release sw (fun () -> Atomic.set running false);
let net = Eio.Stdenv.net stdenv in
@ -148,17 +165,26 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
sockaddr
in
(* Resolve actual address/port (important for port 0) *)
let actual_addr, actual_port =
match Eio.Net.listening_addr sock with
| `Tcp (_, p) -> addr, p
| `Unix s -> Printf.sprintf "unix:%s" s, 0
in
let tcp_server : IO.TCP_server.t =
{
running = (fun () -> Atomic.get running);
stop =
(fun () ->
Atomic.set running false;
Eio.Switch.fail sw Exit);
endpoint =
(fun () ->
(* TODO: find the real port *)
addr, port);
(* Backstop: fail the switch after 60s if handlers don't complete *)
Eio.Fiber.fork_daemon ~sw (fun () ->
Eio.Time.sleep clock 60.0;
if Eio.Switch.get_error sw |> Option.is_none then
Eio.Switch.fail sw Exit;
`Stop_daemon));
endpoint = (fun () -> actual_addr, actual_port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
@ -166,33 +192,50 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
after_init tcp_server;
while Atomic.get running do
Eio.Net.accept_fork ~sw
~on_error:(fun exn ->
Log.error (fun k ->
k "error in client handler: %s" (Printexc.to_string exn)))
sock
(fun flow client_addr ->
Atomic.incr active_conns;
let@ () =
Fun.protect ~finally:(fun () ->
Log.debug (fun k ->
k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns)
in
let ic = ic_of_flow ~buf_pool:cstruct_pool flow in
let oc = oc_of_flow ~buf_pool:cstruct_pool flow in
match Eio.Net.accept ~sw sock with
| exception (Eio.Cancel.Cancelled _ | Eio.Io _)
when not (Atomic.get running) ->
(* Socket closed or switch cancelled during shutdown; exit loop *)
()
| conn, client_addr ->
(* Acquire semaphore BEFORE spawning a fiber so we
bound the number of in-flight fibers. *)
Eio.Semaphore.acquire sem;
Eio.Fiber.fork ~sw (fun () ->
let@ () =
Fun.protect ~finally:(fun () ->
Log.debug (fun k ->
k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns;
Eio.Semaphore.release sem;
try Eio.Flow.close conn with Eio.Io _ -> ())
in
(try
Eio_unix.Fd.use_exn "setsockopt" (Eio_unix.Net.fd conn)
(fun fd -> Unix.setsockopt fd Unix.TCP_NODELAY true)
with Unix.Unix_error _ -> ());
Atomic.incr active_conns;
let ic_closed = ref false in
let oc_closed = ref false in
let ic =
ic_of_flow ~closed:ic_closed ~buf_pool:cstruct_pool conn
in
let oc =
oc_of_flow ~closed:oc_closed ~buf_pool:cstruct_pool conn
in
Log.debug (fun k ->
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);
let client_addr_unix = eio_sock_addr_to_unix client_addr in
try handle.handle ~client_addr:client_addr_unix ic oc
with exn ->
let bt = Printexc.get_raw_backtrace () in
Log.error (fun k ->
k "Client handler for %a failed with %s\n%s"
Eio.Net.Sockaddr.pp client_addr
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)))
Log.debug (fun k ->
k "handling client on %a…" Eio.Net.Sockaddr.pp
client_addr);
let client_addr_unix = eio_sock_addr_to_unix client_addr in
try handle.handle ~client_addr:client_addr_unix ic oc
with exn ->
let bt = Printexc.get_raw_backtrace () in
Log.error (fun k ->
k "Client handler for %a failed with %s\n%s"
Eio.Net.Sockaddr.pp client_addr
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)))
done);
}
end in

View file

@ -43,6 +43,27 @@ let contains_dot_dot s =
false
with Exit -> true
(* Check if string [s] starts with prefix [pre] *)
let string_prefix ~pre s =
let len_pre = String.length pre in
String.length s >= len_pre && String.sub s 0 len_pre = pre
(* Check if a path is safe (doesn't escape root directory).
Only needed for real filesystem access. *)
let is_path_safe ~root_canonical ~path =
try
let full_path = Filename.concat root_canonical path in
let path_canonical = Unix.realpath full_path in
string_prefix ~pre:root_canonical path_canonical
with Unix.Unix_error _ ->
(* If realpath fails (e.g., file doesn't exist for uploads),
check parent directory *)
(try
let parent = Filename.dirname (Filename.concat root_canonical path) in
let parent_canonical = Unix.realpath parent in
string_prefix ~pre:root_canonical parent_canonical
with Unix.Unix_error _ -> false)
(* Human readable size *)
let human_size (x : int) : string =
if x >= 1_000_000_000 then
@ -206,6 +227,17 @@ let html_list_dir (module VFS : VFS) ~prefix ~parent d : Html.elt =
(* @param on_fs: if true, we assume the file exists on the FS *)
let add_vfs_ ~on_fs ~top ~config ~vfs:((module VFS : VFS) as vfs) ~prefix server
: unit =
let root_canonical =
if on_fs then (
try Some (Unix.realpath top) with _ -> None
) else
None
in
let check_path path =
match root_canonical with
| Some root -> is_path_safe ~root_canonical:root ~path
| None -> not (contains_dot_dot path)
in
let route () =
if prefix = "" then
Route.rest_of_path_urlencoded
@ -214,7 +246,7 @@ let add_vfs_ ~on_fs ~top ~config ~vfs:((module VFS : VFS) as vfs) ~prefix server
in
if config.delete then
S.add_route_handler server ~meth:`DELETE (route ()) (fun path _req ->
if contains_dot_dot path then
if not (check_path path) then
Response.fail_raise ~code:403 "invalid path in delete"
else
Response.make_string
@ -233,7 +265,7 @@ let add_vfs_ ~on_fs ~top ~config ~vfs:((module VFS : VFS) as vfs) ~prefix server
| Some n when n > config.max_upload_size ->
Error
(403, "max upload size is " ^ string_of_int config.max_upload_size)
| Some _ when contains_dot_dot req.Request.path ->
| Some _ when not (check_path req.Request.path) ->
Error (403, "invalid path (contains '..')")
| _ -> Ok ())
(fun path req ->
@ -264,7 +296,7 @@ let add_vfs_ ~on_fs ~top ~config ~vfs:((module VFS : VFS) as vfs) ~prefix server
| None -> Response.fail_raise ~code:403 "Cannot access file"
| Some t -> Printf.sprintf "mtime: %.4f" t)
in
if contains_dot_dot path then
if not (check_path path) then
Response.fail ~code:403 "Path is forbidden"
else if not (VFS.contains path) then
Response.fail ~code:404 "File not found"

View file

@ -51,7 +51,7 @@
(public_name tiny_httpd.ws)
(synopsis "Websockets for tiny_httpd")
(private_modules common_ws_ utils_)
(flags :standard -open Tiny_httpd_core)
(flags :standard -w -32 -open Tiny_httpd_core)
(foreign_stubs
(language c)
(names tiny_httpd_ws_stubs)

View file

@ -1,15 +1,36 @@
open Common_ws_
module With_lock = struct
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
type builder = unit -> t
let default_builder : builder =
fun () ->
let mutex = Mutex.create () in
{
with_lock =
(fun f ->
Mutex.lock mutex;
try
let x = f () in
Mutex.unlock mutex;
x
with e ->
Mutex.unlock mutex;
raise e);
}
end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
module Frame_type = struct
type t = int
let continuation : t = 0
let text : t = 1
let _continuation : t = 0
let _text : t = 1
let binary : t = 2
let close : t = 8
let ping : t = 9
let _close : t = 8
let _ping : t = 9
let pong : t = 10
let show = function
@ -52,10 +73,10 @@ module Writer = struct
mutable offset: int; (** number of bytes already in [buf] *)
oc: IO.Output.t;
mutable closed: bool;
mutex: Mutex.t;
mutex: With_lock.t;
}
let create ?(buf_size = 16 * 1024) ~oc () : t =
let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t =
{
header = Header.create ();
header_buf = Bytes.create 16;
@ -63,19 +84,9 @@ module Writer = struct
offset = 0;
oc;
closed = false;
mutex = Mutex.create ();
mutex = with_lock;
}
let[@inline] with_mutex_ (self : t) f =
Mutex.lock self.mutex;
try
let x = f () in
Mutex.unlock self.mutex;
x
with e ->
Mutex.unlock self.mutex;
raise e
let[@inline] close self = self.closed <- true
let int_of_bool : bool -> int = Obj.magic
@ -121,7 +132,7 @@ module Writer = struct
()
(** Max fragment size: send 16 kB at a time *)
let max_fragment_size = 16 * 1024
let _max_fragment_size = 16 * 1024
let[@inline never] really_output_buf_ (self : t) =
self.header.fin <- true;
@ -142,7 +153,7 @@ module Writer = struct
if self.offset = Bytes.length self.buf then really_output_buf_ self
let send_pong (self : t) : unit =
let@ () = with_mutex_ self in
let@ () = self.mutex.with_lock in
self.header.fin <- true;
self.header.ty <- Frame_type.pong;
self.header.payload_len <- 0;
@ -151,7 +162,7 @@ module Writer = struct
write_header_ self
let output_char (self : t) c : unit =
let@ () = with_mutex_ self in
let@ () = self.mutex.with_lock in
let cap = Bytes.length self.buf - self.offset in
(* make room for [c] *)
if cap = 0 then really_output_buf_ self;
@ -161,7 +172,7 @@ module Writer = struct
if cap = 1 then really_output_buf_ self
let output (self : t) buf i len : unit =
let@ () = with_mutex_ self in
let@ () = self.mutex.with_lock in
let i = ref i in
let len = ref len in
while !len > 0 do
@ -179,7 +190,7 @@ module Writer = struct
flush_if_full self
let flush self : unit =
let@ () = with_mutex_ self in
let@ () = self.mutex.with_lock in
flush_ self
end
@ -390,8 +401,8 @@ module Reader = struct
)
end
let upgrade ic oc : _ * _ =
let writer = Writer.create ~oc () in
let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ =
let writer = Writer.create ~with_lock ~oc () in
let reader = Reader.create ~ic ~writer () in
let ws_ic : IO.Input.t =
object
@ -418,6 +429,7 @@ let upgrade ic oc : _ * _ =
upgrade handler *)
module Make_upgrade_handler (X : sig
val accept_ws_protocol : string -> bool
val with_lock : With_lock.builder
val handler : handler
end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t =
struct
@ -462,7 +474,8 @@ struct
try Ok (handshake_ req) with Bad_req s -> Error s
let handle_connection req ic oc =
let ws_ic, ws_oc = upgrade ic oc in
let with_lock = X.with_lock () in
let ws_ic, ws_oc = upgrade ~with_lock ic oc in
try X.handler req ws_ic ws_oc
with Close_connection ->
Log.debug (fun k -> k "websocket: requested to close the connection");
@ -470,9 +483,11 @@ struct
end
let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares
(server : Server.t) route (f : handler) : unit =
?(with_lock = With_lock.default_builder) (server : Server.t) route
(f : handler) : unit =
let module M = Make_upgrade_handler (struct
let handler = f
let with_lock = with_lock
let accept_ws_protocol = accept_ws_protocol
end) in
let up : Server.upgrade_handler = (module M) in

View file

@ -3,11 +3,36 @@
This sub-library ([tiny_httpd.ws]) exports a small implementation for a
websocket server. It has no additional dependencies. *)
(** Synchronization primitive used to allow both the reader to reply to "ping",
and the handler to send messages, without stepping on each other's toes.
@since NEXT_RELEASE *)
module With_lock : sig
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
(** A primitive to run the callback in a critical section where others cannot
run at the same time.
The default is a mutex, but that works poorly with thread pools so it's
possible to use a semaphore or a cooperative mutex instead. *)
type builder = unit -> t
val default_builder : builder
(** Lock using [Mutex]. *)
end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
(** Websocket handler *)
val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t
(** Upgrade a byte stream to the websocket framing protocol. *)
val upgrade :
?with_lock:With_lock.t ->
IO.Input.t ->
IO.Output.t ->
IO.Input.t * IO.Output.t
(** Upgrade a byte stream to the websocket framing protocol.
@param with_lock
if provided, use this to prevent reader and writer to compete on sending
frames. since NEXT_RELEASE. *)
exception Close_connection
(** Exception that can be raised from IOs inside the handler, when the
@ -17,6 +42,7 @@ val add_route_handler :
?accept:(unit Request.t -> (unit, int * string) result) ->
?accept_ws_protocol:(string -> bool) ->
?middlewares:Server.Head_middleware.t list ->
?with_lock:With_lock.builder ->
Server.t ->
(Server.upgrade_handler, Server.upgrade_handler) Route.t ->
handler ->
@ -24,7 +50,11 @@ val add_route_handler :
(** Add a route handler for a websocket endpoint.
@param accept_ws_protocol
decides whether this endpoint accepts the websocket protocol sent by the
client. Default accepts everything. *)
client. Default accepts everything.
@param with_lock
if provided, use this to synchronize writes between the frame reader
(replies "pong" to "ping") and the handler emitting writes. since
NEXT_RELEASE. *)
(**/**)

View file

@ -1,4 +1,4 @@
(tests
(names t_util t_buf t_server t_io t_response)
(names t_util t_buf t_server t_io t_response t_headers)
(package tiny_httpd)
(libraries tiny_httpd.core qcheck-core qcheck-core.runner test_util))

23
tests/unit/t_headers.ml Normal file
View file

@ -0,0 +1,23 @@
open Tiny_httpd_core
(* Test that header size limits are enforced *)
let test_header_too_large () =
(* Create a header that's larger than 16KB *)
let large_value = String.make 20000 'x' in
let q =
"GET / HTTP/1.1\r\nHost: example.com\r\nX-Large: " ^ large_value
^ "\r\n\r\n"
in
let str = IO.Input.of_string q in
let client_addr = Unix.(ADDR_INET (inet_addr_loopback, 1024)) in
let buf = Buf.create () in
try
let _ =
Request.Private_.parse_req_start_exn ~client_addr ~buf
~get_time_s:(fun _ -> 0.)
str
in
failwith "should have failed with 431"
with Tiny_httpd_core.Response.Bad_req (431, _) -> () (* expected *)
let () = test_header_too_large ()

View file

@ -40,7 +40,7 @@ let () = assert_eq (Ok [ "foo", "bar" ]) (U.parse_query "yolo#foo=bar")
let () =
add_qcheck
@@ QCheck.Test.make ~name:__LOC__ ~long_factor:20 ~count:1_000
Q.(small_list (pair string string))
Q.(list_small (pair string string))
(fun l ->
List.iter
(fun (a, b) ->

View file

@ -14,9 +14,9 @@ let () =
@@ QCheck.Test.make ~count:10_000
Q.(
triple
(bytes_of_size (Gen.return 4))
(option small_nat)
(bytes_of_size Gen.(0 -- 6000))
(bytes_size (Gen.return 4))
(option nat_small)
(bytes_size Gen.(0 -- 6000))
(* |> Q.add_stat ("b.size", fun (_k, b) -> Bytes.length b) *)
|> Q.add_shrink_invariant (fun (k, _, _) -> Bytes.length k = 4))
(fun (key, mask_offset, b) ->

View file

@ -17,12 +17,12 @@ depends: [
"result"
"hmap"
"iostream" {>= "0.2"}
"ocaml" {>= "4.08"}
"ocaml" {>= "4.13"}
"odoc" {with-doc}
"logs" {with-test}
"conf-libcurl" {with-test}
"ptime" {with-test}
"qcheck-core" {>= "0.9" & with-test}
"qcheck-core" {>= "0.91" & with-test}
]
depopts: [
"logs"

View file

@ -11,6 +11,7 @@ depends: [
"dune" {>= "3.2"}
"tiny_httpd" {= version}
"eio" {>= "1.0" & < "2.0"}
"base-unix"
"logs" {with-test}
"odoc" {with-doc}
]