feat: progress on moonpool-lwt

This commit is contained in:
Simon Cruanes 2024-02-08 21:02:19 -05:00
parent 90850ae38c
commit d248a569f6
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 257 additions and 172 deletions

View file

@ -1,12 +0,0 @@
(** Cancellation handle. *)
type t = { cancel: unit -> unit } [@@unboxed]
(** A handle to cancel atomic actions (waiting on something), or
stopping a subscription to some event. *)
(** Perform the cancellation. This should be idempotent. *)
let[@inline] cancel self = self.cancel ()
(** Dummy that cancels nothing *)
let dummy : t = { cancel = ignore }

View file

@ -2,7 +2,3 @@ module M = Moonpool
module Exn_bt = M.Exn_bt module Exn_bt = M.Exn_bt
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let[@inline] cancel_handle_of_event (ev : Lwt_engine.event) : Cancel_handle.t =
let cancel () = Lwt_engine.stop_event ev in
{ Cancel_handle.cancel }

View file

@ -1,153 +0,0 @@
open Common_
module Cancel_handle = Cancel_handle
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb
| Wait_writable of Unix.file_descr * cb
| Sleep of float * bool * cb
| Cancel of event
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
| Wakeup : 'a Lwt.u * 'a -> t
| Wakeup_exn : _ Lwt.u * exn -> t
| Other of (unit -> unit)
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
| Cancel ev -> Lwt_engine.stop_event ev
| On_termination (fut, f) ->
Lwt.on_any fut
(fun x -> f @@ Ok x)
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
| Wakeup (prom, x) -> Lwt.wakeup prom x
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
| Other f -> f ()
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old = [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let get_runner () : M.Runner.t =
match M.Runner.get_current_runner () with
| Some r -> r
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with
| Some x -> M.Fut.return x
| None ->
let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
fut
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
(* suspend fiber, wake it up when [fut] resolves *)
M.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
let on_lwt_done _ = resume ~ls sus @@ Ok () in
Perform_action_in_lwt.(
schedule Action.(On_termination (fut, on_lwt_done))));
};
(match Lwt.poll fut with
| Some x -> x
| None -> assert false)
let run_in_lwt f : _ M.Fut.t =
let fut, prom = M.Fut.make () in
Perform_action_in_lwt.schedule
(Action.Other
(fun () ->
let lwt_fut = f () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom @@ Ok x)
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
fut
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t =
let fut, promise = Lwt.wait () in
let _fib =
Fiber.spawn_top ?name (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (promise, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn)))
in
fut
let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in
let _fiber =
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
in
Lwt_main.run lwt_fut
let main f =
let@ runner = M.Ws_pool.with_ () in
main_with_runner ~runner f

View file

@ -1 +1,229 @@
include Interop open Common_
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb
| Wait_writable of Unix.file_descr * cb
| Sleep of float * bool * cb
(* TODO: provide actions with cancellation, alongside a "select" operation *)
(* | Cancel of event *)
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
| Wakeup : 'a Lwt.u * 'a -> t
| Wakeup_exn : _ Lwt.u * exn -> t
| Other of (unit -> unit)
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
(* | Cancel ev -> Lwt_engine.stop_event ev *)
| On_termination (fut, f) ->
Lwt.on_any fut
(fun x -> f @@ Ok x)
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
| Wakeup (prom, x) -> Lwt.wakeup prom x
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
| Other f -> f ()
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old = [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let get_runner () : M.Runner.t =
match M.Runner.get_current_runner () with
| Some r -> r
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
let lwt_fut, lwt_prom = Lwt.wait () in
M.Fut.on_result fut (function
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
| Error (exn, _) ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with
| Some x -> M.Fut.return x
| None ->
let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
fut
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
(* suspend fiber, wake it up when [fut] resolves *)
M.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
let on_lwt_done _ = resume ~ls sus @@ Ok () in
Perform_action_in_lwt.(
schedule Action.(On_termination (fut, on_lwt_done))));
};
(match Lwt.poll fut with
| Some x -> x
| None -> assert false)
let run_in_lwt f : _ M.Fut.t =
let fut, prom = M.Fut.make () in
Perform_action_in_lwt.schedule
(Action.Other
(fun () ->
let lwt_fut = f () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom @@ Ok x)
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
fut
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
let detach_in_runner ~runner f : _ Lwt.t =
let fut, promise = Lwt.wait () in
M.Runner.run_async runner (fun () ->
match f () with
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
| exception exn ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut
let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in
let _fiber =
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
in
Lwt_main.run lwt_fut
let main f =
let@ runner = M.Ws_pool.with_ () in
main_with_runner ~runner f
module IO = struct
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* wait for FD to be ready *)
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
fun cancel ->
resume ~ls sus @@ Ok ();
Lwt_engine.stop_event cancel ));
};
read fd buf i len
| n -> n
)
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* wait for FD to be ready *)
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
fun cancel ->
resume ~ls sus @@ Ok ();
Lwt_engine.stop_event cancel ));
};
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
(** Sleep for the given amount of seconds *)
let sleep_s (f : float) : unit =
if f > 0. then
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Sleep
( f,
false,
fun cancel ->
resume ~ls sus @@ Ok ();
Lwt_engine.stop_event cancel ));
}
end

View file

@ -1,14 +1,24 @@
(** Lwt_engine-based event loop for Moonpool *) (** Lwt_engine-based event loop for Moonpool *)
module Cancel_handle = Cancel_handle
module Fiber = Moonpool_fib.Fiber module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that
completes when [lwt_fut] does *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when
[fut] does. The result should be used only from inside the
thread running [Lwt_main.run]. *)
(** {2 Helpers on the moonpool side} *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on (** [await_lwt fut] awaits a Lwt future from inside a task running on
a moonpool runner. *) a moonpool runner. This must be run from within moonpool. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread (** [run_in_lwt f] runs [f()] from within the Lwt thread
@ -23,6 +33,22 @@ val get_runner : unit -> Moonpool.Runner.t
Must be run from within a fiber. Must be run from within a fiber.
@raise Failure if not run within a fiber *) @raise Failure if not run within a fiber *)
module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int
val write_once : Unix.file_descr -> bytes -> int -> int -> int
val write : Unix.file_descr -> bytes -> int -> int -> unit
val sleep_s : float -> unit
end
(** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner,
and returns a lwt future. This must be run from within the thread
running [Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside (** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
a fiber in [runner]. *) a fiber in [runner]. *)