mirror of
https://github.com/c-cube/ezcurl.git
synced 2025-12-05 19:00:34 -05:00
Server-Sent Event support in both modes
This commit is contained in:
parent
4b7d9ec769
commit
dab9ca8190
2 changed files with 301 additions and 18 deletions
|
|
@ -144,9 +144,33 @@ let _apply_config (self : t) (config : Config.t) : unit =
|
|||
opt_iter authmethod ~f:(Curl.set_httpauth self.curl);
|
||||
opt_iter username ~f:(Curl.set_username self.curl);
|
||||
opt_iter password ~f:(Curl.set_password self.curl);
|
||||
Curl.set_nosignal self.curl (_get_no_signal ());
|
||||
()
|
||||
|
||||
let _eq_case a b =
|
||||
let low = String.lowercase_ascii in
|
||||
String.compare (low a) (low b) = 0
|
||||
|
||||
let _add_header_nodup (h : string * string) (headers : _ list ref) : unit =
|
||||
let sq = List.to_seq !headers in
|
||||
let k, v = h in
|
||||
match
|
||||
Seq.find_index
|
||||
(fun t ->
|
||||
let tk, tv = t in
|
||||
_eq_case k tk && _eq_case v tv)
|
||||
sq
|
||||
with
|
||||
| None ->
|
||||
headers := h :: !headers;
|
||||
()
|
||||
| Some _ -> () (* No duplicate *)
|
||||
|
||||
let _contains_resp_headers (h : string) (headers : string list) : bool =
|
||||
let sq = List.to_seq headers in
|
||||
match Seq.find_index (fun hh -> _eq_case h hh) sq with
|
||||
| None -> false
|
||||
| Some _ -> true
|
||||
|
||||
let _set_headers (self : t) (headers : _ list) : unit =
|
||||
let headers = List.map (fun (k, v) -> k ^ ": " ^ v) headers in
|
||||
Curl.set_httpheader self.curl headers;
|
||||
|
|
@ -217,6 +241,20 @@ let string_of_meth = function
|
|||
|
||||
let pp_meth out m = Format.pp_print_string out (string_of_meth m)
|
||||
|
||||
type sse_frame = {
|
||||
event: string option;
|
||||
id: string option;
|
||||
data: string option;
|
||||
retry: int option;
|
||||
empties: string list; (* Lines without a ':' *)
|
||||
}
|
||||
|
||||
type sse_state =
|
||||
| Frame of sse_frame
|
||||
| End_of_stream
|
||||
|
||||
type sse_callback = sse_state -> bool
|
||||
|
||||
module type IO = sig
|
||||
type 'a t
|
||||
|
||||
|
|
@ -237,6 +275,7 @@ module type S = sig
|
|||
?range:string ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
meth:meth ->
|
||||
unit ->
|
||||
|
|
@ -275,6 +314,7 @@ module type S = sig
|
|||
?range:string ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
meth:meth ->
|
||||
write_into:#input_stream ->
|
||||
|
|
@ -289,6 +329,7 @@ module type S = sig
|
|||
?config:Config.t ->
|
||||
?range:string ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
unit ->
|
||||
(string response, Curl.curlCode * string) result io
|
||||
|
|
@ -301,6 +342,7 @@ module type S = sig
|
|||
?client:t ->
|
||||
?config:Config.t ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
unit ->
|
||||
|
|
@ -315,6 +357,7 @@ module type S = sig
|
|||
?config:Config.t ->
|
||||
?headers:(string * string) list ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
params:Curl.curlHTTPPost list ->
|
||||
url:string ->
|
||||
unit ->
|
||||
|
|
@ -353,6 +396,169 @@ let mk_res (self : t) headers body : (_ response, _) result =
|
|||
Ok { headers; code; body; info }
|
||||
with Parse_error (e, msg) -> Error (e, Curl.strerror e ^ ": " ^ msg)
|
||||
|
||||
let sse_frame_with_event sse_f v =
|
||||
{
|
||||
event = Some v;
|
||||
id = !sse_f.id;
|
||||
data = !sse_f.data;
|
||||
retry = !sse_f.retry;
|
||||
empties = !sse_f.empties;
|
||||
}
|
||||
|
||||
let sse_frame_with_id sse_f v =
|
||||
{
|
||||
event = !sse_f.event;
|
||||
id = Some v;
|
||||
data = !sse_f.data;
|
||||
retry = !sse_f.retry;
|
||||
empties = !sse_f.empties;
|
||||
}
|
||||
|
||||
let sse_frame_append_data sse_f v =
|
||||
let data =
|
||||
match !sse_f.data with
|
||||
| None -> Some v
|
||||
| Some vv -> Some (vv ^ "\n" ^ v)
|
||||
in
|
||||
{
|
||||
event = !sse_f.event;
|
||||
id = !sse_f.id;
|
||||
data;
|
||||
retry = !sse_f.retry;
|
||||
empties = !sse_f.empties;
|
||||
}
|
||||
|
||||
let sse_frame_with_retry sse_f v =
|
||||
let retry = int_of_string_opt v in
|
||||
{
|
||||
event = !sse_f.event;
|
||||
id = !sse_f.id;
|
||||
data = !sse_f.data;
|
||||
retry;
|
||||
empties = !sse_f.empties;
|
||||
}
|
||||
|
||||
let sse_frame_append_empties sse_f v =
|
||||
let empties =
|
||||
match !sse_f.empties with
|
||||
| [] -> [ v ]
|
||||
| vv -> v :: vv
|
||||
in
|
||||
{
|
||||
event = !sse_f.event;
|
||||
id = !sse_f.id;
|
||||
data = !sse_f.data;
|
||||
retry = !sse_f.retry;
|
||||
empties;
|
||||
}
|
||||
|
||||
let sse_process_pair k v sse_f =
|
||||
match k with
|
||||
| "event" -> sse_f := sse_frame_with_event sse_f v
|
||||
| "id" -> sse_f := sse_frame_with_id sse_f v
|
||||
| "data" -> sse_f := sse_frame_append_data sse_f v
|
||||
| "retry" -> sse_f := sse_frame_with_retry sse_f v
|
||||
| "" -> () (* The field is ignored *)
|
||||
| _ -> sse_f := sse_frame_append_empties sse_f k
|
||||
|
||||
let sse_split_line s =
|
||||
let l = String.length s in
|
||||
let sq = String.to_seq s in
|
||||
match Seq.find_index (fun c -> c = ':') sq with
|
||||
| None -> s :: []
|
||||
| Some p -> [ String.sub s 0 p; String.sub s (p + 1) (l - p - 1) ]
|
||||
|
||||
let sse_parse_line line sse_f =
|
||||
match sse_split_line line with
|
||||
| [ k; v ] ->
|
||||
let k = String.trim k in
|
||||
let v = String.trim v in
|
||||
sse_process_pair k v sse_f
|
||||
| [ k ] ->
|
||||
let k = String.trim k in
|
||||
sse_process_pair k "" sse_f
|
||||
| _ ->
|
||||
();
|
||||
()
|
||||
|
||||
let sse_extract_next_line body =
|
||||
let len = Buffer.length body in
|
||||
let bf_seq = Buffer.to_seq body in
|
||||
(* Search for some complete line *)
|
||||
match Seq.find_index (fun c -> c = '\n') bf_seq with
|
||||
(* Then no complete line available for now *)
|
||||
| None -> None
|
||||
(* Oh nice a complete line found *)
|
||||
| Some pivot ->
|
||||
(* Extract line except ending LF *)
|
||||
let bf_line = Bytes.create pivot in
|
||||
Buffer.blit body 0 bf_line 0 pivot;
|
||||
let line = String.trim (Bytes.to_string bf_line) in
|
||||
(* Now shift left the remaining after LF *)
|
||||
let pivot = pivot + 1 in
|
||||
let rem = len - pivot in
|
||||
let bf_after = Bytes.create rem in
|
||||
if rem > 0 then Buffer.blit body pivot bf_after 0 rem;
|
||||
Buffer.reset body;
|
||||
Buffer.add_bytes body bf_after;
|
||||
(* Here the line finally *)
|
||||
Some line
|
||||
|
||||
let sse_parse_lines body sse_f =
|
||||
let rec loop body sse_f =
|
||||
match sse_extract_next_line body with
|
||||
| None -> false (* Nothing for now *)
|
||||
| Some line ->
|
||||
(match line with
|
||||
(* Ready to send event *)
|
||||
| "" -> true
|
||||
(* Try next line *)
|
||||
| line ->
|
||||
sse_parse_line line sse_f;
|
||||
loop body sse_f)
|
||||
in
|
||||
loop body sse_f
|
||||
|
||||
let sse_handle_post_write callback body sse_f =
|
||||
match sse_f with
|
||||
| None -> true (* Stream can continue (No SSE content) *)
|
||||
| Some _sse_f ->
|
||||
(match callback with
|
||||
| None -> true (* Stream can continue (No SSE callback) *)
|
||||
| Some (`Sse_event sse_cb) ->
|
||||
let sse_cb_clean sse_cb _sse_f =
|
||||
(* Send callback to user *)
|
||||
let r = sse_cb (Frame !_sse_f) in
|
||||
(* And reset internal event data *)
|
||||
_sse_f :=
|
||||
{ event = None; id = None; data = None; retry = None; empties = [] };
|
||||
r
|
||||
in
|
||||
let rec loop body sse_f =
|
||||
match sse_parse_lines body sse_f with
|
||||
(* Stream can continue *)
|
||||
| false -> true
|
||||
(* Must dispatch event now*)
|
||||
| true ->
|
||||
(match sse_cb_clean sse_cb sse_f with
|
||||
(* Stream must close now *)
|
||||
| false -> false
|
||||
(* Else continue parse *)
|
||||
| true -> loop body sse_f)
|
||||
in
|
||||
loop body _sse_f)
|
||||
|
||||
let sse_handle_post_finish callback body sse_f =
|
||||
match sse_f with
|
||||
| None -> ()
|
||||
| Some _ ->
|
||||
(match callback with
|
||||
| None -> ()
|
||||
| Some (`Sse_event sse_cb) ->
|
||||
let _ = sse_handle_post_write callback body sse_f in
|
||||
let _ = sse_cb End_of_stream in
|
||||
())
|
||||
|
||||
module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
||||
open IO
|
||||
|
||||
|
|
@ -388,10 +594,11 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
do_cleanup: bool;
|
||||
mutable resp_headers: string list;
|
||||
mutable resp_headers_done: bool;
|
||||
mutable sse_frame: sse_frame ref option;
|
||||
}
|
||||
|
||||
let http_setup_ ?client ?(config = Config.default) ?range ?content
|
||||
?(headers = []) ~url ~meth () : http_state_ =
|
||||
?(headers = []) ?callback ~url ~meth () : http_state_ =
|
||||
let headers = ref headers in
|
||||
let do_cleanup, self =
|
||||
match client with
|
||||
|
|
@ -412,6 +619,14 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
| Some size, POST _ -> Curl.set_postfieldsize self.curl size
|
||||
| Some size, _ -> Curl.set_infilesize self.curl size);
|
||||
|
||||
(* Add more pre-determined request headers depend of feature *)
|
||||
(match callback with
|
||||
| None -> ()
|
||||
| Some (`Sse_event _) ->
|
||||
_add_header_nodup ("Cache-control", "no-cache") headers;
|
||||
_add_header_nodup ("Accept", "text/event-stream") headers;
|
||||
());
|
||||
|
||||
(* local state *)
|
||||
let st =
|
||||
{
|
||||
|
|
@ -419,6 +634,7 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
client = self;
|
||||
resp_headers = [];
|
||||
resp_headers_done = false;
|
||||
sse_frame = None;
|
||||
}
|
||||
in
|
||||
|
||||
|
|
@ -442,9 +658,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
Curl.set_headerfunction self.curl (fun s0 ->
|
||||
let s = String.trim s0 in
|
||||
(* Printf.printf "got header %S\n%!" s0; *)
|
||||
if s0 = "\r\n" then
|
||||
st.resp_headers_done <- true
|
||||
else (
|
||||
if s0 = "\r\n" then (
|
||||
st.resp_headers_done <- true ;
|
||||
(* Validate headers for user callback *)
|
||||
match callback with
|
||||
| None -> ()
|
||||
| Some (`Sse_event _) ->
|
||||
if
|
||||
_contains_resp_headers "Content-type: text/event-stream"
|
||||
st.resp_headers
|
||||
then
|
||||
st.sse_frame <-
|
||||
Some
|
||||
(ref
|
||||
{
|
||||
event = None;
|
||||
id = None;
|
||||
data = None;
|
||||
retry = None;
|
||||
empties = [];
|
||||
})
|
||||
) else (
|
||||
(* redirection: drop previous headers *)
|
||||
if st.resp_headers_done then (
|
||||
st.resp_headers_done <- false;
|
||||
|
|
@ -457,22 +691,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
|
||||
st
|
||||
|
||||
let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () :
|
||||
let http ?(tries = 1) ?client ?config ?range ?content ?headers ?callback
|
||||
~url ~meth () :
|
||||
(string response, _) result io =
|
||||
(* at least one attempt *)
|
||||
let tries = max tries 1 in
|
||||
let st =
|
||||
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
|
||||
http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth ()
|
||||
in
|
||||
|
||||
let body = Buffer.create 64 in
|
||||
Curl.set_writefunction st.client.curl (fun s ->
|
||||
Buffer.add_string body s;
|
||||
String.length s);
|
||||
match sse_handle_post_write callback body st.sse_frame with
|
||||
| true -> String.length s (* Continue write *)
|
||||
| false -> 0xFFFFFFFE
|
||||
(* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *));
|
||||
|
||||
let rec loop i =
|
||||
IO.perform st.client.curl >>= function
|
||||
| Curl.CURLE_OK ->
|
||||
let _ = sse_handle_post_finish callback body st.sse_frame in
|
||||
let r =
|
||||
mk_res st.client (List.rev st.resp_headers) (Buffer.contents body)
|
||||
in
|
||||
|
|
@ -480,45 +719,64 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
|
|||
return r
|
||||
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
|
||||
| c ->
|
||||
let _ = sse_handle_post_finish callback body st.sse_frame in
|
||||
if st.do_cleanup then Curl.cleanup st.client.curl;
|
||||
return (Error (c, Curl.strerror c))
|
||||
in
|
||||
loop tries
|
||||
|
||||
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
|
||||
~meth ~(write_into : #input_stream) () : (unit response, _) result io =
|
||||
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers
|
||||
?callback ~url ~meth ~(write_into : #input_stream) () : (unit response, _) result io =
|
||||
let tries = max tries 1 in
|
||||
let st =
|
||||
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
|
||||
http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth ()
|
||||
in
|
||||
|
||||
let body_sse_ =
|
||||
match callback with
|
||||
| None -> None
|
||||
| Some (`Sse_event _) -> Some (Buffer.create 64) in
|
||||
Curl.set_writefunction st.client.curl (fun s ->
|
||||
let n = String.length s in
|
||||
write_into#on_input (Bytes.unsafe_of_string s) 0 n;
|
||||
n);
|
||||
match body_sse_ with
|
||||
| None -> n
|
||||
| Some body ->
|
||||
Buffer.add_string body s;
|
||||
(match sse_handle_post_write callback body st.sse_frame with
|
||||
| true -> n (* Continue write *)
|
||||
| false -> 0xFFFFFFFE
|
||||
(* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *))
|
||||
);
|
||||
|
||||
let rec loop i =
|
||||
IO.perform st.client.curl >>= function
|
||||
| Curl.CURLE_OK ->
|
||||
(match body_sse_ with
|
||||
| None -> ()
|
||||
| Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ());
|
||||
let r = mk_res st.client (List.rev st.resp_headers) () in
|
||||
write_into#on_close ();
|
||||
if st.do_cleanup then Curl.cleanup st.client.curl;
|
||||
return r
|
||||
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
|
||||
| c ->
|
||||
(match body_sse_ with
|
||||
| None -> ()
|
||||
| Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ());
|
||||
write_into#on_close ();
|
||||
if st.do_cleanup then Curl.cleanup st.client.curl;
|
||||
return (Error (c, Curl.strerror c))
|
||||
in
|
||||
loop tries
|
||||
|
||||
let get ?tries ?client ?config ?range ?headers ~url () : _ result io =
|
||||
http ?tries ?client ?config ?range ?headers ~url ~meth:GET ()
|
||||
let get ?tries ?client ?config ?range ?headers ?callback ~url () : _ result io =
|
||||
http ?tries ?client ?config ?range ?headers ?callback ~url ~meth:GET ()
|
||||
|
||||
let post ?tries ?client ?config ?headers ?content ~params ~url () :
|
||||
let post ?tries ?client ?config ?headers ?content ?callback ~params ~url () :
|
||||
_ result io =
|
||||
http ?tries ?client ?config ?headers ?content ~url ~meth:(POST params) ()
|
||||
http ?tries ?client ?config ?headers ?content ?callback ~url ~meth:(POST params) ()
|
||||
|
||||
let put ?tries ?client ?config ?headers ~url ~content () : _ result io =
|
||||
http ?tries ?client ?config ?headers ~url ~content ~meth:PUT ()
|
||||
let put ?tries ?client ?config ?headers ?callback ~url ~content () : _ result io =
|
||||
http ?tries ?client ?config ?headers ?callback ~url ~content ~meth:PUT ()
|
||||
end
|
||||
|
|
|
|||
|
|
@ -114,6 +114,20 @@ type meth =
|
|||
val pp_meth : Format.formatter -> meth -> unit
|
||||
val string_of_meth : meth -> string
|
||||
|
||||
type sse_frame = {
|
||||
event: string option;
|
||||
id: string option;
|
||||
data: string option;
|
||||
retry: int option;
|
||||
empties: string list; (* Lines without a ':' *)
|
||||
}
|
||||
|
||||
type sse_state =
|
||||
| Frame of sse_frame
|
||||
| End_of_stream
|
||||
|
||||
type sse_callback = sse_state -> bool
|
||||
|
||||
(** {2 Underlying IO Monad} *)
|
||||
module type IO = sig
|
||||
type 'a t
|
||||
|
|
@ -136,6 +150,7 @@ module type S = sig
|
|||
?range:string ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
meth:meth ->
|
||||
unit ->
|
||||
|
|
@ -158,6 +173,12 @@ module type S = sig
|
|||
It must return [0] when the content is entirely written, and not
|
||||
before.
|
||||
@param headers headers of the query
|
||||
@param callback callback to use on received body, either
|
||||
a [None] to keep normal Curl write behavior, or [`Sse_event f]
|
||||
to enable {{: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events}
|
||||
Server-sent events } processing, where [f] is a callback type [sse_callback]
|
||||
and returns boolean to indicate if the internal write callback can
|
||||
continue to proceed process or else close the incoming infinite stream.
|
||||
*)
|
||||
|
||||
(** Push-based stream of bytes
|
||||
|
|
@ -174,6 +195,7 @@ module type S = sig
|
|||
?range:string ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
meth:meth ->
|
||||
write_into:#input_stream ->
|
||||
|
|
@ -191,6 +213,7 @@ module type S = sig
|
|||
?config:Config.t ->
|
||||
?range:string ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
unit ->
|
||||
(string response, Curl.curlCode * string) result io
|
||||
|
|
@ -203,6 +226,7 @@ module type S = sig
|
|||
?client:t ->
|
||||
?config:Config.t ->
|
||||
?headers:(string * string) list ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
url:string ->
|
||||
content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
unit ->
|
||||
|
|
@ -217,6 +241,7 @@ module type S = sig
|
|||
?config:Config.t ->
|
||||
?headers:(string * string) list ->
|
||||
?content:[ `String of string | `Write of bytes -> int -> int ] ->
|
||||
?callback:[ `Sse_event of sse_callback ] ->
|
||||
params:Curl.curlHTTPPost list ->
|
||||
url:string ->
|
||||
unit ->
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue