From b9ee0d71a177ab41884b295fe6673dba8252c856 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Jun 2023 00:19:37 -0400 Subject: [PATCH] add `Chan`, an atomic unbounded channel --- src/chan.ml | 172 +++++++++++++++++++++++++++++++++++++++++++++++ src/chan.mli | 31 +++++++++ src/moonpool.ml | 1 + src/moonpool.mli | 1 + 4 files changed, 205 insertions(+) create mode 100644 src/chan.ml create mode 100644 src/chan.mli diff --git a/src/chan.ml b/src/chan.ml new file mode 100644 index 00000000..7042e1da --- /dev/null +++ b/src/chan.ml @@ -0,0 +1,172 @@ +module A = Atomic_ + +type 'a or_error = 'a Fut.or_error +type 'a waiter = 'a Fut.promise + +let[@inline] list_is_empty_ = function + | [] -> true + | _ :: _ -> false + +(** Simple functional queue *) +module Q : sig + type 'a t + + val return : 'a -> 'a t + val is_empty : _ t -> bool + + exception Empty + + val pop_exn : 'a t -> 'a * 'a t + val push : 'a t -> 'a -> 'a t +end = struct + type 'a t = { + hd: 'a list; + tl: 'a list; + } + (** Queue containing elements of type 'a. + + invariant: if hd=[], then tl=[] *) + + let[@inline] return x : _ t = { hd = [ x ]; tl = [] } + + let make_ hd tl = + match hd with + | [] -> { hd = List.rev tl; tl = [] } + | _ :: _ -> { hd; tl } + + let[@inline] is_empty q = list_is_empty_ q.hd + let[@inline] push q x : _ t = make_ q.hd (x :: q.tl) + + exception Empty + + let pop_exn q = + match q.hd with + | [] -> + assert (list_is_empty_ q.tl); + raise Empty + | x :: hd' -> + let q' = make_ hd' q.tl in + x, q' +end + +exception Closed + +type 'a state = + | Empty + | St_closed + | Elems of 'a Q.t + | Waiters of 'a waiter list + +type 'a t = { st: 'a state A.t } [@@unboxed] + +let create () : _ t = { st = A.make Empty } + +let[@inline] mk_st_waiters_ l = + match l with + | [] -> Empty + | _ -> Waiters l + +let[@inline] mk_st_elems_ q = + if Q.is_empty q then + Empty + else + Elems q + +let push (self : _ t) x : unit = + while + let old_st = A.get self.st in + match old_st with + | St_closed -> raise Closed + | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) + | Waiters [] -> assert false + | Waiters (w :: waiters') -> + let new_st = mk_st_waiters_ waiters' in + if A.compare_and_set self.st old_st new_st then ( + Fut.fulfill w (Ok x); + false + ) else + true + | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) + do + Domain_.relax () + done + +let try_pop (type elt) self : elt option = + let module M = struct + exception Found of elt + end in + try + for _i = 1 to 10 do + let old_st = A.get self.st in + match old_st with + | Elems q -> + let x, q' = Q.pop_exn q in + let new_st = mk_st_elems_ q' in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Found x) + else + Domain_.relax () + | _ -> raise_notrace Exit + done; + None + with + | M.Found x -> Some x + | Exit -> None + +let pop (type elt) (self : _ t) : elt Fut.t = + let module M = struct + exception Ret of elt + exception Fut of elt Fut.t + end in + try + while + let old_st = A.get self.st in + (match old_st with + | St_closed -> + let bt = Printexc.get_callstack 10 in + raise_notrace (M.Fut (Fut.fail Closed bt)) + | Elems q -> + let x, q' = Q.pop_exn q in + let new_st = mk_st_elems_ q' in + if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) + | Empty -> + let fut, promise = Fut.make () in + let new_st = Waiters [ promise ] in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Fut fut) + | Waiters ws -> + let fut, promise = Fut.make () in + let new_st = Waiters (promise :: ws) in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Fut fut)); + true + do + Domain_.relax () + done; + assert false + with + | M.Ret x -> Fut.return x + | M.Fut f -> f + +let pop_block_exn (self : 'a t) : 'a = + match try_pop self with + | Some x -> x + | None -> Fut.wait_block_exn @@ pop self + +let close (self : _ t) : unit = + while + let old_st = A.get self.st in + match old_st with + | St_closed -> false + | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) + | Waiters ws -> + if A.compare_and_set self.st old_st St_closed then ( + (* fail all waiters *) + let bt = Printexc.get_callstack 10 in + List.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; + false + ) else + true + do + Domain_.relax () + done diff --git a/src/chan.mli b/src/chan.mli new file mode 100644 index 00000000..80679aeb --- /dev/null +++ b/src/chan.mli @@ -0,0 +1,31 @@ +(** Channels. + + Channels are pipelines of values where threads can push into + one end, and pull from the other end. *) + +type 'a or_error = 'a Fut.or_error + +type 'a t +(** Channel carrying values of type ['a] *) + +val create : unit -> 'a t +(** Create a channel. *) + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push chan x] pushes [x] into [chan]. + @raise Closed if the channel is closed. *) + +val pop : 'a t -> 'a Fut.t +(** Pop an element. + @raise Closed if the channel is closed. *) + +val try_pop : 'a t -> 'a option + +val pop_block_exn : 'a t -> 'a +(** Like [pop], but block if an element is not available immediately. *) + +val close : _ t -> unit +(** Close the channel. Further push and pop calls will fail. + This is idempotent. *) diff --git a/src/moonpool.ml b/src/moonpool.ml index 109cc500..ab1a25ed 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -4,3 +4,4 @@ let start_thread_on_some_domain f x = module Pool = Pool module Fut = Fut +module Chan = Chan diff --git a/src/moonpool.mli b/src/moonpool.mli index a96f07e8..172a5b2f 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,3 +12,4 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) module Fut = Fut +module Chan = Chan