Merge pull request #33 from imandra-ai/wip-simplify-ocurl-2023-06-15

simplify sync ocurl backend
This commit is contained in:
Simon Cruanes 2023-06-21 16:35:23 -04:00 committed by GitHub
commit 82e58d4108
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 523 additions and 735 deletions

View file

@ -3,7 +3,7 @@ name: build
on:
pull_request:
push:
branch:
branches:
- master
jobs:
@ -13,7 +13,7 @@ jobs:
matrix:
os:
- ubuntu-latest
- windows-latest
#- windows-latest
#- macos-latest
ocaml-compiler:
- 4.08.x

2
dune
View file

@ -1,3 +1,3 @@
(env
(_
(flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-70 -strict-sequence)))
(flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-48-70 -strict-sequence)))

View file

@ -45,8 +45,9 @@
(opentelemetry (= :version))
(pbrt (>= 2.3))
(odoc :with-doc)
(ezcurl (>= 0.2.3))
ocurl)
(synopsis "Collector client for opentelemetry, using http + ocurl"))
(synopsis "Collector client for opentelemetry, using http + ezcurl"))
(package
(name opentelemetry-cohttp-lwt)

View file

@ -1,7 +1,7 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.4"
synopsis: "Collector client for opentelemetry, using http + ocurl"
synopsis: "Collector client for opentelemetry, using http + ezcurl"
maintainer: ["the Imandra team and contributors"]
authors: ["the Imandra team and contributors"]
license: "MIT"
@ -14,6 +14,7 @@ depends: [
"opentelemetry" {= version}
"pbrt" {>= "2.3"}
"odoc" {with-doc}
"ezcurl" {>= "0.2.3"}
"ocurl"
]
build: [

View file

@ -0,0 +1,69 @@
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else (
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
)
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
let pop_all (self : 'a t) into : unit =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
Queue.transfer self.q into;
Mutex.unlock self.mutex
)
in
loop ()

View file

@ -0,0 +1,23 @@
(** Basic Blocking Queue *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a Queue.t -> unit
(** [pop_all q into] pops all the elements of [q]
and moves them into [into]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)

24
src/client-ocurl/batch.ml Normal file
View file

@ -0,0 +1,24 @@
type 'a t = {
mutable len: int;
mutable l: 'a list list;
mutable started: Mtime.t;
}
let create () = { len = 0; l = []; started = Mtime_clock.now () }
let push self l =
if l != [] then (
if self.l == [] then self.started <- Mtime_clock.now ();
self.l <- l :: self.l;
self.len <- self.len + List.length l
)
let[@inline] len self = self.len
let[@inline] time_started self = self.started
let pop_all self =
let l = self.l in
self.l <- [];
self.len <- 0;
l

View file

@ -0,0 +1,14 @@
(** List of lists with length *)
type 'a t
val create : unit -> 'a t
val push : 'a t -> 'a list -> unit
val len : _ t -> int
val time_started : _ t -> Mtime.t
(** Time at which the batch most recently became non-empty *)
val pop_all : 'a t -> 'a list list

View file

@ -1,10 +1,10 @@
module Atomic = Opentelemetry_atomic.Atomic
include Opentelemetry.Lock
let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
let tid () = Thread.id @@ Thread.self ()
let debug_ =
@ -22,11 +22,6 @@ let get_url () = !url
let set_url s = url := s
(** [with_mutex m f] calls [f()] in a section where [m] is locked. *)
let[@inline] with_mutex_ m f =
Mutex.lock m;
Fun.protect ~finally:(fun () -> Mutex.unlock m) f
let parse_headers s =
let parse_header s =
match String.split_on_char '=' s with

View file

@ -4,58 +4,23 @@ type t = {
debug: bool;
url: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
bg_threads: int;
ticker_thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
bg_threads;
ticker_thread;
} =
let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } =
self
in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms bg_threads ticker_thread
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
ticker_thread=%B @]}"
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads
?(ticker_thread = true) () : t =
let bg_threads =
match bg_threads with
| Some n -> max n 0
| None ->
if thread then
4
else
0
in
{
debug;
url;
headers;
batch_traces;
batch_metrics;
batch_timeout_ms;
batch_logs;
bg_threads;
ticker_thread;
}
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t =
let bg_threads = max 2 (min bg_threads 32) in
{ debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread }

View file

@ -1,3 +1,5 @@
(** Configuration for the ocurl backend *)
type t = private {
debug: bool;
url: string;
@ -6,23 +8,6 @@ type t = private {
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
@ -31,10 +16,8 @@ type t = private {
bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
ticker_thread: bool;
(** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector.
This option is ignored if [bg_threads=0]. *)
(** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *)
}
(** Configuration.
@ -45,20 +28,12 @@ val make :
?debug:bool ->
?url:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?bg_threads:int ->
?ticker_thread:bool ->
unit ->
t
(** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit

View file

@ -2,4 +2,4 @@
(name opentelemetry_client_ocurl)
(public_name opentelemetry-client-ocurl)
(libraries opentelemetry opentelemetry.atomic curl pbrt threads mtime
mtime.clock.os))
mtime.clock.os ezcurl ezcurl.core))

File diff suppressed because it is too large Load diff

View file

@ -14,10 +14,6 @@ val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
(** Set http headers that are sent on every http query to the collector. *)
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a lock/unlock pair to protect the critical sections
of {!Opentelemetry.Collector.BACKEND} *)
module Atomic = Opentelemetry_atomic.Atomic
module Config = Config

View file

@ -3,7 +3,7 @@ module Atomic = Opentelemetry_atomic.Atomic
let spf = Printf.sprintf
let ( let@ ) f x = f x
let ( let@ ) = ( @@ )
let sleep_inner = ref 0.1
@ -98,21 +98,13 @@ let () =
let ts_start = Unix.gettimeofday () in
let debug = ref false in
let thread = ref true in
let n_bg_threads = ref 0 in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output";
"--thread", Arg.Bool (( := ) thread), " use a background thread";
( "--stress-alloc",
Arg.Bool (( := ) stress_alloc_),
" perform heavy allocs in inner loop" );
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
@ -123,17 +115,8 @@ let () =
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r =
if !r > 0 then
Some !r
else
None
in
let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread
?bg_threads:
(let n = !n_bg_threads in
if n = 0 then