diff --git a/examples/dune b/examples/dune new file mode 100644 index 00000000..f623f8d0 --- /dev/null +++ b/examples/dune @@ -0,0 +1,26 @@ + +(executable + (name sse_server) + (modules sse_server) + (libraries tiny_httpd unix ptime ptime.clock.os)) + +(executable + (name sse_client) + (modules sse_client) + (libraries unix)) + +(rule + (targets test_output.txt) + (deps (:script ./run_test.sh) ./sse_client.exe ./sse_server.exe) + (enabled_if (= %{system} "linux")) + (package tiny_httpd) + (action + (with-stdout-to %{targets} (run %{script})))) + +(rule + (alias runtest) + (package tiny_httpd) + (enabled_if (= %{system} "linux")) + (deps test_output.txt) + (action + (diff test_output.txt.expected test_output.txt))) diff --git a/examples/run_test.sh b/examples/run_test.sh new file mode 100755 index 00000000..e522747d --- /dev/null +++ b/examples/run_test.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +PORT=8082 + +./sse_server.exe -p $PORT & +sleep 0.1 +./sse_client.exe -p $PORT --alarm=1 /count | tr -d '\r' || true + +kill %1 +echo "success" diff --git a/examples/sse_client.ml b/examples/sse_client.ml new file mode 100644 index 00000000..9ba011e4 --- /dev/null +++ b/examples/sse_client.ml @@ -0,0 +1,31 @@ +let addr = ref "127.0.0.1" +let port = ref 8080 +let path = ref "/clock" + +let bufsize = 1024 + +let () = + Arg.parse (Arg.align [ + "-h", Arg.Set_string addr, " address to connect to"; + "-p", Arg.Set_int port, " port to connect to"; + "--alarm", Arg.Int (fun i->Unix.alarm i|>ignore), " set alarm (in seconds)"; + ]) (fun s -> path := s) "sse_client [opt]* path?"; + + Format.printf "connect to %s:%d@." !addr !port; + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.connect sock (Unix.ADDR_INET (Unix.inet_addr_of_string !addr, !port)); + Unix.setsockopt sock Unix.TCP_NODELAY false; + + let ic = Unix.in_channel_of_descr sock in + let oc = Unix.out_channel_of_descr sock in + Printf.fprintf oc "GET %s HTTP/1.1\r\nHost: localhost\r\n\r\n" !path; + flush oc; + + let continue = ref true in + let buf = Bytes.create bufsize in + while !continue do + let n = input ic buf 0 bufsize in + if n=0 then continue := false; + output stdout buf 0 n; flush stdout + done; + Format.printf "exit!@." diff --git a/examples/sse_server.ml b/examples/sse_server.ml new file mode 100644 index 00000000..cf30c95e --- /dev/null +++ b/examples/sse_server.ml @@ -0,0 +1,54 @@ + +(* serves some streams of events *) + +module S = Tiny_httpd + +let port = ref 8080 + +let () = + Arg.parse (Arg.align [ + "-p", Arg.Set_int port, " port to listen on"; + "--debug", Arg.Bool S._enable_debug, " toggle debug"; + ]) (fun _ -> ()) "sse_clock [opt*]"; + let server = S.create ~port:!port () in + + let extra_headers = [ + "Access-Control-Allow-Origin", "*"; + "Access-Control-Allow-Methods", "POST, GET, OPTIONS"; + ] in + + (* tick/tock goes the clock *) + S.add_route_server_sent_handler server S.Route.(exact "clock" @/ return) + (fun _req (module EV : S.SERVER_SENT_GENERATOR) -> + S._debug (fun k->k"new connection"); + EV.set_headers extra_headers; + let tick = ref true in + while true do + let now = Ptime_clock.now() in + S._debug (fun k->k"send clock ev %s" (Format.asprintf "%a" Ptime.pp now)); + EV.send_event ~event:(if !tick then "tick" else "tock") + ~data:(Ptime.to_rfc3339 now) (); + tick := not !tick; + + Unix.sleepf 1.0; + done; + ); + + (* just count *) + S.add_route_server_sent_handler server S.Route.(exact "count" @/ return) + (fun _req (module EV : S.SERVER_SENT_GENERATOR) -> + let n = ref 0 in + while true do + EV.send_event ~data:(string_of_int !n) (); + incr n; + Unix.sleepf 0.1; + done; + ); + + Printf.printf "listening on http://localhost:%d/\n%!" (S.port server); + match S.run server with + | Ok () -> () + | Error e -> + Printf.eprintf "error: %s\n%!" (Printexc.to_string e); exit 1 + + diff --git a/examples/test_output.txt.expected b/examples/test_output.txt.expected new file mode 100644 index 00000000..5adb0103 --- /dev/null +++ b/examples/test_output.txt.expected @@ -0,0 +1,26 @@ +listening on http://localhost:8082/ +connect to 127.0.0.1:8082 +HTTP/1.1 200 OK +content-type: text/event-stream + +data: 0 + +data: 1 + +data: 2 + +data: 3 + +data: 4 + +data: 5 + +data: 6 + +data: 7 + +data: 8 + +data: 9 + +success diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 0f18b7c8..eec0ccda 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -576,7 +576,7 @@ end *) module Response = struct - type body = [`String of string | `Stream of byte_stream] + type body = [`String of string | `Stream of byte_stream | `Void] type t = { code: Response_code.t; headers: Headers.t; @@ -595,6 +595,9 @@ module Response = struct let headers = Headers.set "Transfer-Encoding" "chunked" headers in { code; headers; body=`Stream body; } + let make_void ?(headers=[]) ~code () : t = + { code; headers; body=`Void; } + let make_string ?headers r = match r with | Ok body -> make_raw ?headers ~code:200 body | Error (code,msg) -> make_raw ?headers ~code msg @@ -606,6 +609,7 @@ module Response = struct let make ?headers r : t = match r with | Ok (`String body) -> make_raw ?headers ~code:200 body | Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body + | Ok `Void -> make_void ?headers ~code:200 () | Error (code,msg) -> make_raw ?headers ~code msg let fail ?headers ~code fmt = @@ -617,6 +621,7 @@ module Response = struct let pp_body out = function | `String s -> Format.fprintf out "%S" s | `Stream _ -> Format.pp_print_string out "" + | `Void -> () in Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Headers.pp self.headers pp_body self.body @@ -645,6 +650,7 @@ module Response = struct `Stream (Byte_stream.of_string s), true | `String _ as b -> b, false | `Stream _ as b -> b, true + | `Void as b -> b, false in let headers = if is_chunked then ( @@ -659,7 +665,7 @@ module Response = struct List.iter (fun (k,v) -> Printf.fprintf oc "%s: %s\r\n" k v) headers; output_string oc "\r\n"; begin match body with - | `String "" -> () + | `String "" | `Void -> () | `String s -> output_string oc s; | `Stream str -> output_stream_chunked_ oc str; end; @@ -695,8 +701,6 @@ module Sem_ = struct Mutex.unlock t.mutex end -type cb_path_handler = byte_stream Request.t -> Response.t - module Route = struct type path = string list (* split on '/' *) @@ -782,18 +786,53 @@ module Route = struct let pp out x = Format.pp_print_string out (to_string x) end +(* a request handler. handles a single request. *) +type cb_path_handler = + out_channel -> + byte_stream Request.t -> + resp:(Response.t -> unit) -> + unit + +module type SERVER_SENT_GENERATOR = sig + val set_headers : Headers.t -> unit + val send_event : + ?event:string -> + ?id:string -> + ?retry:string -> + data:string -> + unit -> unit +end +type server_sent_generator = (module SERVER_SENT_GENERATOR) + type t = { addr: string; + port: int; + sem_max_connections: Sem_.t; + (* semaphore to restrict the number of active concurrent connections *) + new_thread: (unit -> unit) -> unit; + (* a function to run the given callback in a separate thread (or thread pool) *) + masksigpipe: bool; + mutable handler: (string Request.t -> Response.t); + (* toplevel handler, if any *) + mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list; + (* path handlers *) + mutable cb_decode_req: (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) list; + (* middleware to decode requests *) + mutable cb_encode_resp: (unit Request.t -> Response.t -> Response.t option) list; + (* middleware to encode responses *) + mutable running: bool; + (* true while the server is running. no need to protect with a mutex, + writes should be atomic enough. *) } let addr self = self.addr @@ -814,7 +853,7 @@ let add_path_handler_ | handler -> (* we have a handler, do we accept the request based on its headers? *) begin match accept req with - | Ok () -> Some (Ok (fun req -> handler @@ tr_req req)) + | Ok () -> Some (Ok (fun _oc req ~resp -> resp (handler (tr_req req)))) | Error _ as e -> Some e end | exception _ -> @@ -831,6 +870,9 @@ let add_path_handler ?accept ?meth self fmt f = let add_path_handler_stream ?accept ?meth self fmt f = add_path_handler_ ?accept ?meth ~tr_req:(fun x->x) self fmt f +(* route the given handler. + @param tr_req wraps the actual concrete function returned by the route + and makes it into a handler. *) let add_route_handler_ ?(accept=fun _req -> Ok ()) ?meth ~tr_req self (route:_ Route.t) f = @@ -842,7 +884,7 @@ let add_route_handler_ | Some handler -> (* we have a handler, do we accept the request based on its headers? *) begin match accept req with - | Ok () -> Some (Ok (fun req -> handler @@ tr_req req)) + | Ok () -> Some (Ok (fun oc req ~resp -> tr_req oc req ~resp handler)) | Error _ as e -> Some e end | None -> @@ -851,11 +893,55 @@ let add_route_handler_ in self.path_handlers <- ph :: self.path_handlers -let add_route_handler ?accept ?meth self route f = - add_route_handler_ ?accept ?meth ~tr_req:Request.read_body_full self route f +let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit = + let tr_req _oc req ~resp f = resp (f (Request.read_body_full req)) in + add_route_handler_ ?accept ?meth self route ~tr_req f let add_route_handler_stream ?accept ?meth self route f = - add_route_handler_ ?accept ?meth ~tr_req:(fun x->x) self route f + let tr_req _oc req ~resp f = resp (f req) in + add_route_handler_ ?accept ?meth self route ~tr_req f + +let[@inline] _opt_iter ~f o = match o with + | None -> () + | Some x -> f x + +let add_route_server_sent_handler ?accept self route f = + let tr_req oc req ~resp f = + let req = Request.read_body_full req in + let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in + + (* send response once *) + let resp_sent = ref false in + let send_response_idempotent_ () = + if not !resp_sent then ( + resp_sent := true; + (* send 200 response now *) + let initial_resp = Response.make_void ~headers:!headers ~code:200 () in + resp initial_resp; + ) + in + + let send_event ?event ?id ?retry ~data () : unit = + send_response_idempotent_(); + _opt_iter event ~f:(fun e -> Printf.fprintf oc "data: %s\n" e); + _opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e); + _opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e); + let l = String.split_on_char '\n' data in + List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l; + output_string oc "\n"; (* finish group *) + flush oc + in + let module SSG = struct + let set_headers h = + if not !resp_sent then ( + headers := List.rev_append h !headers; + send_response_idempotent_() + ) + let send_event = send_event + end in + f req (module SSG : SERVER_SENT_GENERATOR); + in + add_route_handler_ self ?accept ~meth:`GET route ~tr_req f let create ?(masksigpipe=true) @@ -890,67 +976,82 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = while !continue && self.running do _debug (fun k->k "read next request"); match Request.parse_req_start ~buf is with - | Ok None -> continue := false + | Ok None -> + continue := false (* client is done *) + | Error (c,s) -> + (* connection error, close *) let res = Response.make_raw ~code:c s in begin try Response.output_ oc res with Sys_error _ -> () end; continue := false + | Ok (Some req) -> _debug (fun k->k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req)); - let res = - try - (* is there a handler for this path? *) - let handler = - match find_map (fun ph -> ph req) self.path_handlers with - | Some f -> unwrap_resp_result f - | None -> (fun req -> self.handler @@ Request.read_body_full req) - in - (* handle expectations *) - begin match Request.get_header ~f:String.trim req "Expect" with - | Some "100-continue" -> - _debug (fun k->k "send back: 100 CONTINUE"); - Response.output_ oc (Response.make_raw ~code:100 ""); - | Some s -> bad_reqf 417 "unknown expectation %s" s - | None -> () - end; - (* preprocess request's input stream *) - let req0, tr_stream = - List.fold_left - (fun (req,tr) cb -> - match cb req with - | None -> req, tr - | Some (r',f) -> r', (fun is -> tr is |> f)) - (req, (fun is->is)) self.cb_decode_req - in - (* now actually read request's body *) - let req = - Request.parse_body_ ~tr_stream ~buf {req0 with body=is} - |> unwrap_resp_result - in - let resp = handler req in - (* post-process response *) + + try + (* is there a handler for this path? *) + let handler = + match find_map (fun ph -> ph req) self.path_handlers with + | Some f -> unwrap_resp_result f + | None -> (fun _oc req ~resp -> resp (self.handler (Request.read_body_full req))) + in + + (* handle expect/continue *) + begin match Request.get_header ~f:String.trim req "Expect" with + | Some "100-continue" -> + _debug (fun k->k "send back: 100 CONTINUE"); + Response.output_ oc (Response.make_raw ~code:100 ""); + | Some s -> bad_reqf 417 "unknown expectation %s" s + | None -> () + end; + + (* preprocess request's input stream *) + let req0, tr_stream = + List.fold_left + (fun (req,tr) cb -> + match cb req with + | None -> req, tr + | Some (r',f) -> r', (fun is -> tr is |> f)) + (req, (fun is->is)) self.cb_decode_req + in + (* now actually read request's body into a stream *) + let req = + Request.parse_body_ ~tr_stream ~buf {req0 with body=is} + |> unwrap_resp_result + in + + (* how to post-process response accordingly *) + let post_process_resp resp = List.fold_left (fun resp cb -> match cb req0 resp with None -> resp | Some r' -> r') resp self.cb_encode_resp - with - | Bad_req (code,s) -> - continue := false; - Response.make_raw ~code s - | e -> - Response.fail ~code:500 "server error: %s" (Printexc.to_string e) - in - begin - try Response.output_ oc res - with Sys_error _ -> continue := false - end - | exception Bad_req (code,s) -> - Response.output_ oc (Response.make_raw ~code s); - continue := false - | exception Sys_error _ -> - continue := false; (* connection broken somehow *) + in + + (* how to reply *) + let resp r = + try + let r = post_process_resp r in + Response.output_ oc r + with Sys_error _ -> continue := false + in + + (* call handler *) + begin + try handler oc req ~resp + with Sys_error _ -> continue := false + end + with + | Sys_error _ -> + continue := false; (* connection broken somehow *) + | Bad_req (code,s) -> + continue := false; + Response.output_ oc @@ Response.make_raw ~code s + | e -> + continue := false; + Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e) done; _debug (fun k->k "done with client, exiting"); (try Unix.close client_sock diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index c98c0938..45024687 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -15,15 +15,19 @@ module S = Tiny_httpd let () = let server = S.create () in + (* say hello *) S.add_route_handler ~meth:`GET server S.Route.(exact "hello" @/ string @/ return) (fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n"))); + (* echo request *) S.add_route_handler server S.Route.(exact "echo" @/ return) (fun req -> S.Response.make_string (Ok (Format.asprintf "echo:@ %a@." S.Request.pp req))); + + (* file upload *) S.add_route_handler ~meth:`PUT server S.Route.(exact "upload" @/ string_urlencoded @/ return) (fun path req -> @@ -36,6 +40,8 @@ let () = S.Response.fail ~code:500 "couldn't upload file: %s" (Printexc.to_string e) ); + + (* run the server *) Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server); match S.run server with | Ok () -> () @@ -308,9 +314,9 @@ end the client to answer a {!Request.t}*) module Response : sig - type body = [`String of string | `Stream of byte_stream] + type body = [`String of string | `Stream of byte_stream | `Void] (** Body of a response, either as a simple string, - or a stream of bytes. *) + or a stream of bytes, or nothing (for server-sent events). *) type t = { code: Response_code.t; (** HTTP response code. See {!Response_code}. *) @@ -420,7 +426,7 @@ module Route : sig @since 0.7 *) end -(** {2 Server} *) +(** {2 Main Server type} *) type t (** A HTTP server. See {!create} for more details. *) @@ -480,6 +486,8 @@ val add_encode_response_cb: as well as the current response. *) +(** {2 Request handlers} *) + val set_top_handler : t -> (string Request.t -> Response.t) -> unit (** Setup a handler called by default. @@ -507,6 +515,7 @@ val add_route_handler : its content is too big, or for some permission error). See the {!http_of_dir} program for an example of how to use [accept] to filter uploads that are too large before the upload even starts. + The default always returns [Ok()], i.e. it accepts all requests. @since 0.6 *) @@ -554,6 +563,59 @@ val add_path_handler_stream : json decoder (such as [Jsonm]) or into a file. @since 0.3 *) +(** {2 Server-sent events} + + {b EXPERIMENTAL}: this API is not stable yet. *) + +(** A server-side function to generate of Server-sent events. + + See {{: https://html.spec.whatwg.org/multipage/server-sent-events.html} the w3c page} + and {{: https://jvns.ca/blog/2021/01/12/day-36--server-sent-events-are-cool--and-a-fun-bug/} + this blog post}. + + @since NEXT_RELEASE + *) +module type SERVER_SENT_GENERATOR = sig + val set_headers : Headers.t -> unit + (** Set headers of the response. + This is not mandatory but if used at all, it must be called before + any call to {!send_event} (once events are sent the response is + already sent too). *) + + val send_event : + ?event:string -> + ?id:string -> + ?retry:string -> + data:string -> + unit -> unit + (** Send an event from the server. + If data is a multiline string, it will be sent on separate "data:" lines. *) +end + +type server_sent_generator = (module SERVER_SENT_GENERATOR) +(** Server-sent event generator + @since NEXT_RELEASE *) + +val add_route_server_sent_handler : + ?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> + t -> + ('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a -> + unit +(** Add a handler on an endpoint, that serves server-sent events. + + The callback is given a generator that can be used to send events + as it pleases. The connection is always closed by the client, + and the accepted method is always [GET]. + This will set the header "content-type" to "text/event-stream" automatically + and reply with a 200 immediately. + See {!server_sent_generator} for more details. + + This handler stays on the original thread (it is synchronous). + + @since NEXT_RELEASE *) + +(** {2 Run the server} *) + val stop : t -> unit (** Ask the server to stop. This might not have an immediate effect as {!run} might currently be waiting on IO. *) diff --git a/src/camlzip/Tiny_httpd_camlzip.ml b/src/camlzip/Tiny_httpd_camlzip.ml index a2076e35..0ff8c20e 100644 --- a/src/camlzip/Tiny_httpd_camlzip.ml +++ b/src/camlzip/Tiny_httpd_camlzip.ml @@ -189,7 +189,7 @@ let cb_encode_compressed_stream headers= set_headers resp.headers; body=`Stream (encode_deflate_stream_ ~buf_size str); } - | `String _ -> None + | `String _ | `Void -> None ) else None let setup diff --git a/src/camlzip/dune b/src/camlzip/dune index 22927cbd..b0e3ab5e 100644 --- a/src/camlzip/dune +++ b/src/camlzip/dune @@ -3,5 +3,5 @@ (name tiny_httpd_camlzip) (public_name tiny_httpd_camlzip) (synopsis "A wrapper around camlzip to bring compression to Tiny_httpd") - (flags :standard -safe-string -warn-error -a) + (flags :standard -safe-string -warn-error -a+8) (libraries tiny_httpd camlzip)) diff --git a/tiny_httpd.opam b/tiny_httpd.opam index 095ac64c..92d51273 100644 --- a/tiny_httpd.opam +++ b/tiny_httpd.opam @@ -17,6 +17,7 @@ depends: [ "qtest" { >= "2.9" & with-test} "qcheck" {with-test & >= "0.9" } "ounit2" {with-test} + "ptime" {with-test} ] tags: [ "http" "thread" "server" "tiny_httpd" "http_of_dir" "simplehttpserver" ] homepage: "https://github.com/c-cube/tiny_httpd/"