mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 20:07:55 -04:00
move Util_thread.MCond to Notifier_sync
This commit is contained in:
parent
cd4eb06ca6
commit
e0ff144248
2 changed files with 16 additions and 23 deletions
|
|
@ -1,8 +1,21 @@
|
||||||
include Util_thread.MCond
|
|
||||||
module IO = Generic_io.Direct_style
|
module IO = Generic_io.Direct_style
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
mutex: Mutex.t;
|
||||||
|
cond: Condition.t;
|
||||||
|
}
|
||||||
|
|
||||||
|
let create () : t = { mutex = Mutex.create (); cond = Condition.create () }
|
||||||
|
|
||||||
|
let trigger self = Condition.signal self.cond
|
||||||
|
|
||||||
let delete = ignore
|
let delete = ignore
|
||||||
|
|
||||||
let trigger = signal
|
let[@inline] protect self f = Util_mutex.protect self.mutex f
|
||||||
|
|
||||||
let register_bounded_queue = wakeup_from_bq
|
(** NOTE: the mutex must be acquired *)
|
||||||
|
let wait self = Condition.wait self.cond self.mutex
|
||||||
|
|
||||||
|
(** Ensure we get signalled when the queue goes from empty to non-empty *)
|
||||||
|
let register_bounded_queue (self : t) (bq : _ Bounded_queue.t) : unit =
|
||||||
|
Bounded_queue.on_non_empty bq (fun () -> trigger self)
|
||||||
|
|
|
||||||
|
|
@ -42,23 +42,3 @@ let setup_ticker_thread ~stop ~sleep_ms (exp : OTEL.Exporter.t) () =
|
||||||
(Printexc.to_string exn)
|
(Printexc.to_string exn)
|
||||||
in
|
in
|
||||||
start_bg_thread tick_loop
|
start_bg_thread tick_loop
|
||||||
|
|
||||||
module MCond = struct
|
|
||||||
type t = {
|
|
||||||
mutex: Mutex.t;
|
|
||||||
cond: Condition.t;
|
|
||||||
}
|
|
||||||
|
|
||||||
let create () : t = { mutex = Mutex.create (); cond = Condition.create () }
|
|
||||||
|
|
||||||
let signal self = Condition.signal self.cond
|
|
||||||
|
|
||||||
let[@inline] protect self f = Util_mutex.protect self.mutex f
|
|
||||||
|
|
||||||
(** NOTE: the mutex must be acquired *)
|
|
||||||
let wait self = Condition.wait self.cond self.mutex
|
|
||||||
|
|
||||||
(** Ensure we get signalled when the queue goes from empty to non-empty *)
|
|
||||||
let wakeup_from_bq (self : t) (bq : _ Bounded_queue.t) : unit =
|
|
||||||
Bounded_queue.on_non_empty bq (fun () -> signal self)
|
|
||||||
end
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue