From 15268270df59fd3cfaa228f602bd22ec9ac0b270 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Dec 2025 15:36:19 -0500 Subject: [PATCH] client: start heavily refactoring to use Aswitch, also fix bugs --- src/client/batch.ml | 35 +--- src/client/bounded_queue.ml | 9 +- src/client/bounded_queue_sync.ml | 77 ++++---- src/client/client_config.ml | 5 +- src/client/consumer.ml | 20 +- src/client/emitter_combine.ml | 33 ++++ src/client/exporter_add_batching.ml | 22 ++- src/client/exporter_combine.ml | 15 +- src/client/exporter_debug.ml | 6 +- src/client/exporter_queued.ml | 39 ++-- src/client/exporter_stdout.ml | 15 +- src/client/generic_consumer_exporter.ml | 138 +++++++++++++ src/client/generic_http_consumer.ml | 185 ++++++++++-------- src/client/notifier_sync.ml | 2 +- src/client/util_backoff.mli | 12 -- .../{util_backoff.ml => util_net_backoff.ml} | 7 +- src/client/util_net_backoff.mli | 13 ++ src/client/util_thread.ml | 12 +- src/lwt/opentelemetry_lwt.ml | 25 +-- 19 files changed, 455 insertions(+), 215 deletions(-) create mode 100644 src/client/emitter_combine.ml create mode 100644 src/client/generic_consumer_exporter.ml delete mode 100644 src/client/util_backoff.mli rename src/client/{util_backoff.ml => util_net_backoff.ml} (65%) create mode 100644 src/client/util_net_backoff.mli diff --git a/src/client/batch.ml b/src/client/batch.ml index ac74c3f7..94b69cab 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,5 +1,4 @@ open Opentelemetry_atomic -module Domain = Opentelemetry_domain type 'a state = { start: Mtime.t; @@ -49,27 +48,10 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool = (** Big enough to send? *) let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch -let[@inline] atomic_update_loop_ (type res) (self : _ t) - (f : 'a state -> 'a state * res) : res = - let exception Return of res in - try - let backoff = ref 1 in - while true do - let st = Atomic.get self.st in - let new_st, res = f st in - if Atomic.compare_and_set self.st st new_st then - raise_notrace (Return res); - - (* poor man's backoff strategy *) - Domain.relax_loop !backoff; - backoff := min 128 (2 * !backoff) - done - with Return res -> res - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let rev_batch_opt = (* update state. When uncontended this runs only once. *) - atomic_update_loop_ self @@ fun state -> + Util_atomic.update_cas self.st @@ fun state -> (* *) (* check if the batch is ready *) @@ -84,9 +66,9 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = assert (state.q <> []); let batch = state.q in let new_st = _empty_state in - new_st, Some batch + Some batch, new_st ) else - state, None + None, state in match rev_batch_opt with | None -> None @@ -99,10 +81,10 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = `Ok else ( let now = lazy (Mtime_clock.now ()) in - atomic_update_loop_ self @@ fun state -> + Util_atomic.update_cas self.st @@ fun state -> if state.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) - state, `Dropped + `Dropped, state else ( let start = if state.size = 0 && Option.is_some self.timeout then @@ -120,7 +102,7 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = } in - state, `Ok + `Ok, state ) ) @@ -134,8 +116,8 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = then [e] itself will be closed. *) let closed_here = Atomic.make false in - let enabled () = e.enabled () in - let closed () = e.closed () in + let enabled () = (not (Atomic.get closed_here)) && e.enabled () in + let closed () = Atomic.get closed_here || e.closed () in let flush_and_close () = if not (Atomic.exchange closed_here true) then ( (* NOTE: we need to close this wrapping emitter first, to prevent @@ -145,6 +127,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = | None -> () | Some l -> Emitter.emit e l); + (* now we can close [e], nothing remains in [self] *) Emitter.flush_and_close e ) in diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index b1cc1e81..7419fd59 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -49,8 +49,11 @@ let[@inline] close (self : _ t) : unit = self.close () let[@inline] closed (self : _ t) : bool = self.closed () -(** Turn the writing end of the queue into an emitter. *) -let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = +(** Turn the writing end of the queue into an emitter. + @param close_queue_on_close + if true, closing the emitter will close the queue *) +let to_emitter ~close_queue_on_close (self : 'a t) : + 'a Opentelemetry_emitter.Emitter.t = let closed () = self.closed () in let enabled () = not (closed ()) in let emit x = if x <> [] then push self x in @@ -58,7 +61,7 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = (* NOTE: we cannot actually flush, only close. Emptying the queue is fundamentally asynchronous because it's done by consumers *) - let flush_and_close () = close self in + let flush_and_close () = if close_queue_on_close then close self in { closed; enabled; emit; tick; flush_and_close } module Defaults = struct diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml index 487ea4f2..1b02ca26 100644 --- a/src/client/bounded_queue_sync.ml +++ b/src/client/bounded_queue_sync.ml @@ -1,6 +1,8 @@ module BQ = Bounded_queue -exception Closed = Bounded_queue.Closed +type push_res = + | Closed + | Pushed of { num_discarded: int } (* a variant of {!Sync_queue} with more bespoke pushing behavior *) module Q : sig @@ -12,9 +14,9 @@ module Q : sig val closed : _ t -> bool - val try_pop : 'a t -> 'a option + val try_pop : 'a t -> 'a BQ.pop_result - val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int + val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> push_res (** [push_while_not_full q ~high_watermark xs] tries to push each item of [x] into [q]. @@ -43,30 +45,34 @@ end = struct UM.protect self.mutex @@ fun () -> if not self.closed then self.closed <- true - let try_pop (self : 'a t) : 'a option = + let try_pop (self : 'a t) : 'a BQ.pop_result = UM.protect self.mutex @@ fun () -> - if self.closed then raise Closed; - try Some (Queue.pop self.q) with Queue.Empty -> None + if self.closed then + `Closed + else ( + try `Item (Queue.pop self.q) with Queue.Empty -> `Empty + ) let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : - int * int = + push_res = UM.protect self.mutex @@ fun () -> - if self.closed then raise Closed; + if self.closed then + Closed + else ( + let xs = ref xs in - let old_size = Queue.length self.q in - let xs = ref xs in + let continue = ref true in + while !continue && Queue.length self.q < high_watermark do + match !xs with + | [] -> continue := false + | x :: tl_xs -> + xs := tl_xs; + Queue.push x self.q + done; - let continue = ref true in - while !continue && Queue.length self.q < high_watermark do - match !xs with - | [] -> continue := false - | x :: tl_xs -> - xs := tl_xs; - Queue.push x self.q - done; - - let n_discarded = List.length !xs in - n_discarded, old_size + let num_discarded = List.length !xs in + Pushed { num_discarded } + ) end type 'a state = { @@ -77,23 +83,22 @@ type 'a state = { } let push (self : _ state) x = - let discarded, old_size = - try Q.push_while_not_full self.q ~high_watermark:self.high_watermark x - with Sync_queue.Closed -> raise BQ.Closed - in + if x <> [] then ( + match + Q.push_while_not_full self.q ~high_watermark:self.high_watermark x + with + | Closed -> Printf.eprintf "bounded queue: warning: queue is closed\n%!" + | Pushed { num_discarded } -> + if num_discarded > 0 then ( + Printf.eprintf "DISCARD %d items\n%!" num_discarded; + ignore (Atomic.fetch_and_add self.n_discarded num_discarded : int) + ); - if discarded > 0 then - ignore (Atomic.fetch_and_add self.n_discarded discarded : int); + (* wake up potentially asleep consumers *) + Cb_set.trigger self.on_non_empty + ) - (* wake up lagards if the queue was empty *) - if old_size = 0 then Cb_set.trigger self.on_non_empty; - () - -let try_pop (self : _ state) : _ BQ.pop_result = - match Q.try_pop self.q with - | Some x -> `Item x - | None -> `Empty - | exception Sync_queue.Closed -> `Closed +let[@inline] try_pop (self : _ state) : _ BQ.pop_result = Q.try_pop self.q let to_bounded_queue (self : 'a state) : 'a BQ.t = let closed () = Q.closed self.q in diff --git a/src/client/client_config.ml b/src/client/client_config.ml index 655ebcd7..aa1c1d5e 100644 --- a/src/client/client_config.ml +++ b/src/client/client_config.ml @@ -39,8 +39,9 @@ let pp out (self : t) : unit = in Format.fprintf out "{@[ debug=%B;@ self_trace=%B; url_traces=%S;@ url_metrics=%S;@ \ - url_logs=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ - batch_logs=%a;@ batch_timeout_ms=%d;@ http_concurrency_level=%a @]}" + url_logs=%S;@ @[<2>headers=@,\ + %a@];@ batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \ + batch_timeout_ms=%d;@ http_concurrency_level=%a @]}" debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms ppiopt http_concurrency_level diff --git a/src/client/consumer.ml b/src/client/consumer.ml index 42bcf35d..e22af2f0 100644 --- a/src/client/consumer.ml +++ b/src/client/consumer.ml @@ -1,21 +1,29 @@ (** Consumer that accepts items from a bounded queue *) +open Common_ + type 'a t = { - active: unit -> bool; (** Still running? Must be fast and thread-safe *) + active: unit -> Aswitch.t; + shutdown: unit -> unit; + (** Shutdown the consumer as soon as possible. [active] will be turned off + once the consumer is fully shut down. *) tick: unit -> unit; (** Regularly called, eg to emit metrics, check timeouts, etc. Must be thread safe. *) - shutdown: on_done:(unit -> unit) -> unit; - (** Shutdown the consumer as soon as possible, call [on_done()] once it's - done. *) + self_metrics: unit -> OTEL.Metrics.t list; (** Self observing metrics *) } (** A consumer for signals of type ['a] *) type 'a consumer = 'a t -let[@inline] active (self : _ t) = self.active () +let[@inline] active (self : _ t) : Aswitch.t = self.active () -let[@inline] shutdown (self : _ t) ~on_done = self.shutdown ~on_done +let[@inline] shutdown (self : _ t) : unit = self.shutdown () + +let[@inline] self_metrics self : _ list = self.self_metrics () + +(** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *) +let on_stop self f = Aswitch.on_turn_off (self.active ()) f module Builder = struct type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer } diff --git a/src/client/emitter_combine.ml b/src/client/emitter_combine.ml new file mode 100644 index 00000000..398b98be --- /dev/null +++ b/src/client/emitter_combine.ml @@ -0,0 +1,33 @@ +(** Combine multiple emitters into one *) + +open Opentelemetry_emitter.Emitter + +type closing_behavior = + [ `Close_when_all_closed + | `Close_when_one_closed + ] +(** When to close the combined emitter: + + - [`Close_when_all_closed]: closed when all the emitters that are combined + are closed + - [`Close_when_one_closed]: closed as soon as one of the emitters is closed +*) + +(** [combine_l es] is an emitter that sends signals to every emitter in [es]. + @param closing + when is this emitter closing. Default [`Close_when_all_closed]. *) +let combine_l ?(closing : closing_behavior = `Close_when_all_closed) + (es : 'a t list) : 'a t = + let closed = + fun () -> + match closing with + | `Close_when_all_closed -> List.for_all closed es + | `Close_when_one_closed -> List.exists closed es + in + let enabled () = not (closed ()) in + let emit x = if x <> [] then List.iter (fun e -> emit e x) es in + let tick ~now = List.iter (tick ~now) es in + let flush_and_close () = List.iter flush_and_close es in + { closed; enabled; emit; tick; flush_and_close } + +let combine e1 e2 : _ t = combine_l [ e1; e2 ] diff --git a/src/client/exporter_add_batching.ml b/src/client/exporter_add_batching.ml index 3f3adaf0..38c5dcef 100644 --- a/src/client/exporter_add_batching.ml +++ b/src/client/exporter_add_batching.ml @@ -23,4 +23,24 @@ let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) : let emit_metrics = add_batch_opt config.batch_metrics exp.emit_metrics in let emit_logs = add_batch_opt config.batch_logs exp.emit_logs in - { exp with emit_spans; emit_metrics; emit_logs } + let active = exp.active in + let tick = exp.tick in + let on_tick = exp.on_tick in + let shutdown () = + let open Opentelemetry_emitter in + Emitter.flush_and_close emit_spans; + Emitter.flush_and_close emit_metrics; + Emitter.flush_and_close emit_logs; + + exp.shutdown () + in + + { + OTEL.Exporter.active; + emit_spans; + emit_metrics; + emit_logs; + on_tick; + tick; + shutdown; + } diff --git a/src/client/exporter_combine.ml b/src/client/exporter_combine.ml index 86c5b100..6b05c8ce 100644 --- a/src/client/exporter_combine.ml +++ b/src/client/exporter_combine.ml @@ -2,24 +2,26 @@ open Common_ open Opentelemetry_atomic open struct - let shutdown_l ~on_done:on_done_real (es : OTEL.Exporter.t list) : unit = + let shutdown_l (es : OTEL.Exporter.t list) ~trigger : unit = let missing = Atomic.make (List.length es) in - let on_done () = if Atomic.fetch_and_add missing (-1) = 1 then (* we were the last exporter to shutdown, [missing] is now 0 *) - on_done_real () + Aswitch.turn_off trigger in - List.iter (OTEL.Exporter.shutdown ~on_done) es + List.iter (fun e -> Aswitch.on_turn_off (OTEL.Exporter.active e) on_done) es; + List.iter OTEL.Exporter.shutdown es end let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = let open OTEL.Exporter in if es = [] then OTEL.Exporter.dummy () - else + else ( + let active, trigger = Aswitch.create () in { + active = (fun () -> active); emit_spans = Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es); emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es); @@ -27,8 +29,9 @@ let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es); on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); tick = (fun () -> List.iter tick es); - shutdown = (fun ~on_done () -> shutdown_l ~on_done es); + shutdown = (fun () -> shutdown_l es ~trigger); } + ) (** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and [exp2]. *) diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index f6346b36..212e6637 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -5,8 +5,10 @@ open Opentelemetry_emitter @param out the formatter into which to print, default [stderr]. *) let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = let open Proto in + let active, trigger = Aswitch.create () in let ticker = Cb_set.create () in { + active = (fun () -> active); emit_spans = Emitter.make_simple () ~emit:(fun sp -> List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); @@ -21,7 +23,7 @@ let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = on_tick = Cb_set.register ticker; tick = (fun () -> Cb_set.trigger ticker); shutdown = - (fun ~on_done () -> + (fun () -> Format.fprintf out "CLEANUP@."; - on_done ()); + Aswitch.turn_off trigger); } diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 7db97e09..d9e287ca 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -6,19 +6,19 @@ module BQ = Bounded_queue module BQ_emitters = struct let logs_emitter_of_bq ?service_name ?attrs (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = - Bounded_queue.to_emitter q + Bounded_queue.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map (Any_resource.of_logs_or_empty ?service_name ?attrs) let spans_emitter_of_bq ?service_name ?attrs (q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t = - Bounded_queue.to_emitter q + Bounded_queue.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map (Any_resource.of_spans_or_empty ?service_name ?attrs) let metrics_emitter_of_bq ?service_name ?attrs (q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t = - Bounded_queue.to_emitter q + Bounded_queue.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map (Any_resource.of_metrics_or_empty ?service_name ?attrs) end @@ -31,6 +31,11 @@ end @param resource_attributes attributes added to every "resource" batch *) let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) ~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t = + let open Opentelemetry_emitter in + let shutdown_started = Atomic.make false in + let active, trigger = Aswitch.create () in + let consumer = consumer.start_consuming q in + let emit_spans = BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q in @@ -43,15 +48,25 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) let tick () = Cb_set.trigger tick_set in let on_tick f = Cb_set.register tick_set f in - let closed = Atomic.make false in + let shutdown () = + if Aswitch.is_on active && not (Atomic.exchange shutdown_started true) then ( + (* flush all emitters *) + Emitter.flush_and_close emit_spans; + Emitter.flush_and_close emit_logs; + Emitter.flush_and_close emit_metrics; - let consumer = consumer.start_consuming q in - - let shutdown ~on_done () = - if not (Atomic.exchange closed true) then ( + (* first, prevent further pushes to the queue. Consumer workers + can still drain it. *) Bounded_queue.close q; - Consumer.shutdown consumer ~on_done - ) else - on_done () + + (* shutdown consumer; once it's down it'll turn our switch off too *) + Aswitch.link (Consumer.active consumer) trigger; + Consumer.shutdown consumer + ) in - { emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown } + + (* if consumer shuts down for some reason, we also must *) + Aswitch.on_turn_off (Consumer.active consumer) shutdown; + + let active () = active in + { active; emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown } diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index e3abee08..af2f1a61 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -35,22 +35,22 @@ let stdout : OTEL.Exporter.t = let mutex = Mutex.create () in let ticker = Cb_set.create () in - let closed = Atomic.make false in + let active, trigger = Aswitch.create () in let tick () = Cb_set.trigger ticker in let mk_emitter pp_signal = let emit l = - if Atomic.get closed then raise Emitter.Closed; + if Aswitch.is_off active then raise Emitter.Closed; pp_vlist mutex pp_signal out l in - let enabled () = not (Atomic.get closed) in + let enabled () = Aswitch.is_on active in let tick ~now:_ = () in let flush_and_close () = - if not (Atomic.exchange closed true) then + if Aswitch.is_on active then let@ () = Util_mutex.protect mutex in Format.pp_print_flush out () in - let closed () = Atomic.get closed in + let closed () = Aswitch.is_off active in { Emitter.emit; closed; enabled; tick; flush_and_close } in @@ -58,14 +58,15 @@ let stdout : OTEL.Exporter.t = let emit_logs = mk_emitter Proto.Logs.pp_log_record in let emit_metrics = mk_emitter Proto.Metrics.pp_metric in - let shutdown ~on_done () = + let shutdown () = Emitter.flush_and_close emit_spans; Emitter.flush_and_close emit_logs; Emitter.flush_and_close emit_metrics; - on_done () + Aswitch.turn_off trigger in { + active = (fun () -> active); emit_spans; emit_logs; emit_metrics; diff --git a/src/client/generic_consumer_exporter.ml b/src/client/generic_consumer_exporter.ml new file mode 100644 index 00000000..65ff639e --- /dev/null +++ b/src/client/generic_consumer_exporter.ml @@ -0,0 +1,138 @@ +(** A consumer that just calls another exporter. + + This is useful to introduce queueing behavior using {!Exporter_queued}, but + simply forwarding to another (presumably non-queue) exporter. + + It is generic because we need some sort of threading/concurrency to run the + consumer. *) + +open Common_ + +module type IO = Generic_io.S_WITH_CONCURRENCY + +module Make + (IO : IO) + (Notifier : Generic_notifier.S with type 'a IO.t = 'a IO.t) : sig + val consumer : OTEL.Exporter.t -> OTEL.Any_signal_l.t Consumer.Builder.t +end = struct + open IO + + type status = + | Active + | Shutting_down + | Stopped + + type state = { + active: Aswitch.t; (** Public facing switch *) + active_trigger: Aswitch.trigger; + status: status Atomic.t; (** Internal state, including shutdown *) + q: OTEL.Any_signal_l.t Bounded_queue.t; + notify: Notifier.t; + exp: OTEL.Exporter.t; + } + + let shutdown self : unit = + let old_status = + Util_atomic.update_cas self.status @@ fun status -> + match status with + | Stopped -> status, status + | Shutting_down -> status, status + | Active -> status, Shutting_down + in + + match old_status with + | Stopped -> () + | Shutting_down -> + (* when the worker stops it will call [on_done] *) + () + | Active -> + (* notify potentially asleep workers *) + Notifier.trigger self.notify; + Notifier.delete self.notify + + let tick (self : state) = Notifier.trigger self.notify + + (** Shutdown one worker, when the queue is closed *) + let shutdown_worker (self : state) : unit = + (* we were the last worker *) + (* Printf.eprintf "worker %d: last one!\n%!" tid; *) + Atomic.set self.status Stopped; + Aswitch.turn_off self.active_trigger + + let start_worker (self : state) : unit = + (* loop on [q] *) + let rec loop () : unit IO.t = + match Bounded_queue.try_pop self.q with + | `Closed -> + shutdown_worker self; + IO.return () + | `Item (Logs logs) -> + OTEL.Exporter.send_logs self.exp logs; + loop () + | `Item (Metrics ms) -> + OTEL.Exporter.send_metrics self.exp ms; + + loop () + | `Item (Spans sp) -> + OTEL.Exporter.send_trace self.exp sp; + loop () + | `Empty -> + (match Atomic.get self.status with + | Stopped -> + assert false + (* shouldn't happen without us going through [Shutting_down] *) + | Shutting_down -> + shutdown_worker self; + IO.return () + | Active -> + let* () = Notifier.wait self.notify in + loop ()) + in + + IO.spawn loop + + let create_state ~q ~exporter () : state = + let active, active_trigger = Aswitch.create () in + let self = + { + active; + active_trigger; + status = Atomic.make Active; + q; + exp = exporter; + notify = Notifier.create (); + } + in + + (* if [exporter] turns off, shut us down too *) + Aswitch.on_turn_off (OTEL.Exporter.active exporter) (fun () -> + shutdown self); + + start_worker self; + self + + let self_metrics (self : state) : OTEL.Metrics.t list = + let open OTEL.Metrics in + let now = Mtime_clock.now () in + [ + sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" + ~is_monotonic:true + [ + int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q); + ]; + ] + + let to_consumer (self : state) : _ Consumer.t = + let shutdown () = shutdown self in + let tick () = tick self in + let self_metrics () = self_metrics self in + { active = (fun () -> self.active); tick; shutdown; self_metrics } + + let consumer exporter : _ Consumer.Builder.t = + { + start_consuming = + (fun q -> + let st = create_state ~q ~exporter () in + to_consumer st); + } +end diff --git a/src/client/generic_http_consumer.ml b/src/client/generic_http_consumer.ml index 76823810..b557249a 100644 --- a/src/client/generic_http_consumer.ml +++ b/src/client/generic_http_consumer.ml @@ -1,38 +1,10 @@ -type error = Export_error.t +open Common_ -(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) +type error = Export_error.t (** Number of errors met during export *) let n_errors = Atomic.make 0 -(* TODO: put this somewhere with an interval limiter to 30s - - (* there is a possible race condition here, as several threads might update - metrics at the same time. But that's harmless. *) - if add_own_metrics then ( - Atomic.set last_sent_metrics now; - let open OT.Metrics in - [ - make_resource_metrics - [ - sum ~name:"otel.export.dropped" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel.export.errors" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]; - ] - ) else - [] -*) - module type IO = Generic_io.S_WITH_CONCURRENCY module type HTTPC = sig @@ -59,13 +31,12 @@ module Make val consumer : ?override_n_workers:int -> ticker_task:float option -> - stop:bool Atomic.t -> config:Client_config.t -> unit -> Consumer.any_resource_builder - (** Create a consumer. - @param stop - shared stop variable, set to true to stop this (and maybe other tasks) + (** Make a consumer builder, ie. a builder function that will take a bounded + queue of signals, and start a consumer to process these signals and send + them somewhere using HTTP. @param ticker_task controls whether we start a task to call [tick] at the given interval in seconds, or [None] to not start such a task at all. *) @@ -78,39 +49,58 @@ end = struct ticker_task: float option; } + type status = + | Active + | Shutting_down + | Stopped + type state = { - stop: bool Atomic.t; - cleaned: bool Atomic.t; (** True when we cleaned up after closing *) + active: Aswitch.t; (** Public facing switch *) + active_trigger: Aswitch.trigger; + status: status Atomic.t; + (** Internal status, including the shutting down process *) config: Client_config.t; other_config: other_config; q: Any_resource.t Bounded_queue.t; notify: Notifier.t; + n_workers: int Atomic.t; (** Current number of workers *) } - let shutdown self = - Atomic.set self.stop true; - if not (Atomic.exchange self.cleaned true) then ( + let shutdown self : unit = + let old_status = + Util_atomic.update_cas self.status @@ fun status -> + match status with + | Stopped -> status, status + | Shutting_down -> status, status + | Active -> status, Shutting_down + in + + match old_status with + | Stopped -> () + | Shutting_down -> + (* last worker to stop will call [on_done] *) + () + | Active -> + (* notify potentially asleep workers *) Notifier.trigger self.notify; Notifier.delete self.notify - ) let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string) : unit IO.t = let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with | Ok () -> - Util_backoff.on_success backoff; + Util_net_backoff.on_success backoff; IO.return () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set self.stop true; + shutdown self; IO.return () | Error err -> Atomic.incr n_errors; Export_error.report_err err; (* avoid crazy error loop *) - let dur_s = Util_backoff.cur_duration_s backoff in - Util_backoff.on_error backoff; + let dur_s = Util_net_backoff.on_error backoff in IO.sleep_s (dur_s +. Random.float (dur_s /. 10.)) let send_metrics_http (st : state) client ~encoder ~backoff @@ -128,32 +118,54 @@ end = struct let msg = Signal.Encode.logs ~encoder l in send_http_ st client msg ~backoff ~url:st.config.url_logs - let tick (self : state) = Notifier.trigger self.notify + let tick (self : state) = + if Aswitch.is_on self.active then Notifier.trigger self.notify + + (** Shutdown one worker, when the queue is closed *) + let shutdown_worker (self : state) : unit = + (* let tid = Thread.id @@ Thread.self () in + Printf.eprintf "worker %d: shutting down\n%!" tid; *) + if Atomic.fetch_and_add self.n_workers (-1) = 1 then ( + (* we were the last worker *) + (* Printf.eprintf "worker %d: last one!\n%!" tid; *) + Atomic.set self.status Stopped; + Aswitch.turn_off self.active_trigger + ) let start_worker (self : state) : unit = let client = Httpc.create () in let encoder = Pbrt.Encoder.create () in - let backoff = Util_backoff.create () in + let backoff = Util_net_backoff.create () in (* loop on [q] *) let rec loop () : unit IO.t = - if Atomic.get self.stop then + (* first look at the queue, to drain it *) + match Bounded_queue.try_pop self.q with + | `Closed -> + (* this worker shuts down, others might still be busy *) + shutdown_worker self; IO.return () - else - let* () = - match Bounded_queue.try_pop self.q with - | `Closed -> - shutdown self; - IO.return () - | `Empty -> Notifier.wait self.notify - | `Item (R_logs logs) -> - send_logs_http self client ~encoder ~backoff logs - | `Item (R_metrics ms) -> - send_metrics_http self client ~encoder ~backoff ms - | `Item (R_spans spans) -> - send_traces_http self client ~encoder ~backoff spans - in + | `Item (R_logs logs) -> + let* () = send_logs_http self client ~encoder ~backoff logs in loop () + | `Item (R_metrics ms) -> + let* () = send_metrics_http self client ~encoder ~backoff ms in + loop () + | `Item (R_spans spans) -> + let* () = send_traces_http self client ~encoder ~backoff spans in + loop () + | `Empty -> + (* Printf.eprintf "worker %d: empty queue\n%!" tid; *) + (match Atomic.get self.status with + | Stopped -> + assert false + (* shouldn't happen without us going through [Shutting_down] *) + | Shutting_down -> + shutdown_worker self; + IO.return () + | Active -> + let* () = Notifier.wait self.notify in + loop ()) in IO.spawn (fun () -> @@ -163,28 +175,30 @@ end = struct let start_ticker (self : state) ~(interval_s : float) : unit = let rec loop () : unit IO.t = - if Atomic.get self.stop then - IO.return () - else + match Atomic.get self.status with + | Stopped | Shutting_down -> IO.return () + | Active -> let* () = IO.sleep_s interval_s in - tick self; + if Aswitch.is_on self.active then tick self; loop () in IO.spawn loop let default_n_workers = 50 - let create_state ?override_n_workers ~ticker_task ~stop ~config ~q () : state - = + let create_state ?override_n_workers ~ticker_task ~config ~q () : state = + let active, active_trigger = Aswitch.create () in let other_config = { override_n_workers; ticker_task } in let self = { - stop; + active; + active_trigger; + status = Atomic.make Active; config; other_config; q; - cleaned = Atomic.make false; notify = Notifier.create (); + n_workers = Atomic.make 0; } in @@ -201,10 +215,13 @@ end = struct | None, None -> default_n_workers)) in + ignore (Atomic.fetch_and_add self.n_workers n_workers : int); for _i = 1 to n_workers do start_worker self done; + Notifier.register_bounded_queue self.notify q; + (* start ticker *) (match self.other_config.ticker_task with | None -> () @@ -212,22 +229,32 @@ end = struct self - let to_consumer (self : state) : Any_resource.t Consumer.t = - let active () = not (Atomic.get self.stop) in - let shutdown ~on_done = - shutdown self; - on_done () - in - let tick () = tick self in - { active; tick; shutdown } + let self_metrics (self : state) : OTEL.Metrics.t list = + let open OTEL.Metrics in + let now = Mtime_clock.now () in + [ + sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" + ~is_monotonic:true + [ + int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q); + ]; + sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true + [ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ]; + ] - let consumer ?override_n_workers ~ticker_task ~stop ~config () : + let to_consumer (self : state) : Any_resource.t Consumer.t = + let shutdown () = shutdown self in + let tick () = tick self in + let self_metrics () = self_metrics self in + { active = (fun () -> self.active); tick; shutdown; self_metrics } + + let consumer ?override_n_workers ~ticker_task ~config () : Consumer.any_resource_builder = { start_consuming = (fun q -> let st = - create_state ?override_n_workers ~ticker_task ~stop ~config ~q () + create_state ?override_n_workers ~ticker_task ~config ~q () in to_consumer st); } diff --git a/src/client/notifier_sync.ml b/src/client/notifier_sync.ml index e1fd501d..f7590705 100644 --- a/src/client/notifier_sync.ml +++ b/src/client/notifier_sync.ml @@ -7,7 +7,7 @@ type t = { let create () : t = { mutex = Mutex.create (); cond = Condition.create () } -let trigger self = Condition.signal self.cond +let[@inline] trigger self = Condition.broadcast self.cond let delete = ignore diff --git a/src/client/util_backoff.mli b/src/client/util_backoff.mli deleted file mode 100644 index f097f8ae..00000000 --- a/src/client/util_backoff.mli +++ /dev/null @@ -1,12 +0,0 @@ -(** Backoff behavior in case of errors *) - -type t -(** Backoff state. Not thread safe *) - -val create : unit -> t - -val on_success : t -> unit - -val on_error : t -> unit - -val cur_duration_s : t -> float diff --git a/src/client/util_backoff.ml b/src/client/util_net_backoff.ml similarity index 65% rename from src/client/util_backoff.ml rename to src/client/util_net_backoff.ml index 4bc50ab2..914856f9 100644 --- a/src/client/util_backoff.ml +++ b/src/client/util_net_backoff.ml @@ -8,6 +8,7 @@ let create () = { delay_s = 0.001; min_delay_s = 0.001; max_delay_s = 20. } let on_success self = self.delay_s <- max self.min_delay_s (self.delay_s /. 10.) -let on_error self = self.delay_s <- min self.max_delay_s (self.delay_s *. 2.) - -let[@inline] cur_duration_s self = self.delay_s +let on_error self = + let cur = self.delay_s in + self.delay_s <- min self.max_delay_s (self.delay_s *. 2.); + cur diff --git a/src/client/util_net_backoff.mli b/src/client/util_net_backoff.mli new file mode 100644 index 00000000..af734c31 --- /dev/null +++ b/src/client/util_net_backoff.mli @@ -0,0 +1,13 @@ +(** Backoff behavior in case of errors *) + +type t +(** Backoff state for networking operations. Not thread safe. Do remember to add + a bit of jitter. *) + +val create : unit -> t + +val on_success : t -> unit +(** Reset backoff to its baseline. *) + +val on_error : t -> float +(** Increase backoff, returning the current delay in seconds *) diff --git a/src/client/util_thread.ml b/src/client/util_thread.ml index 75479688..dfbe61ac 100644 --- a/src/client/util_thread.ml +++ b/src/client/util_thread.ml @@ -26,19 +26,23 @@ let start_bg_thread (f : unit -> unit) : Thread.t = Thread.create run () (** thread that calls [tick()] regularly, to help enforce timeouts *) -let setup_ticker_thread ~stop ~sleep_ms (exp : OTEL.Exporter.t) () = +let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms (exp : OTEL.Exporter.t) + () = let sleep_s = float sleep_ms /. 1000. in let tick_loop () = try - while not @@ Atomic.get stop do + while Aswitch.is_on active do Thread.delay sleep_s; - OTEL.Exporter.tick exp + + if Aswitch.is_on active then OTEL.Exporter.tick exp done with | Sync_queue.Closed -> () | exn -> (* print and ignore *) - Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!" + let bt = Printexc.get_raw_backtrace () in + Printf.eprintf "otel: background thread: uncaught exn:\n%s\n%s\n%!" (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) in start_bg_thread tick_loop diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index d89d7e3c..ee2daa12 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -1,19 +1,14 @@ -open Opentelemetry open Lwt.Syntax -module Span_id = Span_id -module Trace_id = Trace_id -module Event = Event -module Span = Span -module Span_link = Span_link -module Globals = Globals -module Timestamp_ns = Timestamp_ns -module Gc_metrics = Gc_metrics -module Metrics_callbacks = Metrics_callbacks -module Trace_context = Trace_context -module GC_metrics = Gc_metrics [@@depecated "use Gc_metrics"] -module Metrics_emitter = Metrics_emitter -module Logger = Logger -module Log_record = Log_record +include Opentelemetry + +module Main_exporter = struct + include Main_exporter + + let remove () : unit Lwt.t = + let p, resolve = Lwt.wait () in + Aswitch.on_turn_off (active ()) (fun () -> Lwt.wakeup_later resolve ()); + p +end external reraise : exn -> 'a = "%reraise" (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to