mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
Merge pull request #21 from imandra-ai/fix-rand-trace-id
Fix rand trace
This commit is contained in:
commit
35ae22746d
10 changed files with 576 additions and 432 deletions
14
.ocamlformat
Normal file
14
.ocamlformat
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
version = 0.20.0
|
||||
profile=conventional
|
||||
margin=80
|
||||
if-then-else=k-r
|
||||
parens-ite=true
|
||||
parens-tuple=multi-line-only
|
||||
sequence-style=terminator
|
||||
type-decl=sparse
|
||||
break-cases=toplevel
|
||||
cases-exp-indent=2
|
||||
field-space=tight-decl
|
||||
leading-nested-match-parens=true
|
||||
module-item-spacing=sparse
|
||||
quiet=true
|
||||
|
|
@ -1,21 +1,24 @@
|
|||
module Atomic = Opentelemetry_atomic.Atomic
|
||||
include Opentelemetry.Lock
|
||||
|
||||
let[@inline] (let@) f x = f x
|
||||
let[@inline] ( let@ ) f x = f x
|
||||
|
||||
let debug_ = ref (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with Some ("1"|"true") -> true | _ -> false)
|
||||
let debug_ =
|
||||
ref
|
||||
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
|
||||
| Some ("1" | "true") -> true
|
||||
| _ -> false)
|
||||
|
||||
let lock_ : (unit -> unit) ref = ref ignore
|
||||
let unlock_ : (unit -> unit) ref = ref ignore
|
||||
let default_url = "http://localhost:4318"
|
||||
|
||||
let set_mutex ~lock ~unlock : unit =
|
||||
lock_ := lock;
|
||||
unlock_ := unlock
|
||||
let url =
|
||||
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
|
||||
|
||||
(* critical section for [f()] *)
|
||||
let[@inline] with_lock_ f =
|
||||
!lock_();
|
||||
Fun.protect ~finally:!unlock_ f
|
||||
let get_url () = !url
|
||||
|
||||
let set_url s = url := s
|
||||
|
||||
(** [with_mutex m f] calls [f()] in a section where [m] is locked. *)
|
||||
let[@inline] with_mutex_ m f =
|
||||
Mutex.lock m;
|
||||
Fun.protect ~finally:(fun () -> Mutex.unlock m) f
|
||||
|
|
@ -25,11 +28,21 @@ let parse_headers s =
|
|||
String.split_on_char ',' s |> List.map parse_header
|
||||
|
||||
let default_url = "http://localhost:4318"
|
||||
|
||||
let default_headers = []
|
||||
let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
|
||||
let headers = ref (try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS") with _ -> default_headers)
|
||||
|
||||
let url =
|
||||
ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
|
||||
|
||||
let headers =
|
||||
ref
|
||||
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
|
||||
with _ -> default_headers)
|
||||
|
||||
let get_url () = !url
|
||||
|
||||
let set_url s = url := s
|
||||
|
||||
let get_headers () = !headers
|
||||
|
||||
let set_headers s = headers := s
|
||||
|
|
|
|||
|
|
@ -1,34 +0,0 @@
|
|||
|
||||
open Common_
|
||||
|
||||
(* generate random IDs *)
|
||||
module Make() = struct
|
||||
let rand_ = Random.State.make_self_init()
|
||||
|
||||
let rand_bytes_8 () : bytes =
|
||||
let@() = with_lock_ in
|
||||
let b = Bytes.create 8 in
|
||||
for i=0 to 1 do
|
||||
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
|
||||
Bytes.set b (i*3) (Char.chr (r land 0xff));
|
||||
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
|
||||
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
|
||||
done;
|
||||
let r = Random.State.bits rand_ in
|
||||
Bytes.set b 6 (Char.chr (r land 0xff));
|
||||
Bytes.set b 7 (Char.chr (r lsr 8 land 0xff));
|
||||
b
|
||||
|
||||
let rand_bytes_16 () : bytes =
|
||||
let@() = with_lock_ in
|
||||
let b = Bytes.create 16 in
|
||||
for i=0 to 4 do
|
||||
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
|
||||
Bytes.set b (i*3) (Char.chr (r land 0xff));
|
||||
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
|
||||
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
|
||||
done;
|
||||
let r = Random.State.bits rand_ in
|
||||
Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *)
|
||||
b
|
||||
end
|
||||
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
(*
|
||||
https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md
|
||||
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
|
||||
|
|
@ -10,48 +9,56 @@ include Common_
|
|||
|
||||
let needs_gc_metrics = Atomic.make false
|
||||
|
||||
let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *)
|
||||
let gc_metrics = AList.make ()
|
||||
(* side channel for GC, appended to {!E_metrics}'s data *)
|
||||
|
||||
(* capture current GC metrics and push them into {!gc_metrics} for later
|
||||
collection *)
|
||||
let sample_gc_metrics () =
|
||||
Atomic.set needs_gc_metrics false;
|
||||
let l = OT.Metrics.make_resource_metrics
|
||||
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
|
||||
@@ Opentelemetry.GC_metrics.get_metrics() in
|
||||
let l =
|
||||
OT.Metrics.make_resource_metrics
|
||||
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
|
||||
@@ Opentelemetry.GC_metrics.get_metrics ()
|
||||
in
|
||||
AList.add gc_metrics l
|
||||
|
||||
module Config = Config
|
||||
|
||||
let _init_curl = lazy (
|
||||
Curl.global_init Curl.CURLINIT_GLOBALALL;
|
||||
at_exit Curl.global_cleanup;
|
||||
)
|
||||
let _init_curl =
|
||||
lazy
|
||||
(Curl.global_init Curl.CURLINIT_GLOBALALL;
|
||||
at_exit Curl.global_cleanup)
|
||||
|
||||
type error = [
|
||||
| `Status of int * Opentelemetry.Proto.Status.status
|
||||
type error =
|
||||
[ `Status of int * Opentelemetry.Proto.Status.status
|
||||
| `Failure of string
|
||||
]
|
||||
]
|
||||
|
||||
let n_errors = Atomic.make 0
|
||||
|
||||
let n_dropped = Atomic.make 0
|
||||
|
||||
let report_err_ = function
|
||||
| `Failure msg ->
|
||||
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
|
||||
| `Status (code, status) ->
|
||||
Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@."
|
||||
code Proto.Status.pp_status status
|
||||
Format.eprintf
|
||||
"@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code
|
||||
Proto.Status.pp_status status
|
||||
|
||||
module type CURL = sig
|
||||
val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result
|
||||
val send :
|
||||
path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result
|
||||
|
||||
val cleanup : unit -> unit
|
||||
end
|
||||
|
||||
(* create a curl client *)
|
||||
module Curl() : CURL = struct
|
||||
module Curl () : CURL = struct
|
||||
open Opentelemetry.Proto
|
||||
let() = Lazy.force _init_curl
|
||||
|
||||
let () = Lazy.force _init_curl
|
||||
|
||||
let buf_res = Buffer.create 256
|
||||
|
||||
|
|
@ -65,37 +72,38 @@ module Curl() : CURL = struct
|
|||
(* TODO: use Curl multi *)
|
||||
|
||||
(* send the content to the remote endpoint/path *)
|
||||
let send ~path ~decode (bod:string) : ('a, error) result =
|
||||
let send ~path ~decode (bod : string) : ('a, error) result =
|
||||
Curl.reset curl;
|
||||
if !debug_ then Curl.set_verbose curl true;
|
||||
Curl.set_url curl (!url ^ path);
|
||||
Curl.set_httppost curl [];
|
||||
let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in
|
||||
let http_headers = List.map to_http_header !headers in
|
||||
Curl.set_httpheader curl ("Content-Type: application/x-protobuf" :: http_headers);
|
||||
Curl.set_httpheader curl
|
||||
("Content-Type: application/x-protobuf" :: http_headers);
|
||||
(* write body *)
|
||||
Curl.set_post curl true;
|
||||
Curl.set_postfieldsize curl (String.length bod);
|
||||
Curl.set_readfunction curl
|
||||
begin
|
||||
let i = ref 0 in
|
||||
(fun n ->
|
||||
if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n;
|
||||
let len = min n (String.length bod - !i) in
|
||||
let s = String.sub bod !i len in
|
||||
if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len;
|
||||
i := !i + len;
|
||||
s)
|
||||
end;
|
||||
(let i = ref 0 in
|
||||
fun n ->
|
||||
if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n;
|
||||
let len = min n (String.length bod - !i) in
|
||||
let s = String.sub bod !i len in
|
||||
if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len;
|
||||
i := !i + len;
|
||||
s);
|
||||
(* read result's body *)
|
||||
Buffer.clear buf_res;
|
||||
Curl.set_writefunction curl
|
||||
(fun s -> Buffer.add_string buf_res s; String.length s);
|
||||
Curl.set_writefunction curl (fun s ->
|
||||
Buffer.add_string buf_res s;
|
||||
String.length s);
|
||||
try
|
||||
match Curl.perform curl with
|
||||
| () ->
|
||||
let code = Curl.get_responsecode curl in
|
||||
if !debug_ then Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res);
|
||||
if !debug_ then
|
||||
Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res);
|
||||
let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in
|
||||
if code >= 200 && code < 300 then (
|
||||
let res = decode dec in
|
||||
|
|
@ -105,17 +113,24 @@ module Curl() : CURL = struct
|
|||
Error (`Status (code, status))
|
||||
)
|
||||
| exception Curl.CurlException (_, code, msg) ->
|
||||
let status = Status.default_status
|
||||
~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in
|
||||
Error(`Status (code, status))
|
||||
let status =
|
||||
Status.default_status ~code:(Int32.of_int code)
|
||||
~message:(Bytes.unsafe_of_string msg)
|
||||
()
|
||||
in
|
||||
Error (`Status (code, status))
|
||||
with e -> Error (`Failure (Printexc.to_string e))
|
||||
end
|
||||
|
||||
module type PUSH = sig
|
||||
type elt
|
||||
|
||||
val push : elt -> unit
|
||||
|
||||
val is_empty : unit -> bool
|
||||
|
||||
val is_big_enough : unit -> bool
|
||||
|
||||
val pop_iter_all : (elt -> unit) -> unit
|
||||
end
|
||||
|
||||
|
|
@ -125,161 +140,179 @@ module type EMITTER = sig
|
|||
open Opentelemetry.Proto
|
||||
|
||||
val push_trace : Trace.resource_spans list -> unit
|
||||
|
||||
val push_metrics : Metrics.resource_metrics list -> unit
|
||||
|
||||
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
|
||||
|
||||
val tick : unit -> unit
|
||||
|
||||
val cleanup : unit -> unit
|
||||
end
|
||||
|
||||
type 'a push = (module PUSH with type elt = 'a)
|
||||
type on_full_cb = (unit -> unit)
|
||||
|
||||
type on_full_cb = unit -> unit
|
||||
|
||||
(* make a "push" object, along with a setter for a callback to call when
|
||||
it's ready to emit a batch *)
|
||||
let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) =
|
||||
let on_full: on_full_cb ref = ref ignore in
|
||||
let mk_push (type a) ?batch () :
|
||||
(module PUSH with type elt = a) * (on_full_cb -> unit) =
|
||||
let on_full : on_full_cb ref = ref ignore in
|
||||
let push =
|
||||
match batch with
|
||||
| None ->
|
||||
let r = ref None in
|
||||
let module M = struct
|
||||
type elt = a
|
||||
|
||||
let is_empty () = !r == None
|
||||
|
||||
let is_big_enough () = !r != None
|
||||
|
||||
let push x =
|
||||
r := Some x; !on_full()
|
||||
let pop_iter_all f = Option.iter f !r; r := None
|
||||
r := Some x;
|
||||
!on_full ()
|
||||
|
||||
let pop_iter_all f =
|
||||
Option.iter f !r;
|
||||
r := None
|
||||
end in
|
||||
(module M : PUSH with type elt = a)
|
||||
|
||||
| Some n ->
|
||||
let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in
|
||||
let module M = struct
|
||||
type elt = a
|
||||
|
||||
let is_empty () = FQueue.size q = 0
|
||||
|
||||
let is_big_enough () = FQueue.size q >= n
|
||||
|
||||
let push x =
|
||||
if not (FQueue.push q x) || FQueue.size q > n then (
|
||||
!on_full();
|
||||
if not (FQueue.push q x) then (
|
||||
Atomic.incr n_dropped; (* drop item *)
|
||||
)
|
||||
if (not (FQueue.push q x)) || FQueue.size q > n then (
|
||||
!on_full ();
|
||||
if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *)
|
||||
)
|
||||
|
||||
let pop_iter_all f = FQueue.pop_iter_all q f
|
||||
end in
|
||||
(module M : PUSH with type elt = a)
|
||||
|
||||
in
|
||||
push, ((:=) on_full)
|
||||
|
||||
push, ( := ) on_full
|
||||
|
||||
(* start a thread in the background, running [f()] *)
|
||||
let start_bg_thread (f: unit -> unit) : unit =
|
||||
let run() =
|
||||
let start_bg_thread (f : unit -> unit) : unit =
|
||||
let run () =
|
||||
(* block some signals: USR1 USR2 TERM PIPE ALARM STOP, see [$ kill -L] *)
|
||||
ignore (Thread.sigmask Unix.SIG_BLOCK [10; 12; 13; 14; 15; 19] : _ list);
|
||||
f()
|
||||
ignore (Thread.sigmask Unix.SIG_BLOCK [ 10; 12; 13; 14; 15; 19 ] : _ list);
|
||||
f ()
|
||||
in
|
||||
ignore (Thread.create run () : Thread.t)
|
||||
|
||||
let l_is_empty = function [] -> true | _::_ -> false
|
||||
let l_is_empty = function
|
||||
| [] -> true
|
||||
| _ :: _ -> false
|
||||
|
||||
let batch_is_empty = List.for_all l_is_empty
|
||||
|
||||
(* make an emitter.
|
||||
|
||||
exceptions inside should be caught, see
|
||||
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
|
||||
let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||
let mk_emitter ~(config : Config.t) () : (module EMITTER) =
|
||||
let open Proto in
|
||||
|
||||
let continue = ref true in
|
||||
|
||||
let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
|
||||
mk_push ?batch:config.batch_traces () in
|
||||
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full =
|
||||
mk_push ?batch:config.batch_metrics () in
|
||||
|
||||
let encoder = Pbrt.Encoder.create() in
|
||||
|
||||
let ((module C) as curl) = (module Curl() : CURL) in
|
||||
|
||||
let on_tick_cbs_ = ref (ref []) in
|
||||
let set_on_tick_callbacks = (:=) on_tick_cbs_ in
|
||||
|
||||
let send_metrics_http (l:Metrics.resource_metrics list list) =
|
||||
Pbrt.Encoder.reset encoder;
|
||||
let resource_metrics =
|
||||
List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
||||
Metrics_service.encode_export_metrics_service_request
|
||||
(Metrics_service.default_export_metrics_service_request
|
||||
~resource_metrics ())
|
||||
encoder;
|
||||
let data = Pbrt.Encoder.to_string encoder in
|
||||
begin match
|
||||
C.send ~path:"/v1/metrics" ~decode:(fun _ -> ())
|
||||
data
|
||||
with
|
||||
| Ok () -> ()
|
||||
| Error err ->
|
||||
(* TODO: log error _via_ otel? *)
|
||||
Atomic.incr n_errors;
|
||||
report_err_ err
|
||||
end;
|
||||
mk_push ?batch:config.batch_traces ()
|
||||
in
|
||||
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full
|
||||
=
|
||||
mk_push ?batch:config.batch_metrics ()
|
||||
in
|
||||
|
||||
let send_traces_http (l:Trace.resource_spans list list) =
|
||||
let encoder = Pbrt.Encoder.create () in
|
||||
|
||||
let ((module C) as curl) = (module Curl () : CURL) in
|
||||
|
||||
let on_tick_cbs_ = ref (ref []) in
|
||||
let set_on_tick_callbacks = ( := ) on_tick_cbs_ in
|
||||
|
||||
let send_metrics_http (l : Metrics.resource_metrics list list) =
|
||||
Pbrt.Encoder.reset encoder;
|
||||
let resource_metrics =
|
||||
List.fold_left (fun acc l -> List.rev_append l acc) [] l
|
||||
in
|
||||
Metrics_service.encode_export_metrics_service_request
|
||||
(Metrics_service.default_export_metrics_service_request ~resource_metrics
|
||||
())
|
||||
encoder;
|
||||
let data = Pbrt.Encoder.to_string encoder in
|
||||
match C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) data with
|
||||
| Ok () -> ()
|
||||
| Error err ->
|
||||
(* TODO: log error _via_ otel? *)
|
||||
Atomic.incr n_errors;
|
||||
report_err_ err
|
||||
in
|
||||
|
||||
let send_traces_http (l : Trace.resource_spans list list) =
|
||||
Pbrt.Encoder.reset encoder;
|
||||
let resource_spans =
|
||||
List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
||||
List.fold_left (fun acc l -> List.rev_append l acc) [] l
|
||||
in
|
||||
Trace_service.encode_export_trace_service_request
|
||||
(Trace_service.default_export_trace_service_request ~resource_spans ())
|
||||
encoder;
|
||||
begin match
|
||||
C.send ~path:"/v1/traces" ~decode:(fun _ -> ())
|
||||
(Pbrt.Encoder.to_string encoder)
|
||||
with
|
||||
| Ok () -> ()
|
||||
| Error err ->
|
||||
(* TODO: log error _via_ otel? *)
|
||||
Atomic.incr n_errors;
|
||||
report_err_ err
|
||||
end;
|
||||
match
|
||||
C.send ~path:"/v1/traces"
|
||||
~decode:(fun _ -> ())
|
||||
(Pbrt.Encoder.to_string encoder)
|
||||
with
|
||||
| Ok () -> ()
|
||||
| Error err ->
|
||||
(* TODO: log error _via_ otel? *)
|
||||
Atomic.incr n_errors;
|
||||
report_err_ err
|
||||
in
|
||||
|
||||
let last_wakeup = Atomic.make (Mtime_clock.now()) in
|
||||
let last_wakeup = Atomic.make (Mtime_clock.now ()) in
|
||||
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in
|
||||
let batch_timeout() : bool =
|
||||
let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in
|
||||
let batch_timeout () : bool =
|
||||
let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in
|
||||
Mtime.Span.compare elapsed timeout >= 0
|
||||
in
|
||||
|
||||
let emit_metrics ?(force=false) () : bool =
|
||||
if force || (not force && E_metrics.is_big_enough ()) then (
|
||||
let batch = ref [AList.pop_all gc_metrics] in
|
||||
let emit_metrics ?(force = false) () : bool =
|
||||
if force || ((not force) && E_metrics.is_big_enough ()) then (
|
||||
let batch = ref [ AList.pop_all gc_metrics ] in
|
||||
E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
|
||||
let do_something = not (l_is_empty !batch) in
|
||||
if do_something then (
|
||||
send_metrics_http !batch;
|
||||
Atomic.set last_wakeup (Mtime_clock.now());
|
||||
Atomic.set last_wakeup (Mtime_clock.now ())
|
||||
);
|
||||
do_something
|
||||
) else false
|
||||
) else
|
||||
false
|
||||
in
|
||||
let emit_traces ?(force=false) () : bool =
|
||||
if force || (not force && E_trace.is_big_enough ()) then (
|
||||
let emit_traces ?(force = false) () : bool =
|
||||
if force || ((not force) && E_trace.is_big_enough ()) then (
|
||||
let batch = ref [] in
|
||||
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
|
||||
let do_something = not (l_is_empty !batch) in
|
||||
if do_something then (
|
||||
send_traces_http !batch;
|
||||
Atomic.set last_wakeup (Mtime_clock.now());
|
||||
Atomic.set last_wakeup (Mtime_clock.now ())
|
||||
);
|
||||
do_something
|
||||
) else false
|
||||
) else
|
||||
false
|
||||
in
|
||||
|
||||
let[@inline] guard f =
|
||||
try f()
|
||||
try f ()
|
||||
with e ->
|
||||
Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!"
|
||||
(Printexc.to_string e)
|
||||
|
|
@ -288,63 +321,60 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
|||
let emit_all_force () =
|
||||
let@ () = guard in
|
||||
ignore (emit_traces ~force:true () : bool);
|
||||
ignore (emit_metrics ~force:true () : bool);
|
||||
ignore (emit_metrics ~force:true () : bool)
|
||||
in
|
||||
|
||||
|
||||
if config.thread then (
|
||||
begin
|
||||
let m = Mutex.create() in
|
||||
set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m);
|
||||
end;
|
||||
(let m = Mutex.create () in
|
||||
Lock.set_mutex
|
||||
~lock:(fun () -> Mutex.lock m)
|
||||
~unlock:(fun () -> Mutex.unlock m));
|
||||
|
||||
let ((module C) as curl) = (module Curl() : CURL) in
|
||||
let ((module C) as curl) = (module Curl () : CURL) in
|
||||
|
||||
let m = Mutex.create() in
|
||||
let cond = Condition.create() in
|
||||
let m = Mutex.create () in
|
||||
let cond = Condition.create () in
|
||||
|
||||
(* loop for the thread that processes events and sends them to collector *)
|
||||
let bg_thread () =
|
||||
while !continue do
|
||||
let@ () = guard in
|
||||
let timeout = batch_timeout() in
|
||||
let timeout = batch_timeout () in
|
||||
|
||||
let do_metrics = emit_metrics ~force:timeout () in
|
||||
let do_traces = emit_traces ~force:timeout () in
|
||||
if not do_metrics && not do_traces then (
|
||||
if (not do_metrics) && not do_traces then
|
||||
(* wait *)
|
||||
let@ () = with_mutex_ m in
|
||||
Condition.wait cond m;
|
||||
)
|
||||
Condition.wait cond m
|
||||
done;
|
||||
(* flush remaining events *)
|
||||
begin
|
||||
let@ () = guard in
|
||||
ignore (emit_traces ~force:true () : bool);
|
||||
ignore (emit_metrics ~force:true () : bool);
|
||||
C.cleanup();
|
||||
end
|
||||
let@ () = guard in
|
||||
ignore (emit_traces ~force:true () : bool);
|
||||
ignore (emit_metrics ~force:true () : bool);
|
||||
C.cleanup ()
|
||||
in
|
||||
start_bg_thread bg_thread;
|
||||
|
||||
let wakeup () =
|
||||
with_mutex_ m (fun () -> Condition.signal cond);
|
||||
Thread.yield()
|
||||
Thread.yield ()
|
||||
in
|
||||
|
||||
(* wake up if a batch is full *)
|
||||
on_metrics_full wakeup;
|
||||
on_trace_full wakeup;
|
||||
|
||||
let tick() =
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
||||
let tick () =
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
|
||||
List.iter
|
||||
(fun f ->
|
||||
try f()
|
||||
try f ()
|
||||
with e ->
|
||||
Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e))
|
||||
Printf.eprintf "on tick callback raised: %s\n"
|
||||
(Printexc.to_string e))
|
||||
!(!on_tick_cbs_);
|
||||
if batch_timeout() then wakeup()
|
||||
if batch_timeout () then wakeup ()
|
||||
in
|
||||
|
||||
if config.ticker_thread then (
|
||||
|
|
@ -352,89 +382,97 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
|||
let tick_thread () =
|
||||
while true do
|
||||
Thread.delay 0.5;
|
||||
tick();
|
||||
tick ()
|
||||
done
|
||||
in
|
||||
|
||||
start_bg_thread tick_thread;
|
||||
start_bg_thread tick_thread
|
||||
);
|
||||
|
||||
let module M = struct
|
||||
let push_trace e =
|
||||
E_trace.push e;
|
||||
if batch_timeout() then wakeup()
|
||||
if batch_timeout () then wakeup ()
|
||||
|
||||
let push_metrics e =
|
||||
E_metrics.push e;
|
||||
if batch_timeout() then wakeup()
|
||||
if batch_timeout () then wakeup ()
|
||||
|
||||
let set_on_tick_callbacks = set_on_tick_callbacks
|
||||
let tick=tick
|
||||
|
||||
let tick = tick
|
||||
|
||||
let cleanup () =
|
||||
continue := false;
|
||||
with_mutex_ m (fun () -> Condition.broadcast cond)
|
||||
end in
|
||||
(module M)
|
||||
) else (
|
||||
|
||||
on_metrics_full (fun () ->
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
|
||||
ignore (emit_metrics () : bool));
|
||||
on_trace_full (fun () ->
|
||||
ignore (emit_traces () : bool));
|
||||
on_trace_full (fun () -> ignore (emit_traces () : bool));
|
||||
|
||||
let cleanup () =
|
||||
emit_all_force();
|
||||
C.cleanup();
|
||||
emit_all_force ();
|
||||
C.cleanup ()
|
||||
in
|
||||
|
||||
let module M = struct
|
||||
let push_trace e =
|
||||
let@() = guard in
|
||||
let@ () = guard in
|
||||
E_trace.push e;
|
||||
if batch_timeout() then emit_all_force()
|
||||
if batch_timeout () then emit_all_force ()
|
||||
|
||||
let push_metrics e =
|
||||
let@() = guard in
|
||||
let@ () = guard in
|
||||
E_metrics.push e;
|
||||
if batch_timeout() then emit_all_force()
|
||||
if batch_timeout () then emit_all_force ()
|
||||
|
||||
let set_on_tick_callbacks = set_on_tick_callbacks
|
||||
|
||||
let tick () =
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
||||
if batch_timeout() then emit_all_force()
|
||||
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
|
||||
if batch_timeout () then emit_all_force ()
|
||||
|
||||
let cleanup = cleanup
|
||||
end in
|
||||
(module M)
|
||||
)
|
||||
|
||||
module Backend(Arg : sig val config : Config.t end)()
|
||||
: Opentelemetry.Collector.BACKEND
|
||||
= struct
|
||||
include Gen_ids.Make()
|
||||
|
||||
module Backend (Arg : sig
|
||||
val config : Config.t
|
||||
end)
|
||||
() : Opentelemetry.Collector.BACKEND = struct
|
||||
include (val mk_emitter ~config:Arg.config ())
|
||||
|
||||
open Opentelemetry.Proto
|
||||
open Opentelemetry.Collector
|
||||
|
||||
let send_trace : Trace.resource_spans list sender = {
|
||||
send=fun l ~ret ->
|
||||
let@() = with_lock_ in
|
||||
if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l;
|
||||
push_trace l;
|
||||
ret()
|
||||
}
|
||||
let send_trace : Trace.resource_spans list sender =
|
||||
{
|
||||
send =
|
||||
(fun l ~ret ->
|
||||
let@ () = Lock.with_lock in
|
||||
if !debug_ then
|
||||
Format.eprintf "send spans %a@."
|
||||
(Format.pp_print_list Trace.pp_resource_spans)
|
||||
l;
|
||||
push_trace l;
|
||||
ret ());
|
||||
}
|
||||
|
||||
let last_sent_metrics = Atomic.make (Mtime_clock.now())
|
||||
let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *)
|
||||
let last_sent_metrics = Atomic.make (Mtime_clock.now ())
|
||||
|
||||
let timeout_sent_metrics = Mtime.Span.(5 * s)
|
||||
(* send metrics from time to time *)
|
||||
|
||||
let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true
|
||||
|
||||
let additional_metrics () : Metrics.resource_metrics list =
|
||||
(* add exporter metrics to the lot? *)
|
||||
let last_emit = Atomic.get last_sent_metrics in
|
||||
let now = Mtime_clock.now() in
|
||||
let now = Mtime_clock.now () in
|
||||
let add_own_metrics =
|
||||
let elapsed = Mtime.span last_emit now in
|
||||
Mtime.Span.compare elapsed timeout_sent_metrics > 0
|
||||
|
|
@ -443,43 +481,63 @@ module Backend(Arg : sig val config : Config.t end)()
|
|||
if add_own_metrics then (
|
||||
let open OT.Metrics in
|
||||
Atomic.set last_sent_metrics now;
|
||||
[make_resource_metrics [
|
||||
sum ~name:"otel-export.dropped" ~is_monotonic:true [
|
||||
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
|
||||
];
|
||||
sum ~name:"otel-export.errors" ~is_monotonic:true [
|
||||
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
|
||||
];
|
||||
]]
|
||||
) else []
|
||||
[
|
||||
make_resource_metrics
|
||||
[
|
||||
sum ~name:"otel-export.dropped" ~is_monotonic:true
|
||||
[
|
||||
int
|
||||
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
|
||||
];
|
||||
sum ~name:"otel-export.errors" ~is_monotonic:true
|
||||
[
|
||||
int
|
||||
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
|
||||
];
|
||||
];
|
||||
]
|
||||
) else
|
||||
[]
|
||||
|
||||
let send_metrics : Metrics.resource_metrics list sender = {
|
||||
send=fun m ~ret ->
|
||||
let@() = with_lock_ in
|
||||
if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m;
|
||||
let send_metrics : Metrics.resource_metrics list sender =
|
||||
{
|
||||
send =
|
||||
(fun m ~ret ->
|
||||
let@ () = Lock.with_lock in
|
||||
if !debug_ then
|
||||
Format.eprintf "send metrics %a@."
|
||||
(Format.pp_print_list Metrics.pp_resource_metrics)
|
||||
m;
|
||||
|
||||
let m = List.rev_append (additional_metrics()) m in
|
||||
push_metrics m;
|
||||
ret()
|
||||
}
|
||||
let m = List.rev_append (additional_metrics ()) m in
|
||||
push_metrics m;
|
||||
ret ());
|
||||
}
|
||||
end
|
||||
|
||||
let setup_ ~(config:Config.t) () =
|
||||
let setup_ ~(config : Config.t) () =
|
||||
debug_ := config.debug;
|
||||
let module B = Backend(struct let config=config end)() in
|
||||
let module B =
|
||||
Backend
|
||||
(struct
|
||||
let config = config
|
||||
end)
|
||||
()
|
||||
in
|
||||
Opentelemetry.Collector.set_backend (module B);
|
||||
B.cleanup
|
||||
|
||||
let setup ?(config=Config.make()) ?(enable=true) () =
|
||||
let setup ?(config = Config.make ()) ?(enable = true) () =
|
||||
if enable then (
|
||||
let cleanup = setup_ ~config () in
|
||||
at_exit cleanup
|
||||
)
|
||||
|
||||
let with_setup ?(config=Config.make()) ?(enable=true) () f =
|
||||
let with_setup ?(config = Config.make ()) ?(enable = true) () f =
|
||||
if enable then (
|
||||
let cleanup = setup_ ~config () in
|
||||
Fun.protect ~finally:cleanup f
|
||||
) else f()
|
||||
) else
|
||||
f ()
|
||||
|
|
|
|||
2
src/dune
2
src/dune
|
|
@ -2,7 +2,7 @@
|
|||
(name opentelemetry)
|
||||
(synopsis "API for opentelemetry instrumentation")
|
||||
(flags :standard -warn-error -a+8)
|
||||
(libraries ptime ptime.clock.os pbrt)
|
||||
(libraries ptime ptime.clock.os pbrt threads)
|
||||
(public_name opentelemetry))
|
||||
|
||||
; ### protobuf rules ###
|
||||
|
|
|
|||
11
src/lock.ml
Normal file
11
src/lock.ml
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
let lock_ : (unit -> unit) ref = ref ignore
|
||||
|
||||
let unlock_ : (unit -> unit) ref = ref ignore
|
||||
|
||||
let set_mutex ~lock ~unlock : unit =
|
||||
lock_ := lock;
|
||||
unlock_ := unlock
|
||||
|
||||
let[@inline] with_lock f =
|
||||
!lock_ ();
|
||||
Fun.protect ~finally:!unlock_ f
|
||||
7
src/lock.mli
Normal file
7
src/lock.mli
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
|
||||
(** Set a pair of lock/unlock functions that are used to
|
||||
protect access to global state, if needed. By default these do nothing. *)
|
||||
|
||||
val with_lock : (unit -> 'a) -> 'a
|
||||
(** Call [f()] while holding the mutex defined {!set_mutex}, then
|
||||
release the mutex. *)
|
||||
|
|
@ -1,6 +1,11 @@
|
|||
|
||||
(** Opentelemetry types and instrumentation *)
|
||||
|
||||
module Lock = Lock
|
||||
(** Global lock *)
|
||||
|
||||
module Rand_bytes = Rand_bytes
|
||||
(** Generation of random identifiers *)
|
||||
|
||||
(** {2 Wire format} *)
|
||||
|
||||
(** Protobuf types *)
|
||||
|
|
@ -56,11 +61,12 @@ end
|
|||
in nanoseconds. *)
|
||||
module Timestamp_ns = struct
|
||||
type t = int64
|
||||
|
||||
let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600)))
|
||||
|
||||
(** Current unix timestamp in nanoseconds *)
|
||||
let[@inline] now_unix_ns () : t =
|
||||
let span = Ptime_clock.now() |> Ptime.to_span in
|
||||
let span = Ptime_clock.now () |> Ptime.to_span in
|
||||
let d, ps = Ptime.Span.to_d_ps span in
|
||||
let d = Int64.(mul (of_int d) ns_in_a_day) in
|
||||
let ns = Int64.(div ps 1_000L) in
|
||||
|
|
@ -78,6 +84,7 @@ end
|
|||
module Collector = struct
|
||||
open Proto
|
||||
|
||||
type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a }
|
||||
(** Sender interface for a message of type [msg].
|
||||
Inspired from Logs' reporter
|
||||
(see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc})
|
||||
|
|
@ -90,9 +97,6 @@ module Collector = struct
|
|||
It doesn't mean the event has been collected yet, it
|
||||
could sit in a batch queue for a little while.
|
||||
*)
|
||||
type 'msg sender = {
|
||||
send: 'a. 'msg -> ret:(unit -> 'a) -> 'a;
|
||||
}
|
||||
|
||||
(** Collector client interface. *)
|
||||
module type BACKEND = sig
|
||||
|
|
@ -100,12 +104,6 @@ module Collector = struct
|
|||
|
||||
val send_metrics : Metrics.resource_metrics list sender
|
||||
|
||||
val rand_bytes_16 : unit -> bytes
|
||||
(** Generate 16 bytes of random data *)
|
||||
|
||||
val rand_bytes_8 : unit -> bytes
|
||||
(** Generate 16 bytes of random data *)
|
||||
|
||||
val signal_emit_gc_metrics : unit -> unit
|
||||
(** Signal the backend that it should emit GC metrics when it has the
|
||||
chance. This should be installed in a GC alarm or another form
|
||||
|
|
@ -134,7 +132,7 @@ module Collector = struct
|
|||
end
|
||||
|
||||
(** Set collector backend *)
|
||||
let set_backend (b:backend) : unit =
|
||||
let set_backend (b : backend) : unit =
|
||||
let (module B) = b in
|
||||
B.set_on_tick_callbacks on_tick_cbs_;
|
||||
backend := Some b
|
||||
|
|
@ -145,25 +143,19 @@ module Collector = struct
|
|||
(** Current backend, if any *)
|
||||
let[@inline] get_backend () : backend option = !backend
|
||||
|
||||
let send_trace (l:Trace.resource_spans list) ~ret =
|
||||
let send_trace (l : Trace.resource_spans list) ~ret =
|
||||
match !backend with
|
||||
| None -> ret()
|
||||
| None -> ret ()
|
||||
| Some (module B) -> B.send_trace.send l ~ret
|
||||
|
||||
let send_metrics (l:Metrics.resource_metrics list) ~ret =
|
||||
let send_metrics (l : Metrics.resource_metrics list) ~ret =
|
||||
match !backend with
|
||||
| None -> ret()
|
||||
| None -> ret ()
|
||||
| Some (module B) -> B.send_metrics.send l ~ret
|
||||
|
||||
let rand_bytes_16 () =
|
||||
match !backend with
|
||||
| None -> Bytes.make 16 '?'
|
||||
| Some (module B) -> B.rand_bytes_16()
|
||||
let[@inline] rand_bytes_16 () = !Rand_bytes.rand_bytes_16 ()
|
||||
|
||||
let rand_bytes_8 () =
|
||||
match !backend with
|
||||
| None -> Bytes.make 8 '?'
|
||||
| Some (module B) -> B.rand_bytes_8()
|
||||
let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 ()
|
||||
|
||||
let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_
|
||||
|
||||
|
|
@ -172,35 +164,38 @@ module Collector = struct
|
|||
let tick () =
|
||||
match !backend with
|
||||
| None -> ()
|
||||
| Some (module B) -> B.tick()
|
||||
| Some (module B) -> B.tick ()
|
||||
end
|
||||
|
||||
module Util_ = struct
|
||||
let bytes_to_hex (b:bytes) : string =
|
||||
let i_to_hex (i:int) =
|
||||
if i < 10 then Char.chr (i + Char.code '0')
|
||||
else Char.chr (i - 10 + Char.code 'a')
|
||||
let bytes_to_hex (b : bytes) : string =
|
||||
let i_to_hex (i : int) =
|
||||
if i < 10 then
|
||||
Char.chr (i + Char.code '0')
|
||||
else
|
||||
Char.chr (i - 10 + Char.code 'a')
|
||||
in
|
||||
|
||||
let res = Bytes.create (2 * Bytes.length b) in
|
||||
for i = 0 to Bytes.length b-1 do
|
||||
for i = 0 to Bytes.length b - 1 do
|
||||
let n = Char.code (Bytes.get b i) in
|
||||
Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4));
|
||||
Bytes.set res (2 * i + 1) (i_to_hex (n land 0x0f));
|
||||
Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f))
|
||||
done;
|
||||
Bytes.unsafe_to_string res
|
||||
|
||||
let bytes_of_hex (s:string) : bytes =
|
||||
let bytes_of_hex (s : string) : bytes =
|
||||
let n_of_c = function
|
||||
| '0' .. '9' as c -> Char.code c - Char.code '0'
|
||||
| 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a'
|
||||
| _ -> raise (Invalid_argument "invalid hex char")
|
||||
in
|
||||
if (String.length s mod 2 <> 0) then raise (Invalid_argument "hex sequence must be of even length");
|
||||
if String.length s mod 2 <> 0 then
|
||||
raise (Invalid_argument "hex sequence must be of even length");
|
||||
let res = Bytes.make (String.length s / 2) '\x00' in
|
||||
for i=0 to String.length s/2-1 do
|
||||
let n1 = n_of_c (String.get s (2*i)) in
|
||||
let n2 = n_of_c (String.get s (2*i+1)) in
|
||||
for i = 0 to (String.length s / 2) - 1 do
|
||||
let n1 = n_of_c (String.get s (2 * i)) in
|
||||
let n2 = n_of_c (String.get s ((2 * i) + 1)) in
|
||||
let n = (n1 lsl 4) lor n2 in
|
||||
Bytes.set res i (Char.chr n)
|
||||
done;
|
||||
|
|
@ -214,40 +209,74 @@ end
|
|||
This 16 bytes identifier is shared by all spans in one trace. *)
|
||||
module Trace_id : sig
|
||||
type t
|
||||
|
||||
val create : unit -> t
|
||||
|
||||
val pp : Format.formatter -> t -> unit
|
||||
|
||||
val to_bytes : t -> bytes
|
||||
|
||||
val of_bytes : bytes -> t
|
||||
|
||||
val to_hex : t -> string
|
||||
|
||||
val of_hex : string -> t
|
||||
end = struct
|
||||
open Proto.Trace
|
||||
|
||||
type t = bytes
|
||||
|
||||
let to_bytes self = self
|
||||
let create () : t = Collector.rand_bytes_16()
|
||||
let of_bytes b = if Bytes.length b=16 then b else raise (Invalid_argument "trace IDs must be 16 bytes in length")
|
||||
|
||||
let create () : t = Collector.rand_bytes_16 ()
|
||||
|
||||
let of_bytes b =
|
||||
if Bytes.length b = 16 then
|
||||
b
|
||||
else
|
||||
raise (Invalid_argument "trace IDs must be 16 bytes in length")
|
||||
|
||||
let to_hex self = Util_.bytes_to_hex self
|
||||
|
||||
let of_hex s = of_bytes (Util_.bytes_of_hex s)
|
||||
|
||||
let pp fmt t = Format.fprintf fmt "%s" (to_hex t)
|
||||
end
|
||||
|
||||
(** Unique ID of a span. *)
|
||||
module Span_id : sig
|
||||
type t
|
||||
|
||||
val create : unit -> t
|
||||
|
||||
val pp : Format.formatter -> t -> unit
|
||||
|
||||
val to_bytes : t -> bytes
|
||||
|
||||
val of_bytes : bytes -> t
|
||||
|
||||
val to_hex : t -> string
|
||||
|
||||
val of_hex : string -> t
|
||||
end = struct
|
||||
open Proto.Trace
|
||||
|
||||
type t = bytes
|
||||
|
||||
let to_bytes self = self
|
||||
let create () : t = Collector.rand_bytes_8()
|
||||
let of_bytes b = if Bytes.length b=8 then b else raise (Invalid_argument "span IDs must be 8 bytes in length")
|
||||
|
||||
let create () : t = Collector.rand_bytes_8 ()
|
||||
|
||||
let of_bytes b =
|
||||
if Bytes.length b = 8 then
|
||||
b
|
||||
else
|
||||
raise (Invalid_argument "span IDs must be 8 bytes in length")
|
||||
|
||||
let to_hex self = Util_.bytes_to_hex self
|
||||
|
||||
let of_hex s = of_bytes (Util_.bytes_of_hex s)
|
||||
|
||||
let pp fmt t = Format.fprintf fmt "%s" (to_hex t)
|
||||
end
|
||||
|
||||
|
|
@ -258,12 +287,16 @@ module Conventions = struct
|
|||
module Process = struct
|
||||
module Runtime = struct
|
||||
let name = "process.runtime.name"
|
||||
|
||||
let version = "process.runtime.version"
|
||||
|
||||
let description = "process.runtime.description"
|
||||
end
|
||||
end
|
||||
|
||||
module Service = struct
|
||||
let name = "service.name"
|
||||
|
||||
let namespace = "service.namespace"
|
||||
end
|
||||
end
|
||||
|
|
@ -274,9 +307,13 @@ module Conventions = struct
|
|||
module Ocaml = struct
|
||||
module GC = struct
|
||||
let compactions = "process.runtime.ocaml.gc.compactions"
|
||||
|
||||
let major_collections = "process.runtime.ocaml.gc.major_collections"
|
||||
|
||||
let major_heap = "process.runtime.ocaml.gc.major_heap"
|
||||
|
||||
let minor_allocated = "process.runtime.ocaml.gc.minor_allocated"
|
||||
|
||||
let minor_collections = "process.runtime.ocaml.gc.minor_collections"
|
||||
end
|
||||
end
|
||||
|
|
@ -285,11 +322,17 @@ module Conventions = struct
|
|||
end
|
||||
end
|
||||
|
||||
type value = [`Int of int | `String of string | `Bool of bool | `None]
|
||||
type value =
|
||||
[ `Int of int
|
||||
| `String of string
|
||||
| `Bool of bool
|
||||
| `None
|
||||
]
|
||||
|
||||
type key_value = string * value
|
||||
|
||||
(**/**)
|
||||
|
||||
let _conv_value =
|
||||
let open Proto.Common in
|
||||
function
|
||||
|
|
@ -301,7 +344,8 @@ let _conv_value =
|
|||
(**/**)
|
||||
|
||||
(**/**)
|
||||
let _conv_key_value (k,v) =
|
||||
|
||||
let _conv_key_value (k, v) =
|
||||
let open Proto.Common in
|
||||
let value = _conv_value v in
|
||||
default_key_value ~key:k ~value ()
|
||||
|
|
@ -314,29 +358,30 @@ let _conv_key_value (k,v) =
|
|||
module Globals = struct
|
||||
open Proto.Common
|
||||
|
||||
let service_name = ref "unknown_service"
|
||||
(** Main service name metadata *)
|
||||
let service_name = ref "unknown_service"
|
||||
|
||||
let service_namespace = ref None
|
||||
(** Namespace for the service *)
|
||||
let service_namespace = ref None
|
||||
|
||||
let instrumentation_library =
|
||||
default_instrumentation_library
|
||||
~version:"0.1"
|
||||
~name:"ocaml-opentelemetry" ()
|
||||
default_instrumentation_library ~version:"0.1" ~name:"ocaml-opentelemetry"
|
||||
()
|
||||
|
||||
(** Global attributes, initially set
|
||||
via OTEL_RESOURCE_ATTRIBUTES and modifiable
|
||||
by the user code. They will be attached to each outgoing metrics/traces. *)
|
||||
let global_attributes : key_value list ref =
|
||||
let parse_pair s = match String.split_on_char '=' s with
|
||||
| [a;b] -> default_key_value ~key:a ~value:(Some (String_value b)) ()
|
||||
let parse_pair s =
|
||||
match String.split_on_char '=' s with
|
||||
| [ a; b ] -> default_key_value ~key:a ~value:(Some (String_value b)) ()
|
||||
| _ -> failwith (Printf.sprintf "invalid attribute: %S" s)
|
||||
in
|
||||
ref @@
|
||||
ref
|
||||
@@
|
||||
try
|
||||
Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" |> String.split_on_char ','
|
||||
|> List.map parse_pair
|
||||
Sys.getenv "OTEL_RESOURCE_ATTRIBUTES"
|
||||
|> String.split_on_char ',' |> List.map parse_pair
|
||||
with _ -> []
|
||||
|
||||
(* add global attributes to this list *)
|
||||
|
|
@ -344,17 +389,20 @@ module Globals = struct
|
|||
let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in
|
||||
List.rev_append (List.filter not_redundant !global_attributes) into
|
||||
|
||||
let mk_attributes ?(service_name = !service_name) ?(attrs=[]) () : _ list =
|
||||
let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list =
|
||||
let l = List.map _conv_key_value attrs in
|
||||
let l =
|
||||
default_key_value ~key:Conventions.Attributes.Service.name
|
||||
~value:(Some (String_value service_name)) () :: l
|
||||
~value:(Some (String_value service_name)) ()
|
||||
:: l
|
||||
in
|
||||
let l = match !service_namespace with
|
||||
let l =
|
||||
match !service_namespace with
|
||||
| None -> l
|
||||
| Some v ->
|
||||
default_key_value ~key:Conventions.Attributes.Service.namespace
|
||||
~value:(Some (String_value v)) () :: l
|
||||
~value:(Some (String_value v)) ()
|
||||
:: l
|
||||
in
|
||||
l |> merge_global_attributes_
|
||||
end
|
||||
|
|
@ -367,22 +415,18 @@ end
|
|||
belong in a span. *)
|
||||
module Event : sig
|
||||
open Proto.Trace
|
||||
|
||||
type t = span_event
|
||||
|
||||
val make :
|
||||
?time_unix_nano:Timestamp_ns.t ->
|
||||
?attrs:key_value list ->
|
||||
string ->
|
||||
t
|
||||
|
||||
?time_unix_nano:Timestamp_ns.t -> ?attrs:key_value list -> string -> t
|
||||
end = struct
|
||||
open Proto.Trace
|
||||
|
||||
type t = span_event
|
||||
|
||||
let make
|
||||
?(time_unix_nano=Timestamp_ns.now_unix_ns())
|
||||
?(attrs=[])
|
||||
(name:string) : t =
|
||||
let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = [])
|
||||
(name : string) : t =
|
||||
let attrs = List.map _conv_key_value attrs in
|
||||
default_span_event ~time_unix_nano ~name ~attributes:attrs ()
|
||||
end
|
||||
|
|
@ -397,6 +441,7 @@ module Span : sig
|
|||
open Proto.Trace
|
||||
|
||||
type t = span
|
||||
|
||||
type id = Span_id.t
|
||||
|
||||
type nonrec kind = span_span_kind =
|
||||
|
|
@ -419,7 +464,8 @@ module Span : sig
|
|||
|
||||
val id : t -> Span_id.t
|
||||
|
||||
type key_value = string * [`Int of int | `String of string | `Bool of bool | `None]
|
||||
type key_value =
|
||||
string * [ `Int of int | `String of string | `Bool of bool | `None ]
|
||||
|
||||
val create :
|
||||
?kind:kind ->
|
||||
|
|
@ -433,8 +479,9 @@ module Span : sig
|
|||
?links:(Trace_id.t * Span_id.t * string) list ->
|
||||
start_time:Timestamp_ns.t ->
|
||||
end_time:Timestamp_ns.t ->
|
||||
string -> t * id
|
||||
(** [create ~trace_id name] creates a new span with its unique ID.
|
||||
string ->
|
||||
t * id
|
||||
(** [create ~trace_id name] creates a new span with its unique ID.
|
||||
@param trace_id the trace this belongs to
|
||||
@param parent parent span, if any
|
||||
@param links list of links to other spans, each with their trace state
|
||||
|
|
@ -443,6 +490,7 @@ end = struct
|
|||
open Proto.Trace
|
||||
|
||||
type t = span
|
||||
|
||||
type id = Span_id.t
|
||||
|
||||
type nonrec kind = span_span_kind =
|
||||
|
|
@ -453,7 +501,8 @@ end = struct
|
|||
| Span_kind_producer
|
||||
| Span_kind_consumer
|
||||
|
||||
type key_value = string * [`Int of int | `String of string | `Bool of bool | `None]
|
||||
type key_value =
|
||||
string * [ `Int of int | `String of string | `Bool of bool | `None ]
|
||||
|
||||
type nonrec status_code = status_status_code =
|
||||
| Status_code_unset
|
||||
|
|
@ -465,40 +514,26 @@ end = struct
|
|||
code: status_code;
|
||||
}
|
||||
|
||||
|
||||
let id self = Span_id.of_bytes self.span_id
|
||||
|
||||
let create
|
||||
?(kind=Span_kind_unspecified)
|
||||
?(id=Span_id.create())
|
||||
?trace_state
|
||||
?(attrs=[])
|
||||
?(events=[])
|
||||
?status
|
||||
~trace_id ?parent ?(links=[])
|
||||
~start_time ~end_time
|
||||
name : t * id =
|
||||
let create ?(kind = Span_kind_unspecified) ?(id = Span_id.create ())
|
||||
?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent
|
||||
?(links = []) ~start_time ~end_time name : t * id =
|
||||
let trace_id = Trace_id.to_bytes trace_id in
|
||||
let parent_span_id = Option.map Span_id.to_bytes parent in
|
||||
let attributes = List.map _conv_key_value attrs in
|
||||
let links =
|
||||
List.map
|
||||
(fun (trace_id,span_id,trace_state) ->
|
||||
let trace_id = Trace_id.to_bytes trace_id in
|
||||
let span_id = Span_id.to_bytes span_id in
|
||||
default_span_link ~trace_id ~span_id ~trace_state())
|
||||
(fun (trace_id, span_id, trace_state) ->
|
||||
let trace_id = Trace_id.to_bytes trace_id in
|
||||
let span_id = Span_id.to_bytes span_id in
|
||||
default_span_link ~trace_id ~span_id ~trace_state ())
|
||||
links
|
||||
in
|
||||
let span =
|
||||
default_span
|
||||
~trace_id ?parent_span_id
|
||||
~span_id:(Span_id.to_bytes id)
|
||||
~attributes ~events
|
||||
?trace_state ~status
|
||||
~kind ~name ~links
|
||||
~start_time_unix_nano:start_time
|
||||
~end_time_unix_nano:end_time
|
||||
()
|
||||
default_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id)
|
||||
~attributes ~events ?trace_state ~status ~kind ~name ~links
|
||||
~start_time_unix_nano:start_time ~end_time_unix_nano:end_time ()
|
||||
in
|
||||
span, id
|
||||
end
|
||||
|
|
@ -514,49 +549,47 @@ module Trace = struct
|
|||
let make_resource_spans ?service_name ?attrs spans =
|
||||
let ils =
|
||||
default_instrumentation_library_spans
|
||||
~instrumentation_library:(Some Globals.instrumentation_library)
|
||||
~spans () in
|
||||
~instrumentation_library:(Some Globals.instrumentation_library) ~spans
|
||||
()
|
||||
in
|
||||
let attributes = Globals.mk_attributes ?service_name ?attrs () in
|
||||
let resource = Proto.Resource.default_resource ~attributes () in
|
||||
default_resource_spans
|
||||
~resource:(Some resource) ~instrumentation_library_spans:[ils] ()
|
||||
default_resource_spans ~resource:(Some resource)
|
||||
~instrumentation_library_spans:[ ils ] ()
|
||||
|
||||
(** Sync emitter *)
|
||||
let emit ?service_name ?attrs (spans:span list) : unit =
|
||||
let emit ?service_name ?attrs (spans : span list) : unit =
|
||||
let rs = make_resource_spans ?service_name ?attrs spans in
|
||||
Collector.send_trace [rs] ~ret:(fun () -> ())
|
||||
Collector.send_trace [ rs ] ~ret:(fun () -> ())
|
||||
|
||||
(** Scope to be used with {!with_}. *)
|
||||
type scope = {
|
||||
trace_id: Trace_id.t;
|
||||
span_id: Span_id.t;
|
||||
mutable events: Event.t list;
|
||||
mutable attrs: Span.key_value list
|
||||
mutable attrs: Span.key_value list;
|
||||
}
|
||||
(** Scope to be used with {!with_}. *)
|
||||
|
||||
(** Add an event to the scope. It will be aggregated into the span.
|
||||
|
||||
Note that this takes a function that produces an event, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_event (scope:scope) (ev:unit -> Event.t) : unit =
|
||||
if Collector.has_backend() then (
|
||||
scope.events <- ev() :: scope.events
|
||||
)
|
||||
let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit =
|
||||
if Collector.has_backend () then scope.events <- ev () :: scope.events
|
||||
|
||||
(** Add an attr to the scope. It will be aggregated into the span.
|
||||
|
||||
Note that this takes a function that produces attributes, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_attrs (scope:scope) (attrs:unit -> Span.key_value list) : unit =
|
||||
if Collector.has_backend() then (
|
||||
let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) :
|
||||
unit =
|
||||
if Collector.has_backend () then
|
||||
scope.attrs <- List.rev_append (attrs ()) scope.attrs
|
||||
)
|
||||
|
||||
(** Sync span guard *)
|
||||
let with_
|
||||
?trace_state ?service_name ?(attrs: (string*[<value]) list = [])
|
||||
?kind ?trace_id ?parent ?scope ?links
|
||||
name (f: scope -> 'a) : 'a =
|
||||
let with_ ?trace_state ?service_name
|
||||
?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
|
||||
?links name (f : scope -> 'a) : 'a =
|
||||
let trace_id =
|
||||
match trace_id, scope with
|
||||
| Some trace_id, _ -> trace_id
|
||||
|
|
@ -569,25 +602,26 @@ module Trace = struct
|
|||
| None, Some scope -> Some scope.span_id
|
||||
| None, None -> None
|
||||
in
|
||||
let start_time = Timestamp_ns.now_unix_ns() in
|
||||
let span_id = Span_id.create() in
|
||||
let scope = {trace_id;span_id;events=[]; attrs} in
|
||||
let start_time = Timestamp_ns.now_unix_ns () in
|
||||
let span_id = Span_id.create () in
|
||||
let scope = { trace_id; span_id; events = []; attrs } in
|
||||
|
||||
(* called once we're done, to emit a span *)
|
||||
let finally res =
|
||||
let status = match res with
|
||||
let status =
|
||||
match res with
|
||||
| Ok () -> default_status ~code:Status_code_ok ()
|
||||
| Error e -> default_status ~code:Status_code_error ~message:e () in
|
||||
| Error e -> default_status ~code:Status_code_error ~message:e ()
|
||||
in
|
||||
let span, _ =
|
||||
(* TODO: should the attrs passed to with_ go on the Span (in Span.create) or on the ResourceSpan (in emit)?
|
||||
(question also applies to Opentelemetry_lwt.Trace.with) *)
|
||||
Span.create
|
||||
?kind ~trace_id ?parent ?links ~id:span_id
|
||||
?trace_state ~attrs:scope.attrs ~events:scope.events
|
||||
~start_time ~end_time:(Timestamp_ns.now_unix_ns())
|
||||
~status
|
||||
name in
|
||||
emit ?service_name [span];
|
||||
Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state
|
||||
~attrs:scope.attrs ~events:scope.events ~start_time
|
||||
~end_time:(Timestamp_ns.now_unix_ns ())
|
||||
~status name
|
||||
in
|
||||
emit ?service_name [ span ]
|
||||
in
|
||||
try
|
||||
let x = f scope in
|
||||
|
|
@ -606,39 +640,36 @@ end
|
|||
module Metrics = struct
|
||||
open Metrics_types
|
||||
|
||||
type t = Metrics_types.metric
|
||||
(** A single metric, measuring some time-varying quantity or statistical
|
||||
distribution. It is composed of one or more data points that have
|
||||
precise values and time stamps. Each distinct metric should have a
|
||||
distinct name. *)
|
||||
type t = Metrics_types.metric
|
||||
|
||||
open struct
|
||||
let _program_start = Timestamp_ns.now_unix_ns()
|
||||
let _program_start = Timestamp_ns.now_unix_ns ()
|
||||
end
|
||||
|
||||
(** Number data point, as a float *)
|
||||
let float ?(start_time_unix_nano=_program_start)
|
||||
?(now=Timestamp_ns.now_unix_ns())
|
||||
?(attrs=[])
|
||||
(d:float) : number_data_point =
|
||||
let float ?(start_time_unix_nano = _program_start)
|
||||
?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) :
|
||||
number_data_point =
|
||||
let attributes = attrs |> List.map _conv_key_value in
|
||||
default_number_data_point
|
||||
~start_time_unix_nano ~time_unix_nano:now
|
||||
~attributes
|
||||
~value:(As_double d) ()
|
||||
default_number_data_point ~start_time_unix_nano ~time_unix_nano:now
|
||||
~attributes ~value:(As_double d) ()
|
||||
|
||||
(** Number data point, as an int *)
|
||||
let int ?(start_time_unix_nano=_program_start)
|
||||
?(now=Timestamp_ns.now_unix_ns())
|
||||
?(attrs=[])
|
||||
(i:int) : number_data_point =
|
||||
let int ?(start_time_unix_nano = _program_start)
|
||||
?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) :
|
||||
number_data_point =
|
||||
let attributes = attrs |> List.map _conv_key_value in
|
||||
default_number_data_point ~start_time_unix_nano ~time_unix_nano:now
|
||||
~attributes
|
||||
~value:(As_int (Int64.of_int i)) ()
|
||||
~value:(As_int (Int64.of_int i))
|
||||
()
|
||||
|
||||
(** Aggregation of a scalar metric, always with the current value *)
|
||||
let gauge ~name ?description ?unit_ (l:number_data_point list) : t =
|
||||
let gauge ~name ?description ?unit_ (l : number_data_point list) : t =
|
||||
let data = Gauge (default_gauge ~data_points:l ()) in
|
||||
default_metric ~name ?description ?unit_ ~data ()
|
||||
|
||||
|
|
@ -649,12 +680,11 @@ module Metrics = struct
|
|||
|
||||
(** Sum of all reported measurements over a time interval *)
|
||||
let sum ~name ?description ?unit_
|
||||
?(aggregation_temporality=Aggregation_temporality_cumulative)
|
||||
?is_monotonic
|
||||
(l:number_data_point list) : t =
|
||||
?(aggregation_temporality = Aggregation_temporality_cumulative)
|
||||
?is_monotonic (l : number_data_point list) : t =
|
||||
let data =
|
||||
Sum (default_sum ~data_points:l ?is_monotonic
|
||||
~aggregation_temporality ()) in
|
||||
Sum (default_sum ~data_points:l ?is_monotonic ~aggregation_temporality ())
|
||||
in
|
||||
default_metric ~name ?description ?unit_ ~data ()
|
||||
|
||||
(** Histogram data
|
||||
|
|
@ -664,26 +694,19 @@ module Metrics = struct
|
|||
the counts must be equal to [count].
|
||||
length must be [1+length explicit_bounds]
|
||||
@param explicit_bounds strictly increasing list of bounds for the buckets *)
|
||||
let histogram_data_point
|
||||
?(start_time_unix_nano=_program_start)
|
||||
?(now=Timestamp_ns.now_unix_ns())
|
||||
?(attrs=[])
|
||||
?(exemplars=[])
|
||||
?(explicit_bounds=[])
|
||||
?sum
|
||||
~bucket_counts
|
||||
~count
|
||||
() : histogram_data_point =
|
||||
let histogram_data_point ?(start_time_unix_nano = _program_start)
|
||||
?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = [])
|
||||
?(explicit_bounds = []) ?sum ~bucket_counts ~count () :
|
||||
histogram_data_point =
|
||||
let attributes = attrs |> List.map _conv_key_value in
|
||||
default_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now
|
||||
~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum ()
|
||||
|
||||
let histogram ~name ?description ?unit_
|
||||
?aggregation_temporality
|
||||
(l:histogram_data_point list) : t =
|
||||
let histogram ~name ?description ?unit_ ?aggregation_temporality
|
||||
(l : histogram_data_point list) : t =
|
||||
let data =
|
||||
Histogram (default_histogram ~data_points:l
|
||||
?aggregation_temporality ()) in
|
||||
Histogram (default_histogram ~data_points:l ?aggregation_temporality ())
|
||||
in
|
||||
default_metric ~name ?description ?unit_ ~data ()
|
||||
|
||||
(* TODO: exponential history *)
|
||||
|
|
@ -691,22 +714,24 @@ module Metrics = struct
|
|||
(* TODO: exemplar *)
|
||||
|
||||
(** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *)
|
||||
let make_resource_metrics ?service_name ?attrs (l:t list) : resource_metrics =
|
||||
let make_resource_metrics ?service_name ?attrs (l : t list) : resource_metrics
|
||||
=
|
||||
let lm =
|
||||
default_instrumentation_library_metrics
|
||||
~instrumentation_library:(Some Globals.instrumentation_library)
|
||||
~metrics:l () in
|
||||
~metrics:l ()
|
||||
in
|
||||
let attributes = Globals.mk_attributes ?service_name ?attrs () in
|
||||
let resource = Proto.Resource.default_resource ~attributes () in
|
||||
default_resource_metrics
|
||||
~instrumentation_library_metrics:[lm] ~resource:(Some resource) ()
|
||||
default_resource_metrics ~instrumentation_library_metrics:[ lm ]
|
||||
~resource:(Some resource) ()
|
||||
|
||||
(** Emit some metrics to the collector (sync). This blocks until
|
||||
the backend has pushed the metrics into some internal queue, or
|
||||
discarded them. *)
|
||||
let emit ?attrs (l:t list) : unit =
|
||||
let emit ?attrs (l : t list) : unit =
|
||||
let rm = make_resource_metrics ?attrs l in
|
||||
Collector.send_metrics [rm] ~ret:ignore
|
||||
Collector.send_metrics [ rm ] ~ret:ignore
|
||||
end
|
||||
|
||||
(** A set of callbacks that produce metrics when called.
|
||||
|
|
@ -721,23 +746,19 @@ module Metrics_callbacks = struct
|
|||
let cbs_ : (unit -> Metrics.t list) list ref = ref []
|
||||
end
|
||||
|
||||
|
||||
(** [register f] adds the callback [f] to the list.
|
||||
[f] will be called at unspecified times and is expected to return
|
||||
a list of metrics. *)
|
||||
let register f : unit =
|
||||
if !cbs_ = [] then (
|
||||
if !cbs_ = [] then
|
||||
(* make sure we call [f] (and others) at each tick *)
|
||||
Collector.on_tick (fun () ->
|
||||
let m = List.map (fun f -> f()) !cbs_ |> List.flatten in
|
||||
Metrics.emit m)
|
||||
);
|
||||
let m = List.map (fun f -> f ()) !cbs_ |> List.flatten in
|
||||
Metrics.emit m);
|
||||
cbs_ := f :: !cbs_
|
||||
end
|
||||
|
||||
module Logs = struct
|
||||
|
||||
end
|
||||
module Logs = struct end
|
||||
|
||||
(** {2 Utils} *)
|
||||
|
||||
|
|
@ -746,12 +767,10 @@ end
|
|||
https://www.w3.org/TR/trace-context/
|
||||
*)
|
||||
module Trace_context = struct
|
||||
|
||||
(** The traceparent header
|
||||
https://www.w3.org/TR/trace-context/#traceparent-header
|
||||
*)
|
||||
module Traceparent = struct
|
||||
|
||||
let name = "traceparent"
|
||||
|
||||
(** Parse the value of the traceparent header.
|
||||
|
|
@ -782,33 +801,40 @@ module Trace_context = struct
|
|||
let consume expected ~offset ~or_ =
|
||||
let len = String.length expected in
|
||||
let* str, offset = blit ~offset ~len ~or_ in
|
||||
if str = expected then Ok offset else Error or_
|
||||
if str = expected then
|
||||
Ok offset
|
||||
else
|
||||
Error or_
|
||||
in
|
||||
let offset = 0 in
|
||||
let* offset = consume "00" ~offset ~or_:"Expected version 00" in
|
||||
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
|
||||
let* trace_id, offset = blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" in
|
||||
let* trace_id, offset =
|
||||
blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id"
|
||||
in
|
||||
let* trace_id =
|
||||
match Trace_id.of_hex trace_id with
|
||||
| trace_id -> Ok trace_id
|
||||
| exception Invalid_argument _ -> Error "Expected hex-encoded trace-id"
|
||||
in
|
||||
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
|
||||
let* parent_id, offset = blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" in
|
||||
let* parent_id, offset =
|
||||
blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id"
|
||||
in
|
||||
let* parent_id =
|
||||
match Span_id.of_hex parent_id with
|
||||
| parent_id -> Ok parent_id
|
||||
| exception Invalid_argument _ -> Error "Expected hex-encoded parent-id"
|
||||
in
|
||||
let* offset = consume "-" ~offset ~or_:"Expected delimiter" in
|
||||
let* _flags, _offset = blit ~offset ~len:2 ~or_:"Expected 2-digit flags" in
|
||||
let* _flags, _offset =
|
||||
blit ~offset ~len:2 ~or_:"Expected 2-digit flags"
|
||||
in
|
||||
Ok (trace_id, parent_id)
|
||||
|
||||
let to_value ~(trace_id : Trace_id.t) ~(parent_id : Span_id.t) () : string =
|
||||
Printf.sprintf "00-%s-%s-00"
|
||||
(Trace_id.to_hex trace_id)
|
||||
Printf.sprintf "00-%s-%s-00" (Trace_id.to_hex trace_id)
|
||||
(Span_id.to_hex parent_id)
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -828,29 +854,32 @@ end = struct
|
|||
(** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *)
|
||||
let runtime_attributes =
|
||||
lazy
|
||||
Conventions.Attributes.[
|
||||
(Process.Runtime.name, `String "ocaml");
|
||||
(Process.Runtime.version, `String Sys.ocaml_version);
|
||||
]
|
||||
Conventions.Attributes.
|
||||
[
|
||||
Process.Runtime.name, `String "ocaml";
|
||||
Process.Runtime.version, `String Sys.ocaml_version;
|
||||
]
|
||||
|
||||
let get_runtime_attributes () = Lazy.force runtime_attributes
|
||||
|
||||
let basic_setup () =
|
||||
(* emit metrics when GC is called *)
|
||||
let on_gc() =
|
||||
match Collector.get_backend() with
|
||||
let on_gc () =
|
||||
match Collector.get_backend () with
|
||||
| None -> ()
|
||||
| Some (module C) -> C.signal_emit_gc_metrics()
|
||||
| Some (module C) -> C.signal_emit_gc_metrics ()
|
||||
in
|
||||
ignore (Gc.create_alarm on_gc : Gc.alarm)
|
||||
|
||||
let bytes_per_word = Sys.word_size / 8
|
||||
|
||||
let word_to_bytes n = n * bytes_per_word
|
||||
|
||||
let word_to_bytes_f n = n *. float bytes_per_word
|
||||
|
||||
let get_metrics () : Metrics.t list =
|
||||
let gc = Gc.quick_stat () in
|
||||
let now = Timestamp_ns.now_unix_ns() in
|
||||
let now = Timestamp_ns.now_unix_ns () in
|
||||
let open Metrics in
|
||||
let open Conventions.Metrics in
|
||||
[
|
||||
|
|
@ -858,8 +887,7 @@ end = struct
|
|||
[ int ~now (word_to_bytes gc.Gc.heap_words) ];
|
||||
sum ~name:Process.Runtime.Ocaml.GC.minor_allocated
|
||||
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
|
||||
~is_monotonic:true
|
||||
~unit_:"B"
|
||||
~is_monotonic:true ~unit_:"B"
|
||||
[ float ~now (word_to_bytes_f gc.Gc.minor_words) ];
|
||||
sum ~name:Process.Runtime.Ocaml.GC.minor_collections
|
||||
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
|
||||
|
|
|
|||
38
src/rand_bytes.ml
Normal file
38
src/rand_bytes.ml
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
(* generate random IDs *)
|
||||
let rand_ = Random.State.make_self_init ()
|
||||
|
||||
let[@inline] ( let@ ) f x = f x
|
||||
|
||||
let default_rand_bytes_8 () : bytes =
|
||||
let@ () = Lock.with_lock in
|
||||
let b = Bytes.create 8 in
|
||||
for i = 0 to 1 do
|
||||
let r = Random.State.bits rand_ in
|
||||
(* 30 bits, of which we use 24 *)
|
||||
Bytes.set b (i * 3) (Char.chr (r land 0xff));
|
||||
Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff));
|
||||
Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff))
|
||||
done;
|
||||
let r = Random.State.bits rand_ in
|
||||
Bytes.set b 6 (Char.chr (r land 0xff));
|
||||
Bytes.set b 7 (Char.chr ((r lsr 8) land 0xff));
|
||||
b
|
||||
|
||||
let default_rand_bytes_16 () : bytes =
|
||||
let@ () = Lock.with_lock in
|
||||
let b = Bytes.create 16 in
|
||||
for i = 0 to 4 do
|
||||
let r = Random.State.bits rand_ in
|
||||
(* 30 bits, of which we use 24 *)
|
||||
Bytes.set b (i * 3) (Char.chr (r land 0xff));
|
||||
Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff));
|
||||
Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff))
|
||||
done;
|
||||
let r = Random.State.bits rand_ in
|
||||
Bytes.set b 15 (Char.chr (r land 0xff));
|
||||
(* last byte *)
|
||||
b
|
||||
|
||||
let rand_bytes_16 = ref default_rand_bytes_16
|
||||
|
||||
let rand_bytes_8 = ref default_rand_bytes_8
|
||||
9
src/rand_bytes.mli
Normal file
9
src/rand_bytes.mli
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
val rand_bytes_16 : (unit -> bytes) ref
|
||||
(** Generate 16 bytes of random data *)
|
||||
|
||||
val rand_bytes_8 : (unit -> bytes) ref
|
||||
(** Generate 16 bytes of random data *)
|
||||
|
||||
val default_rand_bytes_8 : unit -> bytes
|
||||
|
||||
val default_rand_bytes_16 : unit -> bytes
|
||||
Loading…
Add table
Reference in a new issue