mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
wip: add moonpool-lwt
This commit is contained in:
parent
a5eef687c8
commit
fbc7679d05
6 changed files with 228 additions and 0 deletions
10
dune-project
10
dune-project
|
|
@ -33,4 +33,14 @@
|
|||
(tags
|
||||
(thread pool domain futures fork-join)))
|
||||
|
||||
(package
|
||||
(name moonpool-lwt)
|
||||
(synopsis "Event loop for moonpool based on Lwt-engine")
|
||||
(depends
|
||||
(moonpool (= :version))
|
||||
(ocaml (>= 5.0))
|
||||
lwt
|
||||
base-unix
|
||||
(odoc :with-doc)))
|
||||
|
||||
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project
|
||||
|
|
|
|||
12
src/lwt/cancel_handle.ml
Normal file
12
src/lwt/cancel_handle.ml
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
(** Cancellation handle. *)
|
||||
|
||||
type t = { cancel: unit -> unit } [@@unboxed]
|
||||
(** A handle to cancel atomic actions (waiting on something), or
|
||||
stopping a subscription to some event. *)
|
||||
|
||||
(** Perform the cancellation. This should be idempotent. *)
|
||||
let[@inline] cancel self = self.cancel ()
|
||||
|
||||
(** Dummy that cancels nothing *)
|
||||
let dummy : t = { cancel = ignore }
|
||||
|
||||
8
src/lwt/common_.ml
Normal file
8
src/lwt/common_.ml
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
module M = Moonpool
|
||||
module Exn_bt = M.Exn_bt
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let[@inline] cancel_handle_of_event (ev : Lwt_engine.event) : Cancel_handle.t =
|
||||
let cancel () = Lwt_engine.stop_event ev in
|
||||
{ Cancel_handle.cancel }
|
||||
8
src/lwt/dune
Normal file
8
src/lwt/dune
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
(library
|
||||
(name moonpool_lwt)
|
||||
(public_name moonpool-lwt)
|
||||
(private_modules common_)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0))
|
||||
(libraries moonpool moonpool.fib lwt lwt.unix))
|
||||
159
src/lwt/moonpool_lwt.ml
Normal file
159
src/lwt/moonpool_lwt.ml
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
open Common_
|
||||
module Cancel_handle = Cancel_handle
|
||||
module Fiber = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
|
||||
open struct
|
||||
let _pp_pending out engine =
|
||||
Printf.fprintf out "readc=%d writec=%d timerc=%d" engine#readable_count
|
||||
engine#writable_count engine#timer_count
|
||||
end
|
||||
|
||||
(** Action scheduled from outside the loop *)
|
||||
module Action = struct
|
||||
type event = Lwt_engine.event
|
||||
type cb = event -> unit
|
||||
|
||||
(** Action that we ask the lwt loop to perform, from the outside *)
|
||||
type t =
|
||||
| Wait_readable of Unix.file_descr * cb
|
||||
| Wait_writable of Unix.file_descr * cb
|
||||
| Sleep of float * bool * cb
|
||||
| Cancel of event
|
||||
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
|
||||
| Wakeup : 'a Lwt.u * 'a -> t
|
||||
| Wakeup_exn : _ Lwt.u * exn -> t
|
||||
| Other of (unit -> unit)
|
||||
|
||||
(** Perform the action from within the Lwt thread *)
|
||||
let perform (self : t) : unit =
|
||||
match self with
|
||||
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
||||
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
||||
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
||||
| Cancel ev -> Lwt_engine.stop_event ev
|
||||
| On_termination (fut, f) ->
|
||||
Lwt.on_any fut
|
||||
(fun x -> f @@ Ok x)
|
||||
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
|
||||
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
||||
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
||||
| Other f -> f ()
|
||||
end
|
||||
|
||||
module Action_queue = struct
|
||||
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
||||
|
||||
let create () : t = { q = Atomic.make [] }
|
||||
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
||||
|
||||
(** Push the action and return whether the queue was previously empty *)
|
||||
let push (self : t) (a : Action.t) : bool =
|
||||
let is_first = ref true in
|
||||
while
|
||||
let old = Atomic.get self.q in
|
||||
if Atomic.compare_and_set self.q old (a :: old) then (
|
||||
is_first := old = [];
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
()
|
||||
done;
|
||||
!is_first
|
||||
end
|
||||
|
||||
module Perform_action_in_lwt = struct
|
||||
open struct
|
||||
let actions_ : Action_queue.t = Action_queue.create ()
|
||||
|
||||
(** Gets the current set of notifications and perform them from inside the
|
||||
Lwt thread *)
|
||||
let perform_pending_actions () : unit =
|
||||
let l = Action_queue.pop_all actions_ in
|
||||
List.iter Action.perform l
|
||||
|
||||
let notification : int =
|
||||
Lwt_unix.make_notification ~once:false perform_pending_actions
|
||||
end
|
||||
|
||||
let schedule (a : Action.t) : unit =
|
||||
let is_first = Action_queue.push actions_ a in
|
||||
if is_first then Lwt_unix.send_notification notification
|
||||
end
|
||||
|
||||
let get_runner () : M.Runner.t =
|
||||
match M.Runner.get_current_runner () with
|
||||
| Some r -> r
|
||||
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
||||
|
||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||
match Lwt.poll lwt_fut with
|
||||
| Some x -> M.Fut.return x
|
||||
| None ->
|
||||
let fut, prom = M.Fut.make () in
|
||||
Lwt.on_any lwt_fut
|
||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
|
||||
fut
|
||||
|
||||
let await_lwt (fut : _ Lwt.t) =
|
||||
match Lwt.poll fut with
|
||||
| Some x -> x
|
||||
| None ->
|
||||
(* suspend fiber, wake it up when [fut] resolves *)
|
||||
M.Private.Suspend_.suspend
|
||||
{
|
||||
handle =
|
||||
(fun ~ls ~run:_ ~resume sus ->
|
||||
let on_lwt_done _ = resume ~ls sus @@ Ok () in
|
||||
Perform_action_in_lwt.(
|
||||
schedule Action.(On_termination (fut, on_lwt_done))));
|
||||
};
|
||||
|
||||
(match Lwt.poll fut with
|
||||
| Some x -> x
|
||||
| None -> assert false)
|
||||
|
||||
let run_in_lwt f : _ M.Fut.t =
|
||||
let fut, prom = M.Fut.make () in
|
||||
Perform_action_in_lwt.schedule
|
||||
(Action.Other
|
||||
(fun () ->
|
||||
let lwt_fut = f () in
|
||||
Lwt.on_any lwt_fut
|
||||
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
||||
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
||||
fut
|
||||
|
||||
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
||||
|
||||
let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t =
|
||||
let fut, promise = Lwt.wait () in
|
||||
let _fib =
|
||||
Fiber.spawn_top ?name (fun () ->
|
||||
try
|
||||
let x = f () in
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup (promise, x))
|
||||
with exn ->
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn)))
|
||||
in
|
||||
fut
|
||||
|
||||
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
|
||||
let _fiber =
|
||||
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
|
||||
try
|
||||
let x = f () in
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
||||
with exn ->
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
||||
in
|
||||
|
||||
Lwt_main.run lwt_fut
|
||||
|
||||
let main f =
|
||||
let@ runner = M.Ws_pool.with_ () in
|
||||
main_with_runner ~runner f
|
||||
31
src/lwt/moonpool_lwt.mli
Normal file
31
src/lwt/moonpool_lwt.mli
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
(** Lwt_engine-based event loop for Moonpool *)
|
||||
|
||||
module Cancel_handle = Cancel_handle
|
||||
module Fiber = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
|
||||
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
|
||||
|
||||
val await_lwt : 'a Lwt.t -> 'a
|
||||
(** [await_lwt fut] awaits a Lwt future from inside a task running on
|
||||
a moonpool runner. *)
|
||||
|
||||
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
|
||||
(** [run_in_lwt f] runs [f()] from within the Lwt thread
|
||||
and returns a thread-safe future. *)
|
||||
|
||||
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
|
||||
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and
|
||||
awaits its result. Must be run from inside a moonpool runner. *)
|
||||
|
||||
val get_runner : unit -> Moonpool.Runner.t
|
||||
(** Returns the runner from within which this is called.
|
||||
Must be run from within a fiber.
|
||||
@raise Failure if not run within a fiber *)
|
||||
|
||||
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
|
||||
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
|
||||
a fiber in [runner]. *)
|
||||
|
||||
val main : (unit -> 'a) -> 'a
|
||||
(** Like {!main_with_runner} but with a default choice of runner. *)
|
||||
Loading…
Add table
Reference in a new issue