mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
sync_queue: more operations, including a batch push
This commit is contained in:
parent
a98a1aeb3f
commit
0323c9204f
2 changed files with 46 additions and 0 deletions
|
|
@ -17,6 +17,10 @@ let create () : _ t =
|
||||||
closed = false;
|
closed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(* NOTE: the race condition here is benign, assuming no tearing of
|
||||||
|
a value of type [bool] which OCaml's memory model should guarantee. *)
|
||||||
|
let[@inline] closed self = self.closed
|
||||||
|
|
||||||
let close (self : _ t) =
|
let close (self : _ t) =
|
||||||
UM.protect self.mutex @@ fun () ->
|
UM.protect self.mutex @@ fun () ->
|
||||||
if not self.closed then (
|
if not self.closed then (
|
||||||
|
|
@ -47,6 +51,11 @@ let pop (self : 'a t) : 'a =
|
||||||
in
|
in
|
||||||
UM.protect self.mutex loop
|
UM.protect self.mutex loop
|
||||||
|
|
||||||
|
let try_pop (self : 'a t) : 'a option =
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if self.closed then raise Closed;
|
||||||
|
try Some (Queue.pop self.q) with Queue.Empty -> None
|
||||||
|
|
||||||
let pop_all (self : 'a t) into : unit =
|
let pop_all (self : 'a t) into : unit =
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
if Queue.is_empty self.q then (
|
if Queue.is_empty self.q then (
|
||||||
|
|
@ -57,3 +66,26 @@ let pop_all (self : 'a t) into : unit =
|
||||||
Queue.transfer self.q into
|
Queue.transfer self.q into
|
||||||
in
|
in
|
||||||
UM.protect self.mutex loop
|
UM.protect self.mutex loop
|
||||||
|
|
||||||
|
let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : int * int
|
||||||
|
=
|
||||||
|
UM.protect self.mutex @@ fun () ->
|
||||||
|
if self.closed then raise Closed;
|
||||||
|
|
||||||
|
let old_size = Queue.length self.q in
|
||||||
|
let xs = ref xs in
|
||||||
|
|
||||||
|
let continue = ref true in
|
||||||
|
while !continue && Queue.length self.q < high_watermark do
|
||||||
|
match !xs with
|
||||||
|
| [] -> continue := false
|
||||||
|
| x :: tl_xs ->
|
||||||
|
xs := tl_xs;
|
||||||
|
Queue.push x self.q
|
||||||
|
done;
|
||||||
|
|
||||||
|
(* pushed at least one item *)
|
||||||
|
if Queue.length self.q <> old_size then Condition.broadcast self.cond;
|
||||||
|
|
||||||
|
let n_discarded = List.length !xs in
|
||||||
|
n_discarded, old_size
|
||||||
|
|
|
||||||
|
|
@ -14,11 +14,25 @@ val pop : 'a t -> 'a
|
||||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
|
val try_pop : 'a t -> 'a option
|
||||||
|
|
||||||
val pop_all : 'a t -> 'a Queue.t -> unit
|
val pop_all : 'a t -> 'a Queue.t -> unit
|
||||||
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if
|
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if
|
||||||
no element is available, it will block until it successfully transfers at
|
no element is available, it will block until it successfully transfers at
|
||||||
least one item to [into].
|
least one item to [into].
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
|
val closed : _ t -> bool
|
||||||
|
|
||||||
val close : _ t -> unit
|
val close : _ t -> unit
|
||||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||||
|
|
||||||
|
val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int
|
||||||
|
(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x]
|
||||||
|
into [q].
|
||||||
|
|
||||||
|
An item is not pushed if the queue is "full" (size >= high_watermark).
|
||||||
|
|
||||||
|
This returns a pair [num_discarded, old_size] where [num_discarded] is the
|
||||||
|
number of items that could not be pushed, and [old_size] is the size before
|
||||||
|
anything was pushed. *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue