Merge pull request #101 from ajbt200128/austin/logs-integration

feature: Logs integration
This commit is contained in:
Simon Cruanes 2025-08-25 10:10:25 -04:00 committed by GitHub
commit 5facbdfc6d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 785 additions and 1 deletions

View file

@ -56,7 +56,7 @@ jobs:
- run: opam install . --deps-only --with-test --solver=mccs
if: ${{ startsWith(matrix.ocaml-compiler, '5') }}
- run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt,opentelemetry-client-cohttp-lwt
- run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt,opentelemetry-client-cohttp-lwt,opentelemetry-logs
- run: opam pin trace --dev -y -n
- run: opam install trace

View file

@ -88,6 +88,19 @@
(alcotest :with-test))
(synopsis "Collector client for opentelemetry, using http + ezcurl"))
(package
(name opentelemetry-logs)
(depends
(ocaml
(>= "4.08"))
(opentelemetry
(= :version))
(odoc :with-doc)
(logs
(>= "0.7.0"))
(alcotest :with-test))
(synopsis "Opentelemetry tracing for Cohttp HTTP servers"))
(package
(name opentelemetry-cohttp-lwt)
(depends

38
opentelemetry-logs.opam Normal file
View file

@ -0,0 +1,38 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.11.2"
synopsis: "Opentelemetry tracing for Cohttp HTTP servers"
maintainer: [
"Simon Cruanes <simon.cruanes.2007@m4x.org>"
"Matt Bray <mattjbray@gmail.com>"
"ELLIOTTCABLE <opam@ell.io>"
]
authors: ["the Imandra team and contributors"]
license: "MIT"
homepage: "https://github.com/imandra-ai/ocaml-opentelemetry"
bug-reports: "https://github.com/imandra-ai/ocaml-opentelemetry/issues"
depends: [
"dune" {>= "2.9"}
"ocaml" {>= "4.08"}
"opentelemetry" {= version}
"odoc" {with-doc}
"logs" {>= "0.7.0"}
"alcotest" {with-test}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files=false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/imandra-ai/ocaml-opentelemetry.git"

View file

@ -0,0 +1,4 @@
(library
(name opentelemetry_logs)
(public_name opentelemetry-logs)
(libraries opentelemetry logs))

View file

@ -0,0 +1,115 @@
module Otel = Opentelemetry
(*****************************************************************************)
(* Prelude *)
(*****************************************************************************)
(* This module is for sending logs from the Logs library
(https://github.com/dbuenzli/logs) via OTel. It is NOT a general logging
library (See Logs for that).
*)
(*****************************************************************************)
(* Levels *)
(*****************************************************************************)
(* Convert log level to Otel severity *)
let log_level_to_severity (level : Logs.level) : Otel.Logs.severity =
match level with
| Logs.App -> Otel.Logs.Severity_number_info (* like info, but less severe *)
| Logs.Info -> Otel.Logs.Severity_number_info2
| Logs.Error -> Otel.Logs.Severity_number_error
| Logs.Warning -> Otel.Logs.Severity_number_warn
| Logs.Debug -> Otel.Logs.Severity_number_debug
(*****************************************************************************)
(* Logs Util *)
(*****************************************************************************)
let create_tag (tag : string) : string Logs.Tag.def =
Logs.Tag.def tag Format.pp_print_string
let emit_telemetry_tag =
Logs.Tag.def ~doc:"Whether or not to emit this log via telemetry"
"emit_telemetry" Format.pp_print_bool
let emit_telemetry do_emit = Logs.Tag.(empty |> add emit_telemetry_tag do_emit)
(*****************************************************************************)
(* Logging *)
(*****************************************************************************)
(* Log a message to otel with some attrs *)
let log ?service_name ?(attrs = []) ?(scope = Otel.Scope.get_ambient_scope ())
~level msg =
let log_level = Logs.level_to_string (Some level) in
let span_id =
Option.map (fun (scope : Otel.Scope.t) -> scope.span_id) scope
in
let trace_id =
Option.map (fun (scope : Otel.Scope.t) -> scope.trace_id) scope
in
let severity = log_level_to_severity level in
let log = Otel.Logs.make_str ~severity ~log_level ?trace_id ?span_id msg in
(* Noop if no backend is set *)
Otel.Logs.emit ?service_name ~attrs [ log ]
let otel_reporter ?service_name ?(attributes = []) () : Logs.reporter =
let report src level ~over k msgf =
msgf (fun ?header ?(tags : Logs.Tag.set option) fmt ->
let k _ =
over ();
k ()
in
Format.kasprintf
(fun msg ->
let tags = Option.value ~default:Logs.Tag.empty tags in
let attrs =
let tags =
Logs.Tag.fold
(fun (Logs.Tag.(V (d, v)) : Logs.Tag.t) acc ->
let name = Logs.Tag.name d in
(* Is there a better way to compare tags? structural equality does not work *)
if String.equal name (Logs.Tag.name emit_telemetry_tag) then
(* Don't include the emit_telemetry_tag in the attributes *)
acc
else (
let value =
let value_printer = Logs.Tag.printer d in
(* Also the default for Format.asprintf *)
let buffer = Buffer.create 512 in
let formatter = Format.formatter_of_buffer buffer in
value_printer formatter v;
Buffer.contents buffer
in
let s = name, `String value in
s :: acc
))
tags []
in
let header =
match header with
| None -> []
| Some h -> [ "header", `String h ]
in
let src_str = Logs.Src.name src in
header @ [ "src", `String src_str ] @ tags @ attributes
in
let do_emit =
Option.value ~default:true (Logs.Tag.find emit_telemetry_tag tags)
in
if do_emit then log ?service_name ~attrs ~level msg;
k ())
fmt)
in
{ Logs.report }
let attach_otel_reporter ?service_name ?attributes reporter =
(* Copied directly from the Logs.mli docs. Just calls a bunch of reporters in a
row *)
let combine r1 r2 =
let report src level ~over k msgf =
let v = r1.Logs.report src level ~over:(fun () -> ()) k msgf in
r2.Logs.report src level ~over (fun () -> v) msgf
in
{ Logs.report }
in
let otel_reporter = otel_reporter ?service_name ?attributes () in
combine reporter otel_reporter

View file

@ -0,0 +1,79 @@
val emit_telemetry_tag : bool Logs.Tag.def
(** [emit_telemetry_tag] is a logging tag that when applied to a log, determines
if a log will be emitted by the tracing/telemetry backend.
Since some OTel backends can cause deadlocks if used during a GC alarm, this
is useful for when you may want logging during a GC alarm. It is also useful
if you want to log sensitive information, but not report it via
Opentelemetry.
If this tag is not set on a log, said log will be emitted by default if the
otel reporter is registered.
Example:
{[
let tags =
Logs.Tag.(add Opentelemetry_logs.emit_telemetry_tag false other_tags)
in
Logs.info (fun m ->
m ~tags "This log will not be sent to the telemetry backend")
]} *)
val emit_telemetry : bool -> Logs.Tag.set
(** [emit_telemetry emit_or_not] creates a tag set with the
{!emit_telemetry_tag} as its only member *)
val otel_reporter :
?service_name:string ->
?attributes:(string * Opentelemetry.value) list ->
unit ->
Logs.reporter
(** [otel_reporter ?service_name ?tag_value_pp_buffer_size ?attrs ()] creates a
[Logs.reporter] that will create and emit an OTel log with the following
info:
{ul
{- Log severity is converted to OTel severity as follows:
{[
module Otel = Opentelemetry
match level with
| Logs.App -> Otel.Logs.Severity_number_info (* like info, but less severe *)
| Logs.Info -> Otel.Logs.Severity_number_info2
| Logs.Error -> Otel.Logs.Severity_number_error
| Logs.Warning -> Otel.Logs.Severity_number_warn
| Logs.Debug -> Otel.Logs.Severity_number_debug
]}
}
{- message is the formatted with the given [fmt] and [msgf] function, and
emitted as the log body
}
{- [header] and [src] will be added as attributes
[("header", `String header)] and [("src", `String (Logs.Src.name src))]
respectively
}
{- [tags] will be also added as attributes, with the tag name as the key,
and the value formatted via its formatter as the value.
}
{- [attributes] will also be added as attributes, and are useful for
setting static attributes such as a library name
}
}
Example use: [Logs.set_reporter (Opentelemetery_logs.otel_reporter ())] *)
val attach_otel_reporter :
?service_name:string ->
?attributes:(string * Opentelemetry.value) list ->
Logs.reporter ->
Logs.reporter
(** [attach_otel_reporter ?service_name ?attributes reporter] will create a
reporter that first calls the reporter passed as an argument, then an otel
report created via {!otel_reporter}, for every log. This is useful for if
you want to emit logs to stderr and to OTel at the same time.
Example:
{[
let reporter = Logs_fmt.reporter () in
Logs.set_reporter
(Opentelemetry_logs.attach_otel_reporter ?service_name ?attributes
reporter)
]} *)

View file

@ -37,6 +37,20 @@
opentelemetry.client
opentelemetry-client-cohttp-eio))
(executable
(name emit_logs_cohttp)
(modules emit_logs_cohttp)
(preprocess
(pps lwt_ppx))
(libraries
cohttp-lwt-unix
opentelemetry
opentelemetry-client-cohttp-lwt
opentelemetry-cohttp-lwt
opentelemetry-logs
logs
))
(executable
(name cohttp_client)
(modules cohttp_client)
@ -45,3 +59,5 @@
opentelemetry
opentelemetry-client-cohttp-lwt
opentelemetry-cohttp-lwt))

View file

@ -0,0 +1,103 @@
module T = Opentelemetry_lwt
(*****************************************************************************)
(* Prelude *)
(*****************************************************************************)
let string_tag = Logs.Tag.def "string_attr" Format.pp_print_string
let int_tag = Logs.Tag.def "int_attr" Format.pp_print_int
let float_tag = Logs.Tag.def "float_attr" Format.pp_print_float
let bool_tag = Logs.Tag.def "bool_attr" Format.pp_print_bool
let string_list_tag =
Logs.Tag.def "string_list_attr" (Format.pp_print_list Format.pp_print_string)
let varied_tag_set =
Logs.Tag.(
empty
|> add string_tag "string_value"
|> add int_tag 42 |> add float_tag 3.14 |> add bool_tag true
|> add string_list_tag [ "foo"; "bar"; "baz" ])
let run () =
let otel_reporter =
Opentelemetry_logs.otel_reporter ~service_name:"emit_logs"
~attributes:[ "my_reporter_attr", `String "foo" ]
()
in
Logs.set_reporter otel_reporter;
Logs.set_level (Some Logs.Debug);
Logs.debug (fun m -> m "emit_logs: starting");
Logs.info (fun m -> m "emit_logs: info log");
Logs.warn (fun m -> m "emit_logs: warn log");
Logs.err (fun m -> m "emit_logs: error log");
Logs.app (fun m -> m "emit_logs: app log");
let%lwt () =
T.Trace.with_ ~kind:T.Span.Span_kind_producer "my_scope" (fun _scope ->
Logs.info (fun m ->
m ~tags:varied_tag_set
"emit_logs: this log is emitted with varied tags from a span");
Lwt.return_unit)
in
let no_emit_tag = Opentelemetry_logs.emit_telemetry false in
Logs.info (fun m ->
m ~tags:no_emit_tag "emit_logs: this log will not be emitted");
Logs.info (fun m ->
m ~tags:varied_tag_set
"emit_logs: this log will be emitted with varied tags");
let fmt_logger = Logs_fmt.reporter ~dst:Format.err_formatter () in
let combined_logger =
Opentelemetry_logs.attach_otel_reporter ~service_name:"emit_logs_fmt"
~attributes:[ "my_fmt_attr", `String "bar" ]
fmt_logger
in
Logs.set_reporter combined_logger;
Logs.info (fun m ->
m "emit_logs: this log will be emitted from otel and fmt reporter");
Logs.set_level None;
(* disable logging *)
Logs.debug (fun m ->
m "emit_logs: this log will not be emitted, logging disabled");
Lwt.return_unit
let () =
Sys.catch_break true;
T.Globals.service_name := "t1";
T.Globals.service_namespace := Some "ocaml-otel.test";
let debug = ref false in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let batch_logs = ref 400 in
let url = ref None in
let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output";
( "--url",
Arg.String (fun s -> url := Some s),
" set the url for the OTel collector" );
]
|> Arg.align
in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r =
if !r > 0 then
Some !r
else
None
in
let config =
Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug ?url:!url
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
~batch_logs:(some_if_nzero batch_logs) ()
in
Format.printf "@[@ config: %a@]@." Opentelemetry_client_cohttp_lwt.Config.pp
config;
Opentelemetry_client_cohttp_lwt.with_setup ~config () run |> Lwt_main.run

18
tests/logs/dune Normal file
View file

@ -0,0 +1,18 @@
(env
(_
; Make the binaries for the test emitters available on the path for the components defined in this dir.
; See https://dune.readthedocs.io/en/stable/reference/dune/env.html
(binaries
(../bin/emit_logs_cohttp.exe as emit_logs_cohttp)
)))
(tests
(names test_logs_e2e)
(package opentelemetry-logs)
(libraries
opentelemetry
opentelemetry-logs
signal_gatherer
alcotest)
(deps %{bin:emit_logs_cohttp})
)

View file

@ -0,0 +1,355 @@
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name"; value = Some(String_value("emit_logs")); };
{ key = "src"; value = Some(String_value("application")); };
{ key = "my_reporter_attr"; value = Some(String_value("foo")); }
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_debug;
severity_text = "debug";
body = Some(String_value("emit_logs: starting"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name"; value = Some(String_value("emit_logs")); };
{ key = "src"; value = Some(String_value("application")); };
{ key = "my_reporter_attr"; value = Some(String_value("foo")); }
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_info2;
severity_text = "info";
body = Some(String_value("emit_logs: info log"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs"));
};
{ key = "src"; value = Some(String_value("application")); };
{ key = "my_reporter_attr"; value = Some(String_value("foo")); }
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_warn;
severity_text = "warning";
body = Some(String_value("emit_logs: warn log"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs"));
};
{ key = "src"; value = Some(String_value("application")); };
{ key = "my_reporter_attr";
value = Some(String_value("foo"));
}
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_error;
severity_text = "error";
body = Some(String_value("emit_logs: error log"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs"));
};
{ key = "src"; value = Some(String_value("application")); };
{ key = "my_reporter_attr";
value = Some(String_value("foo"));
}
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_info;
severity_text = "app";
body = Some(String_value("emit_logs: app log"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs"));
};
{ key = "src";
value = Some(String_value("application"));
};
{ key = "string_list_attr";
value = Some(String_value(""));
};
{ key = "bool_attr"; value = Some(String_value("")); };
{ key = "float_attr"; value = Some(String_value("")); };
{ key = "int_attr"; value = Some(String_value("")); };
{ key = "string_attr"; value = Some(String_value("")); };
{ key = "my_reporter_attr";
value = Some(String_value("foo"));
}
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_info2;
severity_text = "info";
body =
Some(
String_value(
"emit_logs: this log is emitted with varied tags from a span"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=16>;
span_id = <bytes len=8>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs"));
};
{ key = "src";
value = Some(String_value("application"));
};
{ key = "string_list_attr";
value = Some(String_value(""));
};
{ key = "bool_attr"; value = Some(String_value("")); };
{ key = "float_attr"; value = Some(String_value("")); };
{ key = "int_attr"; value = Some(String_value("")); };
{ key = "string_attr"; value = Some(String_value("")); };
{ key = "my_reporter_attr";
value = Some(String_value("foo"));
}
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_info2;
severity_text = "info";
body =
Some(
String_value(
"emit_logs: this log will be emitted with varied tags"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}
{ resource =
Some(
{ attributes =
[{ key = "service.namespace";
value = Some(String_value("ocaml-otel.test"));
};
{ key = "service.name";
value = Some(String_value("emit_logs_fmt"));
};
{ key = "src";
value = Some(String_value("application"));
};
{ key = "my_fmt_attr";
value = Some(String_value("bar"));
}
];
dropped_attributes_count = 0;
});
scope_logs =
[{ scope =
Some(
{ name = "ocaml-otel";
version = "%%VERSION_NUM%%";
attributes = [];
dropped_attributes_count = 0;
});
log_records =
[{ time_unix_nano = 0;
observed_time_unix_nano = 0;
severity_number = Severity_number_info2;
severity_text = "info";
body =
Some(
String_value(
"emit_logs: this log will be emitted from otel and fmt reporter"));
attributes = [];
dropped_attributes_count = 0;
flags = 0;
trace_id = <bytes len=0>;
span_id = <bytes len=0>;
}
];
schema_url = "";
}
];
schema_url = "";
}

View file

@ -0,0 +1,43 @@
module Client = Opentelemetry_client
module L = Opentelemetry_proto.Logs
(* NOTE: This port must be different from that used by other integration tests,
to prevent socket binding clashes. *)
let port = 4359
let url = Printf.sprintf "http://localhost:%d" port
let cmd = [ "emit_logs_cohttp"; "--url"; url ]
let tests (signal_batches : Client.Signal.t list) =
ignore signal_batches;
List.iter
(fun (signal_batch : Client.Signal.t) ->
match signal_batch with
| Logs ls ->
ls (* Mask out the times so tests don't change in between runs *)
|> List.map (fun (l : L.resource_logs) ->
let masked_scope_logs =
List.map
(fun (sl : L.scope_logs) ->
let masked_log_records =
List.map
(fun (lr : L.log_record) ->
{
lr with
time_unix_nano = 0L;
observed_time_unix_nano = 0L;
})
sl.log_records
in
{ sl with log_records = masked_log_records })
l.scope_logs
in
{ l with scope_logs = masked_scope_logs })
|> List.iter (Format.printf "%a\n" L.pp_resource_logs)
| _ -> ())
signal_batches
let () =
let signal_batches = Signal_gatherer.gather_signals ~port cmd in
tests signal_batches