diff --git a/src/client/AList.ml b/src/client/AList.ml new file mode 100644 index 00000000..0a8c54b0 --- /dev/null +++ b/src/client/AList.ml @@ -0,0 +1,13 @@ +module Atomic = Opentelemetry_atomic.Atomic + +type 'a t = 'a list Atomic.t +let make () = Atomic.make [] +let add self x = + while + let old = Atomic.get self in + let l' = x :: old in + not (Atomic.compare_and_set self old l') + do () done +let rec pop_all self = + let l = Atomic.get self in + if Atomic.compare_and_set self l [] then l else pop_all self diff --git a/src/client/AList.mli b/src/client/AList.mli new file mode 100644 index 00000000..f4db8292 --- /dev/null +++ b/src/client/AList.mli @@ -0,0 +1,6 @@ +(** Atomic list *) + +type 'a t +val make : unit -> 'a t +val add : 'a t -> 'a -> unit +val pop_all : 'a t -> 'a list diff --git a/src/client/FQueue.ml b/src/client/FQueue.ml new file mode 100644 index 00000000..fc48def3 --- /dev/null +++ b/src/client/FQueue.ml @@ -0,0 +1,29 @@ + + +type 'a t = { + arr: 'a array; + mutable i: int; +} + +let create ~dummy n : _ t = + assert (n >= 1); + { arr=Array.make n dummy; + i=0; + } + +let[@inline] size self = self.i +let[@inline] is_full self = self.i = Array.length self.arr + +let push (self:_ t) x : bool = + if is_full self then false + else ( + self.arr.(self.i) <- x; + self.i <- 1 + self.i; + true + ) + +let pop_iter_all (self: _ t) f = + for j=0 to self.i-1 do + f self.arr.(j) + done; + self.i <- 0 diff --git a/src/client/FQueue.mli b/src/client/FQueue.mli new file mode 100644 index 00000000..b80544d2 --- /dev/null +++ b/src/client/FQueue.mli @@ -0,0 +1,9 @@ + +(** queue of fixed size *) + +type 'a t +val create : dummy:'a -> int -> 'a t +val size : _ t -> int +val push : 'a t -> 'a -> bool (* true iff it could write element *) +val pop_iter_all : 'a t -> ('a -> unit) -> unit + diff --git a/src/client/common_.ml b/src/client/common_.ml new file mode 100644 index 00000000..7646b327 --- /dev/null +++ b/src/client/common_.ml @@ -0,0 +1,26 @@ +module Atomic = Opentelemetry_atomic.Atomic + +let[@inline] (let@) f x = f x + +let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false) + +let lock_ : (unit -> unit) ref = ref ignore +let unlock_ : (unit -> unit) ref = ref ignore + +let set_mutex ~lock ~unlock : unit = + lock_ := lock; + unlock_ := unlock + +(* critical section for [f()] *) +let[@inline] with_lock_ f = + !lock_(); + Fun.protect ~finally:!unlock_ f + +let[@inline] with_mutex_ m f = + Mutex.lock m; + Fun.protect ~finally:(fun () -> Mutex.unlock m) f + +let default_url = "http://localhost:4318" +let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) +let get_url () = !url +let set_url s = url := s diff --git a/src/client/config.ml b/src/client/config.ml new file mode 100644 index 00000000..2ecb2b8d --- /dev/null +++ b/src/client/config.ml @@ -0,0 +1,35 @@ + +open Common_ + +type t = { + debug: bool; + url: string; + batch_traces: int option; + batch_metrics: int option; + batch_timeout_ms: int; + thread: bool; + ticker_thread: bool; +} + +let pp out self = + let ppiopt = Format.pp_print_option Format.pp_print_int in + let {debug; url; batch_traces; batch_metrics; + batch_timeout_ms; thread; ticker_thread} = self in + Format.fprintf out + "{@[ debug=%B;@ url=%S;@ \ + batch_traces=%a;@ batch_metrics=%a;@ \ + batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" + debug url ppiopt batch_traces ppiopt batch_metrics + batch_timeout_ms thread ticker_thread + +let make + ?(debug= !debug_) + ?(url= get_url()) + ?(batch_traces=Some 400) + ?(batch_metrics=None) + ?(batch_timeout_ms=500) + ?(thread=true) + ?(ticker_thread=true) + () : t = + { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + thread; ticker_thread; } diff --git a/src/client/config.mli b/src/client/config.mli new file mode 100644 index 00000000..892c91af --- /dev/null +++ b/src/client/config.mli @@ -0,0 +1,51 @@ + +type t = { + debug: bool; + + url: string; + (** Url of the endpoint. Default is "http://localhost:4318", + or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) + + batch_traces: int option; + (** Batch traces? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [Some 400]. + *) + + batch_metrics: int option; + (** Batch metrics? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [None]. + *) + + batch_timeout_ms: int; + (** Number of milliseconds after which we will emit a batch, even + incomplete. + Note that the batch might take longer than that, because this is + only checked when a new event occurs. Default 500. *) + + thread: bool; + (** Is there a background thread? Default [true] *) + + ticker_thread: bool; + (** Is there a ticker thread? Default [true]. + This thread will regularly call [tick()] on the backend, to make + sure it makes progress, and regularly send events to the collector. + This option is ignored if [thread=false]. *) +} + +val make : + ?debug:bool -> ?url:string -> + ?batch_traces:int option -> + ?batch_metrics:int option -> + ?batch_timeout_ms:int -> + ?thread:bool -> + ?ticker_thread:bool -> + unit -> t +(** Make a configuration *) + +val pp : Format.formatter -> t -> unit diff --git a/src/client/gen_ids.ml b/src/client/gen_ids.ml new file mode 100644 index 00000000..049f6efc --- /dev/null +++ b/src/client/gen_ids.ml @@ -0,0 +1,34 @@ + +open Common_ + +(* generate random IDs *) +module Make() = struct + let rand_ = Random.State.make_self_init() + + let rand_bytes_8 () : bytes = + let@() = with_lock_ in + let b = Bytes.create 8 in + for i=0 to 1 do + let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) + Bytes.set b (i*3) (Char.chr (r land 0xff)); + Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); + Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); + done; + let r = Random.State.bits rand_ in + Bytes.set b 6 (Char.chr (r land 0xff)); + Bytes.set b 7 (Char.chr (r lsr 8 land 0xff)); + b + + let rand_bytes_16 () : bytes = + let@() = with_lock_ in + let b = Bytes.create 16 in + for i=0 to 4 do + let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) + Bytes.set b (i*3) (Char.chr (r land 0xff)); + Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); + Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); + done; + let r = Random.State.bits rand_ in + Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) + b +end diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 0fca427d..780ef6c2 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -4,40 +4,9 @@ https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md *) -(* TODO *) - module OT = Opentelemetry open Opentelemetry -module Atomic = Opentelemetry_atomic.Atomic - -(** Atomic list *) -module AList : sig - type 'a t - val make : unit -> 'a t - val add : 'a t -> 'a -> unit - val pop_all : 'a t -> 'a list -end = struct - type 'a t = 'a list Atomic.t - let make () = Atomic.make [] - let add self x = - while - let old = Atomic.get self in - let l' = x :: old in - not (Atomic.compare_and_set self old l') - do () done - let rec pop_all self = - let l = Atomic.get self in - if Atomic.compare_and_set self l [] then l else pop_all self -end - -let[@inline] (let@) f x = f x - -let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false) - -let default_url = "http://localhost:4318" -let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) -let get_url () = !url -let set_url s = url := s +include Common_ let needs_gc_metrics = Atomic.make false @@ -52,55 +21,7 @@ let sample_gc_metrics () = @@ Opentelemetry.GC_metrics.get_metrics() in AList.add gc_metrics l -let lock_ : (unit -> unit) ref = ref ignore -let unlock_ : (unit -> unit) ref = ref ignore -let set_mutex ~lock ~unlock : unit = - lock_ := lock; - unlock_ := unlock - -module Config = struct - type t = { - debug: bool; - url: string; - batch_traces: int option; - batch_metrics: int option; - batch_timeout_ms: int; - thread: bool; - ticker_thread: bool; - } - - let pp out self = - let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; - batch_timeout_ms; thread; ticker_thread} = self in - Format.fprintf out - "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ \ - batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" - debug url ppiopt batch_traces ppiopt batch_metrics - batch_timeout_ms thread ticker_thread - - let make - ?(debug= !debug_) - ?(url= get_url()) - ?(batch_traces=Some 400) - ?(batch_metrics=None) - ?(batch_timeout_ms=500) - ?(thread=true) - ?(ticker_thread=true) - () : t = - { debug; url; batch_traces; batch_metrics; batch_timeout_ms; - thread; ticker_thread; } -end - -(* critical section for [f()] *) -let[@inline] with_lock_ f = - !lock_(); - Fun.protect ~finally:!unlock_ f - -let[@inline] with_mutex_ m f = - Mutex.lock m; - Fun.protect ~finally:(fun () -> Mutex.unlock m) f +module Config = Config let _init_curl = lazy ( Curl.global_init Curl.CURLINIT_GLOBALALL; @@ -196,75 +117,6 @@ module type PUSH = sig val pop_iter_all : (elt -> unit) -> unit end -(* queue of fixed size *) -module FQueue : sig - type 'a t - val create : dummy:'a -> int -> 'a t - val size : _ t -> int - val push : 'a t -> 'a -> bool (* true iff it could write element *) - val pop_iter_all : 'a t -> ('a -> unit) -> unit -end = struct - type 'a t = { - arr: 'a array; - mutable i: int; - } - - let create ~dummy n : _ t = - assert (n >= 1); - { arr=Array.make n dummy; - i=0; - } - - let[@inline] size self = self.i - let[@inline] is_full self = self.i = Array.length self.arr - - let push (self:_ t) x : bool = - if is_full self then false - else ( - self.arr.(self.i) <- x; - self.i <- 1 + self.i; - true - ) - - let pop_iter_all (self: _ t) f = - for j=0 to self.i-1 do - f self.arr.(j) - done; - self.i <- 0 -end - -(* generate random IDs *) -module Gen_ids() = struct - let rand_ = Random.State.make_self_init() - - let rand_bytes_8 () : bytes = - let@() = with_lock_ in - let b = Bytes.create 8 in - for i=0 to 1 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); - done; - let r = Random.State.bits rand_ in - Bytes.set b 6 (Char.chr (r land 0xff)); - Bytes.set b 7 (Char.chr (r lsr 8 land 0xff)); - b - - let rand_bytes_16 () : bytes = - let@() = with_lock_ in - let b = Bytes.create 16 in - for i=0 to 4 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); - done; - let r = Random.State.bits rand_ in - Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) - b -end - (** An emitter. This is used by {!Backend} below to forward traces/metrics/… from the program to whatever collector client we have. *) module type EMITTER = sig @@ -544,7 +396,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = module Backend(Arg : sig val config : Config.t end)() : Opentelemetry.Collector.BACKEND = struct - include Gen_ids() + include Gen_ids.Make() include (val mk_emitter ~config:Arg.config ()) diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 6774fedc..6147d5b1 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -14,58 +14,7 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit (** Set a lock/unlock pair to protect the critical sections of {!Opentelemetry.Collector.BACKEND} *) -module Config : sig - type t = { - debug: bool; - - url: string; - (** Url of the endpoint. Default is "http://localhost:4318", - or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) - - batch_traces: int option; - (** Batch traces? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [Some 400]. - *) - - batch_metrics: int option; - (** Batch metrics? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [None]. - *) - - batch_timeout_ms: int; - (** Number of milliseconds after which we will emit a batch, even - incomplete. - Note that the batch might take longer than that, because this is - only checked when a new event occurs. Default 500. *) - - thread: bool; - (** Is there a background thread? Default [true] *) - - ticker_thread: bool; - (** Is there a ticker thread? Default [true]. - This thread will regularly call [tick()] on the backend, to make - sure it makes progress, and regularly send events to the collector. - This option is ignored if [thread=false]. *) - } - - val make : - ?debug:bool -> ?url:string -> - ?batch_traces:int option -> - ?batch_metrics:int option -> - ?batch_timeout_ms:int -> - ?thread:bool -> - ?ticker_thread:bool -> - unit -> t - (** Make a configuration *) - - val pp : Format.formatter -> t -> unit -end +module Config = Config val setup : ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.