fix lwt: make sure to wakeup loop in main

there's a race condition where, by the time we schedule the
main fiber in `lwt_main`, the event loop is already asleep (maybe
from a previous run). We make sure to wake the loop up.
This commit is contained in:
Simon Cruanes 2025-09-05 12:54:20 -04:00
parent 01026fafaa
commit 86b64ae3d4
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -21,6 +21,7 @@ module Scheduler_state = struct
mutex: Mutex.t; mutex: Mutex.t;
mutable thread: int; mutable thread: int;
closed: bool Atomic.t; closed: bool Atomic.t;
cleanup_done: bool Atomic.t;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
@ -39,6 +40,7 @@ module Scheduler_state = struct
mutex = Mutex.create (); mutex = Mutex.create ();
thread = Thread.id (Thread.self ()); thread = Thread.id (Thread.self ());
closed = Atomic.make false; closed = Atomic.make false;
cleanup_done = Atomic.make false;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
enter_hook = None; enter_hook = None;
leave_hook = None; leave_hook = None;
@ -46,12 +48,15 @@ module Scheduler_state = struct
has_notified = Atomic.make false; has_notified = Atomic.make false;
} }
let[@inline] notify_ (self : st) : unit =
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification
let[@inline never] add_action_from_another_thread_ (self : st) f : unit = let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex; Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads; Queue.push f self.actions_from_other_threads;
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
if not (Atomic.exchange self.has_notified true) then notify_ self
Lwt_unix.send_notification self.notification
let[@inline] on_lwt_thread_ (self : st) : bool = let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread Thread.id (Thread.self ()) = self.thread
@ -71,9 +76,12 @@ module Scheduler_state = struct
one!)"; one!)";
if not (on_lwt_thread_ st) then if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread"; failwith "moonpool-lwt: cleanup from the wrong thread";
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook; Atomic.set st.closed true;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook; if not (Atomic.exchange st.cleanup_done true) then (
Lwt_unix.stop_notification st.notification; Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Lwt_unix.stop_notification st.notification
);
Atomic.set cur_st None Atomic.set cur_st None
| None -> failwith "moonpool-lwt: cleanup failed (no current active state)" | None -> failwith "moonpool-lwt: cleanup failed (no current active state)"
@ -304,6 +312,8 @@ let lwt_main (f : _ -> 'a) : 'a =
let finally () = Scheduler_state.cleanup st in let finally () = Scheduler_state.cleanup st in
Fun.protect ~finally @@ fun () -> Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in let fut = spawn_lwt (fun () -> f st.as_runner) in
(* make sure the scheduler isn't already sleeping *)
Scheduler_state.notify_ st;
Lwt_main.run fut Lwt_main.run fut
let[@inline] lwt_main_runner () = let[@inline] lwt_main_runner () =