From 3fbac328221af1665d7ca3fbd02760058f0f353e Mon Sep 17 00:00:00 2001 From: ajbt200128 Date: Wed, 12 Nov 2025 14:27:36 -0800 Subject: [PATCH] fix: mutex usage and inline bugs --- src/client-ocurl/b_queue.ml | 51 +++++++++++++++++++------------------ src/client/batch.ml | 13 +++++++--- src/core/lock.ml | 8 +++++- 3 files changed, 43 insertions(+), 29 deletions(-) diff --git a/src/client-ocurl/b_queue.ml b/src/client-ocurl/b_queue.ml index 21d6012f..897ff89e 100644 --- a/src/client-ocurl/b_queue.ml +++ b/src/client-ocurl/b_queue.ml @@ -7,6 +7,19 @@ type 'a t = { exception Closed +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) +(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +let[@inline never] protect m f = + Mutex.lock m; + match f () with + | x -> + Mutex.unlock m; + x + | exception e -> + (* NOTE: [unlock] does not poll for asynchronous exceptions *) + Mutex.unlock m; + Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ()) + let create () : _ t = { mutex = Mutex.create (); @@ -16,54 +29,42 @@ let create () : _ t = } let close (self : _ t) = - Mutex.lock self.mutex; + protect self.mutex @@ fun () -> if not self.closed then ( self.closed <- true; Condition.broadcast self.cond (* awake waiters so they fail *) - ); - Mutex.unlock self.mutex + ) let push (self : _ t) x : unit = - Mutex.lock self.mutex; - if self.closed then ( - Mutex.unlock self.mutex; + protect self.mutex @@ fun () -> + if self.closed then raise Closed - ) else ( + else ( Queue.push x self.q; - Condition.signal self.cond; - Mutex.unlock self.mutex + Condition.signal self.cond ) let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; let rec loop () = - if self.closed then ( - Mutex.unlock self.mutex; + if self.closed then raise Closed - ) else if Queue.is_empty self.q then ( + else if Queue.is_empty self.q then ( Condition.wait self.cond self.mutex; (loop [@tailcall]) () ) else ( let x = Queue.pop self.q in - Mutex.unlock self.mutex; x ) in - loop () + protect self.mutex loop let pop_all (self : 'a t) into : unit = - Mutex.lock self.mutex; let rec loop () = if Queue.is_empty self.q then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); + if self.closed then raise Closed; Condition.wait self.cond self.mutex; (loop [@tailcall]) () - ) else ( - Queue.transfer self.q into; - Mutex.unlock self.mutex - ) + ) else + Queue.transfer self.q into in - loop () + protect self.mutex loop diff --git a/src/client/batch.ml b/src/client/batch.ml index 2fc7a965..d017355e 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -9,11 +9,18 @@ type 'a t = { mutex: Mutex.t; } -(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08. - cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) +(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *) let[@inline never] protect_mutex m f = Mutex.lock m; - Fun.protect f ~finally:(fun () -> Mutex.unlock m) + match f () with + | x -> + Mutex.unlock m; + x + | exception e -> + (* NOTE: [unlock] does not poll for asynchronous exceptions *) + Mutex.unlock m; + Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ()) let default_high_watermark batch_size = if batch_size = 1 then diff --git a/src/core/lock.ml b/src/core/lock.ml index 0d17c1c3..6ce295bb 100644 --- a/src/core/lock.ml +++ b/src/core/lock.ml @@ -8,4 +8,10 @@ let set_mutex ~lock ~unlock : unit = let[@inline] with_lock f = !lock_ (); - Fun.protect ~finally:!unlock_ f + match f () with + | x -> + !unlock_ (); + x + | exception e -> + !unlock_ (); + Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ())