diff --git a/src/core/ezcurl_core.ml b/src/core/ezcurl_core.ml index 8a46104..29fd4f0 100644 --- a/src/core/ezcurl_core.ml +++ b/src/core/ezcurl_core.ml @@ -265,13 +265,6 @@ module type S = sig @param headers headers of the query *) - (** Push-stream of bytes - @since NEXT_RELEASE *) - class type input_stream = object - method on_close : unit -> unit - method on_input : bytes -> int -> int -> unit - end - val http_stream : ?tries:int -> ?client:t -> @@ -281,7 +274,10 @@ module type S = sig ?headers:header list -> url:string -> meth:meth -> - write_into:#input_stream -> + ?on_respond:((unit response, curl_error) result -> unit) -> + on_write:(bytes -> length:int -> unit) -> + ?on_close:(unit -> unit) -> + ?on_progress:(downloaded:int64 -> expected:int64 option -> [`Continue | `Abort]) -> unit -> (unit response, curl_error) result io (** HTTP call via cURL, with a streaming response body. @@ -355,11 +351,6 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct | `String s -> Some (String.length s) | `Write _ -> None - class type input_stream = object - method on_close : unit -> unit - method on_input : bytes -> int -> int -> unit - end - type http_state_ = { client: client; do_cleanup: bool; @@ -489,27 +480,56 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct loop tries let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url - ~meth ~(write_into : #input_stream) () : (unit response, _) result io = + ~meth ?(on_respond=ignore) ~on_write ?(on_close=ignore) ?on_progress () + : (unit response, _) result io = let tries = max tries 1 in let st = http_setup_ ?client ?config ?range ?content ?headers ~url ~meth () in + let responded = ref false in + let respond () = + if not !responded then ( + responded := true; + on_respond (mk_res st ()) + ) + in + Curl.set_writefunction st.client.curl (fun s -> - let n = String.length s in - write_into#on_input (Bytes.unsafe_of_string s) 0 n; - n); + respond (); + let length = String.length s in + on_write (Bytes.unsafe_of_string s) ~length; + length); + + opt_iter on_progress ~f:(fun on_progress -> + Curl.set_xferinfofunction st.client.curl + (fun dltotal dlnow _ultotal _ulnow -> + match + on_progress + ~downloaded:dlnow + ~expected:(if dltotal = 0L then None else Some dltotal) + with + | `Continue -> false + | `Abort -> true + ); + Curl.set_noprogress st.client.curl false; + ); let rec loop i = IO.perform st.client.curl >>= function | Curl.CURLE_OK -> let r = mk_res st () in - write_into#on_close (); + if not !responded then ( + responded := true; + on_respond r; + ); + on_close (); if st.do_cleanup then Curl.cleanup st.client.curl; return r | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *) | c -> - write_into#on_close (); + respond (); + on_close (); if st.do_cleanup then Curl.cleanup st.client.curl; return (Error (c, Curl.strerror c)) in diff --git a/src/core/ezcurl_core.mli b/src/core/ezcurl_core.mli index 04ee87d..b03befb 100644 --- a/src/core/ezcurl_core.mli +++ b/src/core/ezcurl_core.mli @@ -164,13 +164,6 @@ module type S = sig @param headers headers of the query *) - (** Push-based stream of bytes - @since NEXT_RELEASE *) - class type input_stream = object - method on_close : unit -> unit - method on_input : bytes -> int -> int -> unit - end - val http_stream : ?tries:int -> ?client:t -> @@ -180,13 +173,19 @@ module type S = sig ?headers:header list -> url:string -> meth:meth -> - write_into:#input_stream -> - unit -> - (unit response, curl_error) result io + ?on_respond:((unit response, curl_error) result -> unit) -> + on_write:(bytes -> length:int -> unit) -> + ?on_close:(unit -> unit) -> + ?on_progress:(downloaded:int64 -> + expected:int64 option -> [ `Abort | `Continue ]) -> + unit -> (unit response, curl_error) result io (** HTTP call via cURL, with a streaming response body. - The body is given to [write_into] by chunks, - then [write_into#on_close ()] is called - and the response is returned. + [on_respond] is called as soon as the response code and headers are + available. If the call failed, the response code will be zero. + Then the body is given to [on_write] by chunks, and, finally [on_close ()] + is called and the response is returned. + [on_progress] is regularly called to give the opportunity to track the + progress of a large or slow transfer and to abort it. @since NEXT_RELEASE *) val get : diff --git a/test/basic_test.ml b/test/basic_test.ml index 1c0a1e0..6e1338e 100644 --- a/test/basic_test.ml +++ b/test/basic_test.ml @@ -15,11 +15,7 @@ let () = let buf = Buffer.create 32 in match Ezcurl.http_stream ~meth:GET ~url - ~write_into: - (object - method on_input bs i len = Buffer.add_subbytes buf bs i len - method on_close () = () - end) + ~on_write:(fun bs ~length -> Buffer.add_subbytes buf bs 0 length) () with | Error (code, msg) ->