Tweak streaming API

This commit is contained in:
Frédéric Bour 2025-03-14 19:16:54 +09:00
parent 8d783dc626
commit 322b4da0ed
3 changed files with 52 additions and 37 deletions

View file

@ -265,13 +265,6 @@ module type S = sig
@param headers headers of the query
*)
(** Push-stream of bytes
@since NEXT_RELEASE *)
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
?client:t ->
@ -281,7 +274,10 @@ module type S = sig
?headers:header list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
?on_respond:((unit response, curl_error) result -> unit) ->
on_write:(bytes -> length:int -> unit) ->
?on_close:(unit -> unit) ->
?on_progress:(downloaded:int64 -> expected:int64 option -> [`Continue | `Abort]) ->
unit ->
(unit response, curl_error) result io
(** HTTP call via cURL, with a streaming response body.
@ -355,11 +351,6 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| `String s -> Some (String.length s)
| `Write _ -> None
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
type http_state_ = {
client: client;
do_cleanup: bool;
@ -489,27 +480,56 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
loop tries
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
~meth ~(write_into : #input_stream) () : (unit response, _) result io =
~meth ?(on_respond=ignore) ~on_write ?(on_close=ignore) ?on_progress ()
: (unit response, _) result io =
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
in
let responded = ref false in
let respond () =
if not !responded then (
responded := true;
on_respond (mk_res st ())
)
in
Curl.set_writefunction st.client.curl (fun s ->
let n = String.length s in
write_into#on_input (Bytes.unsafe_of_string s) 0 n;
n);
respond ();
let length = String.length s in
on_write (Bytes.unsafe_of_string s) ~length;
length);
opt_iter on_progress ~f:(fun on_progress ->
Curl.set_xferinfofunction st.client.curl
(fun dltotal dlnow _ultotal _ulnow ->
match
on_progress
~downloaded:dlnow
~expected:(if dltotal = 0L then None else Some dltotal)
with
| `Continue -> false
| `Abort -> true
);
Curl.set_noprogress st.client.curl false;
);
let rec loop i =
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
let r = mk_res st () in
write_into#on_close ();
if not !responded then (
responded := true;
on_respond r;
);
on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
write_into#on_close ();
respond ();
on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in

View file

@ -164,13 +164,6 @@ module type S = sig
@param headers headers of the query
*)
(** Push-based stream of bytes
@since NEXT_RELEASE *)
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
?client:t ->
@ -180,13 +173,19 @@ module type S = sig
?headers:header list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
unit ->
(unit response, curl_error) result io
?on_respond:((unit response, curl_error) result -> unit) ->
on_write:(bytes -> length:int -> unit) ->
?on_close:(unit -> unit) ->
?on_progress:(downloaded:int64 ->
expected:int64 option -> [ `Abort | `Continue ]) ->
unit -> (unit response, curl_error) result io
(** HTTP call via cURL, with a streaming response body.
The body is given to [write_into] by chunks,
then [write_into#on_close ()] is called
and the response is returned.
[on_respond] is called as soon as the response code and headers are
available. If the call failed, the response code will be zero.
Then the body is given to [on_write] by chunks, and, finally [on_close ()]
is called and the response is returned.
[on_progress] is regularly called to give the opportunity to track the
progress of a large or slow transfer and to abort it.
@since NEXT_RELEASE *)
val get :

View file

@ -15,11 +15,7 @@ let () =
let buf = Buffer.create 32 in
match
Ezcurl.http_stream ~meth:GET ~url
~write_into:
(object
method on_input bs i len = Buffer.add_subbytes buf bs i len
method on_close () = ()
end)
~on_write:(fun bs ~length -> Buffer.add_subbytes buf bs 0 length)
()
with
| Error (code, msg) ->