diff --git a/src/pool.ml b/src/pool.ml index b4c5ac7c..01a093b3 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -75,21 +75,24 @@ let run_direct_ (self : state) (w : worker_state option) (task : task) : unit = raise Shutdown let run_async_ (self : state) (task : task) : unit = - (* stay on current worker if possible *) - let w = find_current_worker_ self in - - let rec run_async_rec_ (task : task) = + (* run [task] inside a suspension handler *) + let rec run_async_in_suspend_rec_ (task : task) = let task_with_suspend_ () = (* run [f()] and handle [suspend] in it *) Suspend_.with_suspend task ~run:(fun ~with_handler task' -> if with_handler then - run_async_rec_ task' - else - run_direct_ self w task') + run_async_in_suspend_rec_ task' + else ( + let w = find_current_worker_ self in + run_direct_ self w task' + )) in + + (* schedule on current worker, if run from a worker *) + let w = find_current_worker_ self in run_direct_ self w task_with_suspend_ in - run_async_rec_ task + run_async_in_suspend_rec_ task let run = run_async @@ -100,6 +103,7 @@ type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task (** How many times in a row do we try to do work-stealing? *) let steal_attempt_max_retry = 3 +(** Main loop for a worker thread. *) let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn ~around_task : unit = let (AT_pair (before_task, after_task)) = around_task in @@ -143,22 +147,6 @@ let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn WSQ.steal w'.q in - (* - try - for _retry = 1 to 1 do - for i = 0 to Array.length self.workers - 1 do - let w' = self.workers.(i) in - if w != w' then ( - match WSQ.steal w'.q with - | None -> () - | Some task -> raise_notrace (Got_task task) - ) - done - done; - None - with Got_task task -> Some task - *) - (* try to steal work multiple times *) let try_to_steal_work_loop () : bool = if size_ self = 1 then diff --git a/test/effect-based/t_many.ml b/test/effect-based/t_many.ml index 8b5b76e3..23e1a929 100644 --- a/test/effect-based/t_many.ml +++ b/test/effect-based/t_many.ml @@ -4,8 +4,7 @@ open Moonpool let ( let@ ) = ( @@ ) -let run () = - let@ pool = Pool.with_ ~min:4 () in +let run ~pool () = let t1 = Unix.gettimeofday () in let n = 200_000 in @@ -15,7 +14,7 @@ let run () = Fut.spawn ~on:pool (fun () -> List.fold_left (fun n x -> - let _res = Fut.await x in + let _res = Sys.opaque_identity (Fut.await x) in n + 1) 0 l) in @@ -29,6 +28,23 @@ let run () = Printf.printf "in %.4fs\n%!" (Unix.gettimeofday () -. t1); assert (List.for_all (fun s -> s = n) lens) -let () = run () +let () = + (print_endline "with fifo"; + let@ pool = Fifo_pool.with_ ~min:4 () in + run ~pool ()); + + (print_endline "with WS(1)"; + let@ pool = Pool.with_ ~min:1 () in + run ~pool ()); + + (print_endline "with WS(2)"; + let@ pool = Pool.with_ ~min:2 () in + run ~pool ()); + + (print_endline "with WS(4)"; + let@ pool = Pool.with_ ~min:4 () in + run ~pool ()); + + () [@@@endif]