Compare commits

..

1 commit

Author SHA1 Message Date
Simon Cruanes
a5740468f5
Merge fc5fd996fc into 867cbd2318 2025-09-04 20:03:38 +00:00
17 changed files with 354 additions and 2443 deletions

View file

@ -5,7 +5,6 @@
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -21,7 +21,6 @@ module Scheduler_state = struct
mutex: Mutex.t;
mutable thread: int;
closed: bool Atomic.t;
cleanup_done: bool Atomic.t;
mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
@ -40,7 +39,6 @@ module Scheduler_state = struct
mutex = Mutex.create ();
thread = Thread.id (Thread.self ());
closed = Atomic.make false;
cleanup_done = Atomic.make false;
as_runner = Moonpool.Runner.dummy;
enter_hook = None;
leave_hook = None;
@ -48,15 +46,12 @@ module Scheduler_state = struct
has_notified = Atomic.make false;
}
let[@inline] notify_ (self : st) : unit =
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification
let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads;
Mutex.unlock self.mutex;
notify_ self
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification;
Mutex.unlock self.mutex
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
@ -76,15 +71,11 @@ module Scheduler_state = struct
one!)";
if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread";
Atomic.set st.closed true;
if not (Atomic.exchange st.cleanup_done true) then (
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Lwt_unix.stop_notification st.notification
);
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Atomic.set cur_st None
| None -> failwith "moonpool-lwt: cleanup failed (no current active state)"
| _ -> ()
end
module Ops = struct
@ -299,7 +290,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
M.Runner.run_async st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x
@ -312,8 +303,6 @@ let lwt_main (f : _ -> 'a) : 'a =
let finally () = Scheduler_state.cleanup st in
Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in
(* make sure the scheduler isn't already sleeping *)
Scheduler_state.notify_ st;
Lwt_main.run fut
let[@inline] lwt_main_runner () =

View file

@ -39,8 +39,7 @@ val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
(** [lwt_main f] sets the moonpool-lwt bridge up, runs lwt main, calls [f],
destroys the bridge, and return the result of [f()]. *)
(** Setup, run lwt main, return the result *)
val lwt_main_runner : unit -> Moonpool.Runner.t
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
@ -48,4 +47,3 @@ val lwt_main_runner : unit -> Moonpool.Runner.t
@raise Failure if {!lwt_main} was not called. *)
val is_setup : unit -> bool
(** Is the moonpool-lwt bridge setup? *)

View file

@ -4,7 +4,6 @@
(>= %{ocaml_version} 5.0))
(package moonpool)
(libraries
t_fibers
moonpool
moonpool.fib
trace

View file

@ -1,6 +0,0 @@
(library
(name t_fibers)
(enabled_if
(>= %{ocaml_version} 5.0))
(package moonpool)
(libraries moonpool moonpool.fib trace qcheck-core))

View file

@ -1,177 +0,0 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
let ( let@ ) = ( @@ )
module TS = struct
type t = int list
let show (s : t) = String.concat "." @@ List.map string_of_int s
let init = [ 0 ]
let next_ = function
| [] -> [ 0 ]
| n :: tl -> (n + 1) :: tl
let tick (t : t ref) = t := next_ !t
let tick_get t =
tick t;
!t
end
(* more deterministic logging of events *)
module Log_ = struct
let events : (TS.t * string) list A.t = A.make []
let add_event t msg : unit =
while
let old = A.get events in
not (A.compare_and_set events old ((t, msg) :: old))
do
()
done
let logf t fmt = Printf.ksprintf (add_event t) fmt
let print_and_clear () =
let l =
A.exchange events []
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|> List.sort Stdlib.compare
in
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
end
let logf = Log_.logf
let run1 ~runner () =
Printf.printf "============\nstart\n%!";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
assert (i = i')
done);
(let clock0 = !clock in
List.iteri
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.await @@ F.res fib;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
let run2 ~runner () =
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
logf (TS.tick_get clock) "start fibers";
let subs =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Thread.delay 0.002;
(* sync for determinism *)
Chan.pop chans_unblock.(i);
Chan.push chan_progress i;
if i = 7 then (
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.await @@ F.res fib
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()

View file

@ -1,169 +0,0 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(* ### dummy little tracing system with local storage *)
type span_id = int
let k_parent : span_id Hmap.key = Hmap.Key.create ()
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
module Span = struct
let new_id_ : unit -> span_id =
let n = A.make 0 in
fun () -> A.fetch_and_add n 1
type t = {
id: span_id;
parent: span_id option;
msg: string;
}
end
module Tracer = struct
type t = { spans: Span.t list A.t }
let create () : t = { spans = A.make [] }
let get self = A.get self.spans
let add (self : t) span =
while
let old = A.get self.spans in
not (A.compare_and_set self.spans old (span :: old))
do
()
done
let with_span self name f =
let id = Span.new_id_ () in
let parent = FLS.get_in_local_hmap_opt k_parent in
let span = { Span.id; parent; msg = name } in
add self span;
FLS.with_in_local_hmap k_parent id f
end
module Render = struct
type span_tree = {
msg: string; (** message of the span at the root *)
children: span_tree list;
}
type t = { roots: span_tree list }
let build (tracer : Tracer.t) : t =
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
(* everyone is a root at first *)
let all_spans = Tracer.get tracer in
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
(* now consider the parenting relationships *)
let add_span_to_parent (span : Span.t) =
match span.parent with
| None -> ()
| Some p ->
Hashtbl.remove tops span.id;
let l = try Hashtbl.find children p with Not_found -> [] in
Hashtbl.replace children p (span :: l)
in
List.iter add_span_to_parent all_spans;
(* build the tree *)
let rec build_tree (sp : Span.t) : span_tree =
let children = try Hashtbl.find children sp.id with Not_found -> [] in
let children = List.map build_tree children |> List.sort Stdlib.compare in
{ msg = sp.msg; children }
in
let roots =
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|> List.sort Stdlib.compare
in
{ roots }
let pp (oc : out_channel) (self : t) : unit =
let rec pp_tree indent out (t : span_tree) =
let prefix = String.make indent ' ' in
Printf.fprintf out "%s%S\n" prefix t.msg;
List.iter (pp_tree (indent + 2) out) t.children
in
List.iter (pp_tree 2 oc) self.roots
end
let run ~pool ~pool_name () =
let tracer = Tracer.create () in
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
let@ () =
Tracer.with_span tracer
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
in
for j = 1 to 5 do
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
F.yield ()
done
in
let sub_child ~idx ~idx_child ~idx_sub () =
let@ () =
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
in
for i = 1 to 10 do
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
F.yield ()
done;
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
in
let top_child ~idx ~idx_child () =
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
Tracer.with_span tracer
(spf "child.%d.%d.99.await_children" idx idx_child)
in
List.iter F.await subs
in
let top idx =
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
List.iter F.await fibs;
Printf.printf "tracing complete\n";
Printf.printf "spans:\n";
let tree = Render.build tracer in
Render.pp stdout tree;
Printf.printf "done\n%!";
()

View file

@ -1,6 +1,179 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
let ( let@ ) = ( @@ )
let runner = Fifo_pool.create ~num_threads:1 ()
module TS = struct
type t = int list
let show (s : t) = String.concat "." @@ List.map string_of_int s
let init = [ 0 ]
let next_ = function
| [] -> [ 0 ]
| n :: tl -> (n + 1) :: tl
let tick (t : t ref) = t := next_ !t
let tick_get t =
tick t;
!t
end
(* more deterministic logging of events *)
module Log_ = struct
let events : (TS.t * string) list A.t = A.make []
let add_event t msg : unit =
while
let old = A.get events in
not (A.compare_and_set events old ((t, msg) :: old))
do
()
done
let logf t fmt = Printf.ksprintf (add_event t) fmt
let print_and_clear () =
let l =
A.exchange events []
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|> List.sort Stdlib.compare
in
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
end
let logf = Log_.logf
let () =
let@ runner = Moonpool_fib.main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
assert (i = i')
done);
(let clock0 = !clock in
List.iteri
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.wait_block_exn @@ F.res fib;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
let () =
let@ _r = Moonpool_fib.main in
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
logf (TS.tick_get clock) "start fibers";
let subs =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Thread.delay 0.002;
(* sync for determinism *)
Chan.pop chans_unblock.(i);
Chan.push chan_progress i;
if i = 7 then (
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.wait_block_exn @@ F.res fib
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()

View file

@ -1930,7 +1930,7 @@ spans:
"iter.loop 09"
"iter.loop 10"
done
run test on pool = fifo_pool
run test on pool = ws_pool
tracing complete
spans:
"top_0"

View file

@ -1,12 +1,177 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(* ### dummy little tracing system with local storage *)
type span_id = int
let k_parent : span_id Hmap.key = Hmap.Key.create ()
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
module Span = struct
let new_id_ : unit -> span_id =
let n = A.make 0 in
fun () -> A.fetch_and_add n 1
type t = {
id: span_id;
parent: span_id option;
msg: string;
}
end
module Tracer = struct
type t = { spans: Span.t list A.t }
let create () : t = { spans = A.make [] }
let get self = A.get self.spans
let add (self : t) span =
while
let old = A.get self.spans in
not (A.compare_and_set self.spans old (span :: old))
do
()
done
let with_span self name f =
let id = Span.new_id_ () in
let parent = FLS.get_in_local_hmap_opt k_parent in
let span = { Span.id; parent; msg = name } in
add self span;
FLS.with_in_local_hmap k_parent id f
end
module Render = struct
type span_tree = {
msg: string; (** message of the span at the root *)
children: span_tree list;
}
type t = { roots: span_tree list }
let build (tracer : Tracer.t) : t =
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
(* everyone is a root at first *)
let all_spans = Tracer.get tracer in
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
(* now consider the parenting relationships *)
let add_span_to_parent (span : Span.t) =
match span.parent with
| None -> ()
| Some p ->
Hashtbl.remove tops span.id;
let l = try Hashtbl.find children p with Not_found -> [] in
Hashtbl.replace children p (span :: l)
in
List.iter add_span_to_parent all_spans;
(* build the tree *)
let rec build_tree (sp : Span.t) : span_tree =
let children = try Hashtbl.find children sp.id with Not_found -> [] in
let children = List.map build_tree children |> List.sort Stdlib.compare in
{ msg = sp.msg; children }
in
let roots =
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|> List.sort Stdlib.compare
in
{ roots }
let pp (oc : out_channel) (self : t) : unit =
let rec pp_tree indent out (t : span_tree) =
let prefix = String.make indent ' ' in
Printf.fprintf out "%s%S\n" prefix t.msg;
List.iter (pp_tree (indent + 2) out) t.children
in
List.iter (pp_tree 2 oc) self.roots
end
let run ~pool ~pool_name () =
let tracer = Tracer.create () in
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
let@ () =
Tracer.with_span tracer
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
in
for j = 1 to 5 do
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
F.yield ()
done
in
let sub_child ~idx ~idx_child ~idx_sub () =
let@ () =
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
in
for i = 1 to 10 do
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
F.yield ()
done;
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
in
let top_child ~idx ~idx_child () =
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
Tracer.with_span tracer
(spf "child.%d.%d.99.await_children" idx idx_child)
in
List.iter F.await subs
in
let top idx =
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
List.iter F.wait_block_exn fibs;
Printf.printf "tracing complete\n";
Printf.printf "spans:\n";
let tree = Render.build tracer in
Render.pp stdout tree;
Printf.printf "done\n%!";
()
let () =
let@ _ = Moonpool_fib.main in
(let@ pool = Ws_pool.with_ () in
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
run ~pool ~pool_name:"ws_pool" ());
(let@ pool = Fifo_pool.with_ () in
T_fibers.Fls.run ~pool ~pool_name:"fifo_pool" ());
run ~pool ~pool_name:"ws_pool" ());
()

View file

@ -1,16 +0,0 @@
(tests
(names t_fls t_main t_fib1)
(enabled_if
(>= %{ocaml_version} 5.0))
(package moonpool-lwt)
(libraries
t_fibers
moonpool
moonpool.fib
moonpool-lwt
trace
trace-tef
qcheck-core
qcheck-core.runner
;tracy-client.trace
))

View file

@ -1,61 +0,0 @@
============
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
2.3: fiber 3 resolved as ok
2.4: fiber 4 resolved as ok
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
3: wait for subs
4: await fiber 0
5: res 0 = 0
6: await fiber 1
7: res 1 = 1
8: await fiber 2
9: res 2 = 2
10: await fiber 3
11: res 3 = 3
12: await fiber 4
13: res 4 = 4
14: await fiber 5
15: res 5 = 5
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited

View file

@ -1,9 +0,0 @@
open! Moonpool
module M_lwt = Moonpool_lwt
let ( let@ ) = ( @@ )
let () =
let@ runner = M_lwt.lwt_main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()

File diff suppressed because it is too large Load diff

View file

@ -1,8 +0,0 @@
module M_lwt = Moonpool_lwt
let ( let@ ) = ( @@ )
let () =
(let@ runner = M_lwt.lwt_main in
T_fibers.Fls.run ~pool:runner ~pool_name:"lwt" ());
()

View file

@ -1,35 +0,0 @@
open Moonpool
module M_lwt = Moonpool_lwt
module F = Moonpool_fib
let ( let@ ) = ( @@ )
let () =
(* run fibers in the background, await them in the main thread *)
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
M_lwt.lwt_main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
in
assert (r = 13)
let () =
(* run multiple times to make sure cleanup is correct *)
for _i = 1 to 10 do
try
let _r =
M_lwt.lwt_main @@ fun runner ->
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
F.await fib
in
assert false
with Failure msg ->
(* Printf.eprintf "got %S\n%!" msg; *)
assert (msg = "oops")
done

View file

@ -1,3 +1,4 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core