feat lwt: improvements

This commit is contained in:
Simon Cruanes 2025-07-09 22:05:15 -04:00
parent f53dbe4dda
commit 796c4f6f31
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 20 additions and 24 deletions

View file

@ -1,3 +0,0 @@
module Exn_bt = Moonpool.Exn_bt
let ( let@ ) = ( @@ )

View file

@ -1,12 +1,10 @@
(library (library
(name moonpool_lwt) (name moonpool_lwt)
(public_name moonpool-lwt) (public_name moonpool-lwt)
(private_modules common_)
(enabled_if (enabled_if
(>= %{ocaml_version} 5.0)) (>= %{ocaml_version} 5.0))
(libraries (libraries
(re_export moonpool) (re_export moonpool)
(re_export moonpool.fib)
picos picos
(re_export lwt) (re_export lwt)
lwt.unix)) lwt.unix))

View file

@ -1,7 +1,6 @@
open Common_ module Exn_bt = Moonpool.Exn_bt
open struct open struct
module BQ = Moonpool.Blocking_queue
module WL = Moonpool.Private.Worker_loop_ module WL = Moonpool.Private.Worker_loop_
module M = Moonpool module M = Moonpool
end end
@ -17,8 +16,9 @@ let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
module Scheduler_state = struct module Scheduler_state = struct
type st = { type st = {
tasks: WL.task_full Queue.t; tasks: WL.task_full Queue.t;
actions_from_other_threads: (unit -> unit) BQ.t; actions_from_other_threads: (unit -> unit) Queue.t;
(** Other threads ask us to run closures in the lwt thread *) (** Other threads ask us to run closures in the lwt thread *)
mutex: Mutex.t;
mutable thread: int; mutable thread: int;
mutable closed: bool; mutable closed: bool;
mutable as_runner: Moonpool.Runner.t; mutable as_runner: Moonpool.Runner.t;
@ -32,7 +32,8 @@ module Scheduler_state = struct
let st : st = let st : st =
{ {
tasks = Queue.create (); tasks = Queue.create ();
actions_from_other_threads = BQ.create (); actions_from_other_threads = Queue.create ();
mutex = Mutex.create ();
thread = Thread.self () |> Thread.id; thread = Thread.self () |> Thread.id;
closed = false; closed = false;
as_runner = Moonpool.Runner.dummy; as_runner = Moonpool.Runner.dummy;
@ -49,7 +50,9 @@ module Ops = struct
let around_task _ = default_around_task_ let around_task _ = default_around_task_
let add_action_from_another_thread_ (self : st) f : unit = let add_action_from_another_thread_ (self : st) f : unit =
BQ.push self.actions_from_other_threads f; Mutex.lock Scheduler_state.st.mutex;
Queue.push f self.actions_from_other_threads;
Mutex.unlock Scheduler_state.st.mutex;
if not (Atomic.exchange self.has_notified true) then if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification Lwt_unix.send_notification self.notification
@ -109,19 +112,20 @@ open struct
() ()
end end
let _dummy_exn_bt : Exn_bt.t =
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
let await_lwt (fut : _ Lwt.t) = let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with match Lwt.state fut with
| Some x -> x | Return x -> x
| None -> | Fail exn -> raise exn
| Sleep ->
(* suspend fiber, wake it up when [fut] resolves *) (* suspend fiber, wake it up when [fut] resolves *)
let trigger = M.Trigger.create () in let trigger = M.Trigger.create () in
let res = ref (Error _dummy_exn_bt) in
Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger); Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger);
M.Trigger.await trigger |> Option.iter Exn_bt.raise; M.Trigger.await trigger |> Option.iter Exn_bt.raise;
Exn_bt.unwrap !res
(match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep -> assert false)
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
let lwt_fut, lwt_prom = Lwt.wait () in let lwt_fut, lwt_prom = Lwt.wait () in
@ -145,21 +149,18 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
fut fut
let run_in_hook () = let run_in_hook () =
Printf.eprintf "AT %s\n%!" __LOC__;
(* execute actions sent from other threads *) (* execute actions sent from other threads *)
let local_acts = Queue.create () in let local_acts = Queue.create () in
BQ.transfer Scheduler_state.st.actions_from_other_threads local_acts; Mutex.lock Scheduler_state.st.mutex;
Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
Mutex.unlock Scheduler_state.st.mutex;
Queue.iter (fun f -> f ()) local_acts; Queue.iter (fun f -> f ()) local_acts;
(* run tasks *) (* run tasks *)
Printf.eprintf "AT %s\n%!" __LOC__;
FG.run ~max_tasks:1000 (); FG.run ~max_tasks:1000 ();
Printf.eprintf "AT %s\n%!" __LOC__;
if not (Queue.is_empty Scheduler_state.st.tasks) then if not (Queue.is_empty Scheduler_state.st.tasks) then
ignore (Lwt.pause () : unit Lwt.t); ignore (Lwt.pause () : unit Lwt.t);
Printf.eprintf "AT %s\n%!" __LOC__;
() ()
let is_setup_ = ref false let is_setup_ = ref false