wip: feat: server-sent events, and asynchronous handlers

This commit is contained in:
Simon Cruanes 2021-07-16 01:12:39 -04:00
parent 91e9323671
commit 2e0f08366f
2 changed files with 296 additions and 61 deletions

View file

@ -695,8 +695,6 @@ module Sem_ = struct
Mutex.unlock t.mutex
end
type cb_path_handler = byte_stream Request.t -> Response.t
module Route = struct
type path = string list (* split on '/' *)
@ -782,18 +780,58 @@ module Route = struct
let pp out x = Format.pp_print_string out (to_string x)
end
module type ASYNC_HANDLER_ARG = sig
val write : bytes -> int -> int -> unit
val set_response : Response.t -> unit
val close : unit -> unit
end
(* a handler, either synchronous (blocks the thread),
or asynchronous (frees the thread and replies from another thread
as it goes) *)
type cb_path_handler =
| CB_sync of (byte_stream Request.t -> Response.t)
| CB_async of (byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit)
module type SERVER_SENT_GENERATOR = sig
val send_event :
?event:string ->
?id:string ->
?retry:string ->
data:string ->
unit -> unit
end
type server_sent_generator = (module SERVER_SENT_GENERATOR)
type t = {
addr: string;
port: int;
sem_max_connections: Sem_.t;
(* semaphore to restrict the number of active concurrent connections *)
new_thread: (unit -> unit) -> unit;
(* a function to run the given callback in a separate thread (or thread pool) *)
masksigpipe: bool;
mutable handler: (string Request.t -> Response.t);
mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list;
(* toplevel handler, if any *)
mutable path_handlers : (unit Request.t -> out_channel -> cb_path_handler resp_result option) list;
(* path handlers *)
mutable cb_decode_req:
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) list;
(* middleware to decode requests *)
mutable cb_encode_resp: (unit Request.t -> Response.t -> Response.t option) list;
(* middleware to encode responses *)
mutable running: bool;
(* true while the server is running. no need to protect with a mutex,
writes should be atomic enough. *)
}
let addr self = self.addr
@ -806,7 +844,7 @@ let set_top_handler self f = self.handler <- f
let add_path_handler_
?(accept=fun _req -> Ok ())
?meth ~tr_req self fmt f =
let ph req : cb_path_handler resp_result option =
let ph req _oc : cb_path_handler resp_result option =
match meth with
| Some m when m <> req.Request.meth -> None (* ignore *)
| _ ->
@ -814,7 +852,7 @@ let add_path_handler_
| handler ->
(* we have a handler, do we accept the request based on its headers? *)
begin match accept req with
| Ok () -> Some (Ok (fun req -> handler @@ tr_req req))
| Ok () -> Some (Ok (CB_sync (fun req -> handler @@ tr_req req)))
| Error _ as e -> Some e
end
| exception _ ->
@ -831,10 +869,11 @@ let add_path_handler ?accept ?meth self fmt f =
let add_path_handler_stream ?accept ?meth self fmt f =
add_path_handler_ ?accept ?meth ~tr_req:(fun x->x) self fmt f
(* route the given handler *)
let add_route_handler_
?(accept=fun _req -> Ok ())
?meth ~tr_req self (route:_ Route.t) f =
let ph req : cb_path_handler resp_result option =
?meth self (route:_ Route.t) mk_cb f =
let ph req oc : cb_path_handler resp_result option =
match meth with
| Some m when m <> req.Request.meth -> None (* ignore *)
| _ ->
@ -842,7 +881,7 @@ let add_route_handler_
| Some handler ->
(* we have a handler, do we accept the request based on its headers? *)
begin match accept req with
| Ok () -> Some (Ok (fun req -> handler @@ tr_req req))
| Ok () -> Some (Ok (mk_cb oc handler))
| Error _ as e -> Some e
end
| None ->
@ -851,11 +890,59 @@ let add_route_handler_
in
self.path_handlers <- ph :: self.path_handlers
let add_route_handler ?accept ?meth self route f =
add_route_handler_ ?accept ?meth ~tr_req:Request.read_body_full self route f
let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit =
let mk_cb _oc f = CB_sync (fun req -> f (Request.read_body_full req)) in
add_route_handler_ ?accept ?meth self route mk_cb f
let add_route_handler_stream ?accept ?meth self route f =
add_route_handler_ ?accept ?meth ~tr_req:(fun x->x) self route f
let mk_cb _oc f = CB_sync f in
add_route_handler_ ?accept ?meth self route mk_cb f
(* TODO *)
let add_route_async_stream_handler ?accept ?meth self route f =
let mk_cb _oc f : unit = CB_async f in
add_route_handler_ ?accept ?meth self route mk_cb f
let add_route_async_handler ?accept ?meth self route f =
let f req arg _oc : unit = f (Request.read_body_full req) arg in
add_route_handler_ ?accept ?meth self route (CB_async f)
module Make_SSGEN(Conn : sig val oc: out_channel end) = struct
open Conn
let initial_resp () =
let headers = Headers.(empty |> set "content-type" "text/event-stream") in
Response.make_raw ~headers ~code:200 ""
let _opt_iter ~f o = match o with
| None -> ()
| Some x -> f x
let send_event ?event ?id ?retry ~data () : unit =
_opt_iter event ~f:(fun e -> Printf.fprintf oc "data: %s\n" e);
_opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e);
_opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e);
let l = String.split_on_char '\n' data in
List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l;
output_string oc "\n"; (* finish group *)
flush oc
end
let add_route_server_sent_handler ?accept self route f =
let handle_req _req oc =
let module SSGEN = Make_SSGEN(struct let oc=oc end) in
Response.output_ oc @@ SSGEN.initial_resp();
f (module SSGEN : SERVER_SENT_GENERATOR)
in
add_route_handler_ self ?accept ~meth:`GET route handle_req
let add_route_server_sent_async_handler ?accept self route f =
let handle_req _req (module A:ASYNC_HANDLER_ARG) oc =
let module SSGEN = Make_SSGEN(struct let oc=oc end) in
A.set_response @@ SSGEN.initial_resp();
f (module SSGEN : SERVER_SENT_GENERATOR)
in
add_route_handler_ self ?accept ~meth:`GET route handle_req
let create
?(masksigpipe=true)
@ -890,67 +977,104 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
while !continue && self.running do
_debug (fun k->k "read next request");
match Request.parse_req_start ~buf is with
| Ok None -> continue := false
| Ok None ->
continue := false (* client is done *)
| Error (c,s) ->
(* connection error, close *)
let res = Response.make_raw ~code:c s in
begin
try Response.output_ oc res
with Sys_error _ -> ()
end;
continue := false
| Ok (Some req) ->
_debug (fun k->k "req: %s" (Format.asprintf "@[%a@]" Request.pp_ req));
let res =
try
(* is there a handler for this path? *)
let handler =
match find_map (fun ph -> ph req) self.path_handlers with
| Some f -> unwrap_resp_result f
| None -> (fun req -> self.handler @@ Request.read_body_full req)
in
(* handle expectations *)
begin match Request.get_header ~f:String.trim req "Expect" with
| Some "100-continue" ->
_debug (fun k->k "send back: 100 CONTINUE");
Response.output_ oc (Response.make_raw ~code:100 "");
| Some s -> bad_reqf 417 "unknown expectation %s" s
| None -> ()
end;
(* preprocess request's input stream *)
let req0, tr_stream =
List.fold_left
(fun (req,tr) cb ->
match cb req with
| None -> req, tr
| Some (r',f) -> r', (fun is -> tr is |> f))
(req, (fun is->is)) self.cb_decode_req
in
(* now actually read request's body *)
let req =
Request.parse_body_ ~tr_stream ~buf {req0 with body=is}
|> unwrap_resp_result
in
let resp = handler req in
(* post-process response *)
try
(* is there a handler for this path? *)
let handler =
match find_map (fun ph -> ph req) self.path_handlers with
| Some f -> unwrap_resp_result f
| None -> CB_sync (fun req _oc -> self.handler (Request.read_body_full req))
in
(* handle expect/continue *)
begin match Request.get_header ~f:String.trim req "Expect" with
| Some "100-continue" ->
_debug (fun k->k "send back: 100 CONTINUE");
Response.output_ oc (Response.make_raw ~code:100 "");
| Some s -> bad_reqf 417 "unknown expectation %s" s
| None -> ()
end;
(* preprocess request's input stream *)
let req0, tr_stream =
List.fold_left
(fun (req,tr) cb ->
match cb req with
| None -> req, tr
| Some (r',f) -> r', (fun is -> tr is |> f))
(req, (fun is->is)) self.cb_decode_req
in
(* now actually read request's body into a stream *)
let req =
Request.parse_body_ ~tr_stream ~buf {req0 with body=is}
|> unwrap_resp_result
in
(* how to post-process response accordingly *)
let post_process_resp resp =
List.fold_left
(fun resp cb -> match cb req0 resp with None -> resp | Some r' -> r')
resp self.cb_encode_resp
with
| Bad_req (code,s) ->
continue := false;
Response.make_raw ~code s
| e ->
Response.fail ~code:500 "server error: %s" (Printexc.to_string e)
in
begin
try Response.output_ oc res
with Sys_error _ -> continue := false
end
| exception Bad_req (code,s) ->
Response.output_ oc (Response.make_raw ~code s);
continue := false
| exception Sys_error _ ->
continue := false; (* connection broken somehow *)
in
begin match handler with
| CB_sync h ->
(* compute response *)
let resp = h req oc |> post_process_resp in
(* send response back *)
begin
try Response.output_ oc resp
with Sys_error _ -> continue := false
end
| CB_async h ->
(* bundle state to be able to send reply *)
let is_done = ref false in
let module M = struct
let write b i len = output oc b i len
let set_response r =
if !is_done then failwith "async handler: done writing response";
is_done := true;
let r = post_process_resp r in
Response.output_ oc r
let close () = is_done := true; close_out_noerr oc
end in
let arg = (module M : ASYNC_HANDLER_ARG) in
(* last query we handle on this connection, too complicated otherwise *)
continue := false;
(* make sure we close the connection if it is GC'ed *)
let dispose (module M:ASYNC_HANDLER_ARG) = M.close() in
Gc.finalise dispose arg;
h req arg oc
end
with
| Sys_error _ ->
continue := false; (* connection broken somehow *)
| Bad_req (code,s) ->
continue := false;
Response.output_ oc @@ Response.make_raw ~code s
| e ->
continue := false;
Response.output_ oc @@ Response.fail ~code:500 "server error: %s" (Printexc.to_string e)
done;
_debug (fun k->k "done with client, exiting");
(try Unix.close client_sock

View file

@ -420,7 +420,7 @@ module Route : sig
@since 0.7 *)
end
(** {2 Server} *)
(** {2 Main Server type} *)
type t
(** A HTTP server. See {!create} for more details. *)
@ -480,6 +480,8 @@ val add_encode_response_cb:
as well as the current response.
*)
(** {2 Request handlers} *)
val set_top_handler : t -> (string Request.t -> Response.t) -> unit
(** Setup a handler called by default.
@ -507,6 +509,7 @@ val add_route_handler :
its content is too big, or for some permission error).
See the {!http_of_dir} program for an example of how to use [accept] to
filter uploads that are too large before the upload even starts.
The default always returns [Ok()], i.e. it accepts all requests.
@since 0.6
*)
@ -554,6 +557,114 @@ val add_path_handler_stream :
json decoder (such as [Jsonm]) or into a file.
@since 0.3 *)
(** {2 Async handler}
{b EXPERIMENTAL}: this API is not stable yet. *)
(** Async handler arguments
Async handlers are handlers that do not block a thread.
Instead they receive this argument, and can call the functions
when they want. The functions might be registered in some event loop,
for example, or stashed in a table only to be called by a thread or thread
pool later when some event happened. *)
module type ASYNC_HANDLER_ARG = sig
val write : bytes -> int -> int -> unit
(** Write some data on the socket.
Note that this is still done using blocking IO. *)
val set_response : Response.t -> unit
(** Write response. More data can still be written.
The writing is done using blocking IO. *)
val close : unit -> unit
(** Close connection. This is idempotent and failures will not be
reported here. *)
end
val add_route_async_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?meth:Meth.t ->
t ->
('a, string Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a ->
unit
(** Add a route handler that replies asynchronously.
See {!ASYNC_HANDLER_ARG} for some details on how the handler might
generate a response, write data, etc.
@since NEXT_RELEASE *)
val add_route_async_stream_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?meth:Meth.t ->
t ->
('a, byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a ->
unit
(** Like {!add_route_async_handler} but takes the request body as a stream,
not a string.
@since NEXT_RELEASE *)
(** {2 Server-sent events}
{b EXPERIMENTAL}: this API is not stable yet. *)
(** A server-side function to generate of Server-sent events.
See {{: https://html.spec.whatwg.org/multipage/server-sent-events.html} the w3c page}
and {{: https://jvns.ca/blog/2021/01/12/day-36--server-sent-events-are-cool--and-a-fun-bug/}
this blog post}.
@since NEXT_RELEASE
*)
module type SERVER_SENT_GENERATOR = sig
val send_event :
?event:string ->
?id:string ->
?retry:string ->
data:string ->
unit -> unit
(** Send an event from the server.
If data is a multiline string, it will be sent on separate "data:" lines. *)
end
type server_sent_generator = (module SERVER_SENT_GENERATOR)
(** Server-sent event generator
@since NEXT_RELEASE *)
val add_route_server_sent_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
t ->
('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a ->
unit
(** Add a handler on an endpoint, that serves server-sent events.
The callback is given a generator that can be used to send events
as it pleases. The connection is always closed by the client,
and the accepted method is always [GET].
This will set the header "content-type" to "text/event-stream" automatically
and reply with a 200 immediately.
See {!server_sent_generator} for more details.
This handler stays on the original thread (it is synchronous).
@since NEXT_RELEASE *)
val add_route_server_sent_async_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
t ->
('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a ->
unit
(** Similar to {!add_route_server_sent_handler}, but the thread quits immediately
after responding "200", and the rest of the data (the events) are made
from the callback from somewhere else.
Note that the IO are still blocking.
@since NEXT_RELEASE *)
(** {2 Run the server} *)
val stop : t -> unit
(** Ask the server to stop. This might not have an immediate effect
as {!run} might currently be waiting on IO. *)