Compare commits

...

7 commits

Author SHA1 Message Date
Simon Cruanes
a5740468f5
Merge fc5fd996fc into 867cbd2318 2025-09-04 20:03:38 +00:00
Simon Cruanes
fc5fd996fc
update test 2025-09-04 16:03:31 -04:00
Simon Cruanes
3b61d4294f
feat lwt: make sure we can setup/cleanup multiple times 2025-09-04 16:03:06 -04:00
Simon Cruanes
47138b4241
more sanity checks 2025-09-04 15:32:32 -04:00
Simon Cruanes
1c794e1e42
detail 2025-09-04 14:47:02 -04:00
Simon Cruanes
4286eedeec
update lwt test 2025-09-04 14:46:53 -04:00
Simon Cruanes
2396e56c63
feat lwt: make most functions work on any thread, not just the main 2025-09-04 14:46:35 -04:00
6 changed files with 322 additions and 105 deletions

View file

@ -20,7 +20,7 @@ module Scheduler_state = struct
(** Other threads ask us to run closures in the lwt thread *) (** Other threads ask us to run closures in the lwt thread *)
mutex: Mutex.t; mutex: Mutex.t;
mutable thread: int; mutable thread: int;
mutable closed: bool; closed: bool Atomic.t;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
@ -29,13 +29,16 @@ module Scheduler_state = struct
has_notified: bool Atomic.t; has_notified: bool Atomic.t;
} }
let st : st = (** Main state *)
let cur_st : st option Atomic.t = Atomic.make None
let create_new () : st =
{ {
tasks = Queue.create (); tasks = Queue.create ();
actions_from_other_threads = Queue.create (); actions_from_other_threads = Queue.create ();
mutex = Mutex.create (); mutex = Mutex.create ();
thread = Thread.self () |> Thread.id; thread = Thread.id (Thread.self ());
closed = false; closed = Atomic.make false;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
enter_hook = None; enter_hook = None;
leave_hook = None; leave_hook = None;
@ -43,12 +46,36 @@ module Scheduler_state = struct
has_notified = Atomic.make false; has_notified = Atomic.make false;
} }
let add_action_from_another_thread_ (self : st) f : unit = let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock st.mutex; Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads; Queue.push f self.actions_from_other_threads;
Mutex.unlock st.mutex;
if not (Atomic.exchange self.has_notified true) then if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification Lwt_unix.send_notification self.notification;
Mutex.unlock self.mutex
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
let[@inline] run_on_lwt_thread_ (self : st) (f : unit -> unit) : unit =
if on_lwt_thread_ self then
f ()
else
add_action_from_another_thread_ self f
let cleanup (st : st) =
match Atomic.get cur_st with
| Some st' ->
if st != st' then
failwith
"moonpool-lwt: cleanup failed (state is not the currently active \
one!)";
if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread";
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Atomic.set cur_st None
| _ -> ()
end end
module Ops = struct module Ops = struct
@ -57,39 +84,34 @@ module Ops = struct
let around_task _ = default_around_task_ let around_task _ = default_around_task_
let schedule (self : st) t = let schedule (self : st) t =
if Thread.id (Thread.self ()) = self.thread then if Atomic.get self.closed then
Queue.push t self.tasks failwith "moonpool-lwt.schedule: scheduler is closed";
else Scheduler_state.run_on_lwt_thread_ self (fun () -> Queue.push t self.tasks)
Scheduler_state.add_action_from_another_thread_ self (fun () ->
Queue.push t self.tasks)
let get_next_task (self : st) = let get_next_task (self : st) =
if self.closed then raise WL.No_more_tasks; if Atomic.get self.closed then raise WL.No_more_tasks;
try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks
let on_exn _ ebt = !on_uncaught_exn ebt let on_exn _ ebt = !on_uncaught_exn ebt
let runner (self : st) = self.as_runner let runner (self : st) = self.as_runner
let cleanup = Scheduler_state.cleanup
let as_runner (self : st) : Moonpool.Runner.t = let as_runner (self : st) : Moonpool.Runner.t =
Moonpool.Runner.For_runner_implementors.create Moonpool.Runner.For_runner_implementors.create
~size:(fun () -> 1) ~size:(fun () -> 1)
~num_tasks:(fun () -> ~num_tasks:(fun () ->
(* FIXME: thread safety. use an atomic?? *) Mutex.lock self.mutex;
Queue.length self.tasks) let n = Queue.length self.tasks in
Mutex.unlock self.mutex;
n)
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
~shutdown:(fun ~wait:_ () -> self.closed <- true) ~shutdown:(fun ~wait:_ () -> Atomic.set self.closed true)
() ()
let before_start (self : st) : unit = let before_start (self : st) : unit =
self.as_runner <- as_runner self; self.as_runner <- as_runner self;
() ()
let cleanup (self : st) =
self.closed <- true;
Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook;
()
let ops : st WL.ops = let ops : st WL.ops =
{ {
schedule; schedule;
@ -100,35 +122,72 @@ module Ops = struct
before_start; before_start;
cleanup; cleanup;
} }
let setup st =
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
before_start st
else
failwith "moonpool-lwt: setup failed (state already in place)"
end end
open struct (** Resolve [prom] with the result of [lwt_fut] *)
module FG = let transfer_lwt_to_fut (lwt_fut : 'a Lwt.t) (prom : 'a Fut.promise) : unit =
WL.Fine_grained Lwt.on_any lwt_fut
(struct (fun x -> M.Fut.fulfill prom (Ok x))
include Scheduler_state (fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)))
let ops = Ops.ops let[@inline] register_trigger_on_lwt_termination (lwt_fut : _ Lwt.t)
end) (tr : M.Trigger.t) : unit =
() Lwt.on_termination lwt_fut (fun _ -> M.Trigger.signal tr)
end
let await_lwt (fut : _ Lwt.t) = let[@inline] await_lwt_terminated (fut : _ Lwt.t) =
match Lwt.state fut with match Lwt.state fut with
| Return x -> x | Return x -> x
| Fail exn -> raise exn | Fail exn -> raise exn
| Sleep -> | Sleep -> assert false
(* suspend fiber, wake it up when [fut] resolves *)
let trigger = M.Trigger.create () in
Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger);
M.Trigger.await trigger |> Option.iter Exn_bt.raise;
(match Lwt.state fut with module Main_state = struct
let[@inline] get_st () : Scheduler_state.st =
match Atomic.get Scheduler_state.cur_st with
| Some st ->
if Atomic.get st.closed then failwith "moonpool-lwt: scheduler is closed";
st
| None -> failwith "moonpool-lwt: scheduler is not setup"
let[@inline] run_on_lwt_thread f =
Scheduler_state.run_on_lwt_thread_ (get_st ()) f
let[@inline] on_lwt_thread () : bool =
Scheduler_state.on_lwt_thread_ (get_st ())
let[@inline] add_action_from_another_thread f : unit =
Scheduler_state.add_action_from_another_thread_ (get_st ()) f
end
let await_lwt (fut : _ Lwt.t) =
if Scheduler_state.on_lwt_thread_ (Main_state.get_st ()) then (
(* can directly access the future *)
match Lwt.state fut with
| Return x -> x | Return x -> x
| Fail exn -> raise exn | Fail exn -> raise exn
| Sleep -> assert false) | Sleep ->
let tr = M.Trigger.create () in
register_trigger_on_lwt_termination fut tr;
M.Trigger.await_exn tr;
await_lwt_terminated fut
) else (
let tr = M.Trigger.create () in
Main_state.add_action_from_another_thread (fun () ->
register_trigger_on_lwt_termination fut tr);
M.Trigger.await_exn tr;
await_lwt_terminated fut
)
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
if not (Main_state.on_lwt_thread ()) then
failwith "lwt_of_fut: not on the lwt thread";
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
(* in lwt thread, resolve [lwt_fut] *) (* in lwt thread, resolve [lwt_fut] *)
@ -140,64 +199,98 @@ let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
in in
M.Fut.on_result fut (fun res -> M.Fut.on_result fut (fun res ->
if Thread.id (Thread.self ()) = Scheduler_state.st.thread then Main_state.run_on_lwt_thread (fun () ->
(* can safely wakeup from the lwt thread *) (* can safely wakeup from the lwt thread *)
wakeup_using_res res wakeup_using_res res));
else
Scheduler_state.add_action_from_another_thread_ Scheduler_state.st
(fun () -> wakeup_using_res res));
lwt_fut lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with if Main_state.on_lwt_thread () then (
| Some x -> M.Fut.return x match Lwt.state lwt_fut with
| None -> | Return x -> M.Fut.return x
| _ ->
let fut, prom = M.Fut.make () in
transfer_lwt_to_fut lwt_fut prom;
fut
) else (
let fut, prom = M.Fut.make () in let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut Main_state.add_action_from_another_thread (fun () ->
(fun x -> M.Fut.fulfill prom (Ok x)) transfer_lwt_to_fut lwt_fut prom);
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
fut fut
let run_in_hook () =
(* execute actions sent from other threads; first transfer them
all atomically to a local queue to reduce contention *)
let local_acts = Queue.create () in
Mutex.lock Scheduler_state.st.mutex;
Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
Atomic.set Scheduler_state.st.has_notified false;
Mutex.unlock Scheduler_state.st.mutex;
Queue.iter (fun f -> f ()) local_acts;
(* run tasks *)
FG.run ~max_tasks:1000 ();
if not (Queue.is_empty Scheduler_state.st.tasks) then
ignore (Lwt.pause () : unit Lwt.t);
()
let is_setup_ = Atomic.make false
let setup () =
if not (Atomic.exchange is_setup_ true) then (
(* only one thread does this *)
FG.setup ~block_signals:false ();
Scheduler_state.st.enter_hook <-
Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
Scheduler_state.st.leave_hook <-
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
(* notification used to wake lwt up *)
Scheduler_state.st.notification <-
Lwt_unix.make_notification ~once:false run_in_hook
) )
let run_in_lwt_and_await (f : unit -> 'a Lwt.t) : 'a =
if Main_state.on_lwt_thread () then (
let fut = f () in
await_lwt fut
) else (
let fut, prom = Fut.make () in
Main_state.add_action_from_another_thread (fun () ->
let lwt_fut = f () in
transfer_lwt_to_fut lwt_fut prom);
Fut.await fut
)
module Setup_lwt_hooks (ARG : sig
val st : Scheduler_state.st
end) =
struct
open ARG
module FG =
WL.Fine_grained
(struct
include Scheduler_state
let st = st
let ops = Ops.ops
end)
()
let run_in_hook () =
(* execute actions sent from other threads; first transfer them
all atomically to a local queue to reduce contention *)
let local_acts = Queue.create () in
Mutex.lock st.mutex;
Queue.transfer st.actions_from_other_threads local_acts;
Atomic.set st.has_notified false;
Mutex.unlock st.mutex;
Queue.iter (fun f -> f ()) local_acts;
(* run tasks *)
FG.run ~max_tasks:1000 ();
if not (Queue.is_empty st.tasks) then ignore (Lwt.pause () : unit Lwt.t);
()
let setup () =
(* only one thread does this *)
FG.setup ~block_signals:false ();
st.thread <- Thread.self () |> Thread.id;
st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
(* notification used to wake lwt up *)
st.notification <- Lwt_unix.make_notification ~once:false run_in_hook
end
let setup () : Scheduler_state.st =
let st = Scheduler_state.create_new () in
Ops.setup st;
let module Setup_lwt_hooks' = Setup_lwt_hooks (struct
let st = st
end) in
Setup_lwt_hooks'.setup ();
st
let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t = let spawn_lwt f : _ Lwt.t =
setup (); let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
M.Runner.run_async Scheduler_state.st.as_runner (fun () -> M.Runner.run_async st.as_runner (fun () ->
try try
let x = f () in let x = f () in
Lwt.wakeup lwt_prom x Lwt.wakeup lwt_prom x
@ -205,11 +298,13 @@ let spawn_lwt f : _ Lwt.t =
lwt_fut lwt_fut
let lwt_main (f : _ -> 'a) : 'a = let lwt_main (f : _ -> 'a) : 'a =
setup (); let st = setup () in
Scheduler_state.st.thread <- Thread.self () |> Thread.id; (* make sure to cleanup *)
let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in let finally () = Scheduler_state.cleanup st in
Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in
Lwt_main.run fut Lwt_main.run fut
let lwt_main_runner () = let[@inline] lwt_main_runner () =
if not (Atomic.get is_setup_) then failwith "lwt_main_runner: not setup yet"; let st = Main_state.get_st () in
Scheduler_state.st.as_runner st.as_runner

View file

@ -18,18 +18,22 @@ val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This (** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
must be called from the Lwt thread, and the result must always be used only must be called from the Lwt thread, and the result must always be used only
from inside the Lwt thread. *) from inside the Lwt thread.
@raise Failure if not run from the lwt thread. *)
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val spawn_lwt : (unit -> 'a) -> 'a Lwt.t val spawn_lwt : (unit -> 'a) -> 'a Lwt.t
(** This spawns a task that runs in the Lwt scheduler *) (** This spawns a task that runs in the Lwt scheduler.
@raise Failure if {!lwt_main} was not called. *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing runner. This must be run from within a Moonpool runner so that the await-ing
effect is handled. *) effect is handled. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
@ -39,4 +43,7 @@ val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
val lwt_main_runner : unit -> Moonpool.Runner.t val lwt_main_runner : unit -> Moonpool.Runner.t
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main} (** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
is currently running in some thread. *) is currently running in some thread.
@raise Failure if {!lwt_main} was not called. *)
val is_setup : unit -> bool

View file

@ -1,4 +1,4 @@
[@@@ocaml.deprecated "use Picos_std_sync or single threaded solutions"] [@@@ocaml.deprecated "use Picos_std_sync directly or single threaded solutions"]
module Mutex = Picos_std_sync.Mutex module Mutex = Picos_std_sync.Mutex
module Condition = Picos_std_sync.Condition module Condition = Picos_std_sync.Condition

View file

@ -101,7 +101,17 @@ let () =
in in
Arg.parse opts ignore "echo client"; Arg.parse opts ignore "echo client";
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) let main () =
M_lwt.lwt_main @@ fun _runner -> (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose M_lwt.lwt_main @@ fun _runner ->
~msg_per_conn:!msg_per_conn () main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose
~msg_per_conn:!msg_per_conn ()
in
print_endline "first run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "second run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "done"

View file

@ -11,7 +11,7 @@ let str_of_sockaddr = function
| Unix.ADDR_INET (addr, port) -> | Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~verbose ~runner:_ () : unit Lwt.t = let main ~port ~verbose ~runner:_ () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in let lwt_fut, _lwt_prom = Lwt.wait () in
@ -54,7 +54,7 @@ let main ~port ~verbose ~runner:_ () : unit Lwt.t =
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in in
lwt_fut M_lwt.await_lwt lwt_fut
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
@ -75,4 +75,4 @@ let () =
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port ~verbose:!verbose () M_lwt.lwt_main @@ fun _ -> main ~runner ~port:!port ~verbose:!verbose ()

View file

@ -2,7 +2,21 @@ run echo server on port=12346
listening on port 12346 listening on port 12346
run echo client -p 12346 -n 10 --n-conn=2 -v run echo client -p 12346 -n 10 --n-conn=2 -v
all done all done
all done
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time) connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
done
first run
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
@ -23,6 +37,26 @@ read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
@ -43,6 +77,26 @@ read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
@ -63,6 +117,26 @@ read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
@ -83,6 +157,26 @@ read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
@ -103,3 +197,14 @@ read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
second run