From 49b265ce566f17ef7cbc09aa01fa7e02b6424e88 Mon Sep 17 00:00:00 2001
From: Simon Cruanes
Date: Wed, 2 Oct 2024 12:46:43 -0400
Subject: [PATCH] implement `http_stream`

---
Review notes (this section is ignored by `git am`):

* Summary: the old monolithic `http_` is split into `http_setup_` (configures
  the handle and installs the header callback, returning an `http_state_`),
  `http` (accumulates the body in a `Buffer`) and the new `http_stream`
  (pushes each body chunk to `write_into#on_input`, then calls
  `write_into#on_close ()`).
* Retry path: in both `http` and `http_stream` the `CURLE_AGAIN` branch calls
  `loop (i - 1)` without resetting `body`, `st.resp_headers` or
  `st.resp_headers_done`. Anything received before the retry stays in the
  buffer (`http`) or has already been pushed to `write_into` (`http_stream`).
  TODO confirm whether data can actually be delivered before `CURLE_AGAIN`
  is returned; if so, the state should be cleared before retrying.
* `http_stream` hands `Bytes.unsafe_of_string s` to `write_into#on_input`, so
  the callee sees the callback's string without a copy. The interface docs
  should probably state that the bytes are borrowed and must not be mutated
  or retained.
* In `http_stream`, `write_into#on_close ()` is sequenced before
  `Curl.cleanup`; if `on_close` raises, the cleanup is skipped.
* Wording: the signature in the .ml says "Push-stream of bytes" while the
  .mli keeps "Push-based stream of bytes".
* The line structure of this patch was restored from a whitespace-collapsed
  copy. Hunk line counts were checked against the `@@` headers and the
  diffstat, but leading indentation is reconstructed -- verify against the
  tree (or apply with `--ignore-whitespace`).

 src/core/ezcurl_core.ml  | 107 +++++++++++++++++++++++++++++----------
 src/core/ezcurl_core.mli |  14 +++--
 2 files changed, 90 insertions(+), 31 deletions(-)

diff --git a/src/core/ezcurl_core.ml b/src/core/ezcurl_core.ml
index e04e66e..a3ad4b5 100644
--- a/src/core/ezcurl_core.ml
+++ b/src/core/ezcurl_core.ml
@@ -69,6 +69,7 @@ module Config = struct
 end
 
 type t = { curl: Curl.t } [@@unboxed]
+type client = t
 
 let _init =
   let initialized = ref false in
@@ -246,12 +247,12 @@ module type S = sig
       @param headers headers of the query
   *)
 
-  type stream = {
-    on_close: unit -> unit io;
-    on_chunk: string -> int -> int -> unit io;
-  }
-  (** Push-based stream of bytes
+  (** Push-stream of bytes
       @since NEXT_RELEASE *)
+  class type input_stream = object
+    method on_close : unit -> unit
+    method on_input : bytes -> int -> int -> unit
+  end
 
   val http_stream :
     ?tries:int ->
@@ -262,8 +263,9 @@ module type S = sig
     ?headers:(string * string) list ->
     url:string ->
     meth:meth ->
+    write_into:#input_stream ->
     unit ->
-    (stream response, Curl.curlCode * string) result io
+    (unit response, Curl.curlCode * string) result io
   (** HTTP call via cURL, with a streaming response body.
       @since NEXT_RELEASE *)
 
@@ -362,13 +364,20 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
     | `String s -> Some (String.length s)
     | `Write _ -> None
 
-  type stream = {
-    on_close: unit -> unit io;
-    on_chunk: string -> int -> int -> unit io;
+  class type input_stream = object
+    method on_close : unit -> unit
+    method on_input : bytes -> int -> int -> unit
+  end
+
+  type http_state_ = {
+    client: client;
+    do_cleanup: bool;
+    mutable resp_headers: string list;
+    mutable resp_headers_done: bool;
   }
 
-  let http_ ?(tries = 1) ?client ?(config = Config.default) ?range ?content
-      ?(headers = []) ~url ~meth () : (stream response, _) result io =
+  let http_setup_ ?client ?(config = Config.default) ?range ?content
+      ?(headers = []) ~url ~meth () : http_state_ =
     let headers = ref headers in
     let do_cleanup, self =
       match client with
@@ -390,11 +399,15 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
     | Some size, _ -> Curl.set_infilesize self.curl size);
 
     (* local state *)
-    let tries = max tries 1 in
-    (* at least one attempt *)
-    let body = Buffer.create 64 in
-    let resp_headers = ref [] in
-    let resp_headers_done = ref false in
+    let st =
+      {
+        do_cleanup;
+        client = self;
+        resp_headers = [];
+        resp_headers_done = false;
+      }
+    in
+
     (* once we get "\r\n" header line *)
     Curl.set_url self.curl url;
     (match meth with
@@ -416,29 +429,71 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
         let s = String.trim s0 in
         (* Printf.printf "got header %S\n%!" s0; *)
         if s0 = "\r\n" then
-          resp_headers_done := true
+          st.resp_headers_done <- true
         else (
           (* redirection: drop previous headers *)
-          if !resp_headers_done then (
-            resp_headers_done := false;
-            resp_headers := []
+          if st.resp_headers_done then (
+            st.resp_headers_done <- false;
+            st.resp_headers <- []
           );
 
-          resp_headers := s :: !resp_headers
+          st.resp_headers <- s :: st.resp_headers
         );
         String.length s0);
-    Curl.set_writefunction self.curl (fun s ->
+
+    st
+
+  let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () :
+      (string response, _) result io =
+    (* at least one attempt *)
+    let tries = max tries 1 in
+    let st =
+      http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
+    in
+
+    let body = Buffer.create 64 in
+    Curl.set_writefunction st.client.curl (fun s ->
         Buffer.add_string body s;
         String.length s);
+
     let rec loop i =
-      IO.perform self.curl >>= function
+      IO.perform st.client.curl >>= function
       | Curl.CURLE_OK ->
-        let r = mk_res self (List.rev !resp_headers) (Buffer.contents body) in
-        if do_cleanup then Curl.cleanup self.curl;
+        let r =
+          mk_res st.client (List.rev st.resp_headers) (Buffer.contents body)
+        in
+        if st.do_cleanup then Curl.cleanup st.client.curl;
         return r
       | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
       | c ->
-        if do_cleanup then Curl.cleanup self.curl;
+        if st.do_cleanup then Curl.cleanup st.client.curl;
+        return (Error (c, Curl.strerror c))
+    in
+    loop tries
+
+  let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
+      ~meth ~(write_into : #input_stream) () : (unit response, _) result io =
+    let tries = max tries 1 in
+    let st =
+      http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
+    in
+
+    Curl.set_writefunction st.client.curl (fun s ->
+        let n = String.length s in
+        write_into#on_input (Bytes.unsafe_of_string s) 0 n;
+        n);
+
+    let rec loop i =
+      IO.perform st.client.curl >>= function
+      | Curl.CURLE_OK ->
+        let r = mk_res st.client (List.rev st.resp_headers) () in
+        write_into#on_close ();
+        if st.do_cleanup then Curl.cleanup st.client.curl;
+        return r
+      | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
+      | c ->
+        write_into#on_close ();
+        if st.do_cleanup then Curl.cleanup st.client.curl;
         return (Error (c, Curl.strerror c))
     in
     loop tries
diff --git a/src/core/ezcurl_core.mli b/src/core/ezcurl_core.mli
index fdc9f03..6a06538 100644
--- a/src/core/ezcurl_core.mli
+++ b/src/core/ezcurl_core.mli
@@ -155,12 +155,12 @@ module type S = sig
       @param headers headers of the query
   *)
 
-  type stream = {
-    on_close: unit -> unit io;
-    on_chunk: string -> int -> int -> unit io;
-  }
   (** Push-based stream of bytes
       @since NEXT_RELEASE *)
+  class type input_stream = object
+    method on_close : unit -> unit
+    method on_input : bytes -> int -> int -> unit
+  end
 
   val http_stream :
     ?tries:int ->
@@ -171,9 +171,13 @@ module type S = sig
     ?headers:(string * string) list ->
     url:string ->
     meth:meth ->
+    write_into:#input_stream ->
     unit ->
-    (stream response, Curl.curlCode * string) result io
+    (unit response, Curl.curlCode * string) result io
   (** HTTP call via cURL, with a streaming response body.
+      The body is given to [write_into] by chunks,
+      then [write_into#on_close ()] is called
+      and the response is returned.
       @since NEXT_RELEASE *)
 
   val get :