From ee91fa4a459fe8d237d2807c507f1f13ef19b231 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Dec 2025 15:33:43 -0500 Subject: [PATCH] feat util: add Aswitch and Util_atomic for shutdown processes, it's really preferable to use level-triggered primitives rather than edge-triggered callbacks. Switch is fairly robust. It's named Aswitch here, "A" means atomic and is also used to avoid name collision with Eio. Util_atomic provides a convenience CAS loop, with backoff. --- src/util/alist.ml | 4 ++- src/util/aswitch.ml | 66 +++++++++++++++++++++++++++++++++++++++++ src/util/aswitch.mli | 50 +++++++++++++++++++++++++++++++ src/util/cb_set.ml | 2 ++ src/util/cb_set.mli | 4 +++ src/util/dune | 4 +-- src/util/util_atomic.ml | 17 +++++++++++ 7 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 src/util/aswitch.ml create mode 100644 src/util/aswitch.mli create mode 100644 src/util/util_atomic.ml diff --git a/src/util/alist.ml b/src/util/alist.ml index 356f2630..973a2e31 100644 --- a/src/util/alist.ml +++ b/src/util/alist.ml @@ -12,12 +12,14 @@ let[@inline] is_empty self : bool = let get = Atomic.get let add self x = + let backoff = ref 1 in while let old = Atomic.get self in let l' = x :: old in not (Atomic.compare_and_set self old l') do - () + Opentelemetry_domain.relax_loop !backoff; + backoff := min 128 (2 * !backoff) done let rec pop_all self = diff --git a/src/util/aswitch.ml b/src/util/aswitch.ml new file mode 100644 index 00000000..a7f618af --- /dev/null +++ b/src/util/aswitch.ml @@ -0,0 +1,66 @@ +open Opentelemetry_atomic + +module Int_map = Map.Make (struct + type t = int + + let compare = compare +end) + +type cb = unit -> unit + +type state = + | On of { + n: int; + m: cb Int_map.t; (** removable callbacks *) + l: cb list; + } + | Off + +type t = { st: state Atomic.t } [@@unboxed] + +type trigger = t + +let dummy : t = { st = Atomic.make Off } + +let on_turn_off (self : t) (f : cb) : unit = + let must_fire = + Util_atomic.update_cas self.st @@ function + | Off -> true, Off + | On r -> false, On { r with l = f :: r.l } + in + if must_fire then (* call now *) f () + +let turn_off' self = + (* When calling turn_off' from a signal handler, Trace.message may cause the thread + to be killed. For this reason, we provide a way to disable tracing here. *) + match Atomic.exchange self.st Off with + | Off -> `Was_off + | On { l; m; n = _ } -> + List.iter (fun f -> f ()) l; + Int_map.iter (fun _ f -> f ()) m; + `Was_on + +let[@inline] turn_off self = ignore (turn_off' self : [> `Was_on ]) + +let[@inline] link parent tr : unit = on_turn_off parent (fun () -> turn_off tr) + +let create ?parent () : t * trigger = + let self = { st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) } in + (* if there's a parent, turning the parent off must turn us off too *) + Option.iter (fun p -> link p self) parent; + self, self + +let[@inline] is_on self : bool = + match Atomic.get self.st with + | On _ -> true + | Off -> false + +let[@inline] is_off self = not (is_on self) + +let show self = Printf.sprintf "" (is_on self) + +let pp out self = Format.fprintf out "" (is_on self) + +module Unsafe = struct + let trigger_of_switch = Fun.id +end diff --git a/src/util/aswitch.mli b/src/util/aswitch.mli new file mode 100644 index 00000000..bc05da33 --- /dev/null +++ b/src/util/aswitch.mli @@ -0,0 +1,50 @@ +(** Aswitch for level-triggered cancellation and cleanup, atomically. + + A switch can be flipped to false once, and remains off forever afterwards. + + Inspired from https://ocsigen.org/lwt/5.5.0/api/Lwt_switch but thread-safe. +*) + +type t + +type trigger +(** Can be used to turn the switch off *) + +val pp : Format.formatter -> t -> unit + +val show : t -> string + +val create : ?parent:t -> unit -> t * trigger +(** New switch. + @param parent + inherit from this switch. It means that the result switches off if the + parent does, but conversely we can turn the result off without affecting + the parent. In other words, this switch's lifetime is a subset of the + parent's lifetime *) + +val on_turn_off : t -> (unit -> unit) -> unit +(** [on_turn_off sw f] will call [f()] when [sw] is turned off. If [sw] is + already off then [f()] is called immediately. + + {b NOTE} [f] really should not fail, and should be as fast and light as + possible. *) + +val is_on : t -> bool + +val is_off : t -> bool + +val turn_off : trigger -> unit + +val turn_off' : trigger -> [ `Was_off | `Was_on ] +(** Turn off switch, return previous state *) + +val link : t -> trigger -> unit +(** [link parent trigger] turns off [trigger] when [parent] is turned off *) + +val dummy : t +(** Always off switch *) + +module Unsafe : sig + val trigger_of_switch : t -> trigger + [@@alert unsafe "hope you know what you're doing"] +end diff --git a/src/util/cb_set.ml b/src/util/cb_set.ml index 78190855..37079bfc 100644 --- a/src/util/cb_set.ml +++ b/src/util/cb_set.ml @@ -7,3 +7,5 @@ let create () : t = { cbs = Alist.make () } let[@inline] register self f = Alist.add self.cbs f let[@inline] trigger self = List.iter (fun f -> f ()) (Alist.get self.cbs) + +let clear self : unit = ignore (Alist.pop_all self.cbs : _ list) diff --git a/src/util/cb_set.mli b/src/util/cb_set.mli index 3a6e2a0c..c64d613d 100644 --- a/src/util/cb_set.mli +++ b/src/util/cb_set.mli @@ -1,9 +1,13 @@ (** A collection of callbacks. thread-safe. *) type t +(** Thread safe set of callbacks *) val create : unit -> t val register : t -> (unit -> unit) -> unit val trigger : t -> unit + +val clear : t -> unit +(** Remove all callbacks. *) diff --git a/src/util/dune b/src/util/dune index 1348fe68..32ca575a 100644 --- a/src/util/dune +++ b/src/util/dune @@ -2,5 +2,5 @@ (name opentelemetry_util) (public_name opentelemetry.util) (flags :standard -open Opentelemetry_atomic) - (libraries opentelemetry.atomic mtime mtime.clock.os) - (synopsis "Utilities for opentelemetry")) + (libraries opentelemetry.atomic opentelemetry.domain mtime mtime.clock.os) + (synopsis "Basic utilities for opentelemetry")) diff --git a/src/util/util_atomic.ml b/src/util/util_atomic.ml new file mode 100644 index 00000000..7fbbc0fa --- /dev/null +++ b/src/util/util_atomic.ml @@ -0,0 +1,17 @@ +module Atomic = Opentelemetry_atomic.Atomic + +(** Update loop *) +let update_cas (type res) (self : 'a Atomic.t) (f : 'a -> res * 'a) : res = + let exception Ret of res in + let backoff = ref 1 in + try + while true do + let old_val = Atomic.get self in + let res, new_val = f old_val in + if Atomic.compare_and_set self old_val new_val then + raise_notrace (Ret res); + + Opentelemetry_domain.relax_loop !backoff; + backoff := min 128 (2 * !backoff) + done + with Ret r -> r