diff --git a/src/ws_deque_.ml b/src/ws_deque_.ml index 488d651a..378dc14e 100644 --- a/src/ws_deque_.ml +++ b/src/ws_deque_.ml @@ -57,44 +57,47 @@ type 'a t = { top: int A.t; (** Where we steal *) bottom: int A.t; (** Where we push/pop from the owning thread *) mutable top_cached: int; (** Last read value of [top] *) - mutable arr: 'a CA.t; (** The circular array *) + arr: 'a CA.t A.t; (** The circular array *) } let create () : _ t = let arr = CA.create ~log_size:4 () in - { top = A.make 0; top_cached = 0; bottom = A.make 0; arr } + { top = A.make 0; top_cached = 0; bottom = A.make 0; arr = A.make arr } let[@inline] size (self : _ t) : int = max 0 (A.get self.bottom - A.get self.top) let push (self : 'a t) (x : 'a) : unit = let b = A.get self.bottom in let t_approx = self.top_cached in + let arr = ref (A.get self.arr) in (* Section 2.3: over-approximation of size. Only if it seems too big do we actually read [t]. *) let size_approx = b - t_approx in - if size_approx >= CA.size self.arr - 1 then ( + if size_approx >= CA.size !arr - 1 then ( (* we need to read the actual value of [top], which might entail contention. *) let t = A.get self.top in self.top_cached <- t; let size = b - t in - if size >= CA.size self.arr - 1 then - self.arr <- CA.grow self.arr ~top:t ~bottom:b + if size >= CA.size !arr - 1 then ( + arr := CA.grow !arr ~top:t ~bottom:b; + A.set self.arr !arr + ) ); - CA.set self.arr b x; + CA.set !arr b x; A.set self.bottom (b + 1) -let perhaps_shrink (self : _ t) ~top ~bottom : unit = +let perhaps_shrink (self : _ t) arr ~top ~bottom : unit = let size = bottom - top in - let ca_size = CA.size self.arr in + let ca_size = CA.size arr in if ca_size >= 256 && size <= ca_size / 3 then - self.arr <- CA.shrink self.arr ~top ~bottom + A.set self.arr (CA.shrink arr ~top ~bottom) let pop (self : 'a t) : 'a option = let b = A.get self.bottom in - let arr = self.arr in + let arr = A.get self.arr in let b = b - 1 in A.set self.bottom b; @@ -103,31 +106,34 @@ let pop (self : 'a t) : 'a option = let size = b - t in if size < 0 then ( + (* reset to basic empty state *) A.set self.bottom t; None ) else if size > 0 then ( + (* can pop without modifying [top] *) let x = CA.get arr b in - perhaps_shrink self ~bottom:b ~top:t; + perhaps_shrink self arr ~bottom:b ~top:t; Some x ) else ( assert (size = 0); + (* there was exactly one slot, so we might be racing against stealers + to update [self.top] *) if A.compare_and_set self.top t (t + 1) then ( - (* exactly one slot, so we might be racing against stealers - to update [self.top] *) let x = CA.get arr b in A.set self.bottom (t + 1); Some x - ) else + ) else ( + A.set self.bottom (t + 1); None + ) ) let steal (self : 'a t) : 'a option = (* read [top], but do not update [top_cached] as we're in another thread *) let t = A.get self.top in - let b = A.get self.bottom in - let arr = self.arr in + let arr = A.get self.arr in let size = b - t in if size <= 0 then