diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 4591c2eb..9ce2fd75 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -143,6 +143,9 @@ end module type BATCH = sig end +(** Batch of resources to be pushed later. + + This type is thread-safe. *) module Batch : sig type 'a t @@ -164,6 +167,7 @@ module Batch : sig @param now passed to implement timeout *) val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t + (** Create a new batch *) end = struct type 'a t = { lock: Mutex.t; @@ -266,12 +270,6 @@ let start_bg_thread (f : unit -> unit) : unit = in ignore (Thread.create run () : Thread.t) -let l_is_empty = function - | [] -> true - | _ :: _ -> false - -let batch_is_empty = List.for_all l_is_empty - (* make an emitter. exceptions inside should be caught, see @@ -295,9 +293,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let batch_logs : Logs.resource_logs list Batch.t = Batch.make ?batch:config.batch_logs ?timeout () - let on_tick_cbs_ = ref (ref []) + let on_tick_cbs_ = Atomic.make (ref []) - let set_on_tick_callbacks = ( := ) on_tick_cbs_ + let set_on_tick_callbacks = Atomic.set on_tick_cbs_ let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = Pbrt.Encoder.reset encoder; @@ -373,13 +371,39 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) - end in - if config.bg_threads > 0 then ( - (let m = Mutex.create () in - Lock.set_mutex - ~lock:(fun () -> Mutex.lock m) - ~unlock:(fun () -> Mutex.unlock m)); + let tick_common_ () = + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + List.iter + (fun f -> + try f () + with e -> + Printf.eprintf "on tick callback raised: %s\n" + (Printexc.to_string e)) + !(Atomic.get on_tick_cbs_); + () + + let setup_ticker_thread ~tick ~finally () = + (* thread that calls [tick()] regularly, to help enforce timeouts *) + let tick_thread () = + let@ () = + Fun.protect ~finally:(fun () -> + Atomic.set stop true; + finally ()) + in + while not @@ Atomic.get stop do + Thread.delay 0.5; + tick () + done + in + start_bg_thread tick_thread + end in + (let m = Mutex.create () in + Lock.set_mutex + ~lock:(fun () -> Mutex.lock m) + ~unlock:(fun () -> Mutex.unlock m)); + + if config.bg_threads > 0 then ( let m = Mutex.create () in let cond = Condition.create () in @@ -420,14 +444,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = in let tick () = - if Atomic.get needs_gc_metrics then sample_gc_metrics (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - (Printexc.to_string e)) - !(!on_tick_cbs_); + tick_common_ (); let now = Mtime_clock.now () in if Atomic.get stop then @@ -440,18 +457,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = wakeup ~all:false () in - if config.ticker_thread then ( - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let tick_thread () = - while not @@ Atomic.get stop do - Thread.delay 0.5; - tick () - done; - wakeup ~all:true () - in - - start_bg_thread tick_thread - ); + if config.ticker_thread then + setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) (); let module M = struct let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () @@ -468,8 +475,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let cleanup () = Atomic.set stop true; if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - (* wakeup everyone *) - with_mutex_ m (fun () -> Condition.broadcast cond) + wakeup ~all:true () end in (module M) ) else ( @@ -477,10 +483,16 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let encoder = Pbrt.Encoder.create () in let module M = struct + (* we make sure that this is thread-safe, even though we don't have a + background thread. There can still be a ticker thread, and there + can also be several user threads that produce spans and call + the emit functions. *) + let push_trace e = let@ () = guard_exn_ in Batch.push' batch_traces e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_traces_maybe ~now httpc encoder : bool) let push_metrics e = @@ -488,24 +500,32 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = if Atomic.get needs_gc_metrics then sample_gc_metrics (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_metrics_maybe ~now httpc encoder : bool) let push_logs e = let@ () = guard_exn_ in Batch.push' batch_logs e; let now = Mtime_clock.now () in + let@ () = Lock.with_lock in ignore (emit_logs_maybe ~now httpc encoder : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); + let@ () = Lock.with_lock in let now = Mtime_clock.now () in ignore (emit_traces_maybe ~now httpc encoder : bool); ignore (emit_metrics_maybe ~now httpc encoder : bool); ignore (emit_logs_maybe ~now httpc encoder : bool); () + (* make sure we have a ticker thread, if required *) + let () = + if config.ticker_thread then + setup_ticker_thread ~tick ~finally:ignore () + let cleanup () = if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; emit_all_force httpc encoder; @@ -529,11 +549,11 @@ end) { send = (fun l ~ret -> - let@ () = Lock.with_lock in - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) - l; + l); push_trace l; ret ()); } @@ -581,12 +601,11 @@ end) { send = (fun m ~ret -> - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) - m; - - let@ () = Lock.with_lock in + m); let m = List.rev_append (additional_metrics ()) m in push_metrics m; @@ -597,12 +616,12 @@ end) { send = (fun m ~ret -> - if !debug_ then + (if !debug_ then + let@ () = Lock.with_lock in Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) - m; + m); - let@ () = Lock.with_lock in push_logs m; ret ()); }