From 295f22e7708a942e092c1c802e7c653e718e69e6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 17:43:03 -0400 Subject: [PATCH] wip: lwt --- src/lwt/moonpool_lwt.ml | 67 ++++++++++++++++++++++------------------ src/lwt/moonpool_lwt.mli | 6 ++-- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index d669ad9c..e2a9e393 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,6 +1,7 @@ open Common_ open struct + module BQ = Moonpool.Blocking_queue module WL = Moonpool.Private.Worker_loop_ module M = Moonpool end @@ -9,33 +10,35 @@ module Fut = Moonpool.Fut let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ()) -module Scheduler_state = struct - module BQ = Moonpool.Blocking_queue +let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = + ref (fun ebt -> + Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt)) +module Scheduler_state = struct type st = { tasks: WL.task_full BQ.t; - on_exn: Exn_bt.t -> unit; mutable as_runner: Moonpool.Runner.t; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; } - let create ~on_exn () : st = + let st : st = { tasks = BQ.create (); - on_exn; as_runner = Moonpool.Runner.dummy; enter_hook = None; leave_hook = None; } let around_task _ = default_around_task_ + + (* FIXME: need to wakeup lwt if needed! *) let schedule (self : st) t = BQ.push self.tasks t let get_next_task (self : st) = try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks - let on_exn (self : st) ebt = self.on_exn ebt + let on_exn _ ebt = !on_uncaught_exn ebt let runner self = self.as_runner let as_runner (self : st) : Moonpool.Runner.t = @@ -67,6 +70,10 @@ module Scheduler_state = struct } end +open struct + module FG = WL.Fine_grained (Scheduler_state) () +end + let _dummy_exn_bt : Exn_bt.t = Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") @@ -102,34 +109,34 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = M.Fut.fulfill prom (Error (Exn_bt.make exn bt))); fut -let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = - ref (fun ebt -> - Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt)) +let run_in_hook () = + FG.run ~max_tasks:1000 (); + if BQ.size Scheduler_state.st.tasks > 0 then + ignore (Lwt.pause () : unit Lwt.t) -let runner_ : Moonpool.Runner.t option ref = ref None +let is_setup_ = ref false let setup () = - match !runner_ with - | Some r -> r - | None -> - let on_exn ebt = !on_uncaught_exn ebt in - let module Arg = struct - type nonrec st = Scheduler_state.st - - let ops = Scheduler_state.ops - let st = Scheduler_state.create ~on_exn () - end in - let module FG = WL.Fine_grained (Arg) () in - runner_ := Some Arg.st.as_runner; - + if not !is_setup_ then ( + is_setup_ := true; FG.setup ~block_signals:false (); - let run_in_hook () = FG.run ~max_tasks:1000 () in - Arg.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); - Arg.st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); + Scheduler_state.st.enter_hook <- + Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); + Scheduler_state.st.leave_hook <- + Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook) + ) - Arg.st.as_runner +let spawn_lwt f : _ Lwt.t = + setup (); + let lwt_fut, lwt_prom = Lwt.wait () in + M.Runner.run_async Scheduler_state.st.as_runner (fun () -> + try + let x = f () in + Lwt.wakeup lwt_prom x + with exn -> Lwt.wakeup_exn lwt_prom exn); + lwt_fut let lwt_main (f : _ -> 'a) : 'a = - let runner = setup () in - let fut = M.spawn ~on:runner (fun () -> f runner) in - Lwt_main.run (lwt_of_fut fut) + setup (); + let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in + Lwt_main.run fut diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 29fe2add..83a47128 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -22,6 +22,9 @@ val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t (** {2 Helpers on the moonpool side} *) +val spawn_lwt : (unit -> 'a) -> 'a Lwt.t +(** This spawns a task that runs in the Lwt scheduler *) + val await_lwt : 'a Lwt.t -> 'a (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool runner. This must be run from within a Moonpool runner so that the await-ing @@ -31,8 +34,5 @@ val await_lwt : 'a Lwt.t -> 'a val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref -val setup : unit -> Moonpool.Runner.t -(** Install hooks in Lwt_main to run the scheduler *) - val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a (** Setup, run lwt main, return the result *)