implement http_stream

This commit is contained in:
Simon Cruanes 2024-10-02 12:46:43 -04:00
parent fbd71baa19
commit 49b265ce56
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 90 additions and 31 deletions

View file

@ -69,6 +69,7 @@ module Config = struct
end
type t = { curl: Curl.t } [@@unboxed]
type client = t
let _init =
let initialized = ref false in
@ -246,12 +247,12 @@ module type S = sig
@param headers headers of the query
*)
type stream = {
on_close: unit -> unit io;
on_chunk: string -> int -> int -> unit io;
}
(** Push-based stream of bytes
(** Push-stream of bytes
@since NEXT_RELEASE *)
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
@ -262,8 +263,9 @@ module type S = sig
?headers:(string * string) list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
unit ->
(stream response, Curl.curlCode * string) result io
(unit response, Curl.curlCode * string) result io
(** HTTP call via cURL, with a streaming response body.
@since NEXT_RELEASE *)
@ -362,13 +364,20 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| `String s -> Some (String.length s)
| `Write _ -> None
type stream = {
on_close: unit -> unit io;
on_chunk: string -> int -> int -> unit io;
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
type http_state_ = {
client: client;
do_cleanup: bool;
mutable resp_headers: string list;
mutable resp_headers_done: bool;
}
let http_ ?(tries = 1) ?client ?(config = Config.default) ?range ?content
?(headers = []) ~url ~meth () : (stream response, _) result io =
let http_setup_ ?client ?(config = Config.default) ?range ?content
?(headers = []) ~url ~meth () : http_state_ =
let headers = ref headers in
let do_cleanup, self =
match client with
@ -390,11 +399,15 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| Some size, _ -> Curl.set_infilesize self.curl size);
(* local state *)
let tries = max tries 1 in
(* at least one attempt *)
let body = Buffer.create 64 in
let resp_headers = ref [] in
let resp_headers_done = ref false in
let st =
{
do_cleanup;
client = self;
resp_headers = [];
resp_headers_done = false;
}
in
(* once we get "\r\n" header line *)
Curl.set_url self.curl url;
(match meth with
@ -416,29 +429,71 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
let s = String.trim s0 in
(* Printf.printf "got header %S\n%!" s0; *)
if s0 = "\r\n" then
resp_headers_done := true
st.resp_headers_done <- true
else (
(* redirection: drop previous headers *)
if !resp_headers_done then (
resp_headers_done := false;
resp_headers := []
if st.resp_headers_done then (
st.resp_headers_done <- false;
st.resp_headers <- []
);
resp_headers := s :: !resp_headers
st.resp_headers <- s :: st.resp_headers
);
String.length s0);
Curl.set_writefunction self.curl (fun s ->
st
let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () :
(string response, _) result io =
(* at least one attempt *)
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
in
let body = Buffer.create 64 in
Curl.set_writefunction st.client.curl (fun s ->
Buffer.add_string body s;
String.length s);
let rec loop i =
IO.perform self.curl >>= function
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
let r = mk_res self (List.rev !resp_headers) (Buffer.contents body) in
if do_cleanup then Curl.cleanup self.curl;
let r =
mk_res st.client (List.rev st.resp_headers) (Buffer.contents body)
in
if st.do_cleanup then Curl.cleanup st.client.curl;
return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
if do_cleanup then Curl.cleanup self.curl;
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in
loop tries
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
~meth ~(write_into : #input_stream) () : (unit response, _) result io =
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
in
Curl.set_writefunction st.client.curl (fun s ->
let n = String.length s in
write_into#on_input (Bytes.unsafe_of_string s) 0 n;
n);
let rec loop i =
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
let r = mk_res st.client (List.rev st.resp_headers) () in
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in
loop tries

View file

@ -155,12 +155,12 @@ module type S = sig
@param headers headers of the query
*)
type stream = {
on_close: unit -> unit io;
on_chunk: string -> int -> int -> unit io;
}
(** Push-based stream of bytes
@since NEXT_RELEASE *)
class type input_stream = object
method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
@ -171,9 +171,13 @@ module type S = sig
?headers:(string * string) list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
unit ->
(stream response, Curl.curlCode * string) result io
(unit response, Curl.curlCode * string) result io
(** HTTP call via cURL, with a streaming response body.
The body is given to [write_into] by chunks,
then [write_into#on_close ()] is called
and the response is returned.
@since NEXT_RELEASE *)
val get :