diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 70d803c8..e10529d9 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -6,18 +6,7 @@ include Runner let ( let@ ) = ( @@ ) -module Id = struct - type t = unit ref - (** Unique identifier for a pool *) - - let create () : t = Sys.opaque_identity (ref ()) - let equal : t -> t -> bool = ( == ) -end - type state = { - id_: Id.t; - (** Unique to this pool. Used to make sure tasks stay within the same - pool. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *) mutable workers: worker_state array; (** Fixed set of workers. *) main_q: WL.task_full Queue.t; @@ -99,12 +88,15 @@ let schedule_in_main_queue (self : state) task : unit = longer permitted *) raise Shutdown -let schedule_from_w (self : worker_state) (task : WL.task_full) : unit = +let schedule_from_anywhere_ (st : state) (task : WL.task_full) : unit = match get_current_worker_ () with - | Some w when Id.equal self.st.id_ w.st.id_ -> + | Some w when st == w.st -> (* use worker from the same pool *) schedule_on_current_worker w task - | _ -> schedule_in_main_queue self.st task + | _ -> schedule_in_main_queue st task + +let schedule_from_w (w : worker_state) task : unit = + schedule_from_anywhere_ w.st task exception Got_task of WL.task_full @@ -224,9 +216,7 @@ let as_runner_ (self : state) : t = ~shutdown:(fun ~wait () -> shutdown_ self ~wait) ~run_async:(fun ~fiber f -> let task = WL.T_start { fiber; f } in - match get_current_worker_ () with - | Some wst when wst.st == self -> schedule_from_w wst task - | _ -> schedule_in_main_queue self task) + schedule_from_anywhere_ self task) ~size:(fun () -> size_ self) ~num_tasks:(fun () -> num_tasks_ self) () @@ -243,7 +233,6 @@ type ('a, 'b) create_args = let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?num_threads ?name () : t = - let pool_id_ = Id.create () in let num_domains = Domain_pool_.max_number_of_domains () in let num_threads = Util_pool_.num_threads ?num_threads () in @@ -252,7 +241,6 @@ let create ?(on_init_thread = default_thread_init_exit_) let pool = { - id_ = pool_id_; active = A.make true; workers = [||]; main_q = Queue.create ();