mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-10 04:35:46 -04:00
refactor: move the Mutex.protect backport into Util_mutex
This commit is contained in:
parent
1a8f66b49e
commit
e31f5f6aba
6 changed files with 25 additions and 28 deletions
|
|
@ -1,3 +1,5 @@
|
||||||
|
open Opentelemetry.Util_mutex
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutex: Mutex.t;
|
mutex: Mutex.t;
|
||||||
cond: Condition.t;
|
cond: Condition.t;
|
||||||
|
|
@ -7,19 +9,6 @@ type 'a t = {
|
||||||
|
|
||||||
exception Closed
|
exception Closed
|
||||||
|
|
||||||
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *)
|
|
||||||
(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
|
|
||||||
let[@inline never] protect m f =
|
|
||||||
Mutex.lock m;
|
|
||||||
match f () with
|
|
||||||
| x ->
|
|
||||||
Mutex.unlock m;
|
|
||||||
x
|
|
||||||
| exception e ->
|
|
||||||
(* NOTE: [unlock] does not poll for asynchronous exceptions *)
|
|
||||||
Mutex.unlock m;
|
|
||||||
Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ())
|
|
||||||
|
|
||||||
let create () : _ t =
|
let create () : _ t =
|
||||||
{
|
{
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
module Otel = Opentelemetry
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutable size: int;
|
mutable size: int;
|
||||||
mutable q: 'a list;
|
mutable q: 'a list;
|
||||||
|
|
@ -9,19 +11,6 @@ type 'a t = {
|
||||||
mutex: Mutex.t;
|
mutex: Mutex.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *)
|
|
||||||
(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
|
|
||||||
let[@inline never] protect_mutex m f =
|
|
||||||
Mutex.lock m;
|
|
||||||
match f () with
|
|
||||||
| x ->
|
|
||||||
Mutex.unlock m;
|
|
||||||
x
|
|
||||||
| exception e ->
|
|
||||||
(* NOTE: [unlock] does not poll for asynchronous exceptions *)
|
|
||||||
Mutex.unlock m;
|
|
||||||
Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ())
|
|
||||||
|
|
||||||
let default_high_watermark batch_size =
|
let default_high_watermark batch_size =
|
||||||
if batch_size = 1 then
|
if batch_size = 1 then
|
||||||
100
|
100
|
||||||
|
|
@ -58,7 +47,7 @@ let ready_to_pop ~force ~now self =
|
||||||
|
|
||||||
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||||
let rev_batch_opt =
|
let rev_batch_opt =
|
||||||
protect_mutex self.mutex @@ fun () ->
|
Otel.Util_mutex.protect self.mutex @@ fun () ->
|
||||||
if ready_to_pop ~force ~now self then (
|
if ready_to_pop ~force ~now self then (
|
||||||
assert (self.q <> []);
|
assert (self.q <> []);
|
||||||
let batch = self.q in
|
let batch = self.q in
|
||||||
|
|
@ -83,7 +72,7 @@ let rec push_unprotected (self : _ t) ~(elems : _ list) : unit =
|
||||||
push_unprotected self ~elems:xs
|
push_unprotected self ~elems:xs
|
||||||
|
|
||||||
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||||
protect_mutex self.mutex @@ fun () ->
|
Otel.Util_mutex.protect self.mutex @@ fun () ->
|
||||||
if self.size >= self.high_watermark then
|
if self.size >= self.high_watermark then
|
||||||
(* drop this to prevent queue from growing too fast *)
|
(* drop this to prevent queue from growing too fast *)
|
||||||
`Dropped
|
`Dropped
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
(** A global lock, modifiable by the user *)
|
||||||
|
|
||||||
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
|
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
|
||||||
(** Set a pair of lock/unlock functions that are used to protect access to
|
(** Set a pair of lock/unlock functions that are used to protect access to
|
||||||
global state, if needed. By default these do nothing. *)
|
global state, if needed. By default these do nothing. *)
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,10 @@ module AList = AList
|
||||||
(** Atomic list, for internal usage
|
(** Atomic list, for internal usage
|
||||||
@since 0.7 *)
|
@since 0.7 *)
|
||||||
|
|
||||||
|
module Util_mutex = Util_mutex
|
||||||
|
(** Utilities for internal usage.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
(** {2 Wire format} *)
|
(** {2 Wire format} *)
|
||||||
|
|
||||||
module Proto = Opentelemetry_proto
|
module Proto = Opentelemetry_proto
|
||||||
|
|
|
||||||
12
src/core/util_mutex.ml
Normal file
12
src/core/util_mutex.ml
Normal file
|
|
@ -0,0 +1,12 @@
|
||||||
|
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *)
|
||||||
|
(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
|
||||||
|
let[@inline never] protect m f =
|
||||||
|
Mutex.lock m;
|
||||||
|
match f () with
|
||||||
|
| x ->
|
||||||
|
Mutex.unlock m;
|
||||||
|
x
|
||||||
|
| exception e ->
|
||||||
|
(* NOTE: [unlock] does not poll for asynchronous exceptions *)
|
||||||
|
Mutex.unlock m;
|
||||||
|
Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ())
|
||||||
1
src/core/util_mutex.mli
Normal file
1
src/core/util_mutex.mli
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
val protect : Mutex.t -> (unit -> 'a) -> 'a
|
||||||
Loading…
Add table
Reference in a new issue