From d8aa60558b00da29c448a487ed787e29a6aca580 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 21:31:28 -0400 Subject: [PATCH 1/9] benchmark for a concurrent, channel-based, Erathostene prime sieve --- benchs/dune | 2 +- benchs/primes.ml | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 benchs/primes.ml diff --git a/benchs/dune b/benchs/dune index 14393230..6d2ec5ff 100644 --- a/benchs/dune +++ b/benchs/dune @@ -1,5 +1,5 @@ (executables - (names fib_rec pi) + (names fib_rec pi primes) (preprocess (action (run %{project_root}/src/cpp/cpp.exe %{input-file}))) diff --git a/benchs/primes.ml b/benchs/primes.ml new file mode 100644 index 00000000..570b850f --- /dev/null +++ b/benchs/primes.ml @@ -0,0 +1,32 @@ +let ( let@ ) = ( @@ ) + +let generate' chan = + for i = 2 to Int.max_int do + Moonpool.Chan.push chan i; + Thread.yield () + done + +let filter' in_chan out_chan prime = + let rec loop () = + let n = Moonpool.Chan.pop_await in_chan in + if n mod prime <> 0 then Moonpool.Chan.push out_chan n; + loop () + in + loop () + +let () = + let@ runner = Moonpool.Ws_pool.with_ () in + let@ () = Moonpool.Ws_pool.run_wait_block runner in + let primes = ref @@ Moonpool.Chan.create () in + Moonpool.run_async runner + (let chan = !primes in + fun () -> generate' chan); + for _i = 1 to 10 do + let prime = Moonpool.Chan.pop_await !primes in + Format.printf "%d\n@?" prime; + let filtered_chan = Moonpool.Chan.create () in + Moonpool.run_async runner + (let in_chan = !primes in + fun () -> filter' in_chan filtered_chan prime); + primes := filtered_chan + done From 35a69924d3016accdfd54e73e191f017290cd17b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 21:32:11 -0400 Subject: [PATCH 2/9] wip: change Moonpool.Chan so it's bounded also push/pop require effects, the OCaml 4 version only allows for try_push/try_pop. --- src/core/chan.ml | 290 ++++++++++++++++++++++---------------- src/core/chan.mli | 49 +++---- src/sync/moonpool_sync.ml | 1 + 3 files changed, 192 insertions(+), 148 deletions(-) diff --git a/src/core/chan.ml b/src/core/chan.ml index 5ce82376..099e7074 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,7 +1,8 @@ module A = Atomic_ type 'a or_error = 'a Fut.or_error -type 'a waiter = 'a Fut.promise +type 'a pop_waiter = 'a Fut.promise +type 'a push_waiter = 'a * unit Fut.promise let[@inline] list_is_empty_ = function | [] -> true @@ -11,6 +12,7 @@ let[@inline] list_is_empty_ = function module Q : sig type 'a t + val empty : 'a t val return : 'a -> 'a t val is_empty : _ t -> bool @@ -28,6 +30,7 @@ end = struct invariant: if hd=[], then tl=[] *) + let empty = { hd = []; tl = [] } let[@inline] return x : _ t = { hd = [ x ]; tl = [] } let[@inline] make_ hd tl = @@ -56,138 +59,177 @@ end exception Closed -type 'a state = - | Empty - | St_closed - | Elems of 'a Q.t - | Waiters of 'a waiter Q.t +type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; (** protects critical section *) + mutable closed: bool; + max_size: int; + push_waiters: Trigger.t Queue.t; + pop_waiters: Trigger.t Queue.t; +} -type 'a t = { st: 'a state A.t } [@@unboxed] +let create ~max_size () : _ t = + if max_size < 0 then invalid_arg "Chan: max_size < 0"; + { + max_size; + mutex = Mutex.create (); + closed = false; + q = Queue.create (); + push_waiters = Queue.create (); + pop_waiters = Queue.create (); + } -let create () : _ t = { st = A.make Empty } +let try_push (self : _ t) x : bool = + let res = ref false in + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); -(** Produce a state from a queue of waiters *) -let[@inline] mk_st_waiters_ ws : _ state = - if Q.is_empty ws then - Empty - else - Waiters ws - -(** Produce a state from a queue of elements *) -let[@inline] mk_st_elems_ q : _ state = - if Q.is_empty q then - Empty - else - Elems q - -let push (self : _ t) x : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> raise Closed - | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) - | Waiters ws -> - (* awake first waiter and give it [x] *) - let w, ws' = Q.pop_exn ws in - let new_st = mk_st_waiters_ ws' in - if A.compare_and_set self.st old_st new_st then ( - Fut.fulfill w (Ok x); - false - ) else - true - | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) - do - Domain_.relax () - done + match Queue.length self.q with + | 0 -> + let to_awake = Queue.create () in + Queue.push x self.q; + Queue.transfer self.pop_waiters to_awake; + res := true; + Mutex.unlock self.mutex; + (* wake up pop triggers if needed. Be careful to do that + outside the critical section*) + Queue.iter Trigger.signal to_awake + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> Mutex.unlock self.mutex + ); + !res let try_pop (type elt) self : elt option = - let module M = struct - exception Found of elt - end in - try - (* a bit of spinning *) - for _retry = 1 to 10 do - let old_st = A.get self.st in - match old_st with - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Found x) - else - Domain_.relax () - | _ -> raise_notrace Exit - done; - None - with - | M.Found x -> Some x - | Exit -> None - -let pop (type elt) (self : _ t) : elt Fut.t = - let module M = struct - exception Ret of elt - exception Fut of elt Fut.t - end in - try - while - let old_st = A.get self.st in - (match old_st with - | St_closed -> - let bt = Printexc.get_callstack 10 in - raise_notrace (M.Fut (Fut.fail Closed bt)) - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) - | Empty -> - let fut, promise = Fut.make () in - let new_st = Waiters (Q.return promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut) - | Waiters ws -> - let fut, promise = Fut.make () in - (* add new promise at the end of the queue of waiters *) - let new_st = Waiters (Q.push ws promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut)); - true - do - Domain_.relax () - done; - (* never reached *) - assert false - with - | M.Ret x -> Fut.return x - | M.Fut f -> f - -let pop_block_exn (self : 'a t) : 'a = - match try_pop self with - | Some x -> x - | None -> Fut.wait_block_exn @@ pop self + let res = ref None in + if Mutex.try_lock self.mutex then ( + (match Queue.pop self.q with + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) + | x -> res := Some x); + Mutex.unlock self.mutex + ); + !res let close (self : _ t) : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> false (* exit *) - | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) - | Waiters ws -> - if A.compare_and_set self.st old_st St_closed then ( - (* fail all waiters with [Closed]. *) - let bt = Printexc.get_callstack 10 in - Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; - false - ) else - true - do - Domain_.relax () - done + let q = Queue.create () in + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Queue.transfer self.pop_waiters q; + Queue.transfer self.push_waiters q + ); + Mutex.unlock self.mutex; + Queue.iter Trigger.signal q [@@@ifge 5.0] -let pop_await self = - match try_pop self with - | Some x -> x - | None -> Fut.await @@ pop self +let rec push (self : _ t) x : unit = + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + match Queue.length self.q with + | 0 -> + Queue.push x self.q; + let to_wakeup = Queue.create () in + Queue.transfer self.pop_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> + let tr = Trigger.create () in + Queue.push tr self.push_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + push self x + +let rec pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + match Queue.pop self.q with + | x -> + if Queue.is_empty self.q then ( + let to_wakeup = Queue.create () in + Queue.transfer self.push_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + ) else + Mutex.unlock self.mutex; + x + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + let tr = Trigger.create () in + Queue.push tr self.pop_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + pop self [@@@endif] + +(* TODO: remove + (** A waiter queue, somewhat similar to a condition. *) + module Waiters_ = struct + type t = { waiters: Trigger.t Q.t A.t } [@@unboxed] + + let create () : t = { waiters = A.make Q.empty } + + let add_waiter self (tr : Trigger.t) : unit = + while + let q = A.get self.waiters in + not (A.compare_and_set self.waiters q (Q.push q tr)) + do + Domain_.relax () + done + + let wait_await (self : t) : unit = + let tr = Trigger.create () in + add_waiter self tr; + Trigger.await_exn tr + + exception Empty = Q.Empty + + let rec pop_waiter (self : t) : Trigger.t = + let q = A.get self.waiters in + let x, q' = Q.pop_exn q in + if A.compare_and_set self.waiters q q' then + x + else ( + Domain_.relax (); + pop_waiter self + ) + + let rec pop_all (self : t) : Trigger.t Q.t = + let q = A.get self.waiters in + if A.compare_and_set self.waiters q Q.empty then + q + else ( + Domain_.relax (); + pop_all self + ) + + let signal (self : t) : unit = + match pop_waiter self with + | exception Empty -> () + | tr -> Trigger.signal tr + + let broadcast (self : t) : unit = + let waiters = pop_all self in + Q.iter Trigger.signal waiters + end +*) diff --git a/src/core/chan.mli b/src/core/chan.mli index 083cf8d5..142f43ce 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -1,12 +1,9 @@ (** Channels. - Channels are pipelines of values where threads can push into - one end, and pull from the other end. + The channels have bounded size. Push/pop return futures or can use effects + to provide an [await]-friendly version. - Unlike {!Moonpool.Blocking_queue}, channels are designed so - that pushing never blocks, and pop'ing values returns a future. - - @since 0.3 + The channels became bounded since @NEXT_RELEASE . *) type 'a or_error = 'a Fut.or_error @@ -14,39 +11,43 @@ type 'a or_error = 'a Fut.or_error type 'a t (** Channel carrying values of type ['a]. *) -val create : unit -> 'a t +val create : max_size:int -> unit -> 'a t (** Create a channel. *) exception Closed -val push : 'a t -> 'a -> unit -(** [push chan x] pushes [x] into [chan]. This does not block. +val try_push : 'a t -> 'a -> bool +(** [try_push chan x] pushes [x] into [chan]. This does not block. + Returns [true] if it succeeded in pushing. @raise Closed if the channel is closed. *) -val pop : 'a t -> 'a Fut.t -(** Pop an element. This returns a future that will be - fulfilled when an element is available. - @raise Closed if the channel is closed, or fails the future - if the channel is closed before an element is available for it. *) - val try_pop : 'a t -> 'a option (** [try_pop chan] pops and return an element if one is available immediately. Otherwise it returns [None]. *) -val pop_block_exn : 'a t -> 'a -(** Like [pop], but blocks if an element is not available immediately. - The precautions around blocking from inside a thread pool - are the same as explained in {!Fut.wait_block}. *) - val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. This is idempotent. *) [@@@ifge 5.0] -val pop_await : 'a t -> 'a -(** Like {!pop} but suspends the current thread until an element is - available. See {!Fut.await} for more details. - @since 0.3 *) +val push : 'a t -> 'a -> unit +(** Push the value into the channel, suspending the current task + if the channel is currently full. + @raise Closed if the channel is closed + @since NEXT_RELEASE *) + +val pop : 'a t -> 'a +(** Pop an element. This might suspend the current task if the + channel is currently empty. + @raise Closed if the channel is empty and closed. + @since NEXT_RELEASE *) + +(* +val pop_block_exn : 'a t -> 'a +(** Like [pop], but blocks if an element is not available immediately. + The precautions around blocking from inside a thread pool + are the same as explained in {!Fut.wait_block}. *) +*) [@@@endif] diff --git a/src/sync/moonpool_sync.ml b/src/sync/moonpool_sync.ml index 99065305..f2c29ba7 100644 --- a/src/sync/moonpool_sync.ml +++ b/src/sync/moonpool_sync.ml @@ -1,4 +1,5 @@ module Mutex = Picos_std_sync.Mutex +module Chan = Chan module Condition = Picos_std_sync.Condition module Lock = Lock module Event = Event From e7b422333263bf49ed5f3b3fd43769ef1cdcfeab Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 21:32:53 -0400 Subject: [PATCH 3/9] bench_primes script --- bench_primes.sh | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 bench_primes.sh diff --git a/bench_primes.sh b/bench_primes.sh new file mode 100644 index 00000000..52b618c5 --- /dev/null +++ b/bench_primes.sh @@ -0,0 +1,3 @@ +#!/bin/sh +OPTS="--profile=release --display=quiet" +exec dune exec $OPTS -- benchs/primes.exe $@ From 3b8b4d040a2c9e5633f382ccba37648f9a1a32ff Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 21:32:59 -0400 Subject: [PATCH 4/9] test: move t_chan_train to effect-based tests --- test/dune | 1 - test/effect-based/dune | 1 + test/{ => effect-based}/t_chan_train.ml | 6 +++--- 3 files changed, 4 insertions(+), 4 deletions(-) rename test/{ => effect-based}/t_chan_train.ml (94%) diff --git a/test/dune b/test/dune index c10d08f4..af881591 100644 --- a/test/dune +++ b/test/dune @@ -7,7 +7,6 @@ t_futs1 t_tree_futs t_props - t_chan_train t_resource t_unfair t_ws_deque diff --git a/test/effect-based/dune b/test/effect-based/dune index bf1feb81..faa9254d 100644 --- a/test/effect-based/dune +++ b/test/effect-based/dune @@ -3,6 +3,7 @@ t_fib1 t_futs1 t_many + t_chan_train t_fib_fork_join t_fib_fork_join_all t_sort diff --git a/test/t_chan_train.ml b/test/effect-based/t_chan_train.ml similarity index 94% rename from test/t_chan_train.ml rename to test/effect-based/t_chan_train.ml index 20645a73..09b51987 100644 --- a/test/t_chan_train.ml +++ b/test/effect-based/t_chan_train.ml @@ -10,10 +10,10 @@ type event = | E_close let mk_chan (ic : event Chan.t) : event Chan.t = - let out = Chan.create () in + let out = Chan.create ~max_size:16 () in let rec loop () = - let* ev = Chan.pop ic in + let ev = Chan.pop ic in Chan.push out ev; match ev with | E_close -> Fut.return () @@ -44,7 +44,7 @@ let run () = (* start trains *) let trains = List.init n_trains (fun _ -> - let c = Chan.create () in + let c = Chan.create ~max_size:16 () in let out = mk_train len_train c in c, out) in From 854c3b819bdef4fa1c319e758d9ec60a2f5ff530 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 22:17:31 -0400 Subject: [PATCH 5/9] update tests and benchs --- bench_primes.sh | 0 benchs/primes.ml | 44 ++++++++++++++++++++++++------- test/effect-based/t_chan_train.ml | 6 ++--- test/fiber/t_fib1.ml | 16 +++++------ 4 files changed, 46 insertions(+), 20 deletions(-) mode change 100644 => 100755 bench_primes.sh diff --git a/bench_primes.sh b/bench_primes.sh old mode 100644 new mode 100755 diff --git a/benchs/primes.ml b/benchs/primes.ml index 570b850f..e3e60d06 100644 --- a/benchs/primes.ml +++ b/benchs/primes.ml @@ -1,32 +1,58 @@ let ( let@ ) = ( @@ ) +let spf = Printf.sprintf let generate' chan = for i = 2 to Int.max_int do - Moonpool.Chan.push chan i; - Thread.yield () + Moonpool.Chan.push chan i done let filter' in_chan out_chan prime = let rec loop () = - let n = Moonpool.Chan.pop_await in_chan in + let n = Moonpool.Chan.pop in_chan in if n mod prime <> 0 then Moonpool.Chan.push out_chan n; loop () in loop () -let () = +let main ~n ~on_prime () : unit = let@ runner = Moonpool.Ws_pool.with_ () in let@ () = Moonpool.Ws_pool.run_wait_block runner in - let primes = ref @@ Moonpool.Chan.create () in + let primes = ref @@ Moonpool.Chan.create ~max_size:32 () in Moonpool.run_async runner (let chan = !primes in fun () -> generate' chan); - for _i = 1 to 10 do - let prime = Moonpool.Chan.pop_await !primes in - Format.printf "%d\n@?" prime; - let filtered_chan = Moonpool.Chan.create () in + + for _i = 1 to n do + let prime = Moonpool.Chan.pop !primes in + on_prime prime; + let filtered_chan = Moonpool.Chan.create ~max_size:32 () in Moonpool.run_async runner (let in_chan = !primes in fun () -> filter' in_chan filtered_chan prime); primes := filtered_chan done + +let () = + let n = ref 10_000 in + let time = ref true in + let opts = + [ + "-n", Arg.Set_int n, " number of iterations"; + "--no-time", Arg.Clear time, " do not compute time"; + ] + |> Arg.align + in + Arg.parse opts ignore ""; + Printf.printf "computing %d primes\n%!" !n; + + let t_start = Unix.gettimeofday () in + + let n_primes = Atomic.make 0 in + main ~n:!n ~on_prime:(fun _ -> Atomic.incr n_primes) (); + + let elapsed : float = Unix.gettimeofday () -. t_start in + Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes) + (if !time then + spf " in %.4fs" elapsed + else + "") diff --git a/test/effect-based/t_chan_train.ml b/test/effect-based/t_chan_train.ml index 09b51987..e972dabc 100644 --- a/test/effect-based/t_chan_train.ml +++ b/test/effect-based/t_chan_train.ml @@ -3,8 +3,6 @@ open Moonpool (* large pool, some of our tasks below are long lived *) let pool = Ws_pool.create ~num_threads:30 () -open Fut.Infix - type event = | E_int of int | E_close @@ -66,7 +64,9 @@ let run () = let sum = ref 0 in try while true do - match Chan.pop_block_exn oc with + match + Fut.spawn ~on:pool (fun () -> Chan.pop oc) |> Fut.wait_block_exn + with | E_close -> raise Exit | E_int x -> sum := !sum + x done; diff --git a/test/fiber/t_fib1.ml b/test/fiber/t_fib1.ml index 7ceedbf4..77360b2b 100644 --- a/test/fiber/t_fib1.ml +++ b/test/fiber/t_fib1.ml @@ -52,14 +52,14 @@ let () = let clock = ref TS.init in let fib = F.spawn_top ~on:runner @@ fun () -> - let chan_progress = Chan.create () in - let chans = Array.init 5 (fun _ -> Chan.create ()) in + let chan_progress = Chan.create ~max_size:4 () in + let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in let subs = List.init 5 (fun i -> F.spawn ~protect:false @@ fun _n -> Thread.delay (float i *. 0.01); - Chan.pop_await chans.(i); + Chan.pop chans.(i); Chan.push chan_progress i; F.check_if_cancelled (); i) @@ -70,7 +70,7 @@ let () = F.spawn_ignore (fun () -> for i = 0 to 4 do Chan.push chans.(i) (); - let i' = Chan.pop_await chan_progress in + let i' = Chan.pop chan_progress in assert (i = i') done); @@ -110,8 +110,8 @@ let () = @@ Exn_bt.show ebt) in - let chans_unblock = Array.init 10 (fun _i -> Chan.create ()) in - let chan_progress = Chan.create () in + let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in + let chan_progress = Chan.create ~max_size:4 () in logf (TS.tick_get clock) "start fibers"; let subs = @@ -126,7 +126,7 @@ let () = Thread.delay 0.002; (* sync for determinism *) - Chan.pop_await chans_unblock.(i); + Chan.pop chans_unblock.(i); Chan.push chan_progress i; if i = 7 then ( @@ -150,7 +150,7 @@ let () = F.spawn_ignore (fun () -> for j = 0 to 9 do Chan.push chans_unblock.(j) (); - let j' = Chan.pop_await chan_progress in + let j' = Chan.pop chan_progress in assert (j = j') done); From 94998ea407d67208c56eef5d33334a44c0902797 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 22:17:41 -0400 Subject: [PATCH 6/9] lock free chan --- src/core/chan.ml | 283 ++++++++++++++++++++-------------------------- src/core/chan.mli | 6 +- 2 files changed, 127 insertions(+), 162 deletions(-) diff --git a/src/core/chan.ml b/src/core/chan.ml index 099e7074..80613d89 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,9 +1,5 @@ module A = Atomic_ -type 'a or_error = 'a Fut.or_error -type 'a pop_waiter = 'a Fut.promise -type 'a push_waiter = 'a * unit Fut.promise - let[@inline] list_is_empty_ = function | [] -> true | _ :: _ -> false @@ -13,7 +9,6 @@ module Q : sig type 'a t val empty : 'a t - val return : 'a -> 'a t val is_empty : _ t -> bool exception Empty @@ -31,7 +26,6 @@ end = struct invariant: if hd=[], then tl=[] *) let empty = { hd = []; tl = [] } - let[@inline] return x : _ t = { hd = [ x ]; tl = [] } let[@inline] make_ hd tl = match hd with @@ -58,178 +52,149 @@ end = struct end exception Closed +exception Full + +module State = struct + type 'a t = { + q: 'a Q.t; + size: int; + pop_waiters: Trigger.t Q.t; + push_waiters: Trigger.t Q.t; + } + + (** @raise Q.Empty *) + let[@inline] pop_one_ ~max_size (st : 'a t) : 'a * 'a t * Trigger.t Q.t = + let x, new_q = Q.pop_exn st.q in + let new_st = { st with q = new_q; size = st.size - 1 } in + if st.size = max_size then + (* we signal all the push waiters, the channel isn't full anymore *) + x, { new_st with push_waiters = Q.empty }, st.push_waiters + else + x, new_st, Q.empty + + (** @raise Full *) + let[@inline] push_one_ ~max_size (st : 'a t) (x : 'a) : 'a t * Trigger.t Q.t = + if st.size >= max_size then raise_notrace Full; + let new_q = Q.push st.q x in + let new_st = { st with q = new_q; size = st.size + 1 } in + if st.size = 0 then + (* we signal all the pop waiters, the channel isn't empty anymore *) + { new_st with pop_waiters = Q.empty }, st.pop_waiters + else + new_st, Q.empty +end type 'a t = { - q: 'a Queue.t; - mutex: Mutex.t; (** protects critical section *) - mutable closed: bool; + st: 'a State.t A.t; + closed: bool A.t; max_size: int; - push_waiters: Trigger.t Queue.t; - pop_waiters: Trigger.t Queue.t; } let create ~max_size () : _ t = if max_size < 0 then invalid_arg "Chan: max_size < 0"; { max_size; - mutex = Mutex.create (); - closed = false; - q = Queue.create (); - push_waiters = Queue.create (); - pop_waiters = Queue.create (); + closed = A.make false; + st = + A.make + { + State.q = Q.empty; + size = 0; + pop_waiters = Q.empty; + push_waiters = Q.empty; + }; } -let try_push (self : _ t) x : bool = - let res = ref false in - if Mutex.try_lock self.mutex then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); +let try_pop (self : 'a t) : 'a option = + let old_st = A.get self.st in + match State.pop_one_ ~max_size:self.max_size old_st with + | exception Q.Empty -> + if A.get self.closed then raise Closed; + None + | x, new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + Some x + ) else + None - match Queue.length self.q with - | 0 -> - let to_awake = Queue.create () in - Queue.push x self.q; - Queue.transfer self.pop_waiters to_awake; - res := true; - Mutex.unlock self.mutex; - (* wake up pop triggers if needed. Be careful to do that - outside the critical section*) - Queue.iter Trigger.signal to_awake - | n when n < self.max_size -> - Queue.push x self.q; - Mutex.unlock self.mutex - | _ -> Mutex.unlock self.mutex - ); - !res - -let try_pop (type elt) self : elt option = - let res = ref None in - if Mutex.try_lock self.mutex then ( - (match Queue.pop self.q with - | exception Queue.Empty -> - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ) - | x -> res := Some x); - Mutex.unlock self.mutex - ); - !res +let try_push (self : 'a t) (x : 'a) : bool = + if A.get self.closed then raise Closed; + let old_st = A.get self.st in + match State.push_one_ ~max_size:self.max_size old_st x with + | exception Full -> false + | new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + true + ) else + false let close (self : _ t) : unit = - let q = Queue.create () in - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - Queue.transfer self.pop_waiters q; - Queue.transfer self.push_waiters q - ); - Mutex.unlock self.mutex; - Queue.iter Trigger.signal q + if not (A.exchange self.closed true) then + while + let old_st = A.get self.st in + if + A.compare_and_set self.st old_st + { old_st with push_waiters = Q.empty; pop_waiters = Q.empty } + then ( + (* signal all waiters *) + Q.iter Trigger.signal old_st.push_waiters; + Q.iter Trigger.signal old_st.pop_waiters; + + false + ) else + true + do + Domain_.relax () + done [@@@ifge 5.0] -let rec push (self : _ t) x : unit = - Mutex.lock self.mutex; - - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - - match Queue.length self.q with - | 0 -> - Queue.push x self.q; - let to_wakeup = Queue.create () in - Queue.transfer self.pop_waiters to_wakeup; - Mutex.unlock self.mutex; - Queue.iter Trigger.signal to_wakeup - | n when n < self.max_size -> - Queue.push x self.q; - Mutex.unlock self.mutex - | _ -> - let tr = Trigger.create () in - Queue.push tr self.push_waiters; - Mutex.unlock self.mutex; - Trigger.await_exn tr; - push self x - let rec pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - match Queue.pop self.q with - | x -> - if Queue.is_empty self.q then ( - let to_wakeup = Queue.create () in - Queue.transfer self.push_waiters to_wakeup; - Mutex.unlock self.mutex; - Queue.iter Trigger.signal to_wakeup - ) else - Mutex.unlock self.mutex; - x - | exception Queue.Empty -> - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); + let old_st = A.get self.st in + match State.pop_one_ ~max_size:self.max_size old_st with + | exception Q.Empty -> + if A.get self.closed then raise Closed; let tr = Trigger.create () in - Queue.push tr self.pop_waiters; - Mutex.unlock self.mutex; - Trigger.await_exn tr; - pop self + if + A.compare_and_set self.st old_st + { old_st with pop_waiters = Q.push old_st.pop_waiters tr } + then ( + Trigger.await_exn tr; + pop self + ) else + pop self + | x, new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + x + ) else + pop self + +let push (self : _ t) x : unit = + while + if A.get self.closed then raise Closed; + + let old_st = A.get self.st in + match State.push_one_ ~max_size:self.max_size old_st x with + | exception Full -> + let tr = Trigger.create () in + if + A.compare_and_set self.st old_st + { old_st with push_waiters = Q.push old_st.push_waiters tr } + then + Trigger.await_exn tr; + true + | new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + false + ) else + true + do + Domain_.relax () + done [@@@endif] - -(* TODO: remove - (** A waiter queue, somewhat similar to a condition. *) - module Waiters_ = struct - type t = { waiters: Trigger.t Q.t A.t } [@@unboxed] - - let create () : t = { waiters = A.make Q.empty } - - let add_waiter self (tr : Trigger.t) : unit = - while - let q = A.get self.waiters in - not (A.compare_and_set self.waiters q (Q.push q tr)) - do - Domain_.relax () - done - - let wait_await (self : t) : unit = - let tr = Trigger.create () in - add_waiter self tr; - Trigger.await_exn tr - - exception Empty = Q.Empty - - let rec pop_waiter (self : t) : Trigger.t = - let q = A.get self.waiters in - let x, q' = Q.pop_exn q in - if A.compare_and_set self.waiters q q' then - x - else ( - Domain_.relax (); - pop_waiter self - ) - - let rec pop_all (self : t) : Trigger.t Q.t = - let q = A.get self.waiters in - if A.compare_and_set self.waiters q Q.empty then - q - else ( - Domain_.relax (); - pop_all self - ) - - let signal (self : t) : unit = - match pop_waiter self with - | exception Empty -> () - | tr -> Trigger.signal tr - - let broadcast (self : t) : unit = - let waiters = pop_all self in - Q.iter Trigger.signal waiters - end -*) diff --git a/src/core/chan.mli b/src/core/chan.mli index 142f43ce..7ec1163d 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -6,8 +6,6 @@ The channels became bounded since @NEXT_RELEASE . *) -type 'a or_error = 'a Fut.or_error - type 'a t (** Channel carrying values of type ['a]. *) @@ -23,7 +21,9 @@ val try_push : 'a t -> 'a -> bool val try_pop : 'a t -> 'a option (** [try_pop chan] pops and return an element if one is available - immediately. Otherwise it returns [None]. *) + immediately. Otherwise it returns [None]. + @raise Closed if the channel is closed and empty. + *) val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. From e7ee012108f817a5b46c4171822d673b29944f84 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 22:19:34 -0400 Subject: [PATCH 7/9] chan: go back to the lock-full version --- src/core/chan.ml | 276 +++++++++++++++++------------------------------ 1 file changed, 101 insertions(+), 175 deletions(-) diff --git a/src/core/chan.ml b/src/core/chan.ml index 80613d89..0debda30 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,200 +1,126 @@ module A = Atomic_ -let[@inline] list_is_empty_ = function - | [] -> true - | _ :: _ -> false - -(** Simple functional queue *) -module Q : sig - type 'a t - - val empty : 'a t - val is_empty : _ t -> bool - - exception Empty - - val pop_exn : 'a t -> 'a * 'a t - val push : 'a t -> 'a -> 'a t - val iter : ('a -> unit) -> 'a t -> unit -end = struct - type 'a t = { - hd: 'a list; - tl: 'a list; - } - (** Queue containing elements of type 'a. - - invariant: if hd=[], then tl=[] *) - - let empty = { hd = []; tl = [] } - - let[@inline] make_ hd tl = - match hd with - | [] -> { hd = List.rev tl; tl = [] } - | _ :: _ -> { hd; tl } - - let[@inline] is_empty self = list_is_empty_ self.hd - let[@inline] push self x : _ t = make_ self.hd (x :: self.tl) - - let iter f (self : _ t) : unit = - List.iter f self.hd; - List.iter f self.tl - - exception Empty - - let pop_exn self = - match self.hd with - | [] -> - assert (list_is_empty_ self.tl); - raise Empty - | x :: hd' -> - let self' = make_ hd' self.tl in - x, self' -end - exception Closed -exception Full - -module State = struct - type 'a t = { - q: 'a Q.t; - size: int; - pop_waiters: Trigger.t Q.t; - push_waiters: Trigger.t Q.t; - } - - (** @raise Q.Empty *) - let[@inline] pop_one_ ~max_size (st : 'a t) : 'a * 'a t * Trigger.t Q.t = - let x, new_q = Q.pop_exn st.q in - let new_st = { st with q = new_q; size = st.size - 1 } in - if st.size = max_size then - (* we signal all the push waiters, the channel isn't full anymore *) - x, { new_st with push_waiters = Q.empty }, st.push_waiters - else - x, new_st, Q.empty - - (** @raise Full *) - let[@inline] push_one_ ~max_size (st : 'a t) (x : 'a) : 'a t * Trigger.t Q.t = - if st.size >= max_size then raise_notrace Full; - let new_q = Q.push st.q x in - let new_st = { st with q = new_q; size = st.size + 1 } in - if st.size = 0 then - (* we signal all the pop waiters, the channel isn't empty anymore *) - { new_st with pop_waiters = Q.empty }, st.pop_waiters - else - new_st, Q.empty -end type 'a t = { - st: 'a State.t A.t; - closed: bool A.t; + q: 'a Queue.t; + mutex: Mutex.t; (** protects critical section *) + mutable closed: bool; max_size: int; + push_waiters: Trigger.t Queue.t; + pop_waiters: Trigger.t Queue.t; } let create ~max_size () : _ t = if max_size < 0 then invalid_arg "Chan: max_size < 0"; { max_size; - closed = A.make false; - st = - A.make - { - State.q = Q.empty; - size = 0; - pop_waiters = Q.empty; - push_waiters = Q.empty; - }; + mutex = Mutex.create (); + closed = false; + q = Queue.create (); + push_waiters = Queue.create (); + pop_waiters = Queue.create (); } -let try_pop (self : 'a t) : 'a option = - let old_st = A.get self.st in - match State.pop_one_ ~max_size:self.max_size old_st with - | exception Q.Empty -> - if A.get self.closed then raise Closed; - None - | x, new_st, to_broadcast -> - if A.compare_and_set self.st old_st new_st then ( - Q.iter Trigger.signal to_broadcast; - Some x - ) else - None +let try_push (self : _ t) x : bool = + let res = ref false in + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); -let try_push (self : 'a t) (x : 'a) : bool = - if A.get self.closed then raise Closed; - let old_st = A.get self.st in - match State.push_one_ ~max_size:self.max_size old_st x with - | exception Full -> false - | new_st, to_broadcast -> - if A.compare_and_set self.st old_st new_st then ( - Q.iter Trigger.signal to_broadcast; - true - ) else - false + match Queue.length self.q with + | 0 -> + let to_awake = Queue.create () in + Queue.push x self.q; + Queue.transfer self.pop_waiters to_awake; + res := true; + Mutex.unlock self.mutex; + (* wake up pop triggers if needed. Be careful to do that + outside the critical section*) + Queue.iter Trigger.signal to_awake + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> Mutex.unlock self.mutex + ); + !res + +let try_pop (type elt) self : elt option = + let res = ref None in + if Mutex.try_lock self.mutex then ( + (match Queue.pop self.q with + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) + | x -> res := Some x); + Mutex.unlock self.mutex + ); + !res let close (self : _ t) : unit = - if not (A.exchange self.closed true) then - while - let old_st = A.get self.st in - if - A.compare_and_set self.st old_st - { old_st with push_waiters = Q.empty; pop_waiters = Q.empty } - then ( - (* signal all waiters *) - Q.iter Trigger.signal old_st.push_waiters; - Q.iter Trigger.signal old_st.pop_waiters; - - false - ) else - true - do - Domain_.relax () - done + let q = Queue.create () in + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Queue.transfer self.pop_waiters q; + Queue.transfer self.push_waiters q + ); + Mutex.unlock self.mutex; + Queue.iter Trigger.signal q [@@@ifge 5.0] +let rec push (self : _ t) x : unit = + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + match Queue.length self.q with + | 0 -> + Queue.push x self.q; + let to_wakeup = Queue.create () in + Queue.transfer self.pop_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> + let tr = Trigger.create () in + Queue.push tr self.push_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + push self x + let rec pop (self : 'a t) : 'a = - let old_st = A.get self.st in - match State.pop_one_ ~max_size:self.max_size old_st with - | exception Q.Empty -> - if A.get self.closed then raise Closed; + Mutex.lock self.mutex; + match Queue.pop self.q with + | x -> + if Queue.is_empty self.q then ( + let to_wakeup = Queue.create () in + Queue.transfer self.push_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + ) else + Mutex.unlock self.mutex; + x + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); let tr = Trigger.create () in - if - A.compare_and_set self.st old_st - { old_st with pop_waiters = Q.push old_st.pop_waiters tr } - then ( - Trigger.await_exn tr; - pop self - ) else - pop self - | x, new_st, to_broadcast -> - if A.compare_and_set self.st old_st new_st then ( - Q.iter Trigger.signal to_broadcast; - x - ) else - pop self - -let push (self : _ t) x : unit = - while - if A.get self.closed then raise Closed; - - let old_st = A.get self.st in - match State.push_one_ ~max_size:self.max_size old_st x with - | exception Full -> - let tr = Trigger.create () in - if - A.compare_and_set self.st old_st - { old_st with push_waiters = Q.push old_st.push_waiters tr } - then - Trigger.await_exn tr; - true - | new_st, to_broadcast -> - if A.compare_and_set self.st old_st new_st then ( - Q.iter Trigger.signal to_broadcast; - false - ) else - true - do - Domain_.relax () - done + Queue.push tr self.pop_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + pop self [@@@endif] From d4be74c1b78cb3985941a149c4cec8e4086e8816 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 22:54:18 -0400 Subject: [PATCH 8/9] cleanup --- src/core/chan.ml | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/chan.ml b/src/core/chan.ml index 0debda30..be4ac3b9 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,5 +1,3 @@ -module A = Atomic_ - exception Closed type 'a t = { From c7f517cc28b3d64096db8c2eca7f756b48d20bb2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Sep 2024 22:54:21 -0400 Subject: [PATCH 9/9] bench prime: cli arg to pick chan size --- benchs/primes.ml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/benchs/primes.ml b/benchs/primes.ml index e3e60d06..1a8557d9 100644 --- a/benchs/primes.ml +++ b/benchs/primes.ml @@ -14,10 +14,10 @@ let filter' in_chan out_chan prime = in loop () -let main ~n ~on_prime () : unit = +let main ~chan_size ~n ~on_prime () : unit = let@ runner = Moonpool.Ws_pool.with_ () in let@ () = Moonpool.Ws_pool.run_wait_block runner in - let primes = ref @@ Moonpool.Chan.create ~max_size:32 () in + let primes = ref @@ Moonpool.Chan.create ~max_size:chan_size () in Moonpool.run_async runner (let chan = !primes in fun () -> generate' chan); @@ -25,7 +25,7 @@ let main ~n ~on_prime () : unit = for _i = 1 to n do let prime = Moonpool.Chan.pop !primes in on_prime prime; - let filtered_chan = Moonpool.Chan.create ~max_size:32 () in + let filtered_chan = Moonpool.Chan.create ~max_size:chan_size () in Moonpool.run_async runner (let in_chan = !primes in fun () -> filter' in_chan filtered_chan prime); @@ -34,11 +34,13 @@ let main ~n ~on_prime () : unit = let () = let n = ref 10_000 in + let chan_size = ref 0 in let time = ref true in let opts = [ "-n", Arg.Set_int n, " number of iterations"; "--no-time", Arg.Clear time, " do not compute time"; + "--chan-size", Arg.Set_int chan_size, " channel size"; ] |> Arg.align in @@ -48,7 +50,7 @@ let () = let t_start = Unix.gettimeofday () in let n_primes = Atomic.make 0 in - main ~n:!n ~on_prime:(fun _ -> Atomic.incr n_primes) (); + main ~n:!n ~chan_size:!chan_size ~on_prime:(fun _ -> Atomic.incr n_primes) (); let elapsed : float = Unix.gettimeofday () -. t_start in Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes)