feat util: remove b_queue, add Rpool

to be used in various buffer pools.
This commit is contained in:
Simon Cruanes 2025-05-07 13:06:55 -04:00
parent 005626a2cd
commit 4454975a61
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 69 additions and 126 deletions

View file

@ -1,65 +0,0 @@
module A = Trace_core.Internal_.Atomic_
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Mpsc_bag.t;
mutable closed: bool;
consumer_waiting: bool A.t;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Mpsc_bag.create ();
closed = false;
consumer_waiting = A.make false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
if self.closed then raise Closed;
Mpsc_bag.add self.q x;
if self.closed then raise Closed;
if A.get self.consumer_waiting then (
(* wakeup consumer *)
Mutex.lock self.mutex;
Condition.broadcast self.cond;
Mutex.unlock self.mutex
)
let rec pop_all (self : 'a t) : 'a list =
match Mpsc_bag.pop_all self.q with
| Some l -> l
| None ->
if self.closed then raise Closed;
Mutex.lock self.mutex;
A.set self.consumer_waiting true;
(* check again, a producer might have pushed an element since we
last checked. However if we still find
nothing, because this comes after [consumer_waiting:=true],
any producer arriving after that will know to wake us up. *)
(match Mpsc_bag.pop_all self.q with
| Some l ->
A.set self.consumer_waiting false;
Mutex.unlock self.mutex;
l
| None ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
A.set self.consumer_waiting false;
Mutex.unlock self.mutex;
pop_all self)

View file

@ -1,18 +0,0 @@
(** Basic Blocking Queue *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop_all : 'a t -> 'a list
(** [pop_all bq] returns all items presently in [bq], in the same order, and
clears [bq]. It blocks if no element is in [bq]. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)

View file

@ -1,32 +0,0 @@
module A = Trace_core.Internal_.Atomic_
type 'a t = { bag: 'a list A.t } [@@unboxed]
let create () =
let bag = A.make [] in
{ bag }
module Backoff = struct
type t = int
let default = 2
let once (b : t) : t =
for _i = 1 to b do
Domain_util.cpu_relax ()
done;
min (b * 2) 256
end
let rec add backoff t x =
let before = A.get t.bag in
let after = x :: before in
if not (A.compare_and_set t.bag before after) then
add (Backoff.once backoff) t x
let[@inline] add t x = add Backoff.default t x
let[@inline] pop_all t : _ list option =
match A.exchange t.bag [] with
| [] -> None
| l -> Some (List.rev l)

View file

@ -1,11 +0,0 @@
(** A multi-producer, single-consumer bag *)
type 'a t
val create : unit -> 'a t
val add : 'a t -> 'a -> unit
(** [add q x] adds [x] in the bag. *)
val pop_all : 'a t -> 'a list option
(** Return all current items in the insertion order. *)

69
src/util/rpool.ml Normal file
View file

@ -0,0 +1,69 @@
(** A resource pool (for buffers) *)
open struct
module A = Trace_core.Internal_.Atomic_
end
module List_with_len = struct
type +'a t =
| Nil
| Cons of int * 'a * 'a t
let empty : _ t = Nil
let[@inline] len = function
| Nil -> 0
| Cons (i, _, _) -> i
let[@inline] cons x self = Cons (len self + 1, x, self)
end
type 'a t = {
max_size: int;
create: unit -> 'a;
clear: 'a -> unit;
cached: 'a List_with_len.t A.t;
}
let create ~max_size ~create ~clear () : _ t =
{ max_size; create; clear; cached = A.make List_with_len.empty }
let alloc (type a) (self : a t) : a =
let module M = struct
exception Found of a
end in
try
while
match A.get self.cached with
| Nil -> false
| Cons (_, x, tl) as old ->
if A.compare_and_set self.cached old tl then
raise_notrace (M.Found x)
else
true
do
()
done;
self.create ()
with M.Found x -> x
let recycle (self : 'a t) (x : 'a) : unit =
self.clear x;
while
match A.get self.cached with
| Cons (i, _, _) when i >= self.max_size -> false (* drop buf *)
| old -> not (A.compare_and_set self.cached old (List_with_len.cons x old))
do
()
done
let with_ (self : 'a t) f =
let x = alloc self in
try
let res = f x in
recycle self x;
res
with e ->
let bt = Printexc.get_raw_backtrace () in
recycle self x;
Printexc.raise_with_backtrace e bt