diff --git a/.ocamlformat b/.ocamlformat new file mode 100644 index 00000000..35daf11e --- /dev/null +++ b/.ocamlformat @@ -0,0 +1,14 @@ +version = 0.20.0 +profile=conventional +margin=80 +if-then-else=k-r +parens-ite=true +parens-tuple=multi-line-only +sequence-style=terminator +type-decl=sparse +break-cases=toplevel +cases-exp-indent=2 +field-space=tight-decl +leading-nested-match-parens=true +module-item-spacing=sparse +quiet=true diff --git a/src/client/common_.ml b/src/client/common_.ml index fb98369e..5b9a0010 100644 --- a/src/client/common_.ml +++ b/src/client/common_.ml @@ -1,21 +1,24 @@ module Atomic = Opentelemetry_atomic.Atomic +include Opentelemetry.Lock -let[@inline] (let@) f x = f x +let[@inline] ( let@ ) f x = f x -let debug_ = ref (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with Some ("1"|"true") -> true | _ -> false) +let debug_ = + ref + (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with + | Some ("1" | "true") -> true + | _ -> false) -let lock_ : (unit -> unit) ref = ref ignore -let unlock_ : (unit -> unit) ref = ref ignore +let default_url = "http://localhost:4318" -let set_mutex ~lock ~unlock : unit = - lock_ := lock; - unlock_ := unlock +let url = + ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) -(* critical section for [f()] *) -let[@inline] with_lock_ f = - !lock_(); - Fun.protect ~finally:!unlock_ f +let get_url () = !url +let set_url s = url := s + +(** [with_mutex m f] calls [f()] in a section where [m] is locked. *) let[@inline] with_mutex_ m f = Mutex.lock m; Fun.protect ~finally:(fun () -> Mutex.unlock m) f @@ -25,11 +28,21 @@ let parse_headers s = String.split_on_char ',' s |> List.map parse_header let default_url = "http://localhost:4318" + let default_headers = [] -let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) -let headers = ref (try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS") with _ -> default_headers) + +let url = + ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) + +let headers = + ref + (try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS") + with _ -> default_headers) + let get_url () = !url + let set_url s = url := s let get_headers () = !headers + let set_headers s = headers := s diff --git a/src/client/gen_ids.ml b/src/client/gen_ids.ml deleted file mode 100644 index 049f6efc..00000000 --- a/src/client/gen_ids.ml +++ /dev/null @@ -1,34 +0,0 @@ - -open Common_ - -(* generate random IDs *) -module Make() = struct - let rand_ = Random.State.make_self_init() - - let rand_bytes_8 () : bytes = - let@() = with_lock_ in - let b = Bytes.create 8 in - for i=0 to 1 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); - done; - let r = Random.State.bits rand_ in - Bytes.set b 6 (Char.chr (r land 0xff)); - Bytes.set b 7 (Char.chr (r lsr 8 land 0xff)); - b - - let rand_bytes_16 () : bytes = - let@() = with_lock_ in - let b = Bytes.create 16 in - for i=0 to 4 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); - done; - let r = Random.State.bits rand_ in - Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) - b -end diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 5be2814a..0374f328 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -1,4 +1,3 @@ - (* https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md @@ -10,48 +9,56 @@ include Common_ let needs_gc_metrics = Atomic.make false -let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) +let gc_metrics = AList.make () +(* side channel for GC, appended to {!E_metrics}'s data *) (* capture current GC metrics and push them into {!gc_metrics} for later collection *) let sample_gc_metrics () = Atomic.set needs_gc_metrics false; - let l = OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics() in + let l = + OT.Metrics.make_resource_metrics + ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) + @@ Opentelemetry.GC_metrics.get_metrics () + in AList.add gc_metrics l module Config = Config -let _init_curl = lazy ( - Curl.global_init Curl.CURLINIT_GLOBALALL; - at_exit Curl.global_cleanup; -) +let _init_curl = + lazy + (Curl.global_init Curl.CURLINIT_GLOBALALL; + at_exit Curl.global_cleanup) -type error = [ - | `Status of int * Opentelemetry.Proto.Status.status +type error = + [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string -] + ] let n_errors = Atomic.make 0 + let n_dropped = Atomic.make 0 let report_err_ = function | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg | `Status (code, status) -> - Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." - code Proto.Status.pp_status status + Format.eprintf + "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code + Proto.Status.pp_status status module type CURL = sig - val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val send : + path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val cleanup : unit -> unit end (* create a curl client *) -module Curl() : CURL = struct +module Curl () : CURL = struct open Opentelemetry.Proto - let() = Lazy.force _init_curl + + let () = Lazy.force _init_curl let buf_res = Buffer.create 256 @@ -65,37 +72,38 @@ module Curl() : CURL = struct (* TODO: use Curl multi *) (* send the content to the remote endpoint/path *) - let send ~path ~decode (bod:string) : ('a, error) result = + let send ~path ~decode (bod : string) : ('a, error) result = Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); Curl.set_httppost curl []; let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in let http_headers = List.map to_http_header !headers in - Curl.set_httpheader curl ("Content-Type: application/x-protobuf" :: http_headers); + Curl.set_httpheader curl + ("Content-Type: application/x-protobuf" :: http_headers); (* write body *) Curl.set_post curl true; Curl.set_postfieldsize curl (String.length bod); Curl.set_readfunction curl - begin - let i = ref 0 in - (fun n -> - if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; - let len = min n (String.length bod - !i) in - let s = String.sub bod !i len in - if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; - i := !i + len; - s) - end; + (let i = ref 0 in + fun n -> + if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; + let len = min n (String.length bod - !i) in + let s = String.sub bod !i len in + if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; + i := !i + len; + s); (* read result's body *) Buffer.clear buf_res; - Curl.set_writefunction curl - (fun s -> Buffer.add_string buf_res s; String.length s); + Curl.set_writefunction curl (fun s -> + Buffer.add_string buf_res s; + String.length s); try match Curl.perform curl with | () -> let code = Curl.get_responsecode curl in - if !debug_ then Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); + if !debug_ then + Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in if code >= 200 && code < 300 then ( let res = decode dec in @@ -105,17 +113,24 @@ module Curl() : CURL = struct Error (`Status (code, status)) ) | exception Curl.CurlException (_, code, msg) -> - let status = Status.default_status - ~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in - Error(`Status (code, status)) + let status = + Status.default_status ~code:(Int32.of_int code) + ~message:(Bytes.unsafe_of_string msg) + () + in + Error (`Status (code, status)) with e -> Error (`Failure (Printexc.to_string e)) end module type PUSH = sig type elt + val push : elt -> unit + val is_empty : unit -> bool + val is_big_enough : unit -> bool + val pop_iter_all : (elt -> unit) -> unit end @@ -125,161 +140,179 @@ module type EMITTER = sig open Opentelemetry.Proto val push_trace : Trace.resource_spans list -> unit + val push_metrics : Metrics.resource_metrics list -> unit + val set_on_tick_callbacks : (unit -> unit) list ref -> unit val tick : unit -> unit + val cleanup : unit -> unit end type 'a push = (module PUSH with type elt = 'a) -type on_full_cb = (unit -> unit) + +type on_full_cb = unit -> unit (* make a "push" object, along with a setter for a callback to call when it's ready to emit a batch *) -let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) = - let on_full: on_full_cb ref = ref ignore in +let mk_push (type a) ?batch () : + (module PUSH with type elt = a) * (on_full_cb -> unit) = + let on_full : on_full_cb ref = ref ignore in let push = match batch with | None -> let r = ref None in let module M = struct type elt = a + let is_empty () = !r == None + let is_big_enough () = !r != None + let push x = - r := Some x; !on_full() - let pop_iter_all f = Option.iter f !r; r := None + r := Some x; + !on_full () + + let pop_iter_all f = + Option.iter f !r; + r := None end in (module M : PUSH with type elt = a) - | Some n -> let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in let module M = struct type elt = a + let is_empty () = FQueue.size q = 0 + let is_big_enough () = FQueue.size q >= n + let push x = - if not (FQueue.push q x) || FQueue.size q > n then ( - !on_full(); - if not (FQueue.push q x) then ( - Atomic.incr n_dropped; (* drop item *) - ) + if (not (FQueue.push q x)) || FQueue.size q > n then ( + !on_full (); + if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *) ) + let pop_iter_all f = FQueue.pop_iter_all q f end in (module M : PUSH with type elt = a) - in - push, ((:=) on_full) + + push, ( := ) on_full (* start a thread in the background, running [f()] *) -let start_bg_thread (f: unit -> unit) : unit = - let run() = +let start_bg_thread (f : unit -> unit) : unit = + let run () = (* block some signals: USR1 USR2 TERM PIPE ALARM STOP, see [$ kill -L] *) - ignore (Thread.sigmask Unix.SIG_BLOCK [10; 12; 13; 14; 15; 19] : _ list); - f() + ignore (Thread.sigmask Unix.SIG_BLOCK [ 10; 12; 13; 14; 15; 19 ] : _ list); + f () in ignore (Thread.create run () : Thread.t) -let l_is_empty = function [] -> true | _::_ -> false +let l_is_empty = function + | [] -> true + | _ :: _ -> false + let batch_is_empty = List.for_all l_is_empty (* make an emitter. exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~(config:Config.t) () : (module EMITTER) = +let mk_emitter ~(config : Config.t) () : (module EMITTER) = let open Proto in - let continue = ref true in let ((module E_trace) : Trace.resource_spans list push), on_trace_full = - mk_push ?batch:config.batch_traces () in - let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = - mk_push ?batch:config.batch_metrics () in - - let encoder = Pbrt.Encoder.create() in - - let ((module C) as curl) = (module Curl() : CURL) in - - let on_tick_cbs_ = ref (ref []) in - let set_on_tick_callbacks = (:=) on_tick_cbs_ in - - let send_metrics_http (l:Metrics.resource_metrics list list) = - Pbrt.Encoder.reset encoder; - let resource_metrics = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Metrics_service.encode_export_metrics_service_request - (Metrics_service.default_export_metrics_service_request - ~resource_metrics ()) - encoder; - let data = Pbrt.Encoder.to_string encoder in - begin match - C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) - data - with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - end; + mk_push ?batch:config.batch_traces () + in + let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full + = + mk_push ?batch:config.batch_metrics () in - let send_traces_http (l:Trace.resource_spans list list) = + let encoder = Pbrt.Encoder.create () in + + let ((module C) as curl) = (module Curl () : CURL) in + + let on_tick_cbs_ = ref (ref []) in + let set_on_tick_callbacks = ( := ) on_tick_cbs_ in + + let send_metrics_http (l : Metrics.resource_metrics list list) = + Pbrt.Encoder.reset encoder; + let resource_metrics = + List.fold_left (fun acc l -> List.rev_append l acc) [] l + in + Metrics_service.encode_export_metrics_service_request + (Metrics_service.default_export_metrics_service_request ~resource_metrics + ()) + encoder; + let data = Pbrt.Encoder.to_string encoder in + match C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) data with + | Ok () -> () + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err + in + + let send_traces_http (l : Trace.resource_spans list list) = Pbrt.Encoder.reset encoder; let resource_spans = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in + List.fold_left (fun acc l -> List.rev_append l acc) [] l + in Trace_service.encode_export_trace_service_request (Trace_service.default_export_trace_service_request ~resource_spans ()) encoder; - begin match - C.send ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - end; + match + C.send ~path:"/v1/traces" + ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err in - let last_wakeup = Atomic.make (Mtime_clock.now()) in + let last_wakeup = Atomic.make (Mtime_clock.now ()) in let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - let batch_timeout() : bool = - let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in + let batch_timeout () : bool = + let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in Mtime.Span.compare elapsed timeout >= 0 in - let emit_metrics ?(force=false) () : bool = - if force || (not force && E_metrics.is_big_enough ()) then ( - let batch = ref [AList.pop_all gc_metrics] in + let emit_metrics ?(force = false) () : bool = + if force || ((not force) && E_metrics.is_big_enough ()) then ( + let batch = ref [ AList.pop_all gc_metrics ] in E_metrics.pop_iter_all (fun l -> batch := l :: !batch); let do_something = not (l_is_empty !batch) in if do_something then ( send_metrics_http !batch; - Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set last_wakeup (Mtime_clock.now ()) ); do_something - ) else false + ) else + false in - let emit_traces ?(force=false) () : bool = - if force || (not force && E_trace.is_big_enough ()) then ( + let emit_traces ?(force = false) () : bool = + if force || ((not force) && E_trace.is_big_enough ()) then ( let batch = ref [] in E_trace.pop_iter_all (fun l -> batch := l :: !batch); let do_something = not (l_is_empty !batch) in if do_something then ( send_traces_http !batch; - Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set last_wakeup (Mtime_clock.now ()) ); do_something - ) else false + ) else + false in let[@inline] guard f = - try f() + try f () with e -> Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" (Printexc.to_string e) @@ -288,63 +321,60 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let emit_all_force () = let@ () = guard in ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool) in - if config.thread then ( - begin - let m = Mutex.create() in - set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); - end; + (let m = Mutex.create () in + Lock.set_mutex + ~lock:(fun () -> Mutex.lock m) + ~unlock:(fun () -> Mutex.unlock m)); - let ((module C) as curl) = (module Curl() : CURL) in + let ((module C) as curl) = (module Curl () : CURL) in - let m = Mutex.create() in - let cond = Condition.create() in + let m = Mutex.create () in + let cond = Condition.create () in (* loop for the thread that processes events and sends them to collector *) let bg_thread () = while !continue do let@ () = guard in - let timeout = batch_timeout() in + let timeout = batch_timeout () in let do_metrics = emit_metrics ~force:timeout () in let do_traces = emit_traces ~force:timeout () in - if not do_metrics && not do_traces then ( + if (not do_metrics) && not do_traces then (* wait *) let@ () = with_mutex_ m in - Condition.wait cond m; - ) + Condition.wait cond m done; (* flush remaining events *) - begin - let@ () = guard in - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); - C.cleanup(); - end + let@ () = guard in + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + C.cleanup () in start_bg_thread bg_thread; let wakeup () = with_mutex_ m (fun () -> Condition.signal cond); - Thread.yield() + Thread.yield () in (* wake up if a batch is full *) on_metrics_full wakeup; on_trace_full wakeup; - let tick() = - if Atomic.get needs_gc_metrics then sample_gc_metrics(); + let tick () = + if Atomic.get needs_gc_metrics then sample_gc_metrics (); List.iter (fun f -> - try f() + try f () with e -> - Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) + Printf.eprintf "on tick callback raised: %s\n" + (Printexc.to_string e)) !(!on_tick_cbs_); - if batch_timeout() then wakeup() + if batch_timeout () then wakeup () in if config.ticker_thread then ( @@ -352,89 +382,97 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let tick_thread () = while true do Thread.delay 0.5; - tick(); + tick () done in - start_bg_thread tick_thread; + start_bg_thread tick_thread ); let module M = struct let push_trace e = E_trace.push e; - if batch_timeout() then wakeup() + if batch_timeout () then wakeup () + let push_metrics e = E_metrics.push e; - if batch_timeout() then wakeup() + if batch_timeout () then wakeup () + let set_on_tick_callbacks = set_on_tick_callbacks - let tick=tick + + let tick = tick + let cleanup () = continue := false; with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - on_metrics_full (fun () -> - if Atomic.get needs_gc_metrics then sample_gc_metrics(); + if Atomic.get needs_gc_metrics then sample_gc_metrics (); ignore (emit_metrics () : bool)); - on_trace_full (fun () -> - ignore (emit_traces () : bool)); + on_trace_full (fun () -> ignore (emit_traces () : bool)); let cleanup () = - emit_all_force(); - C.cleanup(); + emit_all_force (); + C.cleanup () in let module M = struct let push_trace e = - let@() = guard in + let@ () = guard in E_trace.push e; - if batch_timeout() then emit_all_force() + if batch_timeout () then emit_all_force () let push_metrics e = - let@() = guard in + let@ () = guard in E_metrics.push e; - if batch_timeout() then emit_all_force() + if batch_timeout () then emit_all_force () let set_on_tick_callbacks = set_on_tick_callbacks let tick () = - if Atomic.get needs_gc_metrics then sample_gc_metrics(); - if batch_timeout() then emit_all_force() + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + if batch_timeout () then emit_all_force () let cleanup = cleanup end in (module M) ) -module Backend(Arg : sig val config : Config.t end)() - : Opentelemetry.Collector.BACKEND -= struct - include Gen_ids.Make() - +module Backend (Arg : sig + val config : Config.t +end) +() : Opentelemetry.Collector.BACKEND = struct include (val mk_emitter ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector - let send_trace : Trace.resource_spans list sender = { - send=fun l ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; - push_trace l; - ret() - } + let send_trace : Trace.resource_spans list sender = + { + send = + (fun l ~ret -> + let@ () = Lock.with_lock in + if !debug_ then + Format.eprintf "send spans %a@." + (Format.pp_print_list Trace.pp_resource_spans) + l; + push_trace l; + ret ()); + } - let last_sent_metrics = Atomic.make (Mtime_clock.now()) - let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) + let last_sent_metrics = Atomic.make (Mtime_clock.now ()) + + let timeout_sent_metrics = Mtime.Span.(5 * s) + (* send metrics from time to time *) let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now() in + let now = Mtime_clock.now () in let add_own_metrics = let elapsed = Mtime.span last_emit now in Mtime.Span.compare elapsed timeout_sent_metrics > 0 @@ -443,43 +481,63 @@ module Backend(Arg : sig val config : Config.t end)() if add_own_metrics then ( let open OT.Metrics in Atomic.set last_sent_metrics now; - [make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel-export.errors" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]] - ) else [] + [ + make_resource_metrics + [ + sum ~name:"otel-export.dropped" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]; + ] + ) else + [] - let send_metrics : Metrics.resource_metrics list sender = { - send=fun m ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + let send_metrics : Metrics.resource_metrics list sender = + { + send = + (fun m ~ret -> + let@ () = Lock.with_lock in + if !debug_ then + Format.eprintf "send metrics %a@." + (Format.pp_print_list Metrics.pp_resource_metrics) + m; - let m = List.rev_append (additional_metrics()) m in - push_metrics m; - ret() - } + let m = List.rev_append (additional_metrics ()) m in + push_metrics m; + ret ()); + } end -let setup_ ~(config:Config.t) () = +let setup_ ~(config : Config.t) () = debug_ := config.debug; - let module B = Backend(struct let config=config end)() in + let module B = + Backend + (struct + let config = config + end) + () + in Opentelemetry.Collector.set_backend (module B); B.cleanup -let setup ?(config=Config.make()) ?(enable=true) () = +let setup ?(config = Config.make ()) ?(enable = true) () = if enable then ( let cleanup = setup_ ~config () in at_exit cleanup ) -let with_setup ?(config=Config.make()) ?(enable=true) () f = +let with_setup ?(config = Config.make ()) ?(enable = true) () f = if enable then ( let cleanup = setup_ ~config () in Fun.protect ~finally:cleanup f - ) else f() + ) else + f () diff --git a/src/dune b/src/dune index 1db5ef21..b3e8a3d6 100644 --- a/src/dune +++ b/src/dune @@ -2,7 +2,7 @@ (name opentelemetry) (synopsis "API for opentelemetry instrumentation") (flags :standard -warn-error -a+8) - (libraries ptime ptime.clock.os pbrt) + (libraries ptime ptime.clock.os pbrt threads) (public_name opentelemetry)) ; ### protobuf rules ### diff --git a/src/lock.ml b/src/lock.ml new file mode 100644 index 00000000..0d17c1c3 --- /dev/null +++ b/src/lock.ml @@ -0,0 +1,11 @@ +let lock_ : (unit -> unit) ref = ref ignore + +let unlock_ : (unit -> unit) ref = ref ignore + +let set_mutex ~lock ~unlock : unit = + lock_ := lock; + unlock_ := unlock + +let[@inline] with_lock f = + !lock_ (); + Fun.protect ~finally:!unlock_ f diff --git a/src/lock.mli b/src/lock.mli new file mode 100644 index 00000000..1b6c8c4b --- /dev/null +++ b/src/lock.mli @@ -0,0 +1,7 @@ +val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit +(** Set a pair of lock/unlock functions that are used to + protect access to global state, if needed. By default these do nothing. *) + +val with_lock : (unit -> 'a) -> 'a +(** Call [f()] while holding the mutex defined {!set_mutex}, then + release the mutex. *) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 678fe405..df37c8f4 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,6 +1,11 @@ - (** Opentelemetry types and instrumentation *) +module Lock = Lock +(** Global lock *) + +module Rand_bytes = Rand_bytes +(** Generation of random identifiers *) + (** {2 Wire format} *) (** Protobuf types *) @@ -56,11 +61,12 @@ end in nanoseconds. *) module Timestamp_ns = struct type t = int64 + let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600))) (** Current unix timestamp in nanoseconds *) let[@inline] now_unix_ns () : t = - let span = Ptime_clock.now() |> Ptime.to_span in + let span = Ptime_clock.now () |> Ptime.to_span in let d, ps = Ptime.Span.to_d_ps span in let d = Int64.(mul (of_int d) ns_in_a_day) in let ns = Int64.(div ps 1_000L) in @@ -78,6 +84,7 @@ end module Collector = struct open Proto + type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a } (** Sender interface for a message of type [msg]. Inspired from Logs' reporter (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) @@ -90,9 +97,6 @@ module Collector = struct It doesn't mean the event has been collected yet, it could sit in a batch queue for a little while. *) - type 'msg sender = { - send: 'a. 'msg -> ret:(unit -> 'a) -> 'a; - } (** Collector client interface. *) module type BACKEND = sig @@ -100,12 +104,6 @@ module Collector = struct val send_metrics : Metrics.resource_metrics list sender - val rand_bytes_16 : unit -> bytes - (** Generate 16 bytes of random data *) - - val rand_bytes_8 : unit -> bytes - (** Generate 16 bytes of random data *) - val signal_emit_gc_metrics : unit -> unit (** Signal the backend that it should emit GC metrics when it has the chance. This should be installed in a GC alarm or another form @@ -134,7 +132,7 @@ module Collector = struct end (** Set collector backend *) - let set_backend (b:backend) : unit = + let set_backend (b : backend) : unit = let (module B) = b in B.set_on_tick_callbacks on_tick_cbs_; backend := Some b @@ -145,25 +143,19 @@ module Collector = struct (** Current backend, if any *) let[@inline] get_backend () : backend option = !backend - let send_trace (l:Trace.resource_spans list) ~ret = + let send_trace (l : Trace.resource_spans list) ~ret = match !backend with - | None -> ret() + | None -> ret () | Some (module B) -> B.send_trace.send l ~ret - let send_metrics (l:Metrics.resource_metrics list) ~ret = + let send_metrics (l : Metrics.resource_metrics list) ~ret = match !backend with - | None -> ret() + | None -> ret () | Some (module B) -> B.send_metrics.send l ~ret - let rand_bytes_16 () = - match !backend with - | None -> Bytes.make 16 '?' - | Some (module B) -> B.rand_bytes_16() + let[@inline] rand_bytes_16 () = !Rand_bytes.rand_bytes_16 () - let rand_bytes_8 () = - match !backend with - | None -> Bytes.make 8 '?' - | Some (module B) -> B.rand_bytes_8() + let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 () let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_ @@ -172,35 +164,38 @@ module Collector = struct let tick () = match !backend with | None -> () - | Some (module B) -> B.tick() + | Some (module B) -> B.tick () end module Util_ = struct - let bytes_to_hex (b:bytes) : string = - let i_to_hex (i:int) = - if i < 10 then Char.chr (i + Char.code '0') - else Char.chr (i - 10 + Char.code 'a') + let bytes_to_hex (b : bytes) : string = + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') in let res = Bytes.create (2 * Bytes.length b) in - for i = 0 to Bytes.length b-1 do + for i = 0 to Bytes.length b - 1 do let n = Char.code (Bytes.get b i) in Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); - Bytes.set res (2 * i + 1) (i_to_hex (n land 0x0f)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) done; Bytes.unsafe_to_string res - let bytes_of_hex (s:string) : bytes = + let bytes_of_hex (s : string) : bytes = let n_of_c = function | '0' .. '9' as c -> Char.code c - Char.code '0' | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' | _ -> raise (Invalid_argument "invalid hex char") in - if (String.length s mod 2 <> 0) then raise (Invalid_argument "hex sequence must be of even length"); + if String.length s mod 2 <> 0 then + raise (Invalid_argument "hex sequence must be of even length"); let res = Bytes.make (String.length s / 2) '\x00' in - for i=0 to String.length s/2-1 do - let n1 = n_of_c (String.get s (2*i)) in - let n2 = n_of_c (String.get s (2*i+1)) in + for i = 0 to (String.length s / 2) - 1 do + let n1 = n_of_c (String.get s (2 * i)) in + let n2 = n_of_c (String.get s ((2 * i) + 1)) in let n = (n1 lsl 4) lor n2 in Bytes.set res i (Char.chr n) done; @@ -214,40 +209,74 @@ end This 16 bytes identifier is shared by all spans in one trace. *) module Trace_id : sig type t + val create : unit -> t + val pp : Format.formatter -> t -> unit + val to_bytes : t -> bytes + val of_bytes : bytes -> t + val to_hex : t -> string + val of_hex : string -> t end = struct open Proto.Trace + type t = bytes + let to_bytes self = self - let create () : t = Collector.rand_bytes_16() - let of_bytes b = if Bytes.length b=16 then b else raise (Invalid_argument "trace IDs must be 16 bytes in length") + + let create () : t = Collector.rand_bytes_16 () + + let of_bytes b = + if Bytes.length b = 16 then + b + else + raise (Invalid_argument "trace IDs must be 16 bytes in length") + let to_hex self = Util_.bytes_to_hex self + let of_hex s = of_bytes (Util_.bytes_of_hex s) + let pp fmt t = Format.fprintf fmt "%s" (to_hex t) end (** Unique ID of a span. *) module Span_id : sig type t + val create : unit -> t + val pp : Format.formatter -> t -> unit + val to_bytes : t -> bytes + val of_bytes : bytes -> t + val to_hex : t -> string + val of_hex : string -> t end = struct open Proto.Trace + type t = bytes + let to_bytes self = self - let create () : t = Collector.rand_bytes_8() - let of_bytes b = if Bytes.length b=8 then b else raise (Invalid_argument "span IDs must be 8 bytes in length") + + let create () : t = Collector.rand_bytes_8 () + + let of_bytes b = + if Bytes.length b = 8 then + b + else + raise (Invalid_argument "span IDs must be 8 bytes in length") + let to_hex self = Util_.bytes_to_hex self + let of_hex s = of_bytes (Util_.bytes_of_hex s) + let pp fmt t = Format.fprintf fmt "%s" (to_hex t) end @@ -258,12 +287,16 @@ module Conventions = struct module Process = struct module Runtime = struct let name = "process.runtime.name" + let version = "process.runtime.version" + let description = "process.runtime.description" end end + module Service = struct let name = "service.name" + let namespace = "service.namespace" end end @@ -274,9 +307,13 @@ module Conventions = struct module Ocaml = struct module GC = struct let compactions = "process.runtime.ocaml.gc.compactions" + let major_collections = "process.runtime.ocaml.gc.major_collections" + let major_heap = "process.runtime.ocaml.gc.major_heap" + let minor_allocated = "process.runtime.ocaml.gc.minor_allocated" + let minor_collections = "process.runtime.ocaml.gc.minor_collections" end end @@ -285,11 +322,17 @@ module Conventions = struct end end -type value = [`Int of int | `String of string | `Bool of bool | `None] +type value = + [ `Int of int + | `String of string + | `Bool of bool + | `None + ] type key_value = string * value (**/**) + let _conv_value = let open Proto.Common in function @@ -301,7 +344,8 @@ let _conv_value = (**/**) (**/**) -let _conv_key_value (k,v) = + +let _conv_key_value (k, v) = let open Proto.Common in let value = _conv_value v in default_key_value ~key:k ~value () @@ -314,29 +358,30 @@ let _conv_key_value (k,v) = module Globals = struct open Proto.Common - let service_name = ref "unknown_service" (** Main service name metadata *) + let service_name = ref "unknown_service" - let service_namespace = ref None (** Namespace for the service *) + let service_namespace = ref None let instrumentation_library = - default_instrumentation_library - ~version:"0.1" - ~name:"ocaml-opentelemetry" () + default_instrumentation_library ~version:"0.1" ~name:"ocaml-opentelemetry" + () (** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and modifiable by the user code. They will be attached to each outgoing metrics/traces. *) let global_attributes : key_value list ref = - let parse_pair s = match String.split_on_char '=' s with - | [a;b] -> default_key_value ~key:a ~value:(Some (String_value b)) () + let parse_pair s = + match String.split_on_char '=' s with + | [ a; b ] -> default_key_value ~key:a ~value:(Some (String_value b)) () | _ -> failwith (Printf.sprintf "invalid attribute: %S" s) in - ref @@ + ref + @@ try - Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" |> String.split_on_char ',' - |> List.map parse_pair + Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" + |> String.split_on_char ',' |> List.map parse_pair with _ -> [] (* add global attributes to this list *) @@ -344,17 +389,20 @@ module Globals = struct let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in List.rev_append (List.filter not_redundant !global_attributes) into - let mk_attributes ?(service_name = !service_name) ?(attrs=[]) () : _ list = + let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list = let l = List.map _conv_key_value attrs in let l = default_key_value ~key:Conventions.Attributes.Service.name - ~value:(Some (String_value service_name)) () :: l + ~value:(Some (String_value service_name)) () + :: l in - let l = match !service_namespace with + let l = + match !service_namespace with | None -> l | Some v -> default_key_value ~key:Conventions.Attributes.Service.namespace - ~value:(Some (String_value v)) () :: l + ~value:(Some (String_value v)) () + :: l in l |> merge_global_attributes_ end @@ -367,22 +415,18 @@ end belong in a span. *) module Event : sig open Proto.Trace + type t = span_event val make : - ?time_unix_nano:Timestamp_ns.t -> - ?attrs:key_value list -> - string -> - t - + ?time_unix_nano:Timestamp_ns.t -> ?attrs:key_value list -> string -> t end = struct open Proto.Trace + type t = span_event - let make - ?(time_unix_nano=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (name:string) : t = + let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = []) + (name : string) : t = let attrs = List.map _conv_key_value attrs in default_span_event ~time_unix_nano ~name ~attributes:attrs () end @@ -397,6 +441,7 @@ module Span : sig open Proto.Trace type t = span + type id = Span_id.t type nonrec kind = span_span_kind = @@ -419,7 +464,8 @@ module Span : sig val id : t -> Span_id.t - type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] + type key_value = + string * [ `Int of int | `String of string | `Bool of bool | `None ] val create : ?kind:kind -> @@ -433,8 +479,9 @@ module Span : sig ?links:(Trace_id.t * Span_id.t * string) list -> start_time:Timestamp_ns.t -> end_time:Timestamp_ns.t -> - string -> t * id - (** [create ~trace_id name] creates a new span with its unique ID. + string -> + t * id + (** [create ~trace_id name] creates a new span with its unique ID. @param trace_id the trace this belongs to @param parent parent span, if any @param links list of links to other spans, each with their trace state @@ -443,6 +490,7 @@ end = struct open Proto.Trace type t = span + type id = Span_id.t type nonrec kind = span_span_kind = @@ -453,7 +501,8 @@ end = struct | Span_kind_producer | Span_kind_consumer - type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] + type key_value = + string * [ `Int of int | `String of string | `Bool of bool | `None ] type nonrec status_code = status_status_code = | Status_code_unset @@ -465,40 +514,26 @@ end = struct code: status_code; } - let id self = Span_id.of_bytes self.span_id - let create - ?(kind=Span_kind_unspecified) - ?(id=Span_id.create()) - ?trace_state - ?(attrs=[]) - ?(events=[]) - ?status - ~trace_id ?parent ?(links=[]) - ~start_time ~end_time - name : t * id = + let create ?(kind = Span_kind_unspecified) ?(id = Span_id.create ()) + ?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent + ?(links = []) ~start_time ~end_time name : t * id = let trace_id = Trace_id.to_bytes trace_id in let parent_span_id = Option.map Span_id.to_bytes parent in let attributes = List.map _conv_key_value attrs in let links = List.map - (fun (trace_id,span_id,trace_state) -> - let trace_id = Trace_id.to_bytes trace_id in - let span_id = Span_id.to_bytes span_id in - default_span_link ~trace_id ~span_id ~trace_state()) + (fun (trace_id, span_id, trace_state) -> + let trace_id = Trace_id.to_bytes trace_id in + let span_id = Span_id.to_bytes span_id in + default_span_link ~trace_id ~span_id ~trace_state ()) links in let span = - default_span - ~trace_id ?parent_span_id - ~span_id:(Span_id.to_bytes id) - ~attributes ~events - ?trace_state ~status - ~kind ~name ~links - ~start_time_unix_nano:start_time - ~end_time_unix_nano:end_time - () + default_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id) + ~attributes ~events ?trace_state ~status ~kind ~name ~links + ~start_time_unix_nano:start_time ~end_time_unix_nano:end_time () in span, id end @@ -514,49 +549,47 @@ module Trace = struct let make_resource_spans ?service_name ?attrs spans = let ils = default_instrumentation_library_spans - ~instrumentation_library:(Some Globals.instrumentation_library) - ~spans () in + ~instrumentation_library:(Some Globals.instrumentation_library) ~spans + () + in let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in - default_resource_spans - ~resource:(Some resource) ~instrumentation_library_spans:[ils] () + default_resource_spans ~resource:(Some resource) + ~instrumentation_library_spans:[ ils ] () (** Sync emitter *) - let emit ?service_name ?attrs (spans:span list) : unit = + let emit ?service_name ?attrs (spans : span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in - Collector.send_trace [rs] ~ret:(fun () -> ()) + Collector.send_trace [ rs ] ~ret:(fun () -> ()) - (** Scope to be used with {!with_}. *) type scope = { trace_id: Trace_id.t; span_id: Span_id.t; mutable events: Event.t list; - mutable attrs: Span.key_value list + mutable attrs: Span.key_value list; } + (** Scope to be used with {!with_}. *) (** Add an event to the scope. It will be aggregated into the span. Note that this takes a function that produces an event, and will only call it if there is an instrumentation backend. *) - let[@inline] add_event (scope:scope) (ev:unit -> Event.t) : unit = - if Collector.has_backend() then ( - scope.events <- ev() :: scope.events - ) + let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = + if Collector.has_backend () then scope.events <- ev () :: scope.events (** Add an attr to the scope. It will be aggregated into the span. Note that this takes a function that produces attributes, and will only call it if there is an instrumentation backend. *) - let[@inline] add_attrs (scope:scope) (attrs:unit -> Span.key_value list) : unit = - if Collector.has_backend() then ( + let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) : + unit = + if Collector.has_backend () then scope.attrs <- List.rev_append (attrs ()) scope.attrs - ) (** Sync span guard *) - let with_ - ?trace_state ?service_name ?(attrs: (string*[ 'a) : 'a = + let with_ ?trace_state ?service_name + ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope + ?links name (f : scope -> 'a) : 'a = let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id @@ -569,25 +602,26 @@ module Trace = struct | None, Some scope -> Some scope.span_id | None, None -> None in - let start_time = Timestamp_ns.now_unix_ns() in - let span_id = Span_id.create() in - let scope = {trace_id;span_id;events=[]; attrs} in + let start_time = Timestamp_ns.now_unix_ns () in + let span_id = Span_id.create () in + let scope = { trace_id; span_id; events = []; attrs } in (* called once we're done, to emit a span *) let finally res = - let status = match res with + let status = + match res with | Ok () -> default_status ~code:Status_code_ok () - | Error e -> default_status ~code:Status_code_error ~message:e () in + | Error e -> default_status ~code:Status_code_error ~message:e () + in let span, _ = (* TODO: should the attrs passed to with_ go on the Span (in Span.create) or on the ResourceSpan (in emit)? (question also applies to Opentelemetry_lwt.Trace.with) *) - Span.create - ?kind ~trace_id ?parent ?links ~id:span_id - ?trace_state ~attrs:scope.attrs ~events:scope.events - ~start_time ~end_time:(Timestamp_ns.now_unix_ns()) - ~status - name in - emit ?service_name [span]; + Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state + ~attrs:scope.attrs ~events:scope.events ~start_time + ~end_time:(Timestamp_ns.now_unix_ns ()) + ~status name + in + emit ?service_name [ span ] in try let x = f scope in @@ -606,39 +640,36 @@ end module Metrics = struct open Metrics_types + type t = Metrics_types.metric (** A single metric, measuring some time-varying quantity or statistical distribution. It is composed of one or more data points that have precise values and time stamps. Each distinct metric should have a distinct name. *) - type t = Metrics_types.metric open struct - let _program_start = Timestamp_ns.now_unix_ns() + let _program_start = Timestamp_ns.now_unix_ns () end (** Number data point, as a float *) - let float ?(start_time_unix_nano=_program_start) - ?(now=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (d:float) : number_data_point = + let float ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) : + number_data_point = let attributes = attrs |> List.map _conv_key_value in - default_number_data_point - ~start_time_unix_nano ~time_unix_nano:now - ~attributes - ~value:(As_double d) () + default_number_data_point ~start_time_unix_nano ~time_unix_nano:now + ~attributes ~value:(As_double d) () (** Number data point, as an int *) - let int ?(start_time_unix_nano=_program_start) - ?(now=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (i:int) : number_data_point = + let int ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) : + number_data_point = let attributes = attrs |> List.map _conv_key_value in default_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes - ~value:(As_int (Int64.of_int i)) () + ~value:(As_int (Int64.of_int i)) + () (** Aggregation of a scalar metric, always with the current value *) - let gauge ~name ?description ?unit_ (l:number_data_point list) : t = + let gauge ~name ?description ?unit_ (l : number_data_point list) : t = let data = Gauge (default_gauge ~data_points:l ()) in default_metric ~name ?description ?unit_ ~data () @@ -649,12 +680,11 @@ module Metrics = struct (** Sum of all reported measurements over a time interval *) let sum ~name ?description ?unit_ - ?(aggregation_temporality=Aggregation_temporality_cumulative) - ?is_monotonic - (l:number_data_point list) : t = + ?(aggregation_temporality = Aggregation_temporality_cumulative) + ?is_monotonic (l : number_data_point list) : t = let data = - Sum (default_sum ~data_points:l ?is_monotonic - ~aggregation_temporality ()) in + Sum (default_sum ~data_points:l ?is_monotonic ~aggregation_temporality ()) + in default_metric ~name ?description ?unit_ ~data () (** Histogram data @@ -664,26 +694,19 @@ module Metrics = struct the counts must be equal to [count]. length must be [1+length explicit_bounds] @param explicit_bounds strictly increasing list of bounds for the buckets *) - let histogram_data_point - ?(start_time_unix_nano=_program_start) - ?(now=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - ?(exemplars=[]) - ?(explicit_bounds=[]) - ?sum - ~bucket_counts - ~count - () : histogram_data_point = + let histogram_data_point ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = []) + ?(explicit_bounds = []) ?sum ~bucket_counts ~count () : + histogram_data_point = let attributes = attrs |> List.map _conv_key_value in default_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum () - let histogram ~name ?description ?unit_ - ?aggregation_temporality - (l:histogram_data_point list) : t = + let histogram ~name ?description ?unit_ ?aggregation_temporality + (l : histogram_data_point list) : t = let data = - Histogram (default_histogram ~data_points:l - ?aggregation_temporality ()) in + Histogram (default_histogram ~data_points:l ?aggregation_temporality ()) + in default_metric ~name ?description ?unit_ ~data () (* TODO: exponential history *) @@ -691,22 +714,24 @@ module Metrics = struct (* TODO: exemplar *) (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) - let make_resource_metrics ?service_name ?attrs (l:t list) : resource_metrics = + let make_resource_metrics ?service_name ?attrs (l : t list) : resource_metrics + = let lm = default_instrumentation_library_metrics ~instrumentation_library:(Some Globals.instrumentation_library) - ~metrics:l () in + ~metrics:l () + in let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in - default_resource_metrics - ~instrumentation_library_metrics:[lm] ~resource:(Some resource) () + default_resource_metrics ~instrumentation_library_metrics:[ lm ] + ~resource:(Some resource) () (** Emit some metrics to the collector (sync). This blocks until the backend has pushed the metrics into some internal queue, or discarded them. *) - let emit ?attrs (l:t list) : unit = + let emit ?attrs (l : t list) : unit = let rm = make_resource_metrics ?attrs l in - Collector.send_metrics [rm] ~ret:ignore + Collector.send_metrics [ rm ] ~ret:ignore end (** A set of callbacks that produce metrics when called. @@ -721,23 +746,19 @@ module Metrics_callbacks = struct let cbs_ : (unit -> Metrics.t list) list ref = ref [] end - (** [register f] adds the callback [f] to the list. [f] will be called at unspecified times and is expected to return a list of metrics. *) let register f : unit = - if !cbs_ = [] then ( + if !cbs_ = [] then (* make sure we call [f] (and others) at each tick *) Collector.on_tick (fun () -> - let m = List.map (fun f -> f()) !cbs_ |> List.flatten in - Metrics.emit m) - ); + let m = List.map (fun f -> f ()) !cbs_ |> List.flatten in + Metrics.emit m); cbs_ := f :: !cbs_ end -module Logs = struct - -end +module Logs = struct end (** {2 Utils} *) @@ -746,12 +767,10 @@ end https://www.w3.org/TR/trace-context/ *) module Trace_context = struct - (** The traceparent header https://www.w3.org/TR/trace-context/#traceparent-header *) module Traceparent = struct - let name = "traceparent" (** Parse the value of the traceparent header. @@ -782,33 +801,40 @@ module Trace_context = struct let consume expected ~offset ~or_ = let len = String.length expected in let* str, offset = blit ~offset ~len ~or_ in - if str = expected then Ok offset else Error or_ + if str = expected then + Ok offset + else + Error or_ in let offset = 0 in let* offset = consume "00" ~offset ~or_:"Expected version 00" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* trace_id, offset = blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" in + let* trace_id, offset = + blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" + in let* trace_id = match Trace_id.of_hex trace_id with | trace_id -> Ok trace_id | exception Invalid_argument _ -> Error "Expected hex-encoded trace-id" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* parent_id, offset = blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" in + let* parent_id, offset = + blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" + in let* parent_id = match Span_id.of_hex parent_id with | parent_id -> Ok parent_id | exception Invalid_argument _ -> Error "Expected hex-encoded parent-id" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* _flags, _offset = blit ~offset ~len:2 ~or_:"Expected 2-digit flags" in + let* _flags, _offset = + blit ~offset ~len:2 ~or_:"Expected 2-digit flags" + in Ok (trace_id, parent_id) let to_value ~(trace_id : Trace_id.t) ~(parent_id : Span_id.t) () : string = - Printf.sprintf "00-%s-%s-00" - (Trace_id.to_hex trace_id) + Printf.sprintf "00-%s-%s-00" (Trace_id.to_hex trace_id) (Span_id.to_hex parent_id) - end end @@ -828,29 +854,32 @@ end = struct (** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *) let runtime_attributes = lazy - Conventions.Attributes.[ - (Process.Runtime.name, `String "ocaml"); - (Process.Runtime.version, `String Sys.ocaml_version); - ] + Conventions.Attributes. + [ + Process.Runtime.name, `String "ocaml"; + Process.Runtime.version, `String Sys.ocaml_version; + ] let get_runtime_attributes () = Lazy.force runtime_attributes let basic_setup () = (* emit metrics when GC is called *) - let on_gc() = - match Collector.get_backend() with + let on_gc () = + match Collector.get_backend () with | None -> () - | Some (module C) -> C.signal_emit_gc_metrics() + | Some (module C) -> C.signal_emit_gc_metrics () in ignore (Gc.create_alarm on_gc : Gc.alarm) let bytes_per_word = Sys.word_size / 8 + let word_to_bytes n = n * bytes_per_word + let word_to_bytes_f n = n *. float bytes_per_word let get_metrics () : Metrics.t list = let gc = Gc.quick_stat () in - let now = Timestamp_ns.now_unix_ns() in + let now = Timestamp_ns.now_unix_ns () in let open Metrics in let open Conventions.Metrics in [ @@ -858,8 +887,7 @@ end = struct [ int ~now (word_to_bytes gc.Gc.heap_words) ]; sum ~name:Process.Runtime.Ocaml.GC.minor_allocated ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - ~unit_:"B" + ~is_monotonic:true ~unit_:"B" [ float ~now (word_to_bytes_f gc.Gc.minor_words) ]; sum ~name:Process.Runtime.Ocaml.GC.minor_collections ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative diff --git a/src/rand_bytes.ml b/src/rand_bytes.ml new file mode 100644 index 00000000..2359358f --- /dev/null +++ b/src/rand_bytes.ml @@ -0,0 +1,38 @@ +(* generate random IDs *) +let rand_ = Random.State.make_self_init () + +let[@inline] ( let@ ) f x = f x + +let default_rand_bytes_8 () : bytes = + let@ () = Lock.with_lock in + let b = Bytes.create 8 in + for i = 0 to 1 do + let r = Random.State.bits rand_ in + (* 30 bits, of which we use 24 *) + Bytes.set b (i * 3) (Char.chr (r land 0xff)); + Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); + Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) + done; + let r = Random.State.bits rand_ in + Bytes.set b 6 (Char.chr (r land 0xff)); + Bytes.set b 7 (Char.chr ((r lsr 8) land 0xff)); + b + +let default_rand_bytes_16 () : bytes = + let@ () = Lock.with_lock in + let b = Bytes.create 16 in + for i = 0 to 4 do + let r = Random.State.bits rand_ in + (* 30 bits, of which we use 24 *) + Bytes.set b (i * 3) (Char.chr (r land 0xff)); + Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); + Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) + done; + let r = Random.State.bits rand_ in + Bytes.set b 15 (Char.chr (r land 0xff)); + (* last byte *) + b + +let rand_bytes_16 = ref default_rand_bytes_16 + +let rand_bytes_8 = ref default_rand_bytes_8 diff --git a/src/rand_bytes.mli b/src/rand_bytes.mli new file mode 100644 index 00000000..b1e4c811 --- /dev/null +++ b/src/rand_bytes.mli @@ -0,0 +1,9 @@ +val rand_bytes_16 : (unit -> bytes) ref +(** Generate 16 bytes of random data *) + +val rand_bytes_8 : (unit -> bytes) ref +(** Generate 16 bytes of random data *) + +val default_rand_bytes_8 : unit -> bytes + +val default_rand_bytes_16 : unit -> bytes