mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
feat: add Chan again
This commit is contained in:
parent
22ccb76d66
commit
1e21157e8a
4 changed files with 232 additions and 3 deletions
184
src/chan.ml
Normal file
184
src/chan.ml
Normal file
|
|
@ -0,0 +1,184 @@
|
||||||
|
module A = Atomic_
|
||||||
|
|
||||||
|
type 'a or_error = 'a Fut.or_error
|
||||||
|
type 'a waiter = 'a Fut.promise
|
||||||
|
|
||||||
|
let[@inline] list_is_empty_ = function
|
||||||
|
| [] -> true
|
||||||
|
| _ :: _ -> false
|
||||||
|
|
||||||
|
(** Simple functional queue *)
|
||||||
|
module Q : sig
|
||||||
|
type 'a t
|
||||||
|
|
||||||
|
val return : 'a -> 'a t
|
||||||
|
val is_empty : _ t -> bool
|
||||||
|
|
||||||
|
exception Empty
|
||||||
|
|
||||||
|
val pop_exn : 'a t -> 'a * 'a t
|
||||||
|
val push : 'a t -> 'a -> 'a t
|
||||||
|
val iter : ('a -> unit) -> 'a t -> unit
|
||||||
|
end = struct
|
||||||
|
type 'a t = {
|
||||||
|
hd: 'a list;
|
||||||
|
tl: 'a list;
|
||||||
|
}
|
||||||
|
(** Queue containing elements of type 'a.
|
||||||
|
|
||||||
|
invariant: if hd=[], then tl=[] *)
|
||||||
|
|
||||||
|
let[@inline] return x : _ t = { hd = [ x ]; tl = [] }
|
||||||
|
|
||||||
|
let[@inline] make_ hd tl =
|
||||||
|
match hd with
|
||||||
|
| [] -> { hd = List.rev tl; tl = [] }
|
||||||
|
| _ :: _ -> { hd; tl }
|
||||||
|
|
||||||
|
let[@inline] is_empty self = list_is_empty_ self.hd
|
||||||
|
let[@inline] push self x : _ t = make_ self.hd (x :: self.tl)
|
||||||
|
|
||||||
|
let iter f (self : _ t) : unit =
|
||||||
|
List.iter f self.hd;
|
||||||
|
List.iter f self.tl
|
||||||
|
|
||||||
|
exception Empty
|
||||||
|
|
||||||
|
let pop_exn self =
|
||||||
|
match self.hd with
|
||||||
|
| [] ->
|
||||||
|
assert (list_is_empty_ self.tl);
|
||||||
|
raise Empty
|
||||||
|
| x :: hd' ->
|
||||||
|
let self' = make_ hd' self.tl in
|
||||||
|
x, self'
|
||||||
|
end
|
||||||
|
|
||||||
|
exception Closed
|
||||||
|
|
||||||
|
type 'a state =
|
||||||
|
| Empty
|
||||||
|
| St_closed
|
||||||
|
| Elems of 'a Q.t
|
||||||
|
| Waiters of 'a waiter Q.t
|
||||||
|
|
||||||
|
type 'a t = { st: 'a state A.t } [@@unboxed]
|
||||||
|
|
||||||
|
let create () : _ t = { st = A.make Empty }
|
||||||
|
|
||||||
|
(** Produce a state from a queue of waiters *)
|
||||||
|
let[@inline] mk_st_waiters_ ws : _ state =
|
||||||
|
if Q.is_empty ws then
|
||||||
|
Empty
|
||||||
|
else
|
||||||
|
Waiters ws
|
||||||
|
|
||||||
|
(** Produce a state from a queue of elements *)
|
||||||
|
let[@inline] mk_st_elems_ q : _ state =
|
||||||
|
if Q.is_empty q then
|
||||||
|
Empty
|
||||||
|
else
|
||||||
|
Elems q
|
||||||
|
|
||||||
|
let push (self : _ t) x : unit =
|
||||||
|
while
|
||||||
|
let old_st = A.get self.st in
|
||||||
|
match old_st with
|
||||||
|
| St_closed -> raise Closed
|
||||||
|
| Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x)))
|
||||||
|
| Waiters ws ->
|
||||||
|
(* awake first waiter and give it [x] *)
|
||||||
|
let w, ws' = Q.pop_exn ws in
|
||||||
|
let new_st = mk_st_waiters_ ws' in
|
||||||
|
if A.compare_and_set self.st old_st new_st then (
|
||||||
|
Fut.fulfill w (Ok x);
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
true
|
||||||
|
| Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x)))
|
||||||
|
do
|
||||||
|
Domain_.relax ()
|
||||||
|
done
|
||||||
|
|
||||||
|
let try_pop (type elt) self : elt option =
|
||||||
|
let module M = struct
|
||||||
|
exception Found of elt
|
||||||
|
end in
|
||||||
|
try
|
||||||
|
(* a bit of spinning *)
|
||||||
|
for _retry = 1 to 10 do
|
||||||
|
let old_st = A.get self.st in
|
||||||
|
match old_st with
|
||||||
|
| Elems q ->
|
||||||
|
let x, q' = Q.pop_exn q in
|
||||||
|
let new_st = mk_st_elems_ q' in
|
||||||
|
if A.compare_and_set self.st old_st new_st then
|
||||||
|
raise_notrace (M.Found x)
|
||||||
|
else
|
||||||
|
Domain_.relax ()
|
||||||
|
| _ -> raise_notrace Exit
|
||||||
|
done;
|
||||||
|
None
|
||||||
|
with
|
||||||
|
| M.Found x -> Some x
|
||||||
|
| Exit -> None
|
||||||
|
|
||||||
|
let pop (type elt) (self : _ t) : elt Fut.t =
|
||||||
|
let module M = struct
|
||||||
|
exception Ret of elt
|
||||||
|
exception Fut of elt Fut.t
|
||||||
|
end in
|
||||||
|
try
|
||||||
|
while
|
||||||
|
let old_st = A.get self.st in
|
||||||
|
(match old_st with
|
||||||
|
| St_closed ->
|
||||||
|
let bt = Printexc.get_callstack 10 in
|
||||||
|
raise_notrace (M.Fut (Fut.fail Closed bt))
|
||||||
|
| Elems q ->
|
||||||
|
let x, q' = Q.pop_exn q in
|
||||||
|
let new_st = mk_st_elems_ q' in
|
||||||
|
if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x)
|
||||||
|
| Empty ->
|
||||||
|
let fut, promise = Fut.make () in
|
||||||
|
let new_st = Waiters (Q.return promise) in
|
||||||
|
if A.compare_and_set self.st old_st new_st then
|
||||||
|
raise_notrace (M.Fut fut)
|
||||||
|
| Waiters ws ->
|
||||||
|
let fut, promise = Fut.make () in
|
||||||
|
(* add new promise at the end of the queue of waiters *)
|
||||||
|
let new_st = Waiters (Q.push ws promise) in
|
||||||
|
if A.compare_and_set self.st old_st new_st then
|
||||||
|
raise_notrace (M.Fut fut));
|
||||||
|
true
|
||||||
|
do
|
||||||
|
Domain_.relax ()
|
||||||
|
done;
|
||||||
|
(* never reached *)
|
||||||
|
assert false
|
||||||
|
with
|
||||||
|
| M.Ret x -> Fut.return x
|
||||||
|
| M.Fut f -> f
|
||||||
|
|
||||||
|
let pop_block_exn (self : 'a t) : 'a =
|
||||||
|
match try_pop self with
|
||||||
|
| Some x -> x
|
||||||
|
| None -> Fut.wait_block_exn @@ pop self
|
||||||
|
|
||||||
|
let close (self : _ t) : unit =
|
||||||
|
while
|
||||||
|
let old_st = A.get self.st in
|
||||||
|
match old_st with
|
||||||
|
| St_closed -> false (* exit *)
|
||||||
|
| Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed)
|
||||||
|
| Waiters ws ->
|
||||||
|
if A.compare_and_set self.st old_st St_closed then (
|
||||||
|
(* fail all waiters with [Closed]. *)
|
||||||
|
let bt = Printexc.get_callstack 10 in
|
||||||
|
Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
true
|
||||||
|
do
|
||||||
|
Domain_.relax ()
|
||||||
|
done
|
||||||
43
src/chan.mli
Normal file
43
src/chan.mli
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
(** Channels.
|
||||||
|
|
||||||
|
Channels are pipelines of values where threads can push into
|
||||||
|
one end, and pull from the other end.
|
||||||
|
|
||||||
|
Unlike {!Moonpool.Blocking_queue}, channels are designed so
|
||||||
|
that pushing never blocks, and pop'ing values returns a future.
|
||||||
|
|
||||||
|
@since 0.3
|
||||||
|
*)
|
||||||
|
|
||||||
|
type 'a or_error = 'a Fut.or_error
|
||||||
|
|
||||||
|
type 'a t
|
||||||
|
(** Channel carrying values of type ['a]. *)
|
||||||
|
|
||||||
|
val create : unit -> 'a t
|
||||||
|
(** Create a channel. *)
|
||||||
|
|
||||||
|
exception Closed
|
||||||
|
|
||||||
|
val push : 'a t -> 'a -> unit
|
||||||
|
(** [push chan x] pushes [x] into [chan]. This does not block.
|
||||||
|
@raise Closed if the channel is closed. *)
|
||||||
|
|
||||||
|
val pop : 'a t -> 'a Fut.t
|
||||||
|
(** Pop an element. This returns a future that will be
|
||||||
|
fulfilled when an element is available.
|
||||||
|
@raise Closed if the channel is closed, or fails the future
|
||||||
|
if the channel is closed before an element is available for it. *)
|
||||||
|
|
||||||
|
val try_pop : 'a t -> 'a option
|
||||||
|
(** [try_pop chan] pops and return an element if one is available
|
||||||
|
immediately. Otherwise it returns [None]. *)
|
||||||
|
|
||||||
|
val pop_block_exn : 'a t -> 'a
|
||||||
|
(** Like [pop], but blocks if an element is not available immediately.
|
||||||
|
The precautions around blocking from inside a thread pool
|
||||||
|
are the same as explained in {!Fut.wait_block}. *)
|
||||||
|
|
||||||
|
val close : _ t -> unit
|
||||||
|
(** Close the channel. Further push and pop calls will fail.
|
||||||
|
This is idempotent. *)
|
||||||
|
|
@ -2,7 +2,8 @@ let start_thread_on_some_domain f x =
|
||||||
let did = Random.int (D_pool_.n_domains ()) in
|
let did = Random.int (D_pool_.n_domains ()) in
|
||||||
D_pool_.run_on_and_wait did (fun () -> Thread.create f x)
|
D_pool_.run_on_and_wait did (fun () -> Thread.create f x)
|
||||||
|
|
||||||
module Pool = Pool
|
|
||||||
module Fut = Fut
|
|
||||||
module Blocking_queue = Bb_queue
|
|
||||||
module Atomic = Atomic_
|
module Atomic = Atomic_
|
||||||
|
module Blocking_queue = Bb_queue
|
||||||
|
module Chan = Chan
|
||||||
|
module Fut = Fut
|
||||||
|
module Pool = Pool
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
|
||||||
to run all the various threads needed in an application (timers, event loops, etc.) *)
|
to run all the various threads needed in an application (timers, event loops, etc.) *)
|
||||||
|
|
||||||
module Fut = Fut
|
module Fut = Fut
|
||||||
|
module Chan = Chan
|
||||||
|
|
||||||
(** A simple blocking queue.
|
(** A simple blocking queue.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue