mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
wip: moonpool lwt
This commit is contained in:
parent
fbc7679d05
commit
90850ae38c
3 changed files with 186 additions and 159 deletions
32
moonpool-lwt.opam
Normal file
32
moonpool-lwt.opam
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
# This file is generated by dune, edit dune-project instead
|
||||||
|
opam-version: "2.0"
|
||||||
|
version: "0.5.1"
|
||||||
|
synopsis: "Event loop for moonpool based on Lwt-engine"
|
||||||
|
maintainer: ["Simon Cruanes"]
|
||||||
|
authors: ["Simon Cruanes"]
|
||||||
|
license: "MIT"
|
||||||
|
homepage: "https://github.com/c-cube/moonpool"
|
||||||
|
bug-reports: "https://github.com/c-cube/moonpool/issues"
|
||||||
|
depends: [
|
||||||
|
"dune" {>= "3.0"}
|
||||||
|
"moonpool" {= version}
|
||||||
|
"ocaml" {>= "5.0"}
|
||||||
|
"lwt"
|
||||||
|
"base-unix"
|
||||||
|
"odoc" {with-doc}
|
||||||
|
]
|
||||||
|
build: [
|
||||||
|
["dune" "subst"] {dev}
|
||||||
|
[
|
||||||
|
"dune"
|
||||||
|
"build"
|
||||||
|
"-p"
|
||||||
|
name
|
||||||
|
"-j"
|
||||||
|
jobs
|
||||||
|
"@install"
|
||||||
|
"@runtest" {with-test}
|
||||||
|
"@doc" {with-doc}
|
||||||
|
]
|
||||||
|
]
|
||||||
|
dev-repo: "git+https://github.com/c-cube/moonpool.git"
|
||||||
153
src/lwt/interop.ml
Normal file
153
src/lwt/interop.ml
Normal file
|
|
@ -0,0 +1,153 @@
|
||||||
|
open Common_
|
||||||
|
module Cancel_handle = Cancel_handle
|
||||||
|
module Fiber = Moonpool_fib.Fiber
|
||||||
|
module FLS = Moonpool_fib.Fls
|
||||||
|
|
||||||
|
(** Action scheduled from outside the loop *)
|
||||||
|
module Action = struct
|
||||||
|
type event = Lwt_engine.event
|
||||||
|
type cb = event -> unit
|
||||||
|
|
||||||
|
(** Action that we ask the lwt loop to perform, from the outside *)
|
||||||
|
type t =
|
||||||
|
| Wait_readable of Unix.file_descr * cb
|
||||||
|
| Wait_writable of Unix.file_descr * cb
|
||||||
|
| Sleep of float * bool * cb
|
||||||
|
| Cancel of event
|
||||||
|
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
|
||||||
|
| Wakeup : 'a Lwt.u * 'a -> t
|
||||||
|
| Wakeup_exn : _ Lwt.u * exn -> t
|
||||||
|
| Other of (unit -> unit)
|
||||||
|
|
||||||
|
(** Perform the action from within the Lwt thread *)
|
||||||
|
let perform (self : t) : unit =
|
||||||
|
match self with
|
||||||
|
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
||||||
|
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
||||||
|
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
||||||
|
| Cancel ev -> Lwt_engine.stop_event ev
|
||||||
|
| On_termination (fut, f) ->
|
||||||
|
Lwt.on_any fut
|
||||||
|
(fun x -> f @@ Ok x)
|
||||||
|
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
|
||||||
|
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
||||||
|
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
||||||
|
| Other f -> f ()
|
||||||
|
end
|
||||||
|
|
||||||
|
module Action_queue = struct
|
||||||
|
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
||||||
|
|
||||||
|
let create () : t = { q = Atomic.make [] }
|
||||||
|
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
||||||
|
|
||||||
|
(** Push the action and return whether the queue was previously empty *)
|
||||||
|
let push (self : t) (a : Action.t) : bool =
|
||||||
|
let is_first = ref true in
|
||||||
|
while
|
||||||
|
let old = Atomic.get self.q in
|
||||||
|
if Atomic.compare_and_set self.q old (a :: old) then (
|
||||||
|
is_first := old = [];
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
true
|
||||||
|
do
|
||||||
|
()
|
||||||
|
done;
|
||||||
|
!is_first
|
||||||
|
end
|
||||||
|
|
||||||
|
module Perform_action_in_lwt = struct
|
||||||
|
open struct
|
||||||
|
let actions_ : Action_queue.t = Action_queue.create ()
|
||||||
|
|
||||||
|
(** Gets the current set of notifications and perform them from inside the
|
||||||
|
Lwt thread *)
|
||||||
|
let perform_pending_actions () : unit =
|
||||||
|
let l = Action_queue.pop_all actions_ in
|
||||||
|
List.iter Action.perform l
|
||||||
|
|
||||||
|
let notification : int =
|
||||||
|
Lwt_unix.make_notification ~once:false perform_pending_actions
|
||||||
|
end
|
||||||
|
|
||||||
|
let schedule (a : Action.t) : unit =
|
||||||
|
let is_first = Action_queue.push actions_ a in
|
||||||
|
if is_first then Lwt_unix.send_notification notification
|
||||||
|
end
|
||||||
|
|
||||||
|
let get_runner () : M.Runner.t =
|
||||||
|
match M.Runner.get_current_runner () with
|
||||||
|
| Some r -> r
|
||||||
|
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
||||||
|
|
||||||
|
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||||
|
match Lwt.poll lwt_fut with
|
||||||
|
| Some x -> M.Fut.return x
|
||||||
|
| None ->
|
||||||
|
let fut, prom = M.Fut.make () in
|
||||||
|
Lwt.on_any lwt_fut
|
||||||
|
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||||
|
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
|
||||||
|
fut
|
||||||
|
|
||||||
|
let await_lwt (fut : _ Lwt.t) =
|
||||||
|
match Lwt.poll fut with
|
||||||
|
| Some x -> x
|
||||||
|
| None ->
|
||||||
|
(* suspend fiber, wake it up when [fut] resolves *)
|
||||||
|
M.Private.Suspend_.suspend
|
||||||
|
{
|
||||||
|
handle =
|
||||||
|
(fun ~ls ~run:_ ~resume sus ->
|
||||||
|
let on_lwt_done _ = resume ~ls sus @@ Ok () in
|
||||||
|
Perform_action_in_lwt.(
|
||||||
|
schedule Action.(On_termination (fut, on_lwt_done))));
|
||||||
|
};
|
||||||
|
|
||||||
|
(match Lwt.poll fut with
|
||||||
|
| Some x -> x
|
||||||
|
| None -> assert false)
|
||||||
|
|
||||||
|
let run_in_lwt f : _ M.Fut.t =
|
||||||
|
let fut, prom = M.Fut.make () in
|
||||||
|
Perform_action_in_lwt.schedule
|
||||||
|
(Action.Other
|
||||||
|
(fun () ->
|
||||||
|
let lwt_fut = f () in
|
||||||
|
Lwt.on_any lwt_fut
|
||||||
|
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
||||||
|
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
||||||
|
fut
|
||||||
|
|
||||||
|
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
||||||
|
|
||||||
|
let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t =
|
||||||
|
let fut, promise = Lwt.wait () in
|
||||||
|
let _fib =
|
||||||
|
Fiber.spawn_top ?name (fun () ->
|
||||||
|
try
|
||||||
|
let x = f () in
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup (promise, x))
|
||||||
|
with exn ->
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn)))
|
||||||
|
in
|
||||||
|
fut
|
||||||
|
|
||||||
|
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
||||||
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
|
|
||||||
|
let _fiber =
|
||||||
|
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
|
||||||
|
try
|
||||||
|
let x = f () in
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
||||||
|
with exn ->
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
||||||
|
in
|
||||||
|
|
||||||
|
Lwt_main.run lwt_fut
|
||||||
|
|
||||||
|
let main f =
|
||||||
|
let@ runner = M.Ws_pool.with_ () in
|
||||||
|
main_with_runner ~runner f
|
||||||
|
|
@ -1,159 +1 @@
|
||||||
open Common_
|
include Interop
|
||||||
module Cancel_handle = Cancel_handle
|
|
||||||
module Fiber = Moonpool_fib.Fiber
|
|
||||||
module FLS = Moonpool_fib.Fls
|
|
||||||
|
|
||||||
open struct
|
|
||||||
let _pp_pending out engine =
|
|
||||||
Printf.fprintf out "readc=%d writec=%d timerc=%d" engine#readable_count
|
|
||||||
engine#writable_count engine#timer_count
|
|
||||||
end
|
|
||||||
|
|
||||||
(** Action scheduled from outside the loop *)
|
|
||||||
module Action = struct
|
|
||||||
type event = Lwt_engine.event
|
|
||||||
type cb = event -> unit
|
|
||||||
|
|
||||||
(** Action that we ask the lwt loop to perform, from the outside *)
|
|
||||||
type t =
|
|
||||||
| Wait_readable of Unix.file_descr * cb
|
|
||||||
| Wait_writable of Unix.file_descr * cb
|
|
||||||
| Sleep of float * bool * cb
|
|
||||||
| Cancel of event
|
|
||||||
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
|
|
||||||
| Wakeup : 'a Lwt.u * 'a -> t
|
|
||||||
| Wakeup_exn : _ Lwt.u * exn -> t
|
|
||||||
| Other of (unit -> unit)
|
|
||||||
|
|
||||||
(** Perform the action from within the Lwt thread *)
|
|
||||||
let perform (self : t) : unit =
|
|
||||||
match self with
|
|
||||||
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
|
||||||
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
|
||||||
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
|
||||||
| Cancel ev -> Lwt_engine.stop_event ev
|
|
||||||
| On_termination (fut, f) ->
|
|
||||||
Lwt.on_any fut
|
|
||||||
(fun x -> f @@ Ok x)
|
|
||||||
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
|
|
||||||
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
|
||||||
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
|
||||||
| Other f -> f ()
|
|
||||||
end
|
|
||||||
|
|
||||||
module Action_queue = struct
|
|
||||||
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
|
||||||
|
|
||||||
let create () : t = { q = Atomic.make [] }
|
|
||||||
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
|
||||||
|
|
||||||
(** Push the action and return whether the queue was previously empty *)
|
|
||||||
let push (self : t) (a : Action.t) : bool =
|
|
||||||
let is_first = ref true in
|
|
||||||
while
|
|
||||||
let old = Atomic.get self.q in
|
|
||||||
if Atomic.compare_and_set self.q old (a :: old) then (
|
|
||||||
is_first := old = [];
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done;
|
|
||||||
!is_first
|
|
||||||
end
|
|
||||||
|
|
||||||
module Perform_action_in_lwt = struct
|
|
||||||
open struct
|
|
||||||
let actions_ : Action_queue.t = Action_queue.create ()
|
|
||||||
|
|
||||||
(** Gets the current set of notifications and perform them from inside the
|
|
||||||
Lwt thread *)
|
|
||||||
let perform_pending_actions () : unit =
|
|
||||||
let l = Action_queue.pop_all actions_ in
|
|
||||||
List.iter Action.perform l
|
|
||||||
|
|
||||||
let notification : int =
|
|
||||||
Lwt_unix.make_notification ~once:false perform_pending_actions
|
|
||||||
end
|
|
||||||
|
|
||||||
let schedule (a : Action.t) : unit =
|
|
||||||
let is_first = Action_queue.push actions_ a in
|
|
||||||
if is_first then Lwt_unix.send_notification notification
|
|
||||||
end
|
|
||||||
|
|
||||||
let get_runner () : M.Runner.t =
|
|
||||||
match M.Runner.get_current_runner () with
|
|
||||||
| Some r -> r
|
|
||||||
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
|
||||||
|
|
||||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
|
||||||
match Lwt.poll lwt_fut with
|
|
||||||
| Some x -> M.Fut.return x
|
|
||||||
| None ->
|
|
||||||
let fut, prom = M.Fut.make () in
|
|
||||||
Lwt.on_any lwt_fut
|
|
||||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
|
||||||
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
|
|
||||||
fut
|
|
||||||
|
|
||||||
let await_lwt (fut : _ Lwt.t) =
|
|
||||||
match Lwt.poll fut with
|
|
||||||
| Some x -> x
|
|
||||||
| None ->
|
|
||||||
(* suspend fiber, wake it up when [fut] resolves *)
|
|
||||||
M.Private.Suspend_.suspend
|
|
||||||
{
|
|
||||||
handle =
|
|
||||||
(fun ~ls ~run:_ ~resume sus ->
|
|
||||||
let on_lwt_done _ = resume ~ls sus @@ Ok () in
|
|
||||||
Perform_action_in_lwt.(
|
|
||||||
schedule Action.(On_termination (fut, on_lwt_done))));
|
|
||||||
};
|
|
||||||
|
|
||||||
(match Lwt.poll fut with
|
|
||||||
| Some x -> x
|
|
||||||
| None -> assert false)
|
|
||||||
|
|
||||||
let run_in_lwt f : _ M.Fut.t =
|
|
||||||
let fut, prom = M.Fut.make () in
|
|
||||||
Perform_action_in_lwt.schedule
|
|
||||||
(Action.Other
|
|
||||||
(fun () ->
|
|
||||||
let lwt_fut = f () in
|
|
||||||
Lwt.on_any lwt_fut
|
|
||||||
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
|
||||||
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
|
||||||
fut
|
|
||||||
|
|
||||||
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
|
||||||
|
|
||||||
let spawn_as_lwt ?name (f : unit -> 'a) : 'a Lwt.t =
|
|
||||||
let fut, promise = Lwt.wait () in
|
|
||||||
let _fib =
|
|
||||||
Fiber.spawn_top ?name (fun () ->
|
|
||||||
try
|
|
||||||
let x = f () in
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup (promise, x))
|
|
||||||
with exn ->
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (promise, exn)))
|
|
||||||
in
|
|
||||||
fut
|
|
||||||
|
|
||||||
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
|
||||||
|
|
||||||
let _fiber =
|
|
||||||
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
|
|
||||||
try
|
|
||||||
let x = f () in
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
|
||||||
with exn ->
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
|
||||||
in
|
|
||||||
|
|
||||||
Lwt_main.run lwt_fut
|
|
||||||
|
|
||||||
let main f =
|
|
||||||
let@ runner = M.Ws_pool.with_ () in
|
|
||||||
main_with_runner ~runner f
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue