feat: support for trace as a depopt

one can now give tasks an optional string "name". If `Trace`
is present (installed) and enabled, this results in
a span around the task's execution. This also plays ok
with `await` and other effect-based primitives.
This commit is contained in:
Simon Cruanes 2024-01-30 16:10:11 -05:00
parent 469cb89ecd
commit 092ad5f2ce
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
18 changed files with 184 additions and 74 deletions

View file

@ -28,6 +28,7 @@
(>= 1.9.0) (>= 1.9.0)
:with-test))) :with-test)))
(depopts (depopts
(trace (>= 0.6))
thread-local-storage thread-local-storage
(domain-local-await (>= 0.2))) (domain-local-await (>= 0.2)))
(tags (tags

View file

@ -19,6 +19,7 @@ depends: [
"mdx" {>= "1.9.0" & with-test} "mdx" {>= "1.9.0" & with-test}
] ]
depopts: [ depopts: [
"trace" {>= "0.6"}
"thread-local-storage" "thread-local-storage"
"domain-local-await" {>= "0.2"} "domain-local-await" {>= "0.2"}
] ]

View file

@ -1,7 +1,7 @@
(library (library
(public_name moonpool) (public_name moonpool)
(name moonpool) (name moonpool)
(private_modules d_pool_ dla_) (private_modules d_pool_ dla_ tracing_)
(preprocess (preprocess
(action (action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))) (run %{project_root}/src/cpp/cpp.exe %{input-file})))
@ -9,6 +9,9 @@
(select thread_local_storage_.ml from (select thread_local_storage_.ml from
(thread-local-storage -> thread_local_storage_.stub.ml) (thread-local-storage -> thread_local_storage_.stub.ml)
(-> thread_local_storage_.real.ml)) (-> thread_local_storage_.real.ml))
(select tracing_.ml from
(trace.core -> tracing_.real.ml)
(-> tracing_.dummy.ml))
(select dla_.ml from (select dla_.ml from
(domain-local-await -> dla_.real.ml) (domain-local-await -> dla_.real.ml)
( -> dla_.dummy.ml)))) ( -> dla_.dummy.ml))))

View file

@ -3,9 +3,14 @@ include Runner
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
type task_with_name = {
f: unit -> unit;
name: string;
}
type state = { type state = {
threads: Thread.t array; threads: Thread.t array;
q: task Bb_queue.t; (** Queue for tasks. *) q: task_with_name Bb_queue.t; (** Queue for tasks. *)
} }
(** internal state *) (** internal state *)
@ -13,7 +18,7 @@ let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let schedule_ (self : state) (task : task) : unit = let schedule_ (self : state) (task : task_with_name) : unit =
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
@ -22,19 +27,33 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner;
let (AT_pair (before_task, after_task)) = around_task in let (AT_pair (before_task, after_task)) = around_task in
let run_task task : unit = let cur_span = ref Tracing_.dummy_span in
let[@inline] exit_span_ () =
Tracing_.exit_span !cur_span;
cur_span := Tracing_.dummy_span
in
let run_another_task ~name task' = schedule_ self { f = task'; name } in
let run_task (task : task_with_name) : unit =
let _ctx = before_task runner in let _ctx = before_task runner in
cur_span := Tracing_.enter_span task.name;
(* run the task now, catching errors *) (* run the task now, catching errors *)
(try Suspend_.with_suspend task ~run:(fun task' -> schedule_ self task') (try
Suspend_.with_suspend task.f ~name:task.name ~run:run_another_task
~on_suspend:exit_span_
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
on_exn e bt); on_exn e bt);
exit_span_ ();
after_task runner _ctx after_task runner _ctx
in in
let main_loop () = let main_loop () =
let continue = ref true in let continue = ref true in
while !continue do while !continue do
assert (!cur_span = Tracing_.dummy_span);
match Bb_queue.pop self.q with match Bb_queue.pop self.q with
| task -> run_task task | task -> run_task task
| exception Bb_queue.Closed -> continue := false | exception Bb_queue.Closed -> continue := false
@ -84,10 +103,12 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ threads = Array.make num_threads dummy; q = Bb_queue.create () } { threads = Array.make num_threads dummy; q = Bb_queue.create () }
in in
let run_async ~name f = schedule_ pool { f; name } in
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun f -> schedule_ pool f) ~run_async
~size:(fun () -> size_ pool) ~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool) ~num_tasks:(fun () -> num_tasks_ pool)
() ()

View file

@ -48,7 +48,7 @@ module State_ = struct
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run:_ suspension -> (fun ~name:_ ~run:_ suspension ->
while while
let old_st = A.get self in let old_st = A.get self in
match old_st with match old_st with
@ -113,19 +113,19 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
max 1 (1 + (n / D_pool_.n_domains ())) max 1 (1 + (n / D_pool_.n_domains ()))
in in
let start_tasks ~run (suspension : Suspend_.suspension) = let start_tasks ~name ~run (suspension : Suspend_.suspension) =
let task_for ~offset ~len_range = let task_for ~offset ~len_range =
match f offset (offset + len_range - 1) with match f offset (offset + len_range - 1) with
| () -> | () ->
if A.fetch_and_add missing (-len_range) = len_range then if A.fetch_and_add missing (-len_range) = len_range then
(* all tasks done successfully *) (* all tasks done successfully *)
suspension (Ok ()) run ~name (fun () -> suspension (Ok ()))
| exception exn -> | exception exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
if not (A.exchange has_failed true) then if not (A.exchange has_failed true) then
(* first one to fail, and [missing] must be >= 2 (* first one to fail, and [missing] must be >= 2
because we're not decreasing it. *) because we're not decreasing it. *)
suspension (Error (exn, bt)) run ~name (fun () -> suspension (Error (exn, bt)))
in in
let i = ref 0 in let i = ref 0 in
@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
let len_range = min chunk_size (n - offset) in let len_range = min chunk_size (n - offset) in
assert (offset + len_range <= n); assert (offset + len_range <= n);
run (fun () -> task_for ~offset ~len_range); run ~name (fun () -> task_for ~offset ~len_range);
i := !i + len_range i := !i + len_range
done done
in in
@ -143,9 +143,9 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run suspension -> (fun ~name ~run suspension ->
(* run tasks, then we'll resume [suspension] *) (* run tasks, then we'll resume [suspension] *)
start_tasks ~run suspension); start_tasks ~run ~name suspension);
} }
) )

View file

@ -5,16 +5,16 @@ type 'a waiter = 'a or_error -> unit
type 'a state = type 'a state =
| Done of 'a or_error | Done of 'a or_error
| Waiting of 'a waiter list | Waiting of { waiters: 'a waiter list }
type 'a t = { st: 'a state A.t } [@@unboxed] type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t type 'a promise = 'a t
let make () = let make () =
let fut = { st = A.make (Waiting []) } in let fut = { st = A.make (Waiting { waiters = [] }) } in
fut, fut fut, fut
let of_result x : _ t = { st = A.make (Done x) } let[@inline] of_result x : _ t = { st = A.make (Done x) }
let[@inline] return x : _ t = of_result (Ok x) let[@inline] return x : _ t = of_result (Ok x)
let[@inline] fail e bt : _ t = of_result (Error (e, bt)) let[@inline] fail e bt : _ t = of_result (Error (e, bt))
@ -53,9 +53,8 @@ let on_result (self : _ t) (f : _ waiter) : unit =
| Done x -> | Done x ->
f x; f x;
false false
| Waiting l -> | Waiting { waiters = l } ->
let must_retry = not (A.compare_and_set self.st st (Waiting (f :: l))) in not (A.compare_and_set self.st st (Waiting { waiters = f :: l }))
must_retry
do do
Domain_.relax () Domain_.relax ()
done done
@ -63,28 +62,31 @@ let on_result (self : _ t) (f : _ waiter) : unit =
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let fs = ref [] in
while while
let st = A.get self.st in let st = A.get self.st in
match st with match st with
| Done _ -> raise Already_fulfilled | Done _ -> raise Already_fulfilled
| Waiting l -> | Waiting { waiters = l } ->
let did_swap = A.compare_and_set self.st st (Done r) in let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then ( if did_swap then (
(* success, now call all the waiters *) (* success, now call all the waiters *)
List.iter (fun f -> try f r with _ -> ()) l; fs := l;
false false
) else ) else
true true
do do
Domain_.relax () Domain_.relax ()
done done;
List.iter (fun f -> try f r with _ -> ()) !fs;
()
let[@inline] fulfill_idempotent self r = let[@inline] fulfill_idempotent self r =
try fulfill self r with Already_fulfilled -> () try fulfill self r with Already_fulfilled -> ()
(* ### combinators ### *) (* ### combinators ### *)
let spawn ~on f : _ t = let spawn ?name ~on f : _ t =
let fut, promise = make () in let fut, promise = make () in
let task () = let task () =
@ -97,13 +99,13 @@ let spawn ~on f : _ t =
fulfill promise res fulfill promise res
in in
Runner.run_async on task; Runner.run_async ?name on task;
fut fut
let spawn_on_current_runner f : _ t = let spawn_on_current_runner ?name f : _ t =
match Runner.get_current_runner () with match Runner.get_current_runner () with
| None -> failwith "Fut.spawn_on_current_runner: not running on a runner" | None -> failwith "Fut.spawn_on_current_runner: not running on a runner"
| Some on -> spawn ~on f | Some on -> spawn ?name ~on f
let reify_error (f : 'a t) : 'a or_error t = let reify_error (f : 'a t) : 'a or_error t =
match peek f with match peek f with
@ -413,9 +415,11 @@ let await (fut : 'a t) : 'a =
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run k -> (fun ~name ~run k ->
on_result fut (function on_result fut (function
| Ok _ -> run (fun () -> k (Ok ())) | Ok _ ->
(* schedule continuation with the same name *)
run ~name (fun () -> k (Ok ()))
| Error (exn, bt) -> | Error (exn, bt) ->
(* fail continuation immediately *) (* fail continuation immediately *)
k (Error (exn, bt)))); k (Error (exn, bt))));

View file

@ -81,11 +81,11 @@ val is_done : _ t -> bool
(** {2 Combinators} *) (** {2 Combinators} *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a t val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
hold its result. *) hold its result. *)
val spawn_on_current_runner : (unit -> 'a) -> 'a t val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules (** This must be run from inside a runner, and schedules
the new task on it as well. the new task on it as well.

View file

@ -1,9 +1,19 @@
include Runner include Runner
let run_async_ ~name f =
let sp = Tracing_.enter_span name in
try
let x = f () in
Tracing_.exit_span sp;
x
with e ->
let bt = Printexc.get_raw_backtrace () in
Tracing_.exit_span sp;
Printexc.raise_with_backtrace e bt
let runner : t = let runner : t =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~size:(fun () -> 0) ~size:(fun () -> 0)
~num_tasks:(fun () -> 0) ~num_tasks:(fun () -> 0)
~shutdown:(fun ~wait:_ () -> ()) ~shutdown:(fun ~wait:_ () -> ())
~run_async:(fun f -> f ()) ~run_async:run_async_ ()
()

View file

@ -23,10 +23,13 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run the thread. This ensures that we don't always pick the same domain to run the thread. This ensures that we don't always pick the same domain
to run all the various threads needed in an application (timers, event loops, etc.) *) to run all the various threads needed in an application (timers, event loops, etc.) *)
val run_async : Runner.t -> (unit -> unit) -> unit val run_async : ?name:string -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run (** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread. at some point in the future, possibly in another thread.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@since 0.5 *) @since 0.5 *)
val recommended_thread_count : unit -> int val recommended_thread_count : unit -> int
@ -35,13 +38,16 @@ val recommended_thread_count : unit -> int
this because many of them will be blocked most of the time). this because many of them will be blocked most of the time).
@since 0.5 *) @since 0.5 *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) (** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}. and returns a future result for it. See {!Fut.spawn}.
@param name if provided and [Trace] is present in dependencies,
a span will be created for the future. (since NEXT_RELEASE)
@since 0.5 *) @since 0.5 *)
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a Fut.t
(** See {!Fut.spawn_on_current_runner}. (** See {!Fut.spawn_on_current_runner}.
@param name see {!spawn}. since NEXT_RELEASE.
@since 0.5 *) @since 0.5 *)
[@@@ifge 5.0] [@@@ifge 5.0]

View file

@ -3,7 +3,7 @@ module TLS = Thread_local_storage_
type task = unit -> unit type task = unit -> unit
type t = { type t = {
run_async: (unit -> unit) -> unit; run_async: name:string -> task -> unit;
shutdown: wait:bool -> unit -> unit; shutdown: wait:bool -> unit -> unit;
size: unit -> int; size: unit -> int;
num_tasks: unit -> int; num_tasks: unit -> int;
@ -11,7 +11,7 @@ type t = {
exception Shutdown exception Shutdown
let[@inline] run_async (self : t) f : unit = self.run_async f let[@inline] run_async ?(name = "") (self : t) f : unit = self.run_async ~name f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
let[@inline] shutdown_without_waiting (self : t) : unit = let[@inline] shutdown_without_waiting (self : t) : unit =
@ -20,9 +20,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit =
let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] num_tasks (self : t) : int = self.num_tasks ()
let[@inline] size (self : t) : int = self.size () let[@inline] size (self : t) : int = self.size ()
let run_wait_block self (f : unit -> 'a) : 'a = let run_wait_block ?name self (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in
run_async self (fun () -> run_async ?name self (fun () ->
try try
let x = f () in let x = f () in
Bb_queue.push q (Ok x) Bb_queue.push q (Ok x)

View file

@ -33,13 +33,16 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : t -> task -> unit val run_async : ?name:string -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner (** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's in one of the threads. [f()] will run on one of the runner's
worker threads/domains. worker threads/domains.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@raise Shutdown if the runner was shut down before [run_async] was called. *) @raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block : t -> (unit -> 'a) -> 'a val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution (** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}. on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,
@ -47,7 +50,10 @@ val run_wait_block : t -> (unit -> 'a) -> 'a
will raise it as well. will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
about the required discipline to avoid deadlocks). *) about the required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down *)
(** {2 Implementing runners} *)
(** This module is specifically intended for users who implement their (** This module is specifically intended for users who implement their
own runners. Regular users of Moonpool should not need to look at it. *) own runners. Regular users of Moonpool should not need to look at it. *)
@ -56,7 +62,7 @@ module For_runner_implementors : sig
size:(unit -> int) -> size:(unit -> int) ->
num_tasks:(unit -> int) -> num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) -> shutdown:(wait:bool -> unit -> unit) ->
run_async:(task -> unit) -> run_async:(name:string -> task -> unit) ->
unit -> unit ->
t t
(** Create a new runner. (** Create a new runner.
@ -65,6 +71,10 @@ module For_runner_implementors : sig
so that {!Fork_join} and other 5.x features work properly. *) so that {!Fork_join} and other 5.x features work properly. *)
val k_cur_runner : t option ref Thread_local_storage_.key val k_cur_runner : t option ref Thread_local_storage_.key
(** Key that should be used by each runner to store itself in TLS
on every thread it controls, so that tasks running on these threads
can access the runner. This is necessary for {!get_current_runner}
to work. *)
end end
val get_current_runner : unit -> t option val get_current_runner : unit -> t option

View file

@ -1,7 +1,9 @@
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
type task = unit -> unit type task = unit -> unit
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } type suspension_handler = {
handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit;
}
[@@unboxed] [@@unboxed]
[@@@ifge 5.0] [@@@ifge 5.0]
@ -13,7 +15,8 @@ type _ Effect.t += Suspend : suspension_handler -> unit Effect.t
let[@inline] suspend h = Effect.perform (Suspend h) let[@inline] suspend h = Effect.perform (Suspend h)
let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = let with_suspend ~name ~on_suspend ~(run : name:string -> task -> unit)
(f : unit -> unit) : unit =
let module E = Effect.Deep in let module E = Effect.Deep in
(* effect handler *) (* effect handler *)
let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option = let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option =
@ -21,11 +24,12 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit =
| Suspend h -> | Suspend h ->
Some Some
(fun k -> (fun k ->
on_suspend ();
let k' : suspension = function let k' : suspension = function
| Ok () -> E.continue k () | Ok () -> E.continue k ()
| Error (exn, bt) -> E.discontinue_with_backtrace k exn bt | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt
in in
h.handle ~run k') h.handle ~name ~run k')
| _ -> None | _ -> None
in in
@ -34,14 +38,16 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit =
(* DLA interop *) (* DLA interop *)
let prepare_for_await () : Dla_.t = let prepare_for_await () : Dla_.t =
(* current state *) (* current state *)
let st : ((task -> unit) * suspension) option A.t = A.make None in let st : (string * (name:string -> task -> unit) * suspension) option A.t =
A.make None
in
let release () : unit = let release () : unit =
match A.exchange st None with match A.exchange st None with
| None -> () | None -> ()
| Some (run, k) -> run (fun () -> k (Ok ())) | Some (name, run, k) -> run ~name (fun () -> k (Ok ()))
and await () : unit = and await () : unit =
suspend { handle = (fun ~run k -> A.set st (Some (run, k))) } suspend { handle = (fun ~name ~run k -> A.set st (Some (name, run, k))) }
in in
let t = { Dla_.release; await } in let t = { Dla_.release; await } in

View file

@ -8,12 +8,15 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
type task = unit -> unit type task = unit -> unit
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } type suspension_handler = {
handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit;
}
[@@unboxed] [@@unboxed]
(** The handler that knows what to do with the suspended computation. (** The handler that knows what to do with the suspended computation.
The handler is given two things: The handler is given a few things:
- the name (if any) of the current computation
- the suspended computation (which can be resumed with a result - the suspended computation (which can be resumed with a result
eventually); eventually);
- a [run] function that can be used to start tasks to perform some - a [run] function that can be used to start tasks to perform some
@ -51,10 +54,18 @@ val suspend : suspension_handler -> unit
val prepare_for_await : unit -> Dla_.t val prepare_for_await : unit -> Dla_.t
(** Our stub for DLA. Unstable. *) (** Our stub for DLA. Unstable. *)
val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit val with_suspend :
name:string ->
on_suspend:(unit -> unit) ->
run:(name:string -> task -> unit) ->
(unit -> unit) ->
unit
(** [with_suspend ~run f] runs [f()] in an environment where [suspend] (** [with_suspend ~run f] runs [f()] in an environment where [suspend]
will work. If [f()] suspends with suspension handler [h], will work. If [f()] suspends with suspension handler [h],
this calls [h ~run k] where [k] is the suspension. this calls [h ~run k] where [k] is the suspension.
The suspension should always run in a new task, via [run].
@param on_suspend called when [f()] suspends itself.
This will not do anything on OCaml 4.x. This will not do anything on OCaml 4.x.
*) *)

4
src/tracing_.dummy.ml Normal file
View file

@ -0,0 +1,4 @@
let enabled () = false
let dummy_span = 0L
let enter_span _name = dummy_span
let exit_span = ignore

4
src/tracing_.mli Normal file
View file

@ -0,0 +1,4 @@
val dummy_span : int64
val enter_span : string -> int64
val exit_span : int64 -> unit
val enabled : unit -> bool

13
src/tracing_.real.ml Normal file
View file

@ -0,0 +1,13 @@
module Trace = Trace_core
let enabled = Trace.enabled
let dummy_span = Int64.min_int
let dummy_file_ = "<unknown file>"
let[@inline] enter_span name : int64 =
if name = "" then
dummy_span
else
Trace.enter_span ~__FILE__:dummy_file_ ~__LINE__:0 name
let[@inline] exit_span sp = if sp <> dummy_span then Trace.exit_span sp

View file

@ -13,10 +13,16 @@ module Id = struct
let equal : t -> t -> bool = ( == ) let equal : t -> t -> bool = ( == )
end end
type task_with_name = {
f: task;
name: string;
}
type worker_state = { type worker_state = {
pool_id_: Id.t; (** Unique per pool *) pool_id_: Id.t; (** Unique per pool *)
mutable thread: Thread.t; mutable thread: Thread.t;
q: task WSQ.t; (** Work stealing queue *) q: task_with_name WSQ.t; (** Work stealing queue *)
mutable cur_span: int64;
rng: Random.State.t; rng: Random.State.t;
} }
(** State for a given worker. Only this worker is (** State for a given worker. Only this worker is
@ -29,7 +35,8 @@ type state = {
id_: Id.t; id_: Id.t;
active: bool A.t; (** Becomes [false] when the pool is shutdown. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
workers: worker_state array; (** Fixed set of workers. *) workers: worker_state array; (** Fixed set of workers. *)
main_q: task Queue.t; (** Main queue for tasks coming from the outside *) main_q: task_with_name Queue.t;
(** Main queue for tasks coming from the outside *)
mutable n_waiting: int; (* protected by mutex *) mutable n_waiting: int; (* protected by mutex *)
mutable n_waiting_nonzero: bool; (** [n_waiting > 0] *) mutable n_waiting_nonzero: bool; (** [n_waiting > 0] *)
mutex: Mutex.t; mutex: Mutex.t;
@ -65,9 +72,10 @@ let[@inline] try_wake_someone_ (self : state) : unit =
) )
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit let schedule_task_ (self : state) ~name (w : worker_state option) (f : task) :
= unit =
(* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let task = { f; name } in
match w with match w with
| Some w when Id.equal self.id_ w.pool_id_ -> | Some w when Id.equal self.id_ w.pool_id_ ->
(* we're on this same pool, schedule in the worker's state. Otherwise (* we're on this same pool, schedule in the worker's state. Otherwise
@ -96,24 +104,36 @@ let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit
raise Shutdown raise Shutdown
(** Run this task, now. Must be called from a worker. *) (** Run this task, now. Must be called from a worker. *)
let run_task_now_ (self : state) ~runner task : unit = let run_task_now_ (self : state) ~runner (w : worker_state) ~name task : unit =
(* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let (AT_pair (before_task, after_task)) = self.around_task in let (AT_pair (before_task, after_task)) = self.around_task in
let _ctx = before_task runner in let _ctx = before_task runner in
w.cur_span <- Tracing_.enter_span name;
let[@inline] exit_span_ () =
Tracing_.exit_span w.cur_span;
w.cur_span <- Tracing_.dummy_span
in
let run_another_task ~name task' =
let w = find_current_worker_ () in
schedule_task_ self w ~name task'
in
(* run the task now, catching errors *) (* run the task now, catching errors *)
(try (try
(* run [task()] and handle [suspend] in it *) (* run [task()] and handle [suspend] in it *)
Suspend_.with_suspend task ~run:(fun task' -> Suspend_.with_suspend task ~name ~run:run_another_task
let w = find_current_worker_ () in ~on_suspend:exit_span_
schedule_task_ self w task')
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
self.on_exn e bt); self.on_exn e bt);
exit_span_ ();
after_task runner _ctx after_task runner _ctx
let[@inline] run_async_ (self : state) (task : task) : unit = let[@inline] run_async_ (self : state) ~name (f : task) : unit =
let w = find_current_worker_ () in let w = find_current_worker_ () in
schedule_task_ self w task schedule_task_ self w ~name f
(* TODO: function to schedule many tasks from the outside. (* TODO: function to schedule many tasks from the outside.
- build a queue - build a queue
@ -122,8 +142,6 @@ let[@inline] run_async_ (self : state) (task : task) : unit =
- wakeup all (broadcast) - wakeup all (broadcast)
- unlock *) - unlock *)
let run = run_async
(** Wait on condition. Precondition: we hold the mutex. *) (** Wait on condition. Precondition: we hold the mutex. *)
let[@inline] wait_ (self : state) : unit = let[@inline] wait_ (self : state) : unit =
self.n_waiting <- self.n_waiting + 1; self.n_waiting <- self.n_waiting + 1;
@ -132,10 +150,11 @@ let[@inline] wait_ (self : state) : unit =
self.n_waiting <- self.n_waiting - 1; self.n_waiting <- self.n_waiting - 1;
if self.n_waiting = 0 then self.n_waiting_nonzero <- false if self.n_waiting = 0 then self.n_waiting_nonzero <- false
exception Got_task of task exception Got_task of task_with_name
(** Try to steal a task *) (** Try to steal a task *)
let try_to_steal_work_once_ (self : state) (w : worker_state) : task option = let try_to_steal_work_once_ (self : state) (w : worker_state) :
task_with_name option =
let init = Random.State.int w.rng (Array.length self.workers) in let init = Random.State.int w.rng (Array.length self.workers) in
try try
@ -160,7 +179,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit =
match WSQ.pop w.q with match WSQ.pop w.q with
| Some task -> | Some task ->
try_wake_someone_ self; try_wake_someone_ self;
run_task_now_ self ~runner task run_task_now_ self ~runner w ~name:task.name task.f
| None -> continue := false | None -> continue := false
done done
@ -173,7 +192,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
worker_run_self_tasks_ self ~runner w; worker_run_self_tasks_ self ~runner w;
try_steal () try_steal ()
and run_task task : unit = and run_task task : unit =
run_task_now_ self ~runner task; run_task_now_ self ~runner w ~name:task.name task.f;
main () main ()
and try_steal () = and try_steal () =
match try_to_steal_work_once_ self w with match try_to_steal_work_once_ self w with
@ -231,7 +250,7 @@ type ('a, 'b) create_args =
'a 'a
(** Arguments used in {!create}. See {!create} for explanations. *) (** Arguments used in {!create}. See {!create} for explanations. *)
let dummy_task_ () = assert false let dummy_task_ = { f = ignore; name = "DUMMY_TASK" }
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
@ -256,6 +275,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ {
pool_id_; pool_id_;
thread = dummy; thread = dummy;
cur_span = Tracing_.dummy_span;
q = WSQ.create ~dummy:dummy_task_ (); q = WSQ.create ~dummy:dummy_task_ ();
rng = Random.State.make [| i |]; rng = Random.State.make [| i |];
}) })
@ -279,7 +299,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun f -> run_async_ pool f) ~run_async:(fun ~name f -> run_async_ pool ~name f)
~size:(fun () -> size_ pool) ~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool) ~num_tasks:(fun () -> num_tasks_ pool)
() ()

View file

@ -53,7 +53,3 @@ val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
Most parameters are the same as in {!create}. Most parameters are the same as in {!create}.
@since 0.3 *) @since 0.3 *)
val run : t -> (unit -> unit) -> unit
[@@deprecated "use run_async"]
(** deprecated alias to {!run_async} *)