mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 20:07:55 -04:00
custom queue in Bounded_queue_sync, remove bespoke stuff from Sync_queue
This commit is contained in:
parent
ad5ef8e99c
commit
741de6cece
5 changed files with 100 additions and 58 deletions
|
|
@ -22,11 +22,22 @@ type 'a t = {
|
||||||
on_non_empty: (unit -> unit) -> unit;
|
on_non_empty: (unit -> unit) -> unit;
|
||||||
(** [on_non_empty f] registers [f] to be called whenever the queue
|
(** [on_non_empty f] registers [f] to be called whenever the queue
|
||||||
transitions from empty to non-empty. *)
|
transitions from empty to non-empty. *)
|
||||||
try_pop: unit -> 'a pop_result;
|
try_pop: unit -> 'a pop_result; (** Try to pop an item right now. *)
|
||||||
(** Try to pop an item right now. @raise Closed if the *)
|
|
||||||
close: unit -> unit;
|
close: unit -> unit;
|
||||||
|
(** Close the queue. Items currently in the queue will still be accessible
|
||||||
|
to consumers until the queue is emptied out. Idempotent. *)
|
||||||
closed: unit -> bool;
|
closed: unit -> bool;
|
||||||
|
(** Is the queue closed {b for writing}. Consumers should only use
|
||||||
|
[try_pop] because a queue that's closed-for-writing might still
|
||||||
|
contain straggler items that need to be consumed.
|
||||||
|
|
||||||
|
This should be as fast and cheap as possible. *)
|
||||||
}
|
}
|
||||||
|
(** A bounded queue, with multiple producers and potentially multiple consumers.
|
||||||
|
|
||||||
|
All functions must be thread-safe except for [try_pop] which might not have
|
||||||
|
to be depending on the context (e.g. a Lwt-specific queue implementation
|
||||||
|
will consume only from the Lwt thread). *)
|
||||||
|
|
||||||
let[@inline] push (self : _ t) x : unit = self.push x
|
let[@inline] push (self : _ t) x : unit = self.push x
|
||||||
|
|
||||||
|
|
@ -40,10 +51,14 @@ let[@inline] close (self : _ t) : unit = self.close ()
|
||||||
|
|
||||||
let[@inline] closed (self : _ t) : bool = self.closed ()
|
let[@inline] closed (self : _ t) : bool = self.closed ()
|
||||||
|
|
||||||
|
(** Turn the writing end of the queue into an emitter. *)
|
||||||
let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
|
let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
|
||||||
let closed () = self.closed () in
|
let closed () = self.closed () in
|
||||||
let enabled () = not (closed ()) in
|
let enabled () = not (closed ()) in
|
||||||
let emit x = if x <> [] then push self x in
|
let emit x = if x <> [] then push self x in
|
||||||
let tick ~now:_ = () in
|
let tick ~now:_ = () in
|
||||||
|
|
||||||
|
(* NOTE: we cannot actually flush, only close. Emptying the queue is
|
||||||
|
fundamentally asynchronous because it's done by consumers *)
|
||||||
let flush_and_close () = close self in
|
let flush_and_close () = close self in
|
||||||
{ closed; enabled; emit; tick; flush_and_close }
|
{ closed; enabled; emit; tick; flush_and_close }
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,89 @@
|
||||||
module BQ = Bounded_queue
|
module BQ = Bounded_queue
|
||||||
|
|
||||||
|
exception Closed = Bounded_queue.Closed
|
||||||
|
|
||||||
|
(* a variant of {!Sync_queue} with more bespoke pushing behavior *)
|
||||||
|
module Q : sig
|
||||||
|
type 'a t
|
||||||
|
|
||||||
|
val create : unit -> 'a t
|
||||||
|
|
||||||
|
val close : _ t -> unit
|
||||||
|
|
||||||
|
val closed : _ t -> bool
|
||||||
|
|
||||||
|
val try_pop : 'a t -> 'a option
|
||||||
|
|
||||||
|
val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int
|
||||||
|
(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x]
|
||||||
|
into [q].
|
||||||
|
|
||||||
|
An item is not pushed if the queue is "full" (size >= high_watermark).
|
||||||
|
|
||||||
|
This returns a pair [num_discarded, old_size] where [num_discarded] is the
|
||||||
|
number of items that could not be pushed, and [old_size] is the size
|
||||||
|
before anything was pushed. *)
|
||||||
|
end = struct
|
||||||
|
module UM = Opentelemetry_util.Util_mutex
|
||||||
|
|
||||||
|
type 'a t = {
|
||||||
|
mutex: Mutex.t;
|
||||||
|
q: 'a Queue.t;
|
||||||
|
mutable closed: bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
let create () : _ t =
|
||||||
|
{ mutex = Mutex.create (); q = Queue.create (); closed = false }
|
||||||
|
|
||||||
|
(* NOTE: the race condition here is benign, assuming no tearing of
|
||||||
|
a value of type [bool] which OCaml's memory model should guarantee. *)
|
||||||
|
let[@inline] closed self = self.closed
|
||||||
|
|
||||||
|
let close (self : _ t) =
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if not self.closed then self.closed <- true
|
||||||
|
|
||||||
|
let push (self : _ t) x : unit =
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if self.closed then raise Closed;
|
||||||
|
Queue.push x self.q
|
||||||
|
|
||||||
|
let try_pop (self : 'a t) : 'a option =
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if self.closed then raise Closed;
|
||||||
|
try Some (Queue.pop self.q) with Queue.Empty -> None
|
||||||
|
|
||||||
|
let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) :
|
||||||
|
int * int =
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if self.closed then raise Closed;
|
||||||
|
|
||||||
|
let old_size = Queue.length self.q in
|
||||||
|
let xs = ref xs in
|
||||||
|
|
||||||
|
let continue = ref true in
|
||||||
|
while !continue && Queue.length self.q < high_watermark do
|
||||||
|
match !xs with
|
||||||
|
| [] -> continue := false
|
||||||
|
| x :: tl_xs ->
|
||||||
|
xs := tl_xs;
|
||||||
|
Queue.push x self.q
|
||||||
|
done;
|
||||||
|
|
||||||
|
let n_discarded = List.length !xs in
|
||||||
|
n_discarded, old_size
|
||||||
|
end
|
||||||
|
|
||||||
type 'a state = {
|
type 'a state = {
|
||||||
n_discarded: int Atomic.t;
|
n_discarded: int Atomic.t;
|
||||||
high_watermark: int;
|
high_watermark: int;
|
||||||
q: 'a Sync_queue.t;
|
q: 'a Q.t;
|
||||||
on_non_empty: Cb_set.t;
|
on_non_empty: Cb_set.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let push (self : _ state) x =
|
let push (self : _ state) x =
|
||||||
let discarded, old_size =
|
let discarded, old_size =
|
||||||
try
|
try Q.push_while_not_full self.q ~high_watermark:self.high_watermark x
|
||||||
Sync_queue.push_while_not_full self.q ~high_watermark:self.high_watermark
|
|
||||||
x
|
|
||||||
with Sync_queue.Closed -> raise BQ.Closed
|
with Sync_queue.Closed -> raise BQ.Closed
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
@ -23,28 +95,27 @@ let push (self : _ state) x =
|
||||||
()
|
()
|
||||||
|
|
||||||
let try_pop (self : _ state) : _ BQ.pop_result =
|
let try_pop (self : _ state) : _ BQ.pop_result =
|
||||||
match Sync_queue.try_pop self.q with
|
match Q.try_pop self.q with
|
||||||
| Some x -> `Item x
|
| Some x -> `Item x
|
||||||
| None -> `Empty
|
| None -> `Empty
|
||||||
| exception Sync_queue.Closed -> `Closed
|
| exception Sync_queue.Closed -> `Closed
|
||||||
|
|
||||||
let to_bounded_queue (self : 'a state) : 'a BQ.t =
|
let to_bounded_queue (self : 'a state) : 'a BQ.t =
|
||||||
let closed () = Sync_queue.closed self.q in
|
let closed () = Q.closed self.q in
|
||||||
let num_discarded () = Atomic.get self.n_discarded in
|
let num_discarded () = Atomic.get self.n_discarded in
|
||||||
let push x = push self x in
|
let push x = push self x in
|
||||||
let on_non_empty = Cb_set.register self.on_non_empty in
|
let on_non_empty = Cb_set.register self.on_non_empty in
|
||||||
let try_pop () = try_pop self in
|
let try_pop () = try_pop self in
|
||||||
let close () = Sync_queue.close self.q in
|
let close () = Q.close self.q in
|
||||||
{ BQ.push; num_discarded; try_pop; on_non_empty; close; closed }
|
{ BQ.push; num_discarded; try_pop; on_non_empty; close; closed }
|
||||||
|
|
||||||
let create ~high_watermark () : _ BQ.t =
|
let create ~high_watermark () : _ BQ.t =
|
||||||
let st =
|
let st =
|
||||||
{
|
{
|
||||||
high_watermark;
|
high_watermark;
|
||||||
q = Sync_queue.create ();
|
q = Q.create ();
|
||||||
n_discarded = Atomic.make 0;
|
n_discarded = Atomic.make 0;
|
||||||
on_non_empty = Cb_set.create ();
|
on_non_empty = Cb_set.create ();
|
||||||
}
|
}
|
||||||
in
|
in
|
||||||
to_bounded_queue st
|
to_bounded_queue st
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,6 @@
|
||||||
(** Bounded queue based on {!Sync_queue} *)
|
(** Bounded queue based on simple synchronization primitives.
|
||||||
|
|
||||||
|
This is not the fastest queue but it should be versatile. *)
|
||||||
|
|
||||||
val create : high_watermark:int -> unit -> 'a Bounded_queue.t
|
val create : high_watermark:int -> unit -> 'a Bounded_queue.t
|
||||||
(** [create ~high_watermark ()] creates a new bounded queue based on
|
(** [create ~high_watermark ()] creates a new bounded queue based on
|
||||||
|
|
|
||||||
|
|
@ -17,10 +17,6 @@ let create () : _ t =
|
||||||
closed = false;
|
closed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
(* NOTE: the race condition here is benign, assuming no tearing of
|
|
||||||
a value of type [bool] which OCaml's memory model should guarantee. *)
|
|
||||||
let[@inline] closed self = self.closed
|
|
||||||
|
|
||||||
let close (self : _ t) =
|
let close (self : _ t) =
|
||||||
UM.protect self.mutex @@ fun () ->
|
UM.protect self.mutex @@ fun () ->
|
||||||
if not self.closed then (
|
if not self.closed then (
|
||||||
|
|
@ -51,11 +47,6 @@ let pop (self : 'a t) : 'a =
|
||||||
in
|
in
|
||||||
UM.protect self.mutex loop
|
UM.protect self.mutex loop
|
||||||
|
|
||||||
let try_pop (self : 'a t) : 'a option =
|
|
||||||
UM.protect self.mutex @@ fun () ->
|
|
||||||
if self.closed then raise Closed;
|
|
||||||
try Some (Queue.pop self.q) with Queue.Empty -> None
|
|
||||||
|
|
||||||
let pop_all (self : 'a t) into : unit =
|
let pop_all (self : 'a t) into : unit =
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
if Queue.is_empty self.q then (
|
if Queue.is_empty self.q then (
|
||||||
|
|
@ -66,26 +57,3 @@ let pop_all (self : 'a t) into : unit =
|
||||||
Queue.transfer self.q into
|
Queue.transfer self.q into
|
||||||
in
|
in
|
||||||
UM.protect self.mutex loop
|
UM.protect self.mutex loop
|
||||||
|
|
||||||
let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : int * int
|
|
||||||
=
|
|
||||||
UM.protect self.mutex @@ fun () ->
|
|
||||||
if self.closed then raise Closed;
|
|
||||||
|
|
||||||
let old_size = Queue.length self.q in
|
|
||||||
let xs = ref xs in
|
|
||||||
|
|
||||||
let continue = ref true in
|
|
||||||
while !continue && Queue.length self.q < high_watermark do
|
|
||||||
match !xs with
|
|
||||||
| [] -> continue := false
|
|
||||||
| x :: tl_xs ->
|
|
||||||
xs := tl_xs;
|
|
||||||
Queue.push x self.q
|
|
||||||
done;
|
|
||||||
|
|
||||||
(* pushed at least one item *)
|
|
||||||
if Queue.length self.q <> old_size then Condition.broadcast self.cond;
|
|
||||||
|
|
||||||
let n_discarded = List.length !xs in
|
|
||||||
n_discarded, old_size
|
|
||||||
|
|
|
||||||
|
|
@ -14,25 +14,11 @@ val pop : 'a t -> 'a
|
||||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
val try_pop : 'a t -> 'a option
|
|
||||||
|
|
||||||
val pop_all : 'a t -> 'a Queue.t -> unit
|
val pop_all : 'a t -> 'a Queue.t -> unit
|
||||||
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if
|
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if
|
||||||
no element is available, it will block until it successfully transfers at
|
no element is available, it will block until it successfully transfers at
|
||||||
least one item to [into].
|
least one item to [into].
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
val closed : _ t -> bool
|
|
||||||
|
|
||||||
val close : _ t -> unit
|
val close : _ t -> unit
|
||||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||||
|
|
||||||
val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int
|
|
||||||
(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x]
|
|
||||||
into [q].
|
|
||||||
|
|
||||||
An item is not pushed if the queue is "full" (size >= high_watermark).
|
|
||||||
|
|
||||||
This returns a pair [num_discarded, old_size] where [num_discarded] is the
|
|
||||||
number of items that could not be pushed, and [old_size] is the size before
|
|
||||||
anything was pushed. *)
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue