From 8614d4be408be79a6f6afe31e7fb68a64a593566 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 20 Feb 2024 13:45:05 -0500 Subject: [PATCH] simplify implem for background_thread for now --- src/core/background_thread.ml | 168 ++-------------------------------- 1 file changed, 6 insertions(+), 162 deletions(-) diff --git a/src/core/background_thread.ml b/src/core/background_thread.ml index daeceece..163d1897 100644 --- a/src/core/background_thread.ml +++ b/src/core/background_thread.ml @@ -1,105 +1,6 @@ -open Types_ include Runner let ( let@ ) = ( @@ ) -let k_storage = Task_local_storage.Private_.Storage.k_storage - -type task_full = { - f: unit -> unit; - ls: Task_local_storage.storage; -} - -type state = { - mutable bg_thread: Thread.t; - q: task_full Bb_queue.t; (** Queue for tasks. *) -} -(** internal state *) - -let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q - -(** Run [task] as is, on the pool. *) -let schedule_ (self : state) (task : task_full) : unit = - try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown - -type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task - -let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = - let cur_ls : Task_local_storage.storage ref = ref Task_local_storage.Private_.Storage.dummy in - TLS.set k_storage (Some cur_ls); - TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; - - let (AT_pair (before_task, after_task)) = around_task in - - let cur_span = ref Tracing_.dummy_span in - - let[@inline] exit_span_ () = - Tracing_.exit_span !cur_span; - cur_span := Tracing_.dummy_span - in - - let on_suspend () = - exit_span_ (); - !cur_ls - in - - let run_another_task ls task' = - let ls' = Task_local_storage.Private_.Storage.copy ls in - schedule_ self { f = task'; ls = ls' } - in - - let run_task (task : task_full) : unit = - cur_ls := task.ls; - let _ctx = before_task runner in - - let resume ls k res = - schedule_ self { f = (fun () -> k res); ls } - in - - (* run the task now, catching errors, handling effects *) - (try -[@@@ifge 5.0] - Suspend_.with_suspend (WSH { - run=run_another_task; - resume; - on_suspend; - }) task.f -[@@@else_] - task.f() -[@@@endif] - with e -> - let bt = Printexc.get_raw_backtrace () in - on_exn e bt); - exit_span_ (); - after_task runner _ctx; - cur_ls := Task_local_storage.Private_.Storage.dummy - in - - let main_loop () = - let continue = ref true in - let local = Queue.create() in - while !continue do - assert (!cur_span = Tracing_.dummy_span); - match Bb_queue.transfer self.q local with - | () -> - while not (Queue.is_empty local) do - let task = Queue.pop local in - run_task task - done - | exception Bb_queue.Closed -> continue := false - done - in - - try - (* handle domain-local await *) - Dla_.using ~prepare_for_await:Suspend_.prepare_for_await - ~while_running:main_loop - with Bb_queue.Closed -> () - -let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () - -let shutdown_ ~wait (self : state) : unit = - Bb_queue.close self.q; - if wait then Thread.join self.bg_thread type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> @@ -108,72 +9,15 @@ type ('a, 'b) create_args = ?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?name:string -> 'a +(** Arguments used in {!create}. See {!create} for explanations. *) -let create ?(on_init_thread = default_thread_init_exit_) - ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?name () : t = - (* wrapper *) - let around_task = - match around_task with - | Some (f, g) -> AT_pair (f, g) - | None -> AT_pair (ignore, fun _ _ -> ()) - in - - let num_domains = Domain_pool_.n_domains () in - - (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) - let dom_idx = Random.int num_domains in +let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () : t = + Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name + ~num_threads:1 () +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () f = let pool = - let dummy = Thread.self () in - { bg_thread = dummy; q = Bb_queue.create () } - in - - let run_async ~ls f = schedule_ pool { f; ls } in - - let runner = - Runner.For_runner_implementors.create - ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async - ~size:(fun () ->1) - ~num_tasks:(fun () -> num_tasks_ pool) - () - in - - let start_thread () : Thread.t = - (* function run in the thread itself *) - let main_thread_fun () : unit = - let thread = Thread.self () in - let t_id = Thread.id thread in - on_init_thread ~dom_id:dom_idx ~t_id (); - - (* set thread name *) - Option.iter - (fun name -> - Tracing_.set_thread_name (Printf.sprintf "%s.worker" name)) - name; - - let run () = worker_thread_ pool runner ~on_exn ~around_task in - - (* now run the main loop *) - Fun.protect run ~finally:(fun () -> - (* on termination, decrease refcount of underlying domain *) - Domain_pool_.decr_on dom_idx); - on_exit_thread ~dom_id:dom_idx ~t_id () - in - Thread.create main_thread_fun () - in - - let bg_thread = Domain_pool_.run_on_and_wait dom_idx start_thread in - pool.bg_thread <- bg_thread; - - runner - -let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task - ?name () f = - let pool = - create ?on_init_thread ?on_exit_thread ?on_exn ?around_task - ?name () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool