From 796c4f6f313afe4b2176a0941b107676096db9e8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 22:05:15 -0400 Subject: [PATCH] feat lwt: improvements --- src/lwt/common_.ml | 3 --- src/lwt/dune | 2 -- src/lwt/moonpool_lwt.ml | 39 ++++++++++++++++++++------------------- 3 files changed, 20 insertions(+), 24 deletions(-) delete mode 100644 src/lwt/common_.ml diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml deleted file mode 100644 index aad8ac4c..00000000 --- a/src/lwt/common_.ml +++ /dev/null @@ -1,3 +0,0 @@ -module Exn_bt = Moonpool.Exn_bt - -let ( let@ ) = ( @@ ) diff --git a/src/lwt/dune b/src/lwt/dune index b03d03d6..c6bb5ab3 100644 --- a/src/lwt/dune +++ b/src/lwt/dune @@ -1,12 +1,10 @@ (library (name moonpool_lwt) (public_name moonpool-lwt) - (private_modules common_) (enabled_if (>= %{ocaml_version} 5.0)) (libraries (re_export moonpool) - (re_export moonpool.fib) picos (re_export lwt) lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 29f73c6a..49d35afa 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,7 +1,6 @@ -open Common_ +module Exn_bt = Moonpool.Exn_bt open struct - module BQ = Moonpool.Blocking_queue module WL = Moonpool.Private.Worker_loop_ module M = Moonpool end @@ -17,8 +16,9 @@ let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = module Scheduler_state = struct type st = { tasks: WL.task_full Queue.t; - actions_from_other_threads: (unit -> unit) BQ.t; + actions_from_other_threads: (unit -> unit) Queue.t; (** Other threads ask us to run closures in the lwt thread *) + mutex: Mutex.t; mutable thread: int; mutable closed: bool; mutable as_runner: Moonpool.Runner.t; @@ -32,7 +32,8 @@ module Scheduler_state = struct let st : st = { tasks = Queue.create (); - actions_from_other_threads = BQ.create (); + actions_from_other_threads = Queue.create (); + mutex = Mutex.create (); thread = Thread.self () |> Thread.id; closed = false; as_runner = Moonpool.Runner.dummy; @@ -49,7 +50,9 @@ module Ops = struct let around_task _ = default_around_task_ let add_action_from_another_thread_ (self : st) f : unit = - BQ.push self.actions_from_other_threads f; + Mutex.lock Scheduler_state.st.mutex; + Queue.push f self.actions_from_other_threads; + Mutex.unlock Scheduler_state.st.mutex; if not (Atomic.exchange self.has_notified true) then Lwt_unix.send_notification self.notification @@ -109,19 +112,20 @@ open struct () end -let _dummy_exn_bt : Exn_bt.t = - Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") - let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> + match Lwt.state fut with + | Return x -> x + | Fail exn -> raise exn + | Sleep -> (* suspend fiber, wake it up when [fut] resolves *) let trigger = M.Trigger.create () in - let res = ref (Error _dummy_exn_bt) in Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger); M.Trigger.await trigger |> Option.iter Exn_bt.raise; - Exn_bt.unwrap !res + + (match Lwt.state fut with + | Return x -> x + | Fail exn -> raise exn + | Sleep -> assert false) let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = let lwt_fut, lwt_prom = Lwt.wait () in @@ -145,21 +149,18 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = fut let run_in_hook () = - Printf.eprintf "AT %s\n%!" __LOC__; - (* execute actions sent from other threads *) let local_acts = Queue.create () in - BQ.transfer Scheduler_state.st.actions_from_other_threads local_acts; + Mutex.lock Scheduler_state.st.mutex; + Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts; + Mutex.unlock Scheduler_state.st.mutex; Queue.iter (fun f -> f ()) local_acts; (* run tasks *) - Printf.eprintf "AT %s\n%!" __LOC__; FG.run ~max_tasks:1000 (); - Printf.eprintf "AT %s\n%!" __LOC__; if not (Queue.is_empty Scheduler_state.st.tasks) then ignore (Lwt.pause () : unit Lwt.t); - Printf.eprintf "AT %s\n%!" __LOC__; () let is_setup_ = ref false