Merge pull request #17 from c-cube/wip-stream-response

streaming response
This commit is contained in:
Simon Cruanes 2024-10-03 11:04:57 -04:00 committed by GitHub
commit 0faaf35969
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 211 additions and 39 deletions

View file

@ -7,6 +7,9 @@ build:
test: test:
@dune runtest --no-buffer --force @dune runtest --no-buffer --force
test-autopromote:
@dune runtest --no-buffer --force --auto-promote
clean: clean:
@dune clean @dune clean

View file

@ -33,7 +33,7 @@ val url : string = "https://curl.haxx.se/"
# let res = Ezcurl.get ~url ();; # let res = Ezcurl.get ~url ();;
... ...
# let content = match res with Ok c -> c | Error (_,s) -> failwith s;; # let content = match res with Ok c -> c | Error (_,s) -> failwith s;;
val content : Ezcurl_core.response = val content : string Ezcurl_core.response =
... ...
# content.Ezcurl.code;; # content.Ezcurl.code;;

View file

@ -69,6 +69,7 @@ module Config = struct
end end
type t = { curl: Curl.t } [@@unboxed] type t = { curl: Curl.t } [@@unboxed]
type client = t
let _init = let _init =
let initialized = ref false in let initialized = ref false in
@ -159,22 +160,23 @@ let pp_response_info out r =
let string_of_response_info s = Format.asprintf "%a" pp_response_info s let string_of_response_info s = Format.asprintf "%a" pp_response_info s
type response = { type 'body response = {
code: int; code: int;
headers: (string * string) list; headers: (string * string) list;
body: string; body: 'body;
info: response_info; info: response_info;
} }
let pp_response out r = let pp_response_with ppbody out r =
let pp_header out (s1, s2) = Format.fprintf out "@[<2>%s:@ %s@]" s1 s2 in let pp_header out (s1, s2) = Format.fprintf out "@[<2>%s:@ %s@]" s1 s2 in
let pp_headers out l = let pp_headers out l =
Format.fprintf out "@[<v>%a@]" (Format.pp_print_list pp_header) l Format.fprintf out "@[<v>%a@]" (Format.pp_print_list pp_header) l
in in
let { code; body; headers; info } = r in let { code; body; headers; info } = r in
Format.fprintf out "{@[code=%d;@ headers=@[%a@];@ info=%a;@ body=@[%a@]@]}" Format.fprintf out "{@[code=%d;@ headers=@[%a@];@ info=%a;@ body=@[%a@]@]}"
code pp_headers headers pp_response_info info Format.pp_print_text body code pp_headers headers pp_response_info info ppbody body
let pp_response = pp_response_with Format.pp_print_text
let string_of_response s = Format.asprintf "%a" pp_response s let string_of_response s = Format.asprintf "%a" pp_response s
type meth = type meth =
@ -224,7 +226,7 @@ module type S = sig
url:string -> url:string ->
meth:meth -> meth:meth ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** General purpose HTTP call via cURL. (** General purpose HTTP call via cURL.
@param url the URL to query @param url the URL to query
@param meth which method to use (see {!meth}) @param meth which method to use (see {!meth})
@ -245,6 +247,28 @@ module type S = sig
@param headers headers of the query @param headers headers of the query
*) *)
(** Push-based stream of bytes: a consumer object to which the chunks of a
    response body are handed as they arrive.
@since NEXT_RELEASE *)
class type input_stream = object
(** [on_close ()] signals that no more input will be pushed. *)
method on_close : unit -> unit
(** [on_input buf i len] pushes the slice of [buf] that starts at offset [i]
    and has length [len]. *)
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
?client:t ->
?config:Config.t ->
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
unit ->
(unit response, Curl.curlCode * string) result io
(** HTTP call via cURL, with a streaming response body.
    The body is pushed to [write_into] chunk by chunk through [on_input];
    [write_into#on_close ()] is then called (also when the call fails)
    and the response, whose [body] is [()], is returned.
    See {!http} for the meaning of the other parameters.
@since NEXT_RELEASE *)
val get : val get :
?tries:int -> ?tries:int ->
?client:t -> ?client:t ->
@ -253,7 +277,7 @@ module type S = sig
?headers:(string * string) list -> ?headers:(string * string) list ->
url:string -> url:string ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:GET] (** Shortcut for [http ~meth:GET]
See {!http} for more info. See {!http} for more info.
*) *)
@ -266,7 +290,7 @@ module type S = sig
url:string -> url:string ->
content:[ `String of string | `Write of bytes -> int -> int ] -> content:[ `String of string | `Write of bytes -> int -> int ] ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:PUT] (** Shortcut for [http ~meth:PUT]
See {!http} for more info. See {!http} for more info.
*) *)
@ -280,7 +304,7 @@ module type S = sig
params:Curl.curlHTTPPost list -> params:Curl.curlHTTPPost list ->
url:string -> url:string ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:(POST params)] (** Shortcut for [http ~meth:(POST params)]
See {!http} for more info. See {!http} for more info.
*) *)
@ -288,7 +312,7 @@ end
exception Parse_error of Curl.curlCode * string exception Parse_error of Curl.curlCode * string
let mk_res (self : t) headers body : (response, _) result = let mk_res (self : t) headers body : (_ response, _) result =
let split_colon s = let split_colon s =
match String.index s ':' with match String.index s ':' with
| exception Not_found -> | exception Not_found ->
@ -340,8 +364,20 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| `String s -> Some (String.length s) | `String s -> Some (String.length s)
| `Write _ -> None | `Write _ -> None
let http ?(tries = 1) ?client ?(config = Config.default) ?range ?content class type input_stream = object
?(headers = []) ~url ~meth () : _ result io = method on_close : unit -> unit
method on_input : bytes -> int -> int -> unit
end
(** State built by [http_setup_] and shared between its header callback
    and the functions that perform the request. *)
type http_state_ = {
client: client;
(** Handle on which the request is configured and performed. *)
do_cleanup: bool;
(** If true, [Curl.cleanup] is called on the handle once the request
    has finished (successfully or not).
    NOTE(review): presumably true only when the caller did not supply
    its own client; confirm in [http_setup_]. *)
mutable resp_headers: string list;
(** Trimmed response header lines, most recent first
    (reversed before being parsed). *)
mutable resp_headers_done: bool;
(** Set once the "\r\n" line ending a header block was seen; if another
    header arrives afterwards (redirection), the headers collected so
    far are dropped. *)
}
let http_setup_ ?client ?(config = Config.default) ?range ?content
?(headers = []) ~url ~meth () : http_state_ =
let headers = ref headers in let headers = ref headers in
let do_cleanup, self = let do_cleanup, self =
match client with match client with
@ -363,11 +399,15 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
| Some size, _ -> Curl.set_infilesize self.curl size); | Some size, _ -> Curl.set_infilesize self.curl size);
(* local state *) (* local state *)
let tries = max tries 1 in let st =
(* at least one attempt *) {
let body = Buffer.create 64 in do_cleanup;
let resp_headers = ref [] in client = self;
let resp_headers_done = ref false in resp_headers = [];
resp_headers_done = false;
}
in
(* once we get "\r\n" header line *) (* once we get "\r\n" header line *)
Curl.set_url self.curl url; Curl.set_url self.curl url;
(match meth with (match meth with
@ -389,29 +429,71 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct
let s = String.trim s0 in let s = String.trim s0 in
(* Printf.printf "got header %S\n%!" s0; *) (* Printf.printf "got header %S\n%!" s0; *)
if s0 = "\r\n" then if s0 = "\r\n" then
resp_headers_done := true st.resp_headers_done <- true
else ( else (
(* redirection: drop previous headers *) (* redirection: drop previous headers *)
if !resp_headers_done then ( if st.resp_headers_done then (
resp_headers_done := false; st.resp_headers_done <- false;
resp_headers := [] st.resp_headers <- []
); );
resp_headers := s :: !resp_headers st.resp_headers <- s :: st.resp_headers
); );
String.length s0); String.length s0);
Curl.set_writefunction self.curl (fun s ->
st
let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () :
(string response, _) result io =
(* at least one attempt *)
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
in
let body = Buffer.create 64 in
Curl.set_writefunction st.client.curl (fun s ->
Buffer.add_string body s; Buffer.add_string body s;
String.length s); String.length s);
let rec loop i = let rec loop i =
IO.perform self.curl >>= function IO.perform st.client.curl >>= function
| Curl.CURLE_OK -> | Curl.CURLE_OK ->
let r = mk_res self (List.rev !resp_headers) (Buffer.contents body) in let r =
if do_cleanup then Curl.cleanup self.curl; mk_res st.client (List.rev st.resp_headers) (Buffer.contents body)
in
if st.do_cleanup then Curl.cleanup st.client.curl;
return r return r
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *) | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c -> | c ->
if do_cleanup then Curl.cleanup self.curl; if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c))
in
loop tries
(* Streaming variant of [http]: instead of buffering the body, every chunk
   received from cURL is pushed to [write_into]; the returned response
   carries [()] as its body. *)
let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url
~meth ~(write_into : #input_stream) () : (unit response, _) result io =
(* at least one attempt *)
let tries = max tries 1 in
let st =
http_setup_ ?client ?config ?range ?content ?headers ~url ~meth ()
in
(* Forward each chunk as-is. [Bytes.unsafe_of_string] avoids a copy, so the
   bytes alias the string handed over by the cURL binding and the consumer
   should not mutate them. Returning [n] reports the whole chunk as
   consumed. *)
Curl.set_writefunction st.client.curl (fun s ->
let n = String.length s in
write_into#on_input (Bytes.unsafe_of_string s) 0 n;
n);
(* [i] is the number of attempts left, including the current one *)
let rec loop i =
IO.perform st.client.curl >>= function
| Curl.CURLE_OK ->
(* headers were accumulated most recent first, hence [List.rev] *)
let r = mk_res st.client (List.rev st.resp_headers) () in
(* success: tell the consumer the stream is over, then release the
   handle if [st.do_cleanup] is set *)
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return r
(* NOTE(review): chunks already pushed to [write_into] during a failed
   attempt are not undone before retrying; confirm consumers tolerate
   receiving the data again. *)
| Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *)
| c ->
(* failure: the stream is still closed before the error is returned *)
write_into#on_close ();
if st.do_cleanup then Curl.cleanup st.client.curl;
return (Error (c, Curl.strerror c)) return (Error (c, Curl.strerror c))
in in
loop tries loop tries

View file

@ -78,17 +78,20 @@ type response_info = {
val pp_response_info : Format.formatter -> response_info -> unit val pp_response_info : Format.formatter -> response_info -> unit
val string_of_response_info : response_info -> string val string_of_response_info : response_info -> string
type response = { type 'body response = {
code: int; code: int;
(** Response code. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status *) (** Response code. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status *)
headers: (string * string) list; (** Response headers *) headers: (string * string) list; (** Response headers *)
body: string; (** Response body, or [""] *) body: 'body; (** Response body, or [""] *)
info: response_info; (** Information about the response *) info: response_info; (** Information about the response *)
} }
(** Response for a given request. *) (** Response for a given request. *)
val pp_response : Format.formatter -> response -> unit val pp_response_with :
val string_of_response : response -> string (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a response -> unit
val pp_response : Format.formatter -> string response -> unit
val string_of_response : string response -> string
(** The {{: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods} HTTP method} (** The {{: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods} HTTP method}
to use *) to use *)
@ -131,7 +134,7 @@ module type S = sig
url:string -> url:string ->
meth:meth -> meth:meth ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** General purpose HTTP call via cURL. (** General purpose HTTP call via cURL.
@param url the URL to query @param url the URL to query
@param meth which method to use (see {!meth}) @param meth which method to use (see {!meth})
@ -152,6 +155,31 @@ module type S = sig
@param headers headers of the query @param headers headers of the query
*) *)
(** Push-based stream of bytes: a consumer object to which the chunks of a
    response body are handed as they arrive.
@since NEXT_RELEASE *)
class type input_stream = object
(** [on_close ()] signals that no more input will be pushed. *)
method on_close : unit -> unit
(** [on_input buf i len] pushes the slice of [buf] that starts at offset [i]
    and has length [len]. *)
method on_input : bytes -> int -> int -> unit
end
val http_stream :
?tries:int ->
?client:t ->
?config:Config.t ->
?range:string ->
?content:[ `String of string | `Write of bytes -> int -> int ] ->
?headers:(string * string) list ->
url:string ->
meth:meth ->
write_into:#input_stream ->
unit ->
(unit response, Curl.curlCode * string) result io
(** HTTP call via cURL, with a streaming response body.
    The body is given to [write_into] by chunks (through [on_input]),
    then [write_into#on_close ()] is called (also when the call fails)
    and the response is returned. Since the body has already been handed
    to [write_into], the [body] field of the response is [()].
    See {!http} for the meaning of the other parameters.
@since NEXT_RELEASE *)
val get : val get :
?tries:int -> ?tries:int ->
?client:t -> ?client:t ->
@ -160,7 +188,7 @@ module type S = sig
?headers:(string * string) list -> ?headers:(string * string) list ->
url:string -> url:string ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:GET] (** Shortcut for [http ~meth:GET]
See {!http} for more info. See {!http} for more info.
*) *)
@ -173,7 +201,7 @@ module type S = sig
url:string -> url:string ->
content:[ `String of string | `Write of bytes -> int -> int ] -> content:[ `String of string | `Write of bytes -> int -> int ] ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:PUT] (** Shortcut for [http ~meth:PUT]
See {!http} for more info. See {!http} for more info.
*) *)
@ -187,7 +215,7 @@ module type S = sig
params:Curl.curlHTTPPost list -> params:Curl.curlHTTPPost list ->
url:string -> url:string ->
unit -> unit ->
(response, Curl.curlCode * string) result io (string response, Curl.curlCode * string) result io
(** Shortcut for [http ~meth:(POST params)] (** Shortcut for [http ~meth:(POST params)]
See {!http} for more info. See {!http} for more info.
*) *)

39
test/basic_test.expected Normal file
View file

@ -0,0 +1,39 @@
get: OK
body=```
version = 0.26.2
profile=conventional
margin=80
if-then-else=k-r
parens-ite=true
parens-tuple=multi-line-only
sequence-style=terminator
type-decl=sparse
break-cases=toplevel
cases-exp-indent=2
field-space=tight-decl
leading-nested-match-parens=true
module-item-spacing=compact
quiet=true
ocaml-version=4.08.0
```
streaming get: OK
body=```
version = 0.26.2
profile=conventional
margin=80
if-then-else=k-r
parens-ite=true
parens-tuple=multi-line-only
sequence-style=terminator
type-decl=sparse
break-cases=toplevel
cases-exp-indent=2
field-space=tight-decl
leading-nested-match-parens=true
module-item-spacing=compact
quiet=true
ocaml-version=4.08.0
```
same buf? true

View file

@ -1,10 +1,30 @@
let body = ref ""
let url =
"https://raw.githubusercontent.com/c-cube/ezcurl/refs/heads/main/.ocamlformat"
let () = let () =
match Ezcurl.get ~url () with
| Error (code, msg) ->
Format.eprintf "curl error: code `%s` (%s)@." (Curl.strerror code) msg
| Ok res ->
body := res.body;
Format.printf "get: OK@.body=```@.%s@.```@." !body
let () =
let buf = Buffer.create 32 in
match match
Ezcurl.get Ezcurl.http_stream ~meth:GET ~url
~url: ~write_into:
"https://archive.softwareheritage.org/api/1/content/sha1_git:7bdf38d4468c114206c9b6ebd9cf1176e085d346/" (object
method on_input bs i len = Buffer.add_subbytes buf bs i len
method on_close () = ()
end)
() ()
with with
| Error (code, msg) -> | Error (code, msg) ->
Format.eprintf "curl error: code `%s` (%s)@." (Curl.strerror code) msg Format.eprintf "curl error: code `%s` (%s)@." (Curl.strerror code) msg
| Ok _response -> Format.printf "OK@." | Ok _res ->
let new_body = Buffer.contents buf in
Format.printf "streaming get: OK@.body=```@.%s@.```@." new_body;
Format.printf "same buf? %b@." (new_body = !body)