wip: use blocking queue

This commit is contained in:
Simon Cruanes 2023-06-15 22:29:10 -04:00
parent f5ed4bc9ef
commit b5c0ef7b20
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 107 additions and 87 deletions

View file

@ -45,8 +45,9 @@
(opentelemetry (= :version))
(pbrt (>= 2.3))
(odoc :with-doc)
(ezcurl (>= 0.2.3))
ocurl)
(synopsis "Collector client for opentelemetry, using http + ocurl"))
(synopsis "Collector client for opentelemetry, using http + ezcurl"))
(package
(name opentelemetry-cohttp-lwt)

View file

@ -1,7 +1,7 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.4"
synopsis: "Collector client for opentelemetry, using http + ocurl"
synopsis: "Collector client for opentelemetry, using http + ezcurl"
maintainer: ["the Imandra team and contributors"]
authors: ["the Imandra team and contributors"]
license: "MIT"
@ -14,6 +14,7 @@ depends: [
"opentelemetry" {= version}
"pbrt" {>= "2.3"}
"odoc" {with-doc}
"ezcurl" {>= "0.2.3"}
"ocurl"
]
build: [

View file

@ -0,0 +1,69 @@
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else (
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
)
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
let pop_all (self : 'a t) into : unit =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
Queue.transfer self.q into;
Mutex.unlock self.mutex
)
in
loop ()

View file

@ -0,0 +1,23 @@
(** Basic Blocking Queue *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a Queue.t -> unit
(** [pop_all q into] pops all the elements of [q]
and moves them into [into]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)

View file

@ -1,10 +1,10 @@
module Atomic = Opentelemetry_atomic.Atomic
include Opentelemetry.Lock
let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
let tid () = Thread.id @@ Thread.self ()
let debug_ =
@ -22,11 +22,6 @@ let get_url () = !url
let set_url s = url := s
(** [with_mutex m f] calls [f()] in a section where [m] is locked. *)
let[@inline] with_mutex_ m f =
Mutex.lock m;
Fun.protect ~finally:(fun () -> Mutex.unlock m) f
let parse_headers s =
let parse_header s =
match String.split_on_char '=' s with

View file

@ -4,58 +4,24 @@ type t = {
debug: bool;
url: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
bg_threads: int;
ticker_thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
bg_threads;
ticker_thread;
} =
self
in
let { debug; url; headers; batch_timeout_ms; bg_threads } = self in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms bg_threads ticker_thread
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; \
bg_threads=%d;@]}"
debug url ppheaders headers batch_timeout_ms bg_threads
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads
?(ticker_thread = true) () : t =
?(batch_timeout_ms = 500) ?bg_threads () : t =
let bg_threads =
match bg_threads with
| Some n -> max n 0
| None ->
if thread then
4
else
0
| Some n -> max n 2
| None -> 4
in
{
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_timeout_ms;
batch_logs;
bg_threads;
ticker_thread;
}
{ debug; url; headers; batch_timeout_ms; bg_threads }

View file

@ -6,23 +6,6 @@ type t = private {
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
@ -30,11 +13,6 @@ type t = private {
only checked when a new event occurs. Default 500. *)
bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
ticker_thread: bool;
(** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector.
This option is ignored if [bg_threads=0]. *)
}
(** Configuration.
@ -45,20 +23,11 @@ val make :
?debug:bool ->
?url:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?bg_threads:int ->
?ticker_thread:bool ->
unit ->
t
(** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit

View file

@ -2,4 +2,4 @@
(name opentelemetry_client_ocurl)
(public_name opentelemetry-client-ocurl)
(libraries opentelemetry opentelemetry.atomic curl pbrt threads mtime
mtime.clock.os))
mtime.clock.os ezcurl ezcurl.core))

View file

@ -14,10 +14,6 @@ val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
(** Set http headers that are sent on every http query to the collector. *)
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a lock/unlock pair to protect the critical sections
of {!Opentelemetry.Collector.BACKEND} *)
module Atomic = Opentelemetry_atomic.Atomic
module Config = Config