mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
refactor: avoid some deadlocks in trace-tef
This commit is contained in:
parent
debb0211b7
commit
756ea1d22c
3 changed files with 25 additions and 16 deletions
|
|
@ -38,12 +38,26 @@ let push (self : _ t) x : unit =
|
||||||
|
|
||||||
let rec pop_all (self : 'a t) : 'a list =
|
let rec pop_all (self : 'a t) : 'a list =
|
||||||
match Mpsc_bag.pop_all self.q with
|
match Mpsc_bag.pop_all self.q with
|
||||||
| l -> List.rev l
|
| Some l -> l
|
||||||
| exception Mpsc_bag.Empty ->
|
| None ->
|
||||||
if self.closed then raise Closed;
|
if self.closed then raise Closed;
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
Atomic.set self.consumer_waiting true;
|
Atomic.set self.consumer_waiting true;
|
||||||
Condition.wait self.cond self.mutex;
|
(* check again, a producer might have pushed an element since we
|
||||||
Atomic.set self.consumer_waiting false;
|
last checked. However if we still find
|
||||||
Mutex.unlock self.mutex;
|
nothing, because this comes after [consumer_waiting:=true],
|
||||||
pop_all self
|
any producer arriving after that will know to wake us up. *)
|
||||||
|
(match Mpsc_bag.pop_all self.q with
|
||||||
|
| Some l ->
|
||||||
|
Atomic.set self.consumer_waiting false;
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
l
|
||||||
|
| None ->
|
||||||
|
if self.closed then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
Condition.wait self.cond self.mutex;
|
||||||
|
Atomic.set self.consumer_waiting false;
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
pop_all self)
|
||||||
|
|
|
||||||
|
|
@ -24,9 +24,7 @@ let rec add backoff t x =
|
||||||
|
|
||||||
let[@inline] add t x = add Backoff.default t x
|
let[@inline] add t x = add Backoff.default t x
|
||||||
|
|
||||||
exception Empty
|
let[@inline] pop_all t : _ list option =
|
||||||
|
|
||||||
let[@inline] pop_all t : _ list =
|
|
||||||
match Atomic.exchange t.bag [] with
|
match Atomic.exchange t.bag [] with
|
||||||
| [] -> raise_notrace Empty
|
| [] -> None
|
||||||
| l -> l
|
| l -> Some (List.rev l)
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,5 @@ val create : unit -> 'a t
|
||||||
val add : 'a t -> 'a -> unit
|
val add : 'a t -> 'a -> unit
|
||||||
(** [add q x] adds [x] in the bag. *)
|
(** [add q x] adds [x] in the bag. *)
|
||||||
|
|
||||||
exception Empty
|
val pop_all : 'a t -> 'a list option
|
||||||
|
(** Return all current items in the insertion order. *)
|
||||||
val pop_all : 'a t -> 'a list
|
|
||||||
(** Return all current items in an unspecified order.
|
|
||||||
@raise Empty if empty *)
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue