diff --git a/bench_primes.sh b/bench_primes.sh old mode 100644 new mode 100755 diff --git a/benchs/primes.ml b/benchs/primes.ml index 570b850f..e3e60d06 100644 --- a/benchs/primes.ml +++ b/benchs/primes.ml @@ -1,32 +1,58 @@ let ( let@ ) = ( @@ ) +let spf = Printf.sprintf let generate' chan = for i = 2 to Int.max_int do - Moonpool.Chan.push chan i; - Thread.yield () + Moonpool.Chan.push chan i done let filter' in_chan out_chan prime = let rec loop () = - let n = Moonpool.Chan.pop_await in_chan in + let n = Moonpool.Chan.pop in_chan in if n mod prime <> 0 then Moonpool.Chan.push out_chan n; loop () in loop () -let () = +let main ~n ~on_prime () : unit = let@ runner = Moonpool.Ws_pool.with_ () in let@ () = Moonpool.Ws_pool.run_wait_block runner in - let primes = ref @@ Moonpool.Chan.create () in + let primes = ref @@ Moonpool.Chan.create ~max_size:32 () in Moonpool.run_async runner (let chan = !primes in fun () -> generate' chan); - for _i = 1 to 10 do - let prime = Moonpool.Chan.pop_await !primes in - Format.printf "%d\n@?" prime; - let filtered_chan = Moonpool.Chan.create () in + + for _i = 1 to n do + let prime = Moonpool.Chan.pop !primes in + on_prime prime; + let filtered_chan = Moonpool.Chan.create ~max_size:32 () in Moonpool.run_async runner (let in_chan = !primes in fun () -> filter' in_chan filtered_chan prime); primes := filtered_chan done + +let () = + let n = ref 10_000 in + let time = ref true in + let opts = + [ + "-n", Arg.Set_int n, " number of iterations"; + "--no-time", Arg.Clear time, " do not compute time"; + ] + |> Arg.align + in + Arg.parse opts ignore ""; + Printf.printf "computing %d primes\n%!" !n; + + let t_start = Unix.gettimeofday () in + + let n_primes = Atomic.make 0 in + main ~n:!n ~on_prime:(fun _ -> Atomic.incr n_primes) (); + + let elapsed : float = Unix.gettimeofday () -. t_start in + Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes) + (if !time then + spf " in %.4fs" elapsed + else + "") diff --git a/test/effect-based/t_chan_train.ml b/test/effect-based/t_chan_train.ml index 09b51987..e972dabc 100644 --- a/test/effect-based/t_chan_train.ml +++ b/test/effect-based/t_chan_train.ml @@ -3,8 +3,6 @@ open Moonpool (* large pool, some of our tasks below are long lived *) let pool = Ws_pool.create ~num_threads:30 () -open Fut.Infix - type event = | E_int of int | E_close @@ -66,7 +64,9 @@ let run () = let sum = ref 0 in try while true do - match Chan.pop_block_exn oc with + match + Fut.spawn ~on:pool (fun () -> Chan.pop oc) |> Fut.wait_block_exn + with | E_close -> raise Exit | E_int x -> sum := !sum + x done; diff --git a/test/fiber/t_fib1.ml b/test/fiber/t_fib1.ml index 7ceedbf4..77360b2b 100644 --- a/test/fiber/t_fib1.ml +++ b/test/fiber/t_fib1.ml @@ -52,14 +52,14 @@ let () = let clock = ref TS.init in let fib = F.spawn_top ~on:runner @@ fun () -> - let chan_progress = Chan.create () in - let chans = Array.init 5 (fun _ -> Chan.create ()) in + let chan_progress = Chan.create ~max_size:4 () in + let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in let subs = List.init 5 (fun i -> F.spawn ~protect:false @@ fun _n -> Thread.delay (float i *. 0.01); - Chan.pop_await chans.(i); + Chan.pop chans.(i); Chan.push chan_progress i; F.check_if_cancelled (); i) @@ -70,7 +70,7 @@ let () = F.spawn_ignore (fun () -> for i = 0 to 4 do Chan.push chans.(i) (); - let i' = Chan.pop_await chan_progress in + let i' = Chan.pop chan_progress in assert (i = i') done); @@ -110,8 +110,8 @@ let () = @@ Exn_bt.show ebt) in - let chans_unblock = Array.init 10 (fun _i -> Chan.create ()) in - let chan_progress = Chan.create () in + let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in + let chan_progress = Chan.create ~max_size:4 () in logf (TS.tick_get clock) "start fibers"; let subs = @@ -126,7 +126,7 @@ let () = Thread.delay 0.002; (* sync for determinism *) - Chan.pop_await chans_unblock.(i); + Chan.pop chans_unblock.(i); Chan.push chan_progress i; if i = 7 then ( @@ -150,7 +150,7 @@ let () = F.spawn_ignore (fun () -> for j = 0 to 9 do Chan.push chans_unblock.(j) (); - let j' = Chan.pop_await chan_progress in + let j' = Chan.pop chan_progress in assert (j = j') done);