feat: add opentelemetry-client-cohttp-lwt library

This commit is contained in:
Simon Cruanes 2022-09-30 16:44:19 -04:00
parent 4ab35e4a54
commit c3f5b36e36
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 808 additions and 2 deletions

View file

@ -55,3 +55,15 @@
(odoc :with-doc)
(cohttp-lwt (>= "4.0.0")))
(synopsis "Opentelemetry tracing for Cohttp HTTP servers"))
(package
(name opentelemetry-client-cohttp-lwt)
(depends
(ocaml (>= "4.08"))
(mtime (>= "1.4")) ; for spans
(opentelemetry (= :version))
(pbrt (>= 2.2))
(odoc :with-doc)
cohttp-lwt
cohttp-lwt-unix)
(synopsis "Collector client for opentelemetry, using cohttp + lwt"))

View file

@ -0,0 +1,36 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.3"
synopsis: "Collector client for opentelemetry, using cohttp + lwt"
maintainer: ["the Imandra team and contributors"]
authors: ["the Imandra team and contributors"]
license: "MIT"
homepage: "https://github.com/aestheticintegration/ocaml-opentelemetry"
bug-reports:
"https://github.com/aestheticintegration/ocaml-opentelemetry/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "4.08"}
"mtime" {>= "1.4"}
"opentelemetry" {= version}
"pbrt" {>= "2.2"}
"odoc" {with-doc}
"cohttp-lwt"
"cohttp-lwt-unix"
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo:
"git+https://github.com/aestheticintegration/ocaml-opentelemetry.git"

View file

@ -0,0 +1,38 @@
open Lwt.Syntax
module Atomic = Opentelemetry_atomic.Atomic
let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf
let tid () = Thread.id @@ Thread.self ()
let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let default_url = "http://localhost:4318"
let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let get_url () = !url
let set_url s = url := s
let parse_headers s =
let parse_header s = Scanf.sscanf s "%s@=%s" (fun key value -> key, value) in
String.split_on_char ',' s |> List.map parse_header
let default_headers = []
let headers =
ref
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
with _ -> default_headers)
let get_headers () = !headers
let set_headers s = headers := s

View file

@ -0,0 +1,45 @@
open Common_
type t = {
debug: bool;
url: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
}
let pp out self : unit =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = Some 20)
?(batch_logs = Some 400) ?(batch_timeout_ms = 500) () : t =
{
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_timeout_ms;
batch_logs;
}

View file

@ -0,0 +1,54 @@
type t = private {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
}
(** Configuration.
To build one, use {!make} below. This might be extended with more
fields in the future. *)
val make :
?debug:bool ->
?url:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
unit ->
t
(** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit

View file

@ -0,0 +1,6 @@
(library
(name opentelemetry_client_cohttp_lwt)
(public_name opentelemetry-client-cohttp-lwt)
(synopsis "Opentelemetry collector using cohttp+lwt+unix")
(libraries opentelemetry lwt cohttp-lwt cohttp-lwt-unix pbrt mtime
mtime.clock.os))

View file

@ -0,0 +1,578 @@
(*
https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
*)
module OT = Opentelemetry
module Config = Config
open Opentelemetry
include Common_
let needs_gc_metrics = Atomic.make false
let gc_metrics = ref []
(* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics if {!needs_gc_metrics} is true,
and push them into {!gc_metrics} for later
collection *)
let sample_gc_metrics_if_needed () =
if Atomic.compare_and_set needs_gc_metrics true false then (
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
gc_metrics := l :: !gc_metrics
)
type error =
[ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string
| `Sysbreak
]
let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0
let report_err_ = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details })
->
let pp_details out l =
List.iter
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
l
in
Format.eprintf
"@[<2>opentelemetry: export failed with@ http code=%d@ status \
{@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
code scode
(Bytes.unsafe_to_string message)
pp_details details
module Httpc : sig
type t
val create : unit -> t
val send :
t ->
path:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result Lwt.t
val cleanup : t -> unit
end = struct
open Opentelemetry.Proto
open Lwt.Syntax
module Httpc = Cohttp_lwt_unix.Client
type t = unit
let create () : t = ()
let cleanup _self = ()
(* send the content to the remote endpoint/path *)
let send (_self : t) ~path ~decode (bod : string) : ('a, error) result Lwt.t =
let full_url = !url ^ path in
let uri = Uri.of_string full_url in
let open Cohttp in
let headers = Header.(add_list (init ()) !headers) in
let headers =
Header.(add headers "Content-Type" "application/x-protobuf")
in
let body = Cohttp_lwt.Body.of_string bod in
let* r =
Lwt.catch
(fun () ->
let+ r = Httpc.post ~headers ~body uri in
Ok r)
(fun e -> Lwt.return @@ Error e)
in
match r with
| Error e ->
let err =
`Failure
(spf "sending signals via http POST to %S\nfailed with:\n%s" full_url
(Printexc.to_string e))
in
Lwt.return @@ Error err
| Ok (resp, body) ->
let* body = Cohttp_lwt.Body.to_string body in
let code = Response.status resp |> Code.code_of_status in
if not (Code.is_error code) then (
match decode with
| `Ret x -> Lwt.return @@ Ok x
| `Dec f ->
let dec = Pbrt.Decoder.of_string body in
let r =
try Ok (f dec)
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))
in
Lwt.return r
) else (
let dec = Pbrt.Decoder.of_string body in
let r =
try
let status = Status.decode_status dec in
Error (`Status (code, status))
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf
"httpc: decoding of status (url=%S, code=%d) failed with:\n\
%s\n\
status: %S\n\
%s"
full_url code (Printexc.to_string e) body bt))
in
Lwt.return r
)
end
(** Batch of resources to be pushed later.
This type is thread-safe. *)
module Batch : sig
type 'a t
val push : 'a t -> 'a -> bool
(** [push batch x] pushes [x] into the batch, and heuristically
returns [true] if the batch is ready to be emitted (to know if we should
wake up the sending thread, if any) *)
val push' : 'a t -> 'a -> unit
val is_ready : now:Mtime.t -> _ t -> bool
(** is the batch ready to be sent? This is heuristic. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled,
this is true as soon as {!is_empty} is false. If a timeout is provided
for this batch, then it will be ready if an element has been in it
for at least the timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *)
end = struct
type 'a t = {
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
}
let make ?batch ?timeout () : _ t =
Option.iter (fun b -> assert (b > 0)) batch;
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
{
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let is_empty_ self = self.size = 0
let timeout_expired_ ~now self : bool =
match self.timeout with
| Some t ->
let elapsed = Mtime.span now self.start in
Mtime.Span.compare elapsed t >= 0
| None -> false
let is_full_ self : bool =
match self.batch with
| None -> self.size > 0
| Some b -> self.size >= b
let is_ready ~now self : bool = is_full_ self || timeout_expired_ ~now self
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
then (
let l = self.q in
self.q <- [];
self.size <- 0;
assert (l <> []);
Some l
) else
None
let push (self : _ t) x : bool =
if self.size >= self.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if self.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
self.start <- Mtime_clock.now ();
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
let ready = is_full_ self in
ready
)
let push' self x = ignore (push self x : bool)
end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *)
module type EMITTER = sig
open Opentelemetry.Proto
val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> unit
val push_logs : Logs.resource_logs list -> unit
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
val tick : unit -> unit
val cleanup : unit -> unit
end
(* make an emitter.
exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
(* local helpers *)
let open struct
let timeout =
if config.batch_timeout_ms > 0 then
Some Mtime.Span.(config.batch_timeout_ms * ms)
else
None
let batch_traces : Trace.resource_spans list Batch.t =
Batch.make ?batch:config.batch_traces ?timeout ()
let batch_metrics : Metrics.resource_metrics list Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = Atomic.make (ref [])
let set_on_tick_callbacks = Atomic.set on_tick_cbs_
let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit Lwt.t =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let* r = Httpc.send httpc ~path ~decode:(`Ret ()) data in
match r with
| Ok () -> Lwt.return ()
| Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set stop true;
Lwt.return ()
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err;
(* avoid crazy error loop *)
Lwt_unix.sleep 3.
let send_metrics_http curl encoder (l : Metrics.resource_metrics list list)
=
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l ()
in
send_http_ curl encoder ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x
let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ curl encoder ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x
let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ curl encoder ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x
(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false
| Some l ->
let batch = !gc_metrics :: l in
gc_metrics := [];
let+ () = send_metrics_http httpc encoder batch in
true
let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_traces with
| None -> Lwt.return false
| Some l ->
let+ () = send_traces_http httpc encoder l in
true
let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_logs with
| None -> Lwt.return false
| Some l ->
let+ () = send_logs_http httpc encoder l in
true
let[@inline] guard_exn_ where f =
try f ()
with e ->
let bt = Printexc.get_backtrace () in
Printf.eprintf
"opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where
(Printexc.to_string e) bt
let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t =
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in
()
let tick_common_ () =
if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed ();
List.iter
(fun f ->
try f ()
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(Atomic.get on_tick_cbs_);
()
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let setup_ticker_thread ~tick ~finally () =
let rec tick_thread () =
if Atomic.get stop then (
finally ();
Lwt.return ()
) else
let* () = Lwt_unix.sleep 0.5 in
let* () = tick () in
tick_thread ()
in
Lwt.async tick_thread
end in
let httpc = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
let module M = struct
(* we make sure that this is thread-safe, even though we don't have a
background thread. There can still be a ticker thread, and there
can also be several user threads that produce spans and call
the emit functions. *)
let push_trace e =
let@ () = guard_exn_ "push trace" in
Batch.push' batch_traces e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in
())
let push_metrics e =
let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed ();
Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
())
let push_logs e =
let@ () = guard_exn_ "push logs" in
Batch.push' batch_logs e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in
())
let set_on_tick_callbacks = set_on_tick_callbacks
let tick_ () =
tick_common_ ();
sample_gc_metrics_if_needed ();
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
()
let () = setup_ticker_thread ~tick:tick_ ~finally:ignore ()
(* if called in a blocking context: work in the background *)
let tick () = Lwt.async tick_
let cleanup () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc;
Lwt.return ())
end in
(module M)
module Backend (Arg : sig
val stop : bool Atomic.t
val config : Config.t
end)
() : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
open Opentelemetry.Proto
open Opentelemetry.Collector
let send_trace : Trace.resource_spans list sender =
{
send =
(fun l ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l);
push_trace l;
ret ());
}
let last_sent_metrics = Atomic.make (Mtime_clock.now ())
let timeout_sent_metrics = Mtime.Span.(5 * s)
(* send metrics from time to time *)
let signal_emit_gc_metrics () =
if !debug_ then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true
let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *)
let last_emit = Atomic.get last_sent_metrics in
let now = Mtime_clock.now () in
let add_own_metrics =
let elapsed = Mtime.span last_emit now in
Mtime.Span.compare elapsed timeout_sent_metrics > 0
in
(* there is a possible race condition here, as several threads might update
metrics at the same time. But that's harmless. *)
if add_own_metrics then (
Atomic.set last_sent_metrics now;
let open OT.Metrics in
[
make_resource_metrics
[
sum ~name:"otel.export.dropped" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
];
sum ~name:"otel.export.errors" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
];
];
]
) else
[]
let send_metrics : Metrics.resource_metrics list sender =
{
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m);
let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
ret ());
}
let send_logs : Logs.resource_logs list sender =
{
send =
(fun m ~ret ->
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m);
push_logs m;
ret ());
}
end
let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
debug_ := config.debug;
let module B =
Backend
(struct
let stop = stop
let config = config
end)
()
in
Opentelemetry.Collector.set_backend (module B);
B.cleanup
let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ~config () in
at_exit cleanup
)
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
if enable then (
let cleanup = setup_ ?stop ~config () in
Fun.protect ~finally:cleanup f
) else
f ()

View file

@ -0,0 +1,39 @@
(*
TODO: more options from
https://opentelemetry.io/docs/reference/specification/protocol/exporter/
*)
val get_url : unit -> string
val set_url : string -> unit
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
(** Set http headers that are sent on every http query to the collector. *)
module Config = Config
val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments
or environment.
@param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
val with_setup :
?stop:bool Atomic.t ->
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns
See {!setup} for more details. *)

View file

@ -981,8 +981,6 @@ end
(** A set of callbacks that produce metrics when called.
The metrics are automatically called regularly.
This allows applications to register metrics callbacks from various points
in the program (or even in libraries), and not worry about setting
alarms/intervals to emit them. *)