From d248a569f6deaf7554be1f48837cdd913de004f4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Feb 2024 21:02:19 -0500 Subject: [PATCH] feat: progress on moonpool-lwt --- src/lwt/cancel_handle.ml | 12 -- src/lwt/common_.ml | 4 - src/lwt/interop.ml | 153 -------------------------- src/lwt/moonpool_lwt.ml | 230 ++++++++++++++++++++++++++++++++++++++- src/lwt/moonpool_lwt.mli | 30 ++++- 5 files changed, 257 insertions(+), 172 deletions(-) delete mode 100644 src/lwt/cancel_handle.ml delete mode 100644 src/lwt/interop.ml diff --git a/src/lwt/cancel_handle.ml b/src/lwt/cancel_handle.ml deleted file mode 100644 index 821be865..00000000 --- a/src/lwt/cancel_handle.ml +++ /dev/null @@ -1,12 +0,0 @@ -(** Cancellation handle. *) - -type t = { cancel: unit -> unit } [@@unboxed] -(** A handle to cancel atomic actions (waiting on something), or - stopping a subscription to some event. *) - -(** Perform the cancellation. This should be idempotent. *) -let[@inline] cancel self = self.cancel () - -(** Dummy that cancels nothing *) -let dummy : t = { cancel = ignore } - diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml index 5606f2c2..2d46076a 100644 --- a/src/lwt/common_.ml +++ b/src/lwt/common_.ml @@ -2,7 +2,3 @@ module M = Moonpool module Exn_bt = M.Exn_bt let ( let@ ) = ( @@ ) - -let[@inline] cancel_handle_of_event (ev : Lwt_engine.event) : Cancel_handle.t = - let cancel () = Lwt_engine.stop_event ev in - { Cancel_handle.cancel } diff --git a/src/lwt/interop.ml b/src/lwt/interop.ml deleted file mode 100644 index 8eed3807..00000000 --- a/src/lwt/interop.ml +++ /dev/null @@ -1,153 +0,0 @@ -open Common_ -module Cancel_handle = Cancel_handle -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls - -(** Action scheduled from outside the loop *) -module Action = struct - type event = Lwt_engine.event - type cb = event -> unit - - (** Action that we ask the lwt loop to perform, from the outside *) - type t = - | Wait_readable of Unix.file_descr * cb - | Wait_writable of Unix.file_descr * cb - | Sleep of float * bool * cb - | Cancel of event - | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t - | Wakeup : 'a Lwt.u * 'a -> t - | Wakeup_exn : _ Lwt.u * exn -> t - | Other of (unit -> unit) - - (** Perform the action from within the Lwt thread *) - let perform (self : t) : unit = - match self with - | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) - | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) - | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) - | Cancel ev -> Lwt_engine.stop_event ev - | On_termination (fut, f) -> - Lwt.on_any fut - (fun x -> f @@ Ok x) - (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) - | Wakeup (prom, x) -> Lwt.wakeup prom x - | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e - | Other f -> f () -end - -module Action_queue = struct - type t = { q: Action.t list Atomic.t } [@@unboxed] - - let create () : t = { q = Atomic.make [] } - let pop_all (self : t) : _ list = Atomic.exchange self.q [] - - (** Push the action and return whether the queue was previously empty *) - let push (self : t) (a : Action.t) : bool = - let is_first = ref true in - while - let old = Atomic.get self.q in - if Atomic.compare_and_set self.q old (a :: old) then ( - is_first := old = []; - false - ) else - true - do - () - done; - !is_first -end - -module Perform_action_in_lwt = struct - open struct - let actions_ : Action_queue.t = Action_queue.create () - - (** Gets the current set of notifications and perform them from inside the - Lwt thread *) - let perform_pending_actions () : unit = - let l = Action_queue.pop_all actions_ in - List.iter Action.perform l - - let notification : int = - Lwt_unix.make_notification ~once:false perform_pending_actions - end - - let schedule (a : Action.t) : unit = - let is_first = Action_queue.push actions_ a in - if is_first then Lwt_unix.send_notification notification -end - -let get_runner () : M.Runner.t = - match M.Runner.get_current_runner () with - | Some r -> r - | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" - -let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = - match Lwt.poll lwt_fut with - | Some x -> M.Fut.return x - | None -> - let fut, prom = M.Fut.make () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom (Ok x)) - (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); - fut - -let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> - (* suspend fiber, wake it up when [fut] resolves *) - M.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - let on_lwt_done _ = resume ~ls sus @@ Ok () in - Perform_action_in_lwt.( - schedule Action.(On_termination (fut, on_lwt_done)))); - }; - - (match Lwt.poll fut with - | Some x -> x - | None -> assert false) - -let run_in_lwt f : _ M.Fut.t = - let fut, prom = M.Fut.make () in - Perform_action_in_lwt.schedule - (Action.Other - (fun () -> - let lwt_fut = f () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom @@ Ok x) - (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); - fut - -let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f - -let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t = - let fut, promise = Lwt.wait () in - let _fib = - Fiber.spawn_top ?name (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (promise, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn))) - in - fut - -let main_with_runner ~runner (f : unit -> 'a) : 'a = - let lwt_fut, lwt_prom = Lwt.wait () in - - let _fiber = - Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) - in - - Lwt_main.run lwt_fut - -let main f = - let@ runner = M.Ws_pool.with_ () in - main_with_runner ~runner f diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 4db347f9..0637c738 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1 +1,229 @@ -include Interop +open Common_ +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(** Action scheduled from outside the loop *) +module Action = struct + type event = Lwt_engine.event + type cb = event -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb + | Wait_writable of Unix.file_descr * cb + | Sleep of float * bool * cb + (* TODO: provide actions with cancellation, alongside a "select" operation *) + (* | Cancel of event *) + | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t + | Wakeup : 'a Lwt.u * 'a -> t + | Wakeup_exn : _ Lwt.u * exn -> t + | Other of (unit -> unit) + + (** Perform the action from within the Lwt thread *) + let perform (self : t) : unit = + match self with + | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) + | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) + | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) + (* | Cancel ev -> Lwt_engine.stop_event ev *) + | On_termination (fut, f) -> + Lwt.on_any fut + (fun x -> f @@ Ok x) + (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) + | Wakeup (prom, x) -> Lwt.wakeup prom x + | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e + | Other f -> f () +end + +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old = []; + false + ) else + true + do + () + done; + !is_first +end + +module Perform_action_in_lwt = struct + open struct + let actions_ : Action_queue.t = Action_queue.create () + + (** Gets the current set of notifications and perform them from inside the + Lwt thread *) + let perform_pending_actions () : unit = + let l = Action_queue.pop_all actions_ in + List.iter Action.perform l + + let notification : int = + Lwt_unix.make_notification ~once:false perform_pending_actions + end + + let schedule (a : Action.t) : unit = + let is_first = Action_queue.push actions_ a in + if is_first then Lwt_unix.send_notification notification +end + +let get_runner () : M.Runner.t = + match M.Runner.get_current_runner () with + | Some r -> r + | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" + +let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = + let lwt_fut, lwt_prom = Lwt.wait () in + M.Fut.on_result fut (function + | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) + | Error (exn, _) -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); + lwt_fut + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); + fut + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + M.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + let on_lwt_done _ = resume ~ls sus @@ Ok () in + Perform_action_in_lwt.( + schedule Action.(On_termination (fut, on_lwt_done)))); + }; + + (match Lwt.poll fut with + | Some x -> x + | None -> assert false) + +let run_in_lwt f : _ M.Fut.t = + let fut, prom = M.Fut.make () in + Perform_action_in_lwt.schedule + (Action.Other + (fun () -> + let lwt_fut = f () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom @@ Ok x) + (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); + fut + +let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f + +let detach_in_runner ~runner f : _ Lwt.t = + let fut, promise = Lwt.wait () in + M.Runner.run_async runner (fun () -> + match f () with + | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) + | exception exn -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); + fut + +let main_with_runner ~runner (f : unit -> 'a) : 'a = + let lwt_fut, lwt_prom = Lwt.wait () in + + let _fiber = + Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) + in + + Lwt_main.run lwt_fut + +let main f = + let@ runner = M.Ws_pool.with_ () in + main_with_runner ~runner f + +module IO = struct + let rec read fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_readable + ( fd, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + }; + read fd buf i len + | n -> n + ) + + let rec write_once fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( fd, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + }; + write_once fd buf i len + | n -> n + ) + + let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done + + (** Sleep for the given amount of seconds *) + let sleep_s (f : float) : unit = + if f > 0. then + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Sleep + ( f, + false, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + } +end diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 49a90250..f7a5ec2c 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -1,14 +1,24 @@ (** Lwt_engine-based event loop for Moonpool *) -module Cancel_handle = Cancel_handle module Fiber = Moonpool_fib.Fiber module FLS = Moonpool_fib.Fls +(** {2 Basic conversions} *) + val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t +(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that + completes when [lwt_fut] does *) + +val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t +(** [lwt_of_fut fut] makes a lwt future that completes when + [fut] does. The result should be used only from inside the + thread running [Lwt_main.run]. *) + +(** {2 Helpers on the moonpool side} *) val await_lwt : 'a Lwt.t -> 'a (** [await_lwt fut] awaits a Lwt future from inside a task running on - a moonpool runner. *) + a moonpool runner. This must be run from within moonpool. *) val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t (** [run_in_lwt f] runs [f()] from within the Lwt thread @@ -23,6 +33,22 @@ val get_runner : unit -> Moonpool.Runner.t Must be run from within a fiber. @raise Failure if not run within a fiber *) +module IO : sig + val read : Unix.file_descr -> bytes -> int -> int -> int + val write_once : Unix.file_descr -> bytes -> int -> int -> int + val write : Unix.file_descr -> bytes -> int -> int -> unit + val sleep_s : float -> unit +end + +(** {2 Helpers on the lwt side} *) + +val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t +(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, + and returns a lwt future. This must be run from within the thread + running [Lwt_main]. *) + +(** {2 Wrappers around Lwt_main} *) + val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside a fiber in [runner]. *)