Compare commits

...

4 commits

Author SHA1 Message Date
Stéphane Lavergne
4b7bd8f45a
Merge 19db102b22 into a0a5b989b7 2025-08-24 02:14:09 +00:00
Simon Cruanes
a0a5b989b7
Update README.md
Some checks failed
build / Build (push) Has been cancelled
2025-05-18 01:00:41 -04:00
Stéphane Lavergne
19db102b22
Apply suggestions from code review
Applied as suggested and we'll go from here.

Co-authored-by: Simon Cruanes <simon.cruanes.2007@m4x.org>
2025-02-07 22:14:06 -05:00
Stéphane Lavergne
dab9ca8190 Server-Sent Event support in both modes 2025-02-04 10:46:00 -05:00
3 changed files with 287 additions and 19 deletions

View file

@ -1,6 +1,6 @@
# EZCurl [![build](https://github.com/c-cube/ezcurl/actions/workflows/main.yml/badge.svg)](https://github.com/c-cube/ezcurl/actions/workflows/main.yml)
A simple wrapper around OCurl, for easy tasks around http.
A simple wrapper around [OCurl](https://github.com/ygrek/ocurl/), for easy tasks around http.
**project goals**

View file

@ -144,9 +144,21 @@ let _apply_config (self : t) (config : Config.t) : unit =
opt_iter authmethod ~f:(Curl.set_httpauth self.curl);
opt_iter username ~f:(Curl.set_username self.curl);
opt_iter password ~f:(Curl.set_password self.curl);
Curl.set_nosignal self.curl (_get_no_signal ());
()
let _eq_case a b =
let low = String.lowercase_ascii in
String.equal (low a) (low b)
let _add_header_nodup (h : string * string) (headers : _ list ref) : unit =
let sq = List.to_seq !headers in
let k, v = h in
if not (List.exists (fun (tk,tv) -> _eq_case k tk && _eq_case v tv) !headers) then
headers := h :: !headers;
let _contains_resp_headers (h : string) (headers : string list) : bool =
List.exists (_eq_case h) headers
let _set_headers (self : t) (headers : _ list) : unit =
let headers = List.map (fun (k, v) -> k ^ ": " ^ v) headers in
Curl.set_httpheader self.curl headers;
@ -217,6 +229,20 @@ let string_of_meth = function
let pp_meth out m = Format.pp_print_string out (string_of_meth m)
type sse_frame = {
event: string option;
id: string option;
data: string option;
retry: int option;
empties: string list; (* Lines without a ':' *)
}
type sse_state =
| Frame of sse_frame
| End_of_stream
type sse_callback = sse_state -> bool
module type IO = sig
type 'a t
@ -237,6 +263,7 @@ module type S = sig
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
meth:meth ->
unit ->
@ -275,6 +302,7 @@ module type S = sig
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
meth:meth ->
write_into:#input_stream ->
@ -289,6 +317,7 @@ module type S = sig
?config:Config.t ->
?range:string ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
unit ->
(string response, Curl.curlCode * string) result io
@ -301,6 +330,7 @@ module type S = sig
?client:t ->
?config:Config.t ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
content:[ `String of string | `Write of bytes -> int -> int ] ->
unit ->
@ -315,6 +345,7 @@ module type S = sig
?config:Config.t ->
?headers:(string * string) list ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?callback:[ `Sse_event of sse_callback ] ->
params:Curl.curlHTTPPost list ->
url:string ->
unit ->
@ -353,6 +384,166 @@ let mk_res (self : t) headers body : (_ response, _) result =
Ok { headers; code; body; info }
with Parse_error (e, msg) -> Error (e, Curl.strerror e ^ ": " ^ msg)
let sse_frame_with_event sse_f v =
{
(!sse_f) with
event = Some v;
}
let sse_frame_with_id sse_f v =
{
event = !sse_f.event;
id = Some v;
data = !sse_f.data;
retry = !sse_f.retry;
empties = !sse_f.empties;
}
let sse_frame_append_data sse_f v =
let data =
match !sse_f.data with
| None -> Some v
| Some vv -> Some (vv ^ "\n" ^ v)
in
{
event = !sse_f.event;
id = !sse_f.id;
data;
retry = !sse_f.retry;
empties = !sse_f.empties;
}
let sse_frame_with_retry sse_f v =
let retry = int_of_string_opt v in
{
event = !sse_f.event;
id = !sse_f.id;
data = !sse_f.data;
retry;
empties = !sse_f.empties;
}
let sse_frame_append_empties sse_f v =
let empties =
match !sse_f.empties with
| [] -> [ v ]
| vv -> v :: vv
in
{
event = !sse_f.event;
id = !sse_f.id;
data = !sse_f.data;
retry = !sse_f.retry;
empties;
}
let sse_process_pair k v sse_f =
match k with
| "event" -> sse_f := sse_frame_with_event sse_f v
| "id" -> sse_f := sse_frame_with_id sse_f v
| "data" -> sse_f := sse_frame_append_data sse_f v
| "retry" -> sse_f := sse_frame_with_retry sse_f v
| "" -> () (* The field is ignored *)
| _ -> sse_f := sse_frame_append_empties sse_f k
let sse_split_line s =
let l = String.length s in
let sq = String.to_seq s in
match Seq.find_index (fun c -> c = ':') sq with
| None -> s :: []
| Some p -> [ String.sub s 0 p; String.sub s (p + 1) (l - p - 1) ]
let sse_parse_line line sse_f =
match sse_split_line line with
| [ k; v ] ->
let k = String.trim k in
let v = String.trim v in
sse_process_pair k v sse_f
| [ k ] ->
let k = String.trim k in
sse_process_pair k "" sse_f
| _ ->
();
()
let sse_extract_next_line body =
let len = Buffer.length body in
let bf_seq = Buffer.to_seq body in
(* Search for some complete line *)
match Seq.find_index (fun c -> c = '\n') bf_seq with
(* Then no complete line available for now *)
| None -> None
(* Oh nice a complete line found *)
| Some pivot ->
(* Extract line except ending LF *)
let bf_line = Bytes.create pivot in
Buffer.blit body 0 bf_line 0 pivot;
let line = String.trim (Bytes.to_string bf_line) in
(* Now shift left the remaining after LF *)
let pivot = pivot + 1 in
let rem = len - pivot in
let bf_after = Bytes.create rem in
if rem > 0 then Buffer.blit body pivot bf_after 0 rem;
Buffer.reset body;
Buffer.add_bytes body bf_after;
(* Here the line finally *)
Some line
let sse_parse_lines body sse_f =
let rec loop body sse_f =
match sse_extract_next_line body with
| None -> false (* Nothing for now *)
| Some line ->
(match line with
(* Ready to send event *)
| "" -> true
(* Try next line *)
| line ->
sse_parse_line line sse_f;
loop body sse_f)
in
loop body sse_f
let sse_handle_post_write callback body sse_f =
match sse_f with
| None -> true (* Stream can continue (No SSE content) *)
| Some _sse_f ->
(match callback with
| None -> true (* Stream can continue (No SSE callback) *)
| Some (`Sse_event sse_cb) ->
let sse_cb_clean sse_cb _sse_f =
(* Send callback to user *)
let r = sse_cb (Frame !_sse_f) in
(* And reset internal event data *)
_sse_f :=
{ event = None; id = None; data = None; retry = None; empties = [] };
r
in
let rec loop body sse_f =
match sse_parse_lines body sse_f with
(* Stream can continue *)
| false -> true
(* Must dispatch event now*)
| true ->
(match sse_cb_clean sse_cb sse_f with
(* Stream must close now *)
| false -> false
(* Else continue parse *)
| true -> loop body sse_f)
in
loop body _sse_f)
let sse_handle_post_finish callback body sse_f =
match sse_f with
| None -> ()
| Some _ ->
(match callback with
| None -> ()
| Some (`Sse_event sse_cb) ->
let _ = sse_handle_post_write callback body sse_f in
let _ = sse_cb End_of_stream in
())
module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
open IO
@ -388,10 +579,11 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
do_cleanup: bool;
mutable resp_headers: string list;
mutable resp_headers_done: bool;
mutable sse_frame: sse_frame ref option;
}
let http_setup_ ?client ?(config = Config.default) ?range ?content
?(headers = []) ~url ~meth () : http_state_ =
?(headers = []) ?callback ~url ~meth () : http_state_ =
let headers = ref headers in
let do_cleanup, self =
match client with
@ -412,6 +604,14 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| Some size, POST _ -> Curl.set_postfieldsize self.curl size
| Some size, _ -> Curl.set_infilesize self.curl size);
(* Add more pre-determined request headers depend of feature *)
(match callback with
| None -> ()
| Some (`Sse_event _) ->
_add_header_nodup ("Cache-control", "no-cache") headers;
_add_header_nodup ("Accept", "text/event-stream") headers;
());
(* local state *)
let st =
{
@ -419,6 +619,7 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
client = self;
resp_headers = [];
resp_headers_done = false;
sse_frame = None;
}
in
@ -442,9 +643,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
Curl.set_headerfunction self.curl (fun s0 ->
let s = String.trim s0 in
(* Printf.printf "got header %S\n%!" s0; *)
if s0 = "\r\n" then
st.resp_headers_done <- true
else (
if s0 = "\r\n" then (
st.resp_headers_done <- true ;
(* Validate headers for user callback *)
match callback with
| None -> ()
| Some (`Sse_event _) ->
if
_contains_resp_headers "Content-type: text/event-stream"
st.resp_headers
then
st.sse_frame <-
Some
(ref
{
event = None;
id = None;
data = None;
retry = None;
empties = [];
})
) else (
(* redirection: drop previous headers *)
if st.resp_headers_done then (
st.resp_headers_done <- false;
@ -457,22 +676,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
st
let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () :
let http ?(tries = 1) ?client ?config ?range ?content ?headers ?callback
~url ~meth () :
(string response, _) result io =
(* at least one attempt *)
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth ()
in
let body = Buffer.create 64 in
Curl.set_writefunction st.client.curl (fun s ->
Buffer.add_string body s;
String.length s);
match sse_handle_post_write callback body st.sse_frame with
| true -> String.length s (* Continue write *)
| false -> 0xFFFFFFFE
(* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *));
let rec loop i =
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
let _ = sse_handle_post_finish callback body st.sse_frame in
let r =
mk_res st.client (List.rev st.resp_headers) (Buffer.contents body)
in
@ -480,45 +704,64 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
let _ = sse_handle_post_finish callback body st.sse_frame in
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in
loop tries
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
~meth ~(write_into : #input_stream) () : (unit response, _) result io =
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers
?callback ~url ~meth ~(write_into : #input_stream) () : (unit response, _) result io =
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth ()
in
let body_sse_ =
match callback with
| None -> None
| Some (`Sse_event _) -> Some (Buffer.create 64) in
Curl.set_writefunction st.client.curl (fun s ->
let n = String.length s in
write_into#on_input (Bytes.unsafe_of_string s) 0 n;
n);
match body_sse_ with
| None -> n
| Some body ->
Buffer.add_string body s;
(match sse_handle_post_write callback body st.sse_frame with
| true -> n (* Continue write *)
| false -> 0xFFFFFFFE
(* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *))
);
let rec loop i =
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
(match body_sse_ with
| None -> ()
| Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ());
let r = mk_res st.client (List.rev st.resp_headers) () in
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
(match body_sse_ with
| None -> ()
| Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ());
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in
loop tries
let get ?tries ?client ?config ?range ?headers ~url () : _ result io =
http ?tries ?client ?config ?range ?headers ~url ~meth:GET ()
let get ?tries ?client ?config ?range ?headers ?callback ~url () : _ result io =
http ?tries ?client ?config ?range ?headers ?callback ~url ~meth:GET ()
let post ?tries ?client ?config ?headers ?content ~params ~url () :
let post ?tries ?client ?config ?headers ?content ?callback ~params ~url () :
_ result io =
http ?tries ?client ?config ?headers ?content ~url ~meth:(POST params) ()
http ?tries ?client ?config ?headers ?content ?callback ~url ~meth:(POST params) ()
let put ?tries ?client ?config ?headers ~url ~content () : _ result io =
http ?tries ?client ?config ?headers ~url ~content ~meth:PUT ()
let put ?tries ?client ?config ?headers ?callback ~url ~content () : _ result io =
http ?tries ?client ?config ?headers ?callback ~url ~content ~meth:PUT ()
end

View file

@ -114,6 +114,20 @@ type meth =
val pp_meth : Format.formatter -> meth -> unit
val string_of_meth : meth -> string
type sse_frame = {
event: string option;
id: string option;
data: string option;
retry: int option;
empties: string list; (* Lines without a ':' *)
}
type sse_state =
| Frame of sse_frame
| End_of_stream
type sse_callback = sse_state -> bool
(** {2 Underlying IO Monad} *)
module type IO = sig
type 'a t
@ -136,6 +150,7 @@ module type S = sig
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
meth:meth ->
unit ->
@ -158,6 +173,12 @@ module type S = sig
It must return [0] when the content is entirely written, and not
before.
@param headers headers of the query
@param callback callback to use on received body, either
a [None] to keep normal Curl write behavior, or [`Sse_event f]
to enable {{: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events}
Server-sent events } processing, where [f] is a callback type [sse_callback]
and returns boolean to indicate if the internal write callback can
continue to proceed process or else close the incoming infinite stream.
*)
(** Push-based stream of bytes
@ -174,6 +195,7 @@ module type S = sig
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
meth:meth ->
write_into:#input_stream ->
@ -191,6 +213,7 @@ module type S = sig
?config:Config.t ->
?range:string ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
unit ->
(string response, Curl.curlCode * string) result io
@ -203,6 +226,7 @@ module type S = sig
?client:t ->
?config:Config.t ->
?headers:(string * string) list ->
?callback:[ `Sse_event of sse_callback ] ->
url:string ->
content:[ `String of string | `Write of bytes -> int -> int ] ->
unit ->
@ -217,6 +241,7 @@ module type S = sig
?config:Config.t ->
?headers:(string * string) list ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?callback:[ `Sse_event of sse_callback ] ->
params:Curl.curlHTTPPost list ->
url:string ->
unit ->