diff --git a/src/pool.ml b/src/pool.ml index 4dda7141..ebc1cab1 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -40,8 +40,12 @@ let find_current_worker_ (self : state) : worker_state option = (** Run [task] as is, on the pool. *) let run_direct_ (self : state) (w : worker_state option) (task : task) : unit = match w with - | Some w -> WSQ.push w.q task - | None -> Bb_queue.push self.main_q task + | Some w -> + print_endline "push local"; + WSQ.push w.q task + | None -> + print_endline "push blocking"; + Bb_queue.push self.main_q task let run_async_ (self : state) (task : task) : unit = (* stay on current worker if possible *) @@ -88,6 +92,7 @@ let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn in let run_self_tasks_ () = + print_endline "run self tasks"; let continue = ref true in let pop_retries = ref 0 in while !continue do @@ -104,6 +109,7 @@ let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn (* get a task from another worker *) let try_to_steal_work () : task option = + print_endline "try to steal work"; try for _retry = 1 to 3 do Array.iter @@ -119,32 +125,46 @@ let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn with Got_task task -> Some task in - let main_loop () = - let steal_attempts = ref 0 in - while true do - run_self_tasks_ (); - - match try_to_steal_work () with - | Some task -> - steal_attempts := 0; - run_task task - | None -> - incr steal_attempts; - Domain_.relax (); - - if !steal_attempts > steal_attempt_max_retry then ( - steal_attempts := 0; - let task = Bb_queue.pop self.main_q in - run_task task - ) - done + (* try to steal work multiple times *) + let try_to_steal_work_loop () : bool = + try + let unsuccessful_steal_attempts = ref 0 in + while !unsuccessful_steal_attempts < steal_attempt_max_retry do + match try_to_steal_work () with + | Some task -> + run_task task; + raise_notrace Exit + | None -> + incr unsuccessful_steal_attempts; + Domain_.relax () + done; + false + with Exit -> true in - try - (* handle domain-local await *) - Dla_.using ~prepare_for_await:Suspend_.prepare_for_await - ~while_running:main_loop - with Bb_queue.Closed -> () + let main_loop () = + (try + while true do + run_self_tasks_ (); + + if not (try_to_steal_work_loop ()) then ( + Array.iteri + (fun i w -> Printf.printf "w[%d].q.size=%d\n" i (WSQ.size w.q)) + self.workers; + Printf.printf "bq.size=%d\n%!" (Bb_queue.size self.main_q); + + print_endline "wait block"; + let task = Bb_queue.pop self.main_q in + run_task task + ) + done + with Bb_queue.Closed -> ()); + run_self_tasks_ () + in + + (* handle domain-local await *) + Dla_.using ~prepare_for_await:Suspend_.prepare_for_await + ~while_running:main_loop let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()