This commit is contained in:
Simon Cruanes 2025-07-09 17:43:03 -04:00
parent bf90c32c86
commit 295f22e770
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 40 additions and 33 deletions

View file

@ -1,6 +1,7 @@
open Common_ open Common_
open struct open struct
module BQ = Moonpool.Blocking_queue
module WL = Moonpool.Private.Worker_loop_ module WL = Moonpool.Private.Worker_loop_
module M = Moonpool module M = Moonpool
end end
@ -9,33 +10,35 @@ module Fut = Moonpool.Fut
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ()) let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
module Scheduler_state = struct let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
module BQ = Moonpool.Blocking_queue ref (fun ebt ->
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
module Scheduler_state = struct
type st = { type st = {
tasks: WL.task_full BQ.t; tasks: WL.task_full BQ.t;
on_exn: Exn_bt.t -> unit;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
} }
let create ~on_exn () : st = let st : st =
{ {
tasks = BQ.create (); tasks = BQ.create ();
on_exn;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
enter_hook = None; enter_hook = None;
leave_hook = None; leave_hook = None;
} }
let around_task _ = default_around_task_ let around_task _ = default_around_task_
(* FIXME: need to wakeup lwt if needed! *)
let schedule (self : st) t = BQ.push self.tasks t let schedule (self : st) t = BQ.push self.tasks t
let get_next_task (self : st) = let get_next_task (self : st) =
try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks
let on_exn (self : st) ebt = self.on_exn ebt let on_exn _ ebt = !on_uncaught_exn ebt
let runner self = self.as_runner let runner self = self.as_runner
let as_runner (self : st) : Moonpool.Runner.t = let as_runner (self : st) : Moonpool.Runner.t =
@ -67,6 +70,10 @@ module Scheduler_state = struct
} }
end end
open struct
module FG = WL.Fine_grained (Scheduler_state) ()
end
let _dummy_exn_bt : Exn_bt.t = let _dummy_exn_bt : Exn_bt.t =
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
@ -102,34 +109,34 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
M.Fut.fulfill prom (Error (Exn_bt.make exn bt))); M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
fut fut
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = let run_in_hook () =
ref (fun ebt -> FG.run ~max_tasks:1000 ();
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt)) if BQ.size Scheduler_state.st.tasks > 0 then
ignore (Lwt.pause () : unit Lwt.t)
let runner_ : Moonpool.Runner.t option ref = ref None let is_setup_ = ref false
let setup () = let setup () =
match !runner_ with if not !is_setup_ then (
| Some r -> r is_setup_ := true;
| None ->
let on_exn ebt = !on_uncaught_exn ebt in
let module Arg = struct
type nonrec st = Scheduler_state.st
let ops = Scheduler_state.ops
let st = Scheduler_state.create ~on_exn ()
end in
let module FG = WL.Fine_grained (Arg) () in
runner_ := Some Arg.st.as_runner;
FG.setup ~block_signals:false (); FG.setup ~block_signals:false ();
let run_in_hook () = FG.run ~max_tasks:1000 () in Scheduler_state.st.enter_hook <-
Arg.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
Arg.st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); Scheduler_state.st.leave_hook <-
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook)
)
Arg.st.as_runner let spawn_lwt f : _ Lwt.t =
setup ();
let lwt_fut, lwt_prom = Lwt.wait () in
M.Runner.run_async Scheduler_state.st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x
with exn -> Lwt.wakeup_exn lwt_prom exn);
lwt_fut
let lwt_main (f : _ -> 'a) : 'a = let lwt_main (f : _ -> 'a) : 'a =
let runner = setup () in setup ();
let fut = M.spawn ~on:runner (fun () -> f runner) in let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
Lwt_main.run (lwt_of_fut fut) Lwt_main.run fut

View file

@ -22,6 +22,9 @@ val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val spawn_lwt : (unit -> 'a) -> 'a Lwt.t
(** This spawns a task that runs in the Lwt scheduler *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing runner. This must be run from within a Moonpool runner so that the await-ing
@ -31,8 +34,5 @@ val await_lwt : 'a Lwt.t -> 'a
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
val setup : unit -> Moonpool.Runner.t
(** Install hooks in Lwt_main to run the scheduler *)
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
(** Setup, run lwt main, return the result *) (** Setup, run lwt main, return the result *)