This commit is contained in:
Simon Cruanes 2025-07-09 18:31:24 -04:00
parent 72d8c09898
commit 6c4fb69d23
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -16,44 +16,72 @@ let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
module Scheduler_state = struct module Scheduler_state = struct
type st = { type st = {
tasks: WL.task_full BQ.t; tasks: WL.task_full Queue.t;
actions_from_other_threads: (unit -> unit) BQ.t;
(** Other threads ask us to run closures in the lwt thread *)
mutable thread: int;
mutable closed: bool;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
mutable notification: int;
(** A lwt_unix notification to wake up the event loop *)
has_notified: bool Atomic.t;
} }
let st : st = let st : st =
{ {
tasks = BQ.create (); tasks = Queue.create ();
actions_from_other_threads = BQ.create ();
thread = Thread.self () |> Thread.id;
closed = false;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
enter_hook = None; enter_hook = None;
leave_hook = None; leave_hook = None;
notification = 0;
has_notified = Atomic.make false;
} }
end
module Ops = struct
type st = Scheduler_state.st
let around_task _ = default_around_task_ let around_task _ = default_around_task_
(* FIXME: need to wakeup lwt if needed! *) let add_action_from_another_thread_ (self : st) f : unit =
let schedule (self : st) t = BQ.push self.tasks t BQ.push self.actions_from_other_threads f;
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification
let schedule (self : st) t =
if Thread.id (Thread.self ()) = self.thread then
Queue.push t self.tasks
else
add_action_from_another_thread_ self (fun () -> Queue.push t self.tasks)
let get_next_task (self : st) = let get_next_task (self : st) =
try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks if self.closed then raise WL.No_more_tasks;
try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks
let on_exn _ ebt = !on_uncaught_exn ebt let on_exn _ ebt = !on_uncaught_exn ebt
let runner self = self.as_runner let runner (self : st) = self.as_runner
let as_runner (self : st) : Moonpool.Runner.t = let as_runner (self : st) : Moonpool.Runner.t =
Moonpool.Runner.For_runner_implementors.create Moonpool.Runner.For_runner_implementors.create
~size:(fun () -> 1) ~size:(fun () -> 1)
~num_tasks:(fun () -> BQ.size self.tasks) ~num_tasks:(fun () ->
(* FIXME: thread safety. use an atomic?? *)
Queue.length self.tasks)
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
~shutdown:(fun ~wait:_ () -> BQ.close self.tasks) ~shutdown:(fun ~wait:_ () -> self.closed <- true)
() ()
let before_start self : unit = let before_start (self : st) : unit =
self.as_runner <- as_runner self; self.as_runner <- as_runner self;
() ()
let cleanup self = let cleanup (self : st) =
self.closed <- true;
Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook; Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook; Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook;
() ()
@ -71,7 +99,14 @@ module Scheduler_state = struct
end end
open struct open struct
module FG = WL.Fine_grained (Scheduler_state) () module FG =
WL.Fine_grained
(struct
include Scheduler_state
let ops = Ops.ops
end)
()
end end
let _dummy_exn_bt : Exn_bt.t = let _dummy_exn_bt : Exn_bt.t =
@ -110,9 +145,22 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
fut fut
let run_in_hook () = let run_in_hook () =
Printf.eprintf "AT %s\n%!" __LOC__;
(* execute actions sent from other threads *)
let local_acts = Queue.create () in
BQ.transfer Scheduler_state.st.actions_from_other_threads local_acts;
Queue.iter (fun f -> f ()) local_acts;
(* run tasks *)
Printf.eprintf "AT %s\n%!" __LOC__;
FG.run ~max_tasks:1000 (); FG.run ~max_tasks:1000 ();
if BQ.size Scheduler_state.st.tasks > 0 then Printf.eprintf "AT %s\n%!" __LOC__;
ignore (Lwt.pause () : unit Lwt.t)
if not (Queue.is_empty Scheduler_state.st.tasks) then
ignore (Lwt.pause () : unit Lwt.t);
Printf.eprintf "AT %s\n%!" __LOC__;
()
let is_setup_ = ref false let is_setup_ = ref false
@ -123,7 +171,9 @@ let setup () =
Scheduler_state.st.enter_hook <- Scheduler_state.st.enter_hook <-
Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
Scheduler_state.st.leave_hook <- Scheduler_state.st.leave_hook <-
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook) Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
Scheduler_state.st.notification <-
Lwt_unix.make_notification ~once:false run_in_hook
) )
let spawn_lwt f : _ Lwt.t = let spawn_lwt f : _ Lwt.t =
@ -138,5 +188,6 @@ let spawn_lwt f : _ Lwt.t =
let lwt_main (f : _ -> 'a) : 'a = let lwt_main (f : _ -> 'a) : 'a =
setup (); setup ();
Scheduler_state.st.thread <- Thread.self () |> Thread.id;
let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
Lwt_main.run fut Lwt_main.run fut