mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-11 05:28:33 -05:00
fix ws_deque: circular array is also in an atomic
This commit is contained in:
parent
1ed25e5aca
commit
629b66662f
1 changed files with 22 additions and 16 deletions
|
|
@ -57,44 +57,47 @@ type 'a t = {
|
||||||
top: int A.t; (** Where we steal *)
|
top: int A.t; (** Where we steal *)
|
||||||
bottom: int A.t; (** Where we push/pop from the owning thread *)
|
bottom: int A.t; (** Where we push/pop from the owning thread *)
|
||||||
mutable top_cached: int; (** Last read value of [top] *)
|
mutable top_cached: int; (** Last read value of [top] *)
|
||||||
mutable arr: 'a CA.t; (** The circular array *)
|
arr: 'a CA.t A.t; (** The circular array *)
|
||||||
}
|
}
|
||||||
|
|
||||||
let create () : _ t =
|
let create () : _ t =
|
||||||
let arr = CA.create ~log_size:4 () in
|
let arr = CA.create ~log_size:4 () in
|
||||||
{ top = A.make 0; top_cached = 0; bottom = A.make 0; arr }
|
{ top = A.make 0; top_cached = 0; bottom = A.make 0; arr = A.make arr }
|
||||||
|
|
||||||
let[@inline] size (self : _ t) : int = max 0 (A.get self.bottom - A.get self.top)
|
let[@inline] size (self : _ t) : int = max 0 (A.get self.bottom - A.get self.top)
|
||||||
|
|
||||||
let push (self : 'a t) (x : 'a) : unit =
|
let push (self : 'a t) (x : 'a) : unit =
|
||||||
let b = A.get self.bottom in
|
let b = A.get self.bottom in
|
||||||
let t_approx = self.top_cached in
|
let t_approx = self.top_cached in
|
||||||
|
let arr = ref (A.get self.arr) in
|
||||||
|
|
||||||
(* Section 2.3: over-approximation of size.
|
(* Section 2.3: over-approximation of size.
|
||||||
Only if it seems too big do we actually read [t]. *)
|
Only if it seems too big do we actually read [t]. *)
|
||||||
let size_approx = b - t_approx in
|
let size_approx = b - t_approx in
|
||||||
if size_approx >= CA.size self.arr - 1 then (
|
if size_approx >= CA.size !arr - 1 then (
|
||||||
(* we need to read the actual value of [top], which might entail contention. *)
|
(* we need to read the actual value of [top], which might entail contention. *)
|
||||||
let t = A.get self.top in
|
let t = A.get self.top in
|
||||||
self.top_cached <- t;
|
self.top_cached <- t;
|
||||||
let size = b - t in
|
let size = b - t in
|
||||||
|
|
||||||
if size >= CA.size self.arr - 1 then
|
if size >= CA.size !arr - 1 then (
|
||||||
self.arr <- CA.grow self.arr ~top:t ~bottom:b
|
arr := CA.grow !arr ~top:t ~bottom:b;
|
||||||
|
A.set self.arr !arr
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
CA.set self.arr b x;
|
CA.set !arr b x;
|
||||||
A.set self.bottom (b + 1)
|
A.set self.bottom (b + 1)
|
||||||
|
|
||||||
let perhaps_shrink (self : _ t) ~top ~bottom : unit =
|
let perhaps_shrink (self : _ t) arr ~top ~bottom : unit =
|
||||||
let size = bottom - top in
|
let size = bottom - top in
|
||||||
let ca_size = CA.size self.arr in
|
let ca_size = CA.size arr in
|
||||||
if ca_size >= 256 && size <= ca_size / 3 then
|
if ca_size >= 256 && size <= ca_size / 3 then
|
||||||
self.arr <- CA.shrink self.arr ~top ~bottom
|
A.set self.arr (CA.shrink arr ~top ~bottom)
|
||||||
|
|
||||||
let pop (self : 'a t) : 'a option =
|
let pop (self : 'a t) : 'a option =
|
||||||
let b = A.get self.bottom in
|
let b = A.get self.bottom in
|
||||||
let arr = self.arr in
|
let arr = A.get self.arr in
|
||||||
let b = b - 1 in
|
let b = b - 1 in
|
||||||
A.set self.bottom b;
|
A.set self.bottom b;
|
||||||
|
|
||||||
|
|
@ -103,31 +106,34 @@ let pop (self : 'a t) : 'a option =
|
||||||
|
|
||||||
let size = b - t in
|
let size = b - t in
|
||||||
if size < 0 then (
|
if size < 0 then (
|
||||||
|
(* reset to basic empty state *)
|
||||||
A.set self.bottom t;
|
A.set self.bottom t;
|
||||||
None
|
None
|
||||||
) else if size > 0 then (
|
) else if size > 0 then (
|
||||||
|
(* can pop without modifying [top] *)
|
||||||
let x = CA.get arr b in
|
let x = CA.get arr b in
|
||||||
perhaps_shrink self ~bottom:b ~top:t;
|
perhaps_shrink self arr ~bottom:b ~top:t;
|
||||||
Some x
|
Some x
|
||||||
) else (
|
) else (
|
||||||
assert (size = 0);
|
assert (size = 0);
|
||||||
|
(* there was exactly one slot, so we might be racing against stealers
|
||||||
|
to update [self.top] *)
|
||||||
if A.compare_and_set self.top t (t + 1) then (
|
if A.compare_and_set self.top t (t + 1) then (
|
||||||
(* exactly one slot, so we might be racing against stealers
|
|
||||||
to update [self.top] *)
|
|
||||||
let x = CA.get arr b in
|
let x = CA.get arr b in
|
||||||
A.set self.bottom (t + 1);
|
A.set self.bottom (t + 1);
|
||||||
Some x
|
Some x
|
||||||
) else
|
) else (
|
||||||
|
A.set self.bottom (t + 1);
|
||||||
None
|
None
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
let steal (self : 'a t) : 'a option =
|
let steal (self : 'a t) : 'a option =
|
||||||
(* read [top], but do not update [top_cached]
|
(* read [top], but do not update [top_cached]
|
||||||
as we're in another thread *)
|
as we're in another thread *)
|
||||||
let t = A.get self.top in
|
let t = A.get self.top in
|
||||||
|
|
||||||
let b = A.get self.bottom in
|
let b = A.get self.bottom in
|
||||||
let arr = self.arr in
|
let arr = A.get self.arr in
|
||||||
|
|
||||||
let size = b - t in
|
let size = b - t in
|
||||||
if size <= 0 then
|
if size <= 0 then
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue