diff --git a/README.md b/README.md index b096b8e6..c8176bd7 100644 --- a/README.md +++ b/README.md @@ -7,10 +7,10 @@ connectors to talk to opentelemetry software such as [jaeger](https://www.jaeger - library `opentelemetry` should be used to instrument your code and possibly libraries. It doesn't communicate with anything except - a backend (default: dummy backend); -- library `opentelemetry-client-ocurl` is a backend that communicates + an exporter (default: no-op); +- library `opentelemetry-client-ocurl` is an exporter that communicates via http+protobuf with some collector (otelcol, datadog-agent, etc.) using cURL bindings; -- library `opentelemetry-client-cohttp-lwt` is a backend that communicates +- library `opentelemetry-client-cohttp-lwt` is an exporter that communicates via http+protobuf with some collector using cohttp. ## License @@ -39,14 +39,14 @@ module Otel = Opentelemetry let (let@) = (@@) let foo () = - let@ scope = Otel.Trace.with_ "foo" + let@ span = Otel.Tracer.with_ "foo" ~attrs:["hello", `String "world"] in - do_work(); - Otel.Metrics.( - emit [ - gauge ~name:"foo.x" [int 42]; - ]); - do_more_work(); + do_work (); + let now = Otel.Clock.now Otel.Meter.default.clock in + Otel.Meter.emit1 Otel.Meter.default + Otel.Metrics.(gauge ~name:"foo.x" [int ~now 42]); + Otel.Span.add_event span (Otel.Event.make "work done"); + do_more_work (); () ``` @@ -56,14 +56,14 @@ If you're writing a top-level application, you need to perform some initial conf 1. Set the [`service_name`][]; 2. optionally configure [ambient-context][] with the appropriate storage for your environment — TLS, Lwt, Eio…; -3. and install a [`Collector`][] (usually by calling your collector's `with_setup` function.) +3. and install an exporter (usually by calling your client library's `with_setup` function.) For example, if your application is using Lwt, and you're using `ocurl` as your collector, you might do something like this: ```ocaml let main () = Otel.Globals.service_name := "my_service"; - Otel.GC_metrics.basic_setup(); + Otel.Gc_metrics.setup (); Opentelemetry_ambient_context.set_storage_provider (Opentelemetry_ambient_context_lwt.storage ()); Opentelemetry_client_ocurl.with_setup () @@ fun () -> @@ -72,10 +72,13 @@ let main () = (* … *) ``` - [`service_name`]: - [`Collector`]: + [`service_name`]: [ambient-context]: now vendored as `opentelemetry.ambient-context`, formerly +## Migration v012 → v0.13 + +see `doc/migration_guide_v0.13.md` + ## Configuration ### Environment Variables @@ -104,20 +107,20 @@ The library supports standard OpenTelemetry environment variables: - `OTEL_EXPORTER_OTLP_LOGS_HEADERS` - logs-specific headers -## Collector opentelemetry-client-ocurl +## opentelemetry-client-ocurl -This is a synchronous collector that uses the http+protobuf format -to send signals (metrics, traces, logs) to some other collector (eg. `otelcol` +This is a synchronous exporter that uses the http+protobuf format +to send signals (metrics, traces, logs) to some collector (eg. `otelcol` or the datadog agent). -Do note that this backend uses a thread pool and is incompatible +Do note that it uses a thread pool and is incompatible with uses of `fork` on some Unixy systems. See [#68](https://github.com/imandra-ai/ocaml-opentelemetry/issues/68) for a possible workaround. -## Collector opentelemetry-client-cohttp-lwt +## opentelemetry-client-cohttp-lwt -This is a Lwt-friendly collector that uses cohttp to send -signals to some other collector (e.g. `otelcol`). It must be run +This is a Lwt-friendly exporter that uses cohttp to send +signals to some collector (e.g. `otelcol`). It must be run inside a `Lwt_main.run` scope. ## Opentelemetry-trace diff --git a/doc/migration_guide_v0.13.md b/doc/migration_guide_v0.13.md new file mode 100644 index 00000000..2dff660f --- /dev/null +++ b/doc/migration_guide_v0.13.md @@ -0,0 +1,271 @@ +# Migration guide: v0.12 → v0.13 + +This guide covers breaking changes when upgrading from v0.12. + +## 1. Backend setup: `Collector` → `Sdk` + `Exporter` + +v0.12 used a first-class module `BACKEND` installed into a global slot via +`Collector.set_backend`. v0.13 replaces this with a plain record `Exporter.t` +installed via `Sdk.set`. + +The `with_setup` helper in each client library still exists, so if you use that +you mainly need to rename the module. + +```ocaml +(* v0.12 *) +Opentelemetry_client_ocurl.with_setup ~config () (fun () -> + (* your code *) + ()) + +(* v0.13: same call, internals changed; ~stop removed, ~after_shutdown added *) +Opentelemetry_client_ocurl.with_setup + ~after_shutdown:(fun _exp -> ()) + ~config () (fun () -> + (* your code *) + ()) +``` + +If you called `setup`/`remove_backend` manually: + +```ocaml +(* v0.12 *) +Opentelemetry_client_ocurl.setup ~config () +(* ... *) +Opentelemetry_client_ocurl.remove_backend () + +(* v0.13 *) +Opentelemetry_client_ocurl.setup ~config () +(* ... *) +Opentelemetry_client_ocurl.remove_exporter () +``` + +The `~stop:bool Atomic.t` parameter has been removed from the ocurl client. +Use `Sdk.active ()` (an `Aswitch.t`) to detect shutdown instead. + +## 2. `Trace.with_` → `Tracer.with_`, callback gets a `Span.t` + +The most common migration. The module is renamed and the callback argument type +changes from `Scope.t` to `Span.t`. + +```ocaml +(* v0.12 *) +Trace.with_ "my-op" ~attrs:["k", `String "v"] (fun (scope : Scope.t) -> + Scope.add_event scope (fun () -> Event.make "something happened"); + Scope.add_attrs scope (fun () -> ["extra", `Int 42]); + do_work () +) + +(* v0.13 *) +Tracer.with_ "my-op" ~attrs:["k", `String "v"] (fun (span : Span.t) -> + Span.add_event span (Event.make "something happened"); + Span.add_attrs span ["extra", `Int 42]; + do_work () +) +``` + +`Trace` is kept as a deprecated alias for `Tracer`. + +Key differences on the callback argument: + +| v0.12 (`Scope.t`) | v0.13 (`Span.t`) | +|--------------------------------------------|--------------------------------------| +| `scope.trace_id` | `Span.trace_id span` | +| `scope.span_id` | `Span.id span` | +| `Scope.add_event scope (fun () -> ev)` | `Span.add_event span ev` | +| `Scope.add_attrs scope (fun () -> attrs)` | `Span.add_attrs span attrs` | +| `Scope.set_status scope st` | `Span.set_status span st` | +| `Scope.record_exception scope e bt` | `Span.record_exception span e bt` | +| `Scope.to_span_ctx scope` | `Span.to_span_ctx span` | +| `Scope.to_span_link scope` | `Span.to_span_link span` | +| `~scope:scope` (pass parent explicitly) | `~parent:span` | + +The `~scope` parameter of `Trace.with_` is renamed to `~parent`: + +```ocaml +(* v0.12 *) +Trace.with_ "child" ~scope:parent_scope (fun child -> ...) + +(* v0.13 *) +Tracer.with_ "child" ~parent:parent_span (fun child -> ...) +``` + +In addition, `Scope.t` is entirely removed because `Span.t` is now mutable. +For additional efficiency, `Span.t` is directly encodable to protobuf +without the need to allocate further intermediate structures. + +## 3. `Logs` → `Logger`, new emit helpers + +The `Logs` module is renamed to `Logger` (`Logs` is kept as a deprecated alias). +Direct construction of log records and batch-emit is replaced by convenience +functions. + +```ocaml +(* v0.12 *) +Logs.emit [ + Logs.make_str ~severity:Severity_number_warn "something went wrong" +] + +Logs.emit [ + Logs.make_strf ~severity:Severity_number_info "processed %d items" n +] + +(* v0.13: simple string *) +Logger.log ~severity:Severity_number_warn "something went wrong" + +(* v0.13: formatted *) +Logger.logf ~severity:Severity_number_info (fun k -> k "processed %d items" n) +``` + +If you need to keep the trace/span correlation: + +```ocaml +(* v0.12 *) +Logs.emit [ + Logs.make_str ~trace_id ~span_id ~severity:Severity_number_info "ok" +] + +(* v0.13 *) +Logger.log ~trace_id ~span_id ~severity:Severity_number_info "ok" +``` + +`Log_record.make_str` / `Log_record.make` still exist if you need to build +records manually and emit them via a `Logger.t`. + +## 4. `Metrics.emit` → emit via a `Meter` + +In v0.12 `Metrics.emit` was a top-level function that sent directly to the +collector. In v0.13 metrics go through a `Meter.t`. For most code the change +is mechanical: + +```ocaml +(* v0.12 *) +Metrics.emit [ + Metrics.gauge ~name:"queue.depth" [ Metrics.int ~now depth ] +] + +(* v0.13: Meter.default emits to the global provider *) +Meter.emit1 Meter.default + (Metrics.gauge ~name:"queue.depth" [ Metrics.int ~now depth ]) +``` + +`now` is now obtained from the meter's clock rather than `Timestamp_ns.now_unix_ns ()`: + +```ocaml +(* v0.12 *) +let now = Timestamp_ns.now_unix_ns () in +Metrics.emit [ Metrics.sum ~name:"counter" [ Metrics.int ~now n ] ] + +(* v0.13 *) +let now = Clock.now Meter.default.clock in +Meter.emit1 Meter.default + (Metrics.sum ~name:"counter" [ Metrics.int ~now n ]) +``` + +## 5. `Metrics_callbacks.register` → `Meter.add_cb` + +```ocaml +(* v0.12 *) +Metrics_callbacks.register (fun () -> + [ Metrics.gauge ~name:"foo" [ Metrics.int ~now:... 42 ] ]) + +(* v0.13: callback now receives a clock *) +Meter.add_cb (fun ~clock () -> + let now = Clock.now clock in + [ Metrics.gauge ~name:"foo" [ Metrics.int ~now 42 ] ]) +``` + +After registering callbacks you must tell the SDK to drive them: + +```ocaml +(* v0.13: call once after setup to schedule periodic emission *) +Meter.add_to_main_exporter Meter.default +``` + +In v0.12 this was automatic once `Metrics_callbacks.register` was called. + +## 6. `GC_metrics.basic_setup` signature unchanged, `setup` changed + +`GC_metrics.basic_setup ()` still works. The module has been renamed +to `Gc_metrics`, but the former name persists as a deprecated alias. + +If you called the lower-level `GC_metrics.setup exp` directly: + +```ocaml +(* v0.12 *) +GC_metrics.setup exporter +(* or *) +GC_metrics.setup_on_main_exporter () + +(* v0.13 *) +Gc_metrics.setup () (* uses Meter.default *) +(* or with a specific meter: *) +Gc_metrics.setup ~meter:my_meter () +``` + +`GC_metrics.setup_on_main_exporter` has been removed. + +## 7. `Collector.on_tick` → `Sdk.add_on_tick_callback` + +```ocaml +(* v0.12 *) +Collector.on_tick (fun () -> do_background_work ()) + +(* v0.13 *) +Sdk.add_on_tick_callback (fun () -> do_background_work ()) +``` + +## 8. `?service_name` parameter removed + +`Trace.with_`, `Logs.emit`, and `Metrics.emit` accepted a `?service_name` +override. This is no longer supported per-call; set it once globally: + +```ocaml +(* v0.12 *) +Trace.with_ "op" ~service_name:"my-svc" (fun _ -> ...) + +(* v0.13: set globally before setup *) +Opentelemetry.Globals.service_name := "my-svc" +Tracer.with_ "op" (fun _ -> ...) +``` + +## 9. `create_backend` / `BACKEND` module type removed + +If you held a reference to a backend module: + +```ocaml +(* v0.12 *) +let (module B : Collector.BACKEND) = + Opentelemetry_client_ocurl.create_backend ~config () +in +Collector.set_backend (module B) + +(* v0.13 *) +let exp : Exporter.t = + Opentelemetry_client_ocurl.create_exporter ~config () +in +Sdk.set exp +``` + +## 10. New features (no migration needed) + +- **`Sdk.get_tracer/get_meter/get_logger`**: obtain a provider pre-stamped with + instrumentation-scope metadata (`~name`, `~version`, `~__MODULE__`). +- **`Trace_provider` / `Meter_provider` / `Log_provider`**: independent + per-signal providers; useful for testing or multi-backend setups. +- **`Dynamic_enricher`**: register callbacks that inject attributes into every + span and log record at creation time (wide events). +- **Batch**: much better handling of batching overall. + +## Quick checklist + +- [ ] `Trace.with_` → `Tracer.with_`; callback argument `Scope.t` → `Span.t` +- [ ] `Scope.add_event`/`add_attrs` → `Span.add_event`/`add_attrs` (no thunk wrapper) +- [ ] `~scope:` → `~parent:` in nested `with_` calls +- [ ] `Logs.emit [Logs.make_str ...]` → `Logger.log`/`Logger.logf` +- [ ] `Metrics.emit [...]` → `Meter.emit1 Meter.default ...` +- [ ] `Metrics_callbacks.register` → `Meter.add_cb` (+ call `Meter.add_to_main_exporter`) +- [ ] `GC_metrics.setup exp` → `Gc_metrics.setup ()` +- [ ] `Collector.on_tick` → `Sdk.add_on_tick_callback` +- [ ] Remove `?service_name` call-site overrides; set `Globals.service_name` once +- [ ] `create_backend` → `create_exporter`; `set_backend` → `Sdk.set` +- [ ] `~stop:bool Atomic.t` removed from ocurl client diff --git a/dune-project b/dune-project index 1ad5563f..acca113d 100644 --- a/dune-project +++ b/dune-project @@ -53,7 +53,7 @@ (depopts atomic trace thread-local-storage lwt eio picos) (conflicts (trace - (< 0.11))) + (< 0.12))) (tags (instrumentation tracing opentelemetry datadog jaeger))) diff --git a/opentelemetry.opam b/opentelemetry.opam index cdd8401f..338ca48e 100644 --- a/opentelemetry.opam +++ b/opentelemetry.opam @@ -28,7 +28,7 @@ depends: [ ] depopts: ["atomic" "trace" "thread-local-storage" "lwt" "eio" "picos"] conflicts: [ - "trace" {< "0.11"} + "trace" {< "0.12"} ] build: [ ["dune" "subst"] {dev} diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 9844f582..af06091d 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -163,7 +163,7 @@ let create_consumer ?(config = Config.make ()) ~sw ~env () : let env = env end) in let module C = Generic_http_consumer.Make (M.IO) (M.Notifier) (M.Httpc) in - C.consumer ~ticker_task:(Some 0.5) ~config () + C.consumer ~ticker_task:(Some 0.5) ~on_tick:Sdk.tick ~config () let create_exporter ?(config = Config.make ()) ~sw ~env () = let consumer = create_consumer ~config ~sw ~env () in @@ -172,21 +172,23 @@ let create_exporter ?(config = Config.make ()) ~sw ~env () = ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () - |> Exporter_batch.add_batching ~config let create_backend = create_exporter let setup_ ~sw ~config env : unit = Opentelemetry_ambient_context.set_current_storage Ambient_context_eio.storage; let exp = create_exporter ~config ~sw ~env () in - Main_exporter.set exp + Sdk.set ?batch_traces:config.batch_traces ?batch_metrics:config.batch_metrics + ?batch_logs:config.batch_logs + ~batch_timeout:Mtime.Span.(config.batch_timeout_ms * ms) + exp let setup ?(config = Config.make ()) ?(enable = true) ~sw env = if enable && not config.sdk_disabled then setup_ ~sw ~config env let remove_exporter () = let p, waker = Eio.Promise.create () in - Main_exporter.remove () ~on_done:(fun () -> Eio.Promise.resolve waker ()); + Sdk.remove () ~on_done:(fun () -> Eio.Promise.resolve waker ()); Eio.Promise.await p let remove_backend = remove_exporter diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 8861f415..db7703af 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -95,7 +95,8 @@ module Consumer_impl = (Httpc) let create_consumer ?(config = Config.make ()) () = - Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config () + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~on_tick:OTEL.Sdk.tick ~config + () let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in @@ -104,14 +105,16 @@ let create_exporter ?(config = Config.make ()) () = ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () - |> Exporter_batch.add_batching ~config let create_backend = create_exporter let setup_ ~config () : unit = Opentelemetry_client_lwt.Util_ambient_context.setup_ambient_context (); let exp = create_exporter ~config () in - Main_exporter.set exp; + Sdk.set ?batch_traces:config.batch_traces ?batch_metrics:config.batch_metrics + ?batch_logs:config.batch_logs + ~batch_timeout:Mtime.Span.(config.batch_timeout_ms * ms) + exp; () let setup ?(config = Config.make ()) ?(enable = true) () = @@ -120,7 +123,7 @@ let setup ?(config = Config.make ()) ?(enable = true) () = let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in (* Printf.eprintf "otel.client.cohttp-lwt: removing…\n%!"; *) - Main_exporter.remove + Sdk.remove ~on_done:(fun () -> (* Printf.eprintf "otel.client.cohttp-lwt: done removing\n%!"; *) Lwt.wakeup_later done_u ()) diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 2ee0277b..b7a59e8a 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -70,7 +70,8 @@ module Consumer_impl = (Httpc) let create_consumer ?(config = Config.make ()) () = - Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config () + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~on_tick:OTEL.Sdk.tick ~config + () let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in @@ -79,14 +80,16 @@ let create_exporter ?(config = Config.make ()) () = ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () - |> Exporter_batch.add_batching ~config let create_backend = create_exporter let setup_ ~config () : Exporter.t = Opentelemetry_client_lwt.Util_ambient_context.setup_ambient_context (); let exp = create_exporter ~config () in - Main_exporter.set exp; + Sdk.set ?batch_traces:config.batch_traces ?batch_metrics:config.batch_metrics + ?batch_logs:config.batch_logs + ~batch_timeout:Mtime.Span.(config.batch_timeout_ms * ms) + exp; exp let setup ?(config = Config.make ()) ?(enable = true) () = @@ -95,7 +98,7 @@ let setup ?(config = Config.make ()) ?(enable = true) () = let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in - Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); + Sdk.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); done_fut let remove_backend = remove_exporter diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 473c467d..fd262e0b 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -73,8 +73,8 @@ let consumer ?(config = Config.make ()) () : else None in - Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task - ~config:config.common () + Consumer_impl.consumer ~override_n_workers:n_workers ~on_tick:OTEL.Sdk.tick + ~ticker_task ~config:config.common () let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = let consumer = consumer ~config () in @@ -84,43 +84,25 @@ let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = in OTELC.Exporter_queued.create ~clock:OTEL.Clock.ptime_clock ~q:bq ~consumer () - |> OTELC.Exporter_batch.add_batching ~config:config.common let create_backend = create_exporter -let shutdown_and_wait ?(after_shutdown = ignore) (self : OTEL.Exporter.t) : unit - = - let open Opentelemetry_client_sync in - let sq = Sync_queue.create () in - OTEL.Aswitch.on_turn_off (OTEL.Exporter.active self) (fun () -> - Sync_queue.push sq ()); - OTEL.Exporter.shutdown self; - Sync_queue.pop sq; - after_shutdown self; - () - let setup_ ~config () : OTEL.Exporter.t = let exporter = create_exporter ~config () in - OTEL.Main_exporter.set exporter; + OTEL.Sdk.set ?batch_traces:config.common.batch_traces + ?batch_metrics:config.common.batch_metrics + ?batch_logs:config.common.batch_logs + ~batch_timeout:Mtime.Span.(config.common.batch_timeout_ms * ms) + exporter; OTELC.Self_trace.set_enabled config.common.self_trace; - - if config.ticker_thread then ( - (* at most a minute *) - let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in - let active = OTEL.Exporter.active exporter in - ignore - (Opentelemetry_client_sync.Util_thread.setup_ticker_thread ~active - ~sleep_ms exporter () - : Thread.t) - ); exporter let remove_exporter () : unit = let open Opentelemetry_client_sync in (* used to wait *) let sq = Sync_queue.create () in - OTEL.Main_exporter.remove () ~on_done:(fun () -> Sync_queue.push sq ()); + OTEL.Sdk.remove () ~on_done:(fun () -> Sync_queue.push sq ()); Sync_queue.pop sq let remove_backend = remove_exporter @@ -129,10 +111,12 @@ let setup ?(config : Config.t = Config.make ()) ?(enable = true) () = if enable && not config.common.sdk_disabled then ignore (setup_ ~config () : OTEL.Exporter.t) -let with_setup ?after_shutdown ?(config : Config.t = Config.make ()) +let with_setup ?(after_shutdown = ignore) ?(config : Config.t = Config.make ()) ?(enable = true) () f = if enable && not config.common.sdk_disabled then ( let exp = setup_ ~config () in - Fun.protect f ~finally:(fun () -> shutdown_and_wait ?after_shutdown exp) + Fun.protect f ~finally:(fun () -> + remove_exporter (); + after_shutdown exp) ) else f () diff --git a/src/client/exporter_batch.ml b/src/client/exporter_batch.ml deleted file mode 100644 index c8df20ca..00000000 --- a/src/client/exporter_batch.ml +++ /dev/null @@ -1,45 +0,0 @@ -open Common_ - -let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) : - OTEL.Exporter.t = - let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - - let emit_spans = - Emitter_batch.add_batching_opt ~timeout ~batch_size:config.batch_traces - exp.emit_spans - in - let emit_metrics = - Emitter_batch.add_batching_opt ~timeout ~batch_size:config.batch_metrics - exp.emit_metrics - in - let emit_logs = - Emitter_batch.add_batching_opt ~timeout ~batch_size:config.batch_logs - exp.emit_logs - in - - let active = exp.active in - let tick = exp.tick in - let on_tick = exp.on_tick in - let clock = exp.clock in - - let self_metrics () = exp.self_metrics () in - let shutdown () = - let open Opentelemetry_emitter in - Emitter.flush_and_close emit_spans; - Emitter.flush_and_close emit_metrics; - Emitter.flush_and_close emit_logs; - - exp.shutdown () - in - - { - OTEL.Exporter.active; - clock; - emit_spans; - emit_metrics; - emit_logs; - on_tick; - tick; - shutdown; - self_metrics; - } diff --git a/src/client/exporter_batch.mli b/src/client/exporter_batch.mli deleted file mode 100644 index 6dacbc74..00000000 --- a/src/client/exporter_batch.mli +++ /dev/null @@ -1,10 +0,0 @@ -(** Add batching to the emitters of an exporter. - - The exporter has multiple emitters (one per signal type), this can add - batching on top of each of them (so that they emit less frequent, larger - batches of signals, amortizing the per-signal cost). *) - -open Common_ - -val add_batching : config:Http_config.t -> OTEL.Exporter.t -> OTEL.Exporter.t -(** Given an exporter, add batches for each emitter according to [config]. *) diff --git a/src/client/exporter_combine.ml b/src/client/exporter_combine.ml index 28577d90..0b17ec0b 100644 --- a/src/client/exporter_combine.ml +++ b/src/client/exporter_combine.ml @@ -1,42 +1,29 @@ (** Combine multiple exporters into one *) open Common_ -open Opentelemetry_atomic - -open struct - let shutdown_l (es : OTEL.Exporter.t list) ~trigger : unit = - let missing = Atomic.make (List.length es) in - let on_done () = - if Atomic.fetch_and_add missing (-1) = 1 then - (* we were the last exporter to shutdown, [missing] is now 0 *) - Aswitch.turn_off trigger - in - - List.iter (fun e -> Aswitch.on_turn_off (OTEL.Exporter.active e) on_done) es; - List.iter OTEL.Exporter.shutdown es -end let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = - let open OTEL.Exporter in - if es = [] then - OTEL.Exporter.dummy () - else ( + match es with + | [] -> OTEL.Exporter.dummy () + | _ -> + (* active turns off once all constituent exporters are off *) let active, trigger = Aswitch.create () in + let remaining = Atomic.make (List.length es) in + List.iter + (fun e -> + Aswitch.on_turn_off (OTEL.Exporter.active e) (fun () -> + if Atomic.fetch_and_add remaining (-1) = 1 then + Aswitch.turn_off trigger)) + es; { + OTEL.Exporter.export = + (fun sig_ -> List.iter (fun e -> e.OTEL.Exporter.export sig_) es); active = (fun () -> active); - clock = (List.hd es).clock; - emit_spans = - Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es); - emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es); - emit_metrics = - Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es); - on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); - tick = (fun () -> List.iter (fun e -> e.tick ()) es); - shutdown = (fun () -> shutdown_l es ~trigger); + shutdown = (fun () -> List.iter OTEL.Exporter.shutdown es); self_metrics = - (fun () -> List.fold_left (fun acc e -> e.self_metrics () @ acc) [] es); + (fun () -> + List.concat_map (fun e -> e.OTEL.Exporter.self_metrics ()) es); } - ) (** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and [exp2]. *) diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index 875faba3..047ac930 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -4,34 +4,29 @@ export signals and eyeball them. *) open Common_ -open Opentelemetry_emitter (** [debug ?out ()] is an exporter that pretty-prints signals on [out]. @param out the formatter into which to print, default [stderr]. *) let debug ?(clock = OTEL.Clock.ptime_clock) ?(out = Format.err_formatter) () : OTEL.Exporter.t = + ignore clock; let open Proto in - let active, trigger = Aswitch.create () in - let ticker = Cb_set.create () in { - active = (fun () -> active); - clock; - emit_spans = - Emitter.make ~signal_name:"spans" () ~emit:(fun sp -> - List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); - emit_logs = - Emitter.make ~signal_name:"logs" () ~emit:(fun log -> + OTEL.Exporter.export = + (fun sig_ -> + match sig_ with + | OTEL.Any_signal_l.Spans sp -> + List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp + | OTEL.Any_signal_l.Metrics ms -> + List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) ms + | OTEL.Any_signal_l.Logs logs -> List.iter (Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record) - log); - emit_metrics = - Emitter.make ~signal_name:"metrics" () ~emit:(fun m -> - List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m); - on_tick = Cb_set.register ticker; - tick = (fun () -> Cb_set.trigger ticker); - self_metrics = (fun () -> []); + logs); + active = (fun () -> Aswitch.dummy); shutdown = (fun () -> Format.fprintf out "CLEANUP@."; - Aswitch.turn_off trigger); + ()); + self_metrics = (fun () -> []); } diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 791d74c0..9956c9a0 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -10,32 +10,6 @@ open Common_ module BQ = Bounded_queue -module BQ_emitters = struct - (* NOTE: these emitters, when closed, don't close the bounded - queue because we need to flush_and_close the other emitters first. - The bounded queue is a shared resource. *) - - let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~signal_name:"logs" - ~close_queue_on_close:false - |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty - - let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~signal_name:"spans" - ~close_queue_on_close:false - |> Opentelemetry_emitter.Emitter.flat_map - OTEL.Any_signal_l.of_spans_or_empty - - let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~signal_name:"metrics" - ~close_queue_on_close:false - |> Opentelemetry_emitter.Emitter.flat_map - OTEL.Any_signal_l.of_metrics_or_empty -end - (** Pair a queue with a consumer to build an exporter. The resulting exporter will emit logs, spans, and traces directly into the @@ -44,19 +18,10 @@ end @param resource_attributes attributes added to every "resource" batch *) let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t) ~(consumer : Consumer.any_signal_l_builder) () : OTEL.Exporter.t = - let open Opentelemetry_emitter in let shutdown_started = Atomic.make false in let active, trigger = Aswitch.create () in let consumer = consumer.start_consuming q.recv in - let emit_spans = BQ_emitters.spans_emitter_of_bq q.send in - let emit_logs = BQ_emitters.logs_emitter_of_bq q.send in - let emit_metrics = BQ_emitters.metrics_emitter_of_bq q.send in - - let tick_set = Cb_set.create () in - let tick () = Cb_set.trigger tick_set in - let on_tick f = Cb_set.register tick_set f in - let self_metrics () : _ list = let now = OTEL.Clock.now clock in let m_size = @@ -73,13 +38,12 @@ let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t) m_size :: m_cap :: m_discarded :: Consumer.self_metrics consumer ~clock in + let export (sig_ : OTEL.Any_signal_l.t) = + if Aswitch.is_on active then BQ.Send.push q.send [ sig_ ] + in + let shutdown () = if Aswitch.is_on active && not (Atomic.exchange shutdown_started true) then ( - (* flush all emitters *) - Emitter.flush_and_close emit_spans; - Emitter.flush_and_close emit_logs; - Emitter.flush_and_close emit_metrics; - (* first, prevent further pushes to the queue. Consumer workers can still drain it. *) Bounded_queue.Send.close q.send; @@ -93,15 +57,4 @@ let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t) (* if consumer shuts down for some reason, we also must *) Aswitch.on_turn_off (Consumer.active consumer) shutdown; - let active () = active in - { - active; - clock; - emit_logs; - emit_metrics; - emit_spans; - self_metrics; - tick; - on_tick; - shutdown; - } + { OTEL.Exporter.export; active = (fun () -> active); self_metrics; shutdown } diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index 71b2e449..25a900a6 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -1,7 +1,6 @@ (** A simple exporter that prints on stdout. *) open Common_ -open Opentelemetry_emitter open struct let pp_span out (sp : OTEL.Span.t) = @@ -40,58 +39,25 @@ end let stdout ?(clock = OTEL.Clock.ptime_clock) () : OTEL.Exporter.t = let open Opentelemetry_util in + ignore clock; let out = Format.std_formatter in let mutex = Mutex.create () in - let ticker = Cb_set.create () in - let active, trigger = Aswitch.create () in - let tick () = Cb_set.trigger ticker in - - let mk_emitter ~signal_name pp_signal = - let emit l = - if Aswitch.is_off active then raise Emitter.Closed; - pp_vlist mutex pp_signal out l - in - let enabled () = Aswitch.is_on active in - let self_metrics ~now:_ () = [] in - let tick ~mtime:_ = () in - let flush_and_close () = - if Aswitch.is_on active then - let@ () = Util_mutex.protect mutex in - Format.pp_print_flush out () - in - let closed () = Aswitch.is_off active in - { - Emitter.emit; - signal_name; - self_metrics; - closed; - enabled; - tick; - flush_and_close; - } + let export (sig_ : OTEL.Any_signal_l.t) = + match sig_ with + | OTEL.Any_signal_l.Spans sp -> pp_vlist mutex pp_span out sp + | OTEL.Any_signal_l.Logs logs -> pp_vlist mutex pp_log out logs + | OTEL.Any_signal_l.Metrics ms -> pp_vlist mutex pp_metric out ms in - let emit_spans = mk_emitter ~signal_name:"spans" pp_span in - let emit_logs = mk_emitter ~signal_name:"logs" pp_log in - let emit_metrics = mk_emitter ~signal_name:"metrics" pp_metric in - - let self_metrics () = [] in let shutdown () = - Emitter.flush_and_close emit_spans; - Emitter.flush_and_close emit_logs; - Emitter.flush_and_close emit_metrics; - Aswitch.turn_off trigger + let@ () = Util_mutex.protect mutex in + Format.pp_print_flush out () in { - active = (fun () -> active); - clock; - emit_spans; - emit_logs; - emit_metrics; - on_tick = Cb_set.register ticker; - self_metrics; - tick; + OTEL.Exporter.export; + active = (fun () -> Aswitch.dummy); shutdown; + self_metrics = (fun () -> []); } diff --git a/src/client/generic_consumer.ml b/src/client/generic_consumer.ml index 1c4553a7..f83e1c42 100644 --- a/src/client/generic_consumer.ml +++ b/src/client/generic_consumer.ml @@ -35,6 +35,7 @@ module Make sender_config:Sender.config -> n_workers:int -> ticker_task:float option -> + ?on_tick:(unit -> unit) -> unit -> Consumer.any_signal_l_builder (** Make a consumer builder, ie. a builder function that will take a bounded @@ -46,6 +47,7 @@ end = struct type config = { n_workers: int; ticker_task: float option; + on_tick: unit -> unit; } type status = @@ -174,14 +176,18 @@ end = struct | Stopped | Shutting_down -> IO.return () | Active -> let* () = IO.sleep_s interval_s in - if Aswitch.is_on self.active then tick self; + if Aswitch.is_on self.active then ( + tick self; + self.config.on_tick () + ); loop () in IO.spawn loop - let create_state ~sender_config ~n_workers ~ticker_task ~q () : state = + let create_state ~sender_config ~n_workers ~ticker_task ~on_tick ~q () : state + = let active, active_trigger = Aswitch.create () in - let config = { n_workers; ticker_task } in + let config = { n_workers; ticker_task; on_tick } in let self = { active; @@ -233,12 +239,14 @@ end = struct let self_metrics ~clock () = self_metrics self ~clock in { active = (fun () -> self.active); tick; shutdown; self_metrics } - let consumer ~sender_config ~n_workers ~ticker_task () : + let consumer ~sender_config ~n_workers ~ticker_task ?(on_tick = ignore) () : Consumer.any_signal_l_builder = { start_consuming = (fun q -> - let st = create_state ~sender_config ~n_workers ~ticker_task ~q () in + let st = + create_state ~sender_config ~n_workers ~ticker_task ~on_tick ~q () + in to_consumer st); } end diff --git a/src/client/generic_consumer_exporter.ml b/src/client/generic_consumer_exporter.ml index b939dc20..10355dc6 100644 --- a/src/client/generic_consumer_exporter.ml +++ b/src/client/generic_consumer_exporter.ml @@ -68,15 +68,8 @@ end = struct | `Closed -> shutdown_worker self; IO.return () - | `Item (Logs logs) -> - OTEL.Exporter.send_logs self.exp logs; - loop () - | `Item (Metrics ms) -> - OTEL.Exporter.send_metrics self.exp ms; - - loop () - | `Item (Spans sp) -> - OTEL.Exporter.send_trace self.exp sp; + | `Item sig_ -> + self.exp.OTEL.Exporter.export sig_; loop () | `Empty -> (match Atomic.get self.status with @@ -110,12 +103,6 @@ end = struct } in - (* if [exporter] turns off, shut us down too. Note that [shutdown] - is idempotent so it won't lead to divergence when it shuts the - exporter down. *) - Aswitch.on_turn_off (OTEL.Exporter.active exporter) (fun () -> - shutdown self); - start_worker self; self diff --git a/src/client/generic_http_consumer.ml b/src/client/generic_http_consumer.ml index dd1cdd8d..05e9bf66 100644 --- a/src/client/generic_http_consumer.ml +++ b/src/client/generic_http_consumer.ml @@ -29,6 +29,7 @@ module Make val consumer : ?override_n_workers:int -> ticker_task:float option -> + ?on_tick:(unit -> unit) -> config:Http_config.t -> unit -> Consumer.any_signal_l_builder @@ -127,8 +128,8 @@ end = struct let default_n_workers = 50 - let consumer ?override_n_workers ~ticker_task ~(config : Http_config.t) () : - Consumer.any_signal_l_builder = + let consumer ?override_n_workers ~ticker_task ?(on_tick = ignore) + ~(config : Http_config.t) () : Consumer.any_signal_l_builder = let n_workers = max 2 (min 500 @@ -138,5 +139,5 @@ end = struct | None, None -> default_n_workers)) in - C.consumer ~sender_config:config ~n_workers ~ticker_task () + C.consumer ~sender_config:config ~n_workers ~ticker_task ~on_tick () end diff --git a/src/client/lwt/util_ticker.ml b/src/client/lwt/util_ticker.ml index 95aed6ad..426bcaf9 100644 --- a/src/client/lwt/util_ticker.ml +++ b/src/client/lwt/util_ticker.ml @@ -1,10 +1,9 @@ -open Common_ open Lwt.Syntax -(** Lwt task that calls [Exporter.tick] regularly, to help enforce timeouts. +(** Lwt task that calls [tick()] regularly, to help enforce timeouts. @param frequency_s how often in seconds does the tick tock? *) let start_ticker_thread ?(finally = ignore) ~(stop : bool Atomic.t) - ~(frequency_s : float) (exp : OTEL.Exporter.t) : unit = + ~(frequency_s : float) ~(tick : unit -> unit) () : unit = let frequency_s = max frequency_s 0.5 in let rec tick_loop () = if Atomic.get stop then ( @@ -12,8 +11,7 @@ let start_ticker_thread ?(finally = ignore) ~(stop : bool Atomic.t) Lwt.return () ) else let* () = Lwt_unix.sleep frequency_s in - let mtime = Mtime_clock.now () in - OTEL.Exporter.tick exp ~mtime; + tick (); tick_loop () in Lwt.async tick_loop diff --git a/src/client/sync/shutdown_sync.ml b/src/client/sync/shutdown_sync.ml index 97ab5c2f..29e2418c 100644 --- a/src/client/sync/shutdown_sync.ml +++ b/src/client/sync/shutdown_sync.ml @@ -2,13 +2,9 @@ open Common_ (** Shutdown this exporter and block the thread until it's done. - {b NOTE}: this might deadlock if the exporter runs entirely in the current - thread! *) -let shutdown (exp : OTEL.Exporter.t) : unit = - let q = Sync_queue.create () in - OTEL.Exporter.on_stop exp (Sync_queue.push q); - OTEL.Exporter.shutdown exp; - Sync_queue.pop q + With the new Exporter.t interface, shutdown is synchronous. This function is + kept for backwards compatibility. *) +let shutdown (exp : OTEL.Exporter.t) : unit = OTEL.Exporter.shutdown exp (** Shutdown main exporter and wait *) -let shutdown_main () : unit = Option.iter shutdown (OTEL.Main_exporter.get ()) +let shutdown_main () : unit = Option.iter shutdown (OTEL.Sdk.get ()) diff --git a/src/client/sync/util_thread.ml b/src/client/sync/util_thread.ml index 9fad94e5..d7e5b24f 100644 --- a/src/client/sync/util_thread.ml +++ b/src/client/sync/util_thread.ml @@ -1,5 +1,3 @@ -open Common_ - (** start a thread in the background, running [f()], blocking signals *) let start_bg_thread (f : unit -> unit) : Thread.t = let unix_run () = @@ -26,7 +24,7 @@ let start_bg_thread (f : unit -> unit) : Thread.t = Thread.create run () (** thread that calls [tick()] regularly, to help enforce timeouts *) -let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms (exp : OTEL.Exporter.t) +let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms ~(tick : unit -> unit) () = let sleep_s = float sleep_ms /. 1000. in let tick_loop () = @@ -34,10 +32,7 @@ let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms (exp : OTEL.Exporter.t) while Aswitch.is_on active do Thread.delay sleep_s; - if Aswitch.is_on active then ( - let mtime = Mtime_clock.now () in - OTEL.Exporter.tick exp ~mtime - ) + if Aswitch.is_on active then tick () done with | Sync_queue.Closed -> () diff --git a/src/core/clock.ml b/src/core/clock.ml index 505274f5..4ba17f53 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -34,11 +34,6 @@ module Main = struct (** Set the current clock *) let set t : unit = Util_atomic.update_cas main (fun _ -> (), t) - - (** Clock that always defers to the current main clock. Whenever - [now dynamic_main] is called, it in turn becomes [now (get ())], ie it - looks up the current clock and uses it. *) - let dynamic_main : t = { now = (fun () -> now (get ())) } end (** Timestamp using the main clock *) diff --git a/src/core/dune b/src/core/dune index 1c0ae1ab..c1d58a81 100644 --- a/src/core/dune +++ b/src/core/dune @@ -2,7 +2,14 @@ (name opentelemetry_core) (public_name opentelemetry.core) (synopsis "Core types and definitions for opentelemetry") - (flags :standard -warn-error -a+8 -open Opentelemetry_util) + (flags + :standard + -warn-error + -a+8 + -open + Opentelemetry_util + -open + Opentelemetry_atomic) (libraries opentelemetry.proto opentelemetry.util diff --git a/src/core/exporter.ml b/src/core/exporter.ml index 4a67847a..a26e0365 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -6,24 +6,16 @@ This is part of the SDK, not just the API, so most real implementations live in their own library. *) -open Common_ -open Opentelemetry_emitter - type t = { + export: Any_signal_l.t -> unit; + (** Export a batch of signals. Called by the provider when signals are + ready to be sent. *) active: unit -> Aswitch.t; - (** Is the exporer currently active? After shutdown this is turned off. *) - clock: Clock.t; - emit_spans: Proto.Trace.span Emitter.t; - emit_metrics: Proto.Metrics.metric Emitter.t; - emit_logs: Proto.Logs.log_record Emitter.t; - on_tick: (unit -> unit) -> unit; - tick: unit -> unit; - (** Call all the callbacks registered with [on_tick]. Should be triggered - regularly for background processing, timeout checks, etc. *) + (** Lifecycle switch: turns off when the exporter has fully shut down + (i.e. the consumer queue is drained). *) shutdown: unit -> unit; - (** [shutdown ()] is called when the exporter is shut down, and is - responsible for sending remaining batches, flushing sockets, etc. To - know when shutdown is complete, register callbacks on [active]. + (** [shutdown ()] initiates shutdown: flushes remaining batches, closes + the queue, etc. Watch [active] to know when it's complete. @since 0.12 *) self_metrics: unit -> Metrics.t list; (** metrics about the exporter itself *) } @@ -31,58 +23,17 @@ type t = { (** Dummy exporter, does nothing *) let dummy () : t = - let ticker = Cb_set.create () in - let active, trigger = Aswitch.create () in { - active = (fun () -> active); - clock = Clock.ptime_clock; - emit_spans = Emitter.dummy; - emit_metrics = Emitter.dummy; - emit_logs = Emitter.dummy; - on_tick = Cb_set.register ticker; - tick = (fun () -> Cb_set.trigger ticker); - shutdown = (fun () -> Aswitch.turn_off trigger); + export = ignore; + active = (fun () -> Aswitch.dummy); + shutdown = ignore; self_metrics = (fun () -> []); } -let[@inline] send_trace (self : t) (l : Proto.Trace.span list) = - Emitter.emit self.emit_spans l - -let[@inline] send_metrics (self : t) (l : Proto.Metrics.metric list) = - Emitter.emit self.emit_metrics l - -let[@inline] send_logs (self : t) (l : Proto.Logs.log_record list) = - Emitter.emit self.emit_logs l - -let[@inline] on_tick (self : t) f = self.on_tick f - -(** Do background work. Call this regularly if the collector doesn't already - have a ticker thread or internal timer. *) -let tick ~mtime (self : t) = - (* make sure emitters get the chance to check timeouts, flush, etc. *) - Emitter.tick ~mtime self.emit_spans; - Emitter.tick ~mtime self.emit_metrics; - Emitter.tick ~mtime self.emit_logs; - - (* call the callbacks *) - self.tick (); - () - -let[@inline] active self : Aswitch.t = self.active () - -(** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *) -let on_stop self f = Aswitch.on_turn_off (self.active ()) f +let[@inline] active (self : t) : Aswitch.t = self.active () let[@inline] shutdown (self : t) : unit = self.shutdown () let (cleanup [@deprecated "use shutdown instead"]) = shutdown -let self_metrics (self : t) : _ list = - let now = Clock.now self.clock in - List.flatten - [ - self.self_metrics (); - self.emit_spans.self_metrics ~now (); - self.emit_logs.self_metrics ~now (); - self.emit_metrics.self_metrics ~now (); - ] +let[@inline] self_metrics (self : t) : _ list = self.self_metrics () diff --git a/src/core/log_record.ml b/src/core/log_record.ml index 57e6bfed..438776c8 100644 --- a/src/core/log_record.ml +++ b/src/core/log_record.ml @@ -77,3 +77,7 @@ let make_strf ?time ?severity ?log_level ?flags ?trace_id ?span_id ?attrs make_str ?time ~observed_time_unix_nano ?severity ?log_level ?flags ?trace_id ?span_id ?attrs bod) fmt + +let add_attrs (self : t) (attrs : Key_value.t list) : unit = + let attrs = List.map Key_value.conv attrs in + Proto.Logs.log_record_set_attributes self (attrs @ self.attributes) diff --git a/src/core/metrics.ml b/src/core/metrics.ml index aab20afe..64f56dad 100644 --- a/src/core/metrics.ml +++ b/src/core/metrics.ml @@ -74,6 +74,37 @@ let histogram ~name ?description ?unit_ ?aggregation_temporality in make_metric ~name ?description ?unit_ ~data () +let add_attrs (m : t) (attrs : Key_value.t list) : unit = + let attrs = List.map Key_value.conv attrs in + match m.data with + | None -> () + | Some (Gauge g) -> + List.iter + (fun (dp : number_data_point) -> + number_data_point_set_attributes dp (attrs @ dp.attributes)) + g.data_points + | Some (Sum s) -> + List.iter + (fun (dp : number_data_point) -> + number_data_point_set_attributes dp (attrs @ dp.attributes)) + s.data_points + | Some (Histogram h) -> + List.iter + (fun (dp : histogram_data_point) -> + histogram_data_point_set_attributes dp (attrs @ dp.attributes)) + h.data_points + | Some (Exponential_histogram eh) -> + List.iter + (fun (dp : exponential_histogram_data_point) -> + exponential_histogram_data_point_set_attributes dp + (attrs @ dp.attributes)) + eh.data_points + | Some (Summary s) -> + List.iter + (fun (dp : summary_data_point) -> + summary_data_point_set_attributes dp (attrs @ dp.attributes)) + s.data_points + (* TODO: exponential history *) (* TODO: summary *) (* TODO: exemplar *) diff --git a/src/client/batch.ml b/src/lib/batch.ml similarity index 100% rename from src/client/batch.ml rename to src/lib/batch.ml diff --git a/src/client/batch.mli b/src/lib/batch.mli similarity index 100% rename from src/client/batch.mli rename to src/lib/batch.mli diff --git a/src/lib/dynamic_enricher.ml b/src/lib/dynamic_enricher.ml new file mode 100644 index 00000000..700c6826 --- /dev/null +++ b/src/lib/dynamic_enricher.ml @@ -0,0 +1,23 @@ +type t = unit -> Key_value.t list +(** A dynamic enricher is a callback that produces high-cardinality attributes + at span/log-record creation time. This enables "wide events". *) + +open struct + let enrichers_ : t Alist.t = Alist.make () +end + +let add (f : t) : unit = Alist.add enrichers_ f + +let collect () : Key_value.t list = + let acc = ref [] in + List.iter + (fun f -> + match f () with + | kvs -> acc := List.rev_append kvs !acc + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + Printf.eprintf "opentelemetry: dynamic_enricher raised %s\n%s%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt)) + (Alist.get enrichers_); + !acc diff --git a/src/client/emitter_batch.ml b/src/lib/emitter_batch.ml similarity index 100% rename from src/client/emitter_batch.ml rename to src/lib/emitter_batch.ml diff --git a/src/client/emitter_batch.mli b/src/lib/emitter_batch.mli similarity index 100% rename from src/client/emitter_batch.mli rename to src/lib/emitter_batch.mli diff --git a/src/lib/gc_metrics.ml b/src/lib/gc_metrics.ml index 2c909de5..5258880f 100644 --- a/src/lib/gc_metrics.ml +++ b/src/lib/gc_metrics.ml @@ -34,23 +34,13 @@ let get_metrics () : Metrics.t list = [ int ~now gc.Gc.compactions ]; ] -let setup ?(min_interval_s = default_interval_s) (exp : Exporter.t) = - (* limit rate *) +let setup ?(min_interval_s = default_interval_s) + ?(meter = Meter_provider.default_meter) () = let min_interval_s = max 5 min_interval_s in let min_interval = Mtime.Span.(min_interval_s * s) in let limiter = Interval_limiter.create ~min_interval () in + Sdk.add_on_tick_callback (fun () -> + if Interval_limiter.make_attempt limiter then + List.iter (Meter.emit1 meter) (get_metrics ())) - let on_tick () = - if Interval_limiter.make_attempt limiter then ( - let m = get_metrics () in - Exporter.send_metrics exp m - ) - in - Exporter.on_tick exp on_tick - -let setup_on_main_exporter ?min_interval_s () = - match Main_exporter.get () with - | None -> () - | Some exp -> setup ?min_interval_s exp - -let basic_setup () = setup_on_main_exporter () +let basic_setup () = setup () diff --git a/src/lib/gc_metrics.mli b/src/lib/gc_metrics.mli index d17496aa..53cae54b 100644 --- a/src/lib/gc_metrics.mli +++ b/src/lib/gc_metrics.mli @@ -1,21 +1,12 @@ -(** Export GC metrics. - - These metrics are emitted regularly. *) +(** Export GC metrics periodically. *) val get_metrics : unit -> Metrics.t list -(** Get a few metrics from the current state of the GC. *) +(** Get a snapshot of GC statistics as metrics. *) -val setup : ?min_interval_s:int -> Exporter.t -> unit -(** Setup a hook that will emit GC statistics on every tick. It does assume that - [tick] is called regularly on the exporter. For example, if we ensure the - exporter's [tick] function is called every 5s, we'll get GC metrics every - 5s. +val setup : ?min_interval_s:int -> ?meter:Meter.t -> unit -> unit +(** Register a tick callback that emits GC statistics periodically. + @param min_interval_s emit at most every N seconds (default 20) + @param meter where to emit metrics (default [Meter.default]) *) - @param min_interval_s - if provided, GC metrics will be emitted at most every [min_interval_s] - seconds. This prevents flooding. Default value is 20s. *) - -val setup_on_main_exporter : ?min_interval_s:int -> unit -> unit -(** Setup the hook on the main exporter. *) - -val basic_setup : unit -> unit [@@deprecated "use setup_on_main_exporter"] +val basic_setup : unit -> unit +(** [setup ()] — uses all defaults. *) diff --git a/src/lib/globals.ml b/src/lib/globals.ml index 4dc88055..e166cebf 100644 --- a/src/lib/globals.ml +++ b/src/lib/globals.ml @@ -98,3 +98,32 @@ let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list = :: l in l |> merge_global_attributes_ + +(** Global tick callback registry. Callbacks are run periodically by the SDK + ticker. Other modules register here to avoid depending on {!Sdk}. *) +let tick_cbs_ : (unit -> unit) Alist.t = Alist.make () + +let add_on_tick_callback (f : unit -> unit) : unit = Alist.add tick_cbs_ f + +let run_tick_callbacks () : unit = + List.iter (fun f -> f ()) (Alist.get tick_cbs_) + +(* TODO: rename to dynamic_attributes *) +module Enricher = struct + type t = unit -> key_value list + + let cached ~(timeout_s : float) (e : t) : t = + let last_updated = ref (Unix.gettimeofday ()) in + let value = ref (e ()) in + fun () -> + let now = Unix.gettimeofday () in + if now > !last_updated +. timeout_s then ( + last_updated := now; + value := e () + ); + !value + + let all_ : t list ref = ref [] + + let add f = all_ := f :: !all_ +end diff --git a/src/lib/log_provider.ml b/src/lib/log_provider.ml new file mode 100644 index 00000000..95de674b --- /dev/null +++ b/src/lib/log_provider.ml @@ -0,0 +1,76 @@ +open Opentelemetry_emitter + +open struct + let provider_ : Logger.t Atomic.t = Atomic.make Logger.dummy +end + +let get () : Logger.t = Atomic.get provider_ + +let set (t : Logger.t) : unit = Atomic.set provider_ t + +let clear () : unit = Atomic.set provider_ Logger.dummy + +(** Get a logger pre-configured with a fixed set of attributes added to every + log record it emits, forwarding to the current global logger. Intended to be + called once at the top of a library module. + + @param name instrumentation scope name (recorded as [otel.scope.name]) + @param version + instrumentation scope version (recorded as [otel.scope.version]) + @param __MODULE__ + the OCaml module name, typically the [__MODULE__] literal (recorded as + [code.namespace]) + @param attrs additional fixed attributes *) +let get_logger ?name ?version ?(attrs : (string * [< Value.t ]) list = []) + ?__MODULE__ () : Logger.t = + let extra = + Scope_attributes.make_attrs ?name ?version ~attrs ?__MODULE__ () + in + { + Logger.emit = + Emitter.make ~signal_name:"logs" + ~enabled:(fun () -> Emitter.enabled (Atomic.get provider_).emit) + ~emit:(fun logs -> + (match extra with + | [] -> () + | _ -> List.iter (fun log -> Log_record.add_attrs log extra) logs); + Emitter.emit (Atomic.get provider_).emit logs) + (); + clock = { Clock.now = (fun () -> Clock.now (Clock.Main.get ())) }; + } + +(** A Logger.t that lazily reads the global at emit time *) +let default_logger : Logger.t = get_logger () + +open Log_record + +(** Create log record and emit it on [logger] *) +let log ?(logger = default_logger) ?attrs ?trace_id ?span_id + ?(severity : severity option) (msg : string) : unit = + if Logger.enabled logger then ( + let now = Clock.now logger.clock in + let dyn_attrs = Dynamic_enricher.collect () in + let attrs = + match dyn_attrs with + | [] -> attrs + | _ -> + let base = Option.value ~default:[] attrs in + Some (List.rev_append dyn_attrs base) + in + let logrec = + Log_record.make_str ?attrs ?trace_id ?span_id ?severity + ~observed_time_unix_nano:now msg + in + Logger.emit1 logger logrec + ) + +(** Helper to create a log record, with a suspension, like in [Logs]. + + Example usage: + [logf ~severity:Severity_number_warn (fun k->k"oh no!! %s it's bad: %b" + "help" true)] *) +let logf ?(logger = default_logger) ?attrs ?trace_id ?span_id ?severity msgf : + unit = + if Logger.enabled logger then + msgf (fun fmt -> + Format.kasprintf (log ~logger ?attrs ?trace_id ?span_id ?severity) fmt) diff --git a/src/lib/logger.ml b/src/lib/logger.ml index 006d4d59..1dc0d45a 100644 --- a/src/lib/logger.ml +++ b/src/lib/logger.ml @@ -8,60 +8,22 @@ open Opentelemetry_emitter -(** {2 Logger object} *) - type t = { emit: Log_record.t Emitter.t; clock: Clock.t; } +(** Dummy logger, always disabled *) let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock } let[@inline] enabled (self : t) : bool = Emitter.enabled self.emit -let of_exporter (exp : Exporter.t) : t = - { emit = exp.emit_logs; clock = exp.clock } - let[@inline] emit1 (self : t) (l : Log_record.t) = Emitter.emit self.emit [ l ] -let (emit_main [@deprecated "use an explicit Logger.t"]) = - fun (logs : Log_record.t list) : unit -> - match Main_exporter.get () with - | None -> () - | Some exp -> Exporter.send_logs exp logs - -open struct - (* internal default, keeps the default params below working without deprecation alerts *) - let dynamic_main_ : t = - of_exporter Main_exporter.dynamic_forward_to_main_exporter -end - -(** A logger that uses the current {!Main_exporter}'s logger *) -let default = dynamic_main_ - -(** {2 Logging helpers} *) - -open Log_record - -(** Create log record and emit it on [logger] *) -let log ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id - ?(severity : severity option) (msg : string) : unit = - if enabled logger then ( - let now = Clock.now logger.clock in - let logrec = - Log_record.make_str ?attrs ?trace_id ?span_id ?severity - ~observed_time_unix_nano:now msg - in - emit1 logger logrec - ) - -(** Helper to create a log record, with a suspension, like in [Logs]. - - Example usage: - [logf ~severity:Severity_number_warn (fun k->k"oh no!! %s it's bad: %b" - "help" true)] *) -let logf ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id ?severity msgf : - unit = - if enabled logger then - msgf (fun fmt -> - Format.kasprintf (log ~logger ?attrs ?trace_id ?span_id ?severity) fmt) +let of_exporter (exp : Exporter.t) : t = + let emit = + Emitter.make ~signal_name:"logs" + ~emit:(fun logs -> exp.Exporter.export (Any_signal_l.Logs logs)) + () + in + { emit; clock = Clock.Main.get () } diff --git a/src/lib/main_exporter.ml b/src/lib/main_exporter.ml deleted file mode 100644 index 38bc98cd..00000000 --- a/src/lib/main_exporter.ml +++ /dev/null @@ -1,156 +0,0 @@ -(** Main exporter. - - This is a singleton exporter, or [None] if not defined. It is better to pass - an explicit exporter when possible, but this is quite convenient and most - programs only need one exporter. *) - -open Exporter - -(* hidden *) -open struct - (* a list of callbacks automatically added to the main exporter *) - let on_tick_cbs_ = Alist.make () - - let exporter : t option Atomic.t = Atomic.make None -end - -(** Remove current exporter, if any. - @param on_done - called when this operation is done, including shutting down the exporter, - if any *) -let remove ~on_done () : unit = - match Atomic.exchange exporter None with - | None -> on_done () - | Some exp -> - Aswitch.on_turn_off (Exporter.active exp) on_done; - let mtime = Mtime_clock.now () in - tick exp ~mtime; - shutdown exp - -(** Is there a configured exporter? *) -let[@inline] present () : bool = Option.is_some (Atomic.get exporter) - -(** Current exporter, if any *) -let[@inline] get () : t option = Atomic.get exporter - -let add_on_tick_callback f = - Alist.add on_tick_cbs_ f; - Option.iter (fun exp -> on_tick exp f) (get ()) - -module Util = struct - open Opentelemetry_emitter - - (** An emitter that uses the corresponding emitter in the current main - exporter. When this emitter is used to [emit signals], the current - exporter is looked up, [get_emitter exporter] is then used to locate the - relevant emitter [e'], and [signals] is in turn emitted in [e']. *) - let dynamic_forward_emitter_to_main_exporter ~signal_name - ~(get_emitter : Exporter.t -> _ Emitter.t) () : _ Emitter.t = - let enabled () = present () in - let closed () = not (enabled ()) in - let flush_and_close () = () in - let tick ~mtime = - match get () with - | None -> () - | Some exp -> Exporter.tick exp ~mtime - in - let self_metrics ~now () = - match get () with - | None -> [] - | Some exp -> (get_emitter exp).self_metrics ~now () - in - let emit signals = - if signals <> [] then ( - match get () with - | None -> () - | Some exp -> - let emitter = get_emitter exp in - Emitter.emit emitter signals - ) - in - Emitter.make ~signal_name ~enabled ~closed ~self_metrics ~flush_and_close - ~tick ~emit () -end - -(** Aswitch of the current exporter, or {!Aswitch.dummy} *) -let[@inline] active () : Aswitch.t = - match get () with - | None -> Aswitch.dummy - | Some e -> e.active () - -(** This exporter uses the current "main exporter" using [get()] at every - invocation. It is useful as a fallback or to port existing applications that - expect a global singleton backend^W exporter. - @since NEXT_RELEASE *) -let dynamic_forward_to_main_exporter : Exporter.t = - let emit_logs = - Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"logs" - ~get_emitter:Exporter.(fun e -> e.emit_logs) - in - let emit_metrics = - Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"metrics" - ~get_emitter:Exporter.(fun e -> e.emit_metrics) - in - let emit_spans = - Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"spans" - ~get_emitter:Exporter.(fun e -> e.emit_spans) - in - let on_tick f = - match get () with - | None -> () - | Some exp -> Exporter.on_tick exp f - in - let tick () = - match get () with - | None -> () - | Some exp -> exp.tick () - in - let self_metrics () = - match get () with - | None -> [] - | Some exp -> exp.self_metrics () - in - let shutdown () = () in - { - Exporter.active; - clock = Clock.ptime_clock; - emit_metrics; - emit_spans; - emit_logs; - on_tick; - tick; - shutdown; - self_metrics; - } - -let self_metrics () : Metrics.t list = - dynamic_forward_to_main_exporter.self_metrics () - -(** Set the global exporter *) -let set (exp : t) : unit = - (* sanity check! this specific exporter would just call itself, leading to - stack overflow. *) - if exp == dynamic_forward_to_main_exporter then - failwith - "cannot set Main_exporter.dynamic_forward_to_main_exporter as main \ - exporter!"; - - List.iter (on_tick exp) (Alist.get on_tick_cbs_); - Atomic.set exporter (Some exp) - -let (set_backend [@deprecated "use `Main_exporter.set`"]) = set - -let (remove_backend [@deprecated "use `Main_exporter.remove`"]) = remove - -let (has_backend [@deprecated "use `Main_exporter.present`"]) = present - -let (get_backend [@deprecated "use `Main_exporter.get"]) = get - -let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f - = - if enable then ( - set exp; - Aswitch.on_turn_off (Exporter.active exp) on_done; - Fun.protect f ~finally:(fun () -> Exporter.shutdown exp) - ) else - Fun.protect f ~finally:(fun () -> on_done ()) diff --git a/src/lib/meter.ml b/src/lib/meter.ml index 4b09b767..d61b33d3 100644 --- a/src/lib/meter.ml +++ b/src/lib/meter.ml @@ -5,21 +5,22 @@ type t = { clock: Clock.t; } +(** Dummy meter, always disabled *) let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock } let[@inline] enabled (self : t) = Emitter.enabled self.emit -let of_exporter (exp : Exporter.t) : t = - { emit = exp.emit_metrics; clock = exp.clock } - -let (create [@deprecated "use Meter.of_exporter"]) = - fun ~(exporter : Exporter.t) ?name:_name () : t -> of_exporter exporter - -let default : t = Main_exporter.dynamic_forward_to_main_exporter |> of_exporter - let[@inline] emit1 (self : t) (m : Metrics.t) : unit = Emitter.emit self.emit [ m ] +let of_exporter (exp : Exporter.t) : t = + let emit = + Emitter.make ~signal_name:"metrics" + ~emit:(fun ms -> exp.Exporter.export (Any_signal_l.Metrics ms)) + () + in + { emit; clock = Clock.Main.get () } + (** Global list of raw metric callbacks, collected alongside {!Instrument.all}. *) let cbs_ : (clock:Clock.t -> unit -> Metrics.t list) Alist.t = Alist.make () @@ -37,38 +38,6 @@ let collect (self : t) : Metrics.t list = (Alist.get cbs_); List.rev !acc -let minimum_min_interval_ = Mtime.Span.(100 * ms) - -let default_min_interval_ = Mtime.Span.(4 * s) - -let clamp_interval_ interval = - if Mtime.Span.is_shorter interval ~than:minimum_min_interval_ then - minimum_min_interval_ - else - interval - -let add_to_exporter ?(min_interval = default_min_interval_) (exp : Exporter.t) - (self : t) : unit = - let limiter = - Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () - in - Exporter.on_tick exp (fun () -> - if Interval_limiter.make_attempt limiter then ( - let metrics = collect self in - if metrics <> [] then Emitter.emit self.emit metrics - )) - -let add_to_main_exporter ?(min_interval = default_min_interval_) (self : t) : - unit = - let limiter = - Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () - in - Main_exporter.add_on_tick_callback (fun () -> - if Interval_limiter.make_attempt limiter then ( - let metrics = collect self in - if metrics <> [] then Emitter.emit self.emit metrics - )) - module Instrument = Instrument module type INSTRUMENT_IMPL = Instrument.CUSTOM_IMPL diff --git a/src/lib/meter.mli b/src/lib/meter.mli index ea9ab7ea..8ab7a49f 100644 --- a/src/lib/meter.mli +++ b/src/lib/meter.mli @@ -8,7 +8,10 @@ {!add_to_exporter} or {!add_to_main_exporter} once after creating your instruments. *) -type t +type t = { + emit: Metrics.t Opentelemetry_emitter.Emitter.t; + clock: Clock.t; +} val dummy : t (** Dummy meter, always disabled *) @@ -18,13 +21,6 @@ val enabled : t -> bool val of_exporter : Exporter.t -> t (** Create a meter from an exporter *) -val create : exporter:Exporter.t -> ?name:string -> unit -> t -[@@deprecated "use of_exporter"] - -val default : t -(** Meter that forwards to the current main exporter. Equivalent to - [of_exporter Main_exporter.dynamic_forward_to_main_exporter]. *) - val emit1 : t -> Metrics.t -> unit (** Emit a single metric directly, bypassing the instrument registry *) @@ -37,17 +33,6 @@ val collect : t -> Metrics.t list (** Collect metrics from all registered instruments ({!Instrument.all}) and raw callbacks ({!add_cb}), using this meter's clock. *) -val add_to_exporter : ?min_interval:Mtime.span -> Exporter.t -> t -> unit -(** Register a periodic tick callback on [exp] that collects and emits all - instruments. Call this once after creating your instruments. - @param min_interval minimum time between collections (default 4s, min 100ms) -*) - -val add_to_main_exporter : ?min_interval:Mtime.span -> t -> unit -(** Like {!add_to_exporter} but targets the main exporter via - {!Main_exporter.add_on_tick_callback}, so it works even if the main exporter - has not been set yet. *) - module Instrument = Instrument (** Global registry of metric instruments. Re-exported from {!Opentelemetry_core.Instrument} for convenience. *) diff --git a/src/lib/meter_provider.ml b/src/lib/meter_provider.ml new file mode 100644 index 00000000..6bab48d2 --- /dev/null +++ b/src/lib/meter_provider.ml @@ -0,0 +1,75 @@ +open Opentelemetry_emitter + +open struct + let provider_ : Meter.t Atomic.t = Atomic.make Meter.dummy +end + +let get () : Meter.t = Atomic.get provider_ + +let set (t : Meter.t) : unit = Atomic.set provider_ t + +let clear () : unit = Atomic.set provider_ Meter.dummy + +(** Get a meter pre-configured with a fixed set of attributes added to every + metric it emits, forwarding to the current global meter. Intended to be + called once at the top of a library module. + + @param name instrumentation scope name (recorded as [otel.scope.name]) + @param version + instrumentation scope version (recorded as [otel.scope.version]) + @param __MODULE__ + the OCaml module name, typically the [__MODULE__] literal (recorded as + [code.namespace]) + @param attrs additional fixed attributes *) +let get_meter ?name ?version ?(attrs : (string * [< Value.t ]) list = []) + ?__MODULE__ () : Meter.t = + let extra = + Scope_attributes.make_attrs ?name ?version ~attrs ?__MODULE__ () + in + { + Meter.emit = + Emitter.make ~signal_name:"metrics" + ~enabled:(fun () -> Emitter.enabled (Atomic.get provider_).emit) + ~emit:(fun metrics -> + (match extra with + | [] -> () + | _ -> List.iter (fun m -> Metrics.add_attrs m extra) metrics); + Emitter.emit (Atomic.get provider_).emit metrics) + (); + clock = { Clock.now = (fun () -> Clock.now (Clock.Main.get ())) }; + } + +(** A Meter.t that lazily reads the global at emit time *) +let default_meter : Meter.t = get_meter () + +let minimum_min_interval_ = Mtime.Span.(100 * ms) + +let default_min_interval_ = Mtime.Span.(4 * s) + +let clamp_interval_ interval = + if Mtime.Span.is_shorter interval ~than:minimum_min_interval_ then + minimum_min_interval_ + else + interval + +let add_to_exporter ?(min_interval = default_min_interval_) (_exp : Exporter.t) + (self : Meter.t) : unit = + let limiter = + Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () + in + Globals.add_on_tick_callback (fun () -> + if Interval_limiter.make_attempt limiter then ( + let metrics = Meter.collect self in + if metrics <> [] then Emitter.emit self.emit metrics + )) + +let add_to_main_exporter ?(min_interval = default_min_interval_) + (self : Meter.t) : unit = + let limiter = + Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () + in + Globals.add_on_tick_callback (fun () -> + if Interval_limiter.make_attempt limiter then ( + let metrics = Meter.collect self in + if metrics <> [] then Emitter.emit self.emit metrics + )) diff --git a/src/lib/opentelemetry.ml b/src/lib/opentelemetry.ml index a3ed2aee..c0b0a59e 100644 --- a/src/lib/opentelemetry.ml +++ b/src/lib/opentelemetry.ml @@ -39,28 +39,38 @@ module Exporter = struct let get_logger (self : t) : Logger.t = Logger.of_exporter self end -module Main_exporter = struct - include Main_exporter +module Sdk = struct + include Sdk (** Get a tracer forwarding to the current main exporter. @since NEXT_RELEASE *) - let get_tracer () : Tracer.t = Tracer.default + let get_tracer ?name ?version ?attrs ?__MODULE__ () = + Trace_provider.get_tracer ?name ?version ?attrs ?__MODULE__ () (** Get a meter forwarding to the current main exporter. @since NEXT_RELEASE *) - let get_meter () : Meter.t = Meter.default + let get_meter ?name ?version ?attrs ?__MODULE__ () = + Meter_provider.get_meter ?name ?version ?attrs ?__MODULE__ () (** Get a logger forwarding to the current main exporter. @since NEXT_RELEASE *) - let get_logger () : Logger.t = Logger.default + let get_logger ?name ?version ?attrs ?__MODULE__ () = + Log_provider.get_logger ?name ?version ?attrs ?__MODULE__ () end +module Main_exporter = Sdk [@@deprecated "use Sdk instead"] + module Collector = struct include Exporter - include Main_exporter + include Sdk end [@@deprecated "Use 'Exporter' instead"] +module Dynamic_enricher = Dynamic_enricher +module Trace_provider = Trace_provider +module Meter_provider = Meter_provider +module Log_provider = Log_provider + (** {2 Identifiers} *) module Trace_id = Trace_id @@ -98,19 +108,48 @@ module Span_kind = Span_kind module Span = Span module Ambient_span = Ambient_span -module Tracer = Tracer + +module Tracer = struct + include Tracer + + let default = Trace_provider.default_tracer + + let with_thunk_and_finally = Trace_provider.with_thunk_and_finally + + let with_ = Trace_provider.with_ +end + module Trace = Tracer [@@deprecated "use Tracer instead"] (** {2 Metrics} *) module Metrics = Metrics module Instrument = Instrument -module Meter = Meter + +module Meter = struct + include Meter + + let default = Meter_provider.default_meter + + let add_to_exporter = Meter_provider.add_to_exporter + + let add_to_main_exporter = Meter_provider.add_to_main_exporter +end (** {2 Logs} *) module Log_record = Log_record -module Logger = Logger + +module Logger = struct + include Logger + + let default = Log_provider.default_logger + + let log = Log_provider.log + + let logf = Log_provider.logf +end + module Logs = Logger [@@deprecated "use Logger"] (** {2 Utils} *) diff --git a/src/lib/scope_attributes.ml b/src/lib/scope_attributes.ml new file mode 100644 index 00000000..0db02f07 --- /dev/null +++ b/src/lib/scope_attributes.ml @@ -0,0 +1,27 @@ +(** Helper for building instrumentation scope attributes. + + Used internally by {!Tracer.get}, {!Meter.get}, {!Logger.get}. *) + +(** Build a list of fixed key-value attributes from instrumentation scope + parameters. These attributes will be injected into every signal emitted by a + tracer/meter/logger obtained via the corresponding [get] function. + + @param name instrumentation scope name (recorded as [otel.scope.name]) + @param version + instrumentation scope version (recorded as [otel.scope.version]) + @param __MODULE__ + the OCaml module name, typically the [__MODULE__] literal (recorded as + [code.namespace]) + @param attrs additional fixed attributes *) +let make_attrs ?name ?version ?(attrs : (string * [< Value.t ]) list = []) + ?__MODULE__ () : Key_value.t list = + let maybe_cons opt k l = + match opt with + | None -> l + | Some v -> (k, (`String v : Value.t)) :: l + in + let l = (attrs :> Key_value.t list) in + let l = maybe_cons __MODULE__ Conventions.Attributes.Code.namespace l in + let l = maybe_cons version "otel.scope.version" l in + let l = maybe_cons name "otel.scope.name" l in + l diff --git a/src/lib/sdk.ml b/src/lib/sdk.ml new file mode 100644 index 00000000..c5224634 --- /dev/null +++ b/src/lib/sdk.ml @@ -0,0 +1,100 @@ +(** SDK setup. + + Convenience module for installing a single {!Exporter.t} as the global + backend, wiring it into {!Trace_provider}, {!Meter_provider}, and + {!Log_provider} at once. Optionally applies per-signal batching. *) + +open Opentelemetry_emitter + +open struct + let exporter : Exporter.t option Atomic.t = Atomic.make None +end + +(** Remove current exporter, if any. + @param on_done called once the exporter has fully shut down (queue drained). +*) +let remove ~on_done () : unit = + (* flush+close provider emitters so buffered signals reach the queue *) + Emitter.flush_and_close (Trace_provider.get ()).emit; + Emitter.flush_and_close (Meter_provider.get ()).emit; + Emitter.flush_and_close (Log_provider.get ()).emit; + + (* clear providers — no new signals accepted *) + Trace_provider.clear (); + Meter_provider.clear (); + Log_provider.clear (); + match Atomic.exchange exporter None with + | None -> on_done () + | Some exp -> + (* wait for exporter to fully drain, then call on_done *) + Aswitch.on_turn_off (Exporter.active exp) on_done; + (* initiate shutdown (closes queue, starts consumer drain) *) + Exporter.shutdown exp + +let[@inline] present () : bool = Option.is_some (Atomic.get exporter) + +let[@inline] get () : Exporter.t option = Atomic.get exporter + +(** Aswitch of the installed exporter, or {!Aswitch.dummy} if none. *) +let[@inline] active () : Aswitch.t = + match Atomic.get exporter with + | None -> Aswitch.dummy + | Some exp -> Exporter.active exp + +let add_on_tick_callback (f : unit -> unit) : unit = + Globals.add_on_tick_callback f + +let run_tick_callbacks () : unit = Globals.run_tick_callbacks () + +(** Tick all providers and run all registered callbacks. Call this periodically + (e.g. every 500ms) to drive metrics collection, GC metrics, and batch + timeout flushing. This is the single function client libraries should call + from their ticker. *) +let tick () : unit = Globals.run_tick_callbacks () + +let set ?batch_traces ?batch_metrics ?batch_logs + ?(batch_timeout = Mtime.Span.(2_000 * ms)) (exp : Exporter.t) : unit = + Atomic.set exporter (Some exp); + let tracer : Tracer.t = + let t = Tracer.of_exporter exp in + { + t with + emit = + Emitter_batch.add_batching_opt ~timeout:batch_timeout + ~batch_size:batch_traces t.emit; + } + in + let meter : Meter.t = + let m = Meter.of_exporter exp in + { + m with + emit = + Emitter_batch.add_batching_opt ~timeout:batch_timeout + ~batch_size:batch_metrics m.emit; + } + in + let logger : Logger.t = + let l = Logger.of_exporter exp in + { + l with + emit = + Emitter_batch.add_batching_opt ~timeout:batch_timeout + ~batch_size:batch_logs l.emit; + } + in + Trace_provider.set tracer; + Meter_provider.set meter; + Log_provider.set logger + +let self_metrics () : Metrics.t list = + match get () with + | None -> [] + | Some exp -> exp.Exporter.self_metrics () + +(* Permanent tick callback to drive batch timeouts on provider emitters *) +let () = + Globals.add_on_tick_callback (fun () -> + let mtime = Mtime_clock.now () in + Emitter.tick (Trace_provider.get ()).emit ~mtime; + Emitter.tick (Meter_provider.get ()).emit ~mtime; + Emitter.tick (Log_provider.get ()).emit ~mtime) diff --git a/src/lib/trace_provider.ml b/src/lib/trace_provider.ml new file mode 100644 index 00000000..9622b2e5 --- /dev/null +++ b/src/lib/trace_provider.ml @@ -0,0 +1,131 @@ +open Proto.Trace +open Opentelemetry_emitter + +open struct + let provider_ : Tracer.t Atomic.t = Atomic.make Tracer.dummy +end + +let get () : Tracer.t = Atomic.get provider_ + +let set (t : Tracer.t) : unit = Atomic.set provider_ t + +let clear () : unit = Atomic.set provider_ Tracer.dummy + +(** Get a tracer pre-configured with a fixed set of attributes added to every + span it emits, forwarding to the current global tracer. Intended to be + called once at the top of a library module. + + @param name instrumentation scope name (recorded as [otel.scope.name]) + @param version + instrumentation scope version (recorded as [otel.scope.version]) + @param __MODULE__ + the OCaml module name, typically the [__MODULE__] literal (recorded as + [code.namespace]) + @param attrs additional fixed attributes *) +let get_tracer ?name ?version ?(attrs : (string * [< Value.t ]) list = []) + ?__MODULE__ () : Tracer.t = + let extra = + Scope_attributes.make_attrs ?name ?version ~attrs ?__MODULE__ () + in + { + Tracer.emit = + Emitter.make ~signal_name:"spans" + ~enabled:(fun () -> Emitter.enabled (Atomic.get provider_).emit) + ~emit:(fun spans -> + (match extra with + | [] -> () + | _ -> List.iter (fun span -> Span.add_attrs span extra) spans); + Emitter.emit (Atomic.get provider_).emit spans) + (); + clock = { Clock.now = (fun () -> Clock.now (Clock.Main.get ())) }; + } + +(** A Tracer.t that lazily reads the global at emit time *) +let default_tracer : Tracer.t = get_tracer () + +(** Emit a span directly via the current global tracer *) +let emit (span : Span.t) : unit = Emitter.emit default_tracer.emit [ span ] + +(** Helper to implement {!with_} and similar functions *) +let with_thunk_and_finally (self : Tracer.t) ?(force_new_trace_id = false) + ?trace_state ?(attrs : (string * [< Value.t ]) list = []) ?kind ?trace_id + ?parent ?links name cb = + let parent = + match parent with + | Some _ -> parent + | None -> Ambient_span.get () + in + let trace_id = + match trace_id, parent with + | _ when force_new_trace_id -> Trace_id.create () + | Some trace_id, _ -> trace_id + | None, Some p -> Span.trace_id p + | None, None -> Trace_id.create () + in + let start_time = Clock.now self.clock in + let span_id = Span_id.create () in + + let parent_id = Option.map Span.id parent in + + let span : Span.t = + Span.make ?trace_state ?kind ?parent:parent_id ~trace_id ~id:span_id ~attrs + ?links ~start_time ~end_time:start_time name + in + let () = + match Dynamic_enricher.collect () with + | [] -> () + | dyn_attrs -> Span.add_attrs span dyn_attrs + in + (* called once we're done, to emit a span *) + let finally res = + let end_time = Clock.now self.clock in + Proto.Trace.span_set_end_time_unix_nano span end_time; + + (match Span.status span with + | Some _ -> () + | None -> + (match res with + | Ok () -> () + | Error (e, bt) -> + Span.record_exception span e bt; + let status = + make_status ~code:Status_code_error ~message:(Printexc.to_string e) () + in + Span.set_status span status)); + + Emitter.emit self.emit [ span ] + in + let thunk () = Ambient_span.with_ambient span (fun () -> cb span) in + thunk, finally + +(** Sync span guard. + + Notably, this includes {e implicit} scope-tracking: if called without a + [~scope] argument (or [~parent]/[~trace_id]), it will check in the + {!Ambient_context} for a surrounding environment, and use that as the scope. + Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new scope in the + ambient context, so that any logically-nested calls to {!with_} will use + this span as their parent. + + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. + + @param tracer the tracer to use (default [default_tracer]) + @param force_new_trace_id + if true (default false), the span will not use a ambient scope, the + [~scope] argument, nor [~trace_id], but will instead always create fresh + identifiers for this span *) +let with_ ?(tracer = default_tracer) ?force_new_trace_id ?trace_state ?attrs + ?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a = + let thunk, finally = + with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs ?kind + ?trace_id ?parent ?links name cb + in + try + let rv = thunk () in + finally (Ok ()); + rv + with e -> + let bt = Printexc.get_raw_backtrace () in + finally (Error (e, bt)); + raise e diff --git a/src/lib/tracer.ml b/src/lib/tracer.ml index bc4afd2a..91d0a76e 100644 --- a/src/lib/tracer.ml +++ b/src/lib/tracer.ml @@ -6,8 +6,6 @@ {{:https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} the spec} *) -open Common_ -open Proto.Trace open Opentelemetry_emitter type span = Span.t @@ -26,102 +24,9 @@ let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock } let[@inline] enabled (self : t) = Emitter.enabled self.emit let of_exporter (exp : Exporter.t) : t = - { emit = exp.emit_spans; clock = exp.clock } - -open struct - (* internal default, keeps the default param below working without deprecation alerts *) - let dynamic_main_ : t = - Main_exporter.dynamic_forward_to_main_exporter |> of_exporter -end - -let default = dynamic_main_ - -let (add_event [@deprecated "use Span.add_event"]) = Span.add_event' - -let (add_attrs [@deprecated "use Span.add_attrs"]) = Span.add_attrs' - -(** Helper to implement {!with_} and similar functions *) -let with_thunk_and_finally (self : t) ?(force_new_trace_id = false) ?trace_state - ?(attrs : (string * [< Value.t ]) list = []) ?kind ?trace_id ?parent ?links - name cb = - let parent = - match parent with - | Some _ -> parent - | None -> Ambient_span.get () + let emit = + Emitter.make ~signal_name:"spans" + ~emit:(fun spans -> exp.Exporter.export (Any_signal_l.Spans spans)) + () in - let trace_id = - match trace_id, parent with - | _ when force_new_trace_id -> Trace_id.create () - | Some trace_id, _ -> trace_id - | None, Some p -> Span.trace_id p - | None, None -> Trace_id.create () - in - let start_time = Clock.now self.clock in - let span_id = Span_id.create () in - - let parent_id = Option.map Span.id parent in - - let span : Span.t = - Span.make ?trace_state ?kind ?parent:parent_id ~trace_id ~id:span_id ~attrs - ?links ~start_time ~end_time:start_time name - in - (* called once we're done, to emit a span *) - let finally res = - let end_time = Clock.now self.clock in - Proto.Trace.span_set_end_time_unix_nano span end_time; - - (match Span.status span with - | Some _ -> () - | None -> - (match res with - | Ok () -> - (* By default, all spans are Unset, which means a span completed without error. - The Ok status is reserved for when you need to explicitly mark a span as successful - rather than stick with the default of Unset (i.e., “without error”). - - https://opentelemetry.io/docs/languages/go/instrumentation/#set-span-status *) - () - | Error (e, bt) -> - Span.record_exception span e bt; - let status = - make_status ~code:Status_code_error ~message:(Printexc.to_string e) () - in - Span.set_status span status)); - - Emitter.emit self.emit [ span ] - in - let thunk () = Ambient_span.with_ambient span (fun () -> cb span) in - thunk, finally - -(** Sync span guard. - - Notably, this includes {e implicit} scope-tracking: if called without a - [~scope] argument (or [~parent]/[~trace_id]), it will check in the - {!Ambient_context} for a surrounding environment, and use that as the scope. - Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new scope in the - ambient context, so that any logically-nested calls to {!with_} will use - this span as their parent. - - {b NOTE} be careful not to call this inside a Gc alarm, as it can cause - deadlocks. - - @param tracer the tracer to use (default [get_main()]) - @param force_new_trace_id - if true (default false), the span will not use a ambient scope, the - [~scope] argument, nor [~trace_id], but will instead always create fresh - identifiers for this span *) -let with_ ?(tracer = dynamic_main_) ?force_new_trace_id ?trace_state ?attrs - ?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a = - let thunk, finally = - with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs ?kind - ?trace_id ?parent ?links name cb - in - - try - let rv = thunk () in - finally (Ok ()); - rv - with e -> - let bt = Printexc.get_raw_backtrace () in - finally (Error (e, bt)); - raise e + { emit; clock = Clock.Main.get () } diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index 48104128..b09d769f 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -4,8 +4,8 @@ include Opentelemetry let setup_ambient_context () = Opentelemetry_ambient_context.set_current_storage Ambient_context_lwt.storage -module Main_exporter = struct - include Main_exporter +module Sdk = struct + include Sdk let remove () : unit Lwt.t = let p, resolve = Lwt.wait () in diff --git a/src/trace/common_.ml b/src/trace/common_.ml index 09086d69..c5744107 100644 --- a/src/trace/common_.ml +++ b/src/trace/common_.ml @@ -1,6 +1,6 @@ module OTEL = Opentelemetry -module Otrace = Trace_core (* ocaml-trace *) -module Ambient_context = Opentelemetry_ambient_context +module Trace = Trace_core (* ocaml-trace *) +module Ambient_context = Ambient_context let ( let@ ) = ( @@ ) diff --git a/src/trace/dune b/src/trace/dune index 5814053c..c03bf9f2 100644 --- a/src/trace/dune +++ b/src/trace/dune @@ -5,7 +5,7 @@ (optional) ; trace (flags :standard -open Opentelemetry_util -open Opentelemetry_atomic) (libraries - (re_export opentelemetry.ambient-context) + (re_export ambient-context) (re_export opentelemetry.util) opentelemetry.atomic (re_export opentelemetry) diff --git a/src/trace/opentelemetry_trace.ml b/src/trace/opentelemetry_trace.ml index f3484b31..2c240842 100644 --- a/src/trace/opentelemetry_trace.ml +++ b/src/trace/opentelemetry_trace.ml @@ -1,19 +1,19 @@ open Common_ module Extensions = struct - type Otrace.span += Span_otel of OTEL.Span.t + type Trace.span += Span_otel of OTEL.Span.t - type Otrace.extension_event += - | Ev_link_span of Otrace.span * OTEL.Span_ctx.t + type Trace.extension_event += + | Ev_link_span of Trace.span * OTEL.Span_ctx.t | Ev_record_exn of { - sp: Otrace.span; + sp: Trace.span; exn: exn; bt: Printexc.raw_backtrace; } - | Ev_set_span_kind of Otrace.span * OTEL.Span_kind.t - | Ev_set_span_status of Otrace.span * OTEL.Span_status.t + | Ev_set_span_kind of Trace.span * OTEL.Span_kind.t + | Ev_set_span_status of Trace.span * OTEL.Span_status.t - type Otrace.metric += + type Trace.metric += | Metric_hist of OTEL.Metrics.histogram_data_point | Metric_sum_int of int | Metric_sum_float of float @@ -28,7 +28,7 @@ open struct } let create_state ~(exporter : OTEL.Exporter.t) () : state = - let clock = exporter.clock in + let clock = OTEL.Clock.ptime_clock in { clock; exporter } (* sanity check: otrace meta-map must be the same as hmap *) @@ -39,12 +39,11 @@ open struct OTEL.Span_ctx.k_ambient let enter_span (self : state) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~level:_ - ~params:_ ~(data : (_ * Otrace.user_data) list) ~parent name : Otrace.span - = + ~params:_ ~(data : (_ * Trace.user_data) list) ~parent name : Trace.span = let start_time = OTEL.Clock.now self.clock in let trace_id, parent_id = match parent with - | Otrace.P_some (Span_otel sp) -> + | Trace.P_some (Span_otel sp) -> OTEL.Span.trace_id sp, Some (OTEL.Span.id sp) | _ -> (match Ambient_context.get k_span_ctx with @@ -100,10 +99,10 @@ open struct (* emit the span after setting the end timestamp *) let end_time = OTEL.Clock.now self.clock in OTEL.Proto.Trace.span_set_end_time_unix_nano span end_time; - OTEL.Exporter.send_trace self.exporter [ span ] + self.exporter.OTEL.Exporter.export (OTEL.Any_signal_l.Spans [ span ]) | _ -> () - let add_data_to_span _self span (data : (_ * Otrace.user_data) list) = + let add_data_to_span _self span (data : (_ * Trace.user_data) list) = match span with | Span_otel sp -> OTEL.Span.add_attrs sp data | _ -> () @@ -136,7 +135,7 @@ open struct OTEL.Log_record.make ~severity ?trace_id ?span_id ~attrs:data ~observed_time_unix_nano (`String msg) in - OTEL.Exporter.send_logs self.exporter [ log ] + self.exporter.OTEL.Exporter.export (OTEL.Any_signal_l.Logs [ log ]) let metric (self : state) ~level:_ ~params:_ ~data:attrs name v : unit = let now = OTEL.Clock.now self.clock in @@ -158,7 +157,8 @@ open struct | `sum v -> [ OTEL.Metrics.sum ~name [ v ] ] | `hist h -> [ OTEL.Metrics.histogram ~name [ h ] ] in - if m <> [] then OTEL.Exporter.send_metrics self.exporter m + if m <> [] then + self.exporter.OTEL.Exporter.export (OTEL.Any_signal_l.Metrics m) let extension (_self : state) ~level:_ ev = match ev with @@ -176,16 +176,35 @@ open struct let shutdown self = OTEL.Exporter.shutdown self.exporter - let callbacks : state Otrace.Collector.Callbacks.t = - Otrace.Collector.Callbacks.make ~enter_span ~exit_span ~add_data_to_span + let callbacks : state Trace.Collector.Callbacks.t = + Trace.Collector.Callbacks.make ~enter_span ~exit_span ~add_data_to_span ~message ~metric ~extension ~shutdown () end +module Ambient_span_provider_ = struct + let get_current_span () = + match OTEL.Ambient_span.get () with + | None -> None + | Some sp -> Some (Span_otel sp) + + let with_current_span_set_to () span f = + match span with + | Span_otel sp -> OTEL.Ambient_span.with_ambient sp (fun () -> f span) + | _ -> f span + + let callbacks : unit Trace.Ambient_span_provider.Callbacks.t = + { get_current_span; with_current_span_set_to } + + let provider = Trace.Ambient_span_provider.ASP_some ((), callbacks) +end + +let ambient_span_provider = Ambient_span_provider_.provider + let collector_of_exporter (exporter : OTEL.Exporter.t) : Trace_core.collector = let st = create_state ~exporter () in Trace_core.Collector.C_some (st, callbacks) -let with_ambient_span (sp : Otrace.span) f = +let with_ambient_span (sp : Trace.span) f = match sp with | Span_otel sp -> Ambient_context.with_key_bound_to k_span_ctx (OTEL.Span.to_span_ctx sp) f @@ -194,43 +213,59 @@ let with_ambient_span (sp : Otrace.span) f = let with_ambient_span_ctx (sp : OTEL.Span_ctx.t) f = Ambient_context.with_key_bound_to k_span_ctx sp f -let link_span_to_otel_ctx (sp1 : Otrace.span) (sp2 : OTEL.Span_ctx.t) : unit = - if Otrace.enabled () then Otrace.extension_event @@ Ev_link_span (sp1, sp2) +let link_span_to_otel_ctx (sp1 : Trace.span) (sp2 : OTEL.Span_ctx.t) : unit = + if Trace.enabled () then Trace.extension_event @@ Ev_link_span (sp1, sp2) -let link_spans (sp1 : Otrace.span) (sp2 : Otrace.span) : unit = - if Otrace.enabled () then ( +let link_spans (sp1 : Trace.span) (sp2 : Trace.span) : unit = + if Trace.enabled () then ( match sp2 with | Span_otel sp2 -> - Otrace.extension_event @@ Ev_link_span (sp1, OTEL.Span.to_span_ctx sp2) + Trace.extension_event @@ Ev_link_span (sp1, OTEL.Span.to_span_ctx sp2) | _ -> () ) let[@inline] set_span_kind sp k : unit = - if Otrace.enabled () then Otrace.extension_event @@ Ev_set_span_kind (sp, k) + if Trace.enabled () then Trace.extension_event @@ Ev_set_span_kind (sp, k) let[@inline] set_span_status sp status : unit = - if Otrace.enabled () then - Otrace.extension_event @@ Ev_set_span_status (sp, status) + if Trace.enabled () then + Trace.extension_event @@ Ev_set_span_status (sp, status) let record_exception sp exn bt : unit = - if Otrace.enabled () then - Otrace.extension_event @@ Ev_record_exn { sp; exn; bt } + if Trace.enabled () then + Trace.extension_event @@ Ev_record_exn { sp; exn; bt } (** Collector that forwards to the {b currently installed} OTEL exporter. *) -let collector_main_otel_exporter () : Otrace.collector = - collector_of_exporter OTEL.Main_exporter.dynamic_forward_to_main_exporter +let collector_main_otel_exporter () : Trace.collector = + (* Create a dynamic exporter that forwards to the currently installed main + exporter at call time. *) + let dynamic_exp : OTEL.Exporter.t = + { + OTEL.Exporter.export = + (fun sig_ -> + match OTEL.Sdk.get () with + | None -> () + | Some exp -> exp.OTEL.Exporter.export sig_); + active = (fun () -> Aswitch.dummy); + shutdown = ignore; + self_metrics = (fun () -> OTEL.Sdk.self_metrics ()); + } + in + collector_of_exporter dynamic_exp let (collector [@deprecated "use collector_of_exporter or collector_main_otel_exporter"]) = collector_main_otel_exporter -let setup () = Otrace.setup_collector @@ collector_main_otel_exporter () +let setup () = + Trace.set_ambient_context_provider Ambient_span_provider_.provider; + Trace.setup_collector @@ collector_main_otel_exporter () let setup_with_otel_exporter exp : unit = let coll = collector_of_exporter exp in - OTEL.Main_exporter.set exp; - Otrace.setup_collector coll + OTEL.Sdk.set exp; + Trace.setup_collector coll let setup_with_otel_backend = setup_with_otel_exporter diff --git a/src/trace/opentelemetry_trace.mli b/src/trace/opentelemetry_trace.mli index 4b590869..995ab0e7 100644 --- a/src/trace/opentelemetry_trace.mli +++ b/src/trace/opentelemetry_trace.mli @@ -59,6 +59,9 @@ val collector : unit -> Trace_core.collector (** Make a Trace collector that uses the main OTEL backend to send spans and logs *) +val ambient_span_provider : Trace_core.Ambient_span_provider.t +(** Uses {!Ambient_context} to provide contextual spans in {!Trace_core}.*) + val link_spans : Otrace.span -> Otrace.span -> unit (** [link_spans sp1 sp2] modifies [sp1] by adding a span link to [sp2]. @since 0.11 *) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 83376078..3350dc30 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -22,7 +22,7 @@ let stress_alloc_ = ref true let num_tr = Atomic.make 0 let run_job () = - let active = OT.Main_exporter.active () in + let active = OT.Sdk.active () in let i = ref 0 in let cnt = ref 0 in @@ -88,9 +88,9 @@ let run_job () = () let run () = - OT.Gc_metrics.setup_on_main_exporter (); + OT.Gc_metrics.setup (); - OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ()); + OT.Meter.add_cb (fun ~clock:_ () -> OT.Sdk.self_metrics ()); OT.Meter.add_cb (fun ~clock () -> let now = OT.Clock.now clock in OT.Metrics. diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 478e4add..6d7d4989 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -24,7 +24,7 @@ let num_tr = Atomic.make 0 let run_job job_id : unit Lwt.t = let i = ref 0 in - while%lwt T.Aswitch.is_on (T.Main_exporter.active ()) && !i < !n do + while%lwt T.Aswitch.is_on (T.Sdk.active ()) && !i < !n do (* Printf.eprintf "test: run outer loop job_id=%d i=%d\n%!" job_id !i; *) let@ scope = Atomic.incr num_tr; @@ -82,7 +82,7 @@ let run_job job_id : unit Lwt.t = Lwt.return ()*) let run () : unit Lwt.t = - T.Gc_metrics.setup_on_main_exporter (); + T.Gc_metrics.setup (); T.Meter.add_cb (fun ~clock () -> let now = T.Clock.now clock in diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index 469dcdcf..27bee02a 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -21,7 +21,7 @@ let n = ref max_int let run_job clock _job_id iterations : unit = let i = ref 0 in - while OT.Aswitch.is_on (OT.Main_exporter.active ()) && !i < !n do + while OT.Aswitch.is_on (OT.Sdk.active ()) && !i < !n do let@ scope = Atomic.incr num_tr; OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" @@ -69,7 +69,7 @@ let run_job clock _job_id iterations : unit = done let run env proc iterations () : unit = - OT.Gc_metrics.setup_on_main_exporter (); + OT.Gc_metrics.setup (); OT.Meter.add_cb (fun ~clock () -> let now = OT.Clock.now clock in diff --git a/tests/bin/emit1_ocurl_lwt.ml b/tests/bin/emit1_ocurl_lwt.ml index 08fe9ede..528e0f20 100644 --- a/tests/bin/emit1_ocurl_lwt.ml +++ b/tests/bin/emit1_ocurl_lwt.ml @@ -22,7 +22,7 @@ let stress_alloc_ = ref true let num_tr = Atomic.make 0 let run_job () : unit Lwt.t = - let active = OT.Main_exporter.active () in + let active = OT.Sdk.active () in let i = ref 0 in let cnt = ref 0 in @@ -86,9 +86,9 @@ let run_job () : unit Lwt.t = Lwt.return ()*) let run () : unit Lwt.t = - OT.Gc_metrics.setup_on_main_exporter (); + OT.Gc_metrics.setup (); - OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ()); + OT.Meter.add_cb (fun ~clock:_ () -> OT.Sdk.self_metrics ()); OT.Meter.add_cb (fun ~clock () -> let now = OT.Clock.now clock in OT.Metrics. diff --git a/tests/bin/emit1_stdout.ml b/tests/bin/emit1_stdout.ml index 4fa38f97..4ea4526d 100644 --- a/tests/bin/emit1_stdout.ml +++ b/tests/bin/emit1_stdout.ml @@ -22,7 +22,7 @@ let stress_alloc_ = ref true let num_tr = Atomic.make 0 let run_job () = - let active = OT.Main_exporter.active () in + let active = OT.Sdk.active () in let i = ref 0 in let cnt = ref 0 in @@ -85,7 +85,7 @@ let run_job () = () let run () = - OT.Gc_metrics.setup_on_main_exporter (); + OT.Gc_metrics.setup (); OT.Meter.add_cb (fun ~clock () -> let now = OT.Clock.now clock in @@ -163,7 +163,7 @@ let () = ~high_watermark:20_000 () in let exp = - OTC.Exporter_queued.create ~clock:exp.clock ~q + OTC.Exporter_queued.create ~clock:OT.Clock.ptime_clock ~q ~consumer:(Consumer_exporter.consumer exp) () in @@ -173,7 +173,7 @@ let () = exp, ignore in - OT.Main_exporter.set exporter; + OT.Sdk.set exporter; let@ () = Fun.protect ~finally in if !self_trace then Opentelemetry_client.Self_trace.set_enabled true; diff --git a/tests/implicit_scope/sync/test_implicit_scope_sync.ml b/tests/implicit_scope/sync/test_implicit_scope_sync.ml index 61e73a66..234db6af 100644 --- a/tests/implicit_scope/sync/test_implicit_scope_sync.ml +++ b/tests/implicit_scope/sync/test_implicit_scope_sync.ml @@ -4,17 +4,28 @@ module Otel = Opentelemetry let spans_emitted : Otel.Span.t list ref = ref [] let test_exporter : Otel.Exporter.t = - let open Otel.Exporter in { - (dummy ()) with - emit_spans = - Opentelemetry_emitter.To_list.to_list ~signal_name:"spans" spans_emitted; + Otel.Exporter.export = + (fun sig_ -> + match sig_ with + | Otel.Any_signal_l.Spans sp -> + spans_emitted := List.rev_append sp !spans_emitted + | _ -> ()); + active = (fun () -> Opentelemetry_util.Aswitch.dummy); + shutdown = ignore; + self_metrics = (fun () -> []); } let with_test_exporter f = (* uncomment for eprintf debugging: *) (* let test_exporter = Opentelemetry_client.Exporter_debug.debug test_exporter in*) - Otel.Main_exporter.with_setup_debug_backend test_exporter () f + Otel.Sdk.set test_exporter; + Fun.protect f ~finally:(fun () -> + let sq = Opentelemetry_client_sync.Sync_queue.create () in + Otel.Sdk.remove + ~on_done:(fun () -> Opentelemetry_client_sync.Sync_queue.push sq ()) + (); + Opentelemetry_client_sync.Sync_queue.pop sq) let bytes_to_hex = Opentelemetry_util.Util_bytes_.bytes_to_hex