client: start heavily refactoring to use Aswitch, also fix bugs

This commit is contained in:
Simon Cruanes 2025-12-08 15:36:19 -05:00
parent 3026ad41ad
commit 15268270df
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
19 changed files with 455 additions and 215 deletions

View file

@ -1,5 +1,4 @@
open Opentelemetry_atomic open Opentelemetry_atomic
module Domain = Opentelemetry_domain
type 'a state = { type 'a state = {
start: Mtime.t; start: Mtime.t;
@ -49,27 +48,10 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
(** Big enough to send? *) (** Big enough to send? *)
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let[@inline] atomic_update_loop_ (type res) (self : _ t)
(f : 'a state -> 'a state * res) : res =
let exception Return of res in
try
let backoff = ref 1 in
while true do
let st = Atomic.get self.st in
let new_st, res = f st in
if Atomic.compare_and_set self.st st new_st then
raise_notrace (Return res);
(* poor man's backoff strategy *)
Domain.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done
with Return res -> res
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
(* update state. When uncontended this runs only once. *) (* update state. When uncontended this runs only once. *)
atomic_update_loop_ self @@ fun state -> Util_atomic.update_cas self.st @@ fun state ->
(* *) (* *)
(* check if the batch is ready *) (* check if the batch is ready *)
@ -84,9 +66,9 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
assert (state.q <> []); assert (state.q <> []);
let batch = state.q in let batch = state.q in
let new_st = _empty_state in let new_st = _empty_state in
new_st, Some batch Some batch, new_st
) else ) else
state, None None, state
in in
match rev_batch_opt with match rev_batch_opt with
| None -> None | None -> None
@ -99,10 +81,10 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
`Ok `Ok
else ( else (
let now = lazy (Mtime_clock.now ()) in let now = lazy (Mtime_clock.now ()) in
atomic_update_loop_ self @@ fun state -> Util_atomic.update_cas self.st @@ fun state ->
if state.size >= self.high_watermark then if state.size >= self.high_watermark then
(* drop this to prevent queue from growing too fast *) (* drop this to prevent queue from growing too fast *)
state, `Dropped `Dropped, state
else ( else (
let start = let start =
if state.size = 0 && Option.is_some self.timeout then if state.size = 0 && Option.is_some self.timeout then
@ -120,7 +102,7 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
} }
in in
state, `Ok `Ok, state
) )
) )
@ -134,8 +116,8 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
then [e] itself will be closed. *) then [e] itself will be closed. *)
let closed_here = Atomic.make false in let closed_here = Atomic.make false in
let enabled () = e.enabled () in let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
let closed () = e.closed () in let closed () = Atomic.get closed_here || e.closed () in
let flush_and_close () = let flush_and_close () =
if not (Atomic.exchange closed_here true) then ( if not (Atomic.exchange closed_here true) then (
(* NOTE: we need to close this wrapping emitter first, to prevent (* NOTE: we need to close this wrapping emitter first, to prevent
@ -145,6 +127,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
| None -> () | None -> ()
| Some l -> Emitter.emit e l); | Some l -> Emitter.emit e l);
(* now we can close [e], nothing remains in [self] *)
Emitter.flush_and_close e Emitter.flush_and_close e
) )
in in

View file

@ -49,8 +49,11 @@ let[@inline] close (self : _ t) : unit = self.close ()
let[@inline] closed (self : _ t) : bool = self.closed () let[@inline] closed (self : _ t) : bool = self.closed ()
(** Turn the writing end of the queue into an emitter. *) (** Turn the writing end of the queue into an emitter.
let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = @param close_queue_on_close
if true, closing the emitter will close the queue *)
let to_emitter ~close_queue_on_close (self : 'a t) :
'a Opentelemetry_emitter.Emitter.t =
let closed () = self.closed () in let closed () = self.closed () in
let enabled () = not (closed ()) in let enabled () = not (closed ()) in
let emit x = if x <> [] then push self x in let emit x = if x <> [] then push self x in
@ -58,7 +61,7 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
(* NOTE: we cannot actually flush, only close. Emptying the queue is (* NOTE: we cannot actually flush, only close. Emptying the queue is
fundamentally asynchronous because it's done by consumers *) fundamentally asynchronous because it's done by consumers *)
let flush_and_close () = close self in let flush_and_close () = if close_queue_on_close then close self in
{ closed; enabled; emit; tick; flush_and_close } { closed; enabled; emit; tick; flush_and_close }
module Defaults = struct module Defaults = struct

View file

@ -1,6 +1,8 @@
module BQ = Bounded_queue module BQ = Bounded_queue
exception Closed = Bounded_queue.Closed type push_res =
| Closed
| Pushed of { num_discarded: int }
(* a variant of {!Sync_queue} with more bespoke pushing behavior *) (* a variant of {!Sync_queue} with more bespoke pushing behavior *)
module Q : sig module Q : sig
@ -12,9 +14,9 @@ module Q : sig
val closed : _ t -> bool val closed : _ t -> bool
val try_pop : 'a t -> 'a option val try_pop : 'a t -> 'a BQ.pop_result
val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> push_res
(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x] (** [push_while_not_full q ~high_watermark xs] tries to push each item of [x]
into [q]. into [q].
@ -43,17 +45,20 @@ end = struct
UM.protect self.mutex @@ fun () -> UM.protect self.mutex @@ fun () ->
if not self.closed then self.closed <- true if not self.closed then self.closed <- true
let try_pop (self : 'a t) : 'a option = let try_pop (self : 'a t) : 'a BQ.pop_result =
UM.protect self.mutex @@ fun () -> UM.protect self.mutex @@ fun () ->
if self.closed then raise Closed; if self.closed then
try Some (Queue.pop self.q) with Queue.Empty -> None `Closed
else (
try `Item (Queue.pop self.q) with Queue.Empty -> `Empty
)
let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) :
int * int = push_res =
UM.protect self.mutex @@ fun () -> UM.protect self.mutex @@ fun () ->
if self.closed then raise Closed; if self.closed then
Closed
let old_size = Queue.length self.q in else (
let xs = ref xs in let xs = ref xs in
let continue = ref true in let continue = ref true in
@ -65,8 +70,9 @@ end = struct
Queue.push x self.q Queue.push x self.q
done; done;
let n_discarded = List.length !xs in let num_discarded = List.length !xs in
n_discarded, old_size Pushed { num_discarded }
)
end end
type 'a state = { type 'a state = {
@ -77,23 +83,22 @@ type 'a state = {
} }
let push (self : _ state) x = let push (self : _ state) x =
let discarded, old_size = if x <> [] then (
try Q.push_while_not_full self.q ~high_watermark:self.high_watermark x match
with Sync_queue.Closed -> raise BQ.Closed Q.push_while_not_full self.q ~high_watermark:self.high_watermark x
in with
| Closed -> Printf.eprintf "bounded queue: warning: queue is closed\n%!"
| Pushed { num_discarded } ->
if num_discarded > 0 then (
Printf.eprintf "DISCARD %d items\n%!" num_discarded;
ignore (Atomic.fetch_and_add self.n_discarded num_discarded : int)
);
if discarded > 0 then (* wake up potentially asleep consumers *)
ignore (Atomic.fetch_and_add self.n_discarded discarded : int); Cb_set.trigger self.on_non_empty
)
(* wake up lagards if the queue was empty *) let[@inline] try_pop (self : _ state) : _ BQ.pop_result = Q.try_pop self.q
if old_size = 0 then Cb_set.trigger self.on_non_empty;
()
let try_pop (self : _ state) : _ BQ.pop_result =
match Q.try_pop self.q with
| Some x -> `Item x
| None -> `Empty
| exception Sync_queue.Closed -> `Closed
let to_bounded_queue (self : 'a state) : 'a BQ.t = let to_bounded_queue (self : 'a state) : 'a BQ.t =
let closed () = Q.closed self.q in let closed () = Q.closed self.q in

View file

@ -39,8 +39,9 @@ let pp out (self : t) : unit =
in in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ self_trace=%B; url_traces=%S;@ url_metrics=%S;@ \ "{@[ debug=%B;@ self_trace=%B; url_traces=%S;@ url_metrics=%S;@ \
url_logs=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ url_logs=%S;@ @[<2>headers=@,\
batch_logs=%a;@ batch_timeout_ms=%d;@ http_concurrency_level=%a @]}" %a@];@ batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \
batch_timeout_ms=%d;@ http_concurrency_level=%a @]}"
debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt
batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms ppiopt
http_concurrency_level http_concurrency_level

View file

@ -1,21 +1,29 @@
(** Consumer that accepts items from a bounded queue *) (** Consumer that accepts items from a bounded queue *)
open Common_
type 'a t = { type 'a t = {
active: unit -> bool; (** Still running? Must be fast and thread-safe *) active: unit -> Aswitch.t;
shutdown: unit -> unit;
(** Shutdown the consumer as soon as possible. [active] will be turned off
once the consumer is fully shut down. *)
tick: unit -> unit; tick: unit -> unit;
(** Regularly called, eg to emit metrics, check timeouts, etc. Must be (** Regularly called, eg to emit metrics, check timeouts, etc. Must be
thread safe. *) thread safe. *)
shutdown: on_done:(unit -> unit) -> unit; self_metrics: unit -> OTEL.Metrics.t list; (** Self observing metrics *)
(** Shutdown the consumer as soon as possible, call [on_done()] once it's
done. *)
} }
(** A consumer for signals of type ['a] *) (** A consumer for signals of type ['a] *)
type 'a consumer = 'a t type 'a consumer = 'a t
let[@inline] active (self : _ t) = self.active () let[@inline] active (self : _ t) : Aswitch.t = self.active ()
let[@inline] shutdown (self : _ t) ~on_done = self.shutdown ~on_done let[@inline] shutdown (self : _ t) : unit = self.shutdown ()
let[@inline] self_metrics self : _ list = self.self_metrics ()
(** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *)
let on_stop self f = Aswitch.on_turn_off (self.active ()) f
module Builder = struct module Builder = struct
type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer } type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer }

View file

@ -0,0 +1,33 @@
(** Combine multiple emitters into one *)
open Opentelemetry_emitter.Emitter
type closing_behavior =
[ `Close_when_all_closed
| `Close_when_one_closed
]
(** When to close the combined emitter:
- [`Close_when_all_closed]: closed when all the emitters that are combined
are closed
- [`Close_when_one_closed]: closed as soon as one of the emitters is closed
*)
(** [combine_l es] is an emitter that sends signals to every emitter in [es].
@param closing
when is this emitter closing. Default [`Close_when_all_closed]. *)
let combine_l ?(closing : closing_behavior = `Close_when_all_closed)
(es : 'a t list) : 'a t =
let closed =
fun () ->
match closing with
| `Close_when_all_closed -> List.for_all closed es
| `Close_when_one_closed -> List.exists closed es
in
let enabled () = not (closed ()) in
let emit x = if x <> [] then List.iter (fun e -> emit e x) es in
let tick ~now = List.iter (tick ~now) es in
let flush_and_close () = List.iter flush_and_close es in
{ closed; enabled; emit; tick; flush_and_close }
let combine e1 e2 : _ t = combine_l [ e1; e2 ]

View file

@ -23,4 +23,24 @@ let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) :
let emit_metrics = add_batch_opt config.batch_metrics exp.emit_metrics in let emit_metrics = add_batch_opt config.batch_metrics exp.emit_metrics in
let emit_logs = add_batch_opt config.batch_logs exp.emit_logs in let emit_logs = add_batch_opt config.batch_logs exp.emit_logs in
{ exp with emit_spans; emit_metrics; emit_logs } let active = exp.active in
let tick = exp.tick in
let on_tick = exp.on_tick in
let shutdown () =
let open Opentelemetry_emitter in
Emitter.flush_and_close emit_spans;
Emitter.flush_and_close emit_metrics;
Emitter.flush_and_close emit_logs;
exp.shutdown ()
in
{
OTEL.Exporter.active;
emit_spans;
emit_metrics;
emit_logs;
on_tick;
tick;
shutdown;
}

View file

@ -2,24 +2,26 @@ open Common_
open Opentelemetry_atomic open Opentelemetry_atomic
open struct open struct
let shutdown_l ~on_done:on_done_real (es : OTEL.Exporter.t list) : unit = let shutdown_l (es : OTEL.Exporter.t list) ~trigger : unit =
let missing = Atomic.make (List.length es) in let missing = Atomic.make (List.length es) in
let on_done () = let on_done () =
if Atomic.fetch_and_add missing (-1) = 1 then if Atomic.fetch_and_add missing (-1) = 1 then
(* we were the last exporter to shutdown, [missing] is now 0 *) (* we were the last exporter to shutdown, [missing] is now 0 *)
on_done_real () Aswitch.turn_off trigger
in in
List.iter (OTEL.Exporter.shutdown ~on_done) es List.iter (fun e -> Aswitch.on_turn_off (OTEL.Exporter.active e) on_done) es;
List.iter OTEL.Exporter.shutdown es
end end
let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t =
let open OTEL.Exporter in let open OTEL.Exporter in
if es = [] then if es = [] then
OTEL.Exporter.dummy () OTEL.Exporter.dummy ()
else else (
let active, trigger = Aswitch.create () in
{ {
active = (fun () -> active);
emit_spans = emit_spans =
Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es); Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es);
emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es); emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es);
@ -27,8 +29,9 @@ let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t =
Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es); Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es);
on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es);
tick = (fun () -> List.iter tick es); tick = (fun () -> List.iter tick es);
shutdown = (fun ~on_done () -> shutdown_l ~on_done es); shutdown = (fun () -> shutdown_l es ~trigger);
} }
)
(** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and (** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and
[exp2]. *) [exp2]. *)

View file

@ -5,8 +5,10 @@ open Opentelemetry_emitter
@param out the formatter into which to print, default [stderr]. *) @param out the formatter into which to print, default [stderr]. *)
let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t =
let open Proto in let open Proto in
let active, trigger = Aswitch.create () in
let ticker = Cb_set.create () in let ticker = Cb_set.create () in
{ {
active = (fun () -> active);
emit_spans = emit_spans =
Emitter.make_simple () ~emit:(fun sp -> Emitter.make_simple () ~emit:(fun sp ->
List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp);
@ -21,7 +23,7 @@ let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t =
on_tick = Cb_set.register ticker; on_tick = Cb_set.register ticker;
tick = (fun () -> Cb_set.trigger ticker); tick = (fun () -> Cb_set.trigger ticker);
shutdown = shutdown =
(fun ~on_done () -> (fun () ->
Format.fprintf out "CLEANUP@."; Format.fprintf out "CLEANUP@.";
on_done ()); Aswitch.turn_off trigger);
} }

View file

@ -6,19 +6,19 @@ module BQ = Bounded_queue
module BQ_emitters = struct module BQ_emitters = struct
let logs_emitter_of_bq ?service_name ?attrs let logs_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t =
Bounded_queue.to_emitter q Bounded_queue.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map |> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_logs_or_empty ?service_name ?attrs) (Any_resource.of_logs_or_empty ?service_name ?attrs)
let spans_emitter_of_bq ?service_name ?attrs let spans_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t = (q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t =
Bounded_queue.to_emitter q Bounded_queue.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map |> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_spans_or_empty ?service_name ?attrs) (Any_resource.of_spans_or_empty ?service_name ?attrs)
let metrics_emitter_of_bq ?service_name ?attrs let metrics_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t = (q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t =
Bounded_queue.to_emitter q Bounded_queue.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map |> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_metrics_or_empty ?service_name ?attrs) (Any_resource.of_metrics_or_empty ?service_name ?attrs)
end end
@ -31,6 +31,11 @@ end
@param resource_attributes attributes added to every "resource" batch *) @param resource_attributes attributes added to every "resource" batch *)
let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t = ~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t =
let open Opentelemetry_emitter in
let shutdown_started = Atomic.make false in
let active, trigger = Aswitch.create () in
let consumer = consumer.start_consuming q in
let emit_spans = let emit_spans =
BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q
in in
@ -43,15 +48,25 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
let tick () = Cb_set.trigger tick_set in let tick () = Cb_set.trigger tick_set in
let on_tick f = Cb_set.register tick_set f in let on_tick f = Cb_set.register tick_set f in
let closed = Atomic.make false in let shutdown () =
if Aswitch.is_on active && not (Atomic.exchange shutdown_started true) then (
(* flush all emitters *)
Emitter.flush_and_close emit_spans;
Emitter.flush_and_close emit_logs;
Emitter.flush_and_close emit_metrics;
let consumer = consumer.start_consuming q in (* first, prevent further pushes to the queue. Consumer workers
can still drain it. *)
let shutdown ~on_done () =
if not (Atomic.exchange closed true) then (
Bounded_queue.close q; Bounded_queue.close q;
Consumer.shutdown consumer ~on_done
) else (* shutdown consumer; once it's down it'll turn our switch off too *)
on_done () Aswitch.link (Consumer.active consumer) trigger;
Consumer.shutdown consumer
)
in in
{ emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown }
(* if consumer shuts down for some reason, we also must *)
Aswitch.on_turn_off (Consumer.active consumer) shutdown;
let active () = active in
{ active; emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown }

View file

@ -35,22 +35,22 @@ let stdout : OTEL.Exporter.t =
let mutex = Mutex.create () in let mutex = Mutex.create () in
let ticker = Cb_set.create () in let ticker = Cb_set.create () in
let closed = Atomic.make false in let active, trigger = Aswitch.create () in
let tick () = Cb_set.trigger ticker in let tick () = Cb_set.trigger ticker in
let mk_emitter pp_signal = let mk_emitter pp_signal =
let emit l = let emit l =
if Atomic.get closed then raise Emitter.Closed; if Aswitch.is_off active then raise Emitter.Closed;
pp_vlist mutex pp_signal out l pp_vlist mutex pp_signal out l
in in
let enabled () = not (Atomic.get closed) in let enabled () = Aswitch.is_on active in
let tick ~now:_ = () in let tick ~now:_ = () in
let flush_and_close () = let flush_and_close () =
if not (Atomic.exchange closed true) then if Aswitch.is_on active then
let@ () = Util_mutex.protect mutex in let@ () = Util_mutex.protect mutex in
Format.pp_print_flush out () Format.pp_print_flush out ()
in in
let closed () = Atomic.get closed in let closed () = Aswitch.is_off active in
{ Emitter.emit; closed; enabled; tick; flush_and_close } { Emitter.emit; closed; enabled; tick; flush_and_close }
in in
@ -58,14 +58,15 @@ let stdout : OTEL.Exporter.t =
let emit_logs = mk_emitter Proto.Logs.pp_log_record in let emit_logs = mk_emitter Proto.Logs.pp_log_record in
let emit_metrics = mk_emitter Proto.Metrics.pp_metric in let emit_metrics = mk_emitter Proto.Metrics.pp_metric in
let shutdown ~on_done () = let shutdown () =
Emitter.flush_and_close emit_spans; Emitter.flush_and_close emit_spans;
Emitter.flush_and_close emit_logs; Emitter.flush_and_close emit_logs;
Emitter.flush_and_close emit_metrics; Emitter.flush_and_close emit_metrics;
on_done () Aswitch.turn_off trigger
in in
{ {
active = (fun () -> active);
emit_spans; emit_spans;
emit_logs; emit_logs;
emit_metrics; emit_metrics;

View file

@ -0,0 +1,138 @@
(** A consumer that just calls another exporter.
This is useful to introduce queueing behavior using {!Exporter_queued}, but
simply forwarding to another (presumably non-queue) exporter.
It is generic because we need some sort of threading/concurrency to run the
consumer. *)
open Common_
module type IO = Generic_io.S_WITH_CONCURRENCY
module Make
(IO : IO)
(Notifier : Generic_notifier.S with type 'a IO.t = 'a IO.t) : sig
val consumer : OTEL.Exporter.t -> OTEL.Any_signal_l.t Consumer.Builder.t
end = struct
open IO
type status =
| Active
| Shutting_down
| Stopped
type state = {
active: Aswitch.t; (** Public facing switch *)
active_trigger: Aswitch.trigger;
status: status Atomic.t; (** Internal state, including shutdown *)
q: OTEL.Any_signal_l.t Bounded_queue.t;
notify: Notifier.t;
exp: OTEL.Exporter.t;
}
let shutdown self : unit =
let old_status =
Util_atomic.update_cas self.status @@ fun status ->
match status with
| Stopped -> status, status
| Shutting_down -> status, status
| Active -> status, Shutting_down
in
match old_status with
| Stopped -> ()
| Shutting_down ->
(* when the worker stops it will call [on_done] *)
()
| Active ->
(* notify potentially asleep workers *)
Notifier.trigger self.notify;
Notifier.delete self.notify
let tick (self : state) = Notifier.trigger self.notify
(** Shutdown one worker, when the queue is closed *)
let shutdown_worker (self : state) : unit =
(* we were the last worker *)
(* Printf.eprintf "worker %d: last one!\n%!" tid; *)
Atomic.set self.status Stopped;
Aswitch.turn_off self.active_trigger
let start_worker (self : state) : unit =
(* loop on [q] *)
let rec loop () : unit IO.t =
match Bounded_queue.try_pop self.q with
| `Closed ->
shutdown_worker self;
IO.return ()
| `Item (Logs logs) ->
OTEL.Exporter.send_logs self.exp logs;
loop ()
| `Item (Metrics ms) ->
OTEL.Exporter.send_metrics self.exp ms;
loop ()
| `Item (Spans sp) ->
OTEL.Exporter.send_trace self.exp sp;
loop ()
| `Empty ->
(match Atomic.get self.status with
| Stopped ->
assert false
(* shouldn't happen without us going through [Shutting_down] *)
| Shutting_down ->
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
loop ())
in
IO.spawn loop
let create_state ~q ~exporter () : state =
let active, active_trigger = Aswitch.create () in
let self =
{
active;
active_trigger;
status = Atomic.make Active;
q;
exp = exporter;
notify = Notifier.create ();
}
in
(* if [exporter] turns off, shut us down too *)
Aswitch.on_turn_off (OTEL.Exporter.active exporter) (fun () ->
shutdown self);
start_worker self;
self
let self_metrics (self : state) : OTEL.Metrics.t list =
let open OTEL.Metrics in
let now = Mtime_clock.now () in
[
sum ~name:"otel-ocaml.export.discarded-by-bounded-queue"
~is_monotonic:true
[
int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q);
];
]
let to_consumer (self : state) : _ Consumer.t =
let shutdown () = shutdown self in
let tick () = tick self in
let self_metrics () = self_metrics self in
{ active = (fun () -> self.active); tick; shutdown; self_metrics }
let consumer exporter : _ Consumer.Builder.t =
{
start_consuming =
(fun q ->
let st = create_state ~q ~exporter () in
to_consumer st);
}
end

View file

@ -1,38 +1,10 @@
type error = Export_error.t open Common_
(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) type error = Export_error.t
(** Number of errors met during export *) (** Number of errors met during export *)
let n_errors = Atomic.make 0 let n_errors = Atomic.make 0
(* TODO: put this somewhere with an interval limiter to 30s
(* there is a possible race condition here, as several threads might update
metrics at the same time. But that's harmless. *)
if add_own_metrics then (
Atomic.set last_sent_metrics now;
let open OT.Metrics in
[
make_resource_metrics
[
sum ~name:"otel.export.dropped" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
];
sum ~name:"otel.export.errors" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
];
];
]
) else
[]
*)
module type IO = Generic_io.S_WITH_CONCURRENCY module type IO = Generic_io.S_WITH_CONCURRENCY
module type HTTPC = sig module type HTTPC = sig
@ -59,13 +31,12 @@ module Make
val consumer : val consumer :
?override_n_workers:int -> ?override_n_workers:int ->
ticker_task:float option -> ticker_task:float option ->
stop:bool Atomic.t ->
config:Client_config.t -> config:Client_config.t ->
unit -> unit ->
Consumer.any_resource_builder Consumer.any_resource_builder
(** Create a consumer. (** Make a consumer builder, ie. a builder function that will take a bounded
@param stop queue of signals, and start a consumer to process these signals and send
shared stop variable, set to true to stop this (and maybe other tasks) them somewhere using HTTP.
@param ticker_task @param ticker_task
controls whether we start a task to call [tick] at the given interval in controls whether we start a task to call [tick] at the given interval in
seconds, or [None] to not start such a task at all. *) seconds, or [None] to not start such a task at all. *)
@ -78,39 +49,58 @@ end = struct
ticker_task: float option; ticker_task: float option;
} }
type status =
| Active
| Shutting_down
| Stopped
type state = { type state = {
stop: bool Atomic.t; active: Aswitch.t; (** Public facing switch *)
cleaned: bool Atomic.t; (** True when we cleaned up after closing *) active_trigger: Aswitch.trigger;
status: status Atomic.t;
(** Internal status, including the shutting down process *)
config: Client_config.t; config: Client_config.t;
other_config: other_config; other_config: other_config;
q: Any_resource.t Bounded_queue.t; q: Any_resource.t Bounded_queue.t;
notify: Notifier.t; notify: Notifier.t;
n_workers: int Atomic.t; (** Current number of workers *)
} }
let shutdown self = let shutdown self : unit =
Atomic.set self.stop true; let old_status =
if not (Atomic.exchange self.cleaned true) then ( Util_atomic.update_cas self.status @@ fun status ->
match status with
| Stopped -> status, status
| Shutting_down -> status, status
| Active -> status, Shutting_down
in
match old_status with
| Stopped -> ()
| Shutting_down ->
(* last worker to stop will call [on_done] *)
()
| Active ->
(* notify potentially asleep workers *)
Notifier.trigger self.notify; Notifier.trigger self.notify;
Notifier.delete self.notify Notifier.delete self.notify
)
let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string) let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string)
: unit IO.t = : unit IO.t =
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
match r with match r with
| Ok () -> | Ok () ->
Util_backoff.on_success backoff; Util_net_backoff.on_success backoff;
IO.return () IO.return ()
| Error `Sysbreak -> | Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!"; Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set self.stop true; shutdown self;
IO.return () IO.return ()
| Error err -> | Error err ->
Atomic.incr n_errors; Atomic.incr n_errors;
Export_error.report_err err; Export_error.report_err err;
(* avoid crazy error loop *) (* avoid crazy error loop *)
let dur_s = Util_backoff.cur_duration_s backoff in let dur_s = Util_net_backoff.on_error backoff in
Util_backoff.on_error backoff;
IO.sleep_s (dur_s +. Random.float (dur_s /. 10.)) IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
let send_metrics_http (st : state) client ~encoder ~backoff let send_metrics_http (st : state) client ~encoder ~backoff
@ -128,32 +118,54 @@ end = struct
let msg = Signal.Encode.logs ~encoder l in let msg = Signal.Encode.logs ~encoder l in
send_http_ st client msg ~backoff ~url:st.config.url_logs send_http_ st client msg ~backoff ~url:st.config.url_logs
let tick (self : state) = Notifier.trigger self.notify let tick (self : state) =
if Aswitch.is_on self.active then Notifier.trigger self.notify
(** Shutdown one worker, when the queue is closed *)
let shutdown_worker (self : state) : unit =
(* let tid = Thread.id @@ Thread.self () in
Printf.eprintf "worker %d: shutting down\n%!" tid; *)
if Atomic.fetch_and_add self.n_workers (-1) = 1 then (
(* we were the last worker *)
(* Printf.eprintf "worker %d: last one!\n%!" tid; *)
Atomic.set self.status Stopped;
Aswitch.turn_off self.active_trigger
)
let start_worker (self : state) : unit = let start_worker (self : state) : unit =
let client = Httpc.create () in let client = Httpc.create () in
let encoder = Pbrt.Encoder.create () in let encoder = Pbrt.Encoder.create () in
let backoff = Util_backoff.create () in let backoff = Util_net_backoff.create () in
(* loop on [q] *) (* loop on [q] *)
let rec loop () : unit IO.t = let rec loop () : unit IO.t =
if Atomic.get self.stop then (* first look at the queue, to drain it *)
IO.return ()
else
let* () =
match Bounded_queue.try_pop self.q with match Bounded_queue.try_pop self.q with
| `Closed -> | `Closed ->
shutdown self; (* this worker shuts down, others might still be busy *)
shutdown_worker self;
IO.return () IO.return ()
| `Empty -> Notifier.wait self.notify
| `Item (R_logs logs) -> | `Item (R_logs logs) ->
send_logs_http self client ~encoder ~backoff logs let* () = send_logs_http self client ~encoder ~backoff logs in
| `Item (R_metrics ms) ->
send_metrics_http self client ~encoder ~backoff ms
| `Item (R_spans spans) ->
send_traces_http self client ~encoder ~backoff spans
in
loop () loop ()
| `Item (R_metrics ms) ->
let* () = send_metrics_http self client ~encoder ~backoff ms in
loop ()
| `Item (R_spans spans) ->
let* () = send_traces_http self client ~encoder ~backoff spans in
loop ()
| `Empty ->
(* Printf.eprintf "worker %d: empty queue\n%!" tid; *)
(match Atomic.get self.status with
| Stopped ->
assert false
(* shouldn't happen without us going through [Shutting_down] *)
| Shutting_down ->
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
loop ())
in in
IO.spawn (fun () -> IO.spawn (fun () ->
@ -163,28 +175,30 @@ end = struct
let start_ticker (self : state) ~(interval_s : float) : unit = let start_ticker (self : state) ~(interval_s : float) : unit =
let rec loop () : unit IO.t = let rec loop () : unit IO.t =
if Atomic.get self.stop then match Atomic.get self.status with
IO.return () | Stopped | Shutting_down -> IO.return ()
else | Active ->
let* () = IO.sleep_s interval_s in let* () = IO.sleep_s interval_s in
tick self; if Aswitch.is_on self.active then tick self;
loop () loop ()
in in
IO.spawn loop IO.spawn loop
let default_n_workers = 50 let default_n_workers = 50
let create_state ?override_n_workers ~ticker_task ~stop ~config ~q () : state let create_state ?override_n_workers ~ticker_task ~config ~q () : state =
= let active, active_trigger = Aswitch.create () in
let other_config = { override_n_workers; ticker_task } in let other_config = { override_n_workers; ticker_task } in
let self = let self =
{ {
stop; active;
active_trigger;
status = Atomic.make Active;
config; config;
other_config; other_config;
q; q;
cleaned = Atomic.make false;
notify = Notifier.create (); notify = Notifier.create ();
n_workers = Atomic.make 0;
} }
in in
@ -201,10 +215,13 @@ end = struct
| None, None -> default_n_workers)) | None, None -> default_n_workers))
in in
ignore (Atomic.fetch_and_add self.n_workers n_workers : int);
for _i = 1 to n_workers do for _i = 1 to n_workers do
start_worker self start_worker self
done; done;
Notifier.register_bounded_queue self.notify q;
(* start ticker *) (* start ticker *)
(match self.other_config.ticker_task with (match self.other_config.ticker_task with
| None -> () | None -> ()
@ -212,22 +229,32 @@ end = struct
self self
let to_consumer (self : state) : Any_resource.t Consumer.t = let self_metrics (self : state) : OTEL.Metrics.t list =
let active () = not (Atomic.get self.stop) in let open OTEL.Metrics in
let shutdown ~on_done = let now = Mtime_clock.now () in
shutdown self; [
on_done () sum ~name:"otel-ocaml.export.discarded-by-bounded-queue"
in ~is_monotonic:true
let tick () = tick self in [
{ active; tick; shutdown } int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q);
];
sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true
[ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ];
]
let consumer ?override_n_workers ~ticker_task ~stop ~config () : let to_consumer (self : state) : Any_resource.t Consumer.t =
let shutdown () = shutdown self in
let tick () = tick self in
let self_metrics () = self_metrics self in
{ active = (fun () -> self.active); tick; shutdown; self_metrics }
let consumer ?override_n_workers ~ticker_task ~config () :
Consumer.any_resource_builder = Consumer.any_resource_builder =
{ {
start_consuming = start_consuming =
(fun q -> (fun q ->
let st = let st =
create_state ?override_n_workers ~ticker_task ~stop ~config ~q () create_state ?override_n_workers ~ticker_task ~config ~q ()
in in
to_consumer st); to_consumer st);
} }

View file

@ -7,7 +7,7 @@ type t = {
let create () : t = { mutex = Mutex.create (); cond = Condition.create () } let create () : t = { mutex = Mutex.create (); cond = Condition.create () }
let trigger self = Condition.signal self.cond let[@inline] trigger self = Condition.broadcast self.cond
let delete = ignore let delete = ignore

View file

@ -1,12 +0,0 @@
(** Backoff behavior in case of errors *)
type t
(** Backoff state. Not thread safe *)
val create : unit -> t
val on_success : t -> unit
val on_error : t -> unit
val cur_duration_s : t -> float

View file

@ -8,6 +8,7 @@ let create () = { delay_s = 0.001; min_delay_s = 0.001; max_delay_s = 20. }
let on_success self = self.delay_s <- max self.min_delay_s (self.delay_s /. 10.) let on_success self = self.delay_s <- max self.min_delay_s (self.delay_s /. 10.)
let on_error self = self.delay_s <- min self.max_delay_s (self.delay_s *. 2.) let on_error self =
let cur = self.delay_s in
let[@inline] cur_duration_s self = self.delay_s self.delay_s <- min self.max_delay_s (self.delay_s *. 2.);
cur

View file

@ -0,0 +1,13 @@
(** Backoff behavior in case of errors *)
type t
(** Backoff state for networking operations. Not thread safe. Do remember to add
a bit of jitter. *)
val create : unit -> t
val on_success : t -> unit
(** Reset backoff to its baseline. *)
val on_error : t -> float
(** Increase backoff, returning the current delay in seconds *)

View file

@ -26,19 +26,23 @@ let start_bg_thread (f : unit -> unit) : Thread.t =
Thread.create run () Thread.create run ()
(** thread that calls [tick()] regularly, to help enforce timeouts *) (** thread that calls [tick()] regularly, to help enforce timeouts *)
let setup_ticker_thread ~stop ~sleep_ms (exp : OTEL.Exporter.t) () = let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms (exp : OTEL.Exporter.t)
() =
let sleep_s = float sleep_ms /. 1000. in let sleep_s = float sleep_ms /. 1000. in
let tick_loop () = let tick_loop () =
try try
while not @@ Atomic.get stop do while Aswitch.is_on active do
Thread.delay sleep_s; Thread.delay sleep_s;
OTEL.Exporter.tick exp
if Aswitch.is_on active then OTEL.Exporter.tick exp
done done
with with
| Sync_queue.Closed -> () | Sync_queue.Closed -> ()
| exn -> | exn ->
(* print and ignore *) (* print and ignore *)
Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!" let bt = Printexc.get_raw_backtrace () in
Printf.eprintf "otel: background thread: uncaught exn:\n%s\n%s\n%!"
(Printexc.to_string exn) (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)
in in
start_bg_thread tick_loop start_bg_thread tick_loop

View file

@ -1,19 +1,14 @@
open Opentelemetry
open Lwt.Syntax open Lwt.Syntax
module Span_id = Span_id include Opentelemetry
module Trace_id = Trace_id
module Event = Event module Main_exporter = struct
module Span = Span include Main_exporter
module Span_link = Span_link
module Globals = Globals let remove () : unit Lwt.t =
module Timestamp_ns = Timestamp_ns let p, resolve = Lwt.wait () in
module Gc_metrics = Gc_metrics Aswitch.on_turn_off (active ()) (fun () -> Lwt.wakeup_later resolve ());
module Metrics_callbacks = Metrics_callbacks p
module Trace_context = Trace_context end
module GC_metrics = Gc_metrics [@@depecated "use Gc_metrics"]
module Metrics_emitter = Metrics_emitter
module Logger = Logger
module Log_record = Log_record
external reraise : exn -> 'a = "%reraise" external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to