refactor(client): split into several modules

This commit is contained in:
Simon Cruanes 2022-04-13 16:28:23 -04:00
parent 06cbe55d11
commit c01879c4b0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
10 changed files with 207 additions and 203 deletions

13
src/client/AList.ml Normal file
View file

@ -0,0 +1,13 @@
module Atomic = Opentelemetry_atomic.Atomic
type 'a t = 'a list Atomic.t
let make () = Atomic.make []
let add self x =
while
let old = Atomic.get self in
let l' = x :: old in
not (Atomic.compare_and_set self old l')
do () done
let rec pop_all self =
let l = Atomic.get self in
if Atomic.compare_and_set self l [] then l else pop_all self

6
src/client/AList.mli Normal file
View file

@ -0,0 +1,6 @@
(** Atomic list *)
type 'a t
val make : unit -> 'a t
val add : 'a t -> 'a -> unit
val pop_all : 'a t -> 'a list

29
src/client/FQueue.ml Normal file
View file

@ -0,0 +1,29 @@
type 'a t = {
arr: 'a array;
mutable i: int;
}
let create ~dummy n : _ t =
assert (n >= 1);
{ arr=Array.make n dummy;
i=0;
}
let[@inline] size self = self.i
let[@inline] is_full self = self.i = Array.length self.arr
let push (self:_ t) x : bool =
if is_full self then false
else (
self.arr.(self.i) <- x;
self.i <- 1 + self.i;
true
)
let pop_iter_all (self: _ t) f =
for j=0 to self.i-1 do
f self.arr.(j)
done;
self.i <- 0

9
src/client/FQueue.mli Normal file
View file

@ -0,0 +1,9 @@
(** queue of fixed size *)
type 'a t
val create : dummy:'a -> int -> 'a t
val size : _ t -> int
val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit

26
src/client/common_.ml Normal file
View file

@ -0,0 +1,26 @@
module Atomic = Opentelemetry_atomic.Atomic
let[@inline] (let@) f x = f x
let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false)
let lock_ : (unit -> unit) ref = ref ignore
let unlock_ : (unit -> unit) ref = ref ignore
let set_mutex ~lock ~unlock : unit =
lock_ := lock;
unlock_ := unlock
(* critical section for [f()] *)
let[@inline] with_lock_ f =
!lock_();
Fun.protect ~finally:!unlock_ f
let[@inline] with_mutex_ m f =
Mutex.lock m;
Fun.protect ~finally:(fun () -> Mutex.unlock m) f
let default_url = "http://localhost:4318"
let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let get_url () = !url
let set_url s = url := s

35
src/client/config.ml Normal file
View file

@ -0,0 +1,35 @@
open Common_
type t = {
debug: bool;
url: string;
batch_traces: int option;
batch_metrics: int option;
batch_timeout_ms: int;
thread: bool;
ticker_thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics;
batch_timeout_ms; thread; ticker_thread} = self in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@ \
batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics
batch_timeout_ms thread ticker_thread
let make
?(debug= !debug_)
?(url= get_url())
?(batch_traces=Some 400)
?(batch_metrics=None)
?(batch_timeout_ms=500)
?(thread=true)
?(ticker_thread=true)
() : t =
{ debug; url; batch_traces; batch_metrics; batch_timeout_ms;
thread; ticker_thread; }

51
src/client/config.mli Normal file
View file

@ -0,0 +1,51 @@
type t = {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
thread: bool;
(** Is there a background thread? Default [true] *)
ticker_thread: bool;
(** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector.
This option is ignored if [thread=false]. *)
}
val make :
?debug:bool -> ?url:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?ticker_thread:bool ->
unit -> t
(** Make a configuration *)
val pp : Format.formatter -> t -> unit

34
src/client/gen_ids.ml Normal file
View file

@ -0,0 +1,34 @@
open Common_
(* generate random IDs *)
module Make() = struct
let rand_ = Random.State.make_self_init()
let rand_bytes_8 () : bytes =
let@() = with_lock_ in
let b = Bytes.create 8 in
for i=0 to 1 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
Bytes.set b (i*3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
done;
let r = Random.State.bits rand_ in
Bytes.set b 6 (Char.chr (r land 0xff));
Bytes.set b 7 (Char.chr (r lsr 8 land 0xff));
b
let rand_bytes_16 () : bytes =
let@() = with_lock_ in
let b = Bytes.create 16 in
for i=0 to 4 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
Bytes.set b (i*3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
done;
let r = Random.State.bits rand_ in
Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *)
b
end

View file

@ -4,40 +4,9 @@
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
*)
(* TODO *)
module OT = Opentelemetry
open Opentelemetry
module Atomic = Opentelemetry_atomic.Atomic
(** Atomic list *)
module AList : sig
type 'a t
val make : unit -> 'a t
val add : 'a t -> 'a -> unit
val pop_all : 'a t -> 'a list
end = struct
type 'a t = 'a list Atomic.t
let make () = Atomic.make []
let add self x =
while
let old = Atomic.get self in
let l' = x :: old in
not (Atomic.compare_and_set self old l')
do () done
let rec pop_all self =
let l = Atomic.get self in
if Atomic.compare_and_set self l [] then l else pop_all self
end
let[@inline] (let@) f x = f x
let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false)
let default_url = "http://localhost:4318"
let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url)
let get_url () = !url
let set_url s = url := s
include Common_
let needs_gc_metrics = Atomic.make false
@ -52,55 +21,7 @@ let sample_gc_metrics () =
@@ Opentelemetry.GC_metrics.get_metrics() in
AList.add gc_metrics l
let lock_ : (unit -> unit) ref = ref ignore
let unlock_ : (unit -> unit) ref = ref ignore
let set_mutex ~lock ~unlock : unit =
lock_ := lock;
unlock_ := unlock
module Config = struct
type t = {
debug: bool;
url: string;
batch_traces: int option;
batch_metrics: int option;
batch_timeout_ms: int;
thread: bool;
ticker_thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics;
batch_timeout_ms; thread; ticker_thread} = self in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@ \
batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics
batch_timeout_ms thread ticker_thread
let make
?(debug= !debug_)
?(url= get_url())
?(batch_traces=Some 400)
?(batch_metrics=None)
?(batch_timeout_ms=500)
?(thread=true)
?(ticker_thread=true)
() : t =
{ debug; url; batch_traces; batch_metrics; batch_timeout_ms;
thread; ticker_thread; }
end
(* critical section for [f()] *)
let[@inline] with_lock_ f =
!lock_();
Fun.protect ~finally:!unlock_ f
let[@inline] with_mutex_ m f =
Mutex.lock m;
Fun.protect ~finally:(fun () -> Mutex.unlock m) f
module Config = Config
let _init_curl = lazy (
Curl.global_init Curl.CURLINIT_GLOBALALL;
@ -196,75 +117,6 @@ module type PUSH = sig
val pop_iter_all : (elt -> unit) -> unit
end
(* queue of fixed size *)
module FQueue : sig
type 'a t
val create : dummy:'a -> int -> 'a t
val size : _ t -> int
val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit
end = struct
type 'a t = {
arr: 'a array;
mutable i: int;
}
let create ~dummy n : _ t =
assert (n >= 1);
{ arr=Array.make n dummy;
i=0;
}
let[@inline] size self = self.i
let[@inline] is_full self = self.i = Array.length self.arr
let push (self:_ t) x : bool =
if is_full self then false
else (
self.arr.(self.i) <- x;
self.i <- 1 + self.i;
true
)
let pop_iter_all (self: _ t) f =
for j=0 to self.i-1 do
f self.arr.(j)
done;
self.i <- 0
end
(* generate random IDs *)
module Gen_ids() = struct
let rand_ = Random.State.make_self_init()
let rand_bytes_8 () : bytes =
let@() = with_lock_ in
let b = Bytes.create 8 in
for i=0 to 1 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
Bytes.set b (i*3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
done;
let r = Random.State.bits rand_ in
Bytes.set b 6 (Char.chr (r land 0xff));
Bytes.set b 7 (Char.chr (r lsr 8 land 0xff));
b
let rand_bytes_16 () : bytes =
let@() = with_lock_ in
let b = Bytes.create 16 in
for i=0 to 4 do
let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *)
Bytes.set b (i*3) (Char.chr (r land 0xff));
Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff));
Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff));
done;
let r = Random.State.bits rand_ in
Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *)
b
end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *)
module type EMITTER = sig
@ -544,7 +396,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
module Backend(Arg : sig val config : Config.t end)()
: Opentelemetry.Collector.BACKEND
= struct
include Gen_ids()
include Gen_ids.Make()
include (val mk_emitter ~config:Arg.config ())

View file

@ -14,58 +14,7 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a lock/unlock pair to protect the critical sections
of {!Opentelemetry.Collector.BACKEND} *)
module Config : sig
type t = {
debug: bool;
url: string;
(** Url of the endpoint. Default is "http://localhost:4318",
or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
thread: bool;
(** Is there a background thread? Default [true] *)
ticker_thread: bool;
(** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector.
This option is ignored if [thread=false]. *)
}
val make :
?debug:bool -> ?url:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?ticker_thread:bool ->
unit -> t
(** Make a configuration *)
val pp : Format.formatter -> t -> unit
end
module Config = Config
val setup : ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.