Compare commits

..

1 commit

Author SHA1 Message Date
Simon Cruanes
0fefc62d35
Merge 820de8e054 into 867cbd2318 2025-07-11 14:52:21 +00:00
6 changed files with 99 additions and 316 deletions

View file

@ -20,7 +20,7 @@ module Scheduler_state = struct
(** Other threads ask us to run closures in the lwt thread *) (** Other threads ask us to run closures in the lwt thread *)
mutex: Mutex.t; mutex: Mutex.t;
mutable thread: int; mutable thread: int;
closed: bool Atomic.t; mutable closed: bool;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
@ -29,16 +29,13 @@ module Scheduler_state = struct
has_notified: bool Atomic.t; has_notified: bool Atomic.t;
} }
(** Main state *) let st : st =
let cur_st : st option Atomic.t = Atomic.make None
let create_new () : st =
{ {
tasks = Queue.create (); tasks = Queue.create ();
actions_from_other_threads = Queue.create (); actions_from_other_threads = Queue.create ();
mutex = Mutex.create (); mutex = Mutex.create ();
thread = Thread.id (Thread.self ()); thread = Thread.self () |> Thread.id;
closed = Atomic.make false; closed = false;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
enter_hook = None; enter_hook = None;
leave_hook = None; leave_hook = None;
@ -46,36 +43,12 @@ module Scheduler_state = struct
has_notified = Atomic.make false; has_notified = Atomic.make false;
} }
let[@inline never] add_action_from_another_thread_ (self : st) f : unit = let add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex; Mutex.lock st.mutex;
Queue.push f self.actions_from_other_threads; Queue.push f self.actions_from_other_threads;
Mutex.unlock st.mutex;
if not (Atomic.exchange self.has_notified true) then if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification; Lwt_unix.send_notification self.notification
Mutex.unlock self.mutex
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
let[@inline] run_on_lwt_thread_ (self : st) (f : unit -> unit) : unit =
if on_lwt_thread_ self then
f ()
else
add_action_from_another_thread_ self f
let cleanup (st : st) =
match Atomic.get cur_st with
| Some st' ->
if st != st' then
failwith
"moonpool-lwt: cleanup failed (state is not the currently active \
one!)";
if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread";
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Atomic.set cur_st None
| _ -> ()
end end
module Ops = struct module Ops = struct
@ -84,34 +57,39 @@ module Ops = struct
let around_task _ = default_around_task_ let around_task _ = default_around_task_
let schedule (self : st) t = let schedule (self : st) t =
if Atomic.get self.closed then if Thread.id (Thread.self ()) = self.thread then
failwith "moonpool-lwt.schedule: scheduler is closed"; Queue.push t self.tasks
Scheduler_state.run_on_lwt_thread_ self (fun () -> Queue.push t self.tasks) else
Scheduler_state.add_action_from_another_thread_ self (fun () ->
Queue.push t self.tasks)
let get_next_task (self : st) = let get_next_task (self : st) =
if Atomic.get self.closed then raise WL.No_more_tasks; if self.closed then raise WL.No_more_tasks;
try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks
let on_exn _ ebt = !on_uncaught_exn ebt let on_exn _ ebt = !on_uncaught_exn ebt
let runner (self : st) = self.as_runner let runner (self : st) = self.as_runner
let cleanup = Scheduler_state.cleanup
let as_runner (self : st) : Moonpool.Runner.t = let as_runner (self : st) : Moonpool.Runner.t =
Moonpool.Runner.For_runner_implementors.create Moonpool.Runner.For_runner_implementors.create
~size:(fun () -> 1) ~size:(fun () -> 1)
~num_tasks:(fun () -> ~num_tasks:(fun () ->
Mutex.lock self.mutex; (* FIXME: thread safety. use an atomic?? *)
let n = Queue.length self.tasks in Queue.length self.tasks)
Mutex.unlock self.mutex;
n)
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
~shutdown:(fun ~wait:_ () -> Atomic.set self.closed true) ~shutdown:(fun ~wait:_ () -> self.closed <- true)
() ()
let before_start (self : st) : unit = let before_start (self : st) : unit =
self.as_runner <- as_runner self; self.as_runner <- as_runner self;
() ()
let cleanup (self : st) =
self.closed <- true;
Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook;
()
let ops : st WL.ops = let ops : st WL.ops =
{ {
schedule; schedule;
@ -122,72 +100,35 @@ module Ops = struct
before_start; before_start;
cleanup; cleanup;
} }
let setup st =
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
before_start st
else
failwith "moonpool-lwt: setup failed (state already in place)"
end end
(** Resolve [prom] with the result of [lwt_fut] *) open struct
let transfer_lwt_to_fut (lwt_fut : 'a Lwt.t) (prom : 'a Fut.promise) : unit = module FG =
Lwt.on_any lwt_fut WL.Fine_grained
(fun x -> M.Fut.fulfill prom (Ok x)) (struct
(fun exn -> include Scheduler_state
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)))
let[@inline] register_trigger_on_lwt_termination (lwt_fut : _ Lwt.t) let ops = Ops.ops
(tr : M.Trigger.t) : unit = end)
Lwt.on_termination lwt_fut (fun _ -> M.Trigger.signal tr) ()
let[@inline] await_lwt_terminated (fut : _ Lwt.t) =
match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep -> assert false
module Main_state = struct
let[@inline] get_st () : Scheduler_state.st =
match Atomic.get Scheduler_state.cur_st with
| Some st ->
if Atomic.get st.closed then failwith "moonpool-lwt: scheduler is closed";
st
| None -> failwith "moonpool-lwt: scheduler is not setup"
let[@inline] run_on_lwt_thread f =
Scheduler_state.run_on_lwt_thread_ (get_st ()) f
let[@inline] on_lwt_thread () : bool =
Scheduler_state.on_lwt_thread_ (get_st ())
let[@inline] add_action_from_another_thread f : unit =
Scheduler_state.add_action_from_another_thread_ (get_st ()) f
end end
let await_lwt (fut : _ Lwt.t) = let await_lwt (fut : _ Lwt.t) =
if Scheduler_state.on_lwt_thread_ (Main_state.get_st ()) then ( match Lwt.state fut with
(* can directly access the future *) | Return x -> x
match Lwt.state fut with | Fail exn -> raise exn
| Sleep ->
(* suspend fiber, wake it up when [fut] resolves *)
let trigger = M.Trigger.create () in
Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger);
M.Trigger.await trigger |> Option.iter Exn_bt.raise;
(match Lwt.state fut with
| Return x -> x | Return x -> x
| Fail exn -> raise exn | Fail exn -> raise exn
| Sleep -> | Sleep -> assert false)
let tr = M.Trigger.create () in
register_trigger_on_lwt_termination fut tr;
M.Trigger.await_exn tr;
await_lwt_terminated fut
) else (
let tr = M.Trigger.create () in
Main_state.add_action_from_another_thread (fun () ->
register_trigger_on_lwt_termination fut tr);
M.Trigger.await_exn tr;
await_lwt_terminated fut
)
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
if not (Main_state.on_lwt_thread ()) then
failwith "lwt_of_fut: not on the lwt thread";
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
(* in lwt thread, resolve [lwt_fut] *) (* in lwt thread, resolve [lwt_fut] *)
@ -199,98 +140,64 @@ let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
in in
M.Fut.on_result fut (fun res -> M.Fut.on_result fut (fun res ->
Main_state.run_on_lwt_thread (fun () -> if Thread.id (Thread.self ()) = Scheduler_state.st.thread then
(* can safely wakeup from the lwt thread *) (* can safely wakeup from the lwt thread *)
wakeup_using_res res)); wakeup_using_res res
else
Scheduler_state.add_action_from_another_thread_ Scheduler_state.st
(fun () -> wakeup_using_res res));
lwt_fut lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
if Main_state.on_lwt_thread () then ( match Lwt.poll lwt_fut with
match Lwt.state lwt_fut with | Some x -> M.Fut.return x
| Return x -> M.Fut.return x | None ->
| _ ->
let fut, prom = M.Fut.make () in
transfer_lwt_to_fut lwt_fut prom;
fut
) else (
let fut, prom = M.Fut.make () in let fut, prom = M.Fut.make () in
Main_state.add_action_from_another_thread (fun () -> Lwt.on_any lwt_fut
transfer_lwt_to_fut lwt_fut prom); (fun x -> M.Fut.fulfill prom (Ok x))
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
fut fut
)
let run_in_lwt_and_await (f : unit -> 'a Lwt.t) : 'a = let run_in_hook () =
if Main_state.on_lwt_thread () then ( (* execute actions sent from other threads; first transfer them
let fut = f () in
await_lwt fut
) else (
let fut, prom = Fut.make () in
Main_state.add_action_from_another_thread (fun () ->
let lwt_fut = f () in
transfer_lwt_to_fut lwt_fut prom);
Fut.await fut
)
module Setup_lwt_hooks (ARG : sig
val st : Scheduler_state.st
end) =
struct
open ARG
module FG =
WL.Fine_grained
(struct
include Scheduler_state
let st = st
let ops = Ops.ops
end)
()
let run_in_hook () =
(* execute actions sent from other threads; first transfer them
all atomically to a local queue to reduce contention *) all atomically to a local queue to reduce contention *)
let local_acts = Queue.create () in let local_acts = Queue.create () in
Mutex.lock st.mutex; Mutex.lock Scheduler_state.st.mutex;
Queue.transfer st.actions_from_other_threads local_acts; Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
Atomic.set st.has_notified false; Atomic.set Scheduler_state.st.has_notified false;
Mutex.unlock st.mutex; Mutex.unlock Scheduler_state.st.mutex;
Queue.iter (fun f -> f ()) local_acts; Queue.iter (fun f -> f ()) local_acts;
(* run tasks *) (* run tasks *)
FG.run ~max_tasks:1000 (); FG.run ~max_tasks:1000 ();
if not (Queue.is_empty st.tasks) then ignore (Lwt.pause () : unit Lwt.t); if not (Queue.is_empty Scheduler_state.st.tasks) then
() ignore (Lwt.pause () : unit Lwt.t);
()
let setup () = let is_setup_ = Atomic.make false
let setup () =
if not (Atomic.exchange is_setup_ true) then (
(* only one thread does this *) (* only one thread does this *)
FG.setup ~block_signals:false (); FG.setup ~block_signals:false ();
Scheduler_state.st.enter_hook <-
st.thread <- Thread.self () |> Thread.id; Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); Scheduler_state.st.leave_hook <-
st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
(* notification used to wake lwt up *) (* notification used to wake lwt up *)
st.notification <- Lwt_unix.make_notification ~once:false run_in_hook Scheduler_state.st.notification <-
end Lwt_unix.make_notification ~once:false run_in_hook
)
let setup () : Scheduler_state.st =
let st = Scheduler_state.create_new () in
Ops.setup st;
let module Setup_lwt_hooks' = Setup_lwt_hooks (struct
let st = st
end) in
Setup_lwt_hooks'.setup ();
st
let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t = let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in setup ();
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
M.Runner.run_async st.as_runner (fun () -> M.Runner.run_async Scheduler_state.st.as_runner (fun () ->
try try
let x = f () in let x = f () in
Lwt.wakeup lwt_prom x Lwt.wakeup lwt_prom x
@ -298,13 +205,11 @@ let spawn_lwt f : _ Lwt.t =
lwt_fut lwt_fut
let lwt_main (f : _ -> 'a) : 'a = let lwt_main (f : _ -> 'a) : 'a =
let st = setup () in setup ();
(* make sure to cleanup *) Scheduler_state.st.thread <- Thread.self () |> Thread.id;
let finally () = Scheduler_state.cleanup st in let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in
Lwt_main.run fut Lwt_main.run fut
let[@inline] lwt_main_runner () = let lwt_main_runner () =
let st = Main_state.get_st () in if not (Atomic.get is_setup_) then failwith "lwt_main_runner: not setup yet";
st.as_runner Scheduler_state.st.as_runner

View file

@ -18,22 +18,18 @@ val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This (** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
must be called from the Lwt thread, and the result must always be used only must be called from the Lwt thread, and the result must always be used only
from inside the Lwt thread. from inside the Lwt thread. *)
@raise Failure if not run from the lwt thread. *)
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val spawn_lwt : (unit -> 'a) -> 'a Lwt.t val spawn_lwt : (unit -> 'a) -> 'a Lwt.t
(** This spawns a task that runs in the Lwt scheduler. (** This spawns a task that runs in the Lwt scheduler *)
@raise Failure if {!lwt_main} was not called. *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing runner. This must be run from within a Moonpool runner so that the await-ing
effect is handled. *) effect is handled. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
@ -43,7 +39,4 @@ val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
val lwt_main_runner : unit -> Moonpool.Runner.t val lwt_main_runner : unit -> Moonpool.Runner.t
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main} (** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
is currently running in some thread. is currently running in some thread. *)
@raise Failure if {!lwt_main} was not called. *)
val is_setup : unit -> bool

View file

@ -1,4 +1,4 @@
[@@@ocaml.deprecated "use Picos_std_sync directly or single threaded solutions"] [@@@ocaml.deprecated "use Picos_std_sync or single threaded solutions"]
module Mutex = Picos_std_sync.Mutex module Mutex = Picos_std_sync.Mutex
module Condition = Picos_std_sync.Condition module Condition = Picos_std_sync.Condition

View file

@ -101,17 +101,7 @@ let () =
in in
Arg.parse opts ignore "echo client"; Arg.parse opts ignore "echo client";
let main () = (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) M_lwt.lwt_main @@ fun _runner ->
M_lwt.lwt_main @@ fun _runner -> main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose
main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose ~msg_per_conn:!msg_per_conn ()
~msg_per_conn:!msg_per_conn ()
in
print_endline "first run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "second run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "done"

View file

@ -11,7 +11,7 @@ let str_of_sockaddr = function
| Unix.ADDR_INET (addr, port) -> | Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~verbose ~runner:_ () : unit = let main ~port ~verbose ~runner:_ () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in let lwt_fut, _lwt_prom = Lwt.wait () in
@ -54,7 +54,7 @@ let main ~port ~verbose ~runner:_ () : unit =
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in in
M_lwt.await_lwt lwt_fut lwt_fut
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
@ -75,4 +75,4 @@ let () =
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
M_lwt.lwt_main @@ fun _ -> main ~runner ~port:!port ~verbose:!verbose () Lwt_main.run @@ main ~runner ~port:!port ~verbose:!verbose ()

View file

@ -2,21 +2,7 @@ run echo server on port=12346
listening on port 12346 listening on port 12346
run echo client -p 12346 -n 10 --n-conn=2 -v run echo client -p 12346 -n 10 --n-conn=2 -v
all done all done
all done
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time) connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
done
first run
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
@ -37,26 +23,6 @@ read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
@ -77,26 +43,6 @@ read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
@ -117,26 +63,6 @@ read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
@ -157,26 +83,6 @@ read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
@ -197,14 +103,3 @@ read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
second run