feat util: add Aswitch and Util_atomic

for shutdown processes, it's really preferable to use level-triggered
primitives rather than edge-triggered callbacks. Switch is fairly
robust. It's named Aswitch here, "A" means atomic and is also used to
avoid name collision with Eio.

Util_atomic provides a convenience CAS loop, with backoff.
This commit is contained in:
Simon Cruanes 2025-12-08 15:33:43 -05:00
parent 61f17fa6ce
commit ee91fa4a45
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 144 additions and 3 deletions

View file

@ -12,12 +12,14 @@ let[@inline] is_empty self : bool =
let get = Atomic.get
let add self x =
let backoff = ref 1 in
while
let old = Atomic.get self in
let l' = x :: old in
not (Atomic.compare_and_set self old l')
do
()
Opentelemetry_domain.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done
let rec pop_all self =

66
src/util/aswitch.ml Normal file
View file

@ -0,0 +1,66 @@
open Opentelemetry_atomic
module Int_map = Map.Make (struct
type t = int
let compare = compare
end)
type cb = unit -> unit
type state =
| On of {
n: int;
m: cb Int_map.t; (** removable callbacks *)
l: cb list;
}
| Off
type t = { st: state Atomic.t } [@@unboxed]
type trigger = t
let dummy : t = { st = Atomic.make Off }
let on_turn_off (self : t) (f : cb) : unit =
let must_fire =
Util_atomic.update_cas self.st @@ function
| Off -> true, Off
| On r -> false, On { r with l = f :: r.l }
in
if must_fire then (* call now *) f ()
let turn_off' self =
(* When calling turn_off' from a signal handler, Trace.message may cause the thread
to be killed. For this reason, we provide a way to disable tracing here. *)
match Atomic.exchange self.st Off with
| Off -> `Was_off
| On { l; m; n = _ } ->
List.iter (fun f -> f ()) l;
Int_map.iter (fun _ f -> f ()) m;
`Was_on
let[@inline] turn_off self = ignore (turn_off' self : [> `Was_on ])
let[@inline] link parent tr : unit = on_turn_off parent (fun () -> turn_off tr)
let create ?parent () : t * trigger =
let self = { st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) } in
(* if there's a parent, turning the parent off must turn us off too *)
Option.iter (fun p -> link p self) parent;
self, self
let[@inline] is_on self : bool =
match Atomic.get self.st with
| On _ -> true
| Off -> false
let[@inline] is_off self = not (is_on self)
let show self = Printf.sprintf "<switch on=%B>" (is_on self)
let pp out self = Format.fprintf out "<switch on=%B>" (is_on self)
module Unsafe = struct
let trigger_of_switch = Fun.id
end

50
src/util/aswitch.mli Normal file
View file

@ -0,0 +1,50 @@
(** Aswitch for level-triggered cancellation and cleanup, atomically.
A switch can be flipped to false once, and remains off forever afterwards.
Inspired from https://ocsigen.org/lwt/5.5.0/api/Lwt_switch but thread-safe.
*)
type t
type trigger
(** Can be used to turn the switch off *)
val pp : Format.formatter -> t -> unit
val show : t -> string
val create : ?parent:t -> unit -> t * trigger
(** New switch.
@param parent
inherit from this switch. It means that the result switches off if the
parent does, but conversely we can turn the result off without affecting
the parent. In other words, this switch's lifetime is a subset of the
parent's lifetime *)
val on_turn_off : t -> (unit -> unit) -> unit
(** [on_turn_off sw f] will call [f()] when [sw] is turned off. If [sw] is
already off then [f()] is called immediately.
{b NOTE} [f] really should not fail, and should be as fast and light as
possible. *)
val is_on : t -> bool
val is_off : t -> bool
val turn_off : trigger -> unit
val turn_off' : trigger -> [ `Was_off | `Was_on ]
(** Turn off switch, return previous state *)
val link : t -> trigger -> unit
(** [link parent trigger] turns off [trigger] when [parent] is turned off *)
val dummy : t
(** Always off switch *)
module Unsafe : sig
val trigger_of_switch : t -> trigger
[@@alert unsafe "hope you know what you're doing"]
end

View file

@ -7,3 +7,5 @@ let create () : t = { cbs = Alist.make () }
let[@inline] register self f = Alist.add self.cbs f
let[@inline] trigger self = List.iter (fun f -> f ()) (Alist.get self.cbs)
let clear self : unit = ignore (Alist.pop_all self.cbs : _ list)

View file

@ -1,9 +1,13 @@
(** A collection of callbacks. thread-safe. *)
type t
(** Thread safe set of callbacks *)
val create : unit -> t
val register : t -> (unit -> unit) -> unit
val trigger : t -> unit
val clear : t -> unit
(** Remove all callbacks. *)

View file

@ -2,5 +2,5 @@
(name opentelemetry_util)
(public_name opentelemetry.util)
(flags :standard -open Opentelemetry_atomic)
(libraries opentelemetry.atomic mtime mtime.clock.os)
(synopsis "Utilities for opentelemetry"))
(libraries opentelemetry.atomic opentelemetry.domain mtime mtime.clock.os)
(synopsis "Basic utilities for opentelemetry"))

17
src/util/util_atomic.ml Normal file
View file

@ -0,0 +1,17 @@
module Atomic = Opentelemetry_atomic.Atomic
(** Update loop *)
let update_cas (type res) (self : 'a Atomic.t) (f : 'a -> res * 'a) : res =
let exception Ret of res in
let backoff = ref 1 in
try
while true do
let old_val = Atomic.get self in
let res, new_val = f old_val in
if Atomic.compare_and_set self old_val new_val then
raise_notrace (Ret res);
Opentelemetry_domain.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done
with Ret r -> r