mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
adapt some tests for the lwt runner
This commit is contained in:
parent
9e814ecb48
commit
2afb5c1036
13 changed files with 4362 additions and 342 deletions
|
|
@ -4,6 +4,7 @@
|
||||||
(>= %{ocaml_version} 5.0))
|
(>= %{ocaml_version} 5.0))
|
||||||
(package moonpool)
|
(package moonpool)
|
||||||
(libraries
|
(libraries
|
||||||
|
t_fibers
|
||||||
moonpool
|
moonpool
|
||||||
moonpool.fib
|
moonpool.fib
|
||||||
trace
|
trace
|
||||||
|
|
|
||||||
12
test/fiber/lib/dune
Normal file
12
test/fiber/lib/dune
Normal file
|
|
@ -0,0 +1,12 @@
|
||||||
|
(library
|
||||||
|
(name t_fibers)
|
||||||
|
(enabled_if
|
||||||
|
(>= %{ocaml_version} 5.0))
|
||||||
|
(package moonpool)
|
||||||
|
(libraries
|
||||||
|
moonpool
|
||||||
|
moonpool.fib
|
||||||
|
trace
|
||||||
|
qcheck-core))
|
||||||
|
|
||||||
|
|
||||||
178
test/fiber/lib/fib.ml
Normal file
178
test/fiber/lib/fib.ml
Normal file
|
|
@ -0,0 +1,178 @@
|
||||||
|
|
||||||
|
open! Moonpool
|
||||||
|
module A = Atomic
|
||||||
|
module F = Moonpool_fib.Fiber
|
||||||
|
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
module TS = struct
|
||||||
|
type t = int list
|
||||||
|
|
||||||
|
let show (s : t) = String.concat "." @@ List.map string_of_int s
|
||||||
|
let init = [ 0 ]
|
||||||
|
|
||||||
|
let next_ = function
|
||||||
|
| [] -> [ 0 ]
|
||||||
|
| n :: tl -> (n + 1) :: tl
|
||||||
|
|
||||||
|
let tick (t : t ref) = t := next_ !t
|
||||||
|
|
||||||
|
let tick_get t =
|
||||||
|
tick t;
|
||||||
|
!t
|
||||||
|
end
|
||||||
|
|
||||||
|
(* more deterministic logging of events *)
|
||||||
|
module Log_ = struct
|
||||||
|
let events : (TS.t * string) list A.t = A.make []
|
||||||
|
|
||||||
|
let add_event t msg : unit =
|
||||||
|
while
|
||||||
|
let old = A.get events in
|
||||||
|
not (A.compare_and_set events old ((t, msg) :: old))
|
||||||
|
do
|
||||||
|
()
|
||||||
|
done
|
||||||
|
|
||||||
|
let logf t fmt = Printf.ksprintf (add_event t) fmt
|
||||||
|
|
||||||
|
let print_and_clear () =
|
||||||
|
let l =
|
||||||
|
A.exchange events []
|
||||||
|
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|
||||||
|
|> List.sort Stdlib.compare
|
||||||
|
in
|
||||||
|
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
|
||||||
|
end
|
||||||
|
|
||||||
|
let logf = Log_.logf
|
||||||
|
|
||||||
|
let run1 ~runner () =
|
||||||
|
Printf.printf "============\nstart\n%!";
|
||||||
|
let clock = ref TS.init in
|
||||||
|
let fib =
|
||||||
|
F.spawn_top ~on:runner @@ fun () ->
|
||||||
|
let chan_progress = Chan.create ~max_size:4 () in
|
||||||
|
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
||||||
|
|
||||||
|
let subs =
|
||||||
|
List.init 5 (fun i ->
|
||||||
|
F.spawn ~protect:false @@ fun _n ->
|
||||||
|
Thread.delay (float i *. 0.01);
|
||||||
|
Chan.pop chans.(i);
|
||||||
|
Chan.push chan_progress i;
|
||||||
|
F.check_if_cancelled ();
|
||||||
|
i)
|
||||||
|
in
|
||||||
|
|
||||||
|
logf (TS.tick_get clock) "wait for subs";
|
||||||
|
|
||||||
|
F.spawn_ignore (fun () ->
|
||||||
|
for i = 0 to 4 do
|
||||||
|
Chan.push chans.(i) ();
|
||||||
|
let i' = Chan.pop chan_progress in
|
||||||
|
assert (i = i')
|
||||||
|
done);
|
||||||
|
|
||||||
|
(let clock0 = !clock in
|
||||||
|
List.iteri
|
||||||
|
(fun i f ->
|
||||||
|
let clock = ref (0 :: i :: clock0) in
|
||||||
|
logf !clock "await fiber %d" i;
|
||||||
|
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||||
|
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||||
|
let res = F.await f in
|
||||||
|
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
||||||
|
(Option.is_some @@ F.Private_.get_cur_opt ());
|
||||||
|
F.yield ();
|
||||||
|
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||||
|
subs);
|
||||||
|
|
||||||
|
logf (TS.tick_get clock) "main fiber done"
|
||||||
|
in
|
||||||
|
|
||||||
|
Fut.await @@ F.res fib;
|
||||||
|
logf (TS.tick_get clock) "main fiber exited";
|
||||||
|
Log_.print_and_clear ();
|
||||||
|
()
|
||||||
|
|
||||||
|
let run2 ~runner () =
|
||||||
|
(* same but now, cancel one of the sub-fibers *)
|
||||||
|
Printf.printf "============\nstart\n";
|
||||||
|
|
||||||
|
let clock = ref TS.init in
|
||||||
|
let fib =
|
||||||
|
F.spawn_top ~on:runner @@ fun () ->
|
||||||
|
let@ () =
|
||||||
|
F.with_on_self_cancel (fun ebt ->
|
||||||
|
logf (TS.tick_get clock) "main fiber cancelled with %s"
|
||||||
|
@@ Exn_bt.show ebt)
|
||||||
|
in
|
||||||
|
|
||||||
|
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
||||||
|
let chan_progress = Chan.create ~max_size:4 () in
|
||||||
|
|
||||||
|
logf (TS.tick_get clock) "start fibers";
|
||||||
|
let subs =
|
||||||
|
let clock0 = !clock in
|
||||||
|
List.init 10 (fun i ->
|
||||||
|
let clock = ref (0 :: i :: clock0) in
|
||||||
|
F.spawn ~protect:false @@ fun _n ->
|
||||||
|
let@ () =
|
||||||
|
F.with_on_self_cancel (fun _ ->
|
||||||
|
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
|
||||||
|
in
|
||||||
|
Thread.delay 0.002;
|
||||||
|
|
||||||
|
(* sync for determinism *)
|
||||||
|
Chan.pop chans_unblock.(i);
|
||||||
|
Chan.push chan_progress i;
|
||||||
|
|
||||||
|
if i = 7 then (
|
||||||
|
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
|
||||||
|
failwith "oh no!"
|
||||||
|
);
|
||||||
|
|
||||||
|
F.check_if_cancelled ();
|
||||||
|
i)
|
||||||
|
in
|
||||||
|
|
||||||
|
let post = TS.tick_get clock in
|
||||||
|
List.iteri
|
||||||
|
(fun i fib ->
|
||||||
|
F.on_result fib (function
|
||||||
|
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
||||||
|
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
||||||
|
subs;
|
||||||
|
|
||||||
|
(* sequentialize the fibers, for determinism *)
|
||||||
|
F.spawn_ignore (fun () ->
|
||||||
|
for j = 0 to 9 do
|
||||||
|
Chan.push chans_unblock.(j) ();
|
||||||
|
let j' = Chan.pop chan_progress in
|
||||||
|
assert (j = j')
|
||||||
|
done);
|
||||||
|
|
||||||
|
logf (TS.tick_get clock) "wait for subs";
|
||||||
|
List.iteri
|
||||||
|
(fun i f ->
|
||||||
|
logf (TS.tick_get clock) "await fiber %d" i;
|
||||||
|
let res = F.await f in
|
||||||
|
logf (TS.tick_get clock) "res %d = %d" i res)
|
||||||
|
subs;
|
||||||
|
logf (TS.tick_get clock) "yield";
|
||||||
|
F.yield ();
|
||||||
|
logf (TS.tick_get clock) "yielded";
|
||||||
|
logf (TS.tick_get clock) "main fiber done"
|
||||||
|
in
|
||||||
|
|
||||||
|
F.on_result fib (function
|
||||||
|
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
||||||
|
| Error ebt ->
|
||||||
|
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
||||||
|
|
||||||
|
(try Fut.await @@ F.res fib
|
||||||
|
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
||||||
|
logf (TS.tick_get clock) "main fiber exited";
|
||||||
|
Log_.print_and_clear ();
|
||||||
|
()
|
||||||
170
test/fiber/lib/fls.ml
Normal file
170
test/fiber/lib/fls.ml
Normal file
|
|
@ -0,0 +1,170 @@
|
||||||
|
|
||||||
|
open! Moonpool
|
||||||
|
module A = Atomic
|
||||||
|
module F = Moonpool_fib.Fiber
|
||||||
|
module FLS = Moonpool_fib.Fls
|
||||||
|
|
||||||
|
(* ### dummy little tracing system with local storage *)
|
||||||
|
|
||||||
|
type span_id = int
|
||||||
|
|
||||||
|
let k_parent : span_id Hmap.key = Hmap.Key.create ()
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
|
let spf = Printf.sprintf
|
||||||
|
|
||||||
|
module Span = struct
|
||||||
|
let new_id_ : unit -> span_id =
|
||||||
|
let n = A.make 0 in
|
||||||
|
fun () -> A.fetch_and_add n 1
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
id: span_id;
|
||||||
|
parent: span_id option;
|
||||||
|
msg: string;
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
module Tracer = struct
|
||||||
|
type t = { spans: Span.t list A.t }
|
||||||
|
|
||||||
|
let create () : t = { spans = A.make [] }
|
||||||
|
let get self = A.get self.spans
|
||||||
|
|
||||||
|
let add (self : t) span =
|
||||||
|
while
|
||||||
|
let old = A.get self.spans in
|
||||||
|
not (A.compare_and_set self.spans old (span :: old))
|
||||||
|
do
|
||||||
|
()
|
||||||
|
done
|
||||||
|
|
||||||
|
let with_span self name f =
|
||||||
|
let id = Span.new_id_ () in
|
||||||
|
let parent = FLS.get_in_local_hmap_opt k_parent in
|
||||||
|
let span = { Span.id; parent; msg = name } in
|
||||||
|
add self span;
|
||||||
|
FLS.with_in_local_hmap k_parent id f
|
||||||
|
end
|
||||||
|
|
||||||
|
module Render = struct
|
||||||
|
type span_tree = {
|
||||||
|
msg: string; (** message of the span at the root *)
|
||||||
|
children: span_tree list;
|
||||||
|
}
|
||||||
|
|
||||||
|
type t = { roots: span_tree list }
|
||||||
|
|
||||||
|
let build (tracer : Tracer.t) : t =
|
||||||
|
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
|
||||||
|
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
|
||||||
|
|
||||||
|
(* everyone is a root at first *)
|
||||||
|
let all_spans = Tracer.get tracer in
|
||||||
|
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
|
||||||
|
|
||||||
|
(* now consider the parenting relationships *)
|
||||||
|
let add_span_to_parent (span : Span.t) =
|
||||||
|
match span.parent with
|
||||||
|
| None -> ()
|
||||||
|
| Some p ->
|
||||||
|
Hashtbl.remove tops span.id;
|
||||||
|
let l = try Hashtbl.find children p with Not_found -> [] in
|
||||||
|
Hashtbl.replace children p (span :: l)
|
||||||
|
in
|
||||||
|
List.iter add_span_to_parent all_spans;
|
||||||
|
|
||||||
|
(* build the tree *)
|
||||||
|
let rec build_tree (sp : Span.t) : span_tree =
|
||||||
|
let children = try Hashtbl.find children sp.id with Not_found -> [] in
|
||||||
|
let children = List.map build_tree children |> List.sort Stdlib.compare in
|
||||||
|
{ msg = sp.msg; children }
|
||||||
|
in
|
||||||
|
|
||||||
|
let roots =
|
||||||
|
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|
||||||
|
|> List.sort Stdlib.compare
|
||||||
|
in
|
||||||
|
|
||||||
|
{ roots }
|
||||||
|
|
||||||
|
let pp (oc : out_channel) (self : t) : unit =
|
||||||
|
let rec pp_tree indent out (t : span_tree) =
|
||||||
|
let prefix = String.make indent ' ' in
|
||||||
|
Printf.fprintf out "%s%S\n" prefix t.msg;
|
||||||
|
List.iter (pp_tree (indent + 2) out) t.children
|
||||||
|
in
|
||||||
|
List.iter (pp_tree 2 oc) self.roots
|
||||||
|
end
|
||||||
|
|
||||||
|
let run ~pool ~pool_name () =
|
||||||
|
let tracer = Tracer.create () in
|
||||||
|
|
||||||
|
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
|
||||||
|
let@ () =
|
||||||
|
Tracer.with_span tracer
|
||||||
|
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
|
||||||
|
in
|
||||||
|
|
||||||
|
for j = 1 to 5 do
|
||||||
|
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
|
||||||
|
F.yield ()
|
||||||
|
done
|
||||||
|
in
|
||||||
|
|
||||||
|
let sub_child ~idx ~idx_child ~idx_sub () =
|
||||||
|
let@ () =
|
||||||
|
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
|
||||||
|
in
|
||||||
|
|
||||||
|
for i = 1 to 10 do
|
||||||
|
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
|
||||||
|
F.yield ()
|
||||||
|
done;
|
||||||
|
|
||||||
|
let subs =
|
||||||
|
List.init 2 (fun idx_sub_sub ->
|
||||||
|
F.spawn ~protect:true (fun () ->
|
||||||
|
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
||||||
|
in
|
||||||
|
List.iter F.await subs
|
||||||
|
in
|
||||||
|
|
||||||
|
let top_child ~idx ~idx_child () =
|
||||||
|
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
|
||||||
|
|
||||||
|
let subs =
|
||||||
|
List.init 2 (fun k ->
|
||||||
|
F.spawn ~protect:true @@ fun () ->
|
||||||
|
sub_child ~idx ~idx_child ~idx_sub:k ())
|
||||||
|
in
|
||||||
|
|
||||||
|
let@ () =
|
||||||
|
Tracer.with_span tracer
|
||||||
|
(spf "child.%d.%d.99.await_children" idx idx_child)
|
||||||
|
in
|
||||||
|
List.iter F.await subs
|
||||||
|
in
|
||||||
|
|
||||||
|
let top idx =
|
||||||
|
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
|
||||||
|
|
||||||
|
let subs =
|
||||||
|
List.init 5 (fun j ->
|
||||||
|
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
|
||||||
|
in
|
||||||
|
|
||||||
|
List.iter F.await subs
|
||||||
|
in
|
||||||
|
|
||||||
|
Printf.printf "run test on pool = %s\n" pool_name;
|
||||||
|
let fibs =
|
||||||
|
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
|
||||||
|
in
|
||||||
|
List.iter F.wait_block_exn fibs;
|
||||||
|
|
||||||
|
Printf.printf "tracing complete\n";
|
||||||
|
Printf.printf "spans:\n";
|
||||||
|
let tree = Render.build tracer in
|
||||||
|
Render.pp stdout tree;
|
||||||
|
Printf.printf "done\n%!";
|
||||||
|
()
|
||||||
|
|
@ -1,179 +1,9 @@
|
||||||
open! Moonpool
|
open! Moonpool
|
||||||
module A = Atomic
|
|
||||||
module F = Moonpool_fib.Fiber
|
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
let runner = Fifo_pool.create ~num_threads:1 ()
|
let runner = Fifo_pool.create ~num_threads:1 ()
|
||||||
|
|
||||||
module TS = struct
|
|
||||||
type t = int list
|
|
||||||
|
|
||||||
let show (s : t) = String.concat "." @@ List.map string_of_int s
|
|
||||||
let init = [ 0 ]
|
|
||||||
|
|
||||||
let next_ = function
|
|
||||||
| [] -> [ 0 ]
|
|
||||||
| n :: tl -> (n + 1) :: tl
|
|
||||||
|
|
||||||
let tick (t : t ref) = t := next_ !t
|
|
||||||
|
|
||||||
let tick_get t =
|
|
||||||
tick t;
|
|
||||||
!t
|
|
||||||
end
|
|
||||||
|
|
||||||
(* more deterministic logging of events *)
|
|
||||||
module Log_ = struct
|
|
||||||
let events : (TS.t * string) list A.t = A.make []
|
|
||||||
|
|
||||||
let add_event t msg : unit =
|
|
||||||
while
|
|
||||||
let old = A.get events in
|
|
||||||
not (A.compare_and_set events old ((t, msg) :: old))
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let logf t fmt = Printf.ksprintf (add_event t) fmt
|
|
||||||
|
|
||||||
let print_and_clear () =
|
|
||||||
let l =
|
|
||||||
A.exchange events []
|
|
||||||
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|
|
||||||
|> List.sort Stdlib.compare
|
|
||||||
in
|
|
||||||
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
|
|
||||||
end
|
|
||||||
|
|
||||||
let logf = Log_.logf
|
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
Printf.printf "============\nstart\n";
|
T_fibers.Fib.run1 ~runner ();
|
||||||
let clock = ref TS.init in
|
T_fibers.Fib.run2 ~runner ();
|
||||||
let fib =
|
|
||||||
F.spawn_top ~on:runner @@ fun () ->
|
|
||||||
let chan_progress = Chan.create ~max_size:4 () in
|
|
||||||
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
|
|
||||||
|
|
||||||
let subs =
|
|
||||||
List.init 5 (fun i ->
|
|
||||||
F.spawn ~protect:false @@ fun _n ->
|
|
||||||
Thread.delay (float i *. 0.01);
|
|
||||||
Chan.pop chans.(i);
|
|
||||||
Chan.push chan_progress i;
|
|
||||||
F.check_if_cancelled ();
|
|
||||||
i)
|
|
||||||
in
|
|
||||||
|
|
||||||
logf (TS.tick_get clock) "wait for subs";
|
|
||||||
|
|
||||||
F.spawn_ignore (fun () ->
|
|
||||||
for i = 0 to 4 do
|
|
||||||
Chan.push chans.(i) ();
|
|
||||||
let i' = Chan.pop chan_progress in
|
|
||||||
assert (i = i')
|
|
||||||
done);
|
|
||||||
|
|
||||||
(let clock0 = !clock in
|
|
||||||
List.iteri
|
|
||||||
(fun i f ->
|
|
||||||
let clock = ref (0 :: i :: clock0) in
|
|
||||||
logf !clock "await fiber %d" i;
|
|
||||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
|
||||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
|
||||||
let res = F.await f in
|
|
||||||
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
|
|
||||||
(Option.is_some @@ F.Private_.get_cur_opt ());
|
|
||||||
F.yield ();
|
|
||||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
|
||||||
subs);
|
|
||||||
|
|
||||||
logf (TS.tick_get clock) "main fiber done"
|
|
||||||
in
|
|
||||||
|
|
||||||
Fut.wait_block_exn @@ F.res fib;
|
|
||||||
logf (TS.tick_get clock) "main fiber exited";
|
|
||||||
Log_.print_and_clear ();
|
|
||||||
()
|
|
||||||
|
|
||||||
let () =
|
|
||||||
let@ _r = Moonpool_fib.main in
|
|
||||||
(* same but now, cancel one of the sub-fibers *)
|
|
||||||
Printf.printf "============\nstart\n";
|
|
||||||
|
|
||||||
let clock = ref TS.init in
|
|
||||||
let fib =
|
|
||||||
F.spawn_top ~on:runner @@ fun () ->
|
|
||||||
let@ () =
|
|
||||||
F.with_on_self_cancel (fun ebt ->
|
|
||||||
logf (TS.tick_get clock) "main fiber cancelled with %s"
|
|
||||||
@@ Exn_bt.show ebt)
|
|
||||||
in
|
|
||||||
|
|
||||||
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
|
|
||||||
let chan_progress = Chan.create ~max_size:4 () in
|
|
||||||
|
|
||||||
logf (TS.tick_get clock) "start fibers";
|
|
||||||
let subs =
|
|
||||||
let clock0 = !clock in
|
|
||||||
List.init 10 (fun i ->
|
|
||||||
let clock = ref (0 :: i :: clock0) in
|
|
||||||
F.spawn ~protect:false @@ fun _n ->
|
|
||||||
let@ () =
|
|
||||||
F.with_on_self_cancel (fun _ ->
|
|
||||||
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
|
|
||||||
in
|
|
||||||
Thread.delay 0.002;
|
|
||||||
|
|
||||||
(* sync for determinism *)
|
|
||||||
Chan.pop chans_unblock.(i);
|
|
||||||
Chan.push chan_progress i;
|
|
||||||
|
|
||||||
if i = 7 then (
|
|
||||||
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
|
|
||||||
failwith "oh no!"
|
|
||||||
);
|
|
||||||
|
|
||||||
F.check_if_cancelled ();
|
|
||||||
i)
|
|
||||||
in
|
|
||||||
|
|
||||||
let post = TS.tick_get clock in
|
|
||||||
List.iteri
|
|
||||||
(fun i fib ->
|
|
||||||
F.on_result fib (function
|
|
||||||
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
|
|
||||||
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
|
|
||||||
subs;
|
|
||||||
|
|
||||||
(* sequentialize the fibers, for determinism *)
|
|
||||||
F.spawn_ignore (fun () ->
|
|
||||||
for j = 0 to 9 do
|
|
||||||
Chan.push chans_unblock.(j) ();
|
|
||||||
let j' = Chan.pop chan_progress in
|
|
||||||
assert (j = j')
|
|
||||||
done);
|
|
||||||
|
|
||||||
logf (TS.tick_get clock) "wait for subs";
|
|
||||||
List.iteri
|
|
||||||
(fun i f ->
|
|
||||||
logf (TS.tick_get clock) "await fiber %d" i;
|
|
||||||
let res = F.await f in
|
|
||||||
logf (TS.tick_get clock) "res %d = %d" i res)
|
|
||||||
subs;
|
|
||||||
logf (TS.tick_get clock) "yield";
|
|
||||||
F.yield ();
|
|
||||||
logf (TS.tick_get clock) "yielded";
|
|
||||||
logf (TS.tick_get clock) "main fiber done"
|
|
||||||
in
|
|
||||||
|
|
||||||
F.on_result fib (function
|
|
||||||
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
|
|
||||||
| Error ebt ->
|
|
||||||
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
|
|
||||||
|
|
||||||
(try Fut.wait_block_exn @@ F.res fib
|
|
||||||
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
|
|
||||||
logf (TS.tick_get clock) "main fiber exited";
|
|
||||||
Log_.print_and_clear ();
|
|
||||||
()
|
|
||||||
|
|
|
||||||
|
|
@ -1930,7 +1930,7 @@ spans:
|
||||||
"iter.loop 09"
|
"iter.loop 09"
|
||||||
"iter.loop 10"
|
"iter.loop 10"
|
||||||
done
|
done
|
||||||
run test on pool = ws_pool
|
run test on pool = fifo_pool
|
||||||
tracing complete
|
tracing complete
|
||||||
spans:
|
spans:
|
||||||
"top_0"
|
"top_0"
|
||||||
|
|
|
||||||
|
|
@ -1,177 +1,11 @@
|
||||||
open! Moonpool
|
open! Moonpool
|
||||||
module A = Atomic
|
|
||||||
module F = Moonpool_fib.Fiber
|
|
||||||
module FLS = Moonpool_fib.Fls
|
|
||||||
|
|
||||||
(* ### dummy little tracing system with local storage *)
|
let (let@) = (@@)
|
||||||
|
|
||||||
type span_id = int
|
|
||||||
|
|
||||||
let k_parent : span_id Hmap.key = Hmap.Key.create ()
|
|
||||||
let ( let@ ) = ( @@ )
|
|
||||||
let spf = Printf.sprintf
|
|
||||||
|
|
||||||
module Span = struct
|
|
||||||
let new_id_ : unit -> span_id =
|
|
||||||
let n = A.make 0 in
|
|
||||||
fun () -> A.fetch_and_add n 1
|
|
||||||
|
|
||||||
type t = {
|
|
||||||
id: span_id;
|
|
||||||
parent: span_id option;
|
|
||||||
msg: string;
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
module Tracer = struct
|
|
||||||
type t = { spans: Span.t list A.t }
|
|
||||||
|
|
||||||
let create () : t = { spans = A.make [] }
|
|
||||||
let get self = A.get self.spans
|
|
||||||
|
|
||||||
let add (self : t) span =
|
|
||||||
while
|
|
||||||
let old = A.get self.spans in
|
|
||||||
not (A.compare_and_set self.spans old (span :: old))
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let with_span self name f =
|
|
||||||
let id = Span.new_id_ () in
|
|
||||||
let parent = FLS.get_in_local_hmap_opt k_parent in
|
|
||||||
let span = { Span.id; parent; msg = name } in
|
|
||||||
add self span;
|
|
||||||
FLS.with_in_local_hmap k_parent id f
|
|
||||||
end
|
|
||||||
|
|
||||||
module Render = struct
|
|
||||||
type span_tree = {
|
|
||||||
msg: string; (** message of the span at the root *)
|
|
||||||
children: span_tree list;
|
|
||||||
}
|
|
||||||
|
|
||||||
type t = { roots: span_tree list }
|
|
||||||
|
|
||||||
let build (tracer : Tracer.t) : t =
|
|
||||||
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
|
|
||||||
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
|
|
||||||
|
|
||||||
(* everyone is a root at first *)
|
|
||||||
let all_spans = Tracer.get tracer in
|
|
||||||
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
|
|
||||||
|
|
||||||
(* now consider the parenting relationships *)
|
|
||||||
let add_span_to_parent (span : Span.t) =
|
|
||||||
match span.parent with
|
|
||||||
| None -> ()
|
|
||||||
| Some p ->
|
|
||||||
Hashtbl.remove tops span.id;
|
|
||||||
let l = try Hashtbl.find children p with Not_found -> [] in
|
|
||||||
Hashtbl.replace children p (span :: l)
|
|
||||||
in
|
|
||||||
List.iter add_span_to_parent all_spans;
|
|
||||||
|
|
||||||
(* build the tree *)
|
|
||||||
let rec build_tree (sp : Span.t) : span_tree =
|
|
||||||
let children = try Hashtbl.find children sp.id with Not_found -> [] in
|
|
||||||
let children = List.map build_tree children |> List.sort Stdlib.compare in
|
|
||||||
{ msg = sp.msg; children }
|
|
||||||
in
|
|
||||||
|
|
||||||
let roots =
|
|
||||||
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|
|
||||||
|> List.sort Stdlib.compare
|
|
||||||
in
|
|
||||||
|
|
||||||
{ roots }
|
|
||||||
|
|
||||||
let pp (oc : out_channel) (self : t) : unit =
|
|
||||||
let rec pp_tree indent out (t : span_tree) =
|
|
||||||
let prefix = String.make indent ' ' in
|
|
||||||
Printf.fprintf out "%s%S\n" prefix t.msg;
|
|
||||||
List.iter (pp_tree (indent + 2) out) t.children
|
|
||||||
in
|
|
||||||
List.iter (pp_tree 2 oc) self.roots
|
|
||||||
end
|
|
||||||
|
|
||||||
let run ~pool ~pool_name () =
|
|
||||||
let tracer = Tracer.create () in
|
|
||||||
|
|
||||||
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
|
|
||||||
let@ () =
|
|
||||||
Tracer.with_span tracer
|
|
||||||
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
|
|
||||||
in
|
|
||||||
|
|
||||||
for j = 1 to 5 do
|
|
||||||
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
|
|
||||||
F.yield ()
|
|
||||||
done
|
|
||||||
in
|
|
||||||
|
|
||||||
let sub_child ~idx ~idx_child ~idx_sub () =
|
|
||||||
let@ () =
|
|
||||||
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
|
|
||||||
in
|
|
||||||
|
|
||||||
for i = 1 to 10 do
|
|
||||||
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
|
|
||||||
F.yield ()
|
|
||||||
done;
|
|
||||||
|
|
||||||
let subs =
|
|
||||||
List.init 2 (fun idx_sub_sub ->
|
|
||||||
F.spawn ~protect:true (fun () ->
|
|
||||||
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
|
|
||||||
in
|
|
||||||
List.iter F.await subs
|
|
||||||
in
|
|
||||||
|
|
||||||
let top_child ~idx ~idx_child () =
|
|
||||||
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
|
|
||||||
|
|
||||||
let subs =
|
|
||||||
List.init 2 (fun k ->
|
|
||||||
F.spawn ~protect:true @@ fun () ->
|
|
||||||
sub_child ~idx ~idx_child ~idx_sub:k ())
|
|
||||||
in
|
|
||||||
|
|
||||||
let@ () =
|
|
||||||
Tracer.with_span tracer
|
|
||||||
(spf "child.%d.%d.99.await_children" idx idx_child)
|
|
||||||
in
|
|
||||||
List.iter F.await subs
|
|
||||||
in
|
|
||||||
|
|
||||||
let top idx =
|
|
||||||
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
|
|
||||||
|
|
||||||
let subs =
|
|
||||||
List.init 5 (fun j ->
|
|
||||||
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
|
|
||||||
in
|
|
||||||
|
|
||||||
List.iter F.await subs
|
|
||||||
in
|
|
||||||
|
|
||||||
Printf.printf "run test on pool = %s\n" pool_name;
|
|
||||||
let fibs =
|
|
||||||
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
|
|
||||||
in
|
|
||||||
List.iter F.wait_block_exn fibs;
|
|
||||||
|
|
||||||
Printf.printf "tracing complete\n";
|
|
||||||
Printf.printf "spans:\n";
|
|
||||||
let tree = Render.build tracer in
|
|
||||||
Render.pp stdout tree;
|
|
||||||
Printf.printf "done\n%!";
|
|
||||||
()
|
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
(let@ pool = Ws_pool.with_ () in
|
(let@ pool = Ws_pool.with_ () in
|
||||||
run ~pool ~pool_name:"ws_pool" ());
|
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
||||||
|
|
||||||
(let@ pool = Fifo_pool.with_ () in
|
(let@ pool = Fifo_pool.with_ () in
|
||||||
run ~pool ~pool_name:"ws_pool" ());
|
T_fibers.Fls.run ~pool ~pool_name:"fifo_pool" ());
|
||||||
()
|
()
|
||||||
|
|
|
||||||
16
test/lwt/fibers/dune
Normal file
16
test/lwt/fibers/dune
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
(tests
|
||||||
|
(names t_fls t_main t_fib1)
|
||||||
|
(enabled_if
|
||||||
|
(>= %{ocaml_version} 5.0))
|
||||||
|
(package moonpool-lwt)
|
||||||
|
(libraries
|
||||||
|
t_fibers
|
||||||
|
moonpool
|
||||||
|
moonpool.fib
|
||||||
|
moonpool-lwt
|
||||||
|
trace
|
||||||
|
trace-tef
|
||||||
|
qcheck-core
|
||||||
|
qcheck-core.runner
|
||||||
|
;tracy-client.trace
|
||||||
|
))
|
||||||
61
test/lwt/fibers/t_fib1.expected
Normal file
61
test/lwt/fibers/t_fib1.expected
Normal file
|
|
@ -0,0 +1,61 @@
|
||||||
|
============
|
||||||
|
start
|
||||||
|
1: wait for subs
|
||||||
|
1.0.0: await fiber 0
|
||||||
|
1.0.1: cur fiber[0] is some: true
|
||||||
|
1.0.2: cur fiber[0] is some: true
|
||||||
|
1.0.3: res 0 = 0
|
||||||
|
1.1.0: await fiber 1
|
||||||
|
1.1.1: cur fiber[1] is some: true
|
||||||
|
1.1.2: cur fiber[1] is some: true
|
||||||
|
1.1.3: res 1 = 1
|
||||||
|
1.2.0: await fiber 2
|
||||||
|
1.2.1: cur fiber[2] is some: true
|
||||||
|
1.2.2: cur fiber[2] is some: true
|
||||||
|
1.2.3: res 2 = 2
|
||||||
|
1.3.0: await fiber 3
|
||||||
|
1.3.1: cur fiber[3] is some: true
|
||||||
|
1.3.2: cur fiber[3] is some: true
|
||||||
|
1.3.3: res 3 = 3
|
||||||
|
1.4.0: await fiber 4
|
||||||
|
1.4.1: cur fiber[4] is some: true
|
||||||
|
1.4.2: cur fiber[4] is some: true
|
||||||
|
1.4.3: res 4 = 4
|
||||||
|
2: main fiber done
|
||||||
|
3: main fiber exited
|
||||||
|
============
|
||||||
|
start
|
||||||
|
1: start fibers
|
||||||
|
1.7.1: I'm fiber 7 and I'm about to fail…
|
||||||
|
1.8.1: sub-fiber 8 was cancelled
|
||||||
|
1.9.1: sub-fiber 9 was cancelled
|
||||||
|
2.0: fiber 0 resolved as ok
|
||||||
|
2.1: fiber 1 resolved as ok
|
||||||
|
2.2: fiber 2 resolved as ok
|
||||||
|
2.3: fiber 3 resolved as ok
|
||||||
|
2.4: fiber 4 resolved as ok
|
||||||
|
2.5: fiber 5 resolved as ok
|
||||||
|
2.6: fiber 6 resolved as ok
|
||||||
|
2.7: fiber 7 resolved as error
|
||||||
|
2.8: fiber 8 resolved as error
|
||||||
|
2.9: fiber 9 resolved as error
|
||||||
|
3: wait for subs
|
||||||
|
4: await fiber 0
|
||||||
|
5: res 0 = 0
|
||||||
|
6: await fiber 1
|
||||||
|
7: res 1 = 1
|
||||||
|
8: await fiber 2
|
||||||
|
9: res 2 = 2
|
||||||
|
10: await fiber 3
|
||||||
|
11: res 3 = 3
|
||||||
|
12: await fiber 4
|
||||||
|
13: res 4 = 4
|
||||||
|
14: await fiber 5
|
||||||
|
15: res 5 = 5
|
||||||
|
16: await fiber 6
|
||||||
|
17: res 6 = 6
|
||||||
|
18: await fiber 7
|
||||||
|
19: main fiber cancelled with Failure("oh no!")
|
||||||
|
20: main fiber result: error Failure("oh no!")
|
||||||
|
21: main fib failed with "oh no!"
|
||||||
|
22: main fiber exited
|
||||||
10
test/lwt/fibers/t_fib1.ml
Normal file
10
test/lwt/fibers/t_fib1.ml
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
open! Moonpool
|
||||||
|
module M_lwt = Moonpool_lwt
|
||||||
|
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
let () =
|
||||||
|
let@ runner = M_lwt.lwt_main in
|
||||||
|
T_fibers.Fib.run1 ~runner ();
|
||||||
|
T_fibers.Fib.run2 ~runner ();
|
||||||
|
|
||||||
3864
test/lwt/fibers/t_fls.expected
Normal file
3864
test/lwt/fibers/t_fls.expected
Normal file
File diff suppressed because it is too large
Load diff
11
test/lwt/fibers/t_fls.ml
Normal file
11
test/lwt/fibers/t_fls.ml
Normal file
|
|
@ -0,0 +1,11 @@
|
||||||
|
open! Moonpool
|
||||||
|
|
||||||
|
let (let@) = (@@)
|
||||||
|
|
||||||
|
let () =
|
||||||
|
(let@ pool = Ws_pool.with_ () in
|
||||||
|
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
|
||||||
|
|
||||||
|
(let@ pool = Fifo_pool.with_ () in
|
||||||
|
T_fibers.Fls.run ~pool ~pool_name:"fifo_pool" ());
|
||||||
|
()
|
||||||
33
test/lwt/fibers/t_main.ml
Normal file
33
test/lwt/fibers/t_main.ml
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
open Moonpool
|
||||||
|
module M_lwt = Moonpool_lwt
|
||||||
|
module F = Moonpool_fib
|
||||||
|
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
let () =
|
||||||
|
(* run fibers in the background, await them in the main thread *)
|
||||||
|
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
|
||||||
|
let r =
|
||||||
|
M_lwt.lwt_main @@ fun runner ->
|
||||||
|
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
|
||||||
|
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
|
||||||
|
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
|
||||||
|
let r = F.await f2 + F.await f3 in
|
||||||
|
assert (r = 13);
|
||||||
|
r
|
||||||
|
in
|
||||||
|
assert (r = 13)
|
||||||
|
|
||||||
|
let () =
|
||||||
|
Printf.eprintf "PART 2\n%!";
|
||||||
|
try
|
||||||
|
let _r =
|
||||||
|
M_lwt.lwt_main @@ fun runner ->
|
||||||
|
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
|
||||||
|
F.await fib
|
||||||
|
in
|
||||||
|
|
||||||
|
assert false
|
||||||
|
with Failure msg ->
|
||||||
|
(* Printf.eprintf "got %S\n%!" msg; *)
|
||||||
|
assert (msg = "oops")
|
||||||
Loading…
Add table
Reference in a new issue