feat: remove async for now, finish server sent events

This commit is contained in:
Simon Cruanes 2021-07-16 09:00:15 -04:00
parent 2e0f08366f
commit 17d8f37c93
4 changed files with 64 additions and 160 deletions

View file

@ -780,18 +780,12 @@ module Route = struct
let pp out x = Format.pp_print_string out (to_string x) let pp out x = Format.pp_print_string out (to_string x)
end end
module type ASYNC_HANDLER_ARG = sig (* a request handler. handles a single request. *)
val write : bytes -> int -> int -> unit
val set_response : Response.t -> unit
val close : unit -> unit
end
(* a handler, either synchronous (blocks the thread),
or asynchronous (frees the thread and replies from another thread
as it goes) *)
type cb_path_handler = type cb_path_handler =
| CB_sync of (byte_stream Request.t -> Response.t) out_channel ->
| CB_async of (byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit) byte_stream Request.t ->
resp:(Response.t -> unit) ->
unit
module type SERVER_SENT_GENERATOR = sig module type SERVER_SENT_GENERATOR = sig
val send_event : val send_event :
@ -819,7 +813,7 @@ type t = {
mutable handler: (string Request.t -> Response.t); mutable handler: (string Request.t -> Response.t);
(* toplevel handler, if any *) (* toplevel handler, if any *)
mutable path_handlers : (unit Request.t -> out_channel -> cb_path_handler resp_result option) list; mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list;
(* path handlers *) (* path handlers *)
mutable cb_decode_req: mutable cb_decode_req:
@ -844,7 +838,7 @@ let set_top_handler self f = self.handler <- f
let add_path_handler_ let add_path_handler_
?(accept=fun _req -> Ok ()) ?(accept=fun _req -> Ok ())
?meth ~tr_req self fmt f = ?meth ~tr_req self fmt f =
let ph req _oc : cb_path_handler resp_result option = let ph req : cb_path_handler resp_result option =
match meth with match meth with
| Some m when m <> req.Request.meth -> None (* ignore *) | Some m when m <> req.Request.meth -> None (* ignore *)
| _ -> | _ ->
@ -852,7 +846,7 @@ let add_path_handler_
| handler -> | handler ->
(* we have a handler, do we accept the request based on its headers? *) (* we have a handler, do we accept the request based on its headers? *)
begin match accept req with begin match accept req with
| Ok () -> Some (Ok (CB_sync (fun req -> handler @@ tr_req req))) | Ok () -> Some (Ok (fun _oc req ~resp -> resp (handler (tr_req req))))
| Error _ as e -> Some e | Error _ as e -> Some e
end end
| exception _ -> | exception _ ->
@ -869,11 +863,13 @@ let add_path_handler ?accept ?meth self fmt f =
let add_path_handler_stream ?accept ?meth self fmt f = let add_path_handler_stream ?accept ?meth self fmt f =
add_path_handler_ ?accept ?meth ~tr_req:(fun x->x) self fmt f add_path_handler_ ?accept ?meth ~tr_req:(fun x->x) self fmt f
(* route the given handler *) (* route the given handler.
@param tr_req wraps the actual concrete function returned by the route
and makes it into a handler. *)
let add_route_handler_ let add_route_handler_
?(accept=fun _req -> Ok ()) ?(accept=fun _req -> Ok ())
?meth self (route:_ Route.t) mk_cb f = ?meth ~tr_req self (route:_ Route.t) f =
let ph req oc : cb_path_handler resp_result option = let ph req : cb_path_handler resp_result option =
match meth with match meth with
| Some m when m <> req.Request.meth -> None (* ignore *) | Some m when m <> req.Request.meth -> None (* ignore *)
| _ -> | _ ->
@ -881,7 +877,7 @@ let add_route_handler_
| Some handler -> | Some handler ->
(* we have a handler, do we accept the request based on its headers? *) (* we have a handler, do we accept the request based on its headers? *)
begin match accept req with begin match accept req with
| Ok () -> Some (Ok (mk_cb oc handler)) | Ok () -> Some (Ok (fun oc req ~resp -> tr_req oc req ~resp handler))
| Error _ as e -> Some e | Error _ as e -> Some e
end end
| None -> | None ->
@ -891,33 +887,28 @@ let add_route_handler_
self.path_handlers <- ph :: self.path_handlers self.path_handlers <- ph :: self.path_handlers
let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit = let add_route_handler (type a) ?accept ?meth self (route:(a,_) Route.t) (f:_) : unit =
let mk_cb _oc f = CB_sync (fun req -> f (Request.read_body_full req)) in let tr_req _oc req ~resp f = resp (f (Request.read_body_full req)) in
add_route_handler_ ?accept ?meth self route mk_cb f add_route_handler_ ?accept ?meth self route ~tr_req f
let add_route_handler_stream ?accept ?meth self route f = let add_route_handler_stream ?accept ?meth self route f =
let mk_cb _oc f = CB_sync f in let tr_req _oc req ~resp f = resp (f req) in
add_route_handler_ ?accept ?meth self route mk_cb f add_route_handler_ ?accept ?meth self route ~tr_req f
(* TODO *) let[@inline] _opt_iter ~f o = match o with
let add_route_async_stream_handler ?accept ?meth self route f =
let mk_cb _oc f : unit = CB_async f in
add_route_handler_ ?accept ?meth self route mk_cb f
let add_route_async_handler ?accept ?meth self route f =
let f req arg _oc : unit = f (Request.read_body_full req) arg in
add_route_handler_ ?accept ?meth self route (CB_async f)
module Make_SSGEN(Conn : sig val oc: out_channel end) = struct
open Conn
let initial_resp () =
let headers = Headers.(empty |> set "content-type" "text/event-stream") in
Response.make_raw ~headers ~code:200 ""
let _opt_iter ~f o = match o with
| None -> () | None -> ()
| Some x -> f x | Some x -> f x
let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f =
let req = Request.read_body_full req in
(* send 200 response now *)
let initial_resp =
let headers = Headers.(empty |> set "content-type" "text/event-stream") in
Response.make_raw ~headers ~code:200 ""
in
resp initial_resp;
let send_event ?event ?id ?retry ~data () : unit = let send_event ?event ?id ?retry ~data () : unit =
_opt_iter event ~f:(fun e -> Printf.fprintf oc "data: %s\n" e); _opt_iter event ~f:(fun e -> Printf.fprintf oc "data: %s\n" e);
_opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e); _opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e);
@ -926,23 +917,13 @@ module Make_SSGEN(Conn : sig val oc: out_channel end) = struct
List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l; List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l;
output_string oc "\n"; (* finish group *) output_string oc "\n"; (* finish group *)
flush oc flush oc
end
let add_route_server_sent_handler ?accept self route f =
let handle_req _req oc =
let module SSGEN = Make_SSGEN(struct let oc=oc end) in
Response.output_ oc @@ SSGEN.initial_resp();
f (module SSGEN : SERVER_SENT_GENERATOR)
in in
add_route_handler_ self ?accept ~meth:`GET route handle_req let module SSG = struct
let send_event = send_event
let add_route_server_sent_async_handler ?accept self route f = end in
let handle_req _req (module A:ASYNC_HANDLER_ARG) oc = f req (module SSG : SERVER_SENT_GENERATOR);
let module SSGEN = Make_SSGEN(struct let oc=oc end) in
A.set_response @@ SSGEN.initial_resp();
f (module SSGEN : SERVER_SENT_GENERATOR)
in in
add_route_handler_ self ?accept ~meth:`GET route handle_req add_route_handler_ self ?accept ~meth:`GET route ~tr_req f
let create let create
?(masksigpipe=true) ?(masksigpipe=true)
@ -997,7 +978,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let handler = let handler =
match find_map (fun ph -> ph req) self.path_handlers with match find_map (fun ph -> ph req) self.path_handlers with
| Some f -> unwrap_resp_result f | Some f -> unwrap_resp_result f
| None -> CB_sync (fun req _oc -> self.handler (Request.read_body_full req)) | None -> (fun _oc req ~resp -> resp (self.handler (Request.read_body_full req)))
in in
(* handle expect/continue *) (* handle expect/continue *)
@ -1031,40 +1012,18 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
resp self.cb_encode_resp resp self.cb_encode_resp
in in
begin match handler with (* how to reply *)
| CB_sync h -> let resp r =
(* compute response *) try
let resp = h req oc |> post_process_resp in
(* send response back *)
begin
try Response.output_ oc resp
with Sys_error _ -> continue := false
end
| CB_async h ->
(* bundle state to be able to send reply *)
let is_done = ref false in
let module M = struct
let write b i len = output oc b i len
let set_response r =
if !is_done then failwith "async handler: done writing response";
is_done := true;
let r = post_process_resp r in let r = post_process_resp r in
Response.output_ oc r Response.output_ oc r
let close () = is_done := true; close_out_noerr oc with Sys_error _ -> continue := false
end in in
let arg = (module M : ASYNC_HANDLER_ARG) in
(* last query we handle on this connection, too complicated otherwise *) (* call handler *)
continue := false; begin
try handler oc req ~resp
(* make sure we close the connection if it is GC'ed *) with Sys_error _ -> continue := false
let dispose (module M:ASYNC_HANDLER_ARG) = M.close() in
Gc.finalise dispose arg;
h req arg oc
end end
with with
| Sys_error _ -> | Sys_error _ ->

View file

@ -15,15 +15,19 @@ module S = Tiny_httpd
let () = let () =
let server = S.create () in let server = S.create () in
(* say hello *) (* say hello *)
S.add_route_handler ~meth:`GET server S.add_route_handler ~meth:`GET server
S.Route.(exact "hello" @/ string @/ return) S.Route.(exact "hello" @/ string @/ return)
(fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n"))); (fun name _req -> S.Response.make_string (Ok ("hello " ^name ^"!\n")));
(* echo request *) (* echo request *)
S.add_route_handler server S.add_route_handler server
S.Route.(exact "echo" @/ return) S.Route.(exact "echo" @/ return)
(fun req -> S.Response.make_string (fun req -> S.Response.make_string
(Ok (Format.asprintf "echo:@ %a@." S.Request.pp req))); (Ok (Format.asprintf "echo:@ %a@." S.Request.pp req)));
(* file upload *)
S.add_route_handler ~meth:`PUT server S.add_route_handler ~meth:`PUT server
S.Route.(exact "upload" @/ string_urlencoded @/ return) S.Route.(exact "upload" @/ string_urlencoded @/ return)
(fun path req -> (fun path req ->
@ -36,6 +40,8 @@ let () =
S.Response.fail ~code:500 "couldn't upload file: %s" S.Response.fail ~code:500 "couldn't upload file: %s"
(Printexc.to_string e) (Printexc.to_string e)
); );
(* run the server *)
Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server); Printf.printf "listening on http://%s:%d\n%!" (S.addr server) (S.port server);
match S.run server with match S.run server with
| Ok () -> () | Ok () -> ()
@ -557,55 +563,6 @@ val add_path_handler_stream :
json decoder (such as [Jsonm]) or into a file. json decoder (such as [Jsonm]) or into a file.
@since 0.3 *) @since 0.3 *)
(** {2 Async handler}
{b EXPERIMENTAL}: this API is not stable yet. *)
(** Async handler arguments
Async handlers are handlers that do not block a thread.
Instead they receive this argument, and can call the functions
when they want. The functions might be registered in some event loop,
for example, or stashed in a table only to be called by a thread or thread
pool later when some event happened. *)
module type ASYNC_HANDLER_ARG = sig
val write : bytes -> int -> int -> unit
(** Write some data on the socket.
Note that this is still done using blocking IO. *)
val set_response : Response.t -> unit
(** Write response. More data can still be written.
The writing is done using blocking IO. *)
val close : unit -> unit
(** Close connection. This is idempotent and failures will not be
reported here. *)
end
val add_route_async_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?meth:Meth.t ->
t ->
('a, string Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a ->
unit
(** Add a route handler that replies asynchronously.
See {!ASYNC_HANDLER_ARG} for some details on how the handler might
generate a response, write data, etc.
@since NEXT_RELEASE *)
val add_route_async_stream_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
?meth:Meth.t ->
t ->
('a, byte_stream Request.t -> (module ASYNC_HANDLER_ARG) -> unit) Route.t -> 'a ->
unit
(** Like {!add_route_async_handler} but takes the request body as a stream,
not a string.
@since NEXT_RELEASE *)
(** {2 Server-sent events} (** {2 Server-sent events}
{b EXPERIMENTAL}: this API is not stable yet. *) {b EXPERIMENTAL}: this API is not stable yet. *)
@ -651,18 +608,6 @@ val add_route_server_sent_handler :
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
val add_route_server_sent_async_handler :
?accept:(unit Request.t -> (unit, Response_code.t * string) result) ->
t ->
('a, string Request.t -> server_sent_generator -> unit) Route.t -> 'a ->
unit
(** Similar to {!add_route_server_sent_handler}, but the thread quits immediately
after responding "200", and the rest of the data (the events) are made
from the callback from somewhere else.
Note that the IO are still blocking.
@since NEXT_RELEASE *)
(** {2 Run the server} *) (** {2 Run the server} *)
val stop : t -> unit val stop : t -> unit

View file

@ -189,7 +189,7 @@ let cb_encode_compressed_stream
headers= set_headers resp.headers; headers= set_headers resp.headers;
body=`Stream (encode_deflate_stream_ ~buf_size str); body=`Stream (encode_deflate_stream_ ~buf_size str);
} }
| `String _ -> None | `String _ | `Void -> None
) else None ) else None
let setup let setup

View file

@ -3,5 +3,5 @@
(name tiny_httpd_camlzip) (name tiny_httpd_camlzip)
(public_name tiny_httpd_camlzip) (public_name tiny_httpd_camlzip)
(synopsis "A wrapper around camlzip to bring compression to Tiny_httpd") (synopsis "A wrapper around camlzip to bring compression to Tiny_httpd")
(flags :standard -safe-string -warn-error -a) (flags :standard -safe-string -warn-error -a+8)
(libraries tiny_httpd camlzip)) (libraries tiny_httpd camlzip))