feat client: overhaul of bounded queue; generic_consumer

This commit is contained in:
Simon Cruanes 2025-12-08 20:06:45 -05:00
parent 6436f0e36d
commit d7da4c4443
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
17 changed files with 429 additions and 288 deletions

View file

@ -1,3 +1,4 @@
open Common_
open Opentelemetry.Proto
(** A resource *)
@ -31,3 +32,9 @@ let of_metrics ?service_name ?attrs m : t =
let of_metrics_or_empty ?service_name ?attrs ms =
of_x_or_empty ?service_name ?attrs ~f:of_metrics ms
let of_signal_l ?service_name ?attrs (s : OTEL.Any_signal_l.t) : t =
match s with
| Logs logs -> of_logs ?service_name ?attrs logs
| Spans sp -> of_spans ?service_name ?attrs sp
| Metrics ms -> of_metrics ?service_name ?attrs ms

View file

@ -12,24 +12,101 @@ type 'a pop_result =
| `Item of 'a
]
type 'a t = {
push: 'a list -> unit;
(** Push items. This might discard some of them.
@raise Closed if the queue is closed. *)
num_discarded: unit -> int; (** How many items were discarded? *)
on_non_empty: (unit -> unit) -> unit;
(** [on_non_empty f] registers [f] to be called whenever the queue
transitions from empty to non-empty. *)
try_pop: unit -> 'a pop_result; (** Try to pop an item right now. *)
close: unit -> unit;
(** Close the queue. Items currently in the queue will still be accessible
to consumers until the queue is emptied out. Idempotent. *)
closed: unit -> bool;
(** Is the queue closed {b for writing}. Consumers should only use
[try_pop] because a queue that's closed-for-writing might still
contain straggler items that need to be consumed.
module Common = struct
type t = {
closed: unit -> bool;
(** Is the queue closed {b for writing}. Consumers should only use
[try_pop] because a queue that's closed-for-writing might still
contain straggler items that need to be consumed.
This should be as fast and cheap as possible. *)
This should be as fast and cheap as possible. *)
num_discarded: unit -> int; (** How many items were discarded? *)
}
let[@inline] num_discarded self = self.num_discarded ()
let[@inline] closed (self : t) : bool = self.closed ()
end
(** Receiving side *)
module Recv = struct
type 'a t = {
on_non_empty: (unit -> unit) -> unit;
(** [on_non_empty f] registers [f] to be called whenever the queue
transitions from empty to non-empty. *)
try_pop: unit -> 'a pop_result; (** Try to pop an item right now. *)
common: Common.t;
}
let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop ()
let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f
let[@inline] closed (self : _ t) : bool = self.common.closed ()
let[@inline] num_discarded self = self.common.num_discarded ()
let map (type a b) (f : a -> b) (self : a t) : b t =
{
self with
try_pop =
(fun () ->
match self.try_pop () with
| (`Closed | `Empty) as r -> r
| `Item x -> `Item (f x));
}
end
(** Sending side *)
module Send = struct
type 'a t = {
push: 'a list -> unit;
(** Push items. This might discard some of them.
@raise Closed if the queue is closed. *)
close: unit -> unit;
(** Close the queue. Items currently in the queue will still be
accessible to consumers until the queue is emptied out. Idempotent.
*)
common: Common.t;
}
let[@inline] push (self : _ t) x : unit = self.push x
let[@inline] close (self : _ t) : unit = self.close ()
let[@inline] closed (self : _ t) : bool = self.common.closed ()
let[@inline] num_discarded self = self.common.num_discarded ()
let map (type a b) (f : a list -> b list) (self : b t) : a t =
{
self with
push =
(fun xs ->
match f xs with
| [] -> ()
| ys -> self.push ys);
}
(** Turn the writing end of the queue into an emitter.
@param close_queue_on_close
if true, closing the emitter will close the queue *)
let to_emitter ~close_queue_on_close (self : 'a t) :
'a Opentelemetry_emitter.Emitter.t =
let closed () = closed self in
let enabled () = not (closed ()) in
let emit x = if x <> [] then push self x in
let tick ~now:_ = () in
(* NOTE: we cannot actually flush, only close. Emptying the queue is
fundamentally asynchronous because it's done by consumers *)
let flush_and_close () = if close_queue_on_close then close self in
{ closed; enabled; emit; tick; flush_and_close }
end
type 'a t = {
send: 'a Send.t;
recv: 'a Recv.t;
}
(** A bounded queue, with multiple producers and potentially multiple consumers.
@ -37,33 +114,6 @@ type 'a t = {
to be depending on the context (e.g. a Lwt-specific queue implementation
will consume only from the Lwt thread). *)
let[@inline] push (self : _ t) x : unit = self.push x
let[@inline] num_discarded self = self.num_discarded ()
let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop ()
let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f
let[@inline] close (self : _ t) : unit = self.close ()
let[@inline] closed (self : _ t) : bool = self.closed ()
(** Turn the writing end of the queue into an emitter.
@param close_queue_on_close
if true, closing the emitter will close the queue *)
let to_emitter ~close_queue_on_close (self : 'a t) :
'a Opentelemetry_emitter.Emitter.t =
let closed () = self.closed () in
let enabled () = not (closed ()) in
let emit x = if x <> [] then push self x in
let tick ~now:_ = () in
(* NOTE: we cannot actually flush, only close. Emptying the queue is
fundamentally asynchronous because it's done by consumers *)
let flush_and_close () = if close_queue_on_close then close self in
{ closed; enabled; emit; tick; flush_and_close }
module Defaults = struct
(** The default high watermark *)
let high_watermark : int = 2048

View file

@ -113,7 +113,11 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t =
(* waiters will want to know *)
Cb_set.trigger self.on_non_empty
in
{ BQ.push; num_discarded; try_pop; on_non_empty; close; closed }
let common = { BQ.Common.closed; num_discarded } in
{
BQ.send = { push; close; common };
recv = { try_pop; on_non_empty; common };
}
let create ~high_watermark () : _ BQ.t =
let st =

View file

@ -2,7 +2,7 @@
open Common_
type 'a t = {
type t = {
active: unit -> Aswitch.t;
shutdown: unit -> unit;
(** Shutdown the consumer as soon as possible. [active] will be turned off
@ -14,11 +14,11 @@ type 'a t = {
}
(** A consumer for signals of type ['a] *)
type 'a consumer = 'a t
type consumer = t
let[@inline] active (self : _ t) : Aswitch.t = self.active ()
let[@inline] active (self : t) : Aswitch.t = self.active ()
let[@inline] shutdown (self : _ t) : unit = self.shutdown ()
let[@inline] shutdown (self : t) : unit = self.shutdown ()
let[@inline] self_metrics self : _ list = self.self_metrics ()
@ -26,13 +26,23 @@ let[@inline] self_metrics self : _ list = self.self_metrics ()
let on_stop self f = Aswitch.on_turn_off (self.active ()) f
module Builder = struct
type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer }
type 'a t = { start_consuming: 'a Bounded_queue.Recv.t -> consumer }
(** A builder that will create a consumer for a given queue, start the
consumer so it starts consuming from the queue, and return the consumer.
*)
let start_consuming (self : _ t) bq = self.start_consuming bq
let map (type a b) (f : a -> b) (self : b t) : a t =
{
start_consuming =
(fun q ->
let q = Bounded_queue.Recv.map f q in
self.start_consuming q);
}
end
type any_signal_l_builder = OTEL.Any_signal_l.t Builder.t
type any_resource_builder = Any_resource.t Builder.t
(** The type that's useful for OTEL backends *)
(** The type that's useful for HTTP backends *)

View file

@ -9,7 +9,6 @@
opentelemetry.proto
opentelemetry.domain
pbrt
saturn
mtime
mtime.clock.os)
(synopsis

View file

@ -8,23 +8,22 @@ module BQ_emitters = struct
queue because we need to flush_and_close the other emitters first.
The bounded queue is a shared resource. *)
let logs_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t =
Bounded_queue.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_logs_or_empty ?service_name ?attrs)
let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
OTEL.Logger.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty
let spans_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t =
Bounded_queue.to_emitter q ~close_queue_on_close:false
let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
OTEL.Tracer.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_spans_or_empty ?service_name ?attrs)
OTEL.Any_signal_l.of_spans_or_empty
let metrics_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t =
Bounded_queue.to_emitter q ~close_queue_on_close:false
let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
OTEL.Metrics_emitter.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_metrics_or_empty ?service_name ?attrs)
OTEL.Any_signal_l.of_metrics_or_empty
end
(** Pair a queue with a consumer to build an exporter.
@ -33,20 +32,16 @@ end
bounded queue; while the consumer takes them from the queue to forward them
somewhere else, store them, etc.
@param resource_attributes attributes added to every "resource" batch *)
let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t =
let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t)
~(consumer : Consumer.any_signal_l_builder) () : OTEL.Exporter.t =
let open Opentelemetry_emitter in
let shutdown_started = Atomic.make false in
let active, trigger = Aswitch.create () in
let consumer = consumer.start_consuming q in
let consumer = consumer.start_consuming q.recv in
let emit_spans =
BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q
in
let emit_logs = BQ_emitters.logs_emitter_of_bq ~attrs:resource_attributes q in
let emit_metrics =
BQ_emitters.metrics_emitter_of_bq ~attrs:resource_attributes q
in
let emit_spans = BQ_emitters.spans_emitter_of_bq q.send in
let emit_logs = BQ_emitters.logs_emitter_of_bq q.send in
let emit_metrics = BQ_emitters.metrics_emitter_of_bq q.send in
let tick_set = Cb_set.create () in
let tick () = Cb_set.trigger tick_set in
@ -61,7 +56,7 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
(* first, prevent further pushes to the queue. Consumer workers
can still drain it. *)
Bounded_queue.close q;
Bounded_queue.Send.close q.send;
(* shutdown consumer; once it's down it'll turn our switch off too *)
Aswitch.link (Consumer.active consumer) trigger;

View file

@ -8,7 +8,7 @@ open struct
let pp_span out (sp : OTEL.Span.t) =
let open OTEL in
Format.fprintf out
"@[<2>SPAN@ trace_id: %a@ span_id: %a@ name: %S@ start: %a@ end: %a@]@."
"@[<2>SPAN {@ trace_id: %a@ span_id: %a@ name: %S@ start: %a@ end: %a@]}"
Trace_id.pp
(Trace_id.of_bytes sp.trace_id)
Span_id.pp
@ -16,6 +16,12 @@ open struct
sp.name Timestamp_ns.pp_debug sp.start_time_unix_nano
Timestamp_ns.pp_debug sp.end_time_unix_nano
let pp_log out l =
Format.fprintf out "@[<2>LOG %a@]" Proto.Logs.pp_log_record l
let pp_metric out m =
Format.fprintf out "@[<2>METRICS %a@]" Proto.Metrics.pp_metric m
let pp_vlist mutex pp out l =
if l != [] then (
let@ () = Util_mutex.protect mutex in
@ -55,8 +61,8 @@ let stdout : OTEL.Exporter.t =
in
let emit_spans = mk_emitter pp_span in
let emit_logs = mk_emitter Proto.Logs.pp_log_record in
let emit_metrics = mk_emitter Proto.Metrics.pp_metric in
let emit_logs = mk_emitter pp_log in
let emit_metrics = mk_emitter pp_metric in
let shutdown () =
Emitter.flush_and_close emit_spans;

View file

@ -0,0 +1,220 @@
open Common_
type error = Export_error.t
(** Number of errors met during export *)
let n_errors = Atomic.make 0
module type IO = Generic_io.S_WITH_CONCURRENCY
module type SENDER = sig
module IO : IO
type t
type config
val create : config:config -> unit -> t
val cleanup : t -> unit
val send : t -> OTEL.Any_signal_l.t -> (unit, error) result IO.t
end
module Make
(IO : IO)
(Notifier : Generic_notifier.S with type 'a IO.t = 'a IO.t)
(Sender : SENDER with type 'a IO.t = 'a IO.t) : sig
val consumer :
sender_config:Sender.config ->
n_workers:int ->
ticker_task:float option ->
unit ->
Consumer.any_signal_l_builder
(** Make a consumer builder, ie. a builder function that will take a bounded
queue of signals, and start a consumer to process these signals and send
them somewhere using HTTP. *)
end = struct
module Proto = Opentelemetry_proto
open IO
type config = {
n_workers: int;
ticker_task: float option;
}
type status =
| Active
| Shutting_down
| Stopped
type state = {
active: Aswitch.t; (** Public facing switch *)
q: OTEL.Any_signal_l.t Bounded_queue.Recv.t;
status: status Atomic.t;
(** Internal status, including the shutting down process *)
notify: Notifier.t;
n_workers: int Atomic.t; (** Current number of workers *)
active_trigger: Aswitch.trigger;
config: config;
sender_config: Sender.config;
}
let shutdown self : unit =
let old_status =
Util_atomic.update_cas self.status @@ fun status ->
match status with
| Stopped -> status, status
| Shutting_down -> status, status
| Active -> status, Shutting_down
in
match old_status with
| Stopped -> ()
| Shutting_down ->
(* last worker to stop will call [on_done] *)
()
| Active ->
(* notify potentially asleep workers *)
Notifier.trigger self.notify;
Notifier.delete self.notify
let tick (self : state) =
if Aswitch.is_on self.active then Notifier.trigger self.notify
(** Shutdown one worker, when the queue is closed *)
let shutdown_worker (self : state) : unit =
(* let tid = Thread.id @@ Thread.self () in
Printf.eprintf "worker %d: shutting down\n%!" tid; *)
if Atomic.fetch_and_add self.n_workers (-1) = 1 then (
(* we were the last worker *)
(* Printf.eprintf "worker %d: last one!\n%!" tid; *)
Atomic.set self.status Stopped;
Aswitch.turn_off self.active_trigger
)
let send_signals (self : state) (sender : Sender.t) ~backoff
(sigs : OTEL.Any_signal_l.t) : unit IO.t =
let* r = Sender.send sender sigs in
match r with
| Ok () ->
Util_net_backoff.on_success backoff;
IO.return ()
| Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!";
shutdown self;
IO.return ()
| Error err ->
Atomic.incr n_errors;
Export_error.report_err err;
(* avoid crazy error loop *)
let dur_s = Util_net_backoff.on_error backoff in
IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
let start_worker (self : state) : unit =
let sender = Sender.create ~config:self.sender_config () in
let backoff = Util_net_backoff.create () in
(* loop on [q] *)
let rec loop () : unit IO.t =
(* first look at the queue, to drain it *)
match Bounded_queue.Recv.try_pop self.q with
| `Closed ->
(* this worker shuts down, others might still be busy *)
shutdown_worker self;
IO.return ()
| `Item sigs ->
let* () = send_signals ~backoff self sender sigs in
loop ()
| `Empty ->
(* Printf.eprintf "worker %d: empty queue\n%!" tid; *)
(match Atomic.get self.status with
| Stopped ->
assert false
(* shouldn't happen without us going through [Shutting_down] *)
| Shutting_down ->
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
loop ())
in
IO.spawn (fun () ->
IO.protect loop ~finally:(fun () ->
Sender.cleanup sender;
IO.return ()))
let start_ticker (self : state) ~(interval_s : float) : unit =
let rec loop () : unit IO.t =
match Atomic.get self.status with
| Stopped | Shutting_down -> IO.return ()
| Active ->
let* () = IO.sleep_s interval_s in
if Aswitch.is_on self.active then tick self;
loop ()
in
IO.spawn loop
let create_state ~sender_config ~n_workers ~ticker_task ~q () : state =
let active, active_trigger = Aswitch.create () in
let config = { n_workers; ticker_task } in
let self =
{
active;
active_trigger;
status = Atomic.make Active;
n_workers = Atomic.make 0;
q;
notify = Notifier.create ();
config;
sender_config;
}
in
(* start workers *)
let n_workers = min 2 (max 500 self.config.n_workers) in
ignore (Atomic.fetch_and_add self.n_workers n_workers : int);
for _i = 1 to n_workers do
start_worker self
done;
Notifier.register_bounded_queue self.notify q;
(* start ticker *)
(match self.config.ticker_task with
| None -> ()
| Some interval_s -> start_ticker self ~interval_s);
self
let self_metrics (self : state) : OTEL.Metrics.t list =
let open OTEL.Metrics in
let now = Mtime_clock.now () in
[
sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true
[ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ];
sum ~name:"otel-ocaml.export.discarded-by-bounded-queue"
~is_monotonic:true
[
int ~now:(Mtime.to_uint64_ns now)
(Bounded_queue.Recv.num_discarded self.q);
];
]
let to_consumer (self : state) : Consumer.t =
let shutdown () = shutdown self in
let tick () = tick self in
let self_metrics () = self_metrics self in
{ active = (fun () -> self.active); tick; shutdown; self_metrics }
let consumer ~sender_config ~n_workers ~ticker_task () :
Consumer.any_signal_l_builder =
{
start_consuming =
(fun q ->
let st = create_state ~sender_config ~n_workers ~ticker_task ~q () in
to_consumer st);
}
end

View file

@ -26,7 +26,7 @@ end = struct
active: Aswitch.t; (** Public facing switch *)
active_trigger: Aswitch.trigger;
status: status Atomic.t; (** Internal state, including shutdown *)
q: OTEL.Any_signal_l.t Bounded_queue.t;
q: OTEL.Any_signal_l.t Bounded_queue.Recv.t;
notify: Notifier.t;
exp: OTEL.Exporter.t;
}
@ -62,7 +62,7 @@ end = struct
let start_worker (self : state) : unit =
(* loop on [q] *)
let rec loop () : unit IO.t =
match Bounded_queue.try_pop self.q with
match Bounded_queue.Recv.try_pop self.q with
| `Closed ->
shutdown_worker self;
IO.return ()
@ -118,11 +118,12 @@ end = struct
sum ~name:"otel-ocaml.export.discarded-by-bounded-queue"
~is_monotonic:true
[
int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q);
int ~now:(Mtime.to_uint64_ns now)
(Bounded_queue.Recv.num_discarded self.q);
];
]
let to_consumer (self : state) : _ Consumer.t =
let to_consumer (self : state) : Consumer.t =
let shutdown () = shutdown self in
let tick () = tick self in
let self_metrics () = self_metrics self in

View file

@ -14,14 +14,14 @@ module type HTTPC = sig
val create : unit -> t
val cleanup : t -> unit
val send :
t ->
url:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result IO.t
val cleanup : t -> unit
end
module Make
@ -33,7 +33,7 @@ module Make
ticker_task:float option ->
config:Client_config.t ->
unit ->
Consumer.any_resource_builder
Consumer.any_signal_l_builder
(** Make a consumer builder, ie. a builder function that will take a bounded
queue of signals, and start a consumer to process these signals and send
them somewhere using HTTP.
@ -44,218 +44,52 @@ end = struct
module Proto = Opentelemetry_proto
open IO
type other_config = {
override_n_workers: int option;
ticker_task: float option;
}
module Sender :
Generic_consumer.SENDER
with module IO = IO
and type config = Client_config.t = struct
module IO = IO
type status =
| Active
| Shutting_down
| Stopped
type t = {
config: Client_config.t;
encoder: Pbrt.Encoder.t;
http: Httpc.t;
}
type state = {
active: Aswitch.t; (** Public facing switch *)
active_trigger: Aswitch.trigger;
status: status Atomic.t;
(** Internal status, including the shutting down process *)
config: Client_config.t;
other_config: other_config;
q: Any_resource.t Bounded_queue.t;
notify: Notifier.t;
n_workers: int Atomic.t; (** Current number of workers *)
}
type config = Client_config.t
let shutdown self : unit =
let old_status =
Util_atomic.update_cas self.status @@ fun status ->
match status with
| Stopped -> status, status
| Shutting_down -> status, status
| Active -> status, Shutting_down
in
let create ~config () : t =
{ config; http = Httpc.create (); encoder = Pbrt.Encoder.create () }
match old_status with
| Stopped -> ()
| Shutting_down ->
(* last worker to stop will call [on_done] *)
()
| Active ->
(* notify potentially asleep workers *)
Notifier.trigger self.notify;
Notifier.delete self.notify
let cleanup self = Httpc.cleanup self.http
let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string)
: unit IO.t =
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
match r with
| Ok () ->
Util_net_backoff.on_success backoff;
IO.return ()
| Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!";
shutdown self;
IO.return ()
| Error err ->
Atomic.incr n_errors;
Export_error.report_err err;
(* avoid crazy error loop *)
let dur_s = Util_net_backoff.on_error backoff in
IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
let send (self : t) (sigs : OTEL.Any_signal_l.t) : (unit, error) result IO.t
=
let res = Any_resource.of_signal_l sigs in
let url =
match res with
| R_logs _ -> self.config.url_logs
| R_spans _ -> self.config.url_traces
| R_metrics _ -> self.config.url_metrics
in
let data = Signal.Encode.any ~encoder:self.encoder res in
Httpc.send self.http ~url ~decode:(`Ret ()) data
end
let send_metrics_http (st : state) client ~encoder ~backoff
(l : Proto.Metrics.resource_metrics list) =
let msg = Signal.Encode.metrics ~encoder l in
send_http_ st client msg ~backoff ~url:st.config.url_metrics
let send_traces_http st client ~encoder ~backoff
(l : Proto.Trace.resource_spans list) =
let msg = Signal.Encode.traces ~encoder l in
send_http_ st client msg ~backoff ~url:st.config.url_traces
let send_logs_http st client ~encoder ~backoff
(l : Proto.Logs.resource_logs list) =
let msg = Signal.Encode.logs ~encoder l in
send_http_ st client msg ~backoff ~url:st.config.url_logs
let tick (self : state) =
if Aswitch.is_on self.active then Notifier.trigger self.notify
(** Shutdown one worker, when the queue is closed *)
let shutdown_worker (self : state) : unit =
(* let tid = Thread.id @@ Thread.self () in
Printf.eprintf "worker %d: shutting down\n%!" tid; *)
if Atomic.fetch_and_add self.n_workers (-1) = 1 then (
(* we were the last worker *)
(* Printf.eprintf "worker %d: last one!\n%!" tid; *)
Atomic.set self.status Stopped;
Aswitch.turn_off self.active_trigger
)
let start_worker (self : state) : unit =
let client = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
let backoff = Util_net_backoff.create () in
(* loop on [q] *)
let rec loop () : unit IO.t =
(* first look at the queue, to drain it *)
match Bounded_queue.try_pop self.q with
| `Closed ->
(* this worker shuts down, others might still be busy *)
shutdown_worker self;
IO.return ()
| `Item (R_logs logs) ->
let* () = send_logs_http self client ~encoder ~backoff logs in
loop ()
| `Item (R_metrics ms) ->
let* () = send_metrics_http self client ~encoder ~backoff ms in
loop ()
| `Item (R_spans spans) ->
let* () = send_traces_http self client ~encoder ~backoff spans in
loop ()
| `Empty ->
(* Printf.eprintf "worker %d: empty queue\n%!" tid; *)
(match Atomic.get self.status with
| Stopped ->
assert false
(* shouldn't happen without us going through [Shutting_down] *)
| Shutting_down ->
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
loop ())
in
IO.spawn (fun () ->
IO.protect loop ~finally:(fun () ->
Httpc.cleanup client;
IO.return ()))
let start_ticker (self : state) ~(interval_s : float) : unit =
let rec loop () : unit IO.t =
match Atomic.get self.status with
| Stopped | Shutting_down -> IO.return ()
| Active ->
let* () = IO.sleep_s interval_s in
if Aswitch.is_on self.active then tick self;
loop ()
in
IO.spawn loop
module C = Generic_consumer.Make (IO) (Notifier) (Sender)
let default_n_workers = 50
let create_state ?override_n_workers ~ticker_task ~config ~q () : state =
let active, active_trigger = Aswitch.create () in
let other_config = { override_n_workers; ticker_task } in
let self =
{
active;
active_trigger;
status = Atomic.make Active;
config;
other_config;
q;
notify = Notifier.create ();
n_workers = Atomic.make 0;
}
in
(* start workers *)
let consumer ?override_n_workers ~ticker_task ~(config : Client_config.t) () :
Consumer.any_signal_l_builder =
let n_workers =
min 2
(max 500
(match
( self.other_config.override_n_workers,
self.config.http_concurrency_level )
with
(match override_n_workers, config.http_concurrency_level with
| Some n, _ -> n
| None, Some n -> n
| None, None -> default_n_workers))
in
ignore (Atomic.fetch_and_add self.n_workers n_workers : int);
for _i = 1 to n_workers do
start_worker self
done;
Notifier.register_bounded_queue self.notify q;
(* start ticker *)
(match self.other_config.ticker_task with
| None -> ()
| Some interval_s -> start_ticker self ~interval_s);
self
let self_metrics (self : state) : OTEL.Metrics.t list =
let open OTEL.Metrics in
let now = Mtime_clock.now () in
[
sum ~name:"otel-ocaml.export.discarded-by-bounded-queue"
~is_monotonic:true
[
int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q);
];
sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true
[ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ];
]
let to_consumer (self : state) : Any_resource.t Consumer.t =
let shutdown () = shutdown self in
let tick () = tick self in
let self_metrics () = self_metrics self in
{ active = (fun () -> self.active); tick; shutdown; self_metrics }
let consumer ?override_n_workers ~ticker_task ~config () :
Consumer.any_resource_builder =
{
start_consuming =
(fun q ->
let st =
create_state ?override_n_workers ~ticker_task ~config ~q ()
in
to_consumer st);
}
C.consumer ~sender_config:config ~n_workers ~ticker_task ()
end

View file

@ -13,5 +13,5 @@ module type S = sig
val wait : t -> unit IO.t
val register_bounded_queue : t -> _ Bounded_queue.t -> unit
val register_bounded_queue : t -> _ Bounded_queue.Recv.t -> unit
end

5
src/client/io_sync.ml Normal file
View file

@ -0,0 +1,5 @@
include Generic_io.Direct_style
let sleep_s = Thread.delay
let[@inline] spawn f = ignore (Util_thread.start_bg_thread f : Thread.t)

2
src/client/io_sync.mli Normal file
View file

@ -0,0 +1,2 @@
include Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a
(** Generic IO with [spawn] starting a background thread *)

View file

@ -36,5 +36,5 @@ let trigger (self : t) : unit =
let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond
let register_bounded_queue (self : t) (q : _ Bounded_queue.t) : unit =
Bounded_queue.on_non_empty q (fun () -> trigger self)
let register_bounded_queue (self : t) (q : _ Bounded_queue.Recv.t) : unit =
Bounded_queue.Recv.on_non_empty q (fun () -> trigger self)

View file

@ -17,5 +17,5 @@ let wait self =
Mutex.unlock self.mutex
(** Ensure we get signalled when the queue goes from empty to non-empty *)
let register_bounded_queue (self : t) (bq : _ Bounded_queue.t) : unit =
Bounded_queue.on_non_empty bq (fun () -> trigger self)
let register_bounded_queue (self : t) (bq : _ Bounded_queue.Recv.t) : unit =
Bounded_queue.Recv.on_non_empty bq (fun () -> trigger self)

View file

@ -77,6 +77,12 @@ module Encode = struct
~ctor:(fun r ->
Trace_service.make_export_trace_service_request ~resource_spans:r ())
~enc:Trace_service.encode_pb_export_trace_service_request
let any ?encoder (r : Any_resource.t) : string =
match r with
| R_logs l -> logs ?encoder l
| R_spans sp -> traces ?encoder sp
| R_metrics ms -> metrics ?encoder ms
end
module Decode = struct

View file

@ -48,6 +48,8 @@ module Encode : sig
(** [traces ts] is a protobuf encoded string of the traces [ts]
@param encoder provide an encoder state to reuse *)
val any : ?encoder:Pbrt.Encoder.t -> Any_resource.t -> string
end
(** Decode signals from protobuf encoded strings, received over the wire *)