This commit is contained in:
Simon Cruanes 2022-05-12 11:54:06 -04:00
parent 8c363341e5
commit 3f9bd94837
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
25 changed files with 956 additions and 787 deletions

4
dune
View file

@ -1,3 +1,3 @@
(env (env
(_ (flags :standard -warn-error -a+8))) (_
(flags :standard -warn-error -a+8)))

View file

@ -1,4 +1,3 @@
(**************************************************************************) (**************************************************************************)
(* *) (* *)
(* OCaml *) (* OCaml *)
@ -19,34 +18,34 @@
(** Atomic references. (** Atomic references.
*) *)
(** An atomic (mutable) reference to a value of type ['a]. *)
type 'a t = 'a Stdlib.Atomic.t type 'a t = 'a Stdlib.Atomic.t
(** An atomic (mutable) reference to a value of type ['a]. *)
(** Create an atomic reference. *)
val make : 'a -> 'a t val make : 'a -> 'a t
(** Create an atomic reference. *)
(** Get the current value of the atomic reference. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Get the current value of the atomic reference. *)
(** Set a new value for the atomic reference. *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Set a new value for the atomic reference. *)
(** Set a new value for the atomic reference, and return the current value. *)
val exchange : 'a t -> 'a -> 'a val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only (** [compare_and_set r seen v] sets the new value of [r] to [v] only
if its current value is physically equal to [seen] -- the if its current value is physically equal to [seen] -- the
comparison and the set occur atomically. Returns [true] if the comparison and the set occur atomically. Returns [true] if the
comparison succeeded (so the set happened) and [false] comparison succeeded (so the set happened) and [false]
otherwise. *) otherwise. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n], (** [fetch_and_add r n] atomically increments the value of [r] by [n],
and returns the current value (before the increment). *) and returns the current value (before the increment). *)
val fetch_and_add : int t -> int -> int
(** [incr r] atomically increments the value of [r] by [1]. *)
val incr : int t -> unit val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *)
(** [decr r] atomically decrements the value of [r] by [1]. *)
val decr : int t -> unit val decr : int t -> unit
(** [decr r] atomically decrements the value of [r] by [1]. *)

View file

@ -1,4 +1,3 @@
(**************************************************************************) (**************************************************************************)
(* *) (* *)
(* OCaml *) (* OCaml *)
@ -19,34 +18,34 @@
(** Atomic references. (** Atomic references.
*) *)
(** An atomic (mutable) reference to a value of type ['a]. *)
type 'a t type 'a t
(** An atomic (mutable) reference to a value of type ['a]. *)
(** Create an atomic reference. *)
val make : 'a -> 'a t val make : 'a -> 'a t
(** Create an atomic reference. *)
(** Get the current value of the atomic reference. *)
val get : 'a t -> 'a val get : 'a t -> 'a
(** Get the current value of the atomic reference. *)
(** Set a new value for the atomic reference. *)
val set : 'a t -> 'a -> unit val set : 'a t -> 'a -> unit
(** Set a new value for the atomic reference. *)
(** Set a new value for the atomic reference, and return the current value. *)
val exchange : 'a t -> 'a -> 'a val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only (** [compare_and_set r seen v] sets the new value of [r] to [v] only
if its current value is physically equal to [seen] -- the if its current value is physically equal to [seen] -- the
comparison and the set occur atomically. Returns [true] if the comparison and the set occur atomically. Returns [true] if the
comparison succeeded (so the set happened) and [false] comparison succeeded (so the set happened) and [false]
otherwise. *) otherwise. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n], (** [fetch_and_add r n] atomically increments the value of [r] by [n],
and returns the current value (before the increment). *) and returns the current value (before the increment). *)
val fetch_and_add : int t -> int -> int
(** [incr r] atomically increments the value of [r] by [1]. *)
val incr : int t -> unit val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *)
(** [decr r] atomically decrements the value of [r] by [1]. *)
val decr : int t -> unit val decr : int t -> unit
(** [decr r] atomically decrements the value of [r] by [1]. *)

View file

@ -1,4 +1,3 @@
(library (library
(name opentelemetry_atomic) (name opentelemetry_atomic)
(synopsis "Compatibility package for the Atomic module for opentelemetry") (synopsis "Compatibility package for the Atomic module for opentelemetry")
@ -12,4 +11,5 @@
(rule (rule
(targets atomic.ml atomic.mli atomic.ml) (targets atomic.ml atomic.mli atomic.ml)
(deps atomic.pre412.mli atomic.post412.mli) (deps atomic.pre412.mli atomic.post412.mli)
(action (run ./gen.exe))) (action
(run ./gen.exe)))

View file

@ -1,6 +1,5 @@
let atomic_before_412 =
{|
let atomic_before_412 = {|
type 'a t = {mutable x: 'a} type 'a t = {mutable x: 'a}
let[@inline] make x = {x} let[@inline] make x = {x}
let[@inline] get {x} = x let[@inline] get {x} = x
@ -32,7 +31,9 @@ let atomic_before_412 = {|
let atomic_after_412 = {|include Stdlib.Atomic|} let atomic_after_412 = {|include Stdlib.Atomic|}
let write_file file s = let write_file file s =
let oc = open_out file in output_string oc s; close_out oc let oc = open_out file in
output_string oc s;
close_out oc
let copy_file file1 file2 = let copy_file file1 file2 =
let oc = open_out file2 in let oc = open_out file2 in
@ -48,7 +49,15 @@ let copy_file file1 file2 =
let () = let () =
let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in
write_file "atomic.ml" (if version >= (4,12) then atomic_after_412 else atomic_before_412); write_file "atomic.ml"
copy_file (if version >= (4,12) then "atomic.post412.mli" else "atomic.pre412.mli") "atomic.mli" ; (if version >= (4, 12) then
atomic_after_412
else
atomic_before_412);
copy_file
(if version >= (4, 12) then
"atomic.post412.mli"
else
"atomic.pre412.mli")
"atomic.mli";
() ()

View file

@ -1,13 +1,21 @@
module Atomic = Opentelemetry_atomic.Atomic module Atomic = Opentelemetry_atomic.Atomic
type 'a t = 'a list Atomic.t type 'a t = 'a list Atomic.t
let make () = Atomic.make [] let make () = Atomic.make []
let add self x = let add self x =
while while
let old = Atomic.get self in let old = Atomic.get self in
let l' = x :: old in let l' = x :: old in
not (Atomic.compare_and_set self old l') not (Atomic.compare_and_set self old l')
do () done do
()
done
let rec pop_all self = let rec pop_all self =
let l = Atomic.get self in let l = Atomic.get self in
if Atomic.compare_and_set self l [] then l else pop_all self if Atomic.compare_and_set self l [] then
l
else
pop_all self

View file

@ -1,6 +1,9 @@
(** Atomic list *) (** Atomic list *)
type 'a t type 'a t
val make : unit -> 'a t val make : unit -> 'a t
val add : 'a t -> 'a -> unit val add : 'a t -> 'a -> unit
val pop_all : 'a t -> 'a list val pop_all : 'a t -> 'a list

View file

@ -1,5 +1,3 @@
type 'a t = { type 'a t = {
arr: 'a array; arr: 'a array;
mutable i: int; mutable i: int;
@ -7,15 +5,15 @@ type 'a t = {
let create ~dummy n : _ t = let create ~dummy n : _ t =
assert (n >= 1); assert (n >= 1);
{ arr=Array.make n dummy; { arr = Array.make n dummy; i = 0 }
i=0;
}
let[@inline] size self = self.i let[@inline] size self = self.i
let[@inline] is_full self = self.i = Array.length self.arr let[@inline] is_full self = self.i = Array.length self.arr
let push (self : _ t) x : bool = let push (self : _ t) x : bool =
if is_full self then false if is_full self then
false
else ( else (
self.arr.(self.i) <- x; self.arr.(self.i) <- x;
self.i <- 1 + self.i; self.i <- 1 + self.i;

View file

@ -1,9 +1,11 @@
(** queue of fixed size *) (** queue of fixed size *)
type 'a t type 'a t
val create : dummy:'a -> int -> 'a t
val size : _ t -> int
val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit
val create : dummy:'a -> int -> 'a t
val size : _ t -> int
val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit

View file

@ -2,9 +2,14 @@ module Atomic = Opentelemetry_atomic.Atomic
let[@inline] ( let@ ) f x = f x let[@inline] ( let@ ) f x = f x
let debug_ = ref (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with Some ("1"|"true") -> true | _ -> false) let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let lock_ : (unit -> unit) ref = ref ignore let lock_ : (unit -> unit) ref = ref ignore
let unlock_ : (unit -> unit) ref = ref ignore let unlock_ : (unit -> unit) ref = ref ignore
let set_mutex ~lock ~unlock : unit = let set_mutex ~lock ~unlock : unit =
@ -21,6 +26,10 @@ let[@inline] with_mutex_ m f =
Fun.protect ~finally:(fun () -> Mutex.unlock m) f Fun.protect ~finally:(fun () -> Mutex.unlock m) f
let default_url = "http://localhost:4318" let default_url = "http://localhost:4318"
let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let url =
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let get_url () = !url let get_url () = !url
let set_url s = url := s let set_url s = url := s

View file

@ -1,4 +1,3 @@
open Common_ open Common_
type t = { type t = {
@ -14,25 +13,34 @@ type t = {
let pp out self = let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics; batch_logs; let {
batch_timeout_ms; thread; ticker_thread} = self in debug;
url;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
thread;
ticker_thread;
} =
self
in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ url=%S;@ \ "{@[ debug=%B;@ url=%S;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \ batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs
batch_timeout_ms thread ticker_thread batch_timeout_ms thread ticker_thread
let make let make ?(debug = !debug_) ?(url = get_url ()) ?(batch_traces = Some 400)
?(debug= !debug_) ?(batch_metrics = None) ?(batch_logs = Some 400) ?(batch_timeout_ms = 500)
?(url= get_url()) ?(thread = true) ?(ticker_thread = true) () : t =
?(batch_traces=Some 400) {
?(batch_metrics=None) debug;
?(batch_logs=Some 400) url;
?(batch_timeout_ms=500) batch_traces;
?(thread=true) batch_metrics;
?(ticker_thread=true) batch_logs;
() : t =
{ debug; url; batch_traces; batch_metrics; batch_logs;
batch_timeout_ms; batch_timeout_ms;
thread; ticker_thread; } thread;
ticker_thread;
}

View file

@ -1,11 +1,8 @@
type t = { type t = {
debug: bool; debug: bool;
url: string; url: string;
(** Url of the endpoint. Default is "http://localhost:4318", (** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
batch_traces: int option; batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most) (** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching. [i] items. If [None], there is no batching.
@ -13,7 +10,6 @@ type t = {
Note that traces and metrics are batched separately. Note that traces and metrics are batched separately.
Default [Some 400]. Default [Some 400].
*) *)
batch_metrics: int option; batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most) (** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching. [i] items. If [None], there is no batching.
@ -21,20 +17,15 @@ type t = {
Note that traces and metrics are batched separately. Note that traces and metrics are batched separately.
Default [None]. Default [None].
*) *)
batch_logs: int option; batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details. (** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *) Default [Some 400] *)
batch_timeout_ms: int; batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even (** Number of milliseconds after which we will emit a batch, even
incomplete. incomplete.
Note that the batch might take longer than that, because this is Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *) only checked when a new event occurs. Default 500. *)
thread: bool; (** Is there a background thread? Default [true] *)
thread: bool;
(** Is there a background thread? Default [true] *)
ticker_thread: bool; ticker_thread: bool;
(** Is there a ticker thread? Default [true]. (** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make This thread will regularly call [tick()] on the backend, to make
@ -43,14 +34,16 @@ type t = {
} }
val make : val make :
?debug:bool -> ?url:string -> ?debug:bool ->
?url:string ->
?batch_traces:int option -> ?batch_traces:int option ->
?batch_metrics:int option -> ?batch_metrics:int option ->
?batch_logs:int option -> ?batch_logs:int option ->
?batch_timeout_ms:int -> ?batch_timeout_ms:int ->
?thread:bool -> ?thread:bool ->
?ticker_thread:bool -> ?ticker_thread:bool ->
unit -> t unit ->
t
(** Make a configuration *) (** Make a configuration *)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit

View file

@ -1,8 +1,5 @@
(library (library
(name opentelemetry_client_ocurl) (name opentelemetry_client_ocurl)
(public_name opentelemetry-client-ocurl) (public_name opentelemetry-client-ocurl)
(libraries opentelemetry opentelemetry.atomic (libraries opentelemetry opentelemetry.atomic curl ocaml-protoc threads
curl ocaml-protoc threads
mtime mtime.clock.os)) mtime mtime.clock.os))

View file

@ -1,4 +1,3 @@
open Common_ open Common_
(* generate random IDs *) (* generate random IDs *)
@ -9,26 +8,29 @@ module Make() = struct
let@ () = with_lock_ in let@ () = with_lock_ in
let b = Bytes.create 8 in let b = Bytes.create 8 in
for i = 0 to 1 do for i = 0 to 1 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) let r = Random.State.bits rand_ in
(* 30 bits, of which we use 24 *)
Bytes.set b (i * 3) (Char.chr (r land 0xff)); Bytes.set b (i * 3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff))
done; done;
let r = Random.State.bits rand_ in let r = Random.State.bits rand_ in
Bytes.set b 6 (Char.chr (r land 0xff)); Bytes.set b 6 (Char.chr (r land 0xff));
Bytes.set b 7 (Char.chr (r lsr 8 land 0xff)); Bytes.set b 7 (Char.chr ((r lsr 8) land 0xff));
b b
let rand_bytes_16 () : bytes = let rand_bytes_16 () : bytes =
let@ () = with_lock_ in let@ () = with_lock_ in
let b = Bytes.create 16 in let b = Bytes.create 16 in
for i = 0 to 4 do for i = 0 to 4 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) let r = Random.State.bits rand_ in
(* 30 bits, of which we use 24 *)
Bytes.set b (i * 3) (Char.chr (r land 0xff)); Bytes.set b (i * 3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff))
done; done;
let r = Random.State.bits rand_ in let r = Random.State.bits rand_ in
Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) Bytes.set b 15 (Char.chr (r land 0xff));
(* last byte *)
b b
end end

View file

@ -1,4 +1,3 @@
(* (*
https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
@ -10,47 +9,55 @@ include Common_
let needs_gc_metrics = Atomic.make false let needs_gc_metrics = Atomic.make false
let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) let gc_metrics = AList.make ()
(* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics and push them into {!gc_metrics} for later (* capture current GC metrics and push them into {!gc_metrics} for later
collection *) collection *)
let sample_gc_metrics () = let sample_gc_metrics () =
Atomic.set needs_gc_metrics false; Atomic.set needs_gc_metrics false;
let l = OT.Metrics.make_resource_metrics let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics() in @@ Opentelemetry.GC_metrics.get_metrics ()
in
AList.add gc_metrics l AList.add gc_metrics l
module Config = Config module Config = Config
let _init_curl = lazy ( let _init_curl =
Curl.global_init Curl.CURLINIT_GLOBALALL; lazy
at_exit Curl.global_cleanup; (Curl.global_init Curl.CURLINIT_GLOBALALL;
) at_exit Curl.global_cleanup)
type error = [ type error =
| `Status of int * Opentelemetry.Proto.Status.status [ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string | `Failure of string
] ]
let n_errors = Atomic.make 0 let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0 let n_dropped = Atomic.make 0
let report_err_ = function let report_err_ = function
| `Failure msg -> | `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, status) -> | `Status (code, status) ->
Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." Format.eprintf
code Proto.Status.pp_status status "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code
Proto.Status.pp_status status
module type CURL = sig module type CURL = sig
val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result val send :
path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result
val cleanup : unit -> unit val cleanup : unit -> unit
end end
(* create a curl client *) (* create a curl client *)
module Curl () : CURL = struct module Curl () : CURL = struct
open Opentelemetry.Proto open Opentelemetry.Proto
let () = Lazy.force _init_curl let () = Lazy.force _init_curl
let buf_res = Buffer.create 256 let buf_res = Buffer.create 256
@ -75,25 +82,25 @@ module Curl() : CURL = struct
Curl.set_post curl true; Curl.set_post curl true;
Curl.set_postfieldsize curl (String.length bod); Curl.set_postfieldsize curl (String.length bod);
Curl.set_readfunction curl Curl.set_readfunction curl
begin (let i = ref 0 in
let i = ref 0 in fun n ->
(fun n ->
if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n;
let len = min n (String.length bod - !i) in let len = min n (String.length bod - !i) in
let s = String.sub bod !i len in let s = String.sub bod !i len in
if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len;
i := !i + len; i := !i + len;
s) s);
end;
(* read result's body *) (* read result's body *)
Buffer.clear buf_res; Buffer.clear buf_res;
Curl.set_writefunction curl Curl.set_writefunction curl (fun s ->
(fun s -> Buffer.add_string buf_res s; String.length s); Buffer.add_string buf_res s;
String.length s);
try try
match Curl.perform curl with match Curl.perform curl with
| () -> | () ->
let code = Curl.get_responsecode curl in let code = Curl.get_responsecode curl in
if !debug_ then Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); if !debug_ then
Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res);
let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in
if code >= 200 && code < 300 then ( if code >= 200 && code < 300 then (
let res = decode dec in let res = decode dec in
@ -103,17 +110,24 @@ module Curl() : CURL = struct
Error (`Status (code, status)) Error (`Status (code, status))
) )
| exception Curl.CurlException (_, code, msg) -> | exception Curl.CurlException (_, code, msg) ->
let status = Status.default_status let status =
~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in Status.default_status ~code:(Int32.of_int code)
~message:(Bytes.unsafe_of_string msg)
()
in
Error (`Status (code, status)) Error (`Status (code, status))
with e -> Error (`Failure (Printexc.to_string e)) with e -> Error (`Failure (Printexc.to_string e))
end end
module type PUSH = sig module type PUSH = sig
type elt type elt
val push : elt -> unit val push : elt -> unit
val is_empty : unit -> bool val is_empty : unit -> bool
val is_big_enough : unit -> bool val is_big_enough : unit -> bool
val pop_iter_all : (elt -> unit) -> unit val pop_iter_all : (elt -> unit) -> unit
end end
@ -123,19 +137,24 @@ module type EMITTER = sig
open Opentelemetry.Proto open Opentelemetry.Proto
val push_trace : Trace.resource_spans list -> unit val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> unit val push_metrics : Metrics.resource_metrics list -> unit
val push_logs : Logs.resource_logs list -> unit val push_logs : Logs.resource_logs list -> unit
val tick : unit -> unit val tick : unit -> unit
val cleanup : unit -> unit val cleanup : unit -> unit
end end
type 'a push = (module PUSH with type elt = 'a) type 'a push = (module PUSH with type elt = 'a)
type on_full_cb = (unit -> unit)
type on_full_cb = unit -> unit
(* make a "push" object, along with a setter for a callback to call when (* make a "push" object, along with a setter for a callback to call when
it's ready to emit a batch *) it's ready to emit a batch *)
let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) = let mk_push (type a) ?batch () :
(module PUSH with type elt = a) * (on_full_cb -> unit) =
let on_full : on_full_cb ref = ref ignore in let on_full : on_full_cb ref = ref ignore in
let push = let push =
match batch with match batch with
@ -143,33 +162,41 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -
let r = ref None in let r = ref None in
let module M = struct let module M = struct
type elt = a type elt = a
let is_empty () = !r == None let is_empty () = !r == None
let is_big_enough () = !r != None let is_big_enough () = !r != None
let push x = let push x =
r := Some x; !on_full() r := Some x;
let pop_iter_all f = Option.iter f !r; r := None !on_full ()
let pop_iter_all f =
Option.iter f !r;
r := None
end in end in
(module M : PUSH with type elt = a) (module M : PUSH with type elt = a)
| Some n -> | Some n ->
let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in
let module M = struct let module M = struct
type elt = a type elt = a
let is_empty () = FQueue.size q = 0 let is_empty () = FQueue.size q = 0
let is_big_enough () = FQueue.size q >= n let is_big_enough () = FQueue.size q >= n
let push x = let push x =
if not (FQueue.push q x) || FQueue.size q > n then ( if (not (FQueue.push q x)) || FQueue.size q > n then (
!on_full (); !on_full ();
if not (FQueue.push q x) then ( if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *)
Atomic.incr n_dropped; (* drop item *)
)
) )
let pop_iter_all f = FQueue.pop_iter_all q f let pop_iter_all f = FQueue.pop_iter_all q f
end in end in
(module M : PUSH with type elt = a) (module M : PUSH with type elt = a)
in in
push, ((:=) on_full)
push, ( := ) on_full
(* start a thread in the background, running [f()] *) (* start a thread in the background, running [f()] *)
let start_bg_thread (f : unit -> unit) : unit = let start_bg_thread (f : unit -> unit) : unit =
@ -180,7 +207,10 @@ let start_bg_thread (f: unit -> unit) : unit =
in in
ignore (Thread.create run () : Thread.t) ignore (Thread.create run () : Thread.t)
let l_is_empty = function [] -> true | _::_ -> false let l_is_empty = function
| [] -> true
| _ :: _ -> false
let batch_is_empty = List.for_all l_is_empty let batch_is_empty = List.for_all l_is_empty
(* make an emitter. (* make an emitter.
@ -189,15 +219,18 @@ let batch_is_empty = List.for_all l_is_empty
https://opentelemetry.io/docs/reference/specification/error-handling/ *) https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(config : Config.t) () : (module EMITTER) = let mk_emitter ~(config : Config.t) () : (module EMITTER) =
let open Proto in let open Proto in
let continue = ref true in let continue = ref true in
let ((module E_trace) : Trace.resource_spans list push), on_trace_full = let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
mk_push ?batch:config.batch_traces () in mk_push ?batch:config.batch_traces ()
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = in
mk_push ?batch:config.batch_metrics () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full
=
mk_push ?batch:config.batch_metrics ()
in
let ((module E_logs) : Logs.resource_logs list push), on_logs_full = let ((module E_logs) : Logs.resource_logs list push), on_logs_full =
mk_push ?batch:config.batch_logs () in mk_push ?batch:config.batch_logs ()
in
let encoder = Pbrt.Encoder.create () in let encoder = Pbrt.Encoder.create () in
@ -207,22 +240,20 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
Pbrt.Encoder.reset encoder; Pbrt.Encoder.reset encoder;
encode x encoder; encode x encoder;
let data = Pbrt.Encoder.to_string encoder in let data = Pbrt.Encoder.to_string encoder in
begin match match C.send ~path ~decode:(fun _ -> ()) data with
C.send ~path ~decode:(fun _ -> ()) data
with
| Ok () -> () | Ok () -> ()
| Error err -> | Error err ->
(* TODO: log error _via_ otel? *) (* TODO: log error _via_ otel? *)
Atomic.incr n_errors; Atomic.incr n_errors;
report_err_ err report_err_ err
end;
in in
let send_metrics_http (l : Metrics.resource_metrics list list) = let send_metrics_http (l : Metrics.resource_metrics list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Metrics_service.default_export_metrics_service_request Metrics_service.default_export_metrics_service_request ~resource_metrics:l
~resource_metrics:l () in ()
in
send_http_ ~path:"/v1/metrics" send_http_ ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x ~encode:Metrics_service.encode_export_metrics_service_request x
in in
@ -230,8 +261,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let send_traces_http (l : Trace.resource_spans list list) = let send_traces_http (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Trace_service.default_export_trace_service_request Trace_service.default_export_trace_service_request ~resource_spans:l ()
~resource_spans:l () in in
send_http_ ~path:"/v1/traces" send_http_ ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x ~encode:Trace_service.encode_export_trace_service_request x
in in
@ -239,8 +270,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let send_logs_http (l : Logs.resource_logs list list) = let send_logs_http (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Logs_service.default_export_logs_service_request Logs_service.default_export_logs_service_request ~resource_logs:l ()
~resource_logs:l () in in
send_http_ ~path:"/v1/logs" send_http_ ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x ~encode:Logs_service.encode_export_logs_service_request x
in in
@ -254,20 +285,22 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let emit_ (type a) (module P : PUSH with type elt = a list) let emit_ (type a) (module P : PUSH with type elt = a list)
?(init = fun () -> []) ?(force = false) ~send_http () : bool = ?(init = fun () -> []) ?(force = false) ~send_http () : bool =
if force || (not force && P.is_big_enough ()) then ( if force || ((not force) && P.is_big_enough ()) then (
let batch = ref [ init () ] in let batch = ref [ init () ] in
P.pop_iter_all (fun l -> batch := l :: !batch); P.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in let do_something = not (l_is_empty !batch) in
if do_something then ( if do_something then (
send_http !batch; send_http !batch;
Atomic.set last_wakeup (Mtime_clock.now()); Atomic.set last_wakeup (Mtime_clock.now ())
); );
do_something do_something
) else false ) else
false
in in
let emit_metrics ?force () : bool = let emit_metrics ?force () : bool =
emit_ (module E_metrics) emit_
(module E_metrics)
~init:(fun () -> AList.pop_all gc_metrics) ~init:(fun () -> AList.pop_all gc_metrics)
~send_http:send_metrics_http () ~send_http:send_metrics_http ()
and emit_traces ?force () : bool = and emit_traces ?force () : bool =
@ -286,15 +319,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let emit_all_force () = let emit_all_force () =
ignore (emit_traces ~force:true () : bool); ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool); ignore (emit_metrics ~force:true () : bool);
ignore (emit_logs ~force:true () : bool); ignore (emit_logs ~force:true () : bool)
in in
if config.thread then ( if config.thread then (
begin (let m = Mutex.create () in
let m = Mutex.create() in set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m));
set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m);
end;
let ((module C) as curl) = (module Curl () : CURL) in let ((module C) as curl) = (module Curl () : CURL) in
@ -310,18 +340,15 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let do_metrics = emit_metrics ~force:timeout () in let do_metrics = emit_metrics ~force:timeout () in
let do_traces = emit_traces ~force:timeout () in let do_traces = emit_traces ~force:timeout () in
let do_logs = emit_logs ~force:timeout () in let do_logs = emit_logs ~force:timeout () in
if not do_metrics && not do_traces && not do_logs then ( if (not do_metrics) && (not do_traces) && not do_logs then
(* wait *) (* wait *)
let@ () = with_mutex_ m in let@ () = with_mutex_ m in
Condition.wait cond m; Condition.wait cond m
)
done; done;
(* flush remaining events *) (* flush remaining events *)
begin
let@ () = guard in let@ () = guard in
emit_all_force (); emit_all_force ();
C.cleanup(); C.cleanup ()
end
in in
start_bg_thread bg_thread; start_bg_thread bg_thread;
@ -344,42 +371,43 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let tick_thread () = let tick_thread () =
while true do while true do
Thread.delay 0.5; Thread.delay 0.5;
tick(); tick ()
done done
in in
start_bg_thread tick_thread; start_bg_thread tick_thread
); );
let module M = struct let module M = struct
let push_trace e = let push_trace e =
E_trace.push e; E_trace.push e;
if batch_timeout () then wakeup () if batch_timeout () then wakeup ()
let push_metrics e = let push_metrics e =
E_metrics.push e; E_metrics.push e;
if batch_timeout () then wakeup () if batch_timeout () then wakeup ()
let push_logs e = let push_logs e =
E_logs.push e; E_logs.push e;
if batch_timeout () then wakeup () if batch_timeout () then wakeup ()
let tick = tick let tick = tick
let cleanup () = let cleanup () =
continue := false; continue := false;
with_mutex_ m (fun () -> Condition.broadcast cond) with_mutex_ m (fun () -> Condition.broadcast cond)
end in end in
(module M) (module M)
) else ( ) else (
on_metrics_full (fun () -> on_metrics_full (fun () ->
if Atomic.get needs_gc_metrics then sample_gc_metrics (); if Atomic.get needs_gc_metrics then sample_gc_metrics ();
ignore (emit_metrics () : bool)); ignore (emit_metrics () : bool));
on_trace_full (fun () -> on_trace_full (fun () -> ignore (emit_traces () : bool));
ignore (emit_traces () : bool)); on_logs_full (fun () -> ignore (emit_logs () : bool));
on_logs_full (fun () ->
ignore (emit_logs () : bool));
let cleanup () = let cleanup () =
emit_all_force (); emit_all_force ();
C.cleanup(); C.cleanup ()
in in
let module M = struct let module M = struct
@ -407,9 +435,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
(module M) (module M)
) )
module Backend(Arg : sig val config : Config.t end)() module Backend (Arg : sig
: Opentelemetry.Collector.BACKEND val config : Config.t
= struct end)
() : Opentelemetry.Collector.BACKEND = struct
include Gen_ids.Make () include Gen_ids.Make ()
include (val mk_emitter ~config:Arg.config ()) include (val mk_emitter ~config:Arg.config ())
@ -417,16 +446,23 @@ module Backend(Arg : sig val config : Config.t end)()
open Opentelemetry.Proto open Opentelemetry.Proto
open Opentelemetry.Collector open Opentelemetry.Collector
let send_trace : Trace.resource_spans list sender = { let send_trace : Trace.resource_spans list sender =
send=fun l ~ret -> {
send =
(fun l ~ret ->
let@ () = with_lock_ in let@ () = with_lock_ in
if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; if !debug_ then
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l;
push_trace l; push_trace l;
ret() ret ());
} }
let last_sent_metrics = Atomic.make (Mtime_clock.now ()) let last_sent_metrics = Atomic.make (Mtime_clock.now ())
let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *)
let timeout_sent_metrics = Mtime.Span.(5 * s)
(* send metrics from time to time *)
let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true
@ -442,40 +478,64 @@ module Backend(Arg : sig val config : Config.t end)()
if add_own_metrics then ( if add_own_metrics then (
let open OT.Metrics in let open OT.Metrics in
Atomic.set last_sent_metrics now; Atomic.set last_sent_metrics now;
[make_resource_metrics [ [
sum ~name:"otel-export.dropped" ~is_monotonic:true [ make_resource_metrics
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) [
sum ~name:"otel-export.dropped" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
]; ];
sum ~name:"otel-export.errors" ~is_monotonic:true [ sum ~name:"otel-export.errors" ~is_monotonic:true
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) [
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
]; ];
]] ];
) else [] ]
) else
[]
let send_metrics : Metrics.resource_metrics list sender = { let send_metrics : Metrics.resource_metrics list sender =
send=fun m ~ret -> {
send =
(fun m ~ret ->
let@ () = with_lock_ in let@ () = with_lock_ in
if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; if !debug_ then
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m;
let m = List.rev_append (additional_metrics ()) m in let m = List.rev_append (additional_metrics ()) m in
push_metrics m; push_metrics m;
ret() ret ());
} }
let send_logs : Logs.resource_logs list sender = { let send_logs : Logs.resource_logs list sender =
send=fun m ~ret -> {
send =
(fun m ~ret ->
let@ () = with_lock_ in let@ () = with_lock_ in
if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m; if !debug_ then
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m;
push_logs m; push_logs m;
ret() ret ());
} }
end end
let setup_ ~(config : Config.t) () = let setup_ ~(config : Config.t) () =
debug_ := config.debug; debug_ := config.debug;
let module B = Backend(struct let config=config end)() in let module B =
Backend
(struct
let config = config
end)
()
in
Opentelemetry.Collector.backend := Some (module B); Opentelemetry.Collector.backend := Some (module B);
B.cleanup B.cleanup
@ -489,4 +549,5 @@ let with_setup ?(config=Config.make()) ?(enable=true) () f =
if enable then ( if enable then (
let cleanup = setup_ ~config () in let cleanup = setup_ ~config () in
Fun.protect ~finally:cleanup f Fun.protect ~finally:cleanup f
) else f() ) else
f ()

View file

@ -1,4 +1,3 @@
(* (*
TODO: more options from TODO: more options from
https://opentelemetry.io/docs/reference/specification/protocol/exporter/ https://opentelemetry.io/docs/reference/specification/protocol/exporter/

100
src/dune
View file

@ -8,97 +8,99 @@
; ### protobuf rules ### ; ### protobuf rules ###
(rule (rule
(targets status_types.ml status_types.mli (targets status_types.ml status_types.mli status_pb.ml status_pb.mli
status_pb.ml status_pb.mli
status_pp.ml status_pp.mli) status_pp.ml status_pp.mli)
(deps (:file status.proto) (deps
(:file status.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} -ml_out . -pp -binary))) (action
(run ocaml-protoc %{file} -ml_out . -pp -binary)))
(rule (rule
(targets common_types.ml common_types.mli (targets common_types.ml common_types.mli common_pb.ml common_pb.mli
common_pb.ml common_pb.mli
common_pp.ml common_pp.mli) common_pp.ml common_pp.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets resource_types.ml resource_types.mli (targets resource_types.ml resource_types.mli resource_pb.ml resource_pb.mli
resource_pb.ml resource_pb.mli
resource_pp.ml resource_pp.mli) resource_pp.ml resource_pp.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets trace_types.ml trace_types.mli (targets trace_types.ml trace_types.mli trace_pb.ml trace_pb.mli trace_pp.ml
trace_pb.ml trace_pb.mli trace_pp.mli)
trace_pp.ml trace_pp.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets metrics_types.ml metrics_types.mli (targets metrics_types.ml metrics_types.mli metrics_pb.ml metrics_pb.mli
metrics_pb.ml metrics_pb.mli
metrics_pp.ml metrics_pp.mli) metrics_pp.ml metrics_pp.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets logs_types.ml logs_types.mli (targets logs_types.ml logs_types.mli logs_pb.ml logs_pb.mli logs_pp.ml
logs_pb.ml logs_pb.mli logs_pp.mli)
logs_pp.ml logs_pp.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets metrics_service_types.ml metrics_service_types.mli (targets metrics_service_types.ml metrics_service_types.mli
metrics_service_pp.ml metrics_service_pp.mli metrics_service_pp.ml metrics_service_pp.mli metrics_service_pb.ml
metrics_service_pb.ml metrics_service_pb.mli) metrics_service_pb.mli)
(deps (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto) (deps
(:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets trace_service_types.ml trace_service_types.mli (targets trace_service_types.ml trace_service_types.mli trace_service_pp.ml
trace_service_pp.ml trace_service_pp.mli trace_service_pp.mli trace_service_pb.ml trace_service_pb.mli)
trace_service_pb.ml trace_service_pb.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))
(rule (rule
(targets logs_service_types.ml logs_service_types.mli (targets logs_service_types.ml logs_service_types.mli logs_service_pp.ml
logs_service_pp.ml logs_service_pp.mli logs_service_pp.mli logs_service_pb.ml logs_service_pb.mli)
logs_service_pb.ml logs_service_pb.mli)
(deps (deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto) (:file
%{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/)) (source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file} (action
-I %{project_root}/vendor/opentelemetry-proto/ (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary))) -ml_out . -pp -binary)))

View file

@ -4,6 +4,14 @@ open Cohttp
open Cohttp_lwt open Cohttp_lwt
module Server : sig module Server : sig
val trace :
?service_name:string ->
?attrs:Otel.Span.key_value list ->
('conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t) ->
'conn ->
Request.t ->
'body ->
(Response.t * 'body) Lwt.t
(** Trace requests to a Cohttp server. (** Trace requests to a Cohttp server.
Use it like this: Use it like this:
@ -18,18 +26,7 @@ module Server : sig
~mode:(`TCP (`Port 8080)) ~mode:(`TCP (`Port 8080))
(Server.make () ~callback:callback_traced) (Server.make () ~callback:callback_traced)
*) *)
val trace :
?service_name:string ->
?attrs:Otel.Span.key_value list ->
('conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t) ->
'conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t
(** Trace a new internal span.
Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace
scope in the [x-ocaml-otel-traceparent] header in the request for
convenience.
*)
val with_ : val with_ :
?trace_state:string -> ?trace_state:string ->
?service_name:string -> ?service_name:string ->
@ -40,21 +37,28 @@ module Server : sig
Request.t -> Request.t ->
(Request.t -> 'a Lwt.t) -> (Request.t -> 'a Lwt.t) ->
'a Lwt.t 'a Lwt.t
(** Trace a new internal span.
Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace
scope in the [x-ocaml-otel-traceparent] header in the request for
convenience.
*)
val get_trace_context :
?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option
(** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header
added by [trace] and [with_]. added by [trace] and [with_].
*) *)
val get_trace_context : ?from:[`Internal | `External] -> Request.t -> Otel.Trace.scope option
val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t
(** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used
by [trace] and [with_]. by [trace] and [with_].
*) *)
val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t
val remove_trace_context : Request.t -> Request.t
(** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and (** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and
[with_]. [with_].
*) *)
val remove_trace_context : Request.t -> Request.t
end = struct end = struct
let attrs_of_request (req : Request.t) = let attrs_of_request (req : Request.t) =
let meth = req |> Request.meth |> Code.string_of_method in let meth = req |> Request.meth |> Code.string_of_method in
@ -63,25 +67,24 @@ end = struct
let ua = Header.get (Request.headers req) "user-agent" in let ua = Header.get (Request.headers req) "user-agent" in
let uri = Request.uri req in let uri = Request.uri req in
List.concat List.concat
[ [ ("http.method", `String meth) ] [
; (match host with None -> [] | Some h -> [ ("http.host", `String h) ]) [ "http.method", `String meth ];
; [ ("http.url", `String (Uri.to_string uri)) ] (match host with
; ( match ua with | None -> []
| None -> | Some h -> [ "http.host", `String h ]);
[] [ "http.url", `String (Uri.to_string uri) ];
| Some ua -> (match ua with
[ ("http.user_agent", `String ua) ] ) | None -> []
; ( match referer with | Some ua -> [ "http.user_agent", `String ua ]);
| None -> (match referer with
[] | None -> []
| Some r -> | Some r -> [ "http.request.header.referer", `String r ]);
[ ("http.request.header.referer", `String r) ] )
] ]
let attrs_of_response (res : Response.t) = let attrs_of_response (res : Response.t) =
let code = Response.status res in let code = Response.status res in
let code = Code.code_of_status code in let code = Code.code_of_status code in
[ ("http.status_code", `Int code) ] [ "http.status_code", `Int code ]
let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent" let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent"
@ -89,7 +92,8 @@ end = struct
let module Traceparent = Otel.Trace_context.Traceparent in let module Traceparent = Otel.Trace_context.Traceparent in
let headers = let headers =
Header.add (Request.headers req) header_x_ocaml_otel_traceparent Header.add (Request.headers req) header_x_ocaml_otel_traceparent
(Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id ()) (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id
())
in in
{ req with headers } { req with headers }
@ -105,41 +109,37 @@ end = struct
| Some v -> | Some v ->
(match Traceparent.of_value v with (match Traceparent.of_value v with
| Ok (trace_id, parent_id) -> | Ok (trace_id, parent_id) ->
(Some Otel.Trace.{ trace_id; span_id = parent_id; events = []; attrs = []}) Some
Otel.Trace.{ trace_id; span_id = parent_id; events = []; attrs = [] }
| Error _ -> None) | Error _ -> None)
let remove_trace_context req = let remove_trace_context req =
let headers = Header.remove (Request.headers req) header_x_ocaml_otel_traceparent in let headers =
Header.remove (Request.headers req) header_x_ocaml_otel_traceparent
in
{ req with headers } { req with headers }
let trace ?service_name ?(attrs=[]) callback = let trace ?service_name ?(attrs = []) callback conn req body =
fun conn req body ->
let scope = get_trace_context ~from:`External req in let scope = get_trace_context ~from:`External req in
Otel_lwt.Trace.with_ Otel_lwt.Trace.with_ ?service_name "request" ~kind:Span_kind_server
?service_name
"request"
~kind:Span_kind_server
?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope) ?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope)
?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope) ?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope)
~attrs:(attrs @ attrs_of_request req) ~attrs:(attrs @ attrs_of_request req)
(fun scope -> (fun scope ->
let open Lwt.Syntax in let open Lwt.Syntax in
let req = set_trace_context scope req in let req = set_trace_context scope req in
let* (res, body) = callback conn req body in let* res, body = callback conn req body in
Otel.Trace.add_attrs scope (fun () -> attrs_of_response res); Otel.Trace.add_attrs scope (fun () -> attrs_of_response res);
Lwt.return (res, body)) Lwt.return (res, body))
let with_ ?trace_state ?service_name ?attrs ?(kind=Otel.Span.Span_kind_internal) ?links name req (f : Request.t -> 'a Lwt.t) = let with_ ?trace_state ?service_name ?attrs
?(kind = Otel.Span.Span_kind_internal) ?links name req
(f : Request.t -> 'a Lwt.t) =
let scope = get_trace_context ~from:`Internal req in let scope = get_trace_context ~from:`Internal req in
Otel_lwt.Trace.with_ Otel_lwt.Trace.with_ ?trace_state ?service_name ?attrs ~kind
?trace_state
?service_name
?attrs
~kind
?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope) ?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope)
?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope) ?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope)
?links ?links name
name
(fun scope -> (fun scope ->
let open Lwt.Syntax in let open Lwt.Syntax in
let req = set_trace_context scope req in let req = set_trace_context scope req in
@ -151,38 +151,49 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client)
open Lwt.Syntax open Lwt.Syntax
let attrs_for ~uri ~meth () = let attrs_for ~uri ~meth () =
[ ("http.method", `String (Code.string_of_method `GET)) [
; ("http.url", `String (Uri.to_string uri)) "http.method", `String (Code.string_of_method `GET);
"http.url", `String (Uri.to_string uri);
] ]
let context_for ~uri ~meth = let context_for ~uri ~meth =
let trace_id = match scope with | Some scope -> Some scope.trace_id | None -> None in let trace_id =
let parent = match scope with | Some scope -> Some scope.span_id | None -> None in match scope with
| Some scope -> Some scope.trace_id
| None -> None
in
let parent =
match scope with
| Some scope -> Some scope.span_id
| None -> None
in
let attrs = attrs_for ~uri ~meth () in let attrs = attrs_for ~uri ~meth () in
(trace_id, parent, attrs) trace_id, parent, attrs
let add_traceparent (scope : Otel.Trace.scope) headers = let add_traceparent (scope : Otel.Trace.scope) headers =
let module Traceparent = Otel.Trace_context.Traceparent in let module Traceparent = Otel.Trace_context.Traceparent in
let headers = match headers with | None -> Header.init () | Some headers -> headers in let headers =
match headers with
| None -> Header.init ()
| Some headers -> headers
in
Header.add headers Traceparent.name Header.add headers Traceparent.name
(Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id ()) (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id
())
type ctx = C.ctx type ctx = C.ctx
let call ?ctx ?headers ?body ?chunked meth (uri : Uri.t) : (Response.t * Cohttp_lwt.Body.t) Lwt.t = let call ?ctx ?headers ?body ?chunked meth (uri : Uri.t) :
let (trace_id, parent, attrs) = context_for ~uri ~meth in (Response.t * Cohttp_lwt.Body.t) Lwt.t =
Otel_lwt.Trace.with_ "request" let trace_id, parent, attrs = context_for ~uri ~meth in
~kind:Span_kind_client Otel_lwt.Trace.with_ "request" ~kind:Span_kind_client ?trace_id ?parent
?trace_id ~attrs (fun scope ->
?parent
~attrs
(fun scope ->
let headers = add_traceparent scope headers in let headers = add_traceparent scope headers in
let* (res, body) = C.call ?ctx ~headers ?body ?chunked meth uri in let* res, body = C.call ?ctx ~headers ?body ?chunked meth uri in
Otel.Trace.add_attrs scope (fun () -> Otel.Trace.add_attrs scope (fun () ->
let code = Response.status res in let code = Response.status res in
let code = Code.code_of_status code in let code = Code.code_of_status code in
[ ("http.status_code", `Int code) ]) ; [ "http.status_code", `Int code ]);
Lwt.return (res, body)) Lwt.return (res, body))
let head ?ctx ?headers uri = let head ?ctx ?headers uri =
@ -204,24 +215,17 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client)
call ?ctx ?headers ?body ?chunked `PATCH uri call ?ctx ?headers ?body ?chunked `PATCH uri
let post_form ?ctx ?headers ~params uri = let post_form ?ctx ?headers ~params uri =
let (trace_id, parent, attrs) = context_for ~uri ~meth:`POST in let trace_id, parent, attrs = context_for ~uri ~meth:`POST in
Otel_lwt.Trace.with_ "request" Otel_lwt.Trace.with_ "request" ~kind:Span_kind_client ?trace_id ?parent
~kind:Span_kind_client ~attrs (fun scope ->
?trace_id
?parent
~attrs
(fun scope ->
let headers = add_traceparent scope headers in let headers = add_traceparent scope headers in
let* (res, body) = let* res, body = C.post_form ?ctx ~headers ~params uri in
C.post_form ?ctx ~headers ~params uri
in
Otel.Trace.add_attrs scope (fun () -> Otel.Trace.add_attrs scope (fun () ->
let code = Response.status res in let code = Response.status res in
let code = Code.code_of_status code in let code = Code.code_of_status code in
[ ("http.status_code", `Int code) ]) ; [ "http.status_code", `Int code ]);
Lwt.return (res, body)) Lwt.return (res, body))
let callv = C.callv (* TODO *) let callv = C.callv (* TODO *)
end end in
in
(module Traced : Cohttp_lwt.S.Client) (module Traced : Cohttp_lwt.S.Client)

View file

@ -1,6 +1,5 @@
open Opentelemetry open Opentelemetry
open Lwt.Syntax open Lwt.Syntax
module Span_id = Span_id module Span_id = Span_id
module Trace_id = Trace_id module Trace_id = Trace_id
module Event = Event module Event = Event
@ -15,10 +14,8 @@ module Trace = struct
include Trace include Trace
(** Sync span guard *) (** Sync span guard *)
let with_ let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent
?trace_state ?service_name ?(attrs=[]) ?scope ?links name (f : Trace.scope -> 'a Lwt.t) : 'a Lwt.t =
?kind ?trace_id ?parent ?scope ?links
name (f:Trace.scope -> 'a Lwt.t) : 'a Lwt.t =
let trace_id = let trace_id =
match trace_id, scope with match trace_id, scope with
| Some trace_id, _ -> trace_id | Some trace_id, _ -> trace_id
@ -35,16 +32,17 @@ module Trace = struct
let span_id = Span_id.create () in let span_id = Span_id.create () in
let scope = { trace_id; span_id; events = []; attrs } in let scope = { trace_id; span_id; events = []; attrs } in
let finally ok = let finally ok =
let status = match ok with let status =
match ok with
| Ok () -> default_status ~code:Status_code_ok () | Ok () -> default_status ~code:Status_code_ok ()
| Error e -> default_status ~code:Status_code_error ~message:e () in | Error e -> default_status ~code:Status_code_error ~message:e ()
in
let span, _ = let span, _ =
Span.create Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state
?kind ~trace_id ?parent ?links ~id:span_id ~attrs:scope.attrs ~events:scope.events ~start_time
?trace_state ~attrs:scope.attrs ~events:scope.events ~end_time:(Timestamp_ns.now_unix_ns ())
~start_time ~end_time:(Timestamp_ns.now_unix_ns()) ~status name
~status in
name in
emit ?service_name [ span ] emit ?service_name [ span ]
in in
Lwt.catch Lwt.catch

View file

@ -1,4 +1,3 @@
(** Opentelemetry types and instrumentation *) (** Opentelemetry types and instrumentation *)
(** {2 Wire format} *) (** {2 Wire format} *)
@ -68,6 +67,7 @@ end
in nanoseconds. *) in nanoseconds. *)
module Timestamp_ns = struct module Timestamp_ns = struct
type t = int64 type t = int64
let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600))) let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600)))
(** Current unix timestamp in nanoseconds *) (** Current unix timestamp in nanoseconds *)
@ -90,6 +90,7 @@ end
module Collector = struct module Collector = struct
open Proto open Proto
type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a }
(** Sender interface for a message of type [msg]. (** Sender interface for a message of type [msg].
Inspired from Logs' reporter Inspired from Logs' reporter
(see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc})
@ -102,9 +103,6 @@ module Collector = struct
It doesn't mean the event has been collected yet, it It doesn't mean the event has been collected yet, it
could sit in a batch queue for a little while. could sit in a batch queue for a little while.
*) *)
type 'msg sender = {
send: 'a. 'msg -> ret:(unit -> 'a) -> 'a;
}
(** Collector client interface. *) (** Collector client interface. *)
module type BACKEND = sig module type BACKEND = sig
@ -175,15 +173,17 @@ end
module Util_ = struct module Util_ = struct
let bytes_to_hex (b : bytes) : string = let bytes_to_hex (b : bytes) : string =
let i_to_hex (i : int) = let i_to_hex (i : int) =
if i < 10 then Char.chr (i + Char.code '0') if i < 10 then
else Char.chr (i - 10 + Char.code 'a') Char.chr (i + Char.code '0')
else
Char.chr (i - 10 + Char.code 'a')
in in
let res = Bytes.create (2 * Bytes.length b) in let res = Bytes.create (2 * Bytes.length b) in
for i = 0 to Bytes.length b - 1 do for i = 0 to Bytes.length b - 1 do
let n = Char.code (Bytes.get b i) in let n = Char.code (Bytes.get b i) in
Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4));
Bytes.set res (2 * i + 1) (i_to_hex (n land 0x0f)); Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f))
done; done;
Bytes.unsafe_to_string res Bytes.unsafe_to_string res
@ -193,11 +193,12 @@ module Util_ = struct
| 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a'
| _ -> raise (Invalid_argument "invalid hex char") | _ -> raise (Invalid_argument "invalid hex char")
in in
if (String.length s mod 2 <> 0) then raise (Invalid_argument "hex sequence must be of even length"); if String.length s mod 2 <> 0 then
raise (Invalid_argument "hex sequence must be of even length");
let res = Bytes.make (String.length s / 2) '\x00' in let res = Bytes.make (String.length s / 2) '\x00' in
for i=0 to String.length s/2-1 do for i = 0 to (String.length s / 2) - 1 do
let n1 = n_of_c (String.get s (2 * i)) in let n1 = n_of_c (String.get s (2 * i)) in
let n2 = n_of_c (String.get s (2*i+1)) in let n2 = n_of_c (String.get s ((2 * i) + 1)) in
let n = (n1 lsl 4) lor n2 in let n = (n1 lsl 4) lor n2 in
Bytes.set res i (Char.chr n) Bytes.set res i (Char.chr n)
done; done;
@ -211,36 +212,66 @@ end
This 16 bytes identifier is shared by all spans in one trace. *) This 16 bytes identifier is shared by all spans in one trace. *)
module Trace_id : sig module Trace_id : sig
type t type t
val create : unit -> t val create : unit -> t
val to_bytes : t -> bytes val to_bytes : t -> bytes
val of_bytes : bytes -> t val of_bytes : bytes -> t
val to_hex : t -> string val to_hex : t -> string
val of_hex : string -> t val of_hex : string -> t
end = struct end = struct
open Proto.Trace open Proto.Trace
type t = bytes type t = bytes
let to_bytes self = self let to_bytes self = self
let create () : t = Collector.rand_bytes_16 () let create () : t = Collector.rand_bytes_16 ()
let of_bytes b = if Bytes.length b=16 then b else raise (Invalid_argument "trace IDs must be 16 bytes in length")
let of_bytes b =
if Bytes.length b = 16 then
b
else
raise (Invalid_argument "trace IDs must be 16 bytes in length")
let to_hex self = Util_.bytes_to_hex self let to_hex self = Util_.bytes_to_hex self
let of_hex s = of_bytes (Util_.bytes_of_hex s) let of_hex s = of_bytes (Util_.bytes_of_hex s)
end end
(** Unique ID of a span. *) (** Unique ID of a span. *)
module Span_id : sig module Span_id : sig
type t type t
val create : unit -> t val create : unit -> t
val to_bytes : t -> bytes val to_bytes : t -> bytes
val of_bytes : bytes -> t val of_bytes : bytes -> t
val to_hex : t -> string val to_hex : t -> string
val of_hex : string -> t val of_hex : string -> t
end = struct end = struct
open Proto.Trace open Proto.Trace
type t = bytes type t = bytes
let to_bytes self = self let to_bytes self = self
let create () : t = Collector.rand_bytes_8 () let create () : t = Collector.rand_bytes_8 ()
let of_bytes b = if Bytes.length b=8 then b else raise (Invalid_argument "span IDs must be 8 bytes in length")
let of_bytes b =
if Bytes.length b = 8 then
b
else
raise (Invalid_argument "span IDs must be 8 bytes in length")
let to_hex self = Util_.bytes_to_hex self let to_hex self = Util_.bytes_to_hex self
let of_hex s = of_bytes (Util_.bytes_of_hex s) let of_hex s = of_bytes (Util_.bytes_of_hex s)
end end
@ -251,12 +282,16 @@ module Conventions = struct
module Process = struct module Process = struct
module Runtime = struct module Runtime = struct
let name = "process.runtime.name" let name = "process.runtime.name"
let version = "process.runtime.version" let version = "process.runtime.version"
let description = "process.runtime.description" let description = "process.runtime.description"
end end
end end
module Service = struct module Service = struct
let name = "service.name" let name = "service.name"
let namespace = "service.namespace" let namespace = "service.namespace"
end end
end end
@ -267,9 +302,13 @@ module Conventions = struct
module Ocaml = struct module Ocaml = struct
module GC = struct module GC = struct
let compactions = "process.runtime.ocaml.gc.compactions" let compactions = "process.runtime.ocaml.gc.compactions"
let major_collections = "process.runtime.ocaml.gc.major_collections" let major_collections = "process.runtime.ocaml.gc.major_collections"
let major_heap = "process.runtime.ocaml.gc.major_heap" let major_heap = "process.runtime.ocaml.gc.major_heap"
let minor_allocated = "process.runtime.ocaml.gc.minor_allocated" let minor_allocated = "process.runtime.ocaml.gc.minor_allocated"
let minor_collections = "process.runtime.ocaml.gc.minor_collections" let minor_collections = "process.runtime.ocaml.gc.minor_collections"
end end
end end
@ -278,11 +317,17 @@ module Conventions = struct
end end
end end
type value = [`Int of int | `String of string | `Bool of bool | `None] type value =
[ `Int of int
| `String of string
| `Bool of bool
| `None
]
type key_value = string * value type key_value = string * value
(**/**) (**/**)
let _conv_value = let _conv_value =
let open Proto.Common in let open Proto.Common in
function function
@ -294,6 +339,7 @@ let _conv_value =
(**/**) (**/**)
(**/**) (**/**)
let _conv_key_value (k, v) = let _conv_key_value (k, v) =
let open Proto.Common in let open Proto.Common in
let value = _conv_value v in let value = _conv_value v in
@ -307,29 +353,30 @@ let _conv_key_value (k,v) =
module Globals = struct module Globals = struct
open Proto.Common open Proto.Common
let service_name = ref "unknown_service"
(** Main service name metadata *) (** Main service name metadata *)
let service_name = ref "unknown_service"
let service_namespace = ref None
(** Namespace for the service *) (** Namespace for the service *)
let service_namespace = ref None
let instrumentation_library = let instrumentation_library =
default_instrumentation_library default_instrumentation_library ~version:"%%VERSION%%"
~version:"%%VERSION%%"
~name:"ocaml-opentelemetry" () ~name:"ocaml-opentelemetry" ()
(** Global attributes, initially set (** Global attributes, initially set
via OTEL_RESOURCE_ATTRIBUTES and modifiable via OTEL_RESOURCE_ATTRIBUTES and modifiable
by the user code. They will be attached to each outgoing metrics/traces. *) by the user code. They will be attached to each outgoing metrics/traces. *)
let global_attributes : key_value list ref = let global_attributes : key_value list ref =
let parse_pair s = match String.split_on_char '=' s with let parse_pair s =
match String.split_on_char '=' s with
| [ a; b ] -> default_key_value ~key:a ~value:(Some (String_value b)) () | [ a; b ] -> default_key_value ~key:a ~value:(Some (String_value b)) ()
| _ -> failwith (Printf.sprintf "invalid attribute: %S" s) | _ -> failwith (Printf.sprintf "invalid attribute: %S" s)
in in
ref @@ ref
@@
try try
Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" |> String.split_on_char ',' Sys.getenv "OTEL_RESOURCE_ATTRIBUTES"
|> List.map parse_pair |> String.split_on_char ',' |> List.map parse_pair
with _ -> [] with _ -> []
(* add global attributes to this list *) (* add global attributes to this list *)
@ -341,13 +388,16 @@ module Globals = struct
let l = List.map _conv_key_value attrs in let l = List.map _conv_key_value attrs in
let l = let l =
default_key_value ~key:Conventions.Attributes.Service.name default_key_value ~key:Conventions.Attributes.Service.name
~value:(Some (String_value service_name)) () :: l ~value:(Some (String_value service_name)) ()
:: l
in in
let l = match !service_namespace with let l =
match !service_namespace with
| None -> l | None -> l
| Some v -> | Some v ->
default_key_value ~key:Conventions.Attributes.Service.namespace default_key_value ~key:Conventions.Attributes.Service.namespace
~value:(Some (String_value v)) () :: l ~value:(Some (String_value v)) ()
:: l
in in
l |> merge_global_attributes_ l |> merge_global_attributes_
end end
@ -360,21 +410,17 @@ end
belong in a span. *) belong in a span. *)
module Event : sig module Event : sig
open Proto.Trace open Proto.Trace
type t = span_event type t = span_event
val make : val make :
?time_unix_nano:Timestamp_ns.t -> ?time_unix_nano:Timestamp_ns.t -> ?attrs:key_value list -> string -> t
?attrs:key_value list ->
string ->
t
end = struct end = struct
open Proto.Trace open Proto.Trace
type t = span_event type t = span_event
let make let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = [])
?(time_unix_nano=Timestamp_ns.now_unix_ns())
?(attrs=[])
(name : string) : t = (name : string) : t =
let attrs = List.map _conv_key_value attrs in let attrs = List.map _conv_key_value attrs in
default_span_event ~time_unix_nano ~name ~attributes:attrs () default_span_event ~time_unix_nano ~name ~attributes:attrs ()
@ -390,6 +436,7 @@ module Span : sig
open Proto.Trace open Proto.Trace
type t = span type t = span
type id = Span_id.t type id = Span_id.t
type nonrec kind = span_span_kind = type nonrec kind = span_span_kind =
@ -412,7 +459,8 @@ module Span : sig
val id : t -> Span_id.t val id : t -> Span_id.t
type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] type key_value =
string * [ `Int of int | `String of string | `Bool of bool | `None ]
val create : val create :
?kind:kind -> ?kind:kind ->
@ -426,7 +474,8 @@ module Span : sig
?links:(Trace_id.t * Span_id.t * string) list -> ?links:(Trace_id.t * Span_id.t * string) list ->
start_time:Timestamp_ns.t -> start_time:Timestamp_ns.t ->
end_time:Timestamp_ns.t -> end_time:Timestamp_ns.t ->
string -> t * id string ->
t * id
(** [create ~trace_id name] creates a new span with its unique ID. (** [create ~trace_id name] creates a new span with its unique ID.
@param trace_id the trace this belongs to @param trace_id the trace this belongs to
@param parent parent span, if any @param parent parent span, if any
@ -436,6 +485,7 @@ end = struct
open Proto.Trace open Proto.Trace
type t = span type t = span
type id = Span_id.t type id = Span_id.t
type nonrec kind = span_span_kind = type nonrec kind = span_span_kind =
@ -446,7 +496,8 @@ end = struct
| Span_kind_producer | Span_kind_producer
| Span_kind_consumer | Span_kind_consumer
type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] type key_value =
string * [ `Int of int | `String of string | `Bool of bool | `None ]
type nonrec status_code = status_status_code = type nonrec status_code = status_status_code =
| Status_code_unset | Status_code_unset
@ -458,19 +509,11 @@ end = struct
code: status_code; code: status_code;
} }
let id self = Span_id.of_bytes self.span_id let id self = Span_id.of_bytes self.span_id
let create let create ?(kind = Span_kind_unspecified) ?(id = Span_id.create ())
?(kind=Span_kind_unspecified) ?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent
?(id=Span_id.create()) ?(links = []) ~start_time ~end_time name : t * id =
?trace_state
?(attrs=[])
?(events=[])
?status
~trace_id ?parent ?(links=[])
~start_time ~end_time
name : t * id =
let trace_id = Trace_id.to_bytes trace_id in let trace_id = Trace_id.to_bytes trace_id in
let parent_span_id = Option.map Span_id.to_bytes parent in let parent_span_id = Option.map Span_id.to_bytes parent in
let attributes = List.map _conv_key_value attrs in let attributes = List.map _conv_key_value attrs in
@ -483,15 +526,9 @@ end = struct
links links
in in
let span = let span =
default_span default_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id)
~trace_id ?parent_span_id ~attributes ~events ?trace_state ~status ~kind ~name ~links
~span_id:(Span_id.to_bytes id) ~start_time_unix_nano:start_time ~end_time_unix_nano:end_time ()
~attributes ~events
?trace_state ~status
~kind ~name ~links
~start_time_unix_nano:start_time
~end_time_unix_nano:end_time
()
in in
span, id span, id
end end
@ -507,49 +544,47 @@ module Trace = struct
let make_resource_spans ?service_name ?attrs spans = let make_resource_spans ?service_name ?attrs spans =
let ils = let ils =
default_instrumentation_library_spans default_instrumentation_library_spans
~instrumentation_library:(Some Globals.instrumentation_library) ~instrumentation_library:(Some Globals.instrumentation_library) ~spans
~spans () in ()
in
let attributes = Globals.mk_attributes ?service_name ?attrs () in let attributes = Globals.mk_attributes ?service_name ?attrs () in
let resource = Proto.Resource.default_resource ~attributes () in let resource = Proto.Resource.default_resource ~attributes () in
default_resource_spans default_resource_spans ~resource:(Some resource)
~resource:(Some resource) ~instrumentation_library_spans:[ils] () ~instrumentation_library_spans:[ ils ] ()
(** Sync emitter *) (** Sync emitter *)
let emit ?service_name ?attrs (spans : span list) : unit = let emit ?service_name ?attrs (spans : span list) : unit =
let rs = make_resource_spans ?service_name ?attrs spans in let rs = make_resource_spans ?service_name ?attrs spans in
Collector.send_trace [ rs ] ~ret:(fun () -> ()) Collector.send_trace [ rs ] ~ret:(fun () -> ())
(** Scope to be used with {!with_}. *)
type scope = { type scope = {
trace_id: Trace_id.t; trace_id: Trace_id.t;
span_id: Span_id.t; span_id: Span_id.t;
mutable events: Event.t list; mutable events: Event.t list;
mutable attrs: Span.key_value list mutable attrs: Span.key_value list;
} }
(** Scope to be used with {!with_}. *)
(** Add an event to the scope. It will be aggregated into the span. (** Add an event to the scope. It will be aggregated into the span.
Note that this takes a function that produces an event, and will only Note that this takes a function that produces an event, and will only
call it if there is an instrumentation backend. *) call it if there is an instrumentation backend. *)
let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit =
if Collector.has_backend() then ( if Collector.has_backend () then scope.events <- ev () :: scope.events
scope.events <- ev() :: scope.events
)
(** Add an attr to the scope. It will be aggregated into the span. (** Add an attr to the scope. It will be aggregated into the span.
Note that this takes a function that produces attributes, and will only Note that this takes a function that produces attributes, and will only
call it if there is an instrumentation backend. *) call it if there is an instrumentation backend. *)
let[@inline] add_attrs (scope:scope) (attrs:unit -> Span.key_value list) : unit = let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) :
if Collector.has_backend() then ( unit =
if Collector.has_backend () then
scope.attrs <- List.rev_append (attrs ()) scope.attrs scope.attrs <- List.rev_append (attrs ()) scope.attrs
)
(** Sync span guard *) (** Sync span guard *)
let with_ let with_ ?trace_state ?service_name
?trace_state ?service_name ?(attrs: (string*[<value]) list = []) ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
?kind ?trace_id ?parent ?scope ?links ?links name (f : scope -> 'a) : 'a =
name (f: scope -> 'a) : 'a =
let trace_id = let trace_id =
match trace_id, scope with match trace_id, scope with
| Some trace_id, _ -> trace_id | Some trace_id, _ -> trace_id
@ -568,19 +603,20 @@ module Trace = struct
(* called once we're done, to emit a span *) (* called once we're done, to emit a span *)
let finally res = let finally res =
let status = match res with let status =
match res with
| Ok () -> default_status ~code:Status_code_ok () | Ok () -> default_status ~code:Status_code_ok ()
| Error e -> default_status ~code:Status_code_error ~message:e () in | Error e -> default_status ~code:Status_code_error ~message:e ()
in
let span, _ = let span, _ =
(* TODO: should the attrs passed to with_ go on the Span (in Span.create) or on the ResourceSpan (in emit)? (* TODO: should the attrs passed to with_ go on the Span (in Span.create) or on the ResourceSpan (in emit)?
(question also applies to Opentelemetry_lwt.Trace.with) *) (question also applies to Opentelemetry_lwt.Trace.with) *)
Span.create Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state
?kind ~trace_id ?parent ?links ~id:span_id ~attrs:scope.attrs ~events:scope.events ~start_time
?trace_state ~attrs:scope.attrs ~events:scope.events ~end_time:(Timestamp_ns.now_unix_ns ())
~start_time ~end_time:(Timestamp_ns.now_unix_ns()) ~status name
~status in
name in emit ?service_name [ span ]
emit ?service_name [span];
in in
try try
let x = f scope in let x = f scope in
@ -605,24 +641,21 @@ module Metrics = struct
(** Number data point, as a float *) (** Number data point, as a float *)
let float ?(start_time_unix_nano = _program_start) let float ?(start_time_unix_nano = _program_start)
?(now=Timestamp_ns.now_unix_ns()) ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) :
?(attrs=[]) number_data_point =
(d:float) : number_data_point =
let attributes = attrs |> List.map _conv_key_value in let attributes = attrs |> List.map _conv_key_value in
default_number_data_point default_number_data_point ~start_time_unix_nano ~time_unix_nano:now
~start_time_unix_nano ~time_unix_nano:now ~attributes ~value:(As_double d) ()
~attributes
~value:(As_double d) ()
(** Number data point, as an int *) (** Number data point, as an int *)
let int ?(start_time_unix_nano = _program_start) let int ?(start_time_unix_nano = _program_start)
?(now=Timestamp_ns.now_unix_ns()) ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) :
?(attrs=[]) number_data_point =
(i:int) : number_data_point =
let attributes = attrs |> List.map _conv_key_value in let attributes = attrs |> List.map _conv_key_value in
default_number_data_point ~start_time_unix_nano ~time_unix_nano:now default_number_data_point ~start_time_unix_nano ~time_unix_nano:now
~attributes ~attributes
~value:(As_int (Int64.of_int i)) () ~value:(As_int (Int64.of_int i))
()
(** Aggregation of a scalar metric, always with the current value *) (** Aggregation of a scalar metric, always with the current value *)
let gauge ~name ?description ?unit_ (l : number_data_point list) : t = let gauge ~name ?description ?unit_ (l : number_data_point list) : t =
@ -637,11 +670,10 @@ module Metrics = struct
(** Sum of all reported measurements over a time interval *) (** Sum of all reported measurements over a time interval *)
let sum ~name ?description ?unit_ let sum ~name ?description ?unit_
?(aggregation_temporality = Aggregation_temporality_cumulative) ?(aggregation_temporality = Aggregation_temporality_cumulative)
?is_monotonic ?is_monotonic (l : number_data_point list) : t =
(l:number_data_point list) : t =
let data = let data =
Sum (default_sum ~data_points:l ?is_monotonic Sum (default_sum ~data_points:l ?is_monotonic ~aggregation_temporality ())
~aggregation_temporality ()) in in
default_metric ~name ?description ?unit_ ~data () default_metric ~name ?description ?unit_ ~data ()
(* TODO (* TODO
@ -659,15 +691,17 @@ module Metrics = struct
(* TODO: exemplar *) (* TODO: exemplar *)
(** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *)
let make_resource_metrics ?service_name ?attrs (l:t list) : resource_metrics = let make_resource_metrics ?service_name ?attrs (l : t list) : resource_metrics
=
let lm = let lm =
default_instrumentation_library_metrics default_instrumentation_library_metrics
~instrumentation_library:(Some Globals.instrumentation_library) ~instrumentation_library:(Some Globals.instrumentation_library)
~metrics:l () in ~metrics:l ()
in
let attributes = Globals.mk_attributes ?service_name ?attrs () in let attributes = Globals.mk_attributes ?service_name ?attrs () in
let resource = Proto.Resource.default_resource ~attributes () in let resource = Proto.Resource.default_resource ~attributes () in
default_resource_metrics default_resource_metrics ~instrumentation_library_metrics:[ lm ]
~instrumentation_library_metrics:[lm] ~resource:(Some resource) () ~resource:(Some resource) ()
(** Emit some metrics to the collector (sync). This blocks until (** Emit some metrics to the collector (sync). This blocks until
the backend has pushed the metrics into some internal queue, or the backend has pushed the metrics into some internal queue, or
@ -724,52 +758,47 @@ module Logs = struct
let pp_flags = Logs_pp.pp_log_record_flags let pp_flags = Logs_pp.pp_log_record_flags
(** Make a single log entry *) (** Make a single log entry *)
let make let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ())
?time ?(observed_time_unix_nano=Timestamp_ns.now_unix_ns()) ?severity ?log_level ?flags ?trace_id ?span_id (body : value) : t =
?severity ?log_level ?flags ?trace_id ?span_id let time_unix_nano =
(body:value) : t = match time with
let time_unix_nano = match time with
| None -> observed_time_unix_nano | None -> observed_time_unix_nano
| Some t -> t | Some t -> t
in in
let trace_id = Option.map Trace_id.to_bytes trace_id in let trace_id = Option.map Trace_id.to_bytes trace_id in
let span_id = Option.map Span_id.to_bytes span_id in let span_id = Option.map Span_id.to_bytes span_id in
let body = _conv_value body in let body = _conv_value body in
default_log_record default_log_record ~time_unix_nano ~observed_time_unix_nano
~time_unix_nano ~observed_time_unix_nano ?severity_number:severity ?severity_text:log_level ?flags ?trace_id
?severity_number:severity ?severity_text:log_level ?span_id ~body ()
?flags ?trace_id ?span_id ~body ()
(** Make a log entry whose body is a string *) (** Make a log entry whose body is a string *)
let make_str let make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags
?time ?observed_time_unix_nano ?trace_id ?span_id (body : string) : t =
?severity ?log_level ?flags ?trace_id ?span_id make ?time ?observed_time_unix_nano ?severity ?log_level ?flags ?trace_id
(body:string) : t = ?span_id (`String body)
make
?time ?observed_time_unix_nano
?severity ?log_level ?flags ?trace_id ?span_id
(`String body)
(** Make a log entry with format *) (** Make a log entry with format *)
let make_strf let make_strf ?time ?observed_time_unix_nano ?severity ?log_level ?flags
?time ?observed_time_unix_nano ?trace_id ?span_id fmt =
?severity ?log_level ?flags ?trace_id ?span_id
fmt =
Format.kasprintf Format.kasprintf
(fun bod -> (fun bod ->
make_str make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags
?time ?observed_time_unix_nano ?trace_id ?span_id bod)
?severity ?log_level ?flags ?trace_id ?span_id bod)
fmt fmt
let emit ?service_name ?attrs (l : t list) : unit = let emit ?service_name ?attrs (l : t list) : unit =
let attributes = Globals.mk_attributes ?service_name ?attrs () in let attributes = Globals.mk_attributes ?service_name ?attrs () in
let resource = Proto.Resource.default_resource ~attributes () in let resource = Proto.Resource.default_resource ~attributes () in
let ll = default_instrumentation_library_logs let ll =
default_instrumentation_library_logs
~instrumentation_library:(Some Globals.instrumentation_library) ~instrumentation_library:(Some Globals.instrumentation_library)
~log_records:l () in ~log_records:l ()
let rl = default_resource_logs ~resource:(Some resource) in
~instrumentation_library_logs:[ll] () in let rl =
default_resource_logs ~resource:(Some resource)
~instrumentation_library_logs:[ ll ] ()
in
Collector.send_logs [ rl ] ~ret:ignore Collector.send_logs [ rl ] ~ret:ignore
end end
@ -780,12 +809,10 @@ end
https://www.w3.org/TR/trace-context/ https://www.w3.org/TR/trace-context/
*) *)
module Trace_context = struct module Trace_context = struct
(** The traceparent header (** The traceparent header
https://www.w3.org/TR/trace-context/#traceparent-header https://www.w3.org/TR/trace-context/#traceparent-header
*) *)
module Traceparent = struct module Traceparent = struct
let name = "traceparent" let name = "traceparent"
(** Parse the value of the traceparent header. (** Parse the value of the traceparent header.
@ -816,33 +843,40 @@ module Trace_context = struct
let consume expected ~offset ~or_ = let consume expected ~offset ~or_ =
let len = String.length expected in let len = String.length expected in
let* str, offset = blit ~offset ~len ~or_ in let* str, offset = blit ~offset ~len ~or_ in
if str = expected then Ok offset else Error or_ if str = expected then
Ok offset
else
Error or_
in in
let offset = 0 in let offset = 0 in
let* offset = consume "00" ~offset ~or_:"Expected version 00" in let* offset = consume "00" ~offset ~or_:"Expected version 00" in
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
let* trace_id, offset = blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" in let* trace_id, offset =
blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id"
in
let* trace_id = let* trace_id =
match Trace_id.of_hex trace_id with match Trace_id.of_hex trace_id with
| trace_id -> Ok trace_id | trace_id -> Ok trace_id
| exception Invalid_argument _ -> Error "Expected hex-encoded trace-id" | exception Invalid_argument _ -> Error "Expected hex-encoded trace-id"
in in
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
let* parent_id, offset = blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" in let* parent_id, offset =
blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id"
in
let* parent_id = let* parent_id =
match Span_id.of_hex parent_id with match Span_id.of_hex parent_id with
| parent_id -> Ok parent_id | parent_id -> Ok parent_id
| exception Invalid_argument _ -> Error "Expected hex-encoded parent-id" | exception Invalid_argument _ -> Error "Expected hex-encoded parent-id"
in in
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
let* _flags, _offset = blit ~offset ~len:2 ~or_:"Expected 2-digit flags" in let* _flags, _offset =
blit ~offset ~len:2 ~or_:"Expected 2-digit flags"
in
Ok (trace_id, parent_id) Ok (trace_id, parent_id)
let to_value ~(trace_id : Trace_id.t) ~(parent_id : Span_id.t) () : string = let to_value ~(trace_id : Trace_id.t) ~(parent_id : Span_id.t) () : string =
Printf.sprintf "00-%s-%s-00" Printf.sprintf "00-%s-%s-00" (Trace_id.to_hex trace_id)
(Trace_id.to_hex trace_id)
(Span_id.to_hex parent_id) (Span_id.to_hex parent_id)
end end
end end
@ -862,9 +896,10 @@ end = struct
(** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *) (** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *)
let runtime_attributes = let runtime_attributes =
lazy lazy
Conventions.Attributes.[ Conventions.Attributes.
(Process.Runtime.name, `String "ocaml"); [
(Process.Runtime.version, `String Sys.ocaml_version); Process.Runtime.name, `String "ocaml";
Process.Runtime.version, `String Sys.ocaml_version;
] ]
let get_runtime_attributes () = Lazy.force runtime_attributes let get_runtime_attributes () = Lazy.force runtime_attributes
@ -878,7 +913,9 @@ end = struct
ignore (Gc.create_alarm trigger : Gc.alarm) ignore (Gc.create_alarm trigger : Gc.alarm)
let bytes_per_word = Sys.word_size / 8 let bytes_per_word = Sys.word_size / 8
let word_to_bytes n = n * bytes_per_word let word_to_bytes n = n * bytes_per_word
let word_to_bytes_f n = n *. float bytes_per_word let word_to_bytes_f n = n *. float bytes_per_word
let get_metrics () : Metrics.t list = let get_metrics () : Metrics.t list =
@ -891,8 +928,7 @@ end = struct
[ int ~now (word_to_bytes gc.Gc.heap_words) ]; [ int ~now (word_to_bytes gc.Gc.heap_words) ];
sum ~name:Process.Runtime.Ocaml.GC.minor_allocated sum ~name:Process.Runtime.Ocaml.GC.minor_allocated
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true ~is_monotonic:true ~unit_:"B"
~unit_:"B"
[ float ~now (word_to_bytes_f gc.Gc.minor_words) ]; [ float ~now (word_to_bytes_f gc.Gc.minor_words) ];
sum ~name:Process.Runtime.Ocaml.GC.minor_collections sum ~name:Process.Runtime.Ocaml.GC.minor_collections
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative

View file

@ -1,25 +1,29 @@
module T = Opentelemetry module T = Opentelemetry
module Otel_lwt = Opentelemetry_lwt module Otel_lwt = Opentelemetry_lwt
let spf = Printf.sprintf let spf = Printf.sprintf
let ( let@ ) f x = f x let ( let@ ) f x = f x
let sleep_inner = ref 0.1 let sleep_inner = ref 0.1
let sleep_outer = ref 2.0 let sleep_outer = ref 2.0
let mk_client ~scope = Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client) let mk_client ~scope =
Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client)
let run () = let run () =
Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url ()); Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url ());
let open Lwt.Syntax in let open Lwt.Syntax in
let rec go () = let rec go () =
let@ scope = let@ scope =
Otel_lwt.Trace.with_ Otel_lwt.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
~kind:T.Span.Span_kind_producer
"loop.outer"
in in
let* () = Lwt_unix.sleep !sleep_outer in let* () = Lwt_unix.sleep !sleep_outer in
let module C = (val mk_client ~scope) in let module C = (val mk_client ~scope) in
let* (res, body) = C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") in let* res, body =
C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net")
in
let* () = Cohttp_lwt.Body.drain_body body in let* () = Cohttp_lwt.Body.drain_body body in
go () go ()
in in
@ -34,26 +38,40 @@ let () =
let thread = ref true in let thread = ref true in
let batch_traces = ref 400 in let batch_traces = ref 400 in
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let opts = [ let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output"; "--debug", Arg.Bool (( := ) debug), " enable debug output";
"--thread", Arg.Bool (( := ) thread), " use a background thread"; "--thread", Arg.Bool (( := ) thread), " use a background thread";
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
"--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; ( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
] |> Arg.align in ]
|> Arg.align
in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r = if !r > 0 then Some !r else None in let some_if_nzero r =
let config = Opentelemetry_client_ocurl.Config.make if !r > 0 then
~debug:!debug Some !r
else
None
in
let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces) ~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics) ~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread () in ~thread:!thread ()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
Format.printf "Check HTTP requests at https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@."; Format.printf
"Check HTTP requests at \
https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@.";
Opentelemetry_client_ocurl.with_setup ~config () (fun () -> Lwt_main.run (run ())) Opentelemetry_client_ocurl.with_setup ~config () (fun () ->
Lwt_main.run (run ()))

View file

@ -6,4 +6,5 @@
(executable (executable
(name cohttp_client) (name cohttp_client)
(modules cohttp_client) (modules cohttp_client)
(libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl opentelemetry-cohttp-lwt)) (libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl
opentelemetry-cohttp-lwt))

View file

@ -1,9 +1,11 @@
module T = Opentelemetry module T = Opentelemetry
let spf = Printf.sprintf let spf = Printf.sprintf
let ( let@ ) f x = f x let ( let@ ) f x = f x
let sleep_inner = ref 0.1 let sleep_inner = ref 0.1
let sleep_outer = ref 2.0 let sleep_outer = ref 2.0
let run () = let run () =
@ -13,39 +15,39 @@ let run () =
let i = ref 0 in let i = ref 0 in
while true do while true do
let@ scope = let@ scope =
T.Trace.with_ T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
~kind:T.Span.Span_kind_producer ~attrs:[ "i", `Int !i ]
"loop.outer" ~attrs:["i", `Int !i] in in
for j = 0 to 4 do for j = 0 to 4 do
let@ scope =
let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope
~attrs:[ "j", `Int j ] ~attrs:[ "j", `Int j ]
"loop.inner" in "loop.inner"
in
Unix.sleepf !sleep_outer; Unix.sleepf !sleep_outer;
T.Logs.(emit [ T.Logs.(
emit
[
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
~severity:Severity_number_info ~severity:Severity_number_info "inner at %d" j;
"inner at %d" j
]); ]);
incr i; incr i;
(try try
let@ _ = let@ _ = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" in
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope
"alloc" in
(* allocate some stuff *) (* allocate some stuff *)
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
ignore _arr; ignore _arr;
Unix.sleepf !sleep_inner; Unix.sleepf !sleep_inner;
if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) if j = 4 && !i mod 13 = 0 then failwith "oh no";
T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); (* simulate a failure *)
with Failure _ -> T.Trace.add_event scope (fun () -> T.Event.make "done with alloc")
()); with Failure _ -> ()
done; done
done done
let () = let () =
@ -57,23 +59,34 @@ let () =
let thread = ref true in let thread = ref true in
let batch_traces = ref 400 in let batch_traces = ref 400 in
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let opts = [ let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output"; "--debug", Arg.Bool (( := ) debug), " enable debug output";
"--thread", Arg.Bool (( := ) thread), " use a background thread"; "--thread", Arg.Bool (( := ) thread), " use a background thread";
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
"--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; ( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
] |> Arg.align in ]
|> Arg.align
in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r = if !r > 0 then Some !r else None in let some_if_nzero r =
let config = Opentelemetry_client_ocurl.Config.make if !r > 0 then
~debug:!debug Some !r
else
None
in
let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces) ~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics) ~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread () in ~thread:!thread ()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;

View file

@ -2,39 +2,49 @@ open Opentelemetry
let pp_traceparent fmt (trace_id, parent_id) = let pp_traceparent fmt (trace_id, parent_id) =
let open Format in let open Format in
fprintf fmt "trace_id:%S parent_id:%S" fprintf fmt "trace_id:%S parent_id:%S" (Trace_id.to_hex trace_id)
(Trace_id.to_hex trace_id)
(Span_id.to_hex parent_id) (Span_id.to_hex parent_id)
let test_of_value str = let test_of_value str =
let open Format in let open Format in
printf "@[<v 2>Trace_context.Traceparent.of_value %S:@ %a@]@." printf "@[<v 2>Trace_context.Traceparent.of_value %S:@ %a@]@." str
str (pp_print_result
(pp_print_result ~ok:(fun fmt (trace_id, parent_id) -> ~ok:(fun fmt (trace_id, parent_id) ->
fprintf fmt "Ok %a" pp_traceparent (trace_id, parent_id)) fprintf fmt "Ok %a" pp_traceparent (trace_id, parent_id))
~error:(fun fmt msg -> fprintf fmt "Error %S" msg)) ~error:(fun fmt msg -> fprintf fmt "Error %S" msg))
(Trace_context.Traceparent.of_value str) (Trace_context.Traceparent.of_value str)
let () = test_of_value "xx" let () = test_of_value "xx"
let () = test_of_value "00" let () = test_of_value "00"
let () = test_of_value "00-xxxx" let () = test_of_value "00-xxxx"
let () = test_of_value "00-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" let () = test_of_value "00-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
let () = test_of_value "00-0123456789abcdef0123456789abcdef" let () = test_of_value "00-0123456789abcdef0123456789abcdef"
let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxx" let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxx"
let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxxxxxxxxxxxxxx" let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxxxxxxxxxxxxxx"
let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef" let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef"
let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-" let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-"
let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-00" let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-00"
let () = test_of_value "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" let () = test_of_value "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
let () = print_endline "" let () = print_endline ""
let test_to_value trace_id parent_id = let test_to_value trace_id parent_id =
let open Format in let open Format in
printf "@[<v 2>Trace_context.Traceparent.to_value %a:@ %S@]@." printf "@[<v 2>Trace_context.Traceparent.to_value %a:@ %S@]@." pp_traceparent
pp_traceparent (trace_id, parent_id) (trace_id, parent_id)
(Trace_context.Traceparent.to_value ~trace_id ~parent_id ()) (Trace_context.Traceparent.to_value ~trace_id ~parent_id ())
let () =
let () = test_to_value (Trace_id.of_hex "4bf92f3577b34da6a3ce929d0e0e4736") (Span_id.of_hex "00f067aa0ba902b7") test_to_value
(Trace_id.of_hex "4bf92f3577b34da6a3ce929d0e0e4736")
(Span_id.of_hex "00f067aa0ba902b7")