Merge pull request #22 from c-cube/wip-pool-lwt

moonpool-lwt, for interop with lwt
This commit is contained in:
Simon Cruanes 2024-02-20 12:00:49 -05:00 committed by GitHub
commit df8b284a0d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1370 additions and 145 deletions

View file

@ -20,10 +20,10 @@ jobs:
allow-prerelease-opam: true
- name: Deps
run: opam install odig moonpool
run: opam install odig moonpool moonpool-lwt
- name: Build
run: opam exec -- odig odoc --cache-dir=_doc/ moonpool
run: opam exec -- odig odoc --cache-dir=_doc/ moonpool moonpool-lwt
- name: Deploy
uses: peaceiris/actions-gh-pages@v3

View file

@ -20,7 +20,7 @@ jobs:
ocaml-compiler:
- '4.08'
- '4.14'
- '5.0'
- '5.1'
runs-on: ${{ matrix.os }}
steps:
@ -32,7 +32,10 @@ jobs:
dune-cache: true
allow-prerelease-opam: true
- run: opam install -t moonpool moonpool-lwt --deps-only
if: matrix.ocaml-compiler == '5.1'
- run: opam install -t moonpool --deps-only
if: matrix.ocaml-compiler != '5.1'
- run: opam exec -- dune build @install
- run: opam exec -- dune runtest
- run: opam install thread-local-storage trace

View file

@ -14,7 +14,7 @@ let cutoff = ref 20
let rec fib ~on x : int Fut.t =
if x <= !cutoff then
Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x)
Fut.spawn ~on (fun () -> fib_direct x)
else
let open Fut.Infix in
let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in
@ -31,14 +31,14 @@ let fib_fj ~on x : int Fut.t =
n1 + n2
)
in
Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x)
Fut.spawn ~on (fun () -> fib_rec x)
let fib_await ~on x : int Fut.t =
let rec fib_rec x : int Fut.t =
if x <= !cutoff then
Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x)
Fut.spawn ~on (fun () -> fib_direct x)
else
Fut.spawn ~name:"fib" ~on (fun () ->
Fut.spawn ~on (fun () ->
let n1 = fib_rec (x - 1) in
let n2 = fib_rec (x - 2) in
let n1 = Fut.await n1 in

View file

@ -76,7 +76,7 @@ let run_fork_join ~kind num_steps : float =
let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in
Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () ->
Ws_pool.run_wait_block pool (fun () ->
FJ.for_
~chunk_size:(3 + (num_steps / num_tasks))
num_steps

View file

@ -33,4 +33,15 @@
(tags
(thread pool domain futures fork-join)))
(package
(name moonpool-lwt)
(synopsis "Event loop for moonpool based on Lwt-engine")
(allow_empty) ; on < 5.0
(depends
(moonpool (= :version))
(ocaml (>= 5.0))
lwt
base-unix
(odoc :with-doc)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

32
moonpool-lwt.opam Normal file
View file

@ -0,0 +1,32 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.5.1"
synopsis: "Event loop for moonpool based on Lwt-engine"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"dune" {>= "3.0"}
"moonpool" {= version}
"ocaml" {>= "5.0"}
"lwt"
"base-unix"
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/moonpool.git"

View file

@ -6,7 +6,6 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage
type task_full = {
f: unit -> unit;
name: string;
ls: Task_local_storage.storage;
}
@ -44,18 +43,17 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
!cur_ls
in
let run_another_task ls ~name task' =
let run_another_task ls task' =
let ls' = Task_local_storage.Private_.Storage.copy ls in
schedule_ self { f = task'; name; ls = ls' }
schedule_ self { f = task'; ls = ls' }
in
let run_task (task : task_full) : unit =
cur_ls := task.ls;
let _ctx = before_task runner in
cur_span := Tracing_.enter_span task.name;
let resume ls k res =
schedule_ self { f = (fun () -> k res); name = task.name; ls }
schedule_ self { f = (fun () -> k res); ls }
in
(* run the task now, catching errors, handling effects *)
@ -105,12 +103,12 @@ type ('a, 'b) create_args =
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
?name:string ->
'a
let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?num_threads ?name () : t =
?around_task ?num_threads ?name () : t =
(* wrapper *)
let around_task =
match around_task with
@ -131,7 +129,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
{ threads = Array.make num_threads dummy; q = Bb_queue.create () }
in
let run_async ~name ~ls f = schedule_ pool { f; name; ls } in
let run_async ~ls f = schedule_ pool { f; ls } in
let runner =
Runner.For_runner_implementors.create

View file

@ -5,21 +5,13 @@ type 'a waiter = 'a or_error -> unit
type 'a state =
| Done of 'a or_error
| Waiting of {
waiters: 'a waiter list;
name: string;
}
| Waiting of { waiters: 'a waiter list }
type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t
let[@inline] get_name_ (self : _ t) =
match A.get self.st with
| Done _ -> ""
| Waiting { name; _ } -> name
let make ?(name = "") () =
let fut = { st = A.make (Waiting { waiters = []; name }) } in
let make () =
let fut = { st = A.make (Waiting { waiters = [] }) } in
fut, fut
let[@inline] of_result x : _ t = { st = A.make (Done x) }
@ -72,8 +64,8 @@ let on_result (self : _ t) (f : _ waiter) : unit =
| Done x ->
f x;
false
| Waiting { waiters = l; name } ->
not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name }))
| Waiting { waiters = l } ->
not (A.compare_and_set self.st st (Waiting { waiters = f :: l }))
do
Domain_.relax ()
done
@ -86,7 +78,7 @@ let fulfill (self : _ t) (r : _ result) : unit =
let st = A.get self.st in
match st with
| Done _ -> raise Already_fulfilled
| Waiting { waiters = l; name = _ } ->
| Waiting { waiters = l } ->
let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then (
(* success, now call all the waiters *)
@ -105,7 +97,7 @@ let[@inline] fulfill_idempotent self r =
(* ### combinators ### *)
let spawn ?name ?ls ~on f : _ t =
let spawn ?ls ~on f : _ t =
let fut, promise = make () in
let task () =
@ -118,13 +110,13 @@ let spawn ?name ?ls ~on f : _ t =
fulfill promise res
in
Runner.run_async ?name ?ls on task;
Runner.run_async ?ls on task;
fut
let spawn_on_current_runner ?name ?ls f : _ t =
let spawn_on_current_runner ?ls f : _ t =
match Runner.get_current_runner () with
| None -> failwith "Fut.spawn_on_current_runner: not running on a runner"
| Some on -> spawn ?name ?ls ~on f
| Some on -> spawn ?ls ~on f
let reify_error (f : 'a t) : 'a or_error t =
match peek f with
@ -150,22 +142,20 @@ let map ?on ~f fut : _ t =
| Error e_bt -> Error e_bt
in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with
| Some res, None -> of_result @@ map_immediate_ res
| Some res, Some runner ->
let fut2, promise = make ~name () in
Runner.run_async ~name runner (fun () ->
fulfill promise @@ map_immediate_ res);
let fut2, promise = make () in
Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res);
fut2
| None, None ->
let fut2, promise = make ~name () in
let fut2, promise = make () in
on_result fut (fun res -> fulfill promise @@ map_immediate_ res);
fut2
| None, Some runner ->
let fut2, promise = make ~name () in
let fut2, promise = make () in
on_result fut (fun res ->
Runner.run_async ~name runner (fun () ->
Runner.run_async runner (fun () ->
fulfill promise @@ map_immediate_ res));
fut2
@ -174,7 +164,7 @@ let join (fut : 'a t t) : 'a t =
| Some (Ok f) -> f
| Some (Error (e, bt)) -> fail e bt
| None ->
let fut2, promise = make ~name:(get_name_ fut) () in
let fut2, promise = make () in
on_result fut (function
| Ok sub_fut -> on_result sub_fut (fulfill promise)
| Error _ as e -> fulfill promise e);
@ -197,20 +187,19 @@ let bind ?on ~f fut : _ t =
on_result f_res_fut (fun r -> fulfill promise r)
in
let name = get_name_ fut in
match peek fut, get_runner_ ?on () with
| Some res, Some runner ->
let fut2, promise = make ~name () in
Runner.run_async ~name runner (bind_and_fulfill res promise);
let fut2, promise = make () in
Runner.run_async runner (bind_and_fulfill res promise);
fut2
| Some res, None -> apply_f_to_res res
| None, Some runner ->
let fut2, promise = make ~name () in
let fut2, promise = make () in
on_result fut (fun r ->
Runner.run_async ~name runner (bind_and_fulfill r promise));
Runner.run_async runner (bind_and_fulfill r promise));
fut2
| None, None ->
let fut2, promise = make ~name () in
let fut2, promise = make () in
on_result fut (fun res -> bind_and_fulfill res promise ());
fut2
@ -234,7 +223,7 @@ let both a b : _ t =
| Some (Ok x), Some (Ok y) -> return (x, y)
| Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt
| _ ->
let fut, promise = make ~name:(get_name_ a) () in
let fut, promise = make () in
let st = A.make `Neither in
on_result a (function
@ -267,7 +256,7 @@ let choose a b : _ t =
| _, Some (Ok y) -> return (Either.Right y)
| Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ ->
let fut, promise = make ~name:(get_name_ a) () in
let fut, promise = make () in
let one_failure = A.make false in
on_result a (function
@ -290,7 +279,7 @@ let choose_same a b : _ t =
| _, Some (Ok y) -> return y
| Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ ->
let fut, promise = make ~name:(get_name_ a) () in
let fut, promise = make () in
let one_failure = A.make false in
on_result a (function

View file

@ -26,9 +26,8 @@ type 'a promise
(** A promise, which can be fulfilled exactly once to set
the corresponding future *)
val make : ?name:string -> unit -> 'a t * 'a promise
(** Make a new future with the associated promise.
@param name name for the future, used for tracing. since NEXT_RELEASE. *)
val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future
@ -95,16 +94,12 @@ val is_failed : _ t -> bool
(** {2 Combinators} *)
val spawn :
?name:string ->
?ls:Task_local_storage.storage ->
on:Runner.t ->
(unit -> 'a) ->
'a t
?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a t
(** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will
hold its result. *)
val spawn_on_current_runner :
?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t
?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t
(** This must be run from inside a runner, and schedules
the new task on it as well.

View file

@ -4,19 +4,16 @@ include Runner
(* convenient alias *)
let k_ls = Task_local_storage.Private_.Storage.k_storage
let run_async_ ~name ~ls f =
let run_async_ ~ls f =
let cur_ls = ref ls in
TLS.set k_ls (Some cur_ls);
cur_ls := ls;
let sp = Tracing_.enter_span name in
try
let x = f () in
Tracing_.exit_span sp;
TLS.set k_ls None;
x
with e ->
let bt = Printexc.get_raw_backtrace () in
Tracing_.exit_span sp;
TLS.set k_ls None;
Printexc.raise_with_backtrace e bt

View file

@ -34,6 +34,7 @@ module Private = struct
module Ws_deque_ = Ws_deque_
module Suspend_ = Suspend_
module Domain_ = Domain_
module Tracing_ = Tracing_
let num_domains = Domain_pool_.n_domains
end

View file

@ -26,25 +26,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run all the various threads needed in an application (timers, event loops, etc.) *)
val run_async :
?name:string ->
?ls:Task_local_storage.storage ->
Runner.t ->
(unit -> unit) ->
unit
?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit
(** [run_async runner task] schedules the task to run
on the given runner. This means [task()] will be executed
at some point in the future, possibly in another thread.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@since 0.5 *)
val run_wait_block :
?name:string ->
?ls:Task_local_storage.storage ->
Runner.t ->
(unit -> 'a) ->
'a
?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a
(** [run_wait_block runner f] schedules [f] for later execution
on the runner, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
@ -63,21 +52,14 @@ val recommended_thread_count : unit -> int
@since 0.5 *)
val spawn :
?name:string ->
?ls:Task_local_storage.storage ->
on:Runner.t ->
(unit -> 'a) ->
'a Fut.t
?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t
(** [spawn ~on f] runs [f()] on the runner (a thread pool typically)
and returns a future result for it. See {!Fut.spawn}.
@param name if provided and [Trace] is present in dependencies,
a span will be created for the future. (since 0.6)
@since 0.5 *)
val spawn_on_current_runner :
?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t
?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t
(** See {!Fut.spawn_on_current_runner}.
@param name see {!spawn}. since 0.6.
@since 0.5 *)
[@@@ifge 5.0]
@ -240,6 +222,8 @@ module Private : sig
module Domain_ = Domain_
(** Utils for domains *)
module Tracing_ = Tracing_
val num_domains : unit -> int
(** Number of domains in the backing domain pool *)
end

View file

@ -3,7 +3,7 @@ module TLS = Thread_local_storage_
type task = unit -> unit
type t = {
run_async: name:string -> ls:Task_local_storage.storage -> task -> unit;
run_async: ls:Task_local_storage.storage -> task -> unit;
shutdown: wait:bool -> unit -> unit;
size: unit -> int;
num_tasks: unit -> int;
@ -11,9 +11,9 @@ type t = {
exception Shutdown
let[@inline] run_async ?(name = "")
?(ls = Task_local_storage.Private_.Storage.create ()) (self : t) f : unit =
self.run_async ~name ~ls f
let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ())
(self : t) f : unit =
self.run_async ~ls f
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
@ -23,9 +23,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit =
let[@inline] num_tasks (self : t) : int = self.num_tasks ()
let[@inline] size (self : t) : int = self.size ()
let run_wait_block ?name ?ls self (f : unit -> 'a) : 'a =
let run_wait_block ?ls self (f : unit -> 'a) : 'a =
let q = Bb_queue.create () in
run_async ?name ?ls self (fun () ->
run_async ?ls self (fun () ->
try
let x = f () in
Bb_queue.push q (Ok x)

View file

@ -33,19 +33,14 @@ val shutdown_without_waiting : t -> unit
exception Shutdown
val run_async :
?name:string -> ?ls:Task_local_storage.storage -> t -> task -> unit
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit
(** [run_async pool f] schedules [f] for later execution on the runner
in one of the threads. [f()] will run on one of the runner's
worker threads/domains.
@param name if provided and [Trace] is present in dependencies, a span
will be created when the task starts, and will stop when the task is over.
(since NEXT_RELEASE)
@param ls if provided, run the task with this initial local storage
@raise Shutdown if the runner was shut down before [run_async] was called. *)
val run_wait_block :
?name:string -> ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a
val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a
(** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing,
@ -65,7 +60,7 @@ module For_runner_implementors : sig
size:(unit -> int) ->
num_tasks:(unit -> int) ->
shutdown:(wait:bool -> unit -> unit) ->
run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) ->
run_async:(ls:Task_local_storage.storage -> task -> unit) ->
unit ->
t
(** Create a new runner.

View file

@ -1,5 +1,3 @@
module A = Atomic_
type suspension = unit Exn_bt.result -> unit
type task = unit -> unit
@ -7,7 +5,7 @@ type task = unit -> unit
type suspension_handler = {
handle:
run:(name:string -> task -> unit) ->
run:(task -> unit) ->
resume:(suspension -> unit Exn_bt.result -> unit) ->
suspension ->
unit;
@ -16,6 +14,8 @@ type suspension_handler = {
[@@@ocaml.alert "-unstable"]
module A = Atomic_
type _ Effect.t +=
| Suspend : suspension_handler -> unit Effect.t
| Yield : unit Effect.t
@ -27,8 +27,7 @@ type with_suspend_handler =
| WSH : {
on_suspend: unit -> 'state;
(** on_suspend called when [f()] suspends itself. *)
run: 'state -> name:string -> task -> unit;
(** run used to schedule new tasks *)
run: 'state -> task -> unit; (** run used to schedule new tasks *)
resume: 'state -> suspension -> unit Exn_bt.result -> unit;
(** resume run the suspension. Must be called exactly once. *)
}

View file

@ -3,8 +3,6 @@
This module is an implementation detail of Moonpool and should
not be used outside of it, except by experts to implement {!Runner}. *)
open Types_
type suspension = unit Exn_bt.result -> unit
(** A suspended computation *)
@ -14,7 +12,7 @@ type task = unit -> unit
type suspension_handler = {
handle:
run:(name:string -> task -> unit) ->
run:(task -> unit) ->
resume:(suspension -> unit Exn_bt.result -> unit) ->
suspension ->
unit;
@ -24,7 +22,6 @@ type suspension_handler = {
The handler is given a few things:
- the name (if any) of the current computation
- the suspended computation (which can be resumed with a result
eventually);
- a [run] function that can be used to start tasks to perform some
@ -70,8 +67,7 @@ type with_suspend_handler =
| WSH : {
on_suspend: unit -> 'state;
(** on_suspend called when [f()] suspends itself. *)
run: 'state -> name:string -> task -> unit;
(** run used to schedule new tasks *)
run: 'state -> task -> unit; (** run used to schedule new tasks *)
resume: 'state -> suspension -> unit Exn_bt.result -> unit;
(** resume run the suspension. Must be called exactly once. *)
}

View file

@ -16,7 +16,6 @@ end
type task_full = {
f: task;
name: string;
ls: Task_local_storage.storage;
}
@ -26,7 +25,6 @@ type worker_state = {
pool_id_: Id.t; (** Unique per pool *)
mutable thread: Thread.t;
q: task_full WSQ.t; (** Work stealing queue *)
mutable cur_span: int64;
cur_ls: Task_local_storage.storage ref; (** Task storage *)
rng: Random.State.t;
}
@ -75,10 +73,10 @@ let[@inline] try_wake_someone_ (self : state) : unit =
)
(** Run [task] as is, on the pool. *)
let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task)
let schedule_task_ (self : state) ~ls (w : worker_state option) (f : task)
: unit =
(* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let task = { f; name; ls } in
let task = { f; ls } in
match w with
| Some w when Id.equal self.id_ w.pool_id_ ->
(* we're on this same pool, schedule in the worker's state. Otherwise
@ -107,33 +105,26 @@ let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task)
raise Shutdown
(** Run this task, now. Must be called from a worker. *)
let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task :
let run_task_now_ (self : state) ~runner (w : worker_state) ~ls task :
unit =
(* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *)
let (AT_pair (before_task, after_task)) = self.around_task in
w.cur_ls := ls;
let _ctx = before_task runner in
w.cur_span <- Tracing_.enter_span name;
let[@inline] exit_span_ () =
Tracing_.exit_span w.cur_span;
w.cur_span <- Tracing_.dummy_span
in
let on_suspend () =
exit_span_ ();
let[@inline] on_suspend () =
!(w.cur_ls)
in
let run_another_task ls ~name task' =
let run_another_task ls task' =
let w = find_current_worker_ () in
let ls' = Task_local_storage.Private_.Storage.copy ls in
schedule_task_ self w ~name ~ls:ls' task'
schedule_task_ self w ~ls:ls' task'
in
let resume ls k r =
let w = find_current_worker_ () in
schedule_task_ self w ~name ~ls (fun () -> k r)
schedule_task_ self w ~ls (fun () -> k r)
in
(* run the task now, catching errors *)
@ -152,13 +143,12 @@ let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task :
let bt = Printexc.get_raw_backtrace () in
self.on_exn e bt);
exit_span_ ();
after_task runner _ctx;
w.cur_ls := Task_local_storage.Private_.Storage.dummy
let[@inline] run_async_ (self : state) ~name ~ls (f : task) : unit =
let[@inline] run_async_ (self : state) ~ls (f : task) : unit =
let w = find_current_worker_ () in
schedule_task_ self w ~name ~ls f
schedule_task_ self w ~ls f
(* TODO: function to schedule many tasks from the outside.
- build a queue
@ -204,7 +194,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit =
match WSQ.pop w.q with
| Some task ->
try_wake_someone_ self;
run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f
run_task_now_ self ~runner w ~ls:task.ls task.f
| None -> continue := false
done
@ -217,7 +207,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
worker_run_self_tasks_ self ~runner w;
try_steal ()
and run_task task : unit =
run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f;
run_task_now_ self ~runner w ~ls:task.ls task.f;
main ()
and try_steal () =
match try_to_steal_work_once_ self w with
@ -276,7 +266,7 @@ type ('a, 'b) create_args =
'a
(** Arguments used in {!create}. See {!create} for explanations. *)
let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; name = "DUMMY_TASK" }
let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; }
let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
@ -301,7 +291,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
{
pool_id_;
thread = dummy;
cur_span = Tracing_.dummy_span;
q = WSQ.create ~dummy:dummy_task_ ();
rng = Random.State.make [| i |];
cur_ls = ref Task_local_storage.Private_.Storage.dummy;
@ -326,7 +315,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
let runner =
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
~run_async:(fun ~name ~ls f -> run_async_ pool ~name ~ls f)
~run_async:(fun ~ls f -> run_async_ pool ~ls f)
~size:(fun () -> size_ pool)
~num_tasks:(fun () -> num_tasks_ pool)
()

View file

@ -148,9 +148,9 @@ let add_child_ ~protect (self : _ t) (child : _ t) =
let k_current_fiber : any option Task_local_storage.key =
Task_local_storage.new_key ~init:(fun () -> None) ()
let spawn_ ?name ~on (f : _ -> 'a) : 'a t =
let spawn_ ~on (f : _ -> 'a) : 'a t =
let id = Handle.generate_fresh () in
let res, _promise = Fut.make ?name () in
let res, _promise = Fut.make () in
let fib =
{
state = A.make @@ Alive { children = FM.empty; on_cancel = [] };
@ -172,17 +172,17 @@ let spawn_ ?name ~on (f : _ -> 'a) : 'a t =
resolve_as_failed_ fib ebt
in
Runner.run_async on ?name run;
Runner.run_async on run;
fib
let[@inline] spawn_top ?name ~on f : _ t = spawn_ ?name ~on f
let[@inline] spawn_top ~on f : _ t = spawn_ ~on f
let spawn_link ?name ~protect f : _ t =
let spawn_link ~protect f : _ t =
match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.spawn_link: must be run from inside a fiber."
| Some (Any parent) ->
let child = spawn_ ?name ~on:parent.runner f in
let child = spawn_ ~on:parent.runner f in
add_child_ ~protect parent child;
child

View file

@ -55,12 +55,12 @@ val on_result : 'a t -> 'a callback -> unit
with the result. If the fiber is done already then the
callback is invoked immediately with its result. *)
val spawn_top : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner.
This fiber is not the child of any other fiber: its lifetime
is only determined by the lifetime of [f()]. *)
val spawn_link : ?name:string -> protect:bool -> (unit -> 'a) -> 'a t
val spawn_link : protect:bool -> (unit -> 'a) -> 'a t
(** [spawn_link ~protect f] spawns a sub-fiber [f_child]
from a running fiber [parent].
The sub-fiber [f_child] is attached to the current fiber and fails

View file

@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
let len_range = min chunk_size (n - offset) in
assert (offset + len_range <= n);
run ~name:"" (fun () -> task_for ~offset ~len_range);
run (fun () -> task_for ~offset ~len_range);
i := !i + len_range
done
in

74
src/lwt/IO.ml Normal file
View file

@ -0,0 +1,74 @@
open Base
let await_readable fd : unit =
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
fun cancel ->
resume sus @@ Ok ();
Lwt_engine.stop_event cancel ));
}
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_readable fd;
read fd buf i len
| n -> n
)
let await_writable fd =
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
fun cancel ->
resume sus @@ Ok ();
Lwt_engine.stop_event cancel ));
}
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_writable fd;
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
(** Sleep for the given amount of seconds *)
let sleep_s (f : float) : unit =
if f > 0. then
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Sleep
( f,
false,
fun cancel ->
resume sus @@ Ok ();
Lwt_engine.stop_event cancel ));
}

154
src/lwt/IO_in.ml Normal file
View file

@ -0,0 +1,154 @@
open Common_
class type t =
object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the
stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)
end
let create ?(close = ignore) ~input () : t =
object
method close = close
method input = input
end
let empty : t =
object
method close () = ()
method input _ _ _ = 0
end
let of_bytes ?(off = 0) ?len (b : bytes) : t =
(* i: current position in [b] *)
let i = ref off in
let len =
match len with
| Some n ->
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
n
| None -> Bytes.length b - off
in
let end_ = off + len in
object
method input b_out i_out len_out =
let n = min (end_ - !i) len_out in
Bytes.blit b !i b_out i_out n;
i := !i + n;
n
method close () = i := end_
end
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
let rec really_input (self : #t) buf i len =
if len > 0 then (
let n = input self buf i len in
if n = 0 then raise End_of_file;
(really_input [@tailrec]) self buf (i + n) (len - n)
)
let really_input_string self n : string =
let buf = Bytes.create n in
really_input self buf 0 n;
Bytes.unsafe_to_string buf
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
: unit =
let continue = ref true in
while !continue do
let len = input ic buf 0 (Bytes.length buf) in
if len = 0 then
continue := false
else
IO_out.output oc buf 0 len
done
let concat (l0 : t list) : t =
let l = ref l0 in
let rec input b i len : int =
match !l with
| [] -> 0
| ic :: tl ->
let n = ic#input b i len in
if n > 0 then
n
else (
l := tl;
input b i len
)
in
let close () = List.iter close l0 in
create ~close ~input ()
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
let buf = ref buf in
let i = ref 0 in
let[@inline] full_ () = !i = Bytes.length !buf in
let grow_ () =
let old_size = Bytes.length !buf in
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
if old_size = new_size then
failwith "input_all: maximum input size exceeded";
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
buf := new_buf
in
let rec loop () =
if full_ () then grow_ ();
let available = Bytes.length !buf - !i in
let n = input self !buf !i available in
if n > 0 then (
i := !i + n;
(loop [@tailrec]) ()
)
in
loop ();
if full_ () then
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := IO.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end

119
src/lwt/IO_out.ml Normal file
View file

@ -0,0 +1,119 @@
open Common_
class type t =
object
method output_char : char -> unit
method output : bytes -> int -> int -> unit
method flush : unit -> unit
method close : unit -> unit
end
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
object
method flush () = flush ()
method close () = close ()
method output_char c = output_char c
method output bs i len = output bs i len
end
let dummy : t =
object
method flush () = ()
method close () = ()
method output_char _ = ()
method output _ _ _ = ()
end
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
: t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
IO.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
let of_buffer (buf : Buffer.t) : t =
object
method close () = ()
method flush () = ()
method output_char c = Buffer.add_char buf c
method output bs i len = Buffer.add_subbytes buf bs i len
end
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : #t) c : unit = self#output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
let[@inline] output_string (self : #t) (str : string) : unit =
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
let output_line (self : #t) (str : string) : unit =
output_string self str;
output_char self '\n'
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self#flush ()
let output_int self i =
let s = string_of_int i in
output_string self s
let output_lines self seq = Seq.iter (output_line self) seq
let tee (l : t list) : t =
match l with
| [] -> dummy
| [ oc ] -> oc
| _ ->
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
let output_char c = List.iter (fun oc -> output_char oc c) l in
let close () = List.iter close l in
let flush () = List.iter flush l in
create ~flush ~close ~output ~output_char ()

163
src/lwt/base.ml Normal file
View file

@ -0,0 +1,163 @@
open Common_
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb
| Wait_writable of Unix.file_descr * cb
| Sleep of float * bool * cb
(* TODO: provide actions with cancellation, alongside a "select" operation *)
(* | Cancel of event *)
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
| Wakeup : 'a Lwt.u * 'a -> t
| Wakeup_exn : _ Lwt.u * exn -> t
| Other of (unit -> unit)
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
(* | Cancel ev -> Lwt_engine.stop_event ev *)
| On_termination (fut, f) ->
Lwt.on_any fut
(fun x -> f @@ Ok x)
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
| Wakeup (prom, x) -> Lwt.wakeup prom x
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
| Other f -> f ()
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old == [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let@ _sp =
Moonpool.Private.Tracing_.with_span
"moonpool-lwt.perform-pending-actions"
in
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let get_runner () : M.Runner.t =
match M.Runner.get_current_runner () with
| Some r -> r
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
let lwt_fut, lwt_prom = Lwt.wait () in
M.Fut.on_result fut (function
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
| Error (exn, _) ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with
| Some x -> M.Fut.return x
| None ->
let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
fut
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
(* suspend fiber, wake it up when [fut] resolves *)
M.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
let on_lwt_done _ = resume sus @@ Ok () in
Perform_action_in_lwt.(
schedule Action.(On_termination (fut, on_lwt_done))));
};
(match Lwt.poll fut with
| Some x -> x
| None -> assert false)
let run_in_lwt f : _ M.Fut.t =
let fut, prom = M.Fut.make () in
Perform_action_in_lwt.schedule
(Action.Other
(fun () ->
let lwt_fut = f () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom @@ Ok x)
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
fut
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
let detach_in_runner ~runner f : _ Lwt.t =
let fut, promise = Lwt.wait () in
M.Runner.run_async runner (fun () ->
match f () with
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
| exception exn ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut
let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in
let _fiber =
Fiber.spawn_top ~on:runner (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
in
Lwt_main.run lwt_fut
let main f =
let@ runner = M.Ws_pool.with_ () in
main_with_runner ~runner f

5
src/lwt/common_.ml Normal file
View file

@ -0,0 +1,5 @@
module M = Moonpool
module Exn_bt = M.Exn_bt
let ( let@ ) = ( @@ )
let _default_buf_size = 4 * 1024

8
src/lwt/dune Normal file
View file

@ -0,0 +1,8 @@
(library
(name moonpool_lwt)
(public_name moonpool-lwt)
(private_modules common_)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.fib lwt lwt.unix))

6
src/lwt/moonpool_lwt.ml Normal file
View file

@ -0,0 +1,6 @@
include Base
module IO = IO
module IO_out = IO_out
module IO_in = IO_in
module TCP_server = Tcp_server
module TCP_client = Tcp_client

144
src/lwt/moonpool_lwt.mli Normal file
View file

@ -0,0 +1,144 @@
(** Lwt_engine-based event loop for Moonpool.
In what follows, we mean by "lwt thread" the thread
running [Lwt_main.run] (so, the thread where the Lwt event
loop and all Lwt callbacks execute).
@since NEXT_RELEASE *)
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that
completes when [lwt_fut] does. This must be run from within
the Lwt thread. *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when
[fut] does. This must be called from the Lwt thread, and the result
must always be used only from inside the Lwt thread. *)
(** {2 Helpers on the moonpool side} *)
val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on
a moonpool runner. This must be run from within a Moonpool runner
so that the await-ing effect is handled. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread
and returns a thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and
awaits its result. Must be run from inside a moonpool runner
so that the await-in effect is handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called.
Must be run from within a fiber.
@raise Failure if not run within a fiber *)
(** {2 IO} *)
(** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors
and rely on a [Lwt_engine] event loop being active (meaning,
[Lwt_main.run] is currently running in some thread).
Calling these functions must be done from a moonpool runner.
A function like [read] will first try to perform the IO action
directly (here, call {!Unix.read}); if the action fails because
the FD is not ready, then [await_readable] is called:
it suspends the fiber and subscribes it to Lwt to be awakened
when the FD becomes ready.
*)
module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *)
val await_readable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is readable *)
val write_once : Unix.file_descr -> bytes -> int -> int -> int
(** Perform one write into the file descriptor *)
val await_writable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is writable *)
val write : Unix.file_descr -> bytes -> int -> int -> unit
(** Loop around {!write_once} to write the entire slice. *)
val sleep_s : float -> unit
(** Suspend the fiber for [n] seconds. *)
end
module IO_in = IO_in
(** Input channel *)
module IO_out = IO_out
(** Output channel *)
module TCP_server : sig
type t = Lwt_io.server
val establish_lwt :
?backlog:(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t
(** [establish ~runner addr handler] runs a TCP server in the Lwt
thread. When a client connects, a moonpool fiber is started on [runner]
to handle it. *)
val establish :
?backlog:(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t
(** Like {!establish_lwt} but uses {!IO} to directly handle
reads and writes on client sockets. *)
val shutdown : t -> unit
(** Shutdown the server *)
end
module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from
the socket in a non blocking way. *)
val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
(** Open a connection. *)
end
(** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner,
and returns a lwt future. This must be run from within the thread
running [Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside
a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *)

53
src/lwt/tcp_client.ml Normal file
View file

@ -0,0 +1,53 @@
open Common_
open Base
let connect addr : Unix.file_descr =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
IO.await_writable sock;
true
do
()
done;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = connect addr in
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
let with_connect_lwt addr
(f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a =
let sock = connect addr in
let ic =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock)
in
let oc =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock)
in
let finally () =
(try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ());
(try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

38
src/lwt/tcp_server.ml Normal file
View file

@ -0,0 +1,38 @@
open Common_
open Base
type t = Lwt_io.server
let establish_lwt ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let establish ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self

View file

@ -3,3 +3,4 @@ let dummy_span = 0L
let enter_span _name = dummy_span
let exit_span = ignore
let set_thread_name = ignore
let with_span _ f = f dummy_span

View file

@ -1,5 +1,6 @@
val dummy_span : int64
val enter_span : string -> int64
val exit_span : int64 -> unit
val with_span : string -> (int64 -> 'a) -> 'a
val enabled : unit -> bool
val set_thread_name : string -> unit

View file

@ -12,3 +12,14 @@ let[@inline] enter_span name : int64 =
Trace.enter_span ~__FILE__:dummy_file_ ~__LINE__:0 name
let[@inline] exit_span sp = if sp <> dummy_span then Trace.exit_span sp
let with_span name f =
let sp = enter_span name in
try
let x = f sp in
exit_span sp;
x
with exn ->
let bt = Printexc.get_raw_backtrace () in
exit_span sp;
Printexc.raise_with_backtrace exn bt

View file

@ -20,7 +20,7 @@ let fib ~on x : int Fut.t =
Fut.await t1 + Fut.await t2
)
in
Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x)
Fut.spawn ~on (fun () -> fib_rec x)
(* NOTE: for tracy support
let () = Tracy_client_trace.setup ()

3
test/lwt/dune Normal file
View file

@ -0,0 +1,3 @@
(executables
(names echo_server echo_client hash_server hash_client)
(libraries moonpool moonpool-lwt lwt lwt.unix trace.core trace-tef))

86
test/lwt/echo_client.ml Normal file
View file

@ -0,0 +1,86 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ( let@ ) = ( @@ )
let main ~port ~runner ~n ~n_conn () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let remaining = Atomic.make n in
let all_done = Atomic.make 0 in
let fut_exit, prom_exit = M.Fut.make () in
Printf.printf "connecting to port %d\n%!" port;
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
let rec run_task () =
(* Printf.printf "running task\n%!"; *)
let n = Atomic.fetch_and_add remaining (-1) in
if n > 0 then (
(let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client"
in
Trace.message "connecting new client…";
M_lwt.TCP_client.with_connect addr @@ fun ic oc ->
let buf = Bytes.create 32 in
for _j = 1 to 100 do
let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__
"write.loop"
in
M_lwt.IO_out.output_string oc "hello";
M_lwt.IO_out.flush oc;
(* read back something *)
M_lwt.IO_in.really_input ic buf 0 (String.length "hello");
Trace.exit_manual_span _sp;
()
done;
Trace.exit_manual_span _sp);
(* run another task *) M.Runner.run_async runner run_task
) else (
(* if we're the last to exit, resolve the promise *)
let n_already_done = Atomic.fetch_and_add all_done 1 in
if n_already_done = n_conn - 1 then (
Printf.printf "all done\n%!";
M.Fut.fulfill prom_exit @@ Ok ()
)
)
in
(* start the first [n_conn] tasks *)
for _i = 1 to n_conn do
M.Runner.run_async runner run_task
done;
(* exit when [fut_exit] is resolved *)
M_lwt.lwt_of_fut fut_exit
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 0 in
let j = ref 4 in
let n_conn = ref 100 in
let n = ref 50_000 in
let opts =
[
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-n", Arg.Set_int n, " total number of connections";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
]
|> Arg.align
in
Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
Lwt_engine.set @@ new Lwt_engine.libev ();
Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn ()

66
test/lwt/echo_server.ml Normal file
View file

@ -0,0 +1,66 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let str_of_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~runner () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in
(* TODO: handle exit?? *)
Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc =
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in
let buf = Bytes.create 32 in
let continue = ref true in
while !continue do
Trace.message "read";
let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in
if n = 0 then
continue := false
else (
Trace.messagef (fun k -> k "got %dB" n);
M_lwt.IO_out.output oc buf 0 n;
M_lwt.IO_out.flush oc;
Trace.message "write"
)
done;
Trace.exit_manual_span _sp;
Trace.message "exit handle client"
in
let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let _server = M_lwt.TCP_server.establish ~runner addr handle_client in
lwt_fut
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 0 in
let j = ref 4 in
let opts =
[
"-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads";
]
|> Arg.align
in
Arg.parse opts ignore "echo server";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
Lwt_engine.set @@ new Lwt_engine.libev ();
Lwt_main.run @@ main ~runner ~port:!port ()

69
test/lwt/hash_client.ml Normal file
View file

@ -0,0 +1,69 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ( let@ ) = ( @@ )
let main ~port ~runner ~dir ~n_conn () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
Printf.printf "hash dir=%S\n%!" dir;
Printf.printf "connecting to port %d\n%!" port;
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
(* TODO: *)
let run_task () : unit =
let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in
M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc ->
let rec walk file : unit =
if not (Sys.file_exists file) then
()
else if Sys.is_regular_file file then (
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file);
let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in
Printf.printf "%s\n%!" res
) else if Sys.is_directory file then (
let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir"
~data:(fun () -> [ "d", `String file ])
in
Printf.printf "explore %S\n%!" file;
let d = Sys.readdir file in
Array.sort String.compare d;
Array.iter (fun sub -> walk (Filename.concat file sub)) d
)
in
walk dir;
Trace.exit_manual_span _sp
in
(* start the first [n_conn] tasks *)
let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in
Lwt.join (List.map M_lwt.lwt_of_fut futs)
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 1234 in
let j = ref 4 in
let n_conn = ref 100 in
let dir = ref "." in
let opts =
[
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-d", Arg.Set_string dir, " directory to hash";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
]
|> Arg.align
in
Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
Lwt_engine.set @@ new Lwt_engine.libev ();
Lwt_main.run @@ main ~runner ~port:!port ~dir:!dir ~n_conn:!n_conn ()

235
test/lwt/hash_server.ml Normal file
View file

@ -0,0 +1,235 @@
(* vendored from https://github.com/dbuenzli/uuidm *)
let sha_1 s =
(* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *)
let sha_1_pad s =
let len = String.length s in
let blen = 8 * len in
let rem = len mod 64 in
let mlen =
if rem > 55 then
len + 128 - rem
else
len + 64 - rem
in
let m = Bytes.create mlen in
Bytes.blit_string s 0 m 0 len;
Bytes.fill m len (mlen - len) '\x00';
Bytes.set m len '\x80';
if Sys.word_size > 32 then (
Bytes.set m (mlen - 8) (Char.unsafe_chr ((blen lsr 56) land 0xFF));
Bytes.set m (mlen - 7) (Char.unsafe_chr ((blen lsr 48) land 0xFF));
Bytes.set m (mlen - 6) (Char.unsafe_chr ((blen lsr 40) land 0xFF));
Bytes.set m (mlen - 5) (Char.unsafe_chr ((blen lsr 32) land 0xFF))
);
Bytes.set m (mlen - 4) (Char.unsafe_chr ((blen lsr 24) land 0xFF));
Bytes.set m (mlen - 3) (Char.unsafe_chr ((blen lsr 16) land 0xFF));
Bytes.set m (mlen - 2) (Char.unsafe_chr ((blen lsr 8) land 0xFF));
Bytes.set m (mlen - 1) (Char.unsafe_chr (blen land 0xFF));
m
in
(* Operations on int32 *)
let ( &&& ) = ( land ) in
let ( lor ) = Int32.logor in
let ( lxor ) = Int32.logxor in
let ( land ) = Int32.logand in
let ( ++ ) = Int32.add in
let lnot = Int32.lognot in
let sr = Int32.shift_right in
let sl = Int32.shift_left in
let cls n x = sl x n lor Int32.shift_right_logical x (32 - n) in
(* Start *)
let m = sha_1_pad s in
let w = Array.make 16 0l in
let h0 = ref 0x67452301l in
let h1 = ref 0xEFCDAB89l in
let h2 = ref 0x98BADCFEl in
let h3 = ref 0x10325476l in
let h4 = ref 0xC3D2E1F0l in
let a = ref 0l in
let b = ref 0l in
let c = ref 0l in
let d = ref 0l in
let e = ref 0l in
for i = 0 to (Bytes.length m / 64) - 1 do
(* For each block *)
(* Fill w *)
let base = i * 64 in
for j = 0 to 15 do
let k = base + (j * 4) in
w.(j) <-
sl (Int32.of_int (Char.code @@ Bytes.get m k)) 24
lor sl (Int32.of_int (Char.code @@ Bytes.get m (k + 1))) 16
lor sl (Int32.of_int (Char.code @@ Bytes.get m (k + 2))) 8
lor Int32.of_int (Char.code @@ Bytes.get m (k + 3))
done;
(* Loop *)
a := !h0;
b := !h1;
c := !h2;
d := !h3;
e := !h4;
for t = 0 to 79 do
let f, k =
if t <= 19 then
!b land !c lor (lnot !b land !d), 0x5A827999l
else if t <= 39 then
!b lxor !c lxor !d, 0x6ED9EBA1l
else if t <= 59 then
!b land !c lor (!b land !d) lor (!c land !d), 0x8F1BBCDCl
else
!b lxor !c lxor !d, 0xCA62C1D6l
in
let s = t &&& 0xF in
if t >= 16 then
w.(s) <-
cls 1
(w.(s + 13 &&& 0xF)
lxor w.(s + 8 &&& 0xF)
lxor w.(s + 2 &&& 0xF)
lxor w.(s));
let temp = cls 5 !a ++ f ++ !e ++ w.(s) ++ k in
e := !d;
d := !c;
c := cls 30 !b;
b := !a;
a := temp
done;
(* Update *)
h0 := !h0 ++ !a;
h1 := !h1 ++ !b;
h2 := !h2 ++ !c;
h3 := !h3 ++ !d;
h4 := !h4 ++ !e
done;
let h = Bytes.create 20 in
let i2s h k i =
Bytes.set h k (Char.unsafe_chr (Int32.to_int (sr i 24) &&& 0xFF));
Bytes.set h (k + 1) (Char.unsafe_chr (Int32.to_int (sr i 16) &&& 0xFF));
Bytes.set h (k + 2) (Char.unsafe_chr (Int32.to_int (sr i 8) &&& 0xFF));
Bytes.set h (k + 3) (Char.unsafe_chr (Int32.to_int i &&& 0xFF))
in
i2s h 0 !h0;
i2s h 4 !h1;
i2s h 8 !h2;
i2s h 12 !h3;
i2s h 16 !h4;
Bytes.unsafe_to_string h
(*---------------------------------------------------------------------------
Copyright (c) 2008 The uuidm programmers
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
---------------------------------------------------------------------------*)
(* server that reads from sockets lists of files, and returns hashes of these files *)
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let to_hex s =
let i2h i = String.get (spf "%x" i) 0 in
let n = String.length s in
let bs = Bytes.create (n * 2) in
for i = 0 to n - 1 do
Bytes.set bs (2 * i) (i2h ((Char.code s.[i] land 0b1111_0000) lsr 4));
Bytes.set bs ((2 * i) + 1) (i2h (Char.code s.[i] land 0b0000_1111))
done;
Bytes.unsafe_to_string bs
let str_of_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
[@@@ocaml.warning "-48"]
let read_file filename : string =
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "read-file" ~data:(fun () ->
[ "f", `String filename ])
in
In_channel.with_open_bin filename In_channel.input_all
let main ~port ~runner () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in
(* TODO: handle exit?? *)
Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc =
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in
try
while true do
Trace.message "read";
let filename =
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic)
|> String.trim
in
Trace.messagef (fun k -> k "hash %S" filename);
match read_file filename with
| exception e ->
Printf.eprintf "error while reading %S:\n%s\n" filename
(Printexc.to_string e);
M_lwt.run_in_lwt_and_await (fun () ->
Lwt_io.write_line oc (spf "%s: error" filename));
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
| content ->
(* got the content, now hash it *)
let hash =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in
sha_1 content |> to_hex
in
M_lwt.run_in_lwt_and_await (fun () ->
Lwt_io.write_line oc (spf "%s: %s" filename hash));
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
done
with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
Trace.exit_manual_span _sp;
Trace.message "exit handle client"
in
let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in
lwt_fut
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 1234 in
let j = ref 4 in
let opts =
[
"-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads";
]
|> Arg.align
in
Arg.parse opts ignore "echo server";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
Lwt_engine.set @@ new Lwt_engine.libev ();
Lwt_main.run @@ main ~runner ~port:!port ()