From fbc7679d053b27bfb502f0031342ae7eb4f82e06 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 3 Feb 2024 17:21:47 -0500 Subject: [PATCH] wip: add `moonpool-lwt` --- dune-project | 10 +++ src/lwt/cancel_handle.ml | 12 +++ src/lwt/common_.ml | 8 ++ src/lwt/dune | 8 ++ src/lwt/moonpool_lwt.ml | 159 +++++++++++++++++++++++++++++++++++++++ src/lwt/moonpool_lwt.mli | 31 ++++++++ 6 files changed, 228 insertions(+) create mode 100644 src/lwt/cancel_handle.ml create mode 100644 src/lwt/common_.ml create mode 100644 src/lwt/dune create mode 100644 src/lwt/moonpool_lwt.ml create mode 100644 src/lwt/moonpool_lwt.mli diff --git a/dune-project b/dune-project index 55cb93f1..666b394f 100644 --- a/dune-project +++ b/dune-project @@ -33,4 +33,14 @@ (tags (thread pool domain futures fork-join))) +(package + (name moonpool-lwt) + (synopsis "Event loop for moonpool based on Lwt-engine") + (depends + (moonpool (= :version)) + (ocaml (>= 5.0)) + lwt + base-unix + (odoc :with-doc))) + ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project diff --git a/src/lwt/cancel_handle.ml b/src/lwt/cancel_handle.ml new file mode 100644 index 00000000..821be865 --- /dev/null +++ b/src/lwt/cancel_handle.ml @@ -0,0 +1,12 @@ +(** Cancellation handle. *) + +type t = { cancel: unit -> unit } [@@unboxed] +(** A handle to cancel atomic actions (waiting on something), or + stopping a subscription to some event. *) + +(** Perform the cancellation. This should be idempotent. *) +let[@inline] cancel self = self.cancel () + +(** Dummy that cancels nothing *) +let dummy : t = { cancel = ignore } + diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml new file mode 100644 index 00000000..5606f2c2 --- /dev/null +++ b/src/lwt/common_.ml @@ -0,0 +1,8 @@ +module M = Moonpool +module Exn_bt = M.Exn_bt + +let ( let@ ) = ( @@ ) + +let[@inline] cancel_handle_of_event (ev : Lwt_engine.event) : Cancel_handle.t = + let cancel () = Lwt_engine.stop_event ev in + { Cancel_handle.cancel } diff --git a/src/lwt/dune b/src/lwt/dune new file mode 100644 index 00000000..f3191c3c --- /dev/null +++ b/src/lwt/dune @@ -0,0 +1,8 @@ + +(library + (name moonpool_lwt) + (public_name moonpool-lwt) + (private_modules common_) + (enabled_if + (>= %{ocaml_version} 5.0)) + (libraries moonpool moonpool.fib lwt lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml new file mode 100644 index 00000000..bc0ecddf --- /dev/null +++ b/src/lwt/moonpool_lwt.ml @@ -0,0 +1,159 @@ +open Common_ +module Cancel_handle = Cancel_handle +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +open struct + let _pp_pending out engine = + Printf.fprintf out "readc=%d writec=%d timerc=%d" engine#readable_count + engine#writable_count engine#timer_count +end + +(** Action scheduled from outside the loop *) +module Action = struct + type event = Lwt_engine.event + type cb = event -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb + | Wait_writable of Unix.file_descr * cb + | Sleep of float * bool * cb + | Cancel of event + | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t + | Wakeup : 'a Lwt.u * 'a -> t + | Wakeup_exn : _ Lwt.u * exn -> t + | Other of (unit -> unit) + + (** Perform the action from within the Lwt thread *) + let perform (self : t) : unit = + match self with + | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) + | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) + | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) + | Cancel ev -> Lwt_engine.stop_event ev + | On_termination (fut, f) -> + Lwt.on_any fut + (fun x -> f @@ Ok x) + (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) + | Wakeup (prom, x) -> Lwt.wakeup prom x + | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e + | Other f -> f () +end + +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old = []; + false + ) else + true + do + () + done; + !is_first +end + +module Perform_action_in_lwt = struct + open struct + let actions_ : Action_queue.t = Action_queue.create () + + (** Gets the current set of notifications and perform them from inside the + Lwt thread *) + let perform_pending_actions () : unit = + let l = Action_queue.pop_all actions_ in + List.iter Action.perform l + + let notification : int = + Lwt_unix.make_notification ~once:false perform_pending_actions + end + + let schedule (a : Action.t) : unit = + let is_first = Action_queue.push actions_ a in + if is_first then Lwt_unix.send_notification notification +end + +let get_runner () : M.Runner.t = + match M.Runner.get_current_runner () with + | Some r -> r + | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); + fut + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + M.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + let on_lwt_done _ = resume ~ls sus @@ Ok () in + Perform_action_in_lwt.( + schedule Action.(On_termination (fut, on_lwt_done)))); + }; + + (match Lwt.poll fut with + | Some x -> x + | None -> assert false) + +let run_in_lwt f : _ M.Fut.t = + let fut, prom = M.Fut.make () in + Perform_action_in_lwt.schedule + (Action.Other + (fun () -> + let lwt_fut = f () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom @@ Ok x) + (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); + fut + +let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f + +let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t = + let fut, promise = Lwt.wait () in + let _fib = + Fiber.spawn_top ?name (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (promise, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn))) + in + fut + +let main_with_runner ~runner (f : unit -> 'a) : 'a = + let lwt_fut, lwt_prom = Lwt.wait () in + + let _fiber = + Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) + in + + Lwt_main.run lwt_fut + +let main f = + let@ runner = M.Ws_pool.with_ () in + main_with_runner ~runner f diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli new file mode 100644 index 00000000..49a90250 --- /dev/null +++ b/src/lwt/moonpool_lwt.mli @@ -0,0 +1,31 @@ +(** Lwt_engine-based event loop for Moonpool *) + +module Cancel_handle = Cancel_handle +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t + +val await_lwt : 'a Lwt.t -> 'a +(** [await_lwt fut] awaits a Lwt future from inside a task running on + a moonpool runner. *) + +val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t +(** [run_in_lwt f] runs [f()] from within the Lwt thread + and returns a thread-safe future. *) + +val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a +(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and + awaits its result. Must be run from inside a moonpool runner. *) + +val get_runner : unit -> Moonpool.Runner.t +(** Returns the runner from within which this is called. + Must be run from within a fiber. + @raise Failure if not run within a fiber *) + +val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a +(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside + a fiber in [runner]. *) + +val main : (unit -> 'a) -> 'a +(** Like {!main_with_runner} but with a default choice of runner. *)