limit scope of locks

This commit is contained in:
Simon Cruanes 2022-07-06 14:22:53 -04:00
parent 6d72a6fac0
commit 053493db8b
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -143,6 +143,9 @@ end
module type BATCH = sig end
(** Batch of resources to be pushed later.
This type is thread-safe. *)
module Batch : sig
type 'a t
@ -164,6 +167,7 @@ module Batch : sig
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *)
end = struct
type 'a t = {
lock: Mutex.t;
@ -266,12 +270,6 @@ let start_bg_thread (f : unit -> unit) : unit =
in
ignore (Thread.create run () : Thread.t)
let l_is_empty = function
| [] -> true
| _ :: _ -> false
let batch_is_empty = List.for_all l_is_empty
(* make an emitter.
exceptions inside should be caught, see
@ -295,9 +293,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = ref (ref [])
let on_tick_cbs_ = Atomic.make (ref [])
let set_on_tick_callbacks = ( := ) on_tick_cbs_
let set_on_tick_callbacks = Atomic.set on_tick_cbs_
let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit =
Pbrt.Encoder.reset encoder;
@ -373,13 +371,39 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool)
end in
if config.bg_threads > 0 then (
(let m = Mutex.create () in
Lock.set_mutex
~lock:(fun () -> Mutex.lock m)
~unlock:(fun () -> Mutex.unlock m));
let tick_common_ () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
List.iter
(fun f ->
try f ()
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(Atomic.get on_tick_cbs_);
()
let setup_ticker_thread ~tick ~finally () =
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () =
let@ () =
Fun.protect ~finally:(fun () ->
Atomic.set stop true;
finally ())
in
while not @@ Atomic.get stop do
Thread.delay 0.5;
tick ()
done
in
start_bg_thread tick_thread
end in
(let m = Mutex.create () in
Lock.set_mutex
~lock:(fun () -> Mutex.lock m)
~unlock:(fun () -> Mutex.unlock m));
if config.bg_threads > 0 then (
let m = Mutex.create () in
let cond = Condition.create () in
@ -420,14 +444,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
in
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
List.iter
(fun f ->
try f ()
with e ->
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(!on_tick_cbs_);
tick_common_ ();
let now = Mtime_clock.now () in
if Atomic.get stop then
@ -440,18 +457,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
wakeup ~all:false ()
in
if config.ticker_thread then (
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () =
while not @@ Atomic.get stop do
Thread.delay 0.5;
tick ()
done;
wakeup ~all:true ()
in
start_bg_thread tick_thread
);
if config.ticker_thread then
setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) ();
let module M = struct
let push_trace e = if Batch.push batch_traces e then wakeup ~all:false ()
@ -468,8 +475,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let cleanup () =
Atomic.set stop true;
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
(* wakeup everyone *)
with_mutex_ m (fun () -> Condition.broadcast cond)
wakeup ~all:true ()
end in
(module M)
) else (
@ -477,10 +483,16 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let encoder = Pbrt.Encoder.create () in
let module M = struct
(* we make sure that this is thread-safe, even though we don't have a
background thread. There can still be a ticker thread, and there
can also be several user threads that produce spans and call
the emit functions. *)
let push_trace e =
let@ () = guard_exn_ in
Batch.push' batch_traces e;
let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_traces_maybe ~now httpc encoder : bool)
let push_metrics e =
@ -488,24 +500,32 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_metrics_maybe ~now httpc encoder : bool)
let push_logs e =
let@ () = guard_exn_ in
Batch.push' batch_logs e;
let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_logs_maybe ~now httpc encoder : bool)
let set_on_tick_callbacks = set_on_tick_callbacks
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
let@ () = Lock.with_lock in
let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now httpc encoder : bool);
ignore (emit_metrics_maybe ~now httpc encoder : bool);
ignore (emit_logs_maybe ~now httpc encoder : bool);
()
(* make sure we have a ticker thread, if required *)
let () =
if config.ticker_thread then
setup_ticker_thread ~tick ~finally:ignore ()
let cleanup () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
emit_all_force httpc encoder;
@ -529,11 +549,11 @@ end)
{
send =
(fun l ~ret ->
let@ () = Lock.with_lock in
if !debug_ then
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l;
l);
push_trace l;
ret ());
}
@ -581,12 +601,11 @@ end)
{
send =
(fun m ~ret ->
if !debug_ then
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m;
let@ () = Lock.with_lock in
m);
let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
@ -597,12 +616,12 @@ end)
{
send =
(fun m ~ret ->
if !debug_ then
(if !debug_ then
let@ () = Lock.with_lock in
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m;
m);
let@ () = Lock.with_lock in
push_logs m;
ret ());
}