From 2e0f08366fc6c6501915aa1c9d7609a3aee955a8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Jul 2021 01:12:39 -0400 Subject: [PATCH] wip: feat: server-sent events, and asynchronous handlers --- src/Tiny_httpd.ml | 244 ++++++++++++++++++++++++++++++++++----------- src/Tiny_httpd.mli | 113 ++++++++++++++++++++- 2 files changed, 296 insertions(+), 61 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 0f18b7c8..e345f6c4 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -695,8 +695,6 @@ module Sem_ = struct Mutex.unlock t.mutex end -type cb_path_handler = byte_stream Request.t -> Response.t - module Route = struct type path = string list (* split on '/' *) @@ -782,18 +780,58 @@ module Route = struct let pp out x = Format.pp_print_string out (to_string x) end +module type ASYNC_HANDLER_ARG = sig + val write : bytes -> int -> int -> unit + val set_response : Response.t -> unit + val close : unit -> unit +end + +(* a handler, either synchronous (blocks the thread), + or asynchronous (frees the thread and replies from another thread + as it goes) *) +type cb_path_handler = + | CB_sync of (byte_stream Request.t -> Response.t) + | CB_async of (byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit) + +module type SERVER_SENT_GENERATOR = sig + val send_event : + ?event:string -> + ?id:string -> + ?retry:string -> + data:string -> + unit -> unit +end +type server_sent_generator = (module SERVER_SENT_GENERATOR) + type t = { addr: string; + port: int; + sem_max_connections: Sem_.t; + (* semaphore to restrict the number of active concurrent connections *) + new_thread: (unit -> unit) -> unit; + (* a function to run the given callback in a separate thread (or thread pool) *) + masksigpipe: bool; + mutable handler: (string Request.t -> Response.t); - mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list; + (* toplevel handler, if any *) + + mutable path_handlers : (unit Request.t -> out_channel -> 
cb_path_handler resp_result option) list; + (* path handlers *) + mutable cb_decode_req: (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) list; + (* middleware to decode requests *) + mutable cb_encode_resp: (unit Request.t -> Response.t -> Response.t option) list; + (* middleware to encode responses *) + mutable running: bool; + (* true while the server is running. no need to protect with a mutex, + writes should be atomic enough. *) } let addr self = self.addr @@ -806,7 +844,7 @@ let set_top_handler self f = self.handler <- f let add_path_handler_ ?(accept=fun _req -> Ok ()) ?meth ~tr_req self fmt f = - let ph req : cb_path_handler resp_result option = + let ph req _oc : cb_path_handler resp_result option = match meth with | Some m when m <> req.Request.meth -> None (* ignore *) | _ -> @@ -814,7 +852,7 @@ let add_path_handler_ | handler -> (* we have a handler, do we accept the request based on its headers? *) begin match accept req with - | Ok () -> Some (Ok (fun req -> handler @@ tr_req req)) + | Ok () -> Some (Ok (CB_sync (fun req -> handler @@ tr_req req))) | Error _ as e -> Some e end | exception _ -> @@ -831,10 +869,11 @@ let add_path_handler ?accept ?meth self fmt f = let add_path_handler_stream ?accept ?meth self fmt f = add_path_handler_ ?accept ?meth ~tr_req:(fun x->x) self fmt f +(* route the given handler *) let add_route_handler_ ?(accept=fun _req -> Ok ()) - ?meth ~tr_req self (route:_ Route.t) f = - let ph req : cb_path_handler resp_result option = + ?meth self (route:_ Route.t) mk_cb f = + let ph req oc : cb_path_handler resp_result option = match meth with | Some m when m <> req.Request.meth -> None (* ignore *) | _ -> @@ -842,7 +881,7 @@ let add_route_handler_ | Some handler -> (* we have a handler, do we accept the request based on its headers? 
*) begin match accept req with - | Ok () -> Some (Ok (fun req -> handler @@ tr_req req)) + | Ok () -> Some (Ok (mk_cb oc handler)) | Error _ as e -> Some e end | None -> @@ -851,11 +890,59 @@ let add_route_handler_ in self.path_handlers <- ph :: self.path_handlers -let add_route_handler ?accept ?meth self route f = - add_route_handler_ ?accept ?meth ~tr_req:Request.read_body_full self route f +let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit = + let mk_cb _oc f = CB_sync (fun req -> f (Request.read_body_full req)) in + add_route_handler_ ?accept ?meth self route mk_cb f let add_route_handler_stream ?accept ?meth self route f = - add_route_handler_ ?accept ?meth ~tr_req:(fun x->x) self route f + let mk_cb _oc f = CB_sync f in + add_route_handler_ ?accept ?meth self route mk_cb f + +(* TODO *) +let add_route_async_stream_handler ?accept ?meth self route f = + let mk_cb _oc f = CB_async f in (* wrap the routed callback as an asynchronous handler *) + add_route_handler_ ?accept ?meth self route mk_cb f + +let add_route_async_handler ?accept ?meth self route f = + let mk_cb _oc f = CB_async (fun req arg -> f (Request.read_body_full req) arg) in (* read the whole body into a string before calling the async callback *) + add_route_handler_ ?accept ?meth self route mk_cb f + +module Make_SSGEN(Conn : sig val oc: out_channel end) = struct + open Conn + + let initial_resp () = + let headers = Headers.(empty |> set "content-type" "text/event-stream") in + Response.make_raw ~headers ~code:200 "" + + let _opt_iter ~f o = match o with + | None -> () + | Some x -> f x + + let send_event ?event ?id ?retry ~data () : unit = + _opt_iter event ~f:(fun e -> Printf.fprintf oc "event: %s\n" e); (* event type goes in the "event:" field; "data:" is only for the payload *) + _opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e); + _opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e); + let l = String.split_on_char '\n' data in + List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l; (* one "data:" line per line of payload *) + output_string oc "\n"; (* finish group *) + flush oc +end + +let add_route_server_sent_handler ?accept self route f = + let handle_req _req oc = + let module SSGEN = 
Make_SSGEN(struct let oc=oc end) in + Response.output_ oc @@ SSGEN.initial_resp(); + f (module SSGEN : SERVER_SENT_GENERATOR) + in + add_route_handler_ self ?accept ~meth:`GET route handle_req + +let add_route_server_sent_async_handler ?accept self route f = + let handle_req _req (module A:ASYNC_HANDLER_ARG) oc = + let module SSGEN = Make_SSGEN(struct let oc=oc end) in + A.set_response @@ SSGEN.initial_resp(); + f (module SSGEN : SERVER_SENT_GENERATOR) + in + add_route_handler_ self ?accept ~meth:`GET route handle_req let create ?(masksigpipe=true) @@ -890,67 +977,104 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = while !continue && self.running do _debug (fun k->k "read next request"); match Request.parse_req_start ~buf is with - | Ok None -> continue := false + | Ok None -> + continue := false (* client is done *) + | Error (c,s) -> + (* connection error, close *) let res = Response.make_raw ~code:c s in begin try Response.output_ oc res with Sys_error _ -> () end; continue := false + | Ok (Some req) -> _debug (fun k->k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req)); - let res = - try - (* is there a handler for this path? 
*) - let handler = - match find_map (fun ph -> ph req) self.path_handlers with - | Some f -> unwrap_resp_result f - | None -> (fun req -> self.handler @@ Request.read_body_full req) - in - (* handle expectations *) - begin match Request.get_header ~f:String.trim req "Expect" with - | Some "100-continue" -> - _debug (fun k->k "send back: 100 CONTINUE"); - Response.output_ oc (Response.make_raw ~code:100 ""); - | Some s -> bad_reqf 417 "unknown expectation %s" s - | None -> () - end; - (* preprocess request's input stream *) - let req0, tr_stream = - List.fold_left - (fun (req,tr) cb -> - match cb req with - | None -> req, tr - | Some (r',f) -> r', (fun is -> tr is |> f)) - (req, (fun is->is)) self.cb_decode_req - in - (* now actually read request's body *) - let req = - Request.parse_body_ ~tr_stream ~buf {req0 with body=is} - |> unwrap_resp_result - in - let resp = handler req in - (* post-process response *) + + try + (* is there a handler for this path? *) + let handler = + match find_map (fun ph -> ph req) self.path_handlers with + | Some f -> unwrap_resp_result f + | None -> CB_sync (fun req _oc -> self.handler (Request.read_body_full req)) + in + + (* handle expect/continue *) + begin match Request.get_header ~f:String.trim req "Expect" with + | Some "100-continue" -> + _debug (fun k->k "send back: 100 CONTINUE"); + Response.output_ oc (Response.make_raw ~code:100 ""); + | Some s -> bad_reqf 417 "unknown expectation %s" s + | None -> () + end; + + (* preprocess request's input stream *) + let req0, tr_stream = + List.fold_left + (fun (req,tr) cb -> + match cb req with + | None -> req, tr + | Some (r',f) -> r', (fun is -> tr is |> f)) + (req, (fun is->is)) self.cb_decode_req + in + (* now actually read request's body into a stream *) + let req = + Request.parse_body_ ~tr_stream ~buf {req0 with body=is} + |> unwrap_resp_result + in + + (* how to post-process response accordingly *) + let post_process_resp resp = List.fold_left (fun resp cb -> match cb req0 resp 
with None -> resp | Some r' -> r') resp self.cb_encode_resp - with - | Bad_req (code,s) -> - continue := false; - Response.make_raw ~code s - | e -> - Response.fail ~code:500 "server error: %s" (Printexc.to_string e) - in - begin - try Response.output_ oc res - with Sys_error _ -> continue := false - end - | exception Bad_req (code,s) -> - Response.output_ oc (Response.make_raw ~code s); - continue := false - | exception Sys_error _ -> - continue := false; (* connection broken somehow *) + in + + begin match handler with + | CB_sync h -> + (* compute response *) + let resp = h req oc |> post_process_resp in + + (* send response back *) + begin + try Response.output_ oc resp + with Sys_error _ -> continue := false + end + + | CB_async h -> + + (* bundle state to be able to send reply *) + let is_done = ref false in + let module M = struct + let write b i len = output oc b i len + let set_response r = + if !is_done then failwith "async handler: done writing response"; + is_done := true; + let r = post_process_resp r in + Response.output_ oc r + let close () = is_done := true; close_out_noerr oc + end in + let arg = (module M : ASYNC_HANDLER_ARG) in + + (* last query we handle on this connection, too complicated otherwise *) + continue := false; + + (* make sure we close the connection if it is GC'ed *) + let dispose (module M:ASYNC_HANDLER_ARG) = M.close() in + Gc.finalise dispose arg; + + h req arg oc + end + with + | Sys_error _ -> + continue := false; (* connection broken somehow *) + | Bad_req (code,s) -> + continue := false; + Response.output_ oc @@ Response.make_raw ~code s + | e -> + continue := false; + Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e) done; _debug (fun k->k "done with client, exiting"); (try Unix.close client_sock diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index c98c0938..87be8eae 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -420,7 +420,7 @@ module Route : sig @since 0.7 *) end 
-(** {2 Server} *) +(** {2 Main Server type} *) type t (** A HTTP server. See {!create} for more details. *) @@ -480,6 +480,8 @@ val add_encode_response_cb: as well as the current response. *) +(** {2 Request handlers} *) + val set_top_handler : t -> (string Request.t -> Response.t) -> unit (** Setup a handler called by default. @@ -507,6 +509,7 @@ val add_route_handler : its content is too big, or for some permission error). See the {!http_of_dir} program for an example of how to use [accept] to filter uploads that are too large before the upload even starts. + The default always returns [Ok()], i.e. it accepts all requests. @since 0.6 *) @@ -554,6 +557,114 @@ val add_path_handler_stream : json decoder (such as [Jsonm]) or into a file. @since 0.3 *) +(** {2 Async handler} + + {b EXPERIMENTAL}: this API is not stable yet. *) + +(** Async handler arguments + + Async handlers are handlers that do not block a thread. + Instead they receive this argument, and can call the functions + when they want. The functions might be registered in some event loop, + for example, or stashed in a table only to be called by a thread or thread + pool later when some event happened. *) +module type ASYNC_HANDLER_ARG = sig + val write : bytes -> int -> int -> unit + (** Write some data on the socket. + + Note that this is still done using blocking IO. *) + + val set_response : Response.t -> unit + (** Write response. More data can still be written. + The writing is done using blocking IO. *) + + val close : unit -> unit + (** Close connection. This is idempotent and failures will not be + reported here. *) +end + +val add_route_async_handler : + ?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> + ?meth:Meth.t -> + t -> + ('a, string Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a -> + unit +(** Add a route handler that replies asynchronously. + + See {!ASYNC_HANDLER_ARG} for some details on how the handler might + generate a response, write data, etc. 
+ + @since NEXT_RELEASE *) + +val add_route_async_stream_handler : + ?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> + ?meth:Meth.t -> + t -> + ('a, byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a -> + unit +(** Like {!add_route_async_handler} but takes the request body as a stream, + not a string. + @since NEXT_RELEASE *) + +(** {2 Server-sent events} + + {b EXPERIMENTAL}: this API is not stable yet. *) + +(** A server-side generator of server-sent events. + + See {{: https://html.spec.whatwg.org/multipage/server-sent-events.html} the WHATWG specification} + and {{: https://jvns.ca/blog/2021/01/12/day-36--server-sent-events-are-cool--and-a-fun-bug/} + this blog post}. + + @since NEXT_RELEASE + *) +module type SERVER_SENT_GENERATOR = sig + val send_event : + ?event:string -> + ?id:string -> + ?retry:string -> + data:string -> + unit -> unit + (** Send an event from the server. + If data is a multiline string, it will be sent on separate "data:" lines. *) +end + +type server_sent_generator = (module SERVER_SENT_GENERATOR) +(** Server-sent event generator + @since NEXT_RELEASE *) + +val add_route_server_sent_handler : + ?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> + t -> + ('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a -> + unit +(** Add a handler on an endpoint, that serves server-sent events. + + The callback is given a generator that can be used to send events + as it pleases. The connection is always closed by the client, + and the accepted method is always [GET]. + This will set the header "content-type" to "text/event-stream" automatically + and reply with a 200 immediately. + See {!server_sent_generator} for more details. + + This handler stays on the original thread (it is synchronous). 
+ + @since NEXT_RELEASE *) + +val add_route_server_sent_async_handler : + ?accept:(unit Request.t -> (unit, Response_code.t * string) result) -> + t -> + ('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a -> + unit +(** Similar to {!add_route_server_sent_handler}, but the thread quits immediately + after responding "200", and the rest of the data (the events) is sent + by the callback, from somewhere else. + Note that the IO is still blocking. + + @since NEXT_RELEASE *) + +(** {2 Run the server} *) + val stop : t -> unit (** Ask the server to stop. This might not have an immediate effect as {!run} might currently be waiting on IO. *)