various fixes

This commit is contained in:
Simon Cruanes 2026-02-17 16:51:34 -05:00
parent 126e25b5a7
commit 71bb7d1996
18 changed files with 105 additions and 61 deletions

View file

@ -38,17 +38,19 @@ struct
{ mutex = Eio.Mutex.create (); cond = Eio.Condition.create () }
let trigger self =
(* FIXME: this might be triggered from other threads!! how do we
ensure it runs in the Eio thread? *)
(* Eio.Condition.broadcast is lock-free since eio 0.8 (ocaml-multicore/eio#397)
and safe to call from other threads/domains and signal handlers. *)
Eio.Condition.broadcast self.cond
let delete self =
trigger self;
()
let wait self =
let wait self ~should_keep_waiting =
Eio.Mutex.lock self.mutex;
Eio.Condition.await self.cond self.mutex;
while should_keep_waiting () do
Eio.Condition.await self.cond self.mutex
done;
Eio.Mutex.unlock self.mutex
(** Ensure we get signalled when the queue goes from empty to non-empty *)
@ -111,7 +113,9 @@ struct
in
Error err
| Ok (resp, body) ->
let body = Eio.Buf_read.(parse_exn take_all) body ~max_size:max_int in
let body =
Eio.Buf_read.(parse_exn take_all) body ~max_size:(10 * 1024 * 1024)
in
let code = Response.status resp |> Code.code_of_status in
if not (Code.is_error code) then (
match decode with

View file

@ -8,8 +8,6 @@ module OTELC = Opentelemetry_client
module OTEL = Opentelemetry
open Common_
let n_bytes_sent : int Atomic.t = Atomic.make 0
type error = OTELC.Export_error.t
open struct
@ -138,5 +136,3 @@ let with_setup ?after_shutdown ?(config : Config.t = Config.make ())
Fun.protect f ~finally:(fun () -> shutdown_and_wait ?after_shutdown exp)
) else
f ()
let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent

View file

@ -5,9 +5,6 @@
module Config = Config
val n_bytes_sent : unit -> int
(** Global counter of bytes sent (or attempted to be sent) *)
val consumer :
?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_signal_l_builder
(** Consumer that pulls from a queue *)

View file

@ -11,6 +11,7 @@ type 'a t = {
batch: int; (** Minimum size to batch before popping *)
high_watermark: int; (** Size above which we start dropping signals *)
timeout: Mtime.span option;
n_dropped: int Atomic.t;
}
let max_batch_size = 100_000
@ -45,6 +46,7 @@ let make ?(batch = 100) ?high_watermark ?mtime ?timeout () : _ t =
batch;
timeout;
high_watermark;
n_dropped = Atomic.make 0;
}
let timeout_expired_ ~mtime ~timeout (self : _ state) : bool =
@ -94,35 +96,43 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then
`Ok
else (
let now = lazy (Mtime_clock.now ()) in
Util_atomic.update_cas self.st @@ fun state ->
if state.size >= self.high_watermark then
( (* drop this to prevent queue from growing too fast *)
`Dropped,
state )
else (
let start =
if state.size = 0 && Option.is_some self.timeout then
Lazy.force now
else
state.start
in
let now = Mtime_clock.now () in
let res =
Util_atomic.update_cas self.st @@ fun state ->
if state.size >= self.high_watermark then
( (* drop this to prevent queue from growing too fast *)
`Dropped,
state )
else (
let start =
if state.size = 0 && Option.is_some self.timeout then
now
else
state.start
in
(* add to queue *)
let state =
{
size = state.size + List.length elems;
q = List.rev_append elems state.q;
start;
}
in
(* add to queue *)
let state =
{
size = state.size + List.length elems;
q = List.rev_append elems state.q;
start;
}
in
`Ok, state
)
`Ok, state
)
in
(match res with
| `Dropped -> Atomic.incr self.n_dropped
| `Ok -> ());
res
)
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
(** [n_dropped self] reads the current value of the [n_dropped] counter.
    [push] increments this counter by one each time a call returns [`Dropped],
    so it counts dropped calls rather than individual elements. *)
let[@inline] n_dropped self = Atomic.get self.n_dropped
module Internal_ = struct
let mtime_dummy_ = mtime_dummy_
end

View file

@ -56,6 +56,9 @@ val push' : 'a t -> 'a list -> unit
val cur_size : _ t -> int
(** Number of elements in the current batch *)
val n_dropped : _ t -> int
(** Number of [push] calls that were dropped because the batch had reached its
    high watermark (one per call, however many elements it carried) *)
(**/**)
module Internal_ : sig

View file

@ -155,7 +155,11 @@ end = struct
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
let* () =
Notifier.wait self.notify ~should_keep_waiting:(fun () ->
Bounded_queue.Recv.size self.q = 0
&& Atomic.get self.status = Active)
in
loop ())
in

View file

@ -87,7 +87,11 @@ end = struct
shutdown_worker self;
IO.return ()
| Active ->
let* () = Notifier.wait self.notify in
let* () =
Notifier.wait self.notify ~should_keep_waiting:(fun () ->
Bounded_queue.Recv.size self.q = 0
&& Atomic.get self.status = Active)
in
loop ())
in

View file

@ -2,9 +2,6 @@ open Common_
type error = Export_error.t
(** Number of errors met during export *)
let n_errors = Atomic.make 0
module type IO = Generic_io.S_WITH_CONCURRENCY
module type HTTPC = sig
@ -133,8 +130,8 @@ end = struct
let consumer ?override_n_workers ~ticker_task ~(config : Http_config.t) () :
Consumer.any_signal_l_builder =
let n_workers =
min 2
(max 500
max 2
(min 500
(match override_n_workers, config.http_concurrency_level with
| Some n, _ -> n
| None, Some n -> n

View file

@ -13,7 +13,7 @@ module type S = sig
val trigger : t -> unit
val wait : t -> unit IO.t
val wait : t -> should_keep_waiting:(unit -> bool) -> unit IO.t
val register_bounded_queue : t -> _ Bounded_queue.Recv.t -> unit
end

View file

@ -34,7 +34,16 @@ let trigger (self : t) : unit =
else if not (Atomic.exchange self.notified true) then
Lwt_unix.send_notification self.notification
let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond
(** [wait self ~should_keep_waiting] waits on [self.cond] for as long as
    [should_keep_waiting ()] returns [true].

    The predicate is evaluated before the first wait, so if it is already
    [false] the result is [Lwt.return_unit] without waiting at all. It is
    evaluated again after every wakeup of the condition, so a wakeup that
    leaves the predicate [true] simply leads to another wait. *)
let wait (self : t) ~should_keep_waiting : unit Lwt.t =
let open Lwt.Syntax in
let rec loop () =
(* check the predicate first: only wait when there is a reason to *)
if should_keep_waiting () then
let* () = Lwt_condition.wait self.cond in
(* woken up: go round again and re-check the predicate *)
loop ()
else
Lwt.return_unit
in
loop ()
let register_bounded_queue (self : t) (q : _ Bounded_queue.Recv.t) : unit =
Bounded_queue.Recv.on_non_empty q (fun () -> trigger self)

View file

@ -1,15 +1,19 @@
type t = {
proba_accept: float;
rng: Random.State.t;
n_seen: int Atomic.t;
n_accepted: int Atomic.t;
}
let create ~proba_accept () : t =
(* FIXME: either create a random state and protect it, or make sure
we Random.self_init() in the current domain?? *)
if proba_accept < 0. || proba_accept > 1. then
invalid_arg "sampler: proba_accept must be in [0., 1.]";
{ proba_accept; n_seen = Atomic.make 0; n_accepted = Atomic.make 0 }
{
proba_accept;
rng = Random.State.make_self_init ();
n_seen = Atomic.make 0;
n_accepted = Atomic.make 0;
}
let[@inline] proba_accept self = self.proba_accept
@ -25,7 +29,10 @@ let actual_rate (self : t) : float =
let accept (self : t) : bool =
Atomic.incr self.n_seen;
let n = Random.float 1. in
(* WARNING: Random.State.float is not safe to call concurrently on the
same state from multiple domains. If a sampler is shared across domains,
consider creating one sampler per domain. *)
let n = Random.State.float self.rng 1. in
let res = n < self.proba_accept in
if res then Atomic.incr self.n_accepted;

View file

@ -43,8 +43,7 @@ end = struct
a value of type [bool] which OCaml's memory model should guarantee. *)
let[@inline] closed self = self.closed
(* NOTE: race condition here is also benign in absence of tearing. *)
let[@inline] size self = Queue.length self.q
let[@inline] size self = UM.protect self.mutex (fun () -> Queue.length self.q)
let close (self : _ t) =
UM.protect self.mutex @@ fun () ->

View file

@ -11,9 +11,11 @@ let[@inline] trigger self = Condition.broadcast self.cond
let delete = ignore
let wait self =
let wait self ~should_keep_waiting =
Mutex.lock self.mutex;
Condition.wait self.cond self.mutex;
while should_keep_waiting () do
Condition.wait self.cond self.mutex
done;
Mutex.unlock self.mutex
(** Ensure we get signalled when the queue goes from empty to non-empty *)

View file

@ -92,6 +92,8 @@ let to_span_link (self : t) : Span_link.t =
let[@inline] to_span_ctx (self : t) : Span_ctx.t =
Span_ctx.make ~trace_id:(trace_id self) ~parent_id:(id self) ()
(* Note: a span must not be concurrently modified from multiple
threads or domains. *)
let[@inline] add_event self ev : unit =
if is_not_dummy self then span_set_events self (ev :: self.events)

View file

@ -77,7 +77,11 @@ let of_w3c_trace_context bs : _ result =
with Invalid_argument msg -> invalid_arg (spf "in span id: %s" msg)
in
if Bytes.get bs 52 <> '-' then invalid_arg "expected '-' after parent_id";
let sampled = int_of_string_opt (Bytes.sub_string bs 53 2) = Some 1 in
let sampled =
match int_of_string_opt ("0x" ^ Bytes.sub_string bs 53 2) with
| Some flags -> flags land 1 = 1
| None -> false
in
(* ignore other flags *)
Ok (make ~remote:true ~sampled ~trace_id ~parent_id ())

View file

@ -151,16 +151,21 @@ let client ?(tracer = Otel.Tracer.dynamic_main) ?(span : Otel.Span.t option)
[Cohttp_lwt.S.Client]. *)
include C
let attrs_for ~uri ~meth:_ () =
let attrs_for ~uri ~meth () =
[
"http.method", `String (Code.string_of_method `GET);
"http.method", `String (Code.string_of_method meth);
"http.url", `String (Uri.to_string uri);
]
let context_for ~uri ~meth =
let trace_id = Option.map Otel.Span.trace_id span in
let parent =
match span with
| Some _ -> span
| None -> Otel.Ambient_span.get ()
in
let trace_id = Option.map Otel.Span.trace_id parent in
let attrs = attrs_for ~uri ~meth () in
trace_id, span, attrs
trace_id, parent, attrs
let add_traceparent (span : Otel.Span.t) headers =
let module Traceparent = Otel.Trace_context.Traceparent in

View file

@ -56,8 +56,11 @@ module Main_set = struct
| Some s -> s
| None ->
let s = create () in
if Atomic.compare_and_set cur_set_ None (Some s) then
if Atomic.compare_and_set cur_set_ None (Some s) then (
(match Main_exporter.get () with
| Some exp -> add_to_exporter exp s
| None -> ());
s
else
) else
get ()
end

View file

@ -98,8 +98,6 @@ let run () =
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
sum ~name:"otel.bytes-sent" ~is_monotonic:true ~unit_:"B"
[ int ~now (Opentelemetry_client_ocurl.n_bytes_sent ()) ];
]));
let n_jobs = max 1 !n_jobs in