collector: if queue is full, drop item, and wakeup thread

This commit is contained in:
Simon Cruanes 2022-03-21 15:06:38 -04:00
parent 396ef4c366
commit a00d4d2383
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -155,7 +155,7 @@ module FQueue : sig
type 'a t type 'a t
val create : dummy:'a -> int -> 'a t val create : dummy:'a -> int -> 'a t
val size : _ t -> int val size : _ t -> int
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit val pop_iter_all : 'a t -> ('a -> unit) -> unit
end = struct end = struct
type 'a t = { type 'a t = {
@ -170,11 +170,15 @@ end = struct
} }
let[@inline] size self = self.i let[@inline] size self = self.i
let[@inline] is_full self = self.i = Array.length self.arr
let push (self:_ t) x : unit = let push (self:_ t) x : bool =
assert (self.i < Array.length self.arr); if is_full self then false
self.arr.(self.i) <- x; else (
self.i <- 1 + self.i self.arr.(self.i) <- x;
self.i <- 1 + self.i;
true
)
let pop_iter_all (self: _ t) f = let pop_iter_all (self: _ t) f =
for j=0 to self.i-1 do for j=0 to self.i-1 do
@ -251,15 +255,14 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -
(module M : PUSH with type elt = a) (module M : PUSH with type elt = a)
| Some n -> | Some n ->
let q = FQueue.create ~dummy:(Obj.magic 0) (2 * n) in let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in
let module M = struct let module M = struct
type elt = a type elt = a
let is_empty () = FQueue.size q = 0 let is_empty () = FQueue.size q = 0
let is_big_enough () = FQueue.size q >= n let is_big_enough () = FQueue.size q >= n
let push x = let push x =
FQueue.push q x; if not (FQueue.push q x) || FQueue.size q > n then (
if FQueue.size q > n then ( !on_full(); (* drop *)
!on_full()
) )
let pop_iter_all f = FQueue.pop_iter_all q f let pop_iter_all f = FQueue.pop_iter_all q f
end in end in
@ -374,7 +377,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let wakeup () = let wakeup () =
last_wakeup := Mtime_clock.now(); last_wakeup := Mtime_clock.now();
with_mutex_ m (fun () -> Condition.signal cond) with_mutex_ m (fun () -> Condition.signal cond);
Thread.yield()
in in
(* wake up if a batch is full *) (* wake up if a batch is full *)