From c9f5a27b222535dd26be35fb2370764bfbf75f6e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 17 Dec 2025 15:07:02 -0500 Subject: [PATCH] test binaries: updates --- src/client-cohttp-eio/dune | 1 + .../opentelemetry_client_cohttp_eio.ml | 2 +- src/client-cohttp-lwt/dune | 1 + .../opentelemetry_client_cohttp_lwt.ml | 2 +- src/client-ocurl-lwt/dune | 1 + .../opentelemetry_client_ocurl_lwt.ml | 2 +- src/client-ocurl/dune | 1 + .../opentelemetry_client_ocurl.ml | 13 +-- src/lwt/opentelemetry_lwt.ml | 23 +++-- tests/bin/cohttp_client.ml | 3 +- tests/bin/dune | 10 +- tests/bin/emit1.ml | 35 +++++-- tests/bin/emit1_cohttp.ml | 94 +++++++++---------- tests/bin/emit1_eio.ml | 59 ++++++------ tests/bin/emit1_ocurl_lwt.ml | 29 +++--- tests/bin/emit1_stdout.ml | 47 ++++++---- tests/bin/emit_logs_cohttp.ml | 4 +- 17 files changed, 182 insertions(+), 145 deletions(-) diff --git a/src/client-cohttp-eio/dune b/src/client-cohttp-eio/dune index 96477c9f..6e1608cb 100644 --- a/src/client-cohttp-eio/dune +++ b/src/client-cohttp-eio/dune @@ -7,6 +7,7 @@ (libraries (re_export opentelemetry) (re_export opentelemetry-client) + (re_export opentelemetry-client.sync) (re_export eio) (re_export eio.core) (re_export eio.unix) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 49d8dfa2..4c9ce02d 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -171,7 +171,7 @@ let create_consumer ?(config = Config.make ()) ~sw ~env () : let create_exporter ?(config = Config.make ()) ~sw ~env () = let consumer = create_consumer ~config ~sw ~env () in let bq = - Bounded_queue_sync.create + Opentelemetry_client_sync.Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-cohttp-lwt/dune b/src/client-cohttp-lwt/dune index 0a2b7540..8df457b7 100644 --- a/src/client-cohttp-lwt/dune +++ b/src/client-cohttp-lwt/dune @@ -9,6 +9,7 @@ (re_export opentelemetry-lwt) (re_export opentelemetry-client) (re_export opentelemetry-client.lwt) + (re_export opentelemetry-client.sync) (re_export lwt) (re_export lwt.unix) (re_export cohttp-lwt) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 79034641..936b89d9 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -111,7 +111,7 @@ let create_consumer ?(config = Config.make ()) () = let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in let bq = - Bounded_queue_sync.create + Opentelemetry_client_sync.Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-ocurl-lwt/dune b/src/client-ocurl-lwt/dune index f9453892..c68e8250 100644 --- a/src/client-ocurl-lwt/dune +++ b/src/client-ocurl-lwt/dune @@ -8,6 +8,7 @@ (re_export opentelemetry) opentelemetry.atomic (re_export opentelemetry-client) + (re_export opentelemetry-client.sync) (re_export opentelemetry-client.lwt) threads pbrt diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 425a18a6..2cef3ff5 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -82,7 +82,7 @@ let create_consumer ?(config = Config.make ()) () = let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in let bq = - Bounded_queue_sync.create + Opentelemetry_client_sync.Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-ocurl/dune b/src/client-ocurl/dune index 980a709c..62a77d6d 100644 --- a/src/client-ocurl/dune +++ b/src/client-ocurl/dune @@ -5,6 +5,7 @@ (re_export opentelemetry) opentelemetry.atomic (re_export opentelemetry-client) + (re_export opentelemetry-client.sync) (re_export curl) unix pbrt diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index fe7d80fa..2f320597 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -17,8 +17,8 @@ let n_bytes_sent : int Atomic.t = Atomic.make 0 type error = OTELC.Export_error.t open struct - module Notifier = OTELC.Notifier_sync - module IO = OTELC.Io_sync + module Notifier = Opentelemetry_client_sync.Notifier_sync + module IO = Opentelemetry_client_sync.Io_sync end module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct @@ -88,7 +88,7 @@ let consumer ?(config = Config.make ()) () : let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = let consumer = consumer ~config () in let bq = - OTELC.Bounded_queue_sync.create + Opentelemetry_client_sync.Bounded_queue_sync.create ~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark () in @@ -99,7 +99,7 @@ let create_backend = create_exporter let shutdown_and_wait ?(after_shutdown = ignore) (self : OTEL.Exporter.t) : unit = - let open Opentelemetry_client in + let open Opentelemetry_client_sync in let sq = Sync_queue.create () in OTEL.Aswitch.on_turn_off (OTEL.Exporter.active self) (fun () -> Sync_queue.push sq ()); @@ -119,13 +119,14 @@ let setup_ ?(config : Config.t = Config.make ()) () : OTEL.Exporter.t = let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in let active = OTEL.Exporter.active exporter in ignore - (OTELC.Util_thread.setup_ticker_thread ~active ~sleep_ms exporter () + (Opentelemetry_client_sync.Util_thread.setup_ticker_thread ~active + ~sleep_ms exporter () : Thread.t) ); exporter let remove_exporter () : unit = - let open Opentelemetry_client in + let open Opentelemetry_client_sync in (* used to wait *) let sq = Sync_queue.create () in OTEL.Main_exporter.remove () ~on_done:(fun () -> Sync_queue.push sq ()); diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index 5b608fb5..47bbf640 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -18,21 +18,26 @@ module Tracer = struct include Tracer (** Sync span guard *) - let with_ ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state ?attrs - ?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a Lwt.t) : 'a Lwt.t = + let with_ (type a) ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state + ?attrs ?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) : + a Lwt.t = let thunk, finally = with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs ?kind ?trace_id ?parent ?links name cb in - try%lwt - let* rv = thunk () in - let () = finally (Ok ()) in - Lwt.return rv - with e -> + match thunk () with + | exception exn -> let bt = Printexc.get_raw_backtrace () in - let () = finally (Error (e, bt)) in - reraise e + finally (Error (exn, bt)); + Printexc.raise_with_backtrace exn bt + | promise -> + Lwt.on_any promise + (fun _ -> finally (Ok ())) + (fun exn -> + let bt = Printexc.get_raw_backtrace () in + finally (Error (exn, bt))); + promise end module Trace = Tracer [@@deprecated "use Tracer"] diff --git a/tests/bin/cohttp_client.ml b/tests/bin/cohttp_client.ml index a2e0c0aa..ad074e79 100644 --- a/tests/bin/cohttp_client.ml +++ b/tests/bin/cohttp_client.ml @@ -14,10 +14,9 @@ let mk_client ~scope = let run () = let open Lwt.Syntax in - let tracer = OT.Tracer.get_main () in let rec go () = let@ scope = - Otel_lwt.Tracer.with_ tracer ~kind:OT.Span.Span_kind_producer "loop.outer" + Otel_lwt.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" in let* () = Lwt_unix.sleep !sleep_outer in let module C = (val mk_client ~scope) in diff --git a/tests/bin/dune b/tests/bin/dune index 07dde77a..e98b0e8e 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -25,7 +25,12 @@ (executable (name emit1_stdout) (modules emit1_stdout) - (libraries unix threads opentelemetry opentelemetry-client)) + (libraries + unix + threads + opentelemetry + opentelemetry-client + opentelemetry-client.sync)) (executable (name emit1_cohttp) @@ -78,7 +83,10 @@ (modules cohttp_client) (libraries cohttp-lwt-unix + lwt + lwt.unix unix + uri opentelemetry opentelemetry-client-cohttp-lwt opentelemetry-cohttp-lwt)) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 8e81cf35..2c7324ba 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -11,6 +11,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let iterations = ref 4 + let n = ref max_int let num_sleep = Atomic.make 0 @@ -21,25 +23,24 @@ let num_tr = Atomic.make 0 let run_job () = let active = OT.Main_exporter.active () in - let tracer = OT.Tracer.get_main () in let i = ref 0 in let cnt = ref 0 in while OT.Aswitch.is_on active && !cnt < !n do let@ _scope = Atomic.incr num_tr; - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_producer "loop.outer" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" ~attrs:[ "i", `Int !i ] in (* Printf.printf "cnt=%d\n%!" !cnt; *) incr cnt; - for j = 0 to 4 do + for j = 1 to !iterations do (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope ~attrs:[ "j", `Int j ] "loop.inner" in @@ -59,8 +60,8 @@ let run_job () = (* allocate some stuff *) if !stress_alloc_ then ( let@ _ = - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal - ~parent:scope "alloc" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope + "alloc" in Atomic.incr num_tr; @@ -92,12 +93,13 @@ let run () = OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics; OT.Metrics_callbacks.add_metrics_cb set (fun () -> + let now = OT.Clock.now_main () in OT.Metrics. [ sum ~name:"num-sleep" ~is_monotonic:true - [ int (Atomic.get num_sleep) ]; + [ int ~now (Atomic.get num_sleep) ]; sum ~name:"otel.bytes-sent" ~is_monotonic:true ~unit_:"B" - [ int (Opentelemetry_client_ocurl.n_bytes_sent ()) ]; + [ int ~now (Opentelemetry_client_ocurl.n_bytes_sent ()) ]; ])); let n_jobs = max 1 !n_jobs in @@ -124,6 +126,8 @@ let () = let final_stats = ref false in let n_bg_threads = ref 0 in + let url = ref None in + let n_procs = ref 1 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; @@ -140,14 +144,26 @@ let () = "-j", Arg.Set_int n_jobs, " number of parallel jobs"; "--bg-threads", Arg.Set_int n_bg_threads, " number of background threads"; "--no-self-trace", Arg.Clear self_trace, " disable self tracing"; - "-n", Arg.Set_int n, " number of iterations (default ∞)"; + "-n", Arg.Set_int n, " number of outer iterations (default ∞)"; + ( "--iterations", + Arg.Set_int iterations, + " the number of inner iterations to run" ); + ( "--url", + Arg.String (fun s -> url := Some s), + " set the url for the OTel collector" ); "--final-stats", Arg.Set final_stats, " display some metrics at the end"; + "--procs", Arg.Set_int n_procs, " number of processes (stub)"; ] |> Arg.align in Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + if !n_procs > 1 then + failwith + "TODO: add support for running multiple processes to the lwt-cohttp \ + emitter"; + let some_if_nzero r = if !r > 0 then Some !r @@ -156,6 +172,7 @@ let () = in let config = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:!self_trace + ?url:!url ?bg_threads:(some_if_nzero n_bg_threads) ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 1aa92217..2e0f4f5b 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -12,6 +12,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let n = ref max_int + let iterations = ref 1 let num_sleep = Atomic.make 0 @@ -20,61 +22,53 @@ let stress_alloc_ = ref true let num_tr = Atomic.make 0 -(* Counter used to mark simulated failures *) -let i = ref 0 - let run_job job_id : unit Lwt.t = let switch = T.Main_exporter.active () in - while%lwt T.Aswitch.is_on switch do - let tracer = T.Tracer.get_main () in + let i = ref 0 in + while%lwt T.Aswitch.is_on switch && !i < !n do let@ scope = Atomic.incr num_tr; - T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int job_id ] + T.Tracer.with_ ~kind:T.Span.Span_kind_producer "loop.outer" + ~attrs:[ "i", `Int !i; "job_id", `Int job_id ] in - for%lwt j = 0 to !iterations do - if j >= !iterations then - (* Terminate program, having reached our max iterations *) - T.Main_exporter.remove () - else - (* parent scope is found via thread local storage *) - let@ span = - Atomic.incr num_tr; - T.Tracer.with_ ~tracer ~parent:scope ~kind:T.Span.Span_kind_internal - ~attrs:[ "j", `Int j ] - "loop.inner" - in + incr i; - let* () = Lwt_unix.sleep !sleep_outer in + for%lwt j = 1 to !iterations do + (* parent scope is found via thread local storage *) + let@ span = + Atomic.incr num_tr; + T.Tracer.with_ ~parent:scope ~kind:T.Span.Span_kind_internal + ~attrs:[ "j", `Int j ] + "loop.inner" + in + + let* () = Lwt_unix.sleep !sleep_outer in + Atomic.incr num_sleep; + + T.Logger.logf ~trace_id:(T.Span.trace_id span) ~span_id:(T.Span.id span) + ~severity:Severity_number_info (fun k -> k "inner at %d" j); + + try%lwt + Atomic.incr num_tr; + let@ scope = + T.Tracer.with_ ~kind:T.Span.Span_kind_internal ~parent:span "alloc" + in + (* allocate some stuff *) + if !stress_alloc_ then ( + let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in + ignore _arr + ); + + let* () = Lwt_unix.sleep !sleep_inner in Atomic.incr num_sleep; - T.Logger.logf ~trace_id:(T.Span.trace_id span) ~span_id:(T.Span.id span) - ~severity:Severity_number_info (fun k -> k "inner at %d" j); + (* simulate a failure *) + if j = 4 && !i mod 13 = 0 then failwith "oh no"; - incr i; - - try%lwt - Atomic.incr num_tr; - let@ scope = - T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_internal ~parent:span - "alloc" - in - (* allocate some stuff *) - if !stress_alloc_ then ( - let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in - ignore _arr - ); - - let* () = Lwt_unix.sleep !sleep_inner in - Atomic.incr num_sleep; - - (* simulate a failure *) - if j = 4 && !i mod 13 = 0 then failwith "oh no"; - - T.Span.add_event scope (T.Event.make "done with alloc"); - Lwt.return () - with Failure _ -> Lwt.return () + T.Span.add_event scope (T.Event.make "done with alloc"); + Lwt.return () + with Failure _ -> Lwt.return () done done @@ -84,10 +78,11 @@ let run () : unit Lwt.t = T.Metrics_callbacks.( with_set_added_to_main_exporter (fun set -> add_metrics_cb set (fun () -> + let now = T.Clock.now_main () in T.Metrics. [ sum ~name:"num-sleep" ~is_monotonic:true - [ int (Atomic.get num_sleep) ]; + [ int ~now (Atomic.get num_sleep) ]; ]))); let n_jobs = max 1 !n_jobs in @@ -124,9 +119,12 @@ let () = "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; - "--iterations", Arg.Set_int iterations, " the number of iterations to run"; + ( "--iterations", + Arg.Set_int iterations, + " the number of inner iterations to run" ); "-j", Arg.Set_int n_jobs, " number of parallel jobs"; - "--procs", Arg.Set_int n_procs, " number of processes"; + "-n", Arg.Set_int n, " number of outer iterations (default ∞)"; + "--procs", Arg.Set_int n_procs, " number of processes (stub)"; ] |> Arg.align in diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index f215da12..c3302c9c 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -15,25 +15,22 @@ let stress_alloc_ = ref true let num_sleep = Atomic.make 0 -let stop = Atomic.make false - let num_tr = Atomic.make 0 -(* Counter used to mark simulated failures *) -let i = Atomic.make 0 +let n = ref max_int let run_job clock _job_id iterations : unit = - let@ scope = - Atomic.incr num_tr; - OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int (Atomic.get i) ] - in + let i = ref 0 in + while OT.Aswitch.is_on (OT.Main_exporter.active ()) && !i < !n do + let@ scope = + Atomic.incr num_tr; + OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" + ~attrs:[ "i", `Int !i ] + in - for j = 0 to iterations do - if j >= iterations then - (* Terminate program, having reached our max iterations *) - Atomic.set stop true - else + incr i; + + for j = 1 to iterations do (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; @@ -49,8 +46,6 @@ let run_job clock _job_id iterations : unit = ~span_id:(OT.Span.id scope) ~severity:Severity_number_info (fun k -> k "inner at %d" j); - Atomic.incr i; - try Atomic.incr num_tr; let@ scope = @@ -65,18 +60,21 @@ let run_job clock _job_id iterations : unit = let () = Eio.Time.sleep clock !sleep_inner in Atomic.incr num_sleep; - if j = 4 && Atomic.get i mod 13 = 0 then failwith "oh no"; + if j = 4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) OT.Span.add_event scope (OT.Event.make "done with alloc") with Failure _ -> () + done done let run env proc iterations () : unit = OT.Gc_metrics.setup_on_main_exporter (); OT.Metrics_callbacks.( - with_set_added_to_main_exporter (fun set -> + with_set_added_to_main_exporter + ~min_interval:Mtime.Span.(10 * ms) + (fun set -> add_metrics_cb set (fun () -> let now = OT.Clock.now_main () in OT.Metrics. @@ -126,6 +124,7 @@ let () = Arg.Set_int n_iterations, " the number of iterations to run" ); "-j", Arg.Set_int n_jobs, " number of jobs per processes"; + "-n", Arg.Set_int n, " number of iterations (default ∞)"; "--procs", Arg.Set_int n_procs, " number of processes"; ] |> Arg.align @@ -156,16 +155,14 @@ let () = (Atomic.get num_tr) elapsed n_per_sec) in Eio_main.run @@ fun env -> - (if !n_procs < 2 then - Opentelemetry_client_cohttp_eio.with_setup ~config - (run env 0 !n_iterations) env - else - Eio.Switch.run @@ fun sw -> - Opentelemetry_client_cohttp_eio.setup ~config ~sw env; - let dm = Eio.Stdenv.domain_mgr env in - Eio.Switch.run (fun sw -> - for proc = 1 to !n_procs do - Eio.Fiber.fork ~sw @@ fun () -> - Eio.Domain_manager.run dm (run env proc !n_iterations) - done)); - OT.Main_exporter.remove () ~on_done:ignore + if !n_procs < 2 then + Opentelemetry_client_cohttp_eio.with_setup ~config env + (run env 0 !n_iterations) + else + Opentelemetry_client_cohttp_eio.with_setup ~config env @@ fun () -> + let dm = Eio.Stdenv.domain_mgr env in + Eio.Switch.run (fun sw -> + for proc = 1 to !n_procs do + Eio.Fiber.fork ~sw @@ fun () -> + Eio.Domain_manager.run dm (run env proc !n_iterations) + done) diff --git a/tests/bin/emit1_ocurl_lwt.ml b/tests/bin/emit1_ocurl_lwt.ml index 1284363c..bea8f9a6 100644 --- a/tests/bin/emit1_ocurl_lwt.ml +++ b/tests/bin/emit1_ocurl_lwt.ml @@ -21,14 +21,13 @@ let num_tr = Atomic.make 0 let run_job () : unit Lwt.t = let active = OT.Main_exporter.active () in - let tracer = OT.Tracer.get_main () in let i = ref 0 in let cnt = ref 0 in while%lwt OT.Aswitch.is_on active && !cnt < !n do let@ _scope = Atomic.incr num_tr; - OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_producer "loop.outer" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" ~attrs:[ "i", `Int !i ] in @@ -39,7 +38,7 @@ let run_job () : unit Lwt.t = (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; - OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope ~attrs:[ "j", `Int j ] "loop.inner" in @@ -49,13 +48,9 @@ let run_job () : unit Lwt.t = Atomic.incr num_sleep ); - let logger = OT.Logger.get_main () in - OT.Emitter.emit logger - [ - OT.Log_record.make_strf ~trace_id:(OT.Span.trace_id scope) - ~span_id:(OT.Span.id scope) ~severity:Severity_number_info - "inner at %d" j; - ]; + OT.Logger.logf ~trace_id:(OT.Span.trace_id scope) + ~span_id:(OT.Span.id scope) ~severity:Severity_number_info (fun k -> + k "inner at %d" j); incr i; @@ -63,8 +58,8 @@ let run_job () : unit Lwt.t = (* allocate some stuff *) if !stress_alloc_ then ( let@ _ = - OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_internal - ~parent:scope "alloc" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope + "alloc" in Atomic.incr num_tr; @@ -95,10 +90,11 @@ let run () : unit Lwt.t = OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics; OT.Metrics_callbacks.add_metrics_cb set (fun () -> + let now = OT.Clock.now_main () in OT.Metrics. [ sum ~name:"num-sleep" ~is_monotonic:true - [ int (Atomic.get num_sleep) ]; + [ int ~now (Atomic.get num_sleep) ]; ])); let n_jobs = max 1 !n_jobs in @@ -180,8 +176,7 @@ let () = ) in - Lwt_main.run - @@ let@ () = Fun.protect ~finally in - Opentelemetry_client_ocurl_lwt.with_setup ~config () run - ~after_shutdown:after_exp_shutdown + Lwt_main.run + (Opentelemetry_client_ocurl_lwt.with_setup ~config () run + ~after_shutdown:after_exp_shutdown) diff --git a/tests/bin/emit1_stdout.ml b/tests/bin/emit1_stdout.ml index 825fe9ff..f0ede528 100644 --- a/tests/bin/emit1_stdout.ml +++ b/tests/bin/emit1_stdout.ml @@ -11,6 +11,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let iterations = ref 4 + let n = ref max_int let num_sleep = Atomic.make 0 @@ -21,25 +23,24 @@ let num_tr = Atomic.make 0 let run_job () = let active = OT.Main_exporter.active () in - let tracer = OT.Tracer.get_main () in let i = ref 0 in let cnt = ref 0 in while OT.Aswitch.is_on active && !cnt < !n do let@ _scope = Atomic.incr num_tr; - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_producer "loop.outer" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" ~attrs:[ "i", `Int !i ] in (* Printf.printf "cnt=%d\n%!" !cnt; *) incr cnt; - for j = 0 to 4 do + for j = 1 to !iterations do (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope ~attrs:[ "j", `Int j ] "loop.inner" in @@ -60,8 +61,8 @@ let run_job () = (* allocate some stuff *) (if !stress_alloc_ then let@ _ = - OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal - ~parent:scope "alloc" + OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope + "alloc" in let _arr : _ array = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 @@ -88,10 +89,11 @@ let run () = OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> OT.Metrics_callbacks.add_metrics_cb set (fun () -> + let now = OT.Clock.now_main () in OT.Metrics. [ sum ~name:"num-sleep" ~is_monotonic:true - [ int (Atomic.get num_sleep) ]; + [ int ~now (Atomic.get num_sleep) ]; ])); let n_jobs = max 1 !n_jobs in @@ -105,7 +107,9 @@ let run () = Array.iter Thread.join jobs module Consumer_exporter = - OTC.Generic_consumer_exporter.Make (OTC.Io_sync) (OTC.Notifier_sync) + OTC.Generic_consumer_exporter.Make + (Opentelemetry_client_sync.Io_sync) + (Opentelemetry_client_sync.Notifier_sync) let () = OT.Globals.service_name := "t1"; @@ -137,7 +141,10 @@ let () = "-j", Arg.Set_int n_jobs, " number of parallel jobs"; "--bg-threads", Arg.Set_int n_bg_threads, " number of background threads"; "--no-self-trace", Arg.Clear self_trace, " disable self tracing"; - "-n", Arg.Set_int n, " number of iterations (default ∞)"; + "-n", Arg.Set_int n, " number of outer iterations (default ∞)"; + ( "--iterations", + Arg.Set_int iterations, + " the number of inner iterations to run" ); "--queued", Arg.Set queued, " queue exporter"; ] |> Arg.align @@ -145,21 +152,29 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; - Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ queued: %ba@]@." + Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ queued: %b@]@." !sleep_outer !sleep_inner !queued; - let exporter = + let exporter, finally = let exp = OTC.Exporter_stdout.stdout () in if !queued then ( - let q = OTC.Bounded_queue_sync.create ~high_watermark:20_000 () in - OTC.Exporter_queued.create ~clock:exp.clock ~q - ~consumer:(Consumer_exporter.consumer exp) - () + let q = + Opentelemetry_client_sync.Bounded_queue_sync.create + ~high_watermark:20_000 () + in + let exp = + OTC.Exporter_queued.create ~clock:exp.clock ~q + ~consumer:(Consumer_exporter.consumer exp) + () + in + let finally () = Opentelemetry_client_sync.Shutdown_sync.shutdown exp in + exp, finally ) else - exp + exp, ignore in OT.Main_exporter.set exporter; + let@ () = Fun.protect ~finally in if !self_trace then Opentelemetry_client.Self_trace.set_enabled true; diff --git a/tests/bin/emit_logs_cohttp.ml b/tests/bin/emit_logs_cohttp.ml index e4b81ed8..80a744ef 100644 --- a/tests/bin/emit_logs_cohttp.ml +++ b/tests/bin/emit_logs_cohttp.ml @@ -36,9 +36,7 @@ let run () = Logs.err (fun m -> m "emit_logs: error log"); Logs.app (fun m -> m "emit_logs: app log"); let%lwt () = - let tracer = T.Tracer.get_main () in - T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_producer "my_scope" - (fun _scope -> + T.Tracer.with_ ~kind:T.Span.Span_kind_producer "my_scope" (fun _scope -> Logs.info (fun m -> m ~tags:varied_tag_set "emit_logs: this log is emitted with varied tags from a span");