From 90850ae38c883aee5d186bfcfa7ff3ec6a74e679 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 6 Feb 2024 21:30:55 -0500 Subject: [PATCH] wip: moonpool lwt --- moonpool-lwt.opam | 32 ++++++++ src/lwt/interop.ml | 153 ++++++++++++++++++++++++++++++++++++++ src/lwt/moonpool_lwt.ml | 160 +--------------------------------------- 3 files changed, 186 insertions(+), 159 deletions(-) create mode 100644 moonpool-lwt.opam create mode 100644 src/lwt/interop.ml diff --git a/moonpool-lwt.opam b/moonpool-lwt.opam new file mode 100644 index 00000000..e2c9eb4f --- /dev/null +++ b/moonpool-lwt.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.5.1" +synopsis: "Event loop for moonpool based on Lwt-engine" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +homepage: "https://github.com/c-cube/moonpool" +bug-reports: "https://github.com/c-cube/moonpool/issues" +depends: [ + "dune" {>= "3.0"} + "moonpool" {= version} + "ocaml" {>= "5.0"} + "lwt" + "base-unix" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/moonpool.git" diff --git a/src/lwt/interop.ml b/src/lwt/interop.ml new file mode 100644 index 00000000..8eed3807 --- /dev/null +++ b/src/lwt/interop.ml @@ -0,0 +1,153 @@ +open Common_ +module Cancel_handle = Cancel_handle +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(** Action scheduled from outside the loop *) +module Action = struct + type event = Lwt_engine.event + type cb = event -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb + | Wait_writable of Unix.file_descr * cb + | Sleep of float * bool * cb + | Cancel of event + | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t + | Wakeup : 'a Lwt.u * 'a -> t + | Wakeup_exn : _ Lwt.u * exn -> t + | Other of (unit -> unit) + + (** Perform the action from within the Lwt thread *) + let perform (self : t) : unit = + match self with + | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) + | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) + | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) + | Cancel ev -> Lwt_engine.stop_event ev + | On_termination (fut, f) -> + Lwt.on_any fut + (fun x -> f @@ Ok x) + (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) + | Wakeup (prom, x) -> Lwt.wakeup prom x + | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e + | Other f -> f () +end + +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old = []; + false + ) else + true + do + () + done; + !is_first +end + +module Perform_action_in_lwt = struct + open struct + let actions_ : Action_queue.t = Action_queue.create () + + (** Gets the current set of notifications and perform them from inside the + Lwt thread *) + let perform_pending_actions () : unit = + let l = Action_queue.pop_all actions_ in + List.iter Action.perform l + + let notification : int = + Lwt_unix.make_notification ~once:false perform_pending_actions + end + + let schedule (a : Action.t) : unit = + let is_first = Action_queue.push actions_ a in + if is_first then Lwt_unix.send_notification notification +end + +let get_runner () : M.Runner.t = + match M.Runner.get_current_runner () with + | Some r -> r + | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); + fut + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + M.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + let on_lwt_done _ = resume ~ls sus @@ Ok () in + Perform_action_in_lwt.( + schedule Action.(On_termination (fut, on_lwt_done)))); + }; + + (match Lwt.poll fut with + | Some x -> x + | None -> assert false) + +let run_in_lwt f : _ M.Fut.t = + let fut, prom = M.Fut.make () in + Perform_action_in_lwt.schedule + (Action.Other + (fun () -> + let lwt_fut = f () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom @@ Ok x) + (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); + fut + +let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f + +let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t = + let fut, promise = Lwt.wait () in + let _fib = + Fiber.spawn_top ?name (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (promise, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn))) + in + fut + +let main_with_runner ~runner (f : unit -> 'a) : 'a = + let lwt_fut, lwt_prom = Lwt.wait () in + + let _fiber = + Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) + in + + Lwt_main.run lwt_fut + +let main f = + let@ runner = M.Ws_pool.with_ () in + main_with_runner ~runner f diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index bc0ecddf..4db347f9 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,159 +1 @@ -open Common_ -module Cancel_handle = Cancel_handle -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls - -open struct - let _pp_pending out engine = - Printf.fprintf out "readc=%d writec=%d timerc=%d" engine#readable_count - engine#writable_count engine#timer_count -end - -(** Action scheduled from outside the loop *) -module Action = struct - type event = Lwt_engine.event - type cb = event -> unit - - (** Action that we ask the lwt loop to perform, from the outside *) - type t = - | Wait_readable of Unix.file_descr * cb - | Wait_writable of Unix.file_descr * cb - | Sleep of float * bool * cb - | Cancel of event - | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t - | Wakeup : 'a Lwt.u * 'a -> t - | Wakeup_exn : _ Lwt.u * exn -> t - | Other of (unit -> unit) - - (** Perform the action from within the Lwt thread *) - let perform (self : t) : unit = - match self with - | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) - | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) - | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) - | Cancel ev -> Lwt_engine.stop_event ev - | On_termination (fut, f) -> - Lwt.on_any fut - (fun x -> f @@ Ok x) - (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) - | Wakeup (prom, x) -> Lwt.wakeup prom x - | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e - | Other f -> f () -end - -module Action_queue = struct - type t = { q: Action.t list Atomic.t } [@@unboxed] - - let create () : t = { q = Atomic.make [] } - let pop_all (self : t) : _ list = Atomic.exchange self.q [] - - (** Push the action and return whether the queue was previously empty *) - let push (self : t) (a : Action.t) : bool = - let is_first = ref true in - while - let old = Atomic.get self.q in - if Atomic.compare_and_set self.q old (a :: old) then ( - is_first := old = []; - false - ) else - true - do - () - done; - !is_first -end - -module Perform_action_in_lwt = struct - open struct - let actions_ : Action_queue.t = Action_queue.create () - - (** Gets the current set of notifications and perform them from inside the - Lwt thread *) - let perform_pending_actions () : unit = - let l = Action_queue.pop_all actions_ in - List.iter Action.perform l - - let notification : int = - Lwt_unix.make_notification ~once:false perform_pending_actions - end - - let schedule (a : Action.t) : unit = - let is_first = Action_queue.push actions_ a in - if is_first then Lwt_unix.send_notification notification -end - -let get_runner () : M.Runner.t = - match M.Runner.get_current_runner () with - | Some r -> r - | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" - -let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = - match Lwt.poll lwt_fut with - | Some x -> M.Fut.return x - | None -> - let fut, prom = M.Fut.make () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom (Ok x)) - (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); - fut - -let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> - (* suspend fiber, wake it up when [fut] resolves *) - M.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - let on_lwt_done _ = resume ~ls sus @@ Ok () in - Perform_action_in_lwt.( - schedule Action.(On_termination (fut, on_lwt_done)))); - }; - - (match Lwt.poll fut with - | Some x -> x - | None -> assert false) - -let run_in_lwt f : _ M.Fut.t = - let fut, prom = M.Fut.make () in - Perform_action_in_lwt.schedule - (Action.Other - (fun () -> - let lwt_fut = f () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom @@ Ok x) - (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); - fut - -let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f - -let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t = - let fut, promise = Lwt.wait () in - let _fib = - Fiber.spawn_top ?name (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (promise, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn))) - in - fut - -let main_with_runner ~runner (f : unit -> 'a) : 'a = - let lwt_fut, lwt_prom = Lwt.wait () in - - let _fiber = - Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) - in - - Lwt_main.run lwt_fut - -let main f = - let@ runner = M.Ws_pool.with_ () in - main_with_runner ~runner f +include Interop