From cf8555bcec0483cf5f39c3a9c096f4161477092a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 14 Feb 2024 21:05:35 -0500 Subject: [PATCH] revert: remove name on futures and tasks async tracing will be more robust, and is enabled by task local storage --- benchs/fib_rec.ml | 8 ++--- benchs/pi.ml | 2 +- src/core/fifo_pool.ml | 14 ++++---- src/core/fut.ml | 59 +++++++++++++------------------ src/core/fut.mli | 13 +++---- src/core/immediate_runner.ml | 5 +-- src/core/moonpool.mli | 26 +++----------- src/core/runner.ml | 12 +++---- src/core/runner.mli | 11 ++---- src/core/suspend_.ml | 5 ++- src/core/suspend_.mli | 6 ++-- src/core/ws_pool.ml | 37 +++++++------------ src/fib/fiber.ml | 12 +++---- src/fib/fiber.mli | 4 +-- src/forkjoin/moonpool_forkjoin.ml | 2 +- test/effect-based/t_fib1.ml | 2 +- 16 files changed, 80 insertions(+), 138 deletions(-) diff --git a/benchs/fib_rec.ml b/benchs/fib_rec.ml index 66eded93..82d588cf 100644 --- a/benchs/fib_rec.ml +++ b/benchs/fib_rec.ml @@ -14,7 +14,7 @@ let cutoff = ref 20 let rec fib ~on x : int Fut.t = if x <= !cutoff then - Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) + Fut.spawn ~on (fun () -> fib_direct x) else let open Fut.Infix in let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in @@ -31,14 +31,14 @@ let fib_fj ~on x : int Fut.t = n1 + n2 ) in - Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) + Fut.spawn ~on (fun () -> fib_rec x) let fib_await ~on x : int Fut.t = let rec fib_rec x : int Fut.t = if x <= !cutoff then - Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) + Fut.spawn ~on (fun () -> fib_direct x) else - Fut.spawn ~name:"fib" ~on (fun () -> + Fut.spawn ~on (fun () -> let n1 = fib_rec (x - 1) in let n2 = fib_rec (x - 2) in let n1 = Fut.await n1 in diff --git a/benchs/pi.ml b/benchs/pi.ml index 4eae7eb0..63ddc2ca 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -76,7 +76,7 @@ let run_fork_join ~kind num_steps : float = let step = 1. /. float num_steps in let global_sum = Lock.create 0. in - Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> FJ.for_ ~chunk_size:(3 + (num_steps / num_tasks)) num_steps diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index d2757324..58177b3f 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -6,7 +6,6 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage type task_full = { f: unit -> unit; - name: string; ls: Task_local_storage.storage; } @@ -44,18 +43,17 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = !cur_ls in - let run_another_task ls ~name task' = + let run_another_task ls task' = let ls' = Task_local_storage.Private_.Storage.copy ls in - schedule_ self { f = task'; name; ls = ls' } + schedule_ self { f = task'; ls = ls' } in let run_task (task : task_full) : unit = cur_ls := task.ls; let _ctx = before_task runner in - cur_span := Tracing_.enter_span task.name; let resume ls k res = - schedule_ self { f = (fun () -> k res); name = task.name; ls } + schedule_ self { f = (fun () -> k res); ls } in (* run the task now, catching errors, handling effects *) @@ -105,12 +103,12 @@ type ('a, 'b) create_args = ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?num_threads:int -> - ?name:string -> + ?name:string -> 'a let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?num_threads ?name () : t = + ?around_task ?num_threads ?name () : t = (* wrapper *) let around_task = match around_task with @@ -131,7 +129,7 @@ let create ?(on_init_thread = default_thread_init_exit_) { threads = Array.make num_threads dummy; q = Bb_queue.create () } in - let run_async ~name ~ls f = schedule_ pool { f; name; ls } in + let run_async ~ls f = schedule_ pool { f; ls } in let runner = Runner.For_runner_implementors.create diff --git a/src/core/fut.ml b/src/core/fut.ml index 2c7d6896..5cbcb366 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -5,21 +5,13 @@ type 'a waiter = 'a or_error -> unit type 'a state = | Done of 'a or_error - | Waiting of { - waiters: 'a waiter list; - name: string; - } + | Waiting of { waiters: 'a waiter list } type 'a t = { st: 'a state A.t } [@@unboxed] type 'a promise = 'a t -let[@inline] get_name_ (self : _ t) = - match A.get self.st with - | Done _ -> "" - | Waiting { name; _ } -> name - -let make ?(name = "") () = - let fut = { st = A.make (Waiting { waiters = []; name }) } in +let make () = + let fut = { st = A.make (Waiting { waiters = [] }) } in fut, fut let[@inline] of_result x : _ t = { st = A.make (Done x) } @@ -72,8 +64,8 @@ let on_result (self : _ t) (f : _ waiter) : unit = | Done x -> f x; false - | Waiting { waiters = l; name } -> - not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name })) + | Waiting { waiters = l } -> + not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) do Domain_.relax () done @@ -86,7 +78,7 @@ let fulfill (self : _ t) (r : _ result) : unit = let st = A.get self.st in match st with | Done _ -> raise Already_fulfilled - | Waiting { waiters = l; name = _ } -> + | Waiting { waiters = l } -> let did_swap = A.compare_and_set self.st st (Done r) in if did_swap then ( (* success, now call all the waiters *) @@ -105,7 +97,7 @@ let[@inline] fulfill_idempotent self r = (* ### combinators ### *) -let spawn ?name ?ls ~on f : _ t = +let spawn ?ls ~on f : _ t = let fut, promise = make () in let task () = @@ -118,13 +110,13 @@ let spawn ?name ?ls ~on f : _ t = fulfill promise res in - Runner.run_async ?name ?ls on task; + Runner.run_async ?ls on task; fut -let spawn_on_current_runner ?name ?ls f : _ t = +let spawn_on_current_runner ?ls f : _ t = match Runner.get_current_runner () with | None -> failwith "Fut.spawn_on_current_runner: not running on a runner" - | Some on -> spawn ?name ?ls ~on f + | Some on -> spawn ?ls ~on f let reify_error (f : 'a t) : 'a or_error t = match peek f with @@ -150,22 +142,20 @@ let map ?on ~f fut : _ t = | Error e_bt -> Error e_bt in - let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, None -> of_result @@ map_immediate_ res | Some res, Some runner -> - let fut2, promise = make ~name () in - Runner.run_async ~name runner (fun () -> - fulfill promise @@ map_immediate_ res); + let fut2, promise = make () in + Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res); fut2 | None, None -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> fulfill promise @@ map_immediate_ res); fut2 | None, Some runner -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> - Runner.run_async ~name runner (fun () -> + Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res)); fut2 @@ -174,7 +164,7 @@ let join (fut : 'a t t) : 'a t = | Some (Ok f) -> f | Some (Error (e, bt)) -> fail e bt | None -> - let fut2, promise = make ~name:(get_name_ fut) () in + let fut2, promise = make () in on_result fut (function | Ok sub_fut -> on_result sub_fut (fulfill promise) | Error _ as e -> fulfill promise e); @@ -197,20 +187,19 @@ let bind ?on ~f fut : _ t = on_result f_res_fut (fun r -> fulfill promise r) in - let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, Some runner -> - let fut2, promise = make ~name () in - Runner.run_async ~name runner (bind_and_fulfill res promise); + let fut2, promise = make () in + Runner.run_async runner (bind_and_fulfill res promise); fut2 | Some res, None -> apply_f_to_res res | None, Some runner -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun r -> - Runner.run_async ~name runner (bind_and_fulfill r promise)); + Runner.run_async runner (bind_and_fulfill r promise)); fut2 | None, None -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> bind_and_fulfill res promise ()); fut2 @@ -234,7 +223,7 @@ let both a b : _ t = | Some (Ok x), Some (Ok y) -> return (x, y) | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let st = A.make `Neither in on_result a (function @@ -267,7 +256,7 @@ let choose a b : _ t = | _, Some (Ok y) -> return (Either.Right y) | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let one_failure = A.make false in on_result a (function @@ -290,7 +279,7 @@ let choose_same a b : _ t = | _, Some (Ok y) -> return y | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let one_failure = A.make false in on_result a (function diff --git a/src/core/fut.mli b/src/core/fut.mli index a82975f3..7c0d4466 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -26,9 +26,8 @@ type 'a promise (** A promise, which can be fulfilled exactly once to set the corresponding future *) -val make : ?name:string -> unit -> 'a t * 'a promise -(** Make a new future with the associated promise. - @param name name for the future, used for tracing. since NEXT_RELEASE. *) +val make : unit -> 'a t * 'a promise +(** Make a new future with the associated promise. *) val on_result : 'a t -> ('a or_error -> unit) -> unit (** [on_result fut f] registers [f] to be called in the future @@ -95,16 +94,12 @@ val is_failed : _ t -> bool (** {2 Combinators} *) val spawn : - ?name:string -> - ?ls:Task_local_storage.storage -> - on:Runner.t -> - (unit -> 'a) -> - 'a t + ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a t (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will hold its result. *) val spawn_on_current_runner : - ?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t + ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t (** This must be run from inside a runner, and schedules the new task on it as well. diff --git a/src/core/immediate_runner.ml b/src/core/immediate_runner.ml index c260f439..9412fd35 100644 --- a/src/core/immediate_runner.ml +++ b/src/core/immediate_runner.ml @@ -4,19 +4,16 @@ include Runner (* convenient alias *) let k_ls = Task_local_storage.Private_.Storage.k_storage -let run_async_ ~name ~ls f = +let run_async_ ~ls f = let cur_ls = ref ls in TLS.set k_ls (Some cur_ls); cur_ls := ls; - let sp = Tracing_.enter_span name in try let x = f () in - Tracing_.exit_span sp; TLS.set k_ls None; x with e -> let bt = Printexc.get_raw_backtrace () in - Tracing_.exit_span sp; TLS.set k_ls None; Printexc.raise_with_backtrace e bt diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 23ee52d8..c8049503 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -26,25 +26,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) val run_async : - ?name:string -> - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> unit) -> - unit + ?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit (** [run_async runner task] schedules the task to run on the given runner. This means [task()] will be executed at some point in the future, possibly in another thread. - @param name if provided and [Trace] is present in dependencies, a span - will be created when the task starts, and will stop when the task is over. - (since NEXT_RELEASE) @since 0.5 *) val run_wait_block : - ?name:string -> - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> 'a) -> - 'a + ?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a (** [run_wait_block runner f] schedules [f] for later execution on the runner, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -63,21 +52,14 @@ val recommended_thread_count : unit -> int @since 0.5 *) val spawn : - ?name:string -> - ?ls:Task_local_storage.storage -> - on:Runner.t -> - (unit -> 'a) -> - 'a Fut.t + ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t (** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns a future result for it. See {!Fut.spawn}. - @param name if provided and [Trace] is present in dependencies, - a span will be created for the future. (since 0.6) @since 0.5 *) val spawn_on_current_runner : - ?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t + ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t (** See {!Fut.spawn_on_current_runner}. - @param name see {!spawn}. since 0.6. @since 0.5 *) [@@@ifge 5.0] diff --git a/src/core/runner.ml b/src/core/runner.ml index 207ea56d..360ec6ba 100644 --- a/src/core/runner.ml +++ b/src/core/runner.ml @@ -3,7 +3,7 @@ module TLS = Thread_local_storage_ type task = unit -> unit type t = { - run_async: name:string -> ls:Task_local_storage.storage -> task -> unit; + run_async: ls:Task_local_storage.storage -> task -> unit; shutdown: wait:bool -> unit -> unit; size: unit -> int; num_tasks: unit -> int; @@ -11,9 +11,9 @@ type t = { exception Shutdown -let[@inline] run_async ?(name = "") - ?(ls = Task_local_storage.Private_.Storage.create ()) (self : t) f : unit = - self.run_async ~name ~ls f +let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ()) + (self : t) f : unit = + self.run_async ~ls f let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () @@ -23,9 +23,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit = let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] size (self : t) : int = self.size () -let run_wait_block ?name ?ls self (f : unit -> 'a) : 'a = +let run_wait_block ?ls self (f : unit -> 'a) : 'a = let q = Bb_queue.create () in - run_async ?name ?ls self (fun () -> + run_async ?ls self (fun () -> try let x = f () in Bb_queue.push q (Ok x) diff --git a/src/core/runner.mli b/src/core/runner.mli index 5b937c09..331e8b50 100644 --- a/src/core/runner.mli +++ b/src/core/runner.mli @@ -33,19 +33,14 @@ val shutdown_without_waiting : t -> unit exception Shutdown -val run_async : - ?name:string -> ?ls:Task_local_storage.storage -> t -> task -> unit +val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit (** [run_async pool f] schedules [f] for later execution on the runner in one of the threads. [f()] will run on one of the runner's worker threads/domains. - @param name if provided and [Trace] is present in dependencies, a span - will be created when the task starts, and will stop when the task is over. - (since NEXT_RELEASE) @param ls if provided, run the task with this initial local storage @raise Shutdown if the runner was shut down before [run_async] was called. *) -val run_wait_block : - ?name:string -> ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a +val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a (** [run_wait_block pool f] schedules [f] for later execution on the pool, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -65,7 +60,7 @@ module For_runner_implementors : sig size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.storage -> task -> unit) -> unit -> t (** Create a new runner. diff --git a/src/core/suspend_.ml b/src/core/suspend_.ml index 4d15ac77..cdd680a5 100644 --- a/src/core/suspend_.ml +++ b/src/core/suspend_.ml @@ -7,7 +7,7 @@ type task = unit -> unit type suspension_handler = { handle: - run:(name:string -> task -> unit) -> + run:(task -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) -> suspension -> unit; @@ -27,8 +27,7 @@ type with_suspend_handler = | WSH : { on_suspend: unit -> 'state; (** on_suspend called when [f()] suspends itself. *) - run: 'state -> name:string -> task -> unit; - (** run used to schedule new tasks *) + run: 'state -> task -> unit; (** run used to schedule new tasks *) resume: 'state -> suspension -> unit Exn_bt.result -> unit; (** resume run the suspension. Must be called exactly once. *) } diff --git a/src/core/suspend_.mli b/src/core/suspend_.mli index 1fff43ac..bd8a9a9d 100644 --- a/src/core/suspend_.mli +++ b/src/core/suspend_.mli @@ -14,7 +14,7 @@ type task = unit -> unit type suspension_handler = { handle: - run:(name:string -> task -> unit) -> + run:(task -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) -> suspension -> unit; @@ -24,7 +24,6 @@ type suspension_handler = { The handler is given a few things: - - the name (if any) of the current computation - the suspended computation (which can be resumed with a result eventually); - a [run] function that can be used to start tasks to perform some @@ -70,8 +69,7 @@ type with_suspend_handler = | WSH : { on_suspend: unit -> 'state; (** on_suspend called when [f()] suspends itself. *) - run: 'state -> name:string -> task -> unit; - (** run used to schedule new tasks *) + run: 'state -> task -> unit; (** run used to schedule new tasks *) resume: 'state -> suspension -> unit Exn_bt.result -> unit; (** resume run the suspension. Must be called exactly once. *) } diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index d1fd7cf3..5627a4bb 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -16,7 +16,6 @@ end type task_full = { f: task; - name: string; ls: Task_local_storage.storage; } @@ -26,7 +25,6 @@ type worker_state = { pool_id_: Id.t; (** Unique per pool *) mutable thread: Thread.t; q: task_full WSQ.t; (** Work stealing queue *) - mutable cur_span: int64; cur_ls: Task_local_storage.storage ref; (** Task storage *) rng: Random.State.t; } @@ -75,10 +73,10 @@ let[@inline] try_wake_someone_ (self : state) : unit = ) (** Run [task] as is, on the pool. *) -let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task) +let schedule_task_ (self : state) ~ls (w : worker_state option) (f : task) : unit = (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) - let task = { f; name; ls } in + let task = { f; ls } in match w with | Some w when Id.equal self.id_ w.pool_id_ -> (* we're on this same pool, schedule in the worker's state. Otherwise @@ -107,33 +105,26 @@ let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task) raise Shutdown (** Run this task, now. Must be called from a worker. *) -let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task : +let run_task_now_ (self : state) ~runner (w : worker_state) ~ls task : unit = (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) let (AT_pair (before_task, after_task)) = self.around_task in w.cur_ls := ls; let _ctx = before_task runner in - w.cur_span <- Tracing_.enter_span name; - let[@inline] exit_span_ () = - Tracing_.exit_span w.cur_span; - w.cur_span <- Tracing_.dummy_span - in - - let on_suspend () = - exit_span_ (); + let[@inline] on_suspend () = !(w.cur_ls) in - let run_another_task ls ~name task' = + let run_another_task ls task' = let w = find_current_worker_ () in let ls' = Task_local_storage.Private_.Storage.copy ls in - schedule_task_ self w ~name ~ls:ls' task' + schedule_task_ self w ~ls:ls' task' in let resume ls k r = let w = find_current_worker_ () in - schedule_task_ self w ~name ~ls (fun () -> k r) + schedule_task_ self w ~ls (fun () -> k r) in (* run the task now, catching errors *) @@ -152,13 +143,12 @@ let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task : let bt = Printexc.get_raw_backtrace () in self.on_exn e bt); - exit_span_ (); after_task runner _ctx; w.cur_ls := Task_local_storage.Private_.Storage.dummy -let[@inline] run_async_ (self : state) ~name ~ls (f : task) : unit = +let[@inline] run_async_ (self : state) ~ls (f : task) : unit = let w = find_current_worker_ () in - schedule_task_ self w ~name ~ls f + schedule_task_ self w ~ls f (* TODO: function to schedule many tasks from the outside. - build a queue @@ -204,7 +194,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit = match WSQ.pop w.q with | Some task -> try_wake_someone_ self; - run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f + run_task_now_ self ~runner w ~ls:task.ls task.f | None -> continue := false done @@ -217,7 +207,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = worker_run_self_tasks_ self ~runner w; try_steal () and run_task task : unit = - run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f; + run_task_now_ self ~runner w ~ls:task.ls task.f; main () and try_steal () = match try_to_steal_work_once_ self w with @@ -276,7 +266,7 @@ type ('a, 'b) create_args = 'a (** Arguments used in {!create}. See {!create} for explanations. *) -let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; name = "DUMMY_TASK" } +let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; } let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) @@ -301,7 +291,6 @@ let create ?(on_init_thread = default_thread_init_exit_) { pool_id_; thread = dummy; - cur_span = Tracing_.dummy_span; q = WSQ.create ~dummy:dummy_task_ (); rng = Random.State.make [| i |]; cur_ls = ref Task_local_storage.Private_.Storage.dummy; @@ -326,7 +315,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let runner = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async:(fun ~name ~ls f -> run_async_ pool ~name ~ls f) + ~run_async:(fun ~ls f -> run_async_ pool ~ls f) ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) () diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index 25a4485e..ebfd3319 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -148,9 +148,9 @@ let add_child_ ~protect (self : _ t) (child : _ t) = let k_current_fiber : any option Task_local_storage.key = Task_local_storage.new_key ~init:(fun () -> None) () -let spawn_ ?name ~on (f : _ -> 'a) : 'a t = +let spawn_ ~on (f : _ -> 'a) : 'a t = let id = Handle.generate_fresh () in - let res, _promise = Fut.make ?name () in + let res, _promise = Fut.make () in let fib = { state = A.make @@ Alive { children = FM.empty; on_cancel = [] }; @@ -172,17 +172,17 @@ let spawn_ ?name ~on (f : _ -> 'a) : 'a t = resolve_as_failed_ fib ebt in - Runner.run_async on ?name run; + Runner.run_async on run; fib -let[@inline] spawn_top ?name ~on f : _ t = spawn_ ?name ~on f +let[@inline] spawn_top ~on f : _ t = spawn_ ~on f -let spawn_link ?name ~protect f : _ t = +let spawn_link ~protect f : _ t = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.spawn_link: must be run from inside a fiber." | Some (Any parent) -> - let child = spawn_ ?name ~on:parent.runner f in + let child = spawn_ ~on:parent.runner f in add_child_ ~protect parent child; child diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index dc60b001..5b01948a 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -55,12 +55,12 @@ val on_result : 'a t -> 'a callback -> unit with the result. If the fiber is done already then the callback is invoked immediately with its result. *) -val spawn_top : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t +val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of [f()]. *) -val spawn_link : ?name:string -> protect:bool -> (unit -> 'a) -> 'a t +val spawn_link : protect:bool -> (unit -> 'a) -> 'a t (** [spawn_link ~protect f] spawns a sub-fiber [f_child] from a running fiber [parent]. The sub-fiber [f_child] is attached to the current fiber and fails diff --git a/src/forkjoin/moonpool_forkjoin.ml b/src/forkjoin/moonpool_forkjoin.ml index 27aa1984..052ca7f2 100644 --- a/src/forkjoin/moonpool_forkjoin.ml +++ b/src/forkjoin/moonpool_forkjoin.ml @@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = let len_range = min chunk_size (n - offset) in assert (offset + len_range <= n); - run ~name:"" (fun () -> task_for ~offset ~len_range); + run (fun () -> task_for ~offset ~len_range); i := !i + len_range done in diff --git a/test/effect-based/t_fib1.ml b/test/effect-based/t_fib1.ml index 5a9d66e6..a2f62e82 100644 --- a/test/effect-based/t_fib1.ml +++ b/test/effect-based/t_fib1.ml @@ -20,7 +20,7 @@ let fib ~on x : int Fut.t = Fut.await t1 + Fut.await t2 ) in - Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) + Fut.spawn ~on (fun () -> fib_rec x) (* NOTE: for tracy support let () = Tracy_client_trace.setup ()