From ee310b5262b8d42d4c96a22a3c067f4909f17efa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 2 Jan 2022 16:48:46 -0500 Subject: [PATCH 1/8] fix: description for 401 code --- src/Tiny_httpd_server.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index 30c35d11..9378faae 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -40,6 +40,7 @@ module Response_code = struct | 302 -> "Found" | 304 -> "Not Modified" | 400 -> "Bad request" + | 401 -> "Unauthorized" | 403 -> "Forbidden" | 404 -> "Not found" | 405 -> "Method not allowed" From a32297ac6cf0833b976c50ed7c6a2eeefc6845c5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 Jun 2023 20:09:28 -0400 Subject: [PATCH 2/8] add `Tiny_httpd_io` module, abstraction over IOs abstract channels, and abstract TCP server. --- src/Tiny_httpd.ml | 1 + src/Tiny_httpd.mli | 4 ++ src/Tiny_httpd_io.ml | 85 +++++++++++++++++++++++++++++++++++++++ src/Tiny_httpd_stream.ml | 52 ++++++++++++------------ src/Tiny_httpd_stream.mli | 12 ++++++ 5 files changed, 129 insertions(+), 25 deletions(-) create mode 100644 src/Tiny_httpd_io.ml diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 87e7890b..5213ef34 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -4,3 +4,4 @@ include Tiny_httpd_server module Util = Tiny_httpd_util module Dir = Tiny_httpd_dir module Html = Tiny_httpd_html +module IO = Tiny_httpd_io diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index e7385567..607d7461 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -85,6 +85,10 @@ module Buf = Tiny_httpd_buf module Byte_stream = Tiny_httpd_stream +(** {2 IO Abstraction} *) + +module IO = Tiny_httpd_io + (** {2 Main Server Type} *) (** @inline *) diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml new file mode 100644 index 00000000..8d36a2a4 --- /dev/null +++ b/src/Tiny_httpd_io.ml @@ -0,0 +1,85 @@ +(** IO abstraction. + + We abstract IO so we can support classic unix blocking IOs + with threads, and modern async IO with Eio. + + {b NOTE}: experimental. + + @since NEXT_RELEASE +*) + +module In_channel = struct + type t = { + input: bytes -> int -> int -> int; + (** Read into the slice. Returns [0] only if the + channel is closed. *) + close: unit -> unit; + } + + let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = + { + input = (fun buf i len -> input ic buf i len); + close = + (fun () -> + if close_noerr then + close_in_noerr ic + else + close_in ic); + } + + let of_unix_fd ?(close_noerr = false) (fd : Unix.file_descr) : t = + { + input = (fun buf i len -> Unix.read fd buf i len); + close = + (fun () -> + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd); + } + + let[@inline] input (self : t) buf i len = self.input buf i len + let[@inline] close self : unit = self.close () +end + +module Out_channel = struct + type t = { + output: bytes -> int -> int -> unit; (** Output slice *) + flush: unit -> unit; (** Flush underlying buffer *) + close: unit -> unit; + } + + let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = + { + output = (fun buf i len -> output oc buf i len); + flush = (fun () -> flush oc); + close = + (fun () -> + if close_noerr then + close_out_noerr oc + else + close_out oc); + } + + let[@inline] output (self : t) buf i len : unit = self.output buf i len + + let[@inline] output_string (self : t) (str : string) : unit = + self.output (Bytes.unsafe_of_string str) 0 (String.length str) + + let[@inline] close self : unit = self.close () +end + +(** A TCP server abstraction *) +module TCP_server = struct + type conn_handler = { + handle: In_channel.t -> Out_channel.t -> unit; + (** Handle client connection *) + } + + type t = { + listen: handle:conn_handler -> unit -> unit; + (** Blocking call to start listening for incoming connections. + Uses the connection handler to handle individual client connections. *) + endpoint: unit -> Unix.inet_addr * int; (** Endpoint we listen on *) + } +end diff --git a/src/Tiny_httpd_stream.ml b/src/Tiny_httpd_stream.ml index 66ce7aa3..3a100ebf 100644 --- a/src/Tiny_httpd_stream.ml +++ b/src/Tiny_httpd_stream.ml @@ -1,4 +1,5 @@ module Buf = Tiny_httpd_buf +module IO = Tiny_httpd_io let spf = Printf.sprintf @@ -45,37 +46,32 @@ let make ?(bs = Bytes.create @@ (16 * 1024)) ?(close = ignore) ~consume ~fill () in self -let of_chan_ ?(buf_size = 16 * 1024) ~close ic : t = +let of_input ?(buf_size = 16 * 1024) (ic : IO.In_channel.t) : t = make ~bs:(Bytes.create buf_size) - ~close:(fun _ -> close ic) + ~close:(fun _ -> IO.In_channel.close ic) ~consume:(fun self n -> self.off <- self.off + n; self.len <- self.len - n) ~fill:(fun self -> if self.off >= self.len then ( self.off <- 0; - self.len <- input ic self.bs 0 (Bytes.length self.bs) + self.len <- IO.In_channel.input ic self.bs 0 (Bytes.length self.bs) )) () -let of_chan = of_chan_ ~close:close_in -let of_chan_close_noerr = of_chan_ ~close:close_in_noerr +let of_chan_ ?buf_size ic ~close_noerr : t = + let inc = IO.In_channel.of_in_channel ~close_noerr ic in + of_input ?buf_size inc -let of_fd_ ?(buf_size = 16 * 1024) ~close ic : t = - make ~bs:(Bytes.create buf_size) - ~close:(fun _ -> close ic) - ~consume:(fun self n -> - self.off <- self.off + n; - self.len <- self.len - n) - ~fill:(fun self -> - if self.off >= self.len then ( - self.off <- 0; - self.len <- Unix.read ic self.bs 0 (Bytes.length self.bs) - )) - () +let of_chan ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:false +let of_chan_close_noerr ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:true -let of_fd = of_fd_ ~close:Unix.close -let of_fd_close_noerr = of_fd_ ~close:(fun f -> try Unix.close f with _ -> ()) +let of_fd_ ?buf_size ~close_noerr ic : t = + let inc = IO.In_channel.of_unix_fd ~close_noerr ic in + of_input ?buf_size inc + +let of_fd ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:false fd +let of_fd_close_noerr ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:true fd let rec iter f (self : t) : unit = self.fill_buf (); @@ -90,6 +86,9 @@ let rec iter f (self : t) : unit = let to_chan (oc : out_channel) (self : t) = iter (fun s i len -> output oc s i len) self +let to_chan' (oc : IO.Out_channel.t) (self : t) = + iter (fun s i len -> IO.Out_channel.output oc s i len) self + let of_bytes ?(i = 0) ?len (bs : bytes) : t = (* invariant: !i+!len is constant *) let len = @@ -298,19 +297,22 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t = refill := false) () -(* print a stream as a series of chunks *) -let output_chunked (oc : out_channel) (self : t) : unit = +let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = let continue = ref true in while !continue do (* next chunk *) self.fill_buf (); let n = self.len in - Printf.fprintf oc "%x\r\n" n; - output oc self.bs self.off n; + IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n); + IO.Out_channel.output oc self.bs self.off n; self.consume n; if n = 0 then continue := false; - output_string oc "\r\n" + IO.Out_channel.output_string oc "\r\n" done; (* write another crlf after the stream (see #56) *) - output_string oc "\r\n"; + IO.Out_channel.output_string oc "\r\n"; () + +(* print a stream as a series of chunks *) +let output_chunked (oc : out_channel) (self : t) : unit = + output_chunked' (IO.Out_channel.of_out_channel oc) self diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index b2966662..4fc7dbcd 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -36,6 +36,10 @@ val close : t -> unit val empty : t (** Stream with 0 bytes inside *) +val of_input : ?buf_size:int -> Tiny_httpd_io.In_channel.t -> t +(** Make a buffered stream from the given channel. + @since NEXT_RELEASE *) + val of_chan : ?buf_size:int -> in_channel -> t (** Make a buffered stream from the given channel. *) @@ -62,6 +66,10 @@ val to_chan : out_channel -> t -> unit (** Write the stream to the channel. @since 0.3 *) +val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit +(** Write to the IO channel. + @since NEXT_RELEASE *) + val make : ?bs:bytes -> ?close:(t -> unit) -> @@ -111,3 +119,7 @@ val read_exactly : val output_chunked : out_channel -> t -> unit (** Write the stream into the channel, using the chunked encoding. *) + +val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit +(** Write the stream into the channel, using the chunked encoding. + @since NEXT_RELEASE *) From 009a8d6d3b7e9ce1924692ba650c6a5df09206e0 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 3 Jun 2023 20:54:18 -0400 Subject: [PATCH 3/8] test: tests for Buf --- tests/unit/dune | 6 +++--- tests/unit/t_buf.ml | 27 +++++++++++++++++++++++++++ tests/unit/t_util.ml | 23 ++--------------------- tests/unit/util/dune | 5 +++++ tests/unit/util/test_util.ml | 31 +++++++++++++++++++++++++++++++ 5 files changed, 68 insertions(+), 24 deletions(-) create mode 100644 tests/unit/t_buf.ml create mode 100644 tests/unit/util/dune create mode 100644 tests/unit/util/test_util.ml diff --git a/tests/unit/dune b/tests/unit/dune index 4bc4f509..329becc8 100644 --- a/tests/unit/dune +++ b/tests/unit/dune @@ -1,5 +1,5 @@ -(test - (name t_util) +(tests + (names t_util t_buf) (package tiny_httpd) - (libraries tiny_httpd qcheck-core qcheck-core.runner)) + (libraries tiny_httpd qcheck-core qcheck-core.runner test_util)) diff --git a/tests/unit/t_buf.ml b/tests/unit/t_buf.ml new file mode 100644 index 00000000..9ee0f685 --- /dev/null +++ b/tests/unit/t_buf.ml @@ -0,0 +1,27 @@ +open Test_util +open Tiny_httpd_buf + +let spf = Printf.sprintf + +let () = + let b = create ~size:4 () in + add_string b "hello"; + assert_eq ~to_string:(spf "%S") "hello" (contents b); + + add_string b " world"; + assert_eq ~to_string:(spf "%S") "hello world" (contents b); + () + +let buffer_of_string str = + let buf = Buffer.create 32 in + Buffer.add_string buf str; + buf + +let () = + let b = create ~size:4 () in + add_buffer b (buffer_of_string "hello"); + assert_eq ~to_string:(spf "%S") "hello" (contents b); + + add_buffer b (buffer_of_string " world"); + assert_eq ~to_string:(spf "%S") "hello world" (contents b); + () diff --git a/tests/unit/t_util.ml b/tests/unit/t_util.ml index 556c5e6f..3ae913a4 100644 --- a/tests/unit/t_util.ml +++ b/tests/unit/t_util.ml @@ -1,25 +1,6 @@ -module Q = QCheck - -(* test utils *) -let pp_res f = function - | Ok x -> f x - | Error e -> e - -let pp_res_query = Q.Print.(pp_res (list (pair string string))) - -let err_map f = function - | Ok x -> Ok (f x) - | Error e -> Error e - -let sort_l l = List.sort compare l -let eq_sorted a b = err_map sort_l a = err_map sort_l b -let is_ascii_char c = Char.code c < 128 -let assert_eq ?(cmp = ( = )) a b = assert (cmp a b) - +open Test_util open Tiny_httpd_util -let qchecks = ref [] -let add_qcheck f = qchecks := f :: !qchecks let () = assert_eq "hello%20world" (percent_encode "hello world") let () = assert_eq "%23%25^%24%40^%40" (percent_encode "#%^$@^@") @@ -67,4 +48,4 @@ let () = in eq_sorted (Ok l) (parse_query s)) -let () = exit @@ QCheck_base_runner.run_tests ~colors:false !qchecks +let () = run_qcheck_and_exit () diff --git a/tests/unit/util/dune b/tests/unit/util/dune new file mode 100644 index 00000000..fb97b15e --- /dev/null +++ b/tests/unit/util/dune @@ -0,0 +1,5 @@ + +(library + (name test_util) + (modules test_util) + (libraries qcheck-core qcheck-core.runner)) diff --git a/tests/unit/util/test_util.ml b/tests/unit/util/test_util.ml new file mode 100644 index 00000000..ae225b00 --- /dev/null +++ b/tests/unit/util/test_util.ml @@ -0,0 +1,31 @@ +module Q = QCheck + +(* test utils *) +let pp_res f = function + | Ok x -> f x + | Error e -> e + +let pp_res_query = Q.Print.(pp_res (list (pair string string))) + +let err_map f = function + | Ok x -> Ok (f x) + | Error e -> Error e + +let sort_l l = List.sort compare l +let eq_sorted a b = err_map sort_l a = err_map sort_l b +let is_ascii_char c = Char.code c < 128 + +let assert_eq ?to_string ?(cmp = ( = )) a b = + let ok = cmp a b in + if not ok then ( + (match to_string with + | Some f -> Printf.eprintf "failed: %s != %s\n%!" (f a) (f b) + | None -> ()); + failwith "test failed" + ) + +let qchecks = ref [] +let add_qcheck f = qchecks := f :: !qchecks + +let run_qcheck_and_exit () : 'a = + exit @@ QCheck_base_runner.run_tests ~colors:false !qchecks From 5d7637becc89c402d3d5bc0324d5c5b366e76ea2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 3 Jun 2023 20:54:28 -0400 Subject: [PATCH 4/8] server: add `IO_BACKEND` abstraction; implement a unix version of it this doesn't change the `create`+`run` version, but makes it possible to create a server that doesn't use unix IOs. --- src/Tiny_httpd_buf.ml | 10 ++ src/Tiny_httpd_buf.mli | 8 + src/Tiny_httpd_io.ml | 26 ++- src/Tiny_httpd_server.ml | 368 ++++++++++++++++++++++++++------------ src/Tiny_httpd_server.mli | 41 ++++- 5 files changed, 332 insertions(+), 121 deletions(-) diff --git a/src/Tiny_httpd_buf.ml b/src/Tiny_httpd_buf.ml index 2c706180..e3e2faa2 100644 --- a/src/Tiny_httpd_buf.ml +++ b/src/Tiny_httpd_buf.ml @@ -20,6 +20,16 @@ let add_bytes (self : t) s i len : unit = Bytes.blit s i self.bytes self.i len; self.i <- self.i + len +let[@inline] add_string self str : unit = + add_bytes self (Bytes.unsafe_of_string str) 0 (String.length str) + +let add_buffer (self : t) (buf : Buffer.t) : unit = + let len = Buffer.length buf in + if self.i + len >= Bytes.length self.bytes then + resize self (self.i + (self.i / 2) + len + 10); + Buffer.blit buf 0 self.bytes self.i len; + self.i <- self.i + len + let contents (self : t) : string = Bytes.sub_string self.bytes 0 self.i let contents_and_clear (self : t) : string = diff --git a/src/Tiny_httpd_buf.mli b/src/Tiny_httpd_buf.mli index b500ccaf..2bcfe58b 100644 --- a/src/Tiny_httpd_buf.mli +++ b/src/Tiny_httpd_buf.mli @@ -24,3 +24,11 @@ val contents_and_clear : t -> string val add_bytes : t -> bytes -> int -> int -> unit (** Append given bytes slice to the buffer. @since 0.5 *) + +val add_string : t -> string -> unit +(** Add string. + @since NEXT_RELEASE *) + +val add_buffer : t -> Buffer.t -> unit +(** Append bytes from buffer. + @since NEXT_RELEASE *) diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml index 8d36a2a4..749f53d9 100644 --- a/src/Tiny_httpd_io.ml +++ b/src/Tiny_httpd_io.ml @@ -8,6 +8,8 @@ @since NEXT_RELEASE *) +module Buf = Tiny_httpd_buf + module In_channel = struct type t = { input: bytes -> int -> int -> int; @@ -67,6 +69,11 @@ module Out_channel = struct self.output (Bytes.unsafe_of_string str) 0 (String.length str) let[@inline] close self : unit = self.close () + let[@inline] flush self : unit = self.flush () + + let output_buf (self : t) (buf : Buf.t) : unit = + let b = Buf.bytes_slice buf in + output self b 0 (Buf.size buf) end (** A TCP server abstraction *) @@ -77,9 +84,20 @@ module TCP_server = struct } type t = { - listen: handle:conn_handler -> unit -> unit; - (** Blocking call to start listening for incoming connections. - Uses the connection handler to handle individual client connections. *) - endpoint: unit -> Unix.inet_addr * int; (** Endpoint we listen on *) + endpoint: unit -> string * int; + (** Endpoint we listen on. This can only be called from within [serve]. *) + active_connections: unit -> int; + (** Number of connections currently active *) + running: unit -> bool; (** Is the server currently running? *) + stop: unit -> unit; + (** Ask the server to stop. This might not take effect immediately. *) } + (** Running server. *) + + type builder = { + serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit; + (** Blocking call to listen for incoming connections and handle them. + Uses the connection handler to handle individual client connections. *) + } + (** A TCP server implementation. *) end diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index 9378faae..24086a08 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -18,6 +18,7 @@ let _debug k = module Buf = Tiny_httpd_buf module Byte_stream = Tiny_httpd_stream +module IO = Tiny_httpd_io exception Bad_req of int * string @@ -423,9 +424,19 @@ module Response = struct Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Headers.pp self.headers pp_body self.body - let output_ (oc : out_channel) (self : t) : unit = - Printf.fprintf oc "HTTP/1.1 %d %s\r\n" self.code + let output_ ?(buf = Buf.create ~size:256 ()) (oc : IO.Out_channel.t) + (self : t) : unit = + (* double indirection: + - print into [buffer] using [bprintf] + - transfer to [buf_] so we can output from there *) + let tmp_buffer = Buffer.create 32 in + + (* write start of reply *) + Printf.bprintf tmp_buffer "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code); + Buf.add_buffer buf tmp_buffer; + Buffer.clear tmp_buffer; + let body, is_chunked = match self.body with | `String s when String.length s > 1024 * 500 -> @@ -447,19 +458,29 @@ module Response = struct _debug (fun k -> k "output response: %s" (Format.asprintf "%a" pp { self with body = `String "<…>" })); - List.iter (fun (k, v) -> Printf.fprintf oc "%s: %s\r\n" k v) headers; - output_string oc "\r\n"; + + (* write headers *) + List.iter + (fun (k, v) -> + Printf.bprintf tmp_buffer "%s: %s\r\n" k v; + Buf.add_buffer buf tmp_buffer; + Buffer.clear tmp_buffer) + headers; + + IO.Out_channel.output_buf oc buf; + IO.Out_channel.output_string oc "\r\n"; + (match body with | `String "" | `Void -> () - | `String s -> output_string oc s + | `String s -> IO.Out_channel.output_string oc s | `Stream str -> (try - Byte_stream.output_chunked oc str; + Byte_stream.output_chunked' oc str; Byte_stream.close str with e -> Byte_stream.close str; raise e)); - flush oc + IO.Out_channel.flush oc end (* semaphore, for limiting concurrency. *) @@ -593,7 +614,7 @@ module Middleware = struct end (* a request handler. handles a single request. *) -type cb_path_handler = out_channel -> Middleware.handler +type cb_path_handler = IO.Out_channel.t -> Middleware.handler module type SERVER_SENT_GENERATOR = sig val set_headers : Headers.t -> unit @@ -606,18 +627,26 @@ end type server_sent_generator = (module SERVER_SENT_GENERATOR) +module type IO_BACKEND = sig + val init_addr : unit -> string + val init_port : unit -> int + + val spawn : (unit -> unit) -> unit + (** function used to spawn a new thread to handle a + new client connection. By default it is {!Thread.create} but one + could use a thread pool instead.*) + + val get_time_s : unit -> float + (** obtain the current timestamp in seconds. *) + + val tcp_server : unit -> Tiny_httpd_io.TCP_server.builder + (** Server that can listen on a port and handle clients. *) +end + type t = { - addr: string; (** Address at creation *) - port: int; (** Port at creation *) - mutable sock: Unix.file_descr option; (** Socket *) - timeout: float; - sem_max_connections: Sem_.t; - (* semaphore to restrict the number of active concurrent connections *) - new_thread: (unit -> unit) -> unit; - (* a function to run the given callback in a separate thread (or thread pool) *) - masksigpipe: bool; + backend: (module IO_BACKEND); + mutable tcp_server: IO.TCP_server.t option; buf_size: int; - get_time_s: unit -> float; mutable handler: string Request.t -> Response.t; (** toplevel handler, if any *) mutable middlewares: (int * Middleware.t) list; (** Global middlewares *) @@ -626,9 +655,6 @@ type t = { mutable path_handlers: (unit Request.t -> cb_path_handler resp_result option) list; (** path handlers *) - mutable running: bool; - (** true while the server is running. no need to protect with a mutex, - writes should be atomic enough. *) } let get_addr_ sock = @@ -636,17 +662,24 @@ let get_addr_ sock = | Unix.ADDR_INET (addr, port) -> addr, port | _ -> invalid_arg "httpd: address is not INET" -let addr self = - match self.sock with - | None -> self.addr - | Some s -> Unix.string_of_inet_addr (fst @@ get_addr_ s) +let addr (self : t) = + match self.tcp_server with + | None -> + let (module B) = self.backend in + B.init_addr () + | Some s -> fst @@ s.endpoint () -let port self = - match self.sock with - | None -> self.port - | Some sock -> snd @@ get_addr_ sock +let port (self : t) = + match self.tcp_server with + | None -> + let (module B) = self.backend in + B.init_port () + | Some s -> snd @@ s.endpoint () -let active_connections self = Sem_.num_acquired self.sem_max_connections - 1 +let active_connections (self : t) = + match self.tcp_server with + | None -> 0 + | Some s -> s.active_connections () let add_middleware ~stage self m = let stage = @@ -726,8 +759,10 @@ let[@inline] _opt_iter ~f o = | None -> () | Some x -> f x +exception Exit_SSE + let add_route_server_sent_handler ?accept self route f = - let tr_req oc req ~resp f = + let tr_req (oc : IO.Out_channel.t) req ~resp f = let req = Request.read_body_full ~buf_size:self.buf_size req in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") @@ -746,16 +781,20 @@ let add_route_server_sent_handler ?accept self route f = ) in + let[@inline] writef fmt = + Printf.ksprintf (IO.Out_channel.output_string oc) fmt + in + let send_event ?event ?id ?retry ~data () : unit = send_response_idempotent_ (); - _opt_iter event ~f:(fun e -> Printf.fprintf oc "event: %s\n" e); - _opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e); - _opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e); + _opt_iter event ~f:(fun e -> writef "event: %s\n" e); + _opt_iter id ~f:(fun e -> writef "id: %s\n" e); + _opt_iter retry ~f:(fun e -> writef "retry: %s\n" e); let l = String.split_on_char '\n' data in - List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l; - output_string oc "\n"; + List.iter (fun s -> writef "data: %s\n" s) l; + IO.Out_channel.output_string oc "\n"; (* finish group *) - flush oc + IO.Out_channel.flush oc in let module SSG = struct let set_headers h = @@ -765,32 +804,26 @@ let add_route_server_sent_handler ?accept self route f = ) let send_event = send_event - let close () = raise Exit + let close () = raise Exit_SSE end in - try f req (module SSG : SERVER_SENT_GENERATOR) with Exit -> close_out oc + try f req (module SSG : SERVER_SENT_GENERATOR) + with Exit_SSE -> IO.Out_channel.close oc in add_route_handler_ self ?accept ~meth:`GET route ~tr_req f -let create ?(masksigpipe = true) ?(max_connections = 32) ?(timeout = 0.0) - ?(buf_size = 16 * 1_024) ?(get_time_s = Unix.gettimeofday) - ?(new_thread = fun f -> ignore (Thread.create f () : Thread.t)) - ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?(middlewares = []) () : t = - let handler _req = Response.fail ~code:404 "no top handler" in +let get_max_connection_ ?(max_connections = 64) () : int = let max_connections = max 4 max_connections in + max_connections + +let create_from ?(buf_size = 16 * 1_024) ?(middlewares = []) ~backend () : t = + let handler _req = Response.fail ~code:404 "no top handler" in let self = { - new_thread; - addr; - port; - sock; - masksigpipe; + backend; + tcp_server = None; handler; buf_size; - running = true; - sem_max_connections = Sem_.create max_connections; path_handlers = []; - timeout; - get_time_s; middlewares = []; middlewares_sorted = lazy []; } @@ -798,7 +831,149 @@ let create ?(masksigpipe = true) ?(max_connections = 32) ?(timeout = 0.0) List.iter (fun (stage, m) -> add_middleware self ~stage m) middlewares; self -let stop s = s.running <- false +let is_ipv6_str addr : bool = String.contains addr ':' + +module Unix_tcp_server_ = struct + type t = { + addr: string; + port: int; + max_connections: int; + sem_max_connections: Sem_.t; + (** semaphore to restrict the number of active concurrent connections *) + mutable sock: Unix.file_descr option; (** Socket *) + new_thread: (unit -> unit) -> unit; + timeout: float; + masksigpipe: bool; + mutable running: bool; (* TODO: use an atomic? *) + } + + let to_tcp_server (self : t) : IO.TCP_server.builder = + { + IO.TCP_server.serve = + (fun ~after_init ~handle () : unit -> + if self.masksigpipe then + ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list); + let sock, should_bind = + match self.sock with + | Some s -> + ( s, + false + (* Because we're getting a socket from the caller (e.g. systemd) *) + ) + | None -> + ( Unix.socket + (if is_ipv6_str self.addr then + Unix.PF_INET6 + else + Unix.PF_INET) + Unix.SOCK_STREAM 0, + true (* Because we're creating the socket ourselves *) ) + in + Unix.clear_nonblock sock; + Unix.setsockopt_optint sock Unix.SO_LINGER None; + if should_bind then ( + let inet_addr = Unix.inet_addr_of_string self.addr in + Unix.setsockopt sock Unix.SO_REUSEADDR true; + Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); + let n_listen = 2 * self.max_connections in + Unix.listen sock n_listen + ); + + self.sock <- Some sock; + + let tcp_server = + { + IO.TCP_server.stop = (fun () -> self.running <- false); + running = (fun () -> self.running); + active_connections = + (fun () -> Sem_.num_acquired self.sem_max_connections - 1); + endpoint = + (fun () -> + let addr, port = get_addr_ sock in + Unix.string_of_inet_addr addr, port); + } + in + after_init tcp_server; + + (* how to handle a single client *) + let handle_client_unix_ (client_sock : Unix.file_descr) : unit = + Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout); + Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); + let oc = + IO.Out_channel.of_out_channel + @@ Unix.out_channel_of_descr client_sock + in + let ic = IO.In_channel.of_unix_fd client_sock in + handle.handle ic oc; + _debug (fun k -> k "done with client, exiting"); + (try Unix.close client_sock + with e -> + _debug (fun k -> + k "error when closing sock: %s" (Printexc.to_string e))); + () + in + + while self.running do + (* limit concurrency *) + Sem_.acquire 1 self.sem_max_connections; + try + let client_sock, _ = Unix.accept sock in + self.new_thread (fun () -> + try + handle_client_unix_ client_sock; + Sem_.release 1 self.sem_max_connections + with e -> + (try Unix.close client_sock with _ -> ()); + Sem_.release 1 self.sem_max_connections; + raise e) + with e -> + Sem_.release 1 self.sem_max_connections; + _debug (fun k -> + k "Unix.accept or Thread.create raised an exception: %s" + (Printexc.to_string e)) + done; + ()); + } +end + +let create ?(masksigpipe = true) ?max_connections ?(timeout = 0.0) ?buf_size + ?(get_time_s = Unix.gettimeofday) + ?(new_thread = fun f -> ignore (Thread.create f () : Thread.t)) + ?(addr = "127.0.0.1") ?(port = 8080) ?sock ?middlewares () : t = + let max_connections = get_max_connection_ ?max_connections () in + let server = + { + Unix_tcp_server_.addr; + new_thread; + running = true; + port; + sock; + max_connections; + sem_max_connections = Sem_.create max_connections; + masksigpipe; + timeout; + } + in + let tcp_server_builder = Unix_tcp_server_.to_tcp_server server in + let module B = struct + let init_addr () = addr + let init_port () = port + let get_time_s = get_time_s + let spawn f = new_thread f + let tcp_server () = tcp_server_builder + end in + let backend = (module B : IO_BACKEND) in + create_from ?buf_size ?middlewares ~backend () + +let stop (self : t) = + match self.tcp_server with + | None -> () + | Some s -> s.stop () + +let running (self : t) = + match self.tcp_server with + | None -> false + | Some s -> s.running () let find_map f l = let rec aux f = function @@ -810,16 +985,15 @@ let find_map f l = in aux f l -let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit = - Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout); - Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); - let oc = Unix.out_channel_of_descr client_sock in +(* handle client on [ic] and [oc] *) +let client_handle_for (self : t) ic oc : unit = let buf = Buf.create ~size:self.buf_size () in - let is = Byte_stream.of_fd ~buf_size:self.buf_size client_sock in + let is = Byte_stream.of_input ~buf_size:self.buf_size ic in let continue = ref true in - while !continue && self.running do + while !continue && running self do _debug (fun k -> k "read next request"); - match Request.parse_req_start ~get_time_s:self.get_time_s ~buf is with + let (module B) = self.backend in + match Request.parse_req_start ~get_time_s:B.get_time_s ~buf is with | Ok None -> continue := false (* client is done *) | Error (c, s) -> (* connection error, close *) @@ -833,7 +1007,7 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit = (try (* is there a handler for this path? *) - let handler = + let base_handler = match find_map (fun ph -> ph req) self.path_handlers with | Some f -> unwrap_resp_result f | None -> @@ -857,7 +1031,7 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit = List.fold_right (fun (_, m) h -> m h) (Lazy.force self.middlewares_sorted) - (handler oc) + (base_handler oc) in (* now actually read request's body into a stream *) @@ -889,62 +1063,24 @@ let handle_client_ (self : t) (client_sock : Unix.file_descr) : unit = continue := false; Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e)) - done; - _debug (fun k -> k "done with client, exiting"); - (try Unix.close client_sock - with e -> - _debug (fun k -> k "error when closing sock: %s" (Printexc.to_string e))); - () + done -let is_ipv6 self = String.contains self.addr ':' +let client_handler (self : t) : IO.TCP_server.conn_handler = + { IO.TCP_server.handle = client_handle_for self } +let is_ipv6 (self : t) = + let (module B) = self.backend in + is_ipv6_str (B.init_addr ()) + +(* TODO: init TCP server *) let run ?(after_init = ignore) (self : t) : (unit, _) result = try - if self.masksigpipe then - ignore (Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe ] : _ list); - let sock, should_bind = - match self.sock with - | Some s -> - ( s, - false - (* Because we're getting a socket from the caller (e.g. systemd) *) ) - | None -> - ( Unix.socket - (if is_ipv6 self then - Unix.PF_INET6 - else - Unix.PF_INET) - Unix.SOCK_STREAM 0, - true (* Because we're creating the socket ourselves *) ) - in - Unix.clear_nonblock sock; - Unix.setsockopt_optint sock Unix.SO_LINGER None; - if should_bind then ( - let inet_addr = Unix.inet_addr_of_string self.addr in - Unix.setsockopt sock Unix.SO_REUSEADDR true; - Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); - Unix.listen sock (2 * self.sem_max_connections.Sem_.n) - ); - self.sock <- Some sock; - after_init (); - while self.running do - (* limit concurrency *) - Sem_.acquire 1 self.sem_max_connections; - try - let client_sock, _ = Unix.accept sock in - self.new_thread (fun () -> - try - handle_client_ self client_sock; - Sem_.release 1 self.sem_max_connections - with e -> - (try Unix.close client_sock with _ -> ()); - Sem_.release 1 self.sem_max_connections; - raise e) - with e -> - Sem_.release 1 self.sem_max_connections; - _debug (fun k -> - k "Unix.accept or Thread.create raised an exception: %s" - (Printexc.to_string e)) - done; + let (module B) = self.backend in + let server = B.tcp_server () in + server.serve + ~after_init:(fun tcp_server -> + self.tcp_server <- Some tcp_server; + after_init ()) + ~handle:(client_handler self) (); Ok () with e -> Error e diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 3ff0eac7..88011f56 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -369,7 +369,7 @@ val create : ?middlewares:([ `Encoding | `Stage of int ] * Middleware.t) list -> unit -> t -(** Create a new webserver. +(** Create a new webserver using UNIX abstractions. The server will not do anything until {!run} is called on it. Before starting the server, one can use {!add_path_handler} and @@ -401,6 +401,41 @@ val create : This parameter exists since 0.11. *) +(** A backend that provides IO operations, network operations, etc. *) +module type IO_BACKEND = sig + val init_addr : unit -> string + val init_port : unit -> int + + val spawn : (unit -> unit) -> unit + (** function used to spawn a new thread to handle a + new client connection. By default it is {!Thread.create} but one + could use a thread pool instead.*) + + val get_time_s : unit -> float + (** obtain the current timestamp in seconds. *) + + val tcp_server : unit -> Tiny_httpd_io.TCP_server.builder + (** Server that can listen on a port and handle clients. *) +end + +val create_from : + ?buf_size:int -> + ?middlewares:([ `Encoding | `Stage of int ] * Middleware.t) list -> + backend:(module IO_BACKEND) -> + unit -> + t +(** Create a new webserver using provided backend. + + The server will not do anything until {!run} is called on it. + Before starting the server, one can use {!add_path_handler} and + {!set_top_handler} to specify how to handle incoming requests. + + @param buf_size size for buffers (since 0.11) + @param middlewares see {!add_middleware} for more details. + + @since NEXT_RELEASE +*) + val addr : t -> string (** Address on which the server listens. *) @@ -556,6 +591,10 @@ val add_route_server_sent_handler : (** {2 Run the server} *) +val running : t -> bool +(** Is the server running? + @since NEXT_RELEASE *) + val stop : t -> unit (** Ask the server to stop. This might not have an immediate effect as {!run} might currently be waiting on IO. *) From c907dc6af140656379edbaeaff50c466a4b30ab5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 3 Jun 2023 22:45:38 -0400 Subject: [PATCH 5/8] tighten flags; little fix --- echo.sh | 2 +- src/dune | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/echo.sh b/echo.sh index 0f6cf2d7..fc660ce6 100755 --- a/echo.sh +++ b/echo.sh @@ -1,2 +1,2 @@ #!/bin/sh -exec dune exec --profile=release "examples/echo.exe" -- $@ +exec dune exec --display=quiet --profile=release "examples/echo.exe" -- $@ diff --git a/src/dune b/src/dune index 5cccc26d..cebbd305 100644 --- a/src/dune +++ b/src/dune @@ -1,8 +1,13 @@ + +(env + (_ + (flags :standard -warn-error -a+8 -w +a-4-32-40-42-44-70 -color always -safe-string + -strict-sequence))) + (library (name tiny_httpd) (public_name tiny_httpd) (libraries threads seq) - (flags :standard -safe-string -strict-sequence -w +a-4-40-42 -warn-error -a+8) (wrapped false)) (rule From ed0f016f274baf876d403037598431d92ec8944e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 5 Jun 2023 21:58:48 -0400 Subject: [PATCH 6/8] chore: makefile runs everything in release mode --- Makefile | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index de348c88..4da8a1e6 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,12 @@ all: build test +OPTS?=--profile=release build: - @dune build @install + @dune build @install $(OPTS) test: - @dune runtest --no-buffer --force + @dune runtest --no-buffer --force $(OPTS) clean: @dune clean @@ -16,7 +17,7 @@ doc: WATCH?= "@install @runtest" watch: - @dune build $(WATCH) -w + @dune build $(OPTS) $(WATCH) -w .PHONY: benchs tests build watch From 56bb2db880ae39ec2bbeccfef53a6d84e08a91b5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 15 Jun 2023 21:16:47 -0400 Subject: [PATCH 7/8] doc --- src/Tiny_httpd_stream.mli | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index 4fc7dbcd..fe2d5968 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -27,8 +27,7 @@ type t = { _rest: hidden; (** Use {!make} to build a stream. *) } (** A buffered stream, with a view into the current buffer (or refill if empty), - and a function to consume [n] bytes. - See {!Byte_stream} for more details. *) + and a function to consume [n] bytes. *) val close : t -> unit (** Close stream *) From 04f17262b68cb9376e67c4076972a812625f2ba7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 2 Jan 2022 16:44:14 -0500 Subject: [PATCH 8/8] example: add super stupid http auth endpoints to echo.ml the endpoint /protected requires user:foobar login to see the content. --- examples/echo.ml | 57 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/examples/echo.ml b/examples/echo.ml index b0d152c0..6956f4d0 100644 --- a/examples/echo.ml +++ b/examples/echo.ml @@ -33,6 +33,16 @@ let middleware_stat () : S.Middleware.t * (unit -> string) = in m, get_stat +(* ugly AF *) +let base64 x = + let ic, oc = Unix.open_process "base64" in + output_string oc x; + flush oc; + close_out oc; + let r = input_line ic in + ignore (Unix.close_process (ic, oc)); + r + let () = let port_ = ref 8080 in let j = ref 32 in @@ -106,6 +116,35 @@ let () = S.Response.fail ~code:500 "couldn't upload file: %s" (Printexc.to_string e)); + (* protected by login *) + S.add_route_handler server + S.Route.(exact "protected" @/ return) + (fun req -> + let ok = + match S.Request.get_header req "authorization" with + | Some v -> + S._debug (fun k -> k "authenticate with %S" v); + v = "Basic " ^ base64 "user:foobar" + | None -> false + in + if ok then ( + (* FIXME: a logout link *) + let s = + "

hello, this is super secret!

log out" + in + S.Response.make_string (Ok s) + ) else ( + let headers = + S.Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"") + in + S.Response.fail ~code:401 ~headers "invalid" + )); + + (* logout *) + S.add_route_handler server + S.Route.(exact "logout" @/ return) + (fun _req -> S.Response.fail ~code:401 "logged out"); + (* stats *) S.add_route_handler server S.Route.(exact "stats" @/ return) @@ -171,6 +210,24 @@ let () = txt " (GET) to access a VFS embedded in the binary"; ]; ]; + li [] + [ + pre [] + [ + a [ A.href "/protected" ] [ txt "/protected" ]; + txt + " (GET) to see a protected page (login: user, \ + password: foobar)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/logout" ] [ txt "/logout" ]; + txt " (POST) to log out"; + ]; + ]; ]; ]; ]