trace-tef: simplify code

This commit is contained in:
Simon Cruanes 2023-12-06 10:52:09 -05:00
parent c16666d214
commit 317509681e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 59 additions and 103 deletions

View file

@ -1,9 +1,9 @@
type 'a t = { type 'a t = {
mutex: Mutex.t; mutex: Mutex.t;
cond: Condition.t; cond: Condition.t;
q: 'a Mpsc_queue.t; q: 'a Mpsc_bag.t;
mutable closed: bool; mutable closed: bool;
mutable consumer_waiting: bool; consumer_waiting: bool Atomic.t;
} }
exception Closed exception Closed
@ -12,9 +12,9 @@ let create () : _ t =
{ {
mutex = Mutex.create (); mutex = Mutex.create ();
cond = Condition.create (); cond = Condition.create ();
q = Mpsc_queue.create (); q = Mpsc_bag.create ();
closed = false; closed = false;
consumer_waiting = false; consumer_waiting = Atomic.make false;
} }
let close (self : _ t) = let close (self : _ t) =
@ -27,35 +27,23 @@ let close (self : _ t) =
let push (self : _ t) x : unit = let push (self : _ t) x : unit =
if self.closed then raise Closed; if self.closed then raise Closed;
Mpsc_queue.enqueue self.q x; Mpsc_bag.add self.q x;
if self.closed then raise Closed; if self.closed then raise Closed;
if self.consumer_waiting then ( if Atomic.get self.consumer_waiting then (
(* wakeup consumer *) (* wakeup consumer *)
Mutex.lock self.mutex; Mutex.lock self.mutex;
Condition.broadcast self.cond; Condition.broadcast self.cond;
Mutex.unlock self.mutex Mutex.unlock self.mutex
) )
let rec pop (self : 'a t) : 'a =
match Mpsc_queue.dequeue self.q with
| x -> x
| exception Mpsc_queue.Empty ->
if self.closed then raise Closed;
Mutex.lock self.mutex;
self.consumer_waiting <- true;
Condition.wait self.cond self.mutex;
self.consumer_waiting <- false;
Mutex.unlock self.mutex;
pop self
let rec pop_all (self : 'a t) : 'a list = let rec pop_all (self : 'a t) : 'a list =
match Mpsc_queue.dequeue_all self.q with match Mpsc_bag.pop_all self.q with
| l -> l | l -> List.rev l
| exception Mpsc_queue.Empty -> | exception Mpsc_bag.Empty ->
if self.closed then raise Closed; if self.closed then raise Closed;
Mutex.lock self.mutex; Mutex.lock self.mutex;
self.consumer_waiting <- true; Atomic.set self.consumer_waiting true;
Condition.wait self.cond self.mutex; Condition.wait self.cond self.mutex;
self.consumer_waiting <- false; Atomic.set self.consumer_waiting false;
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
pop_all self pop_all self

View file

@ -10,13 +10,9 @@ val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()]. (** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*) @raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a list val pop_all : 'a t -> 'a list
(** [transfer bq q2] transfers all items presently (** [pop_all bq] returns all items presently
in [bq] into [q2], and clears [bq]. in [bq], in the same order, and clears [bq].
It blocks if no element is in [bq]. *) It blocks if no element is in [bq]. *)
val close : _ t -> unit val close : _ t -> unit

32
src/tef/mpsc_bag.ml Normal file
View file

@ -0,0 +1,32 @@
type 'a t = { bag: 'a list Atomic.t } [@@unboxed]
let create () =
let bag = Atomic.make [] in
{ bag }
module Backoff = struct
type t = int
let default = 2
let once (b : t) : t =
for _i = 1 to b do
Relax_.cpu_relax ()
done;
min (b * 2) 256
end
let rec add backoff t x =
let before = Atomic.get t.bag in
let after = x :: before in
if not (Atomic.compare_and_set t.bag before after) then
add (Backoff.once backoff) t x
let[@inline] add t x = add Backoff.default t x
exception Empty
let[@inline] pop_all t : _ list =
match Atomic.exchange t.bag [] with
| [] -> raise_notrace Empty
| l -> l

14
src/tef/mpsc_bag.mli Normal file
View file

@ -0,0 +1,14 @@
(** A multi-producer, single-consumer bag *)
type 'a t
val create : unit -> 'a t
val add : 'a t -> 'a -> unit
(** [add q x] adds [x] in the bag. *)
exception Empty
val pop_all : 'a t -> 'a list
(** Return all current items in an unspecified order.
@raise Empty if empty *)

View file

@ -1,60 +0,0 @@
type 'a t = {
tail: 'a list Atomic.t;
head: 'a list ref;
}
let create () =
let tail = Atomic.make [] in
(* padding *)
ignore (Sys.opaque_identity (Array.make 16 ()));
let head = ref [] in
{ tail; head }
module Backoff = struct
type t = int
let default = 2
let once (b : t) : t =
let actual_b = b + Random.int 4 in
for _i = 1 to actual_b do
Relax_.cpu_relax ()
done;
min (b * 2) 256
end
let rec enqueue backoff t x =
let before = Atomic.get t.tail in
let after = x :: before in
if not (Atomic.compare_and_set t.tail before after) then
enqueue (Backoff.once backoff) t x
let enqueue t x = enqueue Backoff.default t x
exception Empty
let dequeue t =
match !(t.head) with
| x :: xs ->
t.head := xs;
x
| [] ->
(match Atomic.exchange t.tail [] with
| [] -> raise_notrace Empty
| [ x ] -> x
| x :: xs ->
(match List.rev_append [ x ] xs with
| x :: xs ->
t.head := xs;
x
| [] -> assert false))
let dequeue_all t : _ list =
match !(t.head) with
| _ :: _ as l ->
t.head := [];
l
| [] ->
(match Atomic.exchange t.tail [] with
| [] -> raise_notrace Empty
| l -> List.rev l)

View file

@ -1,14 +0,0 @@
(** A multi-producer, single-consumer queue (from picos) *)
type 'a t
val create : unit -> 'a t
val enqueue : 'a t -> 'a -> unit
exception Empty
val dequeue : 'a t -> 'a
(** @raise Empty if empty *)
val dequeue_all : 'a t -> 'a list
(** @raise Empty if empty *)