Merge pull request #28 from imandra-ai/wip-cohttp-lwt-client

cohttp lwt client
This commit is contained in:
Simon Cruanes 2022-10-03 13:08:48 -04:00 committed by GitHub
commit 8a52da6446
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 991 additions and 23 deletions

View file

@ -39,7 +39,7 @@ jobs:
- run: opam install . --deps-only --with-test
- run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt
- run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt,opentelemetry-client-cohttp-lwt
- run: opam pin ocaml-protoc 2.3 -y

View file

@ -55,3 +55,16 @@
(odoc :with-doc)
(cohttp-lwt (>= "4.0.0")))
(synopsis "Opentelemetry tracing for Cohttp HTTP servers"))
(package
(name opentelemetry-client-cohttp-lwt)
(depends
(ocaml (>= "4.08"))
(mtime (>= "1.4")) ; for spans
(opentelemetry (= :version))
(pbrt (>= 2.2))
(odoc :with-doc)
(lwt_ppx :with-test)
cohttp-lwt
cohttp-lwt-unix)
(synopsis "Collector client for opentelemetry, using cohttp + lwt"))

View file

@ -0,0 +1,37 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.3"
synopsis: "Collector client for opentelemetry, using cohttp + lwt"
maintainer: ["the Imandra team and contributors"]
authors: ["the Imandra team and contributors"]
license: "MIT"
homepage: "https://github.com/aestheticintegration/ocaml-opentelemetry"
bug-reports:
"https://github.com/aestheticintegration/ocaml-opentelemetry/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "4.08"}
"mtime" {>= "1.4"}
"opentelemetry" {= version}
"pbrt" {>= "2.2"}
"odoc" {with-doc}
"lwt_ppx" {with-test}
"cohttp-lwt"
"cohttp-lwt-unix"
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo:
"git+https://github.com/aestheticintegration/ocaml-opentelemetry.git"

View file

@ -0,0 +1,38 @@
open Lwt.Syntax
module Atomic = Opentelemetry_atomic.Atomic
let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf
let tid () = Thread.id @@ Thread.self ()
let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let default_url = "http://localhost:4318"
let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let get_url () = !url
let set_url s = url := s
let parse_headers s =
let parse_header s = Scanf.sscanf s "%s@=%s" (fun key value -> key, value) in
String.split_on_char ',' s |> List.map parse_header
let default_headers = []
let headers =
ref
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
with _ -> default_headers)
let get_headers () = !headers
let set_headers s = headers := s

View file

@ -0,0 +1,45 @@
open Common_
type t = {
debug: bool;
url: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
}
let pp out self : unit =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = Some 20)
?(batch_logs = Some 400) ?(batch_timeout_ms = 500) () : t =
{
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_timeout_ms;
batch_logs;
}

View file

@ -0,0 +1,54 @@
type t = private {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
}
(** Configuration.
To build one, use {!make} below. This might be extended with more
fields in the future. *)
val make :
?debug:bool ->
?url:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
unit ->
t
(** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit

View file

@ -0,0 +1,6 @@
(library
(name opentelemetry_client_cohttp_lwt)
(public_name opentelemetry-client-cohttp-lwt)
(synopsis "Opentelemetry collector using cohttp+lwt+unix")
(libraries opentelemetry lwt cohttp-lwt cohttp-lwt-unix pbrt mtime
mtime.clock.os))

View file

@ -0,0 +1,578 @@
(*
https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
*)
module OT = Opentelemetry
module Config = Config
open Opentelemetry
include Common_
let needs_gc_metrics = Atomic.make false
let gc_metrics = ref []
(* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics if {!needs_gc_metrics} is true,
and push them into {!gc_metrics} for later
collection *)
let sample_gc_metrics_if_needed () =
if Atomic.compare_and_set needs_gc_metrics true false then (
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
gc_metrics := l :: !gc_metrics
)
type error =
[ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string
| `Sysbreak
]
let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0
let report_err_ = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details })
->
let pp_details out l =
List.iter
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
l
in
Format.eprintf
"@[<2>opentelemetry: export failed with@ http code=%d@ status \
{@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
code scode
(Bytes.unsafe_to_string message)
pp_details details
module Httpc : sig
type t
val create : unit -> t
val send :
t ->
path:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result Lwt.t
val cleanup : t -> unit
end = struct
open Opentelemetry.Proto
open Lwt.Syntax
module Httpc = Cohttp_lwt_unix.Client
type t = unit
let create () : t = ()
let cleanup _self = ()
(* send the content to the remote endpoint/path *)
let send (_self : t) ~path ~decode (bod : string) : ('a, error) result Lwt.t =
let full_url = !url ^ path in
let uri = Uri.of_string full_url in
let open Cohttp in
let headers = Header.(add_list (init ()) !headers) in
let headers =
Header.(add headers "Content-Type" "application/x-protobuf")
in
let body = Cohttp_lwt.Body.of_string bod in
let* r =
Lwt.catch
(fun () ->
let+ r = Httpc.post ~headers ~body uri in
Ok r)
(fun e -> Lwt.return @@ Error e)
in
match r with
| Error e ->
let err =
`Failure
(spf "sending signals via http POST to %S\nfailed with:\n%s" full_url
(Printexc.to_string e))
in
Lwt.return @@ Error err
| Ok (resp, body) ->
let* body = Cohttp_lwt.Body.to_string body in
let code = Response.status resp |> Code.code_of_status in
if not (Code.is_error code) then (
match decode with
| `Ret x -> Lwt.return @@ Ok x
| `Dec f ->
let dec = Pbrt.Decoder.of_string body in
let r =
try Ok (f dec)
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))
in
Lwt.return r
) else (
let dec = Pbrt.Decoder.of_string body in
let r =
try
let status = Status.decode_status dec in
Error (`Status (code, status))
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf
"httpc: decoding of status (url=%S, code=%d) failed with:\n\
%s\n\
status: %S\n\
%s"
full_url code (Printexc.to_string e) body bt))
in
Lwt.return r
)
end
(** Batch of resources to be pushed later.
This type is thread-safe. *)
module Batch : sig
type 'a t
val push : 'a t -> 'a -> bool
(** [push batch x] pushes [x] into the batch, and heuristically
returns [true] if the batch is ready to be emitted (to know if we should
wake up the sending thread, if any) *)
val push' : 'a t -> 'a -> unit
val is_ready : now:Mtime.t -> _ t -> bool
(** is the batch ready to be sent? This is heuristic. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled,
this is true as soon as {!is_empty} is false. If a timeout is provided
for this batch, then it will be ready if an element has been in it
for at least the timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *)
end = struct
type 'a t = {
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
}
let make ?batch ?timeout () : _ t =
Option.iter (fun b -> assert (b > 0)) batch;
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
{
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let is_empty_ self = self.size = 0
let timeout_expired_ ~now self : bool =
match self.timeout with
| Some t ->
let elapsed = Mtime.span now self.start in
Mtime.Span.compare elapsed t >= 0
| None -> false
let is_full_ self : bool =
match self.batch with
| None -> self.size > 0
| Some b -> self.size >= b
let is_ready ~now self : bool = is_full_ self || timeout_expired_ ~now self
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
then (
let l = self.q in
self.q <- [];
self.size <- 0;
assert (l <> []);
Some l
) else
None
let push (self : _ t) x : bool =
if self.size >= self.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if self.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
self.start <- Mtime_clock.now ();
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
let ready = is_full_ self in
ready
)
let push' self x = ignore (push self x : bool)
end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *)
module type EMITTER = sig
open Opentelemetry.Proto
val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> unit
val push_logs : Logs.resource_logs list -> unit
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
val tick : unit -> unit
val cleanup : unit -> unit
end
(* make an emitter.
exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
(* local helpers *)
let open struct
let timeout =
if config.batch_timeout_ms > 0 then
Some Mtime.Span.(config.batch_timeout_ms * ms)
else
None
let batch_traces : Trace.resource_spans list Batch.t =
Batch.make ?batch:config.batch_traces ?timeout ()
let batch_metrics : Metrics.resource_metrics list Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = Atomic.make (ref [])
let set_on_tick_callbacks = Atomic.set on_tick_cbs_
let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit Lwt.t =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let* r = Httpc.send httpc ~path ~decode:(`Ret ()) data in
match r with
| Ok () -> Lwt.return ()
| Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set stop true;
Lwt.return ()
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err;
(* avoid crazy error loop *)
Lwt_unix.sleep 3.
let send_metrics_http curl encoder (l : Metrics.resource_metrics list list)
=
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l ()
in
send_http_ curl encoder ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x
let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ curl encoder ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x
let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ curl encoder ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x
(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false
| Some l ->
let batch = !gc_metrics :: l in
gc_metrics := [];
let+ () = send_metrics_http httpc encoder batch in
true
let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_traces with
| None -> Lwt.return false
| Some l ->
let+ () = send_traces_http httpc encoder l in
true
let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_logs with
| None -> Lwt.return false
| Some l ->
let+ () = send_logs_http httpc encoder l in
true
let[@inline] guard_exn_ where f =
try f ()
with e ->
let bt = Printexc.get_backtrace () in
Printf.eprintf
"opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where
(Printexc.to_string e) bt
let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t =
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in
()
let tick_common_ () =
if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed ();
List.iter
(fun f ->
try f ()
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(Atomic.get on_tick_cbs_);
()
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let setup_ticker_thread ~tick ~finally () =
let rec tick_thread () =
if Atomic.get stop then (
finally ();
Lwt.return ()
) else
let* () = Lwt_unix.sleep 0.5 in
let* () = tick () in
tick_thread ()
in
Lwt.async tick_thread
end in
let httpc = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
let module M = struct
(* we make sure that this is thread-safe, even though we don't have a
background thread. There can still be a ticker thread, and there
can also be several user threads that produce spans and call
the emit functions. *)
let push_trace e =
let@ () = guard_exn_ "push trace" in
Batch.push' batch_traces e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in
())
let push_metrics e =
let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed ();
Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
())
let push_logs e =
let@ () = guard_exn_ "push logs" in
Batch.push' batch_logs e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in
())
let set_on_tick_callbacks = set_on_tick_callbacks
let tick_ () =
tick_common_ ();
sample_gc_metrics_if_needed ();
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
()
let () = setup_ticker_thread ~tick:tick_ ~finally:ignore ()
(* if called in a blocking context: work in the background *)
let tick () = Lwt.async tick_
let cleanup () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc;
Lwt.return ())
end in
(module M)
module Backend (Arg : sig
val stop : bool Atomic.t
val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
open Opentelemetry.Proto
open Opentelemetry.Collector
let send_trace : Trace.resource_spans list sender =
{
send =
(fun l ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
push_trace l;
ret ());
}
let last_sent_metrics = Atomic.make (Mtime_clock.now ())
let timeout_sent_metrics = Mtime.Span.(5 * s)
(* send metrics from time to time *)
let signal_emit_gc_metrics () =
if !debug_ then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true
let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *)
let last_emit = Atomic.get last_sent_metrics in
let now = Mtime_clock.now () in
let add_own_metrics =
let elapsed = Mtime.span last_emit now in
Mtime.Span.compare elapsed timeout_sent_metrics > 0
in
(* there is a possible race condition here, as several threads might update
metrics at the same time. But that's harmless. *)
if add_own_metrics then (
Atomic.set last_sent_metrics now;
let open OT.Metrics in
[
make_resource_metrics
[
sum ~name:"otel.export.dropped" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
];
sum ~name:"otel.export.errors" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
];
];
]
) else
[]
let send_metrics : Metrics.resource_metrics list sender =
{
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);
let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
ret ());
}
let send_logs : Logs.resource_logs list sender =
{
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);
push_logs m;
ret ());
}
end
let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
debug_ := config.debug;
let module B =
Backend
(struct
let stop = stop
let config = config
end)
()
in
Opentelemetry.Collector.set_backend (module B);
B.cleanup
let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ~config () in
at_exit cleanup
)
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
if enable then (
let cleanup = setup_ ?stop ~config () in
Fun.protect ~finally:cleanup f
) else
f ()

View file

@ -0,0 +1,41 @@
(*
TODO: more options from
https://opentelemetry.io/docs/reference/specification/protocol/exporter/
*)
open Common_
val get_url : unit -> string
val set_url : string -> unit
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
(** Set http headers that are sent on every http query to the collector. *)
module Config = Config
val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments
or environment.
@param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
val with_setup :
?stop:bool Atomic.t ->
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns
See {!setup} for more details. *)

View file

@ -220,8 +220,6 @@ end = struct
high_watermark;
}
let is_empty_ self = self.size = 0
let timeout_expired_ ~now self : bool =
match self.timeout with
| Some t ->
@ -552,6 +550,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let set_on_tick_callbacks = set_on_tick_callbacks
let tick () =
tick_common_ ();
sample_gc_metrics_if_needed ();
let@ () = Lock.with_lock in
let now = Mtime_clock.now () in

View file

@ -16,15 +16,17 @@ module Server : sig
Use it like this:
let my_server callback =
let callback_traced =
Opentelemetry_cohttp_lwt.Server.trace
~service_name:"my-service"
(fun _scope -> callback)
in
Cohttp_lwt_unix.Server.create
~mode:(`TCP (`Port 8080))
(Server.make () ~callback:callback_traced)
{[
let my_server callback =
let callback_traced =
Opentelemetry_cohttp_lwt.Server.trace
~service_name:"my-service"
(fun _scope -> callback)
in
Cohttp_lwt_unix.Server.create
~mode:(`TCP (`Port 8080))
(Server.make () ~callback:callback_traced)
]}
*)
val with_ :

View file

@ -4,9 +4,11 @@ module Span_id = Span_id
module Trace_id = Trace_id
module Event = Event
module Span = Span
module Span_link = Span_link
module Globals = Globals
module Timestamp_ns = Timestamp_ns
module GC_metrics = GC_metrics
module Metrics_callbacks = Metrics_callbacks
module Trace_context = Trace_context
module Trace = struct
@ -59,3 +61,8 @@ module Metrics = struct
open Proto.Metrics
include Metrics
end
module Logs = struct
include Proto.Logs
include Logs
end

View file

@ -981,8 +981,6 @@ end
(** A set of callbacks that produce metrics when called.
The metrics are automatically called regularly.
This allows applications to register metrics callbacks from various points
in the program (or even in libraries), and not worry about setting
alarms/intervals to emit them. *)

View file

@ -13,7 +13,8 @@ let mk_client ~scope =
Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client)
let run () =
Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url ());
Printf.printf "collector is on %S\n%!"
(Opentelemetry_client_cohttp_lwt.get_url ());
let open Lwt.Syntax in
let rec go () =
let@ scope =
@ -21,7 +22,7 @@ let run () =
in
let* () = Lwt_unix.sleep !sleep_outer in
let module C = (val mk_client ~scope) in
let* res, body =
let* _res, body =
C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net")
in
let* () = Cohttp_lwt.Body.drain_body body in
@ -35,13 +36,11 @@ let () =
T.Globals.service_namespace := Some "ocaml-otel.test";
let debug = ref false in
let thread = ref true in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output";
"--thread", Arg.Bool (( := ) thread), " use a background thread";
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
@ -61,17 +60,17 @@ let () =
None
in
let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug
Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread ()
()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
!sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config;
Format.printf
"Check HTTP requests at \
https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@.";
Opentelemetry_client_ocurl.with_setup ~config () (fun () ->
Opentelemetry_client_cohttp_lwt.with_setup ~config () (fun () ->
Lwt_main.run (run ()))

View file

@ -3,8 +3,14 @@
(modules emit1)
(libraries unix opentelemetry opentelemetry-client-ocurl))
(executable
(name emit1_cohttp)
(modules emit1_cohttp)
(preprocess (pps lwt_ppx))
(libraries unix opentelemetry opentelemetry-lwt opentelemetry-client-cohttp-lwt lwt.unix))
(executable
(name cohttp_client)
(modules cohttp_client)
(libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl
(libraries cohttp-lwt-unix opentelemetry opentelemetry-client-cohttp-lwt
opentelemetry-cohttp-lwt))

145
tests/bin/emit1_cohttp.ml Normal file
View file

@ -0,0 +1,145 @@
module T = Opentelemetry_lwt
module Atomic = Opentelemetry_atomic.Atomic
open Lwt.Syntax
let spf = Printf.sprintf
let ( let@ ) f x = f x
let sleep_inner = ref 0.1
let sleep_outer = ref 2.0
let n_jobs = ref 1
let num_sleep = Atomic.make 0
let stress_alloc_ = ref true
let stop = Atomic.make false
let num_tr = Atomic.make 0
let run_job () : unit Lwt.t =
let i = ref 0 in
while%lwt not @@ Atomic.get stop do
let@ scope =
Atomic.incr num_tr;
T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
~attrs:[ "i", `Int !i ]
in
for%lwt j = 0 to 4 do
(* parent scope is found via thread local storage *)
let@ scope =
Atomic.incr num_tr;
T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal
~attrs:[ "j", `Int j ]
"loop.inner"
in
let* () = Lwt_unix.sleep !sleep_outer in
Atomic.incr num_sleep;
T.Logs.(
emit
[
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
~severity:Severity_number_info "inner at %d" j;
]);
incr i;
try%lwt
Atomic.incr num_tr;
let@ scope =
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc"
in
(* allocate some stuff *)
if !stress_alloc_ then (
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
ignore _arr
);
let* () = Lwt_unix.sleep !sleep_inner in
Atomic.incr num_sleep;
if j = 4 && !i mod 13 = 0 then failwith "oh no";
(* simulate a failure *)
T.Trace.add_event scope (fun () -> T.Event.make "done with alloc");
Lwt.return ()
with Failure _ -> Lwt.return ()
done
done
let run () : unit Lwt.t =
Printf.printf "collector is on %S\n%!"
(Opentelemetry_client_cohttp_lwt.get_url ());
T.GC_metrics.basic_setup ();
T.Metrics_callbacks.register (fun () ->
T.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int (Atomic.get num_sleep) ];
]);
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d jobs\n%!" n_jobs;
let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in
Lwt.join jobs
let () =
Sys.catch_break true;
T.Globals.service_name := "t1";
T.Globals.service_namespace := Some "ocaml-otel.test";
let ts_start = Unix.gettimeofday () in
let debug = ref false in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output";
( "--stress-alloc",
Arg.Bool (( := ) stress_alloc_),
" perform heavy allocs in inner loop" );
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
]
|> Arg.align
in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r =
if !r > 0 then
Some !r
else
None
in
let config =
Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config;
let@ () =
Fun.protect ~finally:(fun () ->
let elapsed = Unix.gettimeofday () -. ts_start in
let n_per_sec = float (Atomic.get num_tr) /. elapsed in
Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!"
(Atomic.get num_tr) elapsed n_per_sec)
in
Opentelemetry_client_cohttp_lwt.with_setup ~stop ~config () @@ fun () ->
Lwt_main.run @@ run ()