mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
feat lwt: proper wakeup; add lwt_main_runner
This commit is contained in:
parent
0fecde07fc
commit
6ae82f130a
2 changed files with 39 additions and 14 deletions
|
|
@ -42,6 +42,13 @@ module Scheduler_state = struct
|
||||||
notification = 0;
|
notification = 0;
|
||||||
has_notified = Atomic.make false;
|
has_notified = Atomic.make false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let add_action_from_another_thread_ (self : st) f : unit =
|
||||||
|
Mutex.lock st.mutex;
|
||||||
|
Queue.push f self.actions_from_other_threads;
|
||||||
|
Mutex.unlock st.mutex;
|
||||||
|
if not (Atomic.exchange self.has_notified true) then
|
||||||
|
Lwt_unix.send_notification self.notification
|
||||||
end
|
end
|
||||||
|
|
||||||
module Ops = struct
|
module Ops = struct
|
||||||
|
|
@ -49,18 +56,12 @@ module Ops = struct
|
||||||
|
|
||||||
let around_task _ = default_around_task_
|
let around_task _ = default_around_task_
|
||||||
|
|
||||||
let add_action_from_another_thread_ (self : st) f : unit =
|
|
||||||
Mutex.lock Scheduler_state.st.mutex;
|
|
||||||
Queue.push f self.actions_from_other_threads;
|
|
||||||
Mutex.unlock Scheduler_state.st.mutex;
|
|
||||||
if not (Atomic.exchange self.has_notified true) then
|
|
||||||
Lwt_unix.send_notification self.notification
|
|
||||||
|
|
||||||
let schedule (self : st) t =
|
let schedule (self : st) t =
|
||||||
if Thread.id (Thread.self ()) = self.thread then
|
if Thread.id (Thread.self ()) = self.thread then
|
||||||
Queue.push t self.tasks
|
Queue.push t self.tasks
|
||||||
else
|
else
|
||||||
add_action_from_another_thread_ self (fun () -> Queue.push t self.tasks)
|
Scheduler_state.add_action_from_another_thread_ self (fun () ->
|
||||||
|
Queue.push t self.tasks)
|
||||||
|
|
||||||
let get_next_task (self : st) =
|
let get_next_task (self : st) =
|
||||||
if self.closed then raise WL.No_more_tasks;
|
if self.closed then raise WL.No_more_tasks;
|
||||||
|
|
@ -129,11 +130,23 @@ let await_lwt (fut : _ Lwt.t) =
|
||||||
|
|
||||||
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
M.Fut.on_result fut (function
|
|
||||||
|
(* in lwt thread, resolve [lwt_fut] *)
|
||||||
|
let wakeup_using_res = function
|
||||||
| Ok x -> Lwt.wakeup lwt_prom x
|
| Ok x -> Lwt.wakeup lwt_prom x
|
||||||
| Error ebt ->
|
| Error ebt ->
|
||||||
let exn = Exn_bt.exn ebt in
|
let exn = Exn_bt.exn ebt in
|
||||||
Lwt.wakeup_exn lwt_prom exn);
|
Lwt.wakeup_exn lwt_prom exn
|
||||||
|
in
|
||||||
|
|
||||||
|
M.Fut.on_result fut (fun res ->
|
||||||
|
if Thread.id (Thread.self ()) = Scheduler_state.st.thread then
|
||||||
|
(* can safely wakeup from the lwt thread *)
|
||||||
|
wakeup_using_res res
|
||||||
|
else
|
||||||
|
Scheduler_state.add_action_from_another_thread_ Scheduler_state.st
|
||||||
|
(fun () -> wakeup_using_res res));
|
||||||
|
|
||||||
lwt_fut
|
lwt_fut
|
||||||
|
|
||||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||||
|
|
@ -149,11 +162,14 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||||
fut
|
fut
|
||||||
|
|
||||||
let run_in_hook () =
|
let run_in_hook () =
|
||||||
(* execute actions sent from other threads *)
|
(* execute actions sent from other threads; first transfer them
|
||||||
|
all atomically to a local queue to reduce contention *)
|
||||||
let local_acts = Queue.create () in
|
let local_acts = Queue.create () in
|
||||||
Mutex.lock Scheduler_state.st.mutex;
|
Mutex.lock Scheduler_state.st.mutex;
|
||||||
Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
|
Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
|
||||||
|
Atomic.set Scheduler_state.st.has_notified false;
|
||||||
Mutex.unlock Scheduler_state.st.mutex;
|
Mutex.unlock Scheduler_state.st.mutex;
|
||||||
|
|
||||||
Queue.iter (fun f -> f ()) local_acts;
|
Queue.iter (fun f -> f ()) local_acts;
|
||||||
|
|
||||||
(* run tasks *)
|
(* run tasks *)
|
||||||
|
|
@ -163,16 +179,17 @@ let run_in_hook () =
|
||||||
ignore (Lwt.pause () : unit Lwt.t);
|
ignore (Lwt.pause () : unit Lwt.t);
|
||||||
()
|
()
|
||||||
|
|
||||||
let is_setup_ = ref false
|
let is_setup_ = Atomic.make false
|
||||||
|
|
||||||
let setup () =
|
let setup () =
|
||||||
if not !is_setup_ then (
|
if not (Atomic.exchange is_setup_ true) then (
|
||||||
is_setup_ := true;
|
(* only one thread does this *)
|
||||||
FG.setup ~block_signals:false ();
|
FG.setup ~block_signals:false ();
|
||||||
Scheduler_state.st.enter_hook <-
|
Scheduler_state.st.enter_hook <-
|
||||||
Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
|
Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
|
||||||
Scheduler_state.st.leave_hook <-
|
Scheduler_state.st.leave_hook <-
|
||||||
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
|
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
|
||||||
|
(* notification used to wake lwt up *)
|
||||||
Scheduler_state.st.notification <-
|
Scheduler_state.st.notification <-
|
||||||
Lwt_unix.make_notification ~once:false run_in_hook
|
Lwt_unix.make_notification ~once:false run_in_hook
|
||||||
)
|
)
|
||||||
|
|
@ -192,3 +209,7 @@ let lwt_main (f : _ -> 'a) : 'a =
|
||||||
Scheduler_state.st.thread <- Thread.self () |> Thread.id;
|
Scheduler_state.st.thread <- Thread.self () |> Thread.id;
|
||||||
let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
|
let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
|
||||||
Lwt_main.run fut
|
Lwt_main.run fut
|
||||||
|
|
||||||
|
let lwt_main_runner () =
|
||||||
|
if not (Atomic.get is_setup_) then failwith "lwt_main_runner: not setup yet";
|
||||||
|
Scheduler_state.st.as_runner
|
||||||
|
|
|
||||||
|
|
@ -36,3 +36,7 @@ val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
|
||||||
|
|
||||||
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
|
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||||
(** Setup, run lwt main, return the result *)
|
(** Setup, run lwt main, return the result *)
|
||||||
|
|
||||||
|
val lwt_main_runner : unit -> Moonpool.Runner.t
|
||||||
|
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
|
||||||
|
is currently running in some thread. *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue