Merge pull request #19 from c-cube/wip-tracing-support

wip: tracing support
This commit is contained in:
Simon Cruanes 2024-02-06 22:24:10 -05:00 committed by GitHub
commit 6ed870aa9c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 238 additions and 98 deletions

View file

@ -39,4 +39,6 @@ jobs:
if: matrix.ocaml-compiler == '5.0' if: matrix.ocaml-compiler == '5.0'
- run: opam exec -- dune build @install @runtest - run: opam exec -- dune build @install @runtest
if: matrix.ocaml-compiler == '5.0' if: matrix.ocaml-compiler == '5.0'
- run: opam install trace thread-local-storage
- run: opam exec -- dune build @install

View file

@ -1,4 +1,7 @@
open Moonpool open Moonpool
module Trace = Trace_core
let ( let@ ) = ( @@ )
let rec fib_direct x = let rec fib_direct x =
if x <= 1 then if x <= 1 then
@ -10,7 +13,7 @@ let cutoff = ref 20
let rec fib ~on x : int Fut.t = let rec fib ~on x : int Fut.t =
if x <= !cutoff then if x <= !cutoff then
Fut.spawn ~on (fun () -> fib_direct x) Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x)
else else
let open Fut.Infix in let open Fut.Infix in
let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in
@ -27,14 +30,14 @@ let fib_fj ~on x : int Fut.t =
n1 + n2 n1 + n2
) )
in in
Fut.spawn ~on (fun () -> fib_rec x) Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x)
let fib_await ~on x : int Fut.t = let fib_await ~on x : int Fut.t =
let rec fib_rec x : int Fut.t = let rec fib_rec x : int Fut.t =
if x <= !cutoff then if x <= !cutoff then
Fut.spawn ~on (fun () -> fib_direct x) Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x)
else else
Fut.spawn ~on (fun () -> Fut.spawn ~name:"fib" ~on (fun () ->
let n1 = fib_rec (x - 1) in let n1 = fib_rec (x - 1) in
let n2 = fib_rec (x - 2) in let n2 = fib_rec (x - 2) in
let n1 = Fut.await n1 in let n1 = Fut.await n1 in
@ -66,6 +69,7 @@ let str_of_int_opt = function
| Some i -> Printf.sprintf "Some %d" i | Some i -> Printf.sprintf "Some %d" i
let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit = let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib.run" in
let pool = lazy (create_pool ~kind ~psize ()) in let pool = lazy (create_pool ~kind ~psize ()) in
let dl_pool = let dl_pool =
lazy lazy
@ -108,6 +112,7 @@ let run ~psize ~n ~seq ~dl ~fj ~await ~niter ~kind () : unit =
Ws_pool.shutdown (Lazy.force pool) Ws_pool.shutdown (Lazy.force pool)
let () = let () =
let@ () = Trace_tef.with_setup () in
let n = ref 40 in let n = ref 40 in
let psize = ref None in let psize = ref None in
let seq = ref false in let seq = ref false in

View file

@ -7,6 +7,7 @@ let j = ref 0
let spf = Printf.sprintf let spf = Printf.sprintf
let run_sequential (num_steps : int) : float = let run_sequential (num_steps : int) : float =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "pi.seq" in
let step = 1. /. float num_steps in let step = 1. /. float num_steps in
let sum = ref 0. in let sum = ref 0. in
for i = 0 to num_steps - 1 do for i = 0 to num_steps - 1 do
@ -42,6 +43,11 @@ let run_par1 ~kind (num_steps : int) : float =
(* one chunk of the work *) (* one chunk of the work *)
let run_task _idx_task : unit = let run_task _idx_task : unit =
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "pi.slice" ~data:(fun () ->
[ "i", `Int _idx_task ])
in
let sum = ref 0. in let sum = ref 0. in
let i = ref 0 in let i = ref 0 in
while !i < num_steps do while !i < num_steps do
@ -69,7 +75,7 @@ let run_fork_join ~kind num_steps : float =
let step = 1. /. float num_steps in let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in let global_sum = Lock.create 0. in
Ws_pool.run_wait_block pool (fun () -> Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () ->
Fork_join.for_ Fork_join.for_
~chunk_size:(3 + (num_steps / num_tasks)) ~chunk_size:(3 + (num_steps / num_tasks))
num_steps num_steps
@ -99,6 +105,9 @@ type mode =
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let mode = ref Sequential in let mode = ref Sequential in
let n = ref 1000 in let n = ref 1000 in
let time = ref false in let time = ref false in

View file

@ -28,6 +28,7 @@
(>= 1.9.0) (>= 1.9.0)
:with-test))) :with-test)))
(depopts (depopts
(trace (>= 0.6))
thread-local-storage thread-local-storage
(domain-local-await (>= 0.2))) (domain-local-await (>= 0.2)))
(tags (tags

View file

@ -19,6 +19,7 @@ depends: [
"mdx" {>= "1.9.0" & with-test} "mdx" {>= "1.9.0" & with-test}
] ]
depopts: [ depopts: [
"trace" {>= "0.6"}
"thread-local-storage" "thread-local-storage"
"domain-local-await" {>= "0.2"} "domain-local-await" {>= "0.2"}
] ]

View file

@ -1,7 +1,7 @@
(library (library
(public_name moonpool) (public_name moonpool)
(name moonpool) (name moonpool)
(private_modules d_pool_ dla_) (private_modules d_pool_ dla_ tracing_)
(preprocess (preprocess
(action (action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))) (run %{project_root}/src/cpp/cpp.exe %{input-file})))
@ -9,6 +9,9 @@
(select thread_local_storage_.ml from (select thread_local_storage_.ml from
(thread-local-storage -> thread_local_storage_.stub.ml) (thread-local-storage -> thread_local_storage_.stub.ml)
(-> thread_local_storage_.real.ml)) (-> thread_local_storage_.real.ml))
(select tracing_.ml from
(trace.core -> tracing_.real.ml)
(-> tracing_.dummy.ml))
(select dla_.ml from (select dla_.ml from
(domain-local-await -> dla_.real.ml) (domain-local-await -> dla_.real.ml)
( -> dla_.dummy.ml)))) ( -> dla_.dummy.ml))))

View file

@ -3,9 +3,14 @@ include Runner
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
type task_with_name = {
f: unit -> unit;
name: string;
}
type state = { type state = {
threads: Thread.t array; threads: Thread.t array;
q: task Bb_queue.t; (** Queue for tasks. *) q: task_with_name Bb_queue.t; (** Queue for tasks. *)
} }
(** internal state *) (** internal state *)
@ -13,7 +18,7 @@ let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let schedule_ (self : state) (task : task) : unit = let schedule_ (self : state) (task : task_with_name) : unit =
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
@ -22,19 +27,33 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner;
let (AT_pair (before_task, after_task)) = around_task in let (AT_pair (before_task, after_task)) = around_task in
let run_task task : unit = let cur_span = ref Tracing_.dummy_span in
let[@inline] exit_span_ () =
Tracing_.exit_span !cur_span;
cur_span := Tracing_.dummy_span
in
let run_another_task ~name task' = schedule_ self { f = task'; name } in
let run_task (task : task_with_name) : unit =
let _ctx = before_task runner in let _ctx = before_task runner in
cur_span := Tracing_.enter_span task.name;
(* run the task now, catching errors *) (* run the task now, catching errors *)
(try Suspend_.with_suspend task ~run:(fun task' -> schedule_ self task') (try
Suspend_.with_suspend task.f ~name:task.name ~run:run_another_task
~on_suspend:exit_span_
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
on_exn e bt); on_exn e bt);
exit_span_ ();
after_task runner _ctx after_task runner _ctx
in in
let main_loop () = let main_loop () =
let continue = ref true in let continue = ref true in
while !continue do while !continue do
assert (!cur_span = Tracing_.dummy_span);
match Bb_queue.pop self.q with match Bb_queue.pop self.q with
| task -> run_task task | task -> run_task task
| exception Bb_queue.Closed -> continue := false | exception Bb_queue.Closed -> continue := false
@ -84,10 +103,12 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ threads = Array.make num_threads dummy; q = Bb_queue.create () } { threads = Array.make num_threads dummy; q = Bb_queue.create () }
in in
let run_async ~name f = schedule_ pool { f; name } in
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun f -> schedule_ pool f) ~run_async
~size:(fun () -> size_ pool) ~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool) ~num_tasks:(fun () -> num_tasks_ pool)
() ()

View file

@ -48,7 +48,7 @@ module State_ = struct
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run:_ suspension -> (fun ~name:_ ~run:_ suspension ->
while while
let old_st = A.get self in let old_st = A.get self in
match old_st with match old_st with
@ -113,19 +113,19 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
max 1 (1 + (n / D_pool_.n_domains ())) max 1 (1 + (n / D_pool_.n_domains ()))
in in
let start_tasks ~run (suspension : Suspend_.suspension) = let start_tasks ~name ~run (suspension : Suspend_.suspension) =
let task_for ~offset ~len_range = let task_for ~offset ~len_range =
match f offset (offset + len_range - 1) with match f offset (offset + len_range - 1) with
| () -> | () ->
if A.fetch_and_add missing (-len_range) = len_range then if A.fetch_and_add missing (-len_range) = len_range then
(* all tasks done successfully *) (* all tasks done successfully *)
suspension (Ok ()) run ~name (fun () -> suspension (Ok ()))
| exception exn -> | exception exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
if not (A.exchange has_failed true) then if not (A.exchange has_failed true) then
(* first one to fail, and [missing] must be >= 2 (* first one to fail, and [missing] must be >= 2
because we're not decreasing it. *) because we're not decreasing it. *)
suspension (Error (exn, bt)) run ~name (fun () -> suspension (Error (exn, bt)))
in in
let i = ref 0 in let i = ref 0 in
@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
let len_range = min chunk_size (n - offset) in let len_range = min chunk_size (n - offset) in
assert (offset + len_range <= n); assert (offset + len_range <= n);
run (fun () -> task_for ~offset ~len_range); run ~name (fun () -> task_for ~offset ~len_range);
i := !i + len_range i := !i + len_range
done done
in in
@ -143,9 +143,9 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run suspension -> (fun ~name ~run suspension ->
(* run tasks, then we'll resume [suspension] *) (* run tasks, then we'll resume [suspension] *)
start_tasks ~run suspension); start_tasks ~run ~name suspension);
} }
) )

View file

@ -5,16 +5,24 @@ type 'a waiter = 'a or_error -> unit
type 'a state = type 'a state =
| Done of 'a or_error | Done of 'a or_error
| Waiting of 'a waiter list | Waiting of {
waiters: 'a waiter list;
name: string;
}
type 'a t = { st: 'a state A.t } [@@unboxed] type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t type 'a promise = 'a t
let make () = let[@inline] get_name_ (self : _ t) =
let fut = { st = A.make (Waiting []) } in match A.get self.st with
| Done _ -> ""
| Waiting { name; _ } -> name
let make ?(name = "") () =
let fut = { st = A.make (Waiting { waiters = []; name }) } in
fut, fut fut, fut
let of_result x : _ t = { st = A.make (Done x) } let[@inline] of_result x : _ t = { st = A.make (Done x) }
let[@inline] return x : _ t = of_result (Ok x) let[@inline] return x : _ t = of_result (Ok x)
let[@inline] fail e bt : _ t = of_result (Error (e, bt)) let[@inline] fail e bt : _ t = of_result (Error (e, bt))
@ -53,9 +61,8 @@ let on_result (self : _ t) (f : _ waiter) : unit =
| Done x -> | Done x ->
f x; f x;
false false
| Waiting l -> | Waiting { waiters = l; name } ->
let must_retry = not (A.compare_and_set self.st st (Waiting (f :: l))) in not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name }))
must_retry
do do
Domain_.relax () Domain_.relax ()
done done
@ -63,28 +70,31 @@ let on_result (self : _ t) (f : _ waiter) : unit =
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let fs = ref [] in
while while
let st = A.get self.st in let st = A.get self.st in
match st with match st with
| Done _ -> raise Already_fulfilled | Done _ -> raise Already_fulfilled
| Waiting l -> | Waiting { waiters = l; name = _ } ->
let did_swap = A.compare_and_set self.st st (Done r) in let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then ( if did_swap then (
(* success, now call all the waiters *) (* success, now call all the waiters *)
List.iter (fun f -> try f r with _ -> ()) l; fs := l;
false false
) else ) else
true true
do do
Domain_.relax () Domain_.relax ()
done done;
List.iter (fun f -> try f r with _ -> ()) !fs;
()
let[@inline] fulfill_idempotent self r = let[@inline] fulfill_idempotent self r =
try fulfill self r with Already_fulfilled -> () try fulfill self r with Already_fulfilled -> ()
(* ### combinators ### *) (* ### combinators ### *)
let spawn ~on f : _ t = let spawn ?name ~on f : _ t =
let fut, promise = make () in let fut, promise = make () in
let task () = let task () =
@ -97,13 +107,13 @@ let spawn ~on f : _ t =
fulfill promise res fulfill promise res
in in
Runner.run_async on task; Runner.run_async ?name on task;
fut fut
let spawn_on_current_runner f : _ t = let spawn_on_current_runner ?name f : _ t =
match Runner.get_current_runner () with match Runner.get_current_runner () with
| None -> failwith "Fut.spawn_on_current_runner: not running on a runner" | None -> failwith "Fut.spawn_on_current_runner: not running on a runner"
| Some on -> spawn ~on f | Some on -> spawn ?name ~on f
let reify_error (f : 'a t) : 'a or_error t = let reify_error (f : 'a t) : 'a or_error t =
match peek f with match peek f with
@ -113,7 +123,7 @@ let reify_error (f : 'a t) : 'a or_error t =
on_result f (fun r -> fulfill promise (Ok r)); on_result f (fun r -> fulfill promise (Ok r));
fut fut
let get_runner_ ?on () : Runner.t option = let[@inline] get_runner_ ?on () : Runner.t option =
match on with match on with
| Some _ as r -> r | Some _ as r -> r
| None -> Runner.get_current_runner () | None -> Runner.get_current_runner ()
@ -129,20 +139,22 @@ let map ?on ~f fut : _ t =
| Error e_bt -> Error e_bt | Error e_bt -> Error e_bt
in in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with match peek fut, get_runner_ ?on () with
| Some res, None -> of_result @@ map_immediate_ res | Some res, None -> of_result @@ map_immediate_ res
| Some res, Some runner -> | Some res, Some runner ->
let fut2, promise = make () in let fut2, promise = make ~name () in
Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res); Runner.run_async ~name runner (fun () ->
fulfill promise @@ map_immediate_ res);
fut2 fut2
| None, None -> | None, None ->
let fut2, promise = make () in let fut2, promise = make ~name () in
on_result fut (fun res -> fulfill promise @@ map_immediate_ res); on_result fut (fun res -> fulfill promise @@ map_immediate_ res);
fut2 fut2
| None, Some runner -> | None, Some runner ->
let fut2, promise = make () in let fut2, promise = make ~name () in
on_result fut (fun res -> on_result fut (fun res ->
Runner.run_async runner (fun () -> Runner.run_async ~name runner (fun () ->
fulfill promise @@ map_immediate_ res)); fulfill promise @@ map_immediate_ res));
fut2 fut2
@ -151,7 +163,7 @@ let join (fut : 'a t t) : 'a t =
| Some (Ok f) -> f | Some (Ok f) -> f
| Some (Error (e, bt)) -> fail e bt | Some (Error (e, bt)) -> fail e bt
| None -> | None ->
let fut2, promise = make () in let fut2, promise = make ~name:(get_name_ fut) () in
on_result fut (function on_result fut (function
| Ok sub_fut -> on_result sub_fut (fulfill promise) | Ok sub_fut -> on_result sub_fut (fulfill promise)
| Error _ as e -> fulfill promise e); | Error _ as e -> fulfill promise e);
@ -174,19 +186,20 @@ let bind ?on ~f fut : _ t =
on_result f_res_fut (fun r -> fulfill promise r) on_result f_res_fut (fun r -> fulfill promise r)
in in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with match peek fut, get_runner_ ?on () with
| Some res, Some runner -> | Some res, Some runner ->
let fut2, promise = make () in let fut2, promise = make ~name () in
Runner.run_async runner (bind_and_fulfill res promise); Runner.run_async ~name runner (bind_and_fulfill res promise);
fut2 fut2
| Some res, None -> apply_f_to_res res | Some res, None -> apply_f_to_res res
| None, Some runner -> | None, Some runner ->
let fut2, promise = make () in let fut2, promise = make ~name () in
on_result fut (fun r -> on_result fut (fun r ->
Runner.run_async runner (bind_and_fulfill r promise)); Runner.run_async ~name runner (bind_and_fulfill r promise));
fut2 fut2
| None, None -> | None, None ->
let fut2, promise = make () in let fut2, promise = make ~name () in
on_result fut (fun res -> bind_and_fulfill res promise ()); on_result fut (fun res -> bind_and_fulfill res promise ());
fut2 fut2
@ -210,7 +223,7 @@ let both a b : _ t =
| Some (Ok x), Some (Ok y) -> return (x, y) | Some (Ok x), Some (Ok y) -> return (x, y)
| Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt
| _ -> | _ ->
let fut, promise = make () in let fut, promise = make ~name:(get_name_ a) () in
let st = A.make `Neither in let st = A.make `Neither in
on_result a (function on_result a (function
@ -243,7 +256,7 @@ let choose a b : _ t =
| _, Some (Ok y) -> return (Either.Right y) | _, Some (Ok y) -> return (Either.Right y)
| Some (Error (e, bt)), Some (Error _) -> fail e bt | Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ -> | _ ->
let fut, promise = make () in let fut, promise = make ~name:(get_name_ a) () in
let one_failure = A.make false in let one_failure = A.make false in
on_result a (function on_result a (function
@ -266,7 +279,7 @@ let choose_same a b : _ t =
| _, Some (Ok y) -> return y | _, Some (Ok y) -> return y
| Some (Error (e, bt)), Some (Error _) -> fail e bt | Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ -> | _ ->
let fut, promise = make () in let fut, promise = make ~name:(get_name_ a) () in
let one_failure = A.make false in let one_failure = A.make false in
on_result a (function on_result a (function
@ -413,9 +426,11 @@ let await (fut : 'a t) : 'a =
Suspend_.suspend Suspend_.suspend
{ {
Suspend_.handle = Suspend_.handle =
(fun ~run k -> (fun ~name ~run k ->
on_result fut (function on_result fut (function
| Ok _ -> run (fun () -> k (Ok ())) | Ok _ ->
(* schedule continuation with the same name *)
run ~name (fun () -> k (Ok ()))
| Error (exn, bt) -> | Error (exn, bt) ->
(* fail continuation immediately *) (* fail continuation immediately *)
k (Error (exn, bt)))); k (Error (exn, bt))));

View file

@ -26,8 +26,9 @@ type 'a promise
(** A promise, which can be fulfilled exactly once to set (** A promise, which can be fulfilled exactly once to set
the corresponding future *) the corresponding future *)
val make : unit -> 'a t * 'a promise val make : ?name:string -> unit -> 'a t * 'a promise
(** Make a new future with the associated promise *) (** Make a new future with the associated promise.
@param name name for the future, used for tracing. since NEXT_RELEASE. *)
val on_result : 'a t -> ('a or_error -> unit) -> unit val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future (** [on_result fut f] registers [f] to be called in the future
@ -81,11 +82,11 @@ val is_done : _ t -> bool
(** {2 Combinators} *) (** {2 Combinators} *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a t val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
hold its result. *) hold its result. *)
val spawn_on_current_runner : (unit -> 'a) -> 'a t val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules (** This must be run from inside a runner, and schedules
the new task on it as well. the new task on it as well.

View file

@ -1,9 +1,19 @@
include Runner include Runner
let run_async_ ~name f =
let sp = Tracing_.enter_span name in
try
let x = f () in
Tracing_.exit_span sp;
x
with e ->
let bt = Printexc.get_raw_backtrace () in
Tracing_.exit_span sp;
Printexc.raise_with_backtrace e bt
let runner : t = let runner : t =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~size:(fun () -> 0) ~size:(fun () -> 0)
~num_tasks:(fun () -> 0) ~num_tasks:(fun () -> 0)
~shutdown:(fun ~wait:_ () -> ()) ~shutdown:(fun ~wait:_ () -> ())
~run_async:(fun f -> f ()) ~run_async:run_async_ ()
()

View file

@ -23,10 +23,13 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run the thread. This ensures that we don't always pick the same domain to run the thread. This ensures that we don't always pick the same domain
to run all the various threads needed in an application (timers, event loops, etc.) *) to run all the various threads needed in an application (timers, event loops, etc.) *)
val run_async : Runner.t -> (unit -> unit) -> unit val run_async : ?name:string -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run (** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread. at some point in the future, possibly in another thread.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@since 0.5 *) @since 0.5 *)
val recommended_thread_count : unit -> int val recommended_thread_count : unit -> int
@ -35,13 +38,16 @@ val recommended_thread_count : unit -> int
this because many of them will be blocked most of the time). this because many of them will be blocked most of the time).
@since 0.5 *) @since 0.5 *)
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically) (** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}. and returns a future result for it. See {!Fut.spawn}.
@param name if provided and [Trace] is present in dependencies,
a span will be created for the future. (since NEXT_RELEASE)
@since 0.5 *) @since 0.5 *)
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a Fut.t
(** See {!Fut.spawn_on_current_runner}. (** See {!Fut.spawn_on_current_runner}.
@param name see {!spawn}. since NEXT_RELEASE.
@since 0.5 *) @since 0.5 *)
[@@@ifge 5.0] [@@@ifge 5.0]

View file

@ -3,7 +3,7 @@ module TLS = Thread_local_storage_
type task = unit -> unit type task = unit -> unit
type t = { type t = {
run_async: (unit -> unit) -> unit; run_async: name:string -> task -> unit;
shutdown: wait:bool -> unit -> unit; shutdown: wait:bool -> unit -> unit;
size: unit -> int; size: unit -> int;
num_tasks: unit -> int; num_tasks: unit -> int;
@ -11,7 +11,7 @@ type t = {
exception Shutdown exception Shutdown
let[@inline] run_async (self : t) f : unit = self.run_async f let[@inline] run_async ?(name = "") (self : t) f : unit = self.run_async ~name f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
let[@inline] shutdown_without_waiting (self : t) : unit = let[@inline] shutdown_without_waiting (self : t) : unit =
@ -20,9 +20,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit =
let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] num_tasks (self : t) : int = self.num_tasks ()
let[@inline] size (self : t) : int = self.size () let[@inline] size (self : t) : int = self.size ()
let run_wait_block self (f : unit -> 'a) : 'a = let run_wait_block ?name self (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in let q = Bb_queue.create () in
run_async self (fun () -> run_async ?name self (fun () ->
try try
let x = f () in let x = f () in
Bb_queue.push q (Ok x) Bb_queue.push q (Ok x)

View file

@ -33,13 +33,16 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run_async : t -> task -> unit val run_async : ?name:string -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner (** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's in one of the threads. [f()] will run on one of the runner's
worker threads/domains. worker threads/domains.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@raise Shutdown if the runner was shut down before [run_async] was called. *) @raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block : t -> (unit -> 'a) -> 'a val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution (** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}. on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing, It then blocks the current thread until [f()] is done executing,
@ -47,7 +50,10 @@ val run_wait_block : t -> (unit -> 'a) -> 'a
will raise it as well. will raise it as well.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block} {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}
about the required discipline to avoid deadlocks). *) about the required discipline to avoid deadlocks).
@raise Shutdown if the runner was already shut down *)
(** {2 Implementing runners} *)
(** This module is specifically intended for users who implement their (** This module is specifically intended for users who implement their
own runners. Regular users of Moonpool should not need to look at it. *) own runners. Regular users of Moonpool should not need to look at it. *)
@ -56,7 +62,7 @@ module For_runner_implementors : sig
size:(unit -> int) -> size:(unit -> int) ->
num_tasks:(unit -> int) -> num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) -> shutdown:(wait:bool -> unit -> unit) ->
run_async:(task -> unit) -> run_async:(name:string -> task -> unit) ->
unit -> unit ->
t t
(** Create a new runner. (** Create a new runner.
@ -65,6 +71,10 @@ module For_runner_implementors : sig
so that {!Fork_join} and other 5.x features work properly. *) so that {!Fork_join} and other 5.x features work properly. *)
val k_cur_runner : t option ref Thread_local_storage_.key val k_cur_runner : t option ref Thread_local_storage_.key
(** Key that should be used by each runner to store itself in TLS
on every thread it controls, so that tasks running on these threads
can access the runner. This is necessary for {!get_current_runner}
to work. *)
end end
val get_current_runner : unit -> t option val get_current_runner : unit -> t option

View file

@ -1,7 +1,9 @@
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
type task = unit -> unit type task = unit -> unit
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } type suspension_handler = {
handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit;
}
[@@unboxed] [@@unboxed]
[@@@ifge 5.0] [@@@ifge 5.0]
@ -13,7 +15,8 @@ type _ Effect.t += Suspend : suspension_handler -> unit Effect.t
let[@inline] suspend h = Effect.perform (Suspend h) let[@inline] suspend h = Effect.perform (Suspend h)
let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = let with_suspend ~name ~on_suspend ~(run : name:string -> task -> unit)
(f : unit -> unit) : unit =
let module E = Effect.Deep in let module E = Effect.Deep in
(* effect handler *) (* effect handler *)
let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option = let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option =
@ -21,11 +24,12 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit =
| Suspend h -> | Suspend h ->
Some Some
(fun k -> (fun k ->
on_suspend ();
let k' : suspension = function let k' : suspension = function
| Ok () -> E.continue k () | Ok () -> E.continue k ()
| Error (exn, bt) -> E.discontinue_with_backtrace k exn bt | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt
in in
h.handle ~run k') h.handle ~name ~run k')
| _ -> None | _ -> None
in in
@ -34,14 +38,16 @@ let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit =
(* DLA interop *) (* DLA interop *)
let prepare_for_await () : Dla_.t = let prepare_for_await () : Dla_.t =
(* current state *) (* current state *)
let st : ((task -> unit) * suspension) option A.t = A.make None in let st : (string * (name:string -> task -> unit) * suspension) option A.t =
A.make None
in
let release () : unit = let release () : unit =
match A.exchange st None with match A.exchange st None with
| None -> () | None -> ()
| Some (run, k) -> run (fun () -> k (Ok ())) | Some (name, run, k) -> run ~name (fun () -> k (Ok ()))
and await () : unit = and await () : unit =
suspend { handle = (fun ~run k -> A.set st (Some (run, k))) } suspend { handle = (fun ~name ~run k -> A.set st (Some (name, run, k))) }
in in
let t = { Dla_.release; await } in let t = { Dla_.release; await } in
@ -50,7 +56,7 @@ let prepare_for_await () : Dla_.t =
[@@@ocaml.alert "+unstable"] [@@@ocaml.alert "+unstable"]
[@@@else_] [@@@else_]
let[@inline] with_suspend ~run:_ f = f () let[@inline] with_suspend ~name:_ ~on_suspend:_ ~run:_ f = f ()
let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore } let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore }
[@@@endif] [@@@endif]

View file

@ -8,12 +8,15 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
type task = unit -> unit type task = unit -> unit
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } type suspension_handler = {
handle: name:string -> run:(name:string -> task -> unit) -> suspension -> unit;
}
[@@unboxed] [@@unboxed]
(** The handler that knows what to do with the suspended computation. (** The handler that knows what to do with the suspended computation.
The handler is given two things: The handler is given a few things:
- the name (if any) of the current computation
- the suspended computation (which can be resumed with a result - the suspended computation (which can be resumed with a result
eventually); eventually);
- a [run] function that can be used to start tasks to perform some - a [run] function that can be used to start tasks to perform some
@ -51,10 +54,18 @@ val suspend : suspension_handler -> unit
val prepare_for_await : unit -> Dla_.t val prepare_for_await : unit -> Dla_.t
(** Our stub for DLA. Unstable. *) (** Our stub for DLA. Unstable. *)
val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit val with_suspend :
name:string ->
on_suspend:(unit -> unit) ->
run:(name:string -> task -> unit) ->
(unit -> unit) ->
unit
(** [with_suspend ~run f] runs [f()] in an environment where [suspend] (** [with_suspend ~run f] runs [f()] in an environment where [suspend]
will work. If [f()] suspends with suspension handler [h], will work. If [f()] suspends with suspension handler [h],
this calls [h ~run k] where [k] is the suspension. this calls [h ~run k] where [k] is the suspension.
The suspension should always run in a new task, via [run].
@param on_suspend called when [f()] suspends itself.
This will not do anything on OCaml 4.x. This will not do anything on OCaml 4.x.
*) *)

4
src/tracing_.dummy.ml Normal file
View file

@ -0,0 +1,4 @@
let enabled () = false
let dummy_span = 0L
let enter_span _name = dummy_span
let exit_span = ignore

4
src/tracing_.mli Normal file
View file

@ -0,0 +1,4 @@
val dummy_span : int64
val enter_span : string -> int64
val exit_span : int64 -> unit
val enabled : unit -> bool

13
src/tracing_.real.ml Normal file
View file

@ -0,0 +1,13 @@
module Trace = Trace_core
let enabled = Trace.enabled
let dummy_span = Int64.min_int
let dummy_file_ = "<unknown file>"
let[@inline] enter_span name : int64 =
if name = "" then
dummy_span
else
Trace.enter_span ~__FILE__:dummy_file_ ~__LINE__:0 name
let[@inline] exit_span sp = if sp <> dummy_span then Trace.exit_span sp

View file

@ -13,10 +13,16 @@ module Id = struct
let equal : t -> t -> bool = ( == ) let equal : t -> t -> bool = ( == )
end end
type task_with_name = {
f: task;
name: string;
}
type worker_state = { type worker_state = {
pool_id_: Id.t; (** Unique per pool *) pool_id_: Id.t; (** Unique per pool *)
mutable thread: Thread.t; mutable thread: Thread.t;
q: task WSQ.t; (** Work stealing queue *) q: task_with_name WSQ.t; (** Work stealing queue *)
mutable cur_span: int64;
rng: Random.State.t; rng: Random.State.t;
} }
(** State for a given worker. Only this worker is (** State for a given worker. Only this worker is
@ -29,7 +35,8 @@ type state = {
id_: Id.t; id_: Id.t;
active: bool A.t; (** Becomes [false] when the pool is shutdown. *) active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
workers: worker_state array; (** Fixed set of workers. *) workers: worker_state array; (** Fixed set of workers. *)
main_q: task Queue.t; (** Main queue for tasks coming from the outside *) main_q: task_with_name Queue.t;
(** Main queue for tasks coming from the outside *)
mutable n_waiting: int; (* protected by mutex *) mutable n_waiting: int; (* protected by mutex *)
mutable n_waiting_nonzero: bool; (** [n_waiting > 0] *) mutable n_waiting_nonzero: bool; (** [n_waiting > 0] *)
mutex: Mutex.t; mutex: Mutex.t;
@ -65,9 +72,10 @@ let[@inline] try_wake_someone_ (self : state) : unit =
) )
(** Run [task] as is, on the pool. *) (** Run [task] as is, on the pool. *)
let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit let schedule_task_ (self : state) ~name (w : worker_state option) (f : task) :
= unit =
(* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let task = { f; name } in
match w with match w with
| Some w when Id.equal self.id_ w.pool_id_ -> | Some w when Id.equal self.id_ w.pool_id_ ->
(* we're on this same pool, schedule in the worker's state. Otherwise (* we're on this same pool, schedule in the worker's state. Otherwise
@ -96,24 +104,36 @@ let schedule_task_ (self : state) (w : worker_state option) (task : task) : unit
raise Shutdown raise Shutdown
(** Run this task, now. Must be called from a worker. *) (** Run this task, now. Must be called from a worker. *)
let run_task_now_ (self : state) ~runner task : unit = let run_task_now_ (self : state) ~runner (w : worker_state) ~name task : unit =
(* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let (AT_pair (before_task, after_task)) = self.around_task in let (AT_pair (before_task, after_task)) = self.around_task in
let _ctx = before_task runner in let _ctx = before_task runner in
w.cur_span <- Tracing_.enter_span name;
let[@inline] exit_span_ () =
Tracing_.exit_span w.cur_span;
w.cur_span <- Tracing_.dummy_span
in
let run_another_task ~name task' =
let w = find_current_worker_ () in
schedule_task_ self w ~name task'
in
(* run the task now, catching errors *) (* run the task now, catching errors *)
(try (try
(* run [task()] and handle [suspend] in it *) (* run [task()] and handle [suspend] in it *)
Suspend_.with_suspend task ~run:(fun task' -> Suspend_.with_suspend task ~name ~run:run_another_task
let w = find_current_worker_ () in ~on_suspend:exit_span_
schedule_task_ self w task')
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
self.on_exn e bt); self.on_exn e bt);
exit_span_ ();
after_task runner _ctx after_task runner _ctx
let[@inline] run_async_ (self : state) (task : task) : unit = let[@inline] run_async_ (self : state) ~name (f : task) : unit =
let w = find_current_worker_ () in let w = find_current_worker_ () in
schedule_task_ self w task schedule_task_ self w ~name f
(* TODO: function to schedule many tasks from the outside. (* TODO: function to schedule many tasks from the outside.
- build a queue - build a queue
@ -122,8 +142,6 @@ let[@inline] run_async_ (self : state) (task : task) : unit =
- wakeup all (broadcast) - wakeup all (broadcast)
- unlock *) - unlock *)
let run = run_async
(** Wait on condition. Precondition: we hold the mutex. *) (** Wait on condition. Precondition: we hold the mutex. *)
let[@inline] wait_ (self : state) : unit = let[@inline] wait_ (self : state) : unit =
self.n_waiting <- self.n_waiting + 1; self.n_waiting <- self.n_waiting + 1;
@ -132,10 +150,11 @@ let[@inline] wait_ (self : state) : unit =
self.n_waiting <- self.n_waiting - 1; self.n_waiting <- self.n_waiting - 1;
if self.n_waiting = 0 then self.n_waiting_nonzero <- false if self.n_waiting = 0 then self.n_waiting_nonzero <- false
exception Got_task of task exception Got_task of task_with_name
(** Try to steal a task *) (** Try to steal a task *)
let try_to_steal_work_once_ (self : state) (w : worker_state) : task option = let try_to_steal_work_once_ (self : state) (w : worker_state) :
task_with_name option =
let init = Random.State.int w.rng (Array.length self.workers) in let init = Random.State.int w.rng (Array.length self.workers) in
try try
@ -160,7 +179,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit =
match WSQ.pop w.q with match WSQ.pop w.q with
| Some task -> | Some task ->
try_wake_someone_ self; try_wake_someone_ self;
run_task_now_ self ~runner task run_task_now_ self ~runner w ~name:task.name task.f
| None -> continue := false | None -> continue := false
done done
@ -173,7 +192,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
worker_run_self_tasks_ self ~runner w; worker_run_self_tasks_ self ~runner w;
try_steal () try_steal ()
and run_task task : unit = and run_task task : unit =
run_task_now_ self ~runner task; run_task_now_ self ~runner w ~name:task.name task.f;
main () main ()
and try_steal () = and try_steal () =
match try_to_steal_work_once_ self w with match try_to_steal_work_once_ self w with
@ -231,7 +250,7 @@ type ('a, 'b) create_args =
'a 'a
(** Arguments used in {!create}. See {!create} for explanations. *) (** Arguments used in {!create}. See {!create} for explanations. *)
let dummy_task_ () = assert false let dummy_task_ = { f = ignore; name = "DUMMY_TASK" }
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
@ -256,6 +275,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ {
pool_id_; pool_id_;
thread = dummy; thread = dummy;
cur_span = Tracing_.dummy_span;
q = WSQ.create ~dummy:dummy_task_ (); q = WSQ.create ~dummy:dummy_task_ ();
rng = Random.State.make [| i |]; rng = Random.State.make [| i |];
}) })
@ -279,7 +299,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let runner = let runner =
Runner.For_runner_implementors.create Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait) ~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun f -> run_async_ pool f) ~run_async:(fun ~name f -> run_async_ pool ~name f)
~size:(fun () -> size_ pool) ~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool) ~num_tasks:(fun () -> num_tasks_ pool)
() ()

View file

@ -53,7 +53,3 @@ val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
Most parameters are the same as in {!create}. Most parameters are the same as in {!create}.
@since 0.3 *) @since 0.3 *)
val run : t -> (unit -> unit) -> unit
[@@deprecated "use run_async"]
(** deprecated alias to {!run_async} *)

View file

@ -33,10 +33,12 @@ let run ~min () =
let l1, l2 = let l1, l2 =
Fork_join.both Fork_join.both
(fun () -> (fun () ->
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fj.left" in
Fork_join.map_list ~chunk_size Fork_join.map_list ~chunk_size
(Fork_join.map_list ~chunk_size neg) (Fork_join.map_list ~chunk_size neg)
l) l)
(fun () -> (fun () ->
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fj.right" in
Fork_join.map_list ~chunk_size Fork_join.map_list ~chunk_size
(Fork_join.map_list ~chunk_size neg) (Fork_join.map_list ~chunk_size neg)
ref_l1) ref_l1)