From 9c3e2a7076e954055f52de0d3123c3850063cf1f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Jul 2022 12:53:00 -0400 Subject: [PATCH] feat: pass `?stop` atomic; allow for multiple background threads --- src/client/config.ml | 22 ++- src/client/config.mli | 13 +- src/client/opentelemetry_client_ocurl.ml | 191 ++++++++++++---------- src/client/opentelemetry_client_ocurl.mli | 19 ++- tests/bin/emit1.ml | 13 +- 5 files changed, 160 insertions(+), 98 deletions(-) diff --git a/src/client/config.ml b/src/client/config.ml index 3c204946..b86e6305 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -8,7 +8,7 @@ type t = { batch_metrics: int option; batch_logs: int option; batch_timeout_ms: int; - thread: bool; + bg_threads: int; ticker_thread: bool; } @@ -24,20 +24,30 @@ let pp out self = batch_metrics; batch_logs; batch_timeout_ms; - thread; + bg_threads; ticker_thread; } = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ - batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" + batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}" debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt - batch_logs batch_timeout_ms thread ticker_thread + batch_logs batch_timeout_ms bg_threads ticker_thread let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) - ?(batch_timeout_ms = 500) ?(thread = true) ?(ticker_thread = true) () : t = + ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads + ?(ticker_thread = true) () : t = + let bg_threads = + match bg_threads with + | Some n -> max n 0 + | None -> + if thread then + 4 + else + 0 + in { debug; url; @@ -46,6 +56,6 @@ let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) batch_metrics; batch_timeout_ms; batch_logs; - thread; + bg_threads; ticker_thread; } diff --git a/src/client/config.mli b/src/client/config.mli index 3682b9cf..957dba13 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -28,12 +28,13 @@ type t = private { incomplete. Note that the batch might take longer than that, because this is only checked when a new event occurs. Default 500. *) - thread: bool; (** Is there a background thread? Default [true] *) + bg_threads: int; + (** Are there background threads, and how many? Default [4] *) ticker_thread: bool; (** Is there a ticker thread? Default [true]. This thread will regularly call [tick()] on the backend, to make sure it makes progress, and regularly send events to the collector. - This option is ignored if [thread=false]. *) + This option is ignored if [bg_threads=0]. *) } (** Configuration. @@ -49,9 +50,15 @@ val make : ?batch_logs:int option -> ?batch_timeout_ms:int -> ?thread:bool -> + ?bg_threads:int -> ?ticker_thread:bool -> unit -> t -(** Make a configuration *) +(** Make a configuration. + + @param thread if true and [bg_threads] is not provided, we will pick a number + of bg threads. Otherwise the number of [bg_threads] superseeds this option. + + *) val pp : Format.formatter -> t -> unit diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 9c535877..6d7671c2 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -44,37 +44,51 @@ let report_err_ = function | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, status) -> + | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details }) + -> + let pp_details out l = + List.iter + (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) + l + in Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code - Proto.Status.pp_status status + "@[<2>opentelemetry: export failed with@ http code=%d@ status \ + {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." + code scode + (Bytes.unsafe_to_string message) + pp_details details + +module Httpc : sig + type t + + val create : unit -> t -module type CURL = sig val send : - path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + t -> + path:string -> + decode:(Pbrt.Decoder.t -> 'a) -> + string -> + ('a, error) result - val cleanup : unit -> unit -end - -(* create a curl client *) -module Curl () : CURL = struct + val cleanup : t -> unit +end = struct open Opentelemetry.Proto let () = Lazy.force _init_curl - let buf_res = Buffer.create 256 - (* TODO: use Curl.Multi, etc. instead? *) + type t = { + buf_res: Buffer.t; + curl: Curl.t; + } - (* http client *) - let curl : Curl.t = Curl.init () + let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () } - let cleanup () = Curl.cleanup curl - - (* TODO: use Curl multi *) + let cleanup self = Curl.cleanup self.curl (* send the content to the remote endpoint/path *) - let send ~path ~decode (bod : string) : ('a, error) result = + let send (self : t) ~path ~decode (bod : string) : ('a, error) result = + let { curl; buf_res } = self in Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); @@ -251,12 +265,10 @@ let batch_is_empty = List.for_all l_is_empty exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~(config : Config.t) () : (module EMITTER) = +let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in (* local helpers *) let open struct - let continue = Atomic.make true - let timeout = if config.batch_timeout_ms > 0 then Some Mtime.Span.(config.batch_timeout_ms * ms) @@ -272,76 +284,71 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = let batch_logs : Logs.resource_logs list Batch.t = Batch.make ?batch:config.batch_logs ?timeout () - let encoder = Pbrt.Encoder.create () - - let curl = (module Curl () : CURL) - - module C = (val curl) - let on_tick_cbs_ = ref (ref []) let set_on_tick_callbacks = ( := ) on_tick_cbs_ - let send_http_ ~path ~encode x : unit = + let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in - match C.send ~path ~decode:(fun _ -> ()) data with + match Httpc.send httpc ~path ~decode:(fun _ -> ()) data with | Ok () -> () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set continue false + Atomic.set stop true | Error err -> (* TODO: log error _via_ otel? *) Atomic.incr n_errors; report_err_ err - let send_metrics_http (l : Metrics.resource_metrics list list) = + let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) + = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Metrics_service.default_export_metrics_service_request ~resource_metrics:l () in - send_http_ ~path:"/v1/metrics" + send_http_ curl encoder ~path:"/v1/metrics" ~encode:Metrics_service.encode_export_metrics_service_request x - let send_traces_http (l : Trace.resource_spans list list) = + let send_traces_http curl encoder (l : Trace.resource_spans list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Trace_service.default_export_trace_service_request ~resource_spans:l () in - send_http_ ~path:"/v1/traces" + send_http_ curl encoder ~path:"/v1/traces" ~encode:Trace_service.encode_export_trace_service_request x - let send_logs_http (l : Logs.resource_logs list list) = + let send_logs_http curl encoder (l : Logs.resource_logs list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = Logs_service.default_export_logs_service_request ~resource_logs:l () in - send_http_ ~path:"/v1/logs" + send_http_ curl encoder ~path:"/v1/logs" ~encode:Logs_service.encode_export_logs_service_request x (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force () : bool = + let emit_metrics_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_metrics with | None -> false | Some l -> let batch = AList.pop_all gc_metrics :: l in - send_metrics_http batch; + send_metrics_http httpc encoder batch; true - let emit_traces_maybe ~now ?force () : bool = + let emit_traces_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_traces with | None -> false | Some l -> - send_traces_http l; + send_traces_http httpc encoder l; true - let emit_logs_maybe ~now ?force () : bool = + let emit_logs_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_logs with | None -> false | Some l -> - send_logs_http l; + send_logs_http httpc encoder l; true let[@inline] guard_exn_ f = @@ -350,13 +357,13 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" (Printexc.to_string e) - let emit_all_force () = + let emit_all_force (httpc : Httpc.t) encoder = let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now ~force:true () : bool); - ignore (emit_logs_maybe ~now ~force:true () : bool); - ignore (emit_metrics_maybe ~now ~force:true () : bool) + ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); + ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); + ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) end in - if config.thread then ( + if config.bg_threads > 0 then ( (let m = Mutex.create () in Lock.set_mutex ~lock:(fun () -> Mutex.lock m) @@ -367,13 +374,15 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = (* loop for the thread that processes events and sends them to collector *) let bg_thread () = - while Atomic.get continue do + let httpc = Httpc.create () in + let encoder = Pbrt.Encoder.create () in + while not @@ Atomic.get stop do let@ () = guard_exn_ in let now = Mtime_clock.now () in - let do_metrics = emit_metrics_maybe ~now () in - let do_traces = emit_traces_maybe ~now () in - let do_logs = emit_logs_maybe ~now () in + let do_metrics = emit_metrics_maybe ~now httpc encoder in + let do_traces = emit_traces_maybe ~now httpc encoder in + let do_logs = emit_logs_maybe ~now httpc encoder in if (not do_metrics) && (not do_traces) && not do_logs then (* wait for something to happen *) let@ () = with_mutex_ m in @@ -381,14 +390,21 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = done; (* flush remaining events once we exit *) let@ () = guard_exn_ in - emit_all_force (); - C.cleanup () + emit_all_force httpc encoder; + Httpc.cleanup httpc in - start_bg_thread bg_thread; + + for _i = 1 to config.bg_threads do + start_bg_thread bg_thread + done; (* if the bg thread waits, this will wake it up so it can send batches *) - let wakeup () = - with_mutex_ m (fun () -> Condition.broadcast cond); + let wakeup ~all () = + with_mutex_ m (fun () -> + if all then + Condition.broadcast cond + else + Condition.signal cond); Thread.yield () in @@ -403,90 +419,97 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = !(!on_tick_cbs_); let now = Mtime_clock.now () in - if - (not (Atomic.get continue)) - || Batch.is_ready ~now batch_metrics + if Atomic.get stop then + wakeup ~all:true () + else if + Batch.is_ready ~now batch_metrics || Batch.is_ready ~now batch_traces || Batch.is_ready ~now batch_logs then - wakeup () + wakeup ~all:false () in if config.ticker_thread then ( (* thread that calls [tick()] regularly, to help enforce timeouts *) let tick_thread () = - while Atomic.get continue do + while not @@ Atomic.get stop do Thread.delay 0.5; tick () done; - wakeup () + wakeup ~all:true () in start_bg_thread tick_thread ); let module M = struct - let push_trace e = if Batch.push batch_traces e then wakeup () + let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () - let push_metrics e = if Batch.push batch_metrics e then wakeup () + let push_metrics e = + if Batch.push batch_metrics e then wakeup ~all:false () - let push_logs e = if Batch.push batch_logs e then wakeup () + let push_logs e = if Batch.push batch_logs e then wakeup ~all:false () let set_on_tick_callbacks = set_on_tick_callbacks let tick = tick let cleanup () = - Atomic.set continue false; + Atomic.set stop true; + if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; + (* wakeup everyone *) with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - let cleanup () = - emit_all_force (); - C.cleanup () - in + let httpc = Httpc.create () in + let encoder = Pbrt.Encoder.create () in let module M = struct let push_trace e = let@ () = guard_exn_ in Batch.push' batch_traces e; let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now () : bool) + ignore (emit_traces_maybe ~now httpc encoder : bool) let push_metrics e = let@ () = guard_exn_ in if Atomic.get needs_gc_metrics then sample_gc_metrics (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in - ignore (emit_metrics_maybe ~now () : bool) + ignore (emit_metrics_maybe ~now httpc encoder : bool) let push_logs e = let@ () = guard_exn_ in Batch.push' batch_logs e; let now = Mtime_clock.now () in - ignore (emit_logs_maybe ~now () : bool) + ignore (emit_logs_maybe ~now httpc encoder : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now () : bool); - ignore (emit_metrics_maybe ~now () : bool); - ignore (emit_logs_maybe ~now () : bool); + ignore (emit_traces_maybe ~now httpc encoder : bool); + ignore (emit_metrics_maybe ~now httpc encoder : bool); + ignore (emit_logs_maybe ~now httpc encoder : bool); () - let cleanup = cleanup + let cleanup () = + if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; + emit_all_force httpc encoder; + Httpc.cleanup httpc end in (module M) ) module Backend (Arg : sig + val stop : bool Atomic.t + val config : Config.t end) () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~config:Arg.config ()) + include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -574,11 +597,13 @@ end) } end -let setup_ ~(config : Config.t) () = +let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () = debug_ := config.debug; let module B = Backend (struct + let stop = stop + let config = config end) () @@ -586,15 +611,15 @@ let setup_ ~(config : Config.t) () = Opentelemetry.Collector.set_backend (module B); B.cleanup -let setup ?(config = Config.make ()) ?(enable = true) () = +let setup ?stop ?(config = Config.make ()) ?(enable = true) () = if enable then ( - let cleanup = setup_ ~config () in + let cleanup = setup_ ?stop ~config () in at_exit cleanup ) -let with_setup ?(config = Config.make ()) ?(enable = true) () f = +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f = if enable then ( - let cleanup = setup_ ~config () in + let cleanup = setup_ ?stop ~config () in Fun.protect ~finally:cleanup f ) else f () diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 9f8bc053..250767d9 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -20,13 +20,24 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit module Config = Config -val setup : ?config:Config.t -> ?enable:bool -> unit -> unit +val setup : + ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to enable/disable the setup depending on CLI arguments or environment. - @param config configuration to use *) + @param config configuration to use + @param stop an atomic boolean. When it becomes true, background threads + will all stop after a little while. +*) -val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a +val with_setup : + ?stop:bool Atomic.t -> + ?config:Config.t -> + ?enable:bool -> + unit -> + (unit -> 'a) -> + 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up - after [f()] returns. *) + after [f()] returns + See {!setup} for more details. *) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 385dae36..d5196413 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -98,6 +98,7 @@ let () = let debug = ref false in let thread = ref true in + let n_bg_threads = ref 0 in let batch_traces = ref 400 in let batch_metrics = ref 3 in let opts = @@ -114,6 +115,7 @@ let () = "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; + "--bg-threads", Arg.Set_int n_bg_threads, " number of background threads"; ] |> Arg.align in @@ -130,7 +132,14 @@ let () = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - ~thread:!thread () + ~thread:!thread + ?bg_threads: + (let n = !n_bg_threads in + if n = 0 then + None + else + Some n) + () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; @@ -142,4 +151,4 @@ let () = Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" (Atomic.get num_tr) elapsed n_per_sec) in - Opentelemetry_client_ocurl.with_setup ~config () run + Opentelemetry_client_ocurl.with_setup ~stop ~config () run