From 4454975a616d610a23dd790f2a7c8faf275a71fd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 7 May 2025 13:06:55 -0400 Subject: [PATCH] feat util: remove b_queue, add Rpool to be used in various buffer pools. --- src/util/b_queue.ml | 65 ---------------------------------------- src/util/b_queue.mli | 18 ----------- src/util/mpsc_bag.ml | 32 -------------------- src/util/mpsc_bag.mli | 11 ------- src/util/rpool.ml | 69 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 126 deletions(-) delete mode 100644 src/util/b_queue.ml delete mode 100644 src/util/b_queue.mli delete mode 100644 src/util/mpsc_bag.ml delete mode 100644 src/util/mpsc_bag.mli create mode 100644 src/util/rpool.ml diff --git a/src/util/b_queue.ml b/src/util/b_queue.ml deleted file mode 100644 index f5ee5f3..0000000 --- a/src/util/b_queue.ml +++ /dev/null @@ -1,65 +0,0 @@ -module A = Trace_core.Internal_.Atomic_ - -type 'a t = { - mutex: Mutex.t; - cond: Condition.t; - q: 'a Mpsc_bag.t; - mutable closed: bool; - consumer_waiting: bool A.t; -} - -exception Closed - -let create () : _ t = - { - mutex = Mutex.create (); - cond = Condition.create (); - q = Mpsc_bag.create (); - closed = false; - consumer_waiting = A.make false; - } - -let close (self : _ t) = - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - Condition.broadcast self.cond (* awake waiters so they fail *) - ); - Mutex.unlock self.mutex - -let push (self : _ t) x : unit = - if self.closed then raise Closed; - Mpsc_bag.add self.q x; - if self.closed then raise Closed; - if A.get self.consumer_waiting then ( - (* wakeup consumer *) - Mutex.lock self.mutex; - Condition.broadcast self.cond; - Mutex.unlock self.mutex - ) - -let rec pop_all (self : 'a t) : 'a list = - match Mpsc_bag.pop_all self.q with - | Some l -> l - | None -> - if self.closed then raise Closed; - Mutex.lock self.mutex; - A.set self.consumer_waiting true; - (* check again, a producer might have pushed an element since we - last checked. However if we still find - nothing, because this comes after [consumer_waiting:=true], - any producer arriving after that will know to wake us up. *) - (match Mpsc_bag.pop_all self.q with - | Some l -> - A.set self.consumer_waiting false; - Mutex.unlock self.mutex; - l - | None -> - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond self.mutex; - A.set self.consumer_waiting false; - Mutex.unlock self.mutex; - pop_all self) diff --git a/src/util/b_queue.mli b/src/util/b_queue.mli deleted file mode 100644 index 1fb8f5c..0000000 --- a/src/util/b_queue.mli +++ /dev/null @@ -1,18 +0,0 @@ -(** Basic Blocking Queue *) - -type 'a t - -val create : unit -> _ t - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] into [q], and returns [()]. - @raise Closed if [close q] was previously called.*) - -val pop_all : 'a t -> 'a list -(** [pop_all bq] returns all items presently in [bq], in the same order, and - clears [bq]. It blocks if no element is in [bq]. *) - -val close : _ t -> unit -(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/util/mpsc_bag.ml b/src/util/mpsc_bag.ml deleted file mode 100644 index 02aeadf..0000000 --- a/src/util/mpsc_bag.ml +++ /dev/null @@ -1,32 +0,0 @@ -module A = Trace_core.Internal_.Atomic_ - -type 'a t = { bag: 'a list A.t } [@@unboxed] - -let create () = - let bag = A.make [] in - { bag } - -module Backoff = struct - type t = int - - let default = 2 - - let once (b : t) : t = - for _i = 1 to b do - Domain_util.cpu_relax () - done; - min (b * 2) 256 -end - -let rec add backoff t x = - let before = A.get t.bag in - let after = x :: before in - if not (A.compare_and_set t.bag before after) then - add (Backoff.once backoff) t x - -let[@inline] add t x = add Backoff.default t x - -let[@inline] pop_all t : _ list option = - match A.exchange t.bag [] with - | [] -> None - | l -> Some (List.rev l) diff --git a/src/util/mpsc_bag.mli b/src/util/mpsc_bag.mli deleted file mode 100644 index edb5dba..0000000 --- a/src/util/mpsc_bag.mli +++ /dev/null @@ -1,11 +0,0 @@ -(** A multi-producer, single-consumer bag *) - -type 'a t - -val create : unit -> 'a t - -val add : 'a t -> 'a -> unit -(** [add q x] adds [x] in the bag. *) - -val pop_all : 'a t -> 'a list option -(** Return all current items in the insertion order. *) diff --git a/src/util/rpool.ml b/src/util/rpool.ml new file mode 100644 index 0000000..2a5446c --- /dev/null +++ b/src/util/rpool.ml @@ -0,0 +1,69 @@ +(** A resource pool (for buffers) *) + +open struct + module A = Trace_core.Internal_.Atomic_ +end + +module List_with_len = struct + type +'a t = + | Nil + | Cons of int * 'a * 'a t + + let empty : _ t = Nil + + let[@inline] len = function + | Nil -> 0 + | Cons (i, _, _) -> i + + let[@inline] cons x self = Cons (len self + 1, x, self) +end + +type 'a t = { + max_size: int; + create: unit -> 'a; + clear: 'a -> unit; + cached: 'a List_with_len.t A.t; +} + +let create ~max_size ~create ~clear () : _ t = + { max_size; create; clear; cached = A.make List_with_len.empty } + +let alloc (type a) (self : a t) : a = + let module M = struct + exception Found of a + end in + try + while + match A.get self.cached with + | Nil -> false + | Cons (_, x, tl) as old -> + if A.compare_and_set self.cached old tl then + raise_notrace (M.Found x) + else + true + do + () + done; + self.create () + with M.Found x -> x + +let recycle (self : 'a t) (x : 'a) : unit = + self.clear x; + while + match A.get self.cached with + | Cons (i, _, _) when i >= self.max_size -> false (* drop buf *) + | old -> not (A.compare_and_set self.cached old (List_with_len.cons x old)) + do + () + done + +let with_ (self : 'a t) f = + let x = alloc self in + try + let res = f x in + recycle self x; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + recycle self x; + Printexc.raise_with_backtrace e bt