diff --git a/src/chan.ml b/src/chan.ml new file mode 100644 index 00000000..348a5801 --- /dev/null +++ b/src/chan.ml @@ -0,0 +1,184 @@ +module A = Atomic_ + +type 'a or_error = 'a Fut.or_error +type 'a waiter = 'a Fut.promise + +let[@inline] list_is_empty_ = function + | [] -> true + | _ :: _ -> false + +(** Simple functional queue *) +module Q : sig + type 'a t + + val return : 'a -> 'a t + val is_empty : _ t -> bool + + exception Empty + + val pop_exn : 'a t -> 'a * 'a t + val push : 'a t -> 'a -> 'a t + val iter : ('a -> unit) -> 'a t -> unit +end = struct + type 'a t = { + hd: 'a list; + tl: 'a list; + } + (** Queue containing elements of type 'a. + + invariant: if hd=[], then tl=[] *) + + let[@inline] return x : _ t = { hd = [ x ]; tl = [] } + + let[@inline] make_ hd tl = + match hd with + | [] -> { hd = List.rev tl; tl = [] } + | _ :: _ -> { hd; tl } + + let[@inline] is_empty self = list_is_empty_ self.hd + let[@inline] push self x : _ t = make_ self.hd (x :: self.tl) + + let iter f (self : _ t) : unit = + List.iter f self.hd; + List.iter f self.tl + + exception Empty + + let pop_exn self = + match self.hd with + | [] -> + assert (list_is_empty_ self.tl); + raise Empty + | x :: hd' -> + let self' = make_ hd' self.tl in + x, self' +end + +exception Closed + +type 'a state = + | Empty + | St_closed + | Elems of 'a Q.t + | Waiters of 'a waiter Q.t + +type 'a t = { st: 'a state A.t } [@@unboxed] + +let create () : _ t = { st = A.make Empty } + +(** Produce a state from a queue of waiters *) +let[@inline] mk_st_waiters_ ws : _ state = + if Q.is_empty ws then + Empty + else + Waiters ws + +(** Produce a state from a queue of elements *) +let[@inline] mk_st_elems_ q : _ state = + if Q.is_empty q then + Empty + else + Elems q + +let push (self : _ t) x : unit = + while + let old_st = A.get self.st in + match old_st with + | St_closed -> raise Closed + | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) + | Waiters ws -> + (* awake first waiter and give it [x] *) + let w, ws' = Q.pop_exn ws in + let new_st = mk_st_waiters_ ws' in + if A.compare_and_set self.st old_st new_st then ( + Fut.fulfill w (Ok x); + false + ) else + true + | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) + do + Domain_.relax () + done + +let try_pop (type elt) self : elt option = + let module M = struct + exception Found of elt + end in + try + (* a bit of spinning *) + for _retry = 1 to 10 do + let old_st = A.get self.st in + match old_st with + | Elems q -> + let x, q' = Q.pop_exn q in + let new_st = mk_st_elems_ q' in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Found x) + else + Domain_.relax () + | _ -> raise_notrace Exit + done; + None + with + | M.Found x -> Some x + | Exit -> None + +let pop (type elt) (self : _ t) : elt Fut.t = + let module M = struct + exception Ret of elt + exception Fut of elt Fut.t + end in + try + while + let old_st = A.get self.st in + (match old_st with + | St_closed -> + let bt = Printexc.get_callstack 10 in + raise_notrace (M.Fut (Fut.fail Closed bt)) + | Elems q -> + let x, q' = Q.pop_exn q in + let new_st = mk_st_elems_ q' in + if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) + | Empty -> + let fut, promise = Fut.make () in + let new_st = Waiters (Q.return promise) in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Fut fut) + | Waiters ws -> + let fut, promise = Fut.make () in + (* add new promise at the end of the queue of waiters *) + let new_st = Waiters (Q.push ws promise) in + if A.compare_and_set self.st old_st new_st then + raise_notrace (M.Fut fut)); + true + do + Domain_.relax () + done; + (* never reached *) + assert false + with + | M.Ret x -> Fut.return x + | M.Fut f -> f + +let pop_block_exn (self : 'a t) : 'a = + match try_pop self with + | Some x -> x + | None -> Fut.wait_block_exn @@ pop self + +let close (self : _ t) : unit = + while + let old_st = A.get self.st in + match old_st with + | St_closed -> false (* exit *) + | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) + | Waiters ws -> + if A.compare_and_set self.st old_st St_closed then ( + (* fail all waiters with [Closed]. *) + let bt = Printexc.get_callstack 10 in + Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; + false + ) else + true + do + Domain_.relax () + done diff --git a/src/chan.mli b/src/chan.mli new file mode 100644 index 00000000..c36ee91a --- /dev/null +++ b/src/chan.mli @@ -0,0 +1,43 @@ +(** Channels. + + Channels are pipelines of values where threads can push into + one end, and pull from the other end. + + Unlike {!Moonpool.Blocking_queue}, channels are designed so + that pushing never blocks, and pop'ing values returns a future. + + @since 0.3 +*) + +type 'a or_error = 'a Fut.or_error + +type 'a t +(** Channel carrying values of type ['a]. *) + +val create : unit -> 'a t +(** Create a channel. *) + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push chan x] pushes [x] into [chan]. This does not block. + @raise Closed if the channel is closed. *) + +val pop : 'a t -> 'a Fut.t +(** Pop an element. This returns a future that will be + fulfilled when an element is available. + @raise Closed if the channel is closed, or fails the future + if the channel is closed before an element is available for it. *) + +val try_pop : 'a t -> 'a option +(** [try_pop chan] pops and return an element if one is available + immediately. Otherwise it returns [None]. *) + +val pop_block_exn : 'a t -> 'a +(** Like [pop], but blocks if an element is not available immediately. + The precautions around blocking from inside a thread pool + are the same as explained in {!Fut.wait_block}. *) + +val close : _ t -> unit +(** Close the channel. Further push and pop calls will fail. + This is idempotent. *) diff --git a/src/moonpool.ml b/src/moonpool.ml index 8bd5d50e..37fabde7 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -2,7 +2,8 @@ let start_thread_on_some_domain f x = let did = Random.int (D_pool_.n_domains ()) in D_pool_.run_on_and_wait did (fun () -> Thread.create f x) -module Pool = Pool -module Fut = Fut -module Blocking_queue = Bb_queue module Atomic = Atomic_ +module Blocking_queue = Bb_queue +module Chan = Chan +module Fut = Fut +module Pool = Pool diff --git a/src/moonpool.mli b/src/moonpool.mli index 90362198..e276b11e 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,6 +12,7 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) module Fut = Fut +module Chan = Chan (** A simple blocking queue.