Merge pull request #32 from c-cube/simon/update-chan-2024-09-25

update channels to make them bounded and more efficient
This commit is contained in:
Simon Cruanes 2024-09-26 21:38:09 -04:00 committed by GitHub
commit f128e6c63a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 218 additions and 222 deletions

3
bench_primes.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/sh
OPTS="--profile=release --display=quiet"
exec dune exec $OPTS -- benchs/primes.exe $@

View file

@ -1,5 +1,5 @@
(executables (executables
(names fib_rec pi) (names fib_rec pi primes)
(preprocess (preprocess
(action (action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))) (run %{project_root}/src/cpp/cpp.exe %{input-file})))

60
benchs/primes.ml Normal file
View file

@ -0,0 +1,60 @@
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let generate' chan =
for i = 2 to Int.max_int do
Moonpool.Chan.push chan i
done
let filter' in_chan out_chan prime =
let rec loop () =
let n = Moonpool.Chan.pop in_chan in
if n mod prime <> 0 then Moonpool.Chan.push out_chan n;
loop ()
in
loop ()
let main ~chan_size ~n ~on_prime () : unit =
let@ runner = Moonpool.Ws_pool.with_ () in
let@ () = Moonpool.Ws_pool.run_wait_block runner in
let primes = ref @@ Moonpool.Chan.create ~max_size:chan_size () in
Moonpool.run_async runner
(let chan = !primes in
fun () -> generate' chan);
for _i = 1 to n do
let prime = Moonpool.Chan.pop !primes in
on_prime prime;
let filtered_chan = Moonpool.Chan.create ~max_size:chan_size () in
Moonpool.run_async runner
(let in_chan = !primes in
fun () -> filter' in_chan filtered_chan prime);
primes := filtered_chan
done
let () =
let n = ref 10_000 in
let chan_size = ref 0 in
let time = ref true in
let opts =
[
"-n", Arg.Set_int n, " number of iterations";
"--no-time", Arg.Clear time, " do not compute time";
"--chan-size", Arg.Set_int chan_size, " channel size";
]
|> Arg.align
in
Arg.parse opts ignore "";
Printf.printf "computing %d primes\n%!" !n;
let t_start = Unix.gettimeofday () in
let n_primes = Atomic.make 0 in
main ~n:!n ~chan_size:!chan_size ~on_prime:(fun _ -> Atomic.incr n_primes) ();
let elapsed : float = Unix.gettimeofday () -. t_start in
Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes)
(if !time then
spf " in %.4fs" elapsed
else
"")

View file

@ -1,193 +1,124 @@
module A = Atomic_
type 'a or_error = 'a Fut.or_error
type 'a waiter = 'a Fut.promise
let[@inline] list_is_empty_ = function
| [] -> true
| _ :: _ -> false
(** Simple functional queue *)
module Q : sig
type 'a t
val return : 'a -> 'a t
val is_empty : _ t -> bool
exception Empty
val pop_exn : 'a t -> 'a * 'a t
val push : 'a t -> 'a -> 'a t
val iter : ('a -> unit) -> 'a t -> unit
end = struct
type 'a t = {
hd: 'a list;
tl: 'a list;
}
(** Queue containing elements of type 'a.
invariant: if hd=[], then tl=[] *)
let[@inline] return x : _ t = { hd = [ x ]; tl = [] }
let[@inline] make_ hd tl =
match hd with
| [] -> { hd = List.rev tl; tl = [] }
| _ :: _ -> { hd; tl }
let[@inline] is_empty self = list_is_empty_ self.hd
let[@inline] push self x : _ t = make_ self.hd (x :: self.tl)
let iter f (self : _ t) : unit =
List.iter f self.hd;
List.iter f self.tl
exception Empty
let pop_exn self =
match self.hd with
| [] ->
assert (list_is_empty_ self.tl);
raise Empty
| x :: hd' ->
let self' = make_ hd' self.tl in
x, self'
end
exception Closed exception Closed
type 'a state = type 'a t = {
| Empty q: 'a Queue.t;
| St_closed mutex: Mutex.t; (** protects critical section *)
| Elems of 'a Q.t mutable closed: bool;
| Waiters of 'a waiter Q.t max_size: int;
push_waiters: Trigger.t Queue.t;
pop_waiters: Trigger.t Queue.t;
}
type 'a t = { st: 'a state A.t } [@@unboxed] let create ~max_size () : _ t =
if max_size < 0 then invalid_arg "Chan: max_size < 0";
{
max_size;
mutex = Mutex.create ();
closed = false;
q = Queue.create ();
push_waiters = Queue.create ();
pop_waiters = Queue.create ();
}
let create () : _ t = { st = A.make Empty } let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
(** Produce a state from a queue of waiters *) match Queue.length self.q with
let[@inline] mk_st_waiters_ ws : _ state = | 0 ->
if Q.is_empty ws then let to_awake = Queue.create () in
Empty Queue.push x self.q;
else Queue.transfer self.pop_waiters to_awake;
Waiters ws res := true;
Mutex.unlock self.mutex;
(** Produce a state from a queue of elements *) (* wake up pop triggers if needed. Be careful to do that
let[@inline] mk_st_elems_ q : _ state = outside the critical section*)
if Q.is_empty q then Queue.iter Trigger.signal to_awake
Empty | n when n < self.max_size ->
else Queue.push x self.q;
Elems q Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
let push (self : _ t) x : unit = );
while !res
let old_st = A.get self.st in
match old_st with
| St_closed -> raise Closed
| Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x)))
| Waiters ws ->
(* awake first waiter and give it [x] *)
let w, ws' = Q.pop_exn ws in
let new_st = mk_st_waiters_ ws' in
if A.compare_and_set self.st old_st new_st then (
Fut.fulfill w (Ok x);
false
) else
true
| Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x)))
do
Domain_.relax ()
done
let try_pop (type elt) self : elt option = let try_pop (type elt) self : elt option =
let module M = struct let res = ref None in
exception Found of elt if Mutex.try_lock self.mutex then (
end in (match Queue.pop self.q with
try | exception Queue.Empty ->
(* a bit of spinning *) if self.closed then (
for _retry = 1 to 10 do Mutex.unlock self.mutex;
let old_st = A.get self.st in raise Closed
match old_st with )
| Elems q -> | x -> res := Some x);
let x, q' = Q.pop_exn q in Mutex.unlock self.mutex
let new_st = mk_st_elems_ q' in );
if A.compare_and_set self.st old_st new_st then !res
raise_notrace (M.Found x)
else
Domain_.relax ()
| _ -> raise_notrace Exit
done;
None
with
| M.Found x -> Some x
| Exit -> None
let pop (type elt) (self : _ t) : elt Fut.t =
let module M = struct
exception Ret of elt
exception Fut of elt Fut.t
end in
try
while
let old_st = A.get self.st in
(match old_st with
| St_closed ->
let bt = Printexc.get_callstack 10 in
raise_notrace (M.Fut (Fut.fail Closed bt))
| Elems q ->
let x, q' = Q.pop_exn q in
let new_st = mk_st_elems_ q' in
if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x)
| Empty ->
let fut, promise = Fut.make () in
let new_st = Waiters (Q.return promise) in
if A.compare_and_set self.st old_st new_st then
raise_notrace (M.Fut fut)
| Waiters ws ->
let fut, promise = Fut.make () in
(* add new promise at the end of the queue of waiters *)
let new_st = Waiters (Q.push ws promise) in
if A.compare_and_set self.st old_st new_st then
raise_notrace (M.Fut fut));
true
do
Domain_.relax ()
done;
(* never reached *)
assert false
with
| M.Ret x -> Fut.return x
| M.Fut f -> f
let pop_block_exn (self : 'a t) : 'a =
match try_pop self with
| Some x -> x
| None -> Fut.wait_block_exn @@ pop self
let close (self : _ t) : unit = let close (self : _ t) : unit =
while let q = Queue.create () in
let old_st = A.get self.st in Mutex.lock self.mutex;
match old_st with if not self.closed then (
| St_closed -> false (* exit *) self.closed <- true;
| Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) Queue.transfer self.pop_waiters q;
| Waiters ws -> Queue.transfer self.push_waiters q
if A.compare_and_set self.st old_st St_closed then ( );
(* fail all waiters with [Closed]. *) Mutex.unlock self.mutex;
let bt = Printexc.get_callstack 10 in Queue.iter Trigger.signal q
Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
false
) else
true
do
Domain_.relax ()
done
[@@@ifge 5.0] [@@@ifge 5.0]
let pop_await self = let rec push (self : _ t) x : unit =
match try_pop self with Mutex.lock self.mutex;
| Some x -> x
| None -> Fut.await @@ pop self if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
match Queue.length self.q with
| 0 ->
Queue.push x self.q;
let to_wakeup = Queue.create () in
Queue.transfer self.pop_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ ->
let tr = Trigger.create () in
Queue.push tr self.push_waiters;
Mutex.unlock self.mutex;
Trigger.await_exn tr;
push self x
let rec pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
match Queue.pop self.q with
| x ->
if Queue.is_empty self.q then (
let to_wakeup = Queue.create () in
Queue.transfer self.push_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
) else
Mutex.unlock self.mutex;
x
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let tr = Trigger.create () in
Queue.push tr self.pop_waiters;
Mutex.unlock self.mutex;
Trigger.await_exn tr;
pop self
[@@@endif] [@@@endif]

View file

@ -1,42 +1,29 @@
(** Channels. (** Channels.
Channels are pipelines of values where threads can push into The channels have bounded size. Push/pop return futures or can use effects
one end, and pull from the other end. to provide an [await]-friendly version.
Unlike {!Moonpool.Blocking_queue}, channels are designed so The channels became bounded since @NEXT_RELEASE .
that pushing never blocks, and pop'ing values returns a future.
@since 0.3
*) *)
type 'a or_error = 'a Fut.or_error
type 'a t type 'a t
(** Channel carrying values of type ['a]. *) (** Channel carrying values of type ['a]. *)
val create : unit -> 'a t val create : max_size:int -> unit -> 'a t
(** Create a channel. *) (** Create a channel. *)
exception Closed exception Closed
val push : 'a t -> 'a -> unit val try_push : 'a t -> 'a -> bool
(** [push chan x] pushes [x] into [chan]. This does not block. (** [try_push chan x] pushes [x] into [chan]. This does not block.
Returns [true] if it succeeded in pushing.
@raise Closed if the channel is closed. *) @raise Closed if the channel is closed. *)
val pop : 'a t -> 'a Fut.t
(** Pop an element. This returns a future that will be
fulfilled when an element is available.
@raise Closed if the channel is closed, or fails the future
if the channel is closed before an element is available for it. *)
val try_pop : 'a t -> 'a option val try_pop : 'a t -> 'a option
(** [try_pop chan] pops and return an element if one is available (** [try_pop chan] pops and return an element if one is available
immediately. Otherwise it returns [None]. *) immediately. Otherwise it returns [None].
@raise Closed if the channel is closed and empty.
val pop_block_exn : 'a t -> 'a *)
(** Like [pop], but blocks if an element is not available immediately.
The precautions around blocking from inside a thread pool
are the same as explained in {!Fut.wait_block}. *)
val close : _ t -> unit val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. (** Close the channel. Further push and pop calls will fail.
@ -44,9 +31,23 @@ val close : _ t -> unit
[@@@ifge 5.0] [@@@ifge 5.0]
val pop_await : 'a t -> 'a val push : 'a t -> 'a -> unit
(** Like {!pop} but suspends the current thread until an element is (** Push the value into the channel, suspending the current task
available. See {!Fut.await} for more details. if the channel is currently full.
@since 0.3 *) @raise Closed if the channel is closed
@since NEXT_RELEASE *)
val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the
channel is currently empty.
@raise Closed if the channel is empty and closed.
@since NEXT_RELEASE *)
(*
val pop_block_exn : 'a t -> 'a
(** Like [pop], but blocks if an element is not available immediately.
The precautions around blocking from inside a thread pool
are the same as explained in {!Fut.wait_block}. *)
*)
[@@@endif] [@@@endif]

View file

@ -1,4 +1,5 @@
module Mutex = Picos_std_sync.Mutex module Mutex = Picos_std_sync.Mutex
module Chan = Chan
module Condition = Picos_std_sync.Condition module Condition = Picos_std_sync.Condition
module Lock = Lock module Lock = Lock
module Event = Event module Event = Event

View file

@ -7,7 +7,6 @@
t_futs1 t_futs1
t_tree_futs t_tree_futs
t_props t_props
t_chan_train
t_resource t_resource
t_unfair t_unfair
t_ws_deque t_ws_deque

View file

@ -3,6 +3,7 @@
t_fib1 t_fib1
t_futs1 t_futs1
t_many t_many
t_chan_train
t_fib_fork_join t_fib_fork_join
t_fib_fork_join_all t_fib_fork_join_all
t_sort t_sort

View file

@ -3,17 +3,15 @@ open Moonpool
(* large pool, some of our tasks below are long lived *) (* large pool, some of our tasks below are long lived *)
let pool = Ws_pool.create ~num_threads:30 () let pool = Ws_pool.create ~num_threads:30 ()
open Fut.Infix
type event = type event =
| E_int of int | E_int of int
| E_close | E_close
let mk_chan (ic : event Chan.t) : event Chan.t = let mk_chan (ic : event Chan.t) : event Chan.t =
let out = Chan.create () in let out = Chan.create ~max_size:16 () in
let rec loop () = let rec loop () =
let* ev = Chan.pop ic in let ev = Chan.pop ic in
Chan.push out ev; Chan.push out ev;
match ev with match ev with
| E_close -> Fut.return () | E_close -> Fut.return ()
@ -44,7 +42,7 @@ let run () =
(* start trains *) (* start trains *)
let trains = let trains =
List.init n_trains (fun _ -> List.init n_trains (fun _ ->
let c = Chan.create () in let c = Chan.create ~max_size:16 () in
let out = mk_train len_train c in let out = mk_train len_train c in
c, out) c, out)
in in
@ -66,7 +64,9 @@ let run () =
let sum = ref 0 in let sum = ref 0 in
try try
while true do while true do
match Chan.pop_block_exn oc with match
Fut.spawn ~on:pool (fun () -> Chan.pop oc) |> Fut.wait_block_exn
with
| E_close -> raise Exit | E_close -> raise Exit
| E_int x -> sum := !sum + x | E_int x -> sum := !sum + x
done; done;

View file

@ -52,14 +52,14 @@ let () =
let clock = ref TS.init in let clock = ref TS.init in
let fib = let fib =
F.spawn_top ~on:runner @@ fun () -> F.spawn_top ~on:runner @@ fun () ->
let chan_progress = Chan.create () in let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ()) in let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs = let subs =
List.init 5 (fun i -> List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n -> F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01); Thread.delay (float i *. 0.01);
Chan.pop_await chans.(i); Chan.pop chans.(i);
Chan.push chan_progress i; Chan.push chan_progress i;
F.check_if_cancelled (); F.check_if_cancelled ();
i) i)
@ -70,7 +70,7 @@ let () =
F.spawn_ignore (fun () -> F.spawn_ignore (fun () ->
for i = 0 to 4 do for i = 0 to 4 do
Chan.push chans.(i) (); Chan.push chans.(i) ();
let i' = Chan.pop_await chan_progress in let i' = Chan.pop chan_progress in
assert (i = i') assert (i = i')
done); done);
@ -110,8 +110,8 @@ let () =
@@ Exn_bt.show ebt) @@ Exn_bt.show ebt)
in in
let chans_unblock = Array.init 10 (fun _i -> Chan.create ()) in let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create () in let chan_progress = Chan.create ~max_size:4 () in
logf (TS.tick_get clock) "start fibers"; logf (TS.tick_get clock) "start fibers";
let subs = let subs =
@ -126,7 +126,7 @@ let () =
Thread.delay 0.002; Thread.delay 0.002;
(* sync for determinism *) (* sync for determinism *)
Chan.pop_await chans_unblock.(i); Chan.pop chans_unblock.(i);
Chan.push chan_progress i; Chan.push chan_progress i;
if i = 7 then ( if i = 7 then (
@ -150,7 +150,7 @@ let () =
F.spawn_ignore (fun () -> F.spawn_ignore (fun () ->
for j = 0 to 9 do for j = 0 to 9 do
Chan.push chans_unblock.(j) (); Chan.push chans_unblock.(j) ();
let j' = Chan.pop_await chan_progress in let j' = Chan.pop chan_progress in
assert (j = j') assert (j = j')
done); done);