From 6c4fb69d23b9020553de62a8f8c828f6fb192ed3 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 18:31:24 -0400 Subject: [PATCH] wip: lwt --- src/lwt/moonpool_lwt.ml | 79 +++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 14 deletions(-) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index e2a9e393..29f73c6a 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -16,44 +16,72 @@ let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = module Scheduler_state = struct type st = { - tasks: WL.task_full BQ.t; + tasks: WL.task_full Queue.t; + actions_from_other_threads: (unit -> unit) BQ.t; + (** Other threads ask us to run closures in the lwt thread *) + mutable thread: int; + mutable closed: bool; mutable as_runner: Moonpool.Runner.t; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; + mutable notification: int; + (** A lwt_unix notification to wake up the event loop *) + has_notified: bool Atomic.t; } let st : st = { - tasks = BQ.create (); + tasks = Queue.create (); + actions_from_other_threads = BQ.create (); + thread = Thread.self () |> Thread.id; + closed = false; as_runner = Moonpool.Runner.dummy; enter_hook = None; leave_hook = None; + notification = 0; + has_notified = Atomic.make false; } +end + +module Ops = struct + type st = Scheduler_state.st let around_task _ = default_around_task_ - (* FIXME: need to wakeup lwt if needed! *) - let schedule (self : st) t = BQ.push self.tasks t + let add_action_from_another_thread_ (self : st) f : unit = + BQ.push self.actions_from_other_threads f; + if not (Atomic.exchange self.has_notified true) then + Lwt_unix.send_notification self.notification + + let schedule (self : st) t = + if Thread.id (Thread.self ()) = self.thread then + Queue.push t self.tasks + else + add_action_from_another_thread_ self (fun () -> Queue.push t self.tasks) let get_next_task (self : st) = - try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks + if self.closed then raise WL.No_more_tasks; + try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks let on_exn _ ebt = !on_uncaught_exn ebt - let runner self = self.as_runner + let runner (self : st) = self.as_runner let as_runner (self : st) : Moonpool.Runner.t = Moonpool.Runner.For_runner_implementors.create ~size:(fun () -> 1) - ~num_tasks:(fun () -> BQ.size self.tasks) + ~num_tasks:(fun () -> + (* FIXME: thread safety. use an atomic?? *) + Queue.length self.tasks) ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) - ~shutdown:(fun ~wait:_ () -> BQ.close self.tasks) + ~shutdown:(fun ~wait:_ () -> self.closed <- true) () - let before_start self : unit = + let before_start (self : st) : unit = self.as_runner <- as_runner self; () - let cleanup self = + let cleanup (self : st) = + self.closed <- true; Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook; Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook; () @@ -71,7 +99,14 @@ module Scheduler_state = struct end open struct - module FG = WL.Fine_grained (Scheduler_state) () + module FG = + WL.Fine_grained + (struct + include Scheduler_state + + let ops = Ops.ops + end) + () end let _dummy_exn_bt : Exn_bt.t = @@ -110,9 +145,22 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = fut let run_in_hook () = + Printf.eprintf "AT %s\n%!" __LOC__; + + (* execute actions sent from other threads *) + let local_acts = Queue.create () in + BQ.transfer Scheduler_state.st.actions_from_other_threads local_acts; + Queue.iter (fun f -> f ()) local_acts; + + (* run tasks *) + Printf.eprintf "AT %s\n%!" __LOC__; FG.run ~max_tasks:1000 (); - if BQ.size Scheduler_state.st.tasks > 0 then - ignore (Lwt.pause () : unit Lwt.t) + Printf.eprintf "AT %s\n%!" __LOC__; + + if not (Queue.is_empty Scheduler_state.st.tasks) then + ignore (Lwt.pause () : unit Lwt.t); + Printf.eprintf "AT %s\n%!" __LOC__; + () let is_setup_ = ref false @@ -123,7 +171,9 @@ let setup () = Scheduler_state.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); Scheduler_state.st.leave_hook <- - Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook) + Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); + Scheduler_state.st.notification <- + Lwt_unix.make_notification ~once:false run_in_hook ) let spawn_lwt f : _ Lwt.t = @@ -138,5 +188,6 @@ let spawn_lwt f : _ Lwt.t = let lwt_main (f : _ -> 'a) : 'a = setup (); + Scheduler_state.st.thread <- Thread.self () |> Thread.id; let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in Lwt_main.run fut