From b5c0ef7b20e6435df0d5d3084b33b9711c74a160 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 15 Jun 2023 22:29:10 -0400 Subject: [PATCH] wip: use blocking queue --- dune-project | 3 +- opentelemetry-client-ocurl.opam | 3 +- src/client-ocurl/b_queue.ml | 69 +++++++++++++++++++ src/client-ocurl/b_queue.mli | 23 +++++++ src/client-ocurl/common_.ml | 9 +-- src/client-ocurl/config.ml | 50 +++----------- src/client-ocurl/config.mli | 31 --------- src/client-ocurl/dune | 2 +- .../opentelemetry_client_ocurl.mli | 4 -- 9 files changed, 107 insertions(+), 87 deletions(-) create mode 100644 src/client-ocurl/b_queue.ml create mode 100644 src/client-ocurl/b_queue.mli diff --git a/dune-project b/dune-project index 6d660768..ec02e11c 100644 --- a/dune-project +++ b/dune-project @@ -45,8 +45,9 @@ (opentelemetry (= :version)) (pbrt (>= 2.3)) (odoc :with-doc) + (ezcurl (>= 0.2.3)) ocurl) - (synopsis "Collector client for opentelemetry, using http + ocurl")) + (synopsis "Collector client for opentelemetry, using http + ezcurl")) (package (name opentelemetry-cohttp-lwt) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index 30305dc4..1f1b4c4d 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" version: "0.4" -synopsis: "Collector client for opentelemetry, using http + ocurl" +synopsis: "Collector client for opentelemetry, using http + ezcurl" maintainer: ["the Imandra team and contributors"] authors: ["the Imandra team and contributors"] license: "MIT" @@ -14,6 +14,7 @@ depends: [ "opentelemetry" {= version} "pbrt" {>= "2.3"} "odoc" {with-doc} + "ezcurl" {>= "0.2.3"} "ocurl" ] build: [ diff --git a/src/client-ocurl/b_queue.ml b/src/client-ocurl/b_queue.ml new file mode 100644 index 00000000..21d6012f --- /dev/null +++ b/src/client-ocurl/b_queue.ml @@ -0,0 +1,69 @@ +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + mutable closed: bool; +} + +exception Closed + +let create () : _ t = + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ); + Mutex.unlock self.mutex + +let push (self : _ t) x : unit = + Mutex.lock self.mutex; + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else ( + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + ) + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else if Queue.is_empty self.q then ( + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () + +let pop_all (self : 'a t) into : unit = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + Queue.transfer self.q into; + Mutex.unlock self.mutex + ) + in + loop () diff --git a/src/client-ocurl/b_queue.mli b/src/client-ocurl/b_queue.mli new file mode 100644 index 00000000..73bf3edc --- /dev/null +++ b/src/client-ocurl/b_queue.mli @@ -0,0 +1,23 @@ +(** Basic Blocking Queue *) + +type 'a t + +val create : unit -> _ t + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + +val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val pop_all : 'a t -> 'a Queue.t -> unit +(** [pop_all q into] pops all the elements of [q] + and moves them into [into]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/client-ocurl/common_.ml b/src/client-ocurl/common_.ml index 592a4462..f688ff2b 100644 --- a/src/client-ocurl/common_.ml +++ b/src/client-ocurl/common_.ml @@ -1,10 +1,10 @@ module Atomic = Opentelemetry_atomic.Atomic include Opentelemetry.Lock -let[@inline] ( let@ ) f x = f x - let spf = Printf.sprintf +let ( let@ ) = ( @@ ) + let tid () = Thread.id @@ Thread.self () let debug_ = @@ -22,11 +22,6 @@ let get_url () = !url let set_url s = url := s -(** [with_mutex m f] calls [f()] in a section where [m] is locked. *) -let[@inline] with_mutex_ m f = - Mutex.lock m; - Fun.protect ~finally:(fun () -> Mutex.unlock m) f - let parse_headers s = let parse_header s = match String.split_on_char '=' s with diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index b86e6305..1a790654 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -4,58 +4,24 @@ type t = { debug: bool; url: string; headers: (string * string) list; - batch_traces: int option; - batch_metrics: int option; - batch_logs: int option; batch_timeout_ms: int; bg_threads: int; - ticker_thread: bool; } let pp out self = - let ppiopt = Format.pp_print_option Format.pp_print_int in let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in let ppheaders = Format.pp_print_list pp_header in - let { - debug; - url; - headers; - batch_traces; - batch_metrics; - batch_logs; - batch_timeout_ms; - bg_threads; - ticker_thread; - } = - self - in + let { debug; url; headers; batch_timeout_ms; bg_threads } = self in Format.fprintf out - "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ - batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}" - debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt - batch_logs batch_timeout_ms bg_threads ticker_thread + "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; \ + bg_threads=%d;@]}" + debug url ppheaders headers batch_timeout_ms bg_threads let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) - ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) - ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads - ?(ticker_thread = true) () : t = + ?(batch_timeout_ms = 500) ?bg_threads () : t = let bg_threads = match bg_threads with - | Some n -> max n 0 - | None -> - if thread then - 4 - else - 0 + | Some n -> max n 2 + | None -> 4 in - { - debug; - url; - headers; - batch_traces; - batch_metrics; - batch_timeout_ms; - batch_logs; - bg_threads; - ticker_thread; - } + { debug; url; headers; batch_timeout_ms; bg_threads } diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 957dba13..46e6a1c9 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -6,23 +6,6 @@ type t = private { headers: (string * string) list; (** API headers sent to the endpoint. Default is none or "OTEL_EXPORTER_OTLP_HEADERS" if set. *) - batch_traces: int option; - (** Batch traces? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [Some 400]. - *) - batch_metrics: int option; - (** Batch metrics? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [None]. - *) - batch_logs: int option; - (** Batch logs? See {!batch_metrics} for details. - Default [Some 400] *) batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even incomplete. @@ -30,11 +13,6 @@ type t = private { only checked when a new event occurs. Default 500. *) bg_threads: int; (** Are there background threads, and how many? Default [4] *) - ticker_thread: bool; - (** Is there a ticker thread? Default [true]. - This thread will regularly call [tick()] on the backend, to make - sure it makes progress, and regularly send events to the collector. - This option is ignored if [bg_threads=0]. *) } (** Configuration. @@ -45,20 +23,11 @@ val make : ?debug:bool -> ?url:string -> ?headers:(string * string) list -> - ?batch_traces:int option -> - ?batch_metrics:int option -> - ?batch_logs:int option -> ?batch_timeout_ms:int -> - ?thread:bool -> ?bg_threads:int -> - ?ticker_thread:bool -> unit -> t (** Make a configuration. - - @param thread if true and [bg_threads] is not provided, we will pick a number - of bg threads. Otherwise the number of [bg_threads] superseeds this option. - *) val pp : Format.formatter -> t -> unit diff --git a/src/client-ocurl/dune b/src/client-ocurl/dune index 8e5c873f..a8162b4a 100644 --- a/src/client-ocurl/dune +++ b/src/client-ocurl/dune @@ -2,4 +2,4 @@ (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) (libraries opentelemetry opentelemetry.atomic curl pbrt threads mtime - mtime.clock.os)) + mtime.clock.os ezcurl ezcurl.core)) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 4292586f..7d3ccc0a 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -14,10 +14,6 @@ val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit (** Set http headers that are sent on every http query to the collector. *) -val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit -(** Set a lock/unlock pair to protect the critical sections - of {!Opentelemetry.Collector.BACKEND} *) - module Atomic = Opentelemetry_atomic.Atomic module Config = Config