revert: remove name on futures and tasks

async tracing will be more robust, and is enabled by
task local storage
This commit is contained in:
Simon Cruanes 2024-02-14 21:05:35 -05:00
parent b0d2716eff
commit cf8555bcec
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
16 changed files with 80 additions and 138 deletions

View file

@ -14,7 +14,7 @@ let cutoff = ref 20
let rec fib ~on x : int Fut.t = let rec fib ~on x : int Fut.t =
if x <= !cutoff then if x <= !cutoff then
Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) Fut.spawn ~on (fun () -> fib_direct x)
else else
let open Fut.Infix in let open Fut.Infix in
let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in
@ -31,14 +31,14 @@ let fib_fj ~on x : int Fut.t =
n1 + n2 n1 + n2
) )
in in
Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) Fut.spawn ~on (fun () -> fib_rec x)
let fib_await ~on x : int Fut.t = let fib_await ~on x : int Fut.t =
let rec fib_rec x : int Fut.t = let rec fib_rec x : int Fut.t =
if x <= !cutoff then if x <= !cutoff then
Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) Fut.spawn ~on (fun () -> fib_direct x)
else else
Fut.spawn ~name:"fib" ~on (fun () -> Fut.spawn ~on (fun () ->
let n1 = fib_rec (x - 1) in let n1 = fib_rec (x - 1) in
let n2 = fib_rec (x - 2) in let n2 = fib_rec (x - 2) in
let n1 = Fut.await n1 in let n1 = Fut.await n1 in

View file

@ -76,7 +76,7 @@ let run_fork_join ~kind num_steps : float =
let step = 1. /. float num_steps in let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in let global_sum = Lock.create 0. in
Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () -> Ws_pool.run_wait_block pool (fun () ->
FJ.for_ FJ.for_
~chunk_size:(3 + (num_steps / num_tasks)) ~chunk_size:(3 + (num_steps / num_tasks))
num_steps num_steps

View file

@ -6,7 +6,6 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage
type task_full = { type task_full = {
f: unit -> unit; f: unit -> unit;
name: string;
ls: Task_local_storage.storage; ls: Task_local_storage.storage;
} }
@ -44,18 +43,17 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
!cur_ls !cur_ls
in in
let run_another_task ls ~name task' = let run_another_task ls task' =
let ls' = Task_local_storage.Private_.Storage.copy ls in let ls' = Task_local_storage.Private_.Storage.copy ls in
schedule_ self { f = task'; name; ls = ls' } schedule_ self { f = task'; ls = ls' }
in in
let run_task (task : task_full) : unit = let run_task (task : task_full) : unit =
cur_ls := task.ls; cur_ls := task.ls;
let _ctx = before_task runner in let _ctx = before_task runner in
cur_span := Tracing_.enter_span task.name;
let resume ls k res = let resume ls k res =
schedule_ self { f = (fun () -> k res); name = task.name; ls } schedule_ self { f = (fun () -> k res); ls }
in in
(* run the task now, catching errors, handling effects *) (* run the task now, catching errors, handling effects *)
@ -105,12 +103,12 @@ type ('a, 'b) create_args =
?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int -> ?num_threads:int ->
?name:string -> ?name:string ->
'a 'a
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?num_threads ?name () : t = ?around_task ?num_threads ?name () : t =
(* wrapper *) (* wrapper *)
let around_task = let around_task =
match around_task with match around_task with
@ -131,7 +129,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ threads = Array.make num_threads dummy; q = Bb_queue.create () } { threads = Array.make num_threads dummy; q = Bb_queue.create () }
in in
let run_async ~name ~ls f = schedule_ pool { f; name; ls } in let run_async ~ls f = schedule_ pool { f; ls } in
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create

View file

@ -5,21 +5,13 @@ type 'a waiter = 'a or_error -> unit
type 'a state = type 'a state =
| Done of 'a or_error | Done of 'a or_error
| Waiting of { | Waiting of { waiters: 'a waiter list }
waiters: 'a waiter list;
name: string;
}
type 'a t = { st: 'a state A.t } [@@unboxed] type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t type 'a promise = 'a t
let[@inline] get_name_ (self : _ t) = let make () =
match A.get self.st with let fut = { st = A.make (Waiting { waiters = [] }) } in
| Done _ -> ""
| Waiting { name; _ } -> name
let make ?(name = "") () =
let fut = { st = A.make (Waiting { waiters = []; name }) } in
fut, fut fut, fut
let[@inline] of_result x : _ t = { st = A.make (Done x) } let[@inline] of_result x : _ t = { st = A.make (Done x) }
@ -72,8 +64,8 @@ let on_result (self : _ t) (f : _ waiter) : unit =
| Done x -> | Done x ->
f x; f x;
false false
| Waiting { waiters = l; name } -> | Waiting { waiters = l } ->
not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name })) not (A.compare_and_set self.st st (Waiting { waiters = f :: l }))
do do
Domain_.relax () Domain_.relax ()
done done
@ -86,7 +78,7 @@ let fulfill (self : _ t) (r : _ result) : unit =
let st = A.get self.st in let st = A.get self.st in
match st with match st with
| Done _ -> raise Already_fulfilled | Done _ -> raise Already_fulfilled
| Waiting { waiters = l; name = _ } -> | Waiting { waiters = l } ->
let did_swap = A.compare_and_set self.st st (Done r) in let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then ( if did_swap then (
(* success, now call all the waiters *) (* success, now call all the waiters *)
@ -105,7 +97,7 @@ let[@inline] fulfill_idempotent self r =
(* ### combinators ### *) (* ### combinators ### *)
let spawn ?name ?ls ~on f : _ t = let spawn ?ls ~on f : _ t =
let fut, promise = make () in let fut, promise = make () in
let task () = let task () =
@ -118,13 +110,13 @@ let spawn ?name ?ls ~on f : _ t =
fulfill promise res fulfill promise res
in in
Runner.run_async ?name ?ls on task; Runner.run_async ?ls on task;
fut fut
let spawn_on_current_runner ?name ?ls f : _ t = let spawn_on_current_runner ?ls f : _ t =
match Runner.get_current_runner () with match Runner.get_current_runner () with
| None -> failwith "Fut.spawn_on_current_runner: not running on a runner" | None -> failwith "Fut.spawn_on_current_runner: not running on a runner"
| Some on -> spawn ?name ?ls ~on f | Some on -> spawn ?ls ~on f
let reify_error (f : 'a t) : 'a or_error t = let reify_error (f : 'a t) : 'a or_error t =
match peek f with match peek f with
@ -150,22 +142,20 @@ let map ?on ~f fut : _ t =
| Error e_bt -> Error e_bt | Error e_bt -> Error e_bt
in in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with match peek fut, get_runner_ ?on () with
| Some res, None -> of_result @@ map_immediate_ res | Some res, None -> of_result @@ map_immediate_ res
| Some res, Some runner -> | Some res, Some runner ->
let fut2, promise = make ~name () in let fut2, promise = make () in
Runner.run_async ~name runner (fun () -> Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res);
fulfill promise @@ map_immediate_ res);
fut2 fut2
| None, None -> | None, None ->
let fut2, promise = make ~name () in let fut2, promise = make () in
on_result fut (fun res -> fulfill promise @@ map_immediate_ res); on_result fut (fun res -> fulfill promise @@ map_immediate_ res);
fut2 fut2
| None, Some runner -> | None, Some runner ->
let fut2, promise = make ~name () in let fut2, promise = make () in
on_result fut (fun res -> on_result fut (fun res ->
Runner.run_async ~name runner (fun () -> Runner.run_async runner (fun () ->
fulfill promise @@ map_immediate_ res)); fulfill promise @@ map_immediate_ res));
fut2 fut2
@ -174,7 +164,7 @@ let join (fut : 'a t t) : 'a t =
| Some (Ok f) -> f | Some (Ok f) -> f
| Some (Error (e, bt)) -> fail e bt | Some (Error (e, bt)) -> fail e bt
| None -> | None ->
let fut2, promise = make ~name:(get_name_ fut) () in let fut2, promise = make () in
on_result fut (function on_result fut (function
| Ok sub_fut -> on_result sub_fut (fulfill promise) | Ok sub_fut -> on_result sub_fut (fulfill promise)
| Error _ as e -> fulfill promise e); | Error _ as e -> fulfill promise e);
@ -197,20 +187,19 @@ let bind ?on ~f fut : _ t =
on_result f_res_fut (fun r -> fulfill promise r) on_result f_res_fut (fun r -> fulfill promise r)
in in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with match peek fut, get_runner_ ?on () with
| Some res, Some runner -> | Some res, Some runner ->
let fut2, promise = make ~name () in let fut2, promise = make () in
Runner.run_async ~name runner (bind_and_fulfill res promise); Runner.run_async runner (bind_and_fulfill res promise);
fut2 fut2
| Some res, None -> apply_f_to_res res | Some res, None -> apply_f_to_res res
| None, Some runner -> | None, Some runner ->
let fut2, promise = make ~name () in let fut2, promise = make () in
on_result fut (fun r -> on_result fut (fun r ->
Runner.run_async ~name runner (bind_and_fulfill r promise)); Runner.run_async runner (bind_and_fulfill r promise));
fut2 fut2
| None, None -> | None, None ->
let fut2, promise = make ~name () in let fut2, promise = make () in
on_result fut (fun res -> bind_and_fulfill res promise ()); on_result fut (fun res -> bind_and_fulfill res promise ());
fut2 fut2
@ -234,7 +223,7 @@ let both a b : _ t =
| Some (Ok x), Some (Ok y) -> return (x, y) | Some (Ok x), Some (Ok y) -> return (x, y)
| Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt
| _ -> | _ ->
let fut, promise = make ~name:(get_name_ a) () in let fut, promise = make () in
let st = A.make `Neither in let st = A.make `Neither in
on_result a (function on_result a (function
@ -267,7 +256,7 @@ let choose a b : _ t =
| _, Some (Ok y) -> return (Either.Right y) | _, Some (Ok y) -> return (Either.Right y)
| Some (Error (e, bt)), Some (Error _) -> fail e bt | Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ -> | _ ->
let fut, promise = make ~name:(get_name_ a) () in let fut, promise = make () in
let one_failure = A.make false in let one_failure = A.make false in
on_result a (function on_result a (function
@ -290,7 +279,7 @@ let choose_same a b : _ t =
| _, Some (Ok y) -> return y | _, Some (Ok y) -> return y
| Some (Error (e, bt)), Some (Error _) -> fail e bt | Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ -> | _ ->
let fut, promise = make ~name:(get_name_ a) () in let fut, promise = make () in
let one_failure = A.make false in let one_failure = A.make false in
on_result a (function on_result a (function

View file

@ -26,9 +26,8 @@ type 'a promise
(** A promise, which can be fulfilled exactly once to set (** A promise, which can be fulfilled exactly once to set
the corresponding future *) the corresponding future *)
val make : ?name:string -> unit -> 'a t * 'a promise val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. (** Make a new future with the associated promise. *)
@param name name for the future, used for tracing. since NEXT_RELEASE. *)
val on_result : 'a t -> ('a or_error -> unit) -> unit val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future (** [on_result fut f] registers [f] to be called in the future
@ -95,16 +94,12 @@ val is_failed : _ t -> bool
(** {2 Combinators} *) (** {2 Combinators} *)
val spawn : val spawn :
?name:string -> ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a t
?ls:Task_local_storage.storage ->
on:Runner.t ->
(unit -> 'a) ->
'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
hold its result. *) hold its result. *)
val spawn_on_current_runner : val spawn_on_current_runner :
?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules (** This must be run from inside a runner, and schedules
the new task on it as well. the new task on it as well.

View file

@ -4,19 +4,16 @@ include Runner
(* convenient alias *) (* convenient alias *)
let k_ls = Task_local_storage.Private_.Storage.k_storage let k_ls = Task_local_storage.Private_.Storage.k_storage
let run_async_ ~name ~ls f = let run_async_ ~ls f =
let cur_ls = ref ls in let cur_ls = ref ls in
TLS.set k_ls (Some cur_ls); TLS.set k_ls (Some cur_ls);
cur_ls := ls; cur_ls := ls;
let sp = Tracing_.enter_span name in
try try
let x = f () in let x = f () in
Tracing_.exit_span sp;
TLS.set k_ls None; TLS.set k_ls None;
x x
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Tracing_.exit_span sp;
TLS.set k_ls None; TLS.set k_ls None;
Printexc.raise_with_backtrace e bt Printexc.raise_with_backtrace e bt

View file

@ -26,25 +26,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run all the various threads needed in an application (timers, event loops, etc.) *) to run all the various threads needed in an application (timers, event loops, etc.) *)
val run_async : val run_async :
?name:string -> ?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit
?ls:Task_local_storage.storage ->
Runner.t ->
(unit -> unit) ->
unit
(** [run_async runner task] schedules the task to run (** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread. at some point in the future, possibly in another thread.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@since 0.5 *) @since 0.5 *)
val run_wait_block : val run_wait_block :
?name:string -> ?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a
?ls:Task_local_storage.storage ->
Runner.t ->
(unit -> 'a) ->
'a
(** [run_wait_block runner f] schedules [f] for later execution (** [run_wait_block runner f] schedules [f] for later execution
on the runner, like {!run_async}. on the runner, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,
@ -63,21 +52,14 @@ val recommended_thread_count : unit -> int
@since 0.5 *) @since 0.5 *)
val spawn : val spawn :
?name:string -> ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t
?ls:Task_local_storage.storage ->
on:Runner.t ->
(unit -> 'a) ->
'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) (** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}. and returns a future result for it. See {!Fut.spawn}.
@param name if provided and [Trace] is present in dependencies,
a span will be created for the future. (since 0.6)
@since 0.5 *) @since 0.5 *)
val spawn_on_current_runner : val spawn_on_current_runner :
?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t
(** See {!Fut.spawn_on_current_runner}. (** See {!Fut.spawn_on_current_runner}.
@param name see {!spawn}. since 0.6.
@since 0.5 *) @since 0.5 *)
[@@@ifge 5.0] [@@@ifge 5.0]

View file

@ -3,7 +3,7 @@ module TLS = Thread_local_storage_
type task = unit -> unit type task = unit -> unit
type t = { type t = {
run_async: name:string -> ls:Task_local_storage.storage -> task -> unit; run_async: ls:Task_local_storage.storage -> task -> unit;
shutdown: wait:bool -> unit -> unit; shutdown: wait:bool -> unit -> unit;
size: unit -> int; size: unit -> int;
num_tasks: unit -> int; num_tasks: unit -> int;
@ -11,9 +11,9 @@ type t = {
exception Shutdown exception Shutdown
let[@inline] run_async ?(name = "") let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ())
?(ls = Task_local_storage.Private_.Storage.create ()) (self : t) f : unit = (self : t) f : unit =
self.run_async ~name ~ls f self.run_async ~ls f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
@ -23,9 +23,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit =
let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] num_tasks (self : t) : int = self.num_tasks ()
let[@inline] size (self : t) : int = self.size () let[@inline] size (self : t) : int = self.size ()
let run_wait_block ?name ?ls self (f : unit -> 'a) : 'a = let run_wait_block ?ls self (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in
run_async ?name ?ls self (fun () -> run_async ?ls self (fun () ->
try try
let x = f () in let x = f () in
Bb_queue.push q (Ok x) Bb_queue.push q (Ok x)

View file

@ -33,19 +33,14 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit
?name:string -> ?ls:Task_local_storage.storage -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner (** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's in one of the threads. [f()] will run on one of the runner's
worker threads/domains. worker threads/domains.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@param ls if provided, run the task with this initial local storage @param ls if provided, run the task with this initial local storage
@raise Shutdown if the runner was shut down before [run_async] was called. *) @raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block : val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a
?name:string -> ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution (** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}. on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,
@ -65,7 +60,7 @@ module For_runner_implementors : sig
size:(unit -> int) -> size:(unit -> int) ->
num_tasks:(unit -> int) -> num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) -> shutdown:(wait:bool -> unit -> unit) ->
run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> run_async:(ls:Task_local_storage.storage -> task -> unit) ->
unit -> unit ->
t t
(** Create a new runner. (** Create a new runner.

View file

@ -7,7 +7,7 @@ type task = unit -> unit
type suspension_handler = { type suspension_handler = {
handle: handle:
run:(name:string -> task -> unit) -> run:(task -> unit) ->
resume:(suspension -> unit Exn_bt.result -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) ->
suspension -> suspension ->
unit; unit;
@ -27,8 +27,7 @@ type with_suspend_handler =
| WSH : { | WSH : {
on_suspend: unit -> 'state; on_suspend: unit -> 'state;
(** on_suspend called when [f()] suspends itself. *) (** on_suspend called when [f()] suspends itself. *)
run: 'state -> name:string -> task -> unit; run: 'state -> task -> unit; (** run used to schedule new tasks *)
(** run used to schedule new tasks *)
resume: 'state -> suspension -> unit Exn_bt.result -> unit; resume: 'state -> suspension -> unit Exn_bt.result -> unit;
(** resume run the suspension. Must be called exactly once. *) (** resume run the suspension. Must be called exactly once. *)
} }

View file

@ -14,7 +14,7 @@ type task = unit -> unit
type suspension_handler = { type suspension_handler = {
handle: handle:
run:(name:string -> task -> unit) -> run:(task -> unit) ->
resume:(suspension -> unit Exn_bt.result -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) ->
suspension -> suspension ->
unit; unit;
@ -24,7 +24,6 @@ type suspension_handler = {
The handler is given a few things: The handler is given a few things:
- the name (if any) of the current computation
- the suspended computation (which can be resumed with a result - the suspended computation (which can be resumed with a result
eventually); eventually);
- a [run] function that can be used to start tasks to perform some - a [run] function that can be used to start tasks to perform some
@ -70,8 +69,7 @@ type with_suspend_handler =
| WSH : { | WSH : {
on_suspend: unit -> 'state; on_suspend: unit -> 'state;
(** on_suspend called when [f()] suspends itself. *) (** on_suspend called when [f()] suspends itself. *)
run: 'state -> name:string -> task -> unit; run: 'state -> task -> unit; (** run used to schedule new tasks *)
(** run used to schedule new tasks *)
resume: 'state -> suspension -> unit Exn_bt.result -> unit; resume: 'state -> suspension -> unit Exn_bt.result -> unit;
(** resume run the suspension. Must be called exactly once. *) (** resume run the suspension. Must be called exactly once. *)
} }

View file

@ -16,7 +16,6 @@ end
type task_full = { type task_full = {
f: task; f: task;
name: string;
ls: Task_local_storage.storage; ls: Task_local_storage.storage;
} }
@ -26,7 +25,6 @@ type worker_state = {
pool_id_: Id.t; (** Unique per pool *) pool_id_: Id.t; (** Unique per pool *)
mutable thread: Thread.t; mutable thread: Thread.t;
q: task_full WSQ.t; (** Work stealing queue *) q: task_full WSQ.t; (** Work stealing queue *)
mutable cur_span: int64;
cur_ls: Task_local_storage.storage ref; (** Task storage *) cur_ls: Task_local_storage.storage ref; (** Task storage *)
rng: Random.State.t; rng: Random.State.t;
} }
@ -75,10 +73,10 @@ let[@inline] try_wake_someone_ (self : state) : unit =
) )
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task) let schedule_task_ (self : state) ~ls (w : worker_state option) (f : task)
: unit = : unit =
(* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let task = { f; name; ls } in let task = { f; ls } in
match w with match w with
| Some w when Id.equal self.id_ w.pool_id_ -> | Some w when Id.equal self.id_ w.pool_id_ ->
(* we're on this same pool, schedule in the worker's state. Otherwise (* we're on this same pool, schedule in the worker's state. Otherwise
@ -107,33 +105,26 @@ let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task)
raise Shutdown raise Shutdown
(** Run this task, now. Must be called from a worker. *) (** Run this task, now. Must be called from a worker. *)
let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task : let run_task_now_ (self : state) ~runner (w : worker_state) ~ls task :
unit = unit =
(* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let (AT_pair (before_task, after_task)) = self.around_task in let (AT_pair (before_task, after_task)) = self.around_task in
w.cur_ls := ls; w.cur_ls := ls;
let _ctx = before_task runner in let _ctx = before_task runner in
w.cur_span <- Tracing_.enter_span name; let[@inline] on_suspend () =
let[@inline] exit_span_ () =
Tracing_.exit_span w.cur_span;
w.cur_span <- Tracing_.dummy_span
in
let on_suspend () =
exit_span_ ();
!(w.cur_ls) !(w.cur_ls)
in in
let run_another_task ls ~name task' = let run_another_task ls task' =
let w = find_current_worker_ () in let w = find_current_worker_ () in
let ls' = Task_local_storage.Private_.Storage.copy ls in let ls' = Task_local_storage.Private_.Storage.copy ls in
schedule_task_ self w ~name ~ls:ls' task' schedule_task_ self w ~ls:ls' task'
in in
let resume ls k r = let resume ls k r =
let w = find_current_worker_ () in let w = find_current_worker_ () in
schedule_task_ self w ~name ~ls (fun () -> k r) schedule_task_ self w ~ls (fun () -> k r)
in in
(* run the task now, catching errors *) (* run the task now, catching errors *)
@ -152,13 +143,12 @@ let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task :
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
self.on_exn e bt); self.on_exn e bt);
exit_span_ ();
after_task runner _ctx; after_task runner _ctx;
w.cur_ls := Task_local_storage.Private_.Storage.dummy w.cur_ls := Task_local_storage.Private_.Storage.dummy
let[@inline] run_async_ (self : state) ~name ~ls (f : task) : unit = let[@inline] run_async_ (self : state) ~ls (f : task) : unit =
let w = find_current_worker_ () in let w = find_current_worker_ () in
schedule_task_ self w ~name ~ls f schedule_task_ self w ~ls f
(* TODO: function to schedule many tasks from the outside. (* TODO: function to schedule many tasks from the outside.
- build a queue - build a queue
@ -204,7 +194,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit =
match WSQ.pop w.q with match WSQ.pop w.q with
| Some task -> | Some task ->
try_wake_someone_ self; try_wake_someone_ self;
run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f run_task_now_ self ~runner w ~ls:task.ls task.f
| None -> continue := false | None -> continue := false
done done
@ -217,7 +207,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
worker_run_self_tasks_ self ~runner w; worker_run_self_tasks_ self ~runner w;
try_steal () try_steal ()
and run_task task : unit = and run_task task : unit =
run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f; run_task_now_ self ~runner w ~ls:task.ls task.f;
main () main ()
and try_steal () = and try_steal () =
match try_to_steal_work_once_ self w with match try_to_steal_work_once_ self w with
@ -276,7 +266,7 @@ type ('a, 'b) create_args =
'a 'a
(** Arguments used in {!create}. See {!create} for explanations. *) (** Arguments used in {!create}. See {!create} for explanations. *)
let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; name = "DUMMY_TASK" } let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; }
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
@ -301,7 +291,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ {
pool_id_; pool_id_;
thread = dummy; thread = dummy;
cur_span = Tracing_.dummy_span;
q = WSQ.create ~dummy:dummy_task_ (); q = WSQ.create ~dummy:dummy_task_ ();
rng = Random.State.make [| i |]; rng = Random.State.make [| i |];
cur_ls = ref Task_local_storage.Private_.Storage.dummy; cur_ls = ref Task_local_storage.Private_.Storage.dummy;
@ -326,7 +315,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun ~name ~ls f -> run_async_ pool ~name ~ls f) ~run_async:(fun ~ls f -> run_async_ pool ~ls f)
~size:(fun () -> size_ pool) ~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool) ~num_tasks:(fun () -> num_tasks_ pool)
() ()

View file

@ -148,9 +148,9 @@ let add_child_ ~protect (self : _ t) (child : _ t) =
let k_current_fiber : any option Task_local_storage.key = let k_current_fiber : any option Task_local_storage.key =
Task_local_storage.new_key ~init:(fun () -> None) () Task_local_storage.new_key ~init:(fun () -> None) ()
let spawn_ ?name ~on (f : _ -> 'a) : 'a t = let spawn_ ~on (f : _ -> 'a) : 'a t =
let id = Handle.generate_fresh () in let id = Handle.generate_fresh () in
let res, _promise = Fut.make ?name () in let res, _promise = Fut.make () in
let fib = let fib =
{ {
state = A.make @@ Alive { children = FM.empty; on_cancel = [] }; state = A.make @@ Alive { children = FM.empty; on_cancel = [] };
@ -172,17 +172,17 @@ let spawn_ ?name ~on (f : _ -> 'a) : 'a t =
resolve_as_failed_ fib ebt resolve_as_failed_ fib ebt
in in
Runner.run_async on ?name run; Runner.run_async on run;
fib fib
let[@inline] spawn_top ?name ~on f : _ t = spawn_ ?name ~on f let[@inline] spawn_top ~on f : _ t = spawn_ ~on f
let spawn_link ?name ~protect f : _ t = let spawn_link ~protect f : _ t =
match Task_local_storage.get k_current_fiber with match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.spawn_link: must be run from inside a fiber." | None -> failwith "Fiber.spawn_link: must be run from inside a fiber."
| Some (Any parent) -> | Some (Any parent) ->
let child = spawn_ ?name ~on:parent.runner f in let child = spawn_ ~on:parent.runner f in
add_child_ ~protect parent child; add_child_ ~protect parent child;
child child

View file

@ -55,12 +55,12 @@ val on_result : 'a t -> 'a callback -> unit
with the result. If the fiber is done already then the with the result. If the fiber is done already then the
callback is invoked immediately with its result. *) callback is invoked immediately with its result. *)
val spawn_top : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner.
This fiber is not the child of any other fiber: its lifetime This fiber is not the child of any other fiber: its lifetime
is only determined by the lifetime of [f()]. *) is only determined by the lifetime of [f()]. *)
val spawn_link : ?name:string -> protect:bool -> (unit -> 'a) -> 'a t val spawn_link : protect:bool -> (unit -> 'a) -> 'a t
(** [spawn_link ~protect f] spawns a sub-fiber [f_child] (** [spawn_link ~protect f] spawns a sub-fiber [f_child]
from a running fiber [parent]. from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails The sub-fiber [f_child] is attached to the current fiber and fails

View file

@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
let len_range = min chunk_size (n - offset) in let len_range = min chunk_size (n - offset) in
assert (offset + len_range <= n); assert (offset + len_range <= n);
run ~name:"" (fun () -> task_for ~offset ~len_range); run (fun () -> task_for ~offset ~len_range);
i := !i + len_range i := !i + len_range
done done
in in

View file

@ -20,7 +20,7 @@ let fib ~on x : int Fut.t =
Fut.await t1 + Fut.await t2 Fut.await t1 + Fut.await t2
) )
in in
Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) Fut.spawn ~on (fun () -> fib_rec x)
(* NOTE: for tracy support (* NOTE: for tracy support
let () = Tracy_client_trace.setup () let () = Tracy_client_trace.setup ()