Merge pull request #10 from AestheticIntegration/matt/cohttp-client

Integration with cohttp-lwt (client)
This commit is contained in:
Matt Bray 2022-03-25 11:57:32 +00:00 committed by GitHub
commit b67fc00821
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 148 additions and 3 deletions

View file

@ -19,7 +19,7 @@ module Server : sig
(Server.make () ~callback:callback_traced)
*)
val trace :
service_name:string ->
?service_name:string ->
?attrs:Otel.Span.key_value list ->
('conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t) ->
'conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t
@ -112,11 +112,11 @@ end = struct
let headers = Header.remove (Request.headers req) header_x_ocaml_otel_traceparent in
{ req with headers }
let trace ~service_name ?(attrs=[]) callback =
let trace ?service_name ?(attrs=[]) callback =
fun conn req body ->
let scope = get_trace_context ~from:`External req in
Otel_lwt.Trace.with_
~service_name
?service_name
"request"
~kind:Span_kind_server
?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope)
@ -145,3 +145,83 @@ end = struct
let req = set_trace_context scope req in
f req)
end
let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) =
let module Traced = struct
open Lwt.Syntax
let attrs_for ~uri ~meth () =
[ ("http.method", `String (Code.string_of_method `GET))
; ("http.url", `String (Uri.to_string uri))
]
let context_for ~uri ~meth =
let trace_id = match scope with | Some scope -> Some scope.trace_id | None -> None in
let parent = match scope with | Some scope -> Some scope.span_id | None -> None in
let attrs = attrs_for ~uri ~meth () in
(trace_id, parent, attrs)
let add_traceparent (scope : Otel.Trace.scope) headers =
let module Traceparent = Otel.Trace_context.Traceparent in
let headers = match headers with | None -> Header.init () | Some headers -> headers in
Header.add headers Traceparent.name
(Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id ())
type ctx = C.ctx
let call ?ctx ?headers ?body ?chunked meth (uri : Uri.t) : (Response.t * Cohttp_lwt.Body.t) Lwt.t =
let (trace_id, parent, attrs) = context_for ~uri ~meth in
Otel_lwt.Trace.with_ "request"
~kind:Span_kind_client
?trace_id
?parent
~attrs
(fun scope ->
let headers = add_traceparent scope headers in
let* (res, body) = C.call ?ctx ~headers ?body ?chunked meth uri in
Otel.Trace.add_attrs scope (fun () ->
let code = Response.status res in
let code = Code.code_of_status code in
[ ("http.status_code", `Int code) ]) ;
Lwt.return (res, body))
let head ?ctx ?headers uri =
let open Lwt.Infix in
call ?ctx ?headers `HEAD uri >|= fst
let get ?ctx ?headers uri = call ?ctx ?headers `GET uri
let delete ?ctx ?body ?chunked ?headers uri =
call ?ctx ?headers ?body ?chunked `DELETE uri
let post ?ctx ?body ?chunked ?headers uri =
call ?ctx ?headers ?body ?chunked `POST uri
let put ?ctx ?body ?chunked ?headers uri =
call ?ctx ?headers ?body ?chunked `PUT uri
let patch ?ctx ?body ?chunked ?headers uri =
call ?ctx ?headers ?body ?chunked `PATCH uri
let post_form ?ctx ?headers ~params uri =
let (trace_id, parent, attrs) = context_for ~uri ~meth:`POST in
Otel_lwt.Trace.with_ "request"
~kind:Span_kind_client
?trace_id
?parent
~attrs
(fun scope ->
let headers = add_traceparent scope headers in
let* (res, body) =
C.post_form ?ctx ~headers ~params uri
in
Otel.Trace.add_attrs scope (fun () ->
let code = Response.status res in
let code = Code.code_of_status code in
[ ("http.status_code", `Int code) ]) ;
Lwt.return (res, body))
let callv = C.callv (* TODO *)
end
in
(module Traced : Cohttp_lwt.S.Client)

View file

@ -0,0 +1,59 @@
module T = Opentelemetry
module Otel_lwt = Opentelemetry_lwt
let spf = Printf.sprintf
let (let@) f x = f x
let sleep_inner = ref 0.1
let sleep_outer = ref 2.0
let mk_client ~scope = Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client)
let run () =
Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url());
let open Lwt.Syntax in
let rec go () =
let@ scope =
Otel_lwt.Trace.with_
~kind:T.Span.Span_kind_producer
"loop.outer"
in
let* () = Lwt_unix.sleep !sleep_outer in
let module C = (val mk_client ~scope) in
let* (res, body) = C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") in
let* () = Cohttp_lwt.Body.drain_body body in
go ()
in
go ()
let () =
Sys.catch_break true;
T.Globals.service_name := "ocaml-otel-cohttp-client";
T.Globals.service_namespace := Some "ocaml-otel.test";
let debug = ref false in
let thread = ref true in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let opts = [
"--debug", Arg.Bool ((:=) debug), " enable debug output";
"--thread", Arg.Bool ((:=) thread), " use a background thread";
"--batch-traces", Arg.Int ((:=) batch_traces), " size of traces batch";
"--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch";
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
] |> Arg.align in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r = if !r > 0 then Some !r else None in
let config = Opentelemetry_client_ocurl.Config.make
~debug:!debug
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread () in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
Format.printf "Check HTTP requests at https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@.";
Opentelemetry_client_ocurl.with_setup ~config () (fun () -> Lwt_main.run (run ()))

View file

@ -1,3 +1,9 @@
(executable
(name emit1)
(modules emit1)
(libraries unix opentelemetry opentelemetry-client-ocurl))
(executable
(name cohttp_client)
(modules cohttp_client)
(libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl opentelemetry-cohttp-lwt))