From 6ae82f130adcb653ccb3edd574e59f7fb69afe4e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 22:48:48 -0400 Subject: [PATCH] feat lwt: proper wakeup; add lwt_main_runner --- src/lwt/moonpool_lwt.ml | 49 ++++++++++++++++++++++++++++------------ src/lwt/moonpool_lwt.mli | 4 ++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 49d35afa..5b2039dd 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -42,6 +42,13 @@ module Scheduler_state = struct notification = 0; has_notified = Atomic.make false; } + + let add_action_from_another_thread_ (self : st) f : unit = + Mutex.lock st.mutex; + Queue.push f self.actions_from_other_threads; + Mutex.unlock st.mutex; + if not (Atomic.exchange self.has_notified true) then + Lwt_unix.send_notification self.notification end module Ops = struct @@ -49,18 +56,12 @@ module Ops = struct let around_task _ = default_around_task_ - let add_action_from_another_thread_ (self : st) f : unit = - Mutex.lock Scheduler_state.st.mutex; - Queue.push f self.actions_from_other_threads; - Mutex.unlock Scheduler_state.st.mutex; - if not (Atomic.exchange self.has_notified true) then - Lwt_unix.send_notification self.notification - let schedule (self : st) t = if Thread.id (Thread.self ()) = self.thread then Queue.push t self.tasks else - add_action_from_another_thread_ self (fun () -> Queue.push t self.tasks) + Scheduler_state.add_action_from_another_thread_ self (fun () -> + Queue.push t self.tasks) let get_next_task (self : st) = if self.closed then raise WL.No_more_tasks; @@ -129,11 +130,23 @@ let await_lwt (fut : _ Lwt.t) = let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = let lwt_fut, lwt_prom = Lwt.wait () in - M.Fut.on_result fut (function + + (* in lwt thread, resolve [lwt_fut] *) + let wakeup_using_res = function | Ok x -> Lwt.wakeup lwt_prom x | Error ebt -> let exn = Exn_bt.exn ebt in - Lwt.wakeup_exn lwt_prom exn); + Lwt.wakeup_exn lwt_prom exn + in + + M.Fut.on_result fut (fun res -> + if Thread.id (Thread.self ()) = Scheduler_state.st.thread then + (* can safely wakeup from the lwt thread *) + wakeup_using_res res + else + Scheduler_state.add_action_from_another_thread_ Scheduler_state.st + (fun () -> wakeup_using_res res)); + lwt_fut let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = @@ -149,11 +162,14 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = fut let run_in_hook () = - (* execute actions sent from other threads *) + (* execute actions sent from other threads; first transfer them + all atomically to a local queue to reduce contention *) let local_acts = Queue.create () in Mutex.lock Scheduler_state.st.mutex; Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts; + Atomic.set Scheduler_state.st.has_notified false; Mutex.unlock Scheduler_state.st.mutex; + Queue.iter (fun f -> f ()) local_acts; (* run tasks *) @@ -163,16 +179,17 @@ let run_in_hook () = ignore (Lwt.pause () : unit Lwt.t); () -let is_setup_ = ref false +let is_setup_ = Atomic.make false let setup () = - if not !is_setup_ then ( - is_setup_ := true; + if not (Atomic.exchange is_setup_ true) then ( + (* only one thread does this *) FG.setup ~block_signals:false (); Scheduler_state.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); Scheduler_state.st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); + (* notification used to wake lwt up *) Scheduler_state.st.notification <- Lwt_unix.make_notification ~once:false run_in_hook ) @@ -192,3 +209,7 @@ let lwt_main (f : _ -> 'a) : 'a = Scheduler_state.st.thread <- Thread.self () |> Thread.id; let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in Lwt_main.run fut + +let lwt_main_runner () = + if not (Atomic.get is_setup_) then failwith "lwt_main_runner: not setup yet"; + Scheduler_state.st.as_runner diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 83a47128..79d9e61d 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -36,3 +36,7 @@ val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a (** Setup, run lwt main, return the result *) + +val lwt_main_runner : unit -> Moonpool.Runner.t +(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main} + is currently running in some thread. *)