Compare commits

...

63 commits

Author SHA1 Message Date
Simon Cruanes
7c64ad9400
Merge 01cdb66f1f into d957f7b54e 2025-10-30 13:09:04 +01:00
Simon Cruanes
d957f7b54e
small refactor
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-10-25 21:46:20 -04:00
Simon Cruanes
a26503df0b
refactor chan; fix bug in Chan.try_push
we could return `false` even though we succeeded in pushing a value into
the chan.
2025-10-25 21:21:03 -04:00
Simon Cruanes
92300ad698
fix: make Moonpool_lwt.fut_of_lwt idempotent
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
this way the resulting future can be cancelled/fulfilled from
the outside without crashing Lwt
2025-10-07 13:53:54 -04:00
Simon Cruanes
538f3df31a
doc correction
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-10-02 14:37:15 -04:00
Simon Cruanes
dbc099052d
CI
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-09-30 11:26:05 -04:00
Simon Cruanes
8d99628f03
remove deprecated moonpool-io and moonpool.sync 2025-09-30 11:24:53 -04:00
Simon Cruanes
0e5a2896ef
prepare for 0.9
Some checks are pending
github pages / Deploy doc (push) Waiting to run
Build and Test / build (push) Waiting to run
Build and Test / build-compat (push) Waiting to run
Build and Test / format (push) Waiting to run
2025-09-29 08:51:42 -04:00
Simon Cruanes
9601621ebc
opam fixes 2025-09-29 08:51:42 -04:00
Simon Cruanes
70018423ff
fix build 2025-09-26 15:44:21 -04:00
Simon Cruanes
64c3442078
more doc 2025-09-26 15:44:21 -04:00
Simon Cruanes
03f8ccd030
CI 2025-09-26 15:44:21 -04:00
Simon Cruanes
d98dadeb84
Merge pull request #37 from c-cube/simon/lwt-main-runner-2025-07-09
change moonpool-lwt to make it a lwt-engine based runner
2025-09-26 15:04:48 -04:00
Simon Cruanes
d79200f555
Merge pull request #39 from c-cube/simon/reduce-scope-2025-07-09
reduce scope of the library a bit
2025-09-26 15:02:01 -04:00
Simon Cruanes
2dbbad4ef2
refactor moonpool_lwt 2025-09-26 14:55:26 -04:00
Simon Cruanes
677ae5c36a
perf: fast path for Moonpool_lwt.run_in_lwt_and_await 2025-09-26 14:55:26 -04:00
Simon Cruanes
4e19719c4f
modify signature for Moonpool_lwt.run_in_lwt_and_await 2025-09-26 14:55:26 -04:00
Simon Cruanes
4f685313de
detail 2025-09-26 14:55:26 -04:00
Simon Cruanes
8bd79c70b5
add Moonpool_lwt.on_lwt_thread 2025-09-26 14:55:26 -04:00
Simon Cruanes
f245f4913c
add Moonpool_lwt.spawn_lwt_ignore 2025-09-26 14:55:26 -04:00
Simon Cruanes
2aabc30b70
fix test 2025-09-26 14:55:26 -04:00
Simon Cruanes
a42737aa81
format 2025-09-26 14:55:25 -04:00
Simon Cruanes
bf649f5348
fix test 2025-09-26 14:55:25 -04:00
Simon Cruanes
44edf60836
fix tests 2025-09-26 14:55:25 -04:00
Simon Cruanes
86b64ae3d4
fix lwt: make sure to wakeup loop in main
there's a race condition where, by the time we schedule the
main fiber in `lwt_main`, the event loop is already asleep (maybe
from a previous run). We make sure to wake the loop up.
2025-09-26 14:55:25 -04:00
Simon Cruanes
01026fafaa
doc 2025-09-26 14:55:25 -04:00
Simon Cruanes
2afb5c1036
adapt some tests for the lwt runner 2025-09-26 14:55:25 -04:00
Simon Cruanes
9e814ecb48
lwt: handle fibers in moonpool_lwt 2025-09-26 14:55:25 -04:00
Simon Cruanes
00078d8b43
update test 2025-09-26 14:55:25 -04:00
Simon Cruanes
e3be2aceaa
feat lwt: make sure we can setup/cleanup multiple times 2025-09-26 14:55:25 -04:00
Simon Cruanes
1eef212a3e
more sanity checks 2025-09-26 14:55:25 -04:00
Simon Cruanes
63559f0f3b
detail 2025-09-26 14:55:25 -04:00
Simon Cruanes
6c8c06b391
update lwt test 2025-09-26 14:55:25 -04:00
Simon Cruanes
122b3a6b06
feat lwt: make most functions work on any thread, not just the main 2025-09-26 14:55:25 -04:00
Simon Cruanes
786d75d680
comments/license for the Lwt hash server 2025-09-26 14:55:25 -04:00
Simon Cruanes
50b9dd9b62
fix CI for lwt tests 2025-09-26 14:55:25 -04:00
Simon Cruanes
da551edbd3
fix lwt tests 2025-09-26 14:55:25 -04:00
Simon Cruanes
6ae82f130a
feat lwt: proper wakeup; add lwt_main_runner 2025-09-26 14:55:25 -04:00
Simon Cruanes
0fecde07fc
test: update Lwt tests to use the new Moonpool_lwt 2025-09-26 14:55:25 -04:00
Simon Cruanes
a24bd7472d
feat worker_loop: always use reschedule in await
it's better than continuing right now because it will potentially
reschedule on the correct runner.
2025-09-26 14:55:25 -04:00
Simon Cruanes
796c4f6f31
feat lwt: improvements 2025-09-26 14:55:25 -04:00
Simon Cruanes
f53dbe4dda
cleanup worker loop 2025-09-26 14:55:25 -04:00
Simon Cruanes
e09c809a45
deprecate moonpool.sync 2025-09-26 14:55:25 -04:00
Simon Cruanes
f5993408c0
wip: debug echo server 2025-09-26 14:55:24 -04:00
Simon Cruanes
6c4fb69d23
wip: lwt 2025-09-26 14:55:24 -04:00
Simon Cruanes
72d8c09898
wip 2025-09-26 14:55:24 -04:00
Simon Cruanes
543135a0b0
wip: echo server using lwt 2025-09-26 14:55:24 -04:00
Simon Cruanes
295f22e770
wip: lwt 2025-09-26 14:55:24 -04:00
Simon Cruanes
bf90c32c86
wip lwt: event loop for moonpool directly inside lwt 2025-09-26 14:55:24 -04:00
Simon Cruanes
55e3e77a66
core: cleanup, and add a fined grained API for worker loop 2025-09-26 14:55:24 -04:00
Simon Cruanes
1a64e7345e
Revert "deprecate fibers"
This reverts commit 83acc18d3d.
2025-09-26 14:54:50 -04:00
Simon Cruanes
2c1def188a
breaking: require OCaml 5 2025-07-09 16:44:12 -04:00
Simon Cruanes
b9bbcf82f7
test do not need preprocessor anymore 2025-07-09 16:43:51 -04:00
Simon Cruanes
0ab99517d5
benchs: no preprocessor anymore 2025-07-09 16:41:05 -04:00
Simon Cruanes
41561c3bff
deprecated moonpool_io 2025-07-09 16:25:10 -04:00
Simon Cruanes
50a44a76e1
forkjoin not longer optional 2025-07-09 16:25:03 -04:00
Simon Cruanes
f6ad345f31
fib: remove preprocessor 2025-07-09 16:24:49 -04:00
Simon Cruanes
f8d5c564de
remove version-dependent preprocessor 2025-07-09 15:42:23 -04:00
Simon Cruanes
2dcc858384
remove Atomic stubs, we're already depending on >4.12 2025-07-09 15:39:26 -04:00
Simon Cruanes
83acc18d3d
deprecate fibers 2025-07-09 15:28:33 -04:00
Simon Cruanes
5ea9a3f587
remove bounded_queue 2025-07-09 15:28:25 -04:00
Simon Cruanes
01cdb66f1f
avoid recursion in dpool 2024-10-09 00:26:30 -04:00
Simon Cruanes
8cb09c01c4
fix domain pool: block signals in background threads
close #35
2024-10-08 15:28:04 -04:00
82 changed files with 3263 additions and 2188 deletions

View file

@ -15,14 +15,14 @@ jobs:
- name: Use OCaml
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: '5.0'
ocaml-compiler: '5.3'
dune-cache: true
allow-prerelease-opam: true
# temporary until it's in a release
- run: opam pin picos 0.6.0 -y -n
- run: opam install odig moonpool moonpool-lwt moonpool-io
- run: opam install odig moonpool moonpool-lwt -t
- run: opam exec -- odig odoc --cache-dir=_doc/ moonpool moonpool-lwt

View file

@ -16,8 +16,8 @@ jobs:
os:
- ubuntu-latest
ocaml-compiler:
- '4.14'
- '5.2'
- '5.0'
- '5.3'
runs-on: ${{ matrix.os }}
steps:
@ -31,16 +31,11 @@ jobs:
- run: opam pin picos 0.6.0 -y -n
- run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only
if: matrix.ocaml-compiler == '5.2'
- run: opam install -t moonpool --deps-only
if: matrix.ocaml-compiler != '5.2'
- run: opam install -t moonpool moonpool-lwt --deps-only
- run: opam exec -- dune build @install
# install some depopts
- run: opam install thread-local-storage trace hmap
if: matrix.ocaml-compiler == '5.2'
- run: opam exec -- dune build --profile=release --force @install @runtest
compat:
@ -67,7 +62,7 @@ jobs:
# temporary until it's in a release
- run: opam pin https://github.com/ocaml-multicore/picos.git -y -n
- run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only
- run: opam install -t moonpool moonpool-lwt --deps-only
- run: opam exec -- dune build @install
# install some depopts
- run: opam install thread-local-storage trace domain-local-await

View file

@ -1,4 +1,21 @@
# 0.9
- breaking: require OCaml 5
* no further need for a preprocessor
* forkjoin not longer optional
- moonpool-lwt: large changes, including a Runner that runs
inside `Lwt_unix`'s event loop and can thus use any `_ Lwt.t` function
- remove bounded_queue
- fix core: better repropagating of errors
- add `Fut.{cancel,try_cancel}`
- perf: `await` on immediately ready timer queues its task
- feat: add `Moonpool.yield`
- deprecate moonpool.sync
- deprecate moonpool_io
# 0.8
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public

View file

@ -1,6 +1,3 @@
(executables
(names fib_rec pi primes)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib))

View file

@ -66,8 +66,6 @@ let run_par1 ~kind (num_steps : int) : float =
let pi = step *. Lock.get global_sum in
pi
[@@@ifge 5.0]
let run_fork_join ~kind num_steps : float =
let@ pool = with_pool ~kind () in
@ -92,13 +90,6 @@ let run_fork_join ~kind num_steps : float =
let pi = step *. Lock.get global_sum in
pi
[@@@else_]
let run_fork_join _ =
failwith "fork join not available on this version of OCaml"
[@@@endif]
type mode =
| Sequential
| Par1

View file

@ -2,7 +2,7 @@
(using mdx 0.2)
(name moonpool)
(version 0.8)
(version 0.9)
(generate_opam_files true)
(source
(github c-cube/moonpool))
@ -16,7 +16,7 @@
(name moonpool)
(synopsis "Pools of threads supported by a pool of domains")
(depends
(ocaml (>= 4.14))
(ocaml (>= 5.0))
dune
(either (>= 1.0))
(trace :with-test)
@ -44,23 +44,13 @@
(depends
(moonpool (= :version))
(ocaml (>= 5.0))
(qcheck-core (and :with-test (>= 0.19)))
(hmap :with-test)
lwt
base-unix
(trace :with-test)
(trace-tef :with-test)
(odoc :with-doc)))
(package
(name moonpool-io)
(synopsis "Async IO for moonpool, relying on picos (experimental)")
(allow_empty) ; on < 5.0
(depends
(moonpool (= :version))
(picos_io (and (>= 0.5) (< 0.7)))
(ocaml (>= 5.0))
(trace :with-test)
(trace-tef :with-test)
(odoc :with-doc)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

View file

@ -1,33 +0,0 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
synopsis: "Async IO for moonpool, relying on picos (experimental)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"dune" {>= "3.0"}
"moonpool" {= version}
"picos_io" {>= "0.5" & < "0.7"}
"ocaml" {>= "5.0"}
"trace" {with-test}
"trace-tef" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/moonpool.git"

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
version: "0.9"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
@ -11,6 +11,8 @@ depends: [
"dune" {>= "3.0"}
"moonpool" {= version}
"ocaml" {>= "5.0"}
"qcheck-core" {with-test & >= "0.19"}
"hmap" {with-test}
"lwt"
"base-unix"
"trace" {with-test}

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
version: "0.9"
synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
@ -9,7 +9,7 @@ tags: ["thread" "pool" "domain" "futures" "fork-join"]
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"ocaml" {>= "4.14"}
"ocaml" {>= "5.0"}
"dune" {>= "3.0"}
"either" {>= "1.0"}
"trace" {with-test}

View file

@ -1,182 +0,0 @@
type 'a t = {
max_size: int;
q: 'a Queue.t;
mutex: Mutex.t;
cond_push: Condition.t;
cond_pop: Condition.t;
mutable closed: bool;
}
exception Closed
let create ~max_size () : _ t =
if max_size < 1 then invalid_arg "Bounded_queue.create";
{
max_size;
mutex = Mutex.create ();
cond_push = Condition.create ();
cond_pop = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
(* awake waiters so they fail *)
Condition.broadcast self.cond_push;
Condition.broadcast self.cond_pop
);
Mutex.unlock self.mutex
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
let push (self : _ t) x : unit =
let continue = ref true in
Mutex.lock self.mutex;
while !continue do
if self.closed then (
(* push always fails on a closed queue *)
Mutex.unlock self.mutex;
raise Closed
) else if is_full_ self then
Condition.wait self.cond_push self.mutex
else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
(* exit loop *)
continue := false;
Mutex.unlock self.mutex
)
done
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
(* pop fails on a closed queue if it's also empty,
otherwise it still returns the remaining elements *)
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex;
(loop [@tailcall]) ()
) else (
let was_full = is_full_ self in
let x = Queue.pop self.q in
(* wakeup pushers that were blocked *)
if was_full then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
x
)
in
loop ()
let try_pop ~force_lock (self : _ t) : _ option =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let was_full_before_pop = is_full_ self in
match Queue.pop self.q with
| x ->
(* wakeup pushers that are blocked *)
if was_full_before_pop then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
None
) else
None
let try_push ~force_lock (self : _ t) x : bool =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
if is_full_ self then (
Mutex.unlock self.mutex;
false
) else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
Mutex.unlock self.mutex;
true
)
) else
false
let[@inline] max_size self = self.max_size
let size (self : _ t) : int =
Mutex.lock self.mutex;
let n = Queue.length self.q in
Mutex.unlock self.mutex;
n
let transfer (self : 'a t) q2 : unit =
Mutex.lock self.mutex;
let continue = ref true in
while !continue do
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex
) else (
let was_full = is_full_ self in
Queue.transfer self.q q2;
if was_full then Condition.broadcast self.cond_push;
continue := false;
Mutex.unlock self.mutex
)
done
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
let to_iter self k =
try
while true do
let x = pop self in
k x
done
with Closed -> ()
let to_gen self : _ gen =
fun () ->
match pop self with
| exception Closed -> None
| x -> Some x
let rec to_seq self : _ Seq.t =
fun () ->
match pop self with
| exception Closed -> Seq.Nil
| x -> Seq.Cons (x, to_seq self)

View file

@ -1,82 +0,0 @@
(** A blocking queue of finite size.
This queue, while still using locks underneath (like the regular blocking
queue) should be enough for usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is desirable:
if the queue is used to communicate between producer(s) and consumer(s), the
consumer(s) can limit the rate at which producer(s) send new work down their
way. Whenever the queue is full, means that producer(s) will have to wait
before pushing new work.
@since 0.4 *)
type 'a t
(** A bounded queue. *)
val create : max_size:int -> unit -> 'a t
val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q], and after all
the elements still in [q] currently are [pop]'d, {!pop} will also raise
{!Closed}. *)
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
block until there is room for [x].
@raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
acquire [q] or if [q] is full.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [false] even if there's room in
the queue.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
some element becomes available.
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
no element is available or if it failed to acquire [q].
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention.
@raise Closed if [q] is empty and closed. *)
val size : _ t -> int
(** Number of elements currently in [q] *)
val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available in [bq] into
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *)
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might not
terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *)
val to_seq : 'a t -> 'a Seq.t
(** [to_gen q] returns a (transient) sequence from the queue. *)

View file

@ -21,7 +21,6 @@ let create ~max_size () : _ t =
}
let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
@ -33,44 +32,46 @@ let try_push (self : _ t) x : bool =
let to_awake = Queue.create () in
Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that
outside the critical section*)
Queue.iter Trigger.signal to_awake
Queue.iter Trigger.signal to_awake;
true
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res
Mutex.unlock self.mutex;
true
| _ ->
Mutex.unlock self.mutex;
false
) else
false
let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with
match Queue.pop self.q with
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
Mutex.unlock self.mutex;
if self.closed then
raise Closed
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res
else
None
| x ->
Mutex.unlock self.mutex;
Some x
) else
None
let close (self : _ t) : unit =
let q = Queue.create () in
let triggers_to_signal = Queue.create () in
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Queue.transfer self.pop_waiters q;
Queue.transfer self.push_waiters q
Queue.transfer self.pop_waiters triggers_to_signal;
Queue.transfer self.push_waiters triggers_to_signal
);
Mutex.unlock self.mutex;
Queue.iter Trigger.signal q
[@@@ifge 5.0]
Queue.iter Trigger.signal triggers_to_signal
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;
@ -120,5 +121,3 @@ let rec pop (self : 'a t) : 'a =
Mutex.unlock self.mutex;
Trigger.await_exn tr;
pop self
[@@@endif]

View file

@ -1,7 +1,8 @@
(** Channels.
The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version.
The channels have bounded size. They use effects/await to provide
a direct style implementation. Pushing into a full channel,
or popping from an empty one, will suspend the current task.
The channels became bounded since @0.7 .
*)
@ -28,8 +29,6 @@ val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. This is idempotent.
*)
[@@@ifge 5.0]
val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task if the channel
is currently full.
@ -48,5 +47,3 @@ val pop_block_exn : 'a t -> 'a
The precautions around blocking from inside a thread pool
are the same as explained in {!Fut.wait_block}. *)
*)
[@@@endif]

View file

@ -12,7 +12,4 @@
moonpool.dpool
(re_export picos))
(flags :standard -open Moonpool_private)
(private_modules util_pool_)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))
(private_modules util_pool_))

View file

@ -28,7 +28,6 @@ type worker_state = {
let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
let k_worker_state : worker_state TLS.t = TLS.create ()
(*
get_thread_state = TLS.get_opt k_worker_state
@ -71,12 +70,6 @@ let schedule_w (self : worker_state) (task : task_full) : unit =
let get_next_task (self : worker_state) =
try Bb_queue.pop self.st.q with Bb_queue.Closed -> raise WL.No_more_tasks
let get_thread_state () =
match TLS.get_exn k_worker_state with
| st -> st
| exception TLS.Not_set ->
failwith "Moonpool: get_thread_state called from outside a runner."
let before_start (self : worker_state) =
let t_id = Thread.id @@ Thread.self () in
self.st.on_init_thread ~dom_id:self.dom_idx ~t_id ();
@ -103,7 +96,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_w;
runner;
get_next_task;
get_thread_state;
around_task;
on_exn;
before_start;

View file

@ -1,4 +1,4 @@
module A = Atomic_
module A = Atomic
module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result
@ -424,8 +424,6 @@ let wait_block self =
let bt = Printexc.get_raw_backtrace () in
Error (Exn_bt.make exn bt)
[@@@ifge 5.0]
let await (self : 'a t) : 'a =
(* fast path: peek *)
match C.peek_exn self with
@ -433,14 +431,11 @@ let await (self : 'a t) : 'a =
| exception C.Running ->
let trigger = Trigger.create () in
(* suspend until the future is resolved *)
if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger;
if C.try_attach self trigger then Trigger.await_exn trigger;
(* un-suspended: we should have a result! *)
get_or_fail_exn self
[@@@endif]
module Infix = struct
let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x

View file

@ -236,8 +236,6 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
{b NOTE} This is only available on OCaml 5. *)
[@@@ifge 5.0]
val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
resumes the task on this same runner (but possibly on a different
@ -248,8 +246,6 @@ val await : 'a t -> 'a
This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *)
[@@@endif]
(** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error

View file

@ -12,18 +12,12 @@ let get_current_runner = Runner.get_current_runner
let recommended_thread_count () = Domain_.recommended_number ()
let spawn = Fut.spawn
let spawn_on_current_runner = Fut.spawn_on_current_runner
[@@@ifge 5.0]
let await = Fut.await
let yield = Picos.Fiber.yield
[@@@endif]
module Atomic = Atomic_
module Atomic = Atomic
module Blocking_queue = Bb_queue
module Background_thread = Background_thread
module Bounded_queue = Bounded_queue
module Chan = Chan
module Exn_bt = Exn_bt
module Fifo_pool = Fifo_pool

View file

@ -72,8 +72,6 @@ val get_current_runner : unit -> Runner.t option
(** See {!Runner.get_current_runner}
@since 0.7 *)
[@@@ifge 5.0]
val await : 'a Fut.t -> 'a
(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on
OCaml >= 5.0.
@ -84,8 +82,6 @@ val yield : unit -> unit
>= 5.0.
@since NEXT_RELEASE *)
[@@@endif]
module Lock = Lock
module Fut = Fut
module Chan = Chan
@ -203,9 +199,7 @@ module Blocking_queue : sig
@since 0.4 *)
end
module Bounded_queue = Bounded_queue
module Atomic = Atomic_
module Atomic = Atomic
(** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]

View file

@ -21,8 +21,6 @@ exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full; (** @raise No_more_tasks *)
get_thread_state: unit -> 'st;
(** Access current thread's worker state from any worker *)
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
@ -33,8 +31,6 @@ type 'st ops = {
(** A dummy task. *)
let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber }
[@@@ifge 5.0]
let[@inline] discontinue k exn =
let bt = Printexc.get_raw_backtrace () in
Effect.Deep.discontinue_with_backtrace k exn bt
@ -43,8 +39,8 @@ let[@inline] raise_with_bt exn =
let bt = Printexc.get_raw_backtrace () in
Printexc.raise_with_backtrace exn bt
let with_handler (type st arg) ~(ops : st ops) (self : st) :
(unit -> unit) -> unit =
let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit
=
let current =
Some
(fun k ->
@ -87,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
let fiber = get_current_fiber_exn () in
(* when triggers is signaled, reschedule task *)
if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then
(* trigger was already signaled, run task now *)
Picos.Fiber.resume fiber k)
(* trigger was already signaled, reschedule task now *)
reschedule trigger fiber k)
| Picos.Computation.Cancel_after _r ->
Some
(fun k ->
@ -100,37 +96,28 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
fun f -> Effect.Deep.match_with f () handler
[@@@else_]
module type FINE_GRAINED_ARGS = sig
type st
let with_handler ~ops:_ self f = f ()
val ops : st ops
val st : st
end
[@@@endif]
module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
open Args
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let cur_fiber : fiber ref = ref _dummy_fiber
let runner = ops.runner st
let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
type state =
| New
| Ready
| Torn_down
let (AT_pair (before_task, after_task)) = ops.around_task self in
let state = ref New
let run_task (task : task_full) : unit =
let (AT_pair (before_task, after_task)) = ops.around_task st in
let fiber =
match task with
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
@ -144,32 +131,82 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
assert (task != _dummy_task);
(try
match task with
| T_start { fiber = _; f } -> with_handler ~ops self f
| T_start { fiber = _; f } -> with_handler ~ops st f
| T_resume { fiber = _; k } ->
(* this is already in an effect handler *)
k ()
with e ->
let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make e bt in
ops.on_exn self ebt);
ops.on_exn st ebt);
after_task runner _ctx;
cur_fiber := _dummy_fiber;
TLS.set k_cur_fiber _dummy_fiber
in
ops.before_start self;
let setup ~block_signals () : unit =
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
state := Ready;
let continue = ref true in
try
while !continue do
match ops.get_next_task self with
| task -> run_task task
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
ops.before_start st
let run ?(max_tasks = max_int) () : unit =
if !state <> Ready then invalid_arg "worker_loop.run: not setup";
let continue = ref true in
let n_tasks = ref 0 in
while !continue && !n_tasks < max_tasks do
match ops.get_next_task st with
| task ->
incr n_tasks;
run_task task
| exception No_more_tasks -> continue := false
done;
ops.cleanup self
done
let teardown () =
if !state <> Torn_down then (
state := Torn_down;
cur_fiber := _dummy_fiber;
ops.cleanup st
)
end
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
let module FG =
Fine_grained
(struct
type nonrec st = st
let ops = ops
let st = self
end)
()
in
FG.setup ~block_signals ();
try
FG.run ();
FG.teardown ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
ops.cleanup self;
FG.teardown ();
Printexc.raise_with_backtrace exn bt

View file

@ -26,7 +26,6 @@ exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
@ -34,4 +33,23 @@ type 'st ops = {
cleanup: 'st -> unit;
}
module type FINE_GRAINED_ARGS = sig
type st
val ops : st ops
val st : st
end
module Fine_grained (_ : FINE_GRAINED_ARGS) () : sig
val setup : block_signals:bool -> unit -> unit
(** Just initialize the loop *)
val run : ?max_tasks:int -> unit -> unit
(** Run the loop until no task remains or until [max_tasks] tasks have been
run *)
val teardown : unit -> unit
(** Tear down the loop *)
end
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -1,5 +1,5 @@
open Types_
module A = Atomic_
module A = Atomic
module WSQ = Ws_deque_
module WL = Worker_loop_
include Runner
@ -62,12 +62,6 @@ let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option =
TLS.get_opt k_worker_state
let[@inline] get_current_worker_exn () : worker_state =
match TLS.get_exn k_worker_state with
| w -> w
| exception TLS.Not_set ->
failwith "Moonpool: get_current_runner was called from outside a pool."
(** Try to wake up a waiter, if there's any. *)
let[@inline] try_wake_someone_ (self : state) : unit =
if self.n_waiting_nonzero then (
@ -212,7 +206,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_from_w;
runner;
get_next_task;
get_thread_state = get_current_worker_exn;
around_task;
on_exn;
before_start;

View file

@ -1,124 +0,0 @@
type op =
| Le
| Ge
type line =
| If of op * int * int
| Elseif of op * int * int
| Else
| Endif
| Raw of string
| Eof
let prefix ~pre s =
let len = String.length pre in
if len > String.length s then
false
else (
let rec check i =
if i = len then
true
else if String.unsafe_get s i <> String.unsafe_get pre i then
false
else
check (i + 1)
in
check 0
)
let eval ~major ~minor op i j =
match op with
| Le -> (major, minor) <= (i, j)
| Ge -> (major, minor) >= (i, j)
let preproc_lines ~file ~major ~minor (ic : in_channel) : unit =
let pos = ref 0 in
let fail msg =
failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg)
in
let pp_pos () = Printf.printf "#%d %S\n" !pos file in
let parse_line () : line =
match input_line ic with
| exception End_of_file -> Eof
| line ->
let line' = String.trim line in
incr pos;
if line' <> "" && line'.[0] = '[' then
if prefix line' ~pre:"[@@@ifle" then
Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y))
else if prefix line' ~pre:"[@@@ifge" then
Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y))
else if prefix line' ~pre:"[@@@elifle" then
Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y))
else if prefix line' ~pre:"[@@@elifge" then
Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y))
else if line' = "[@@@else_]" then
Else
else if line' = "[@@@endif]" then
Endif
else
Raw line
else
Raw line
in
(* entry point *)
let rec top () =
match parse_line () with
| Eof -> ()
| If (op, i, j) ->
if eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok:true ()
| Raw s ->
print_endline s;
top ()
| Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif"
(* current block is the valid one *)
and cat_block () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw s ->
print_endline s;
cat_block ()
| Endif ->
pp_pos ();
top ()
| Elseif _ | Else -> skip_block ~elseok:false ()
(* skip current block.
@param elseok if true, we should evaluate "elseif" *)
and skip_block ~elseok () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw _ -> skip_block ~elseok ()
| Endif ->
pp_pos ();
top ()
| Elseif (op, i, j) ->
if elseok && eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
| Else ->
if elseok then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
in
top ()
let () =
let file = Sys.argv.(1) in
let version = Sys.ocaml_version in
let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in
let ic = open_in file in
preproc_lines ~file ~major ~minor ic;
()

View file

@ -1,6 +0,0 @@
; our little preprocessor (ported from containers)
(executable
(name cpp)
(modes
(best exe)))

View file

@ -2,8 +2,5 @@
(name moonpool_dpool)
(public_name moonpool.dpool)
(synopsis "Moonpool's domain pool (used to start worker threads)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(flags :standard -open Moonpool_private)
(libraries moonpool.private))

View file

@ -15,19 +15,23 @@ module Bb_queue = struct
if was_empty then Condition.broadcast self.cond;
Mutex.unlock self.mutex
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
let pop (type a) (self : a t) : a =
let module M = struct
exception Found of a
end in
try
Mutex.lock self.mutex;
while true do
if Queue.is_empty self.q then
Condition.wait self.cond self.mutex
else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
raise (M.Found x)
)
done;
assert false
with M.Found x -> x
end
module Lock = struct
@ -71,7 +75,7 @@ type event =
new threads for pools. *)
type worker_state = {
q: event Bb_queue.t;
th_count: int Atomic_.t; (** Number of threads on this *)
th_count: int Atomic.t; (** Number of threads on this *)
}
(** Array of (optional) workers.
@ -95,20 +99,25 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and
if nothing happens it tries to stop to free resources. *)
let work_ idx (st : worker_state) : unit =
Thread.sigmask SIG_BLOCK
[
Sys.sigpipe; Sys.sigbus; Sys.sigterm; Sys.sigint; Sys.sigusr1; Sys.sigusr2;
]
|> ignore;
let main_loop () =
let continue = ref true in
while !continue do
match Bb_queue.pop st.q with
| Run f -> (try f () with _ -> ())
| Decr ->
if Atomic_.fetch_and_add st.th_count (-1) = 1 then (
if Atomic.fetch_and_add st.th_count (-1) = 1 then (
continue := false;
(* wait a bit, we might be needed again in a short amount of time *)
try
for _n_attempt = 1 to 50 do
Thread.delay 0.001;
if Atomic_.get st.th_count > 0 then (
if Atomic.get st.th_count > 0 then (
(* needed again! *)
continue := true;
raise Exit
@ -129,7 +138,7 @@ let work_ idx (st : worker_state) : unit =
| Some _st', dom ->
assert (st == _st');
if Atomic_.get st.th_count > 0 then
if Atomic.get st.th_count > 0 then
(* still alive! *)
(Some st, dom), true
else
@ -145,7 +154,7 @@ let work_ idx (st : worker_state) : unit =
(* special case for main domain: we start a worker immediately *)
let () =
assert (Domain_.is_main_domain ());
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
(* thread that stays alive *)
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
domains_.(0) <- Lock.create (Some w, None)
@ -157,12 +166,12 @@ let run_on (i : int) (f : unit -> unit) : unit =
let w =
Lock.update_map domains_.(i) (function
| (Some w, _) as st ->
Atomic_.incr w.th_count;
Atomic.incr w.th_count;
st, w
| None, dying_dom ->
(* join previous dying domain, to free its resources, if any *)
Option.iter Domain_.join dying_dom;
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in
let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
let worker : domain = Domain_.spawn (fun () -> work_ i w) in
(Some w, Some worker), w)
in

View file

@ -3,10 +3,4 @@
(public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos)
(enabled_if
(>= %{ocaml_version} 5.0))
(flags :standard -open Moonpool_private -open Moonpool)
(optional)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))
(flags :standard -open Moonpool_private -open Moonpool))

View file

@ -4,6 +4,4 @@
(synopsis "Fork-join parallelism for moonpool")
(flags :standard -open Moonpool)
(optional)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.private picos))

View file

@ -64,7 +64,7 @@ module State_ = struct
done;
(* wait for the other computation to be done *)
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
if !must_await then Trigger.await_exn trigger
| Right_solved _ | Both_solved _ -> assert false
end
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
i := !i + len_range
done;
Trigger.await trigger |> Option.iter Exn_bt.raise;
Trigger.await_exn trigger;
Option.iter Exn_bt.raise @@ A.get failure;
()
)

View file

@ -1,7 +0,0 @@
(library
(name moonpool_io)
(public_name moonpool-io)
(synopsis "Async IO for moonpool, using Picos")
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.fib picos_io picos_io.select picos_io.fd))

View file

@ -1,13 +0,0 @@
module Fd = Picos_io_fd
module Unix = Picos_io.Unix
module Select = Picos_io_select
let fd_of_unix_fd : Unix.file_descr -> Fd.t = Fun.id
(** [main f] runs [f()] inside a scheduler. *)
let main (f : Moonpool.Runner.t -> 'a) : 'a = Moonpool_fib.main f
(** {2 Async read/write} *)
let read = Unix.read
let write = Unix.write

View file

@ -1,66 +0,0 @@
open Base
let await_readable fd : unit =
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_readable fd;
read fd buf i len
| n -> n
)
let await_writable fd =
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_writable fd;
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
(** Sleep for the given amount of seconds *)
let sleep_s (f : float) : unit =
if f > 0. then (
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Sleep
( f,
false,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
)

View file

@ -1,152 +0,0 @@
open Common_
class type t = object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)
end
let create ?(close = ignore) ~input () : t =
object
method close = close
method input = input
end
let empty : t =
object
method close () = ()
method input _ _ _ = 0
end
let of_bytes ?(off = 0) ?len (b : bytes) : t =
(* i: current position in [b] *)
let i = ref off in
let len =
match len with
| Some n ->
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
n
| None -> Bytes.length b - off
in
let end_ = off + len in
object
method input b_out i_out len_out =
let n = min (end_ - !i) len_out in
Bytes.blit b !i b_out i_out n;
i := !i + n;
n
method close () = i := end_
end
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
let rec really_input (self : #t) buf i len =
if len > 0 then (
let n = input self buf i len in
if n = 0 then raise End_of_file;
(really_input [@tailrec]) self buf (i + n) (len - n)
)
let really_input_string self n : string =
let buf = Bytes.create n in
really_input self buf 0 n;
Bytes.unsafe_to_string buf
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
: unit =
let continue = ref true in
while !continue do
let len = input ic buf 0 (Bytes.length buf) in
if len = 0 then
continue := false
else
IO_out.output oc buf 0 len
done
let concat (l0 : t list) : t =
let l = ref l0 in
let rec input b i len : int =
match !l with
| [] -> 0
| ic :: tl ->
let n = ic#input b i len in
if n > 0 then
n
else (
l := tl;
input b i len
)
in
let close () = List.iter close l0 in
create ~close ~input ()
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
let buf = ref buf in
let i = ref 0 in
let[@inline] full_ () = !i = Bytes.length !buf in
let grow_ () =
let old_size = Bytes.length !buf in
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
if old_size = new_size then
failwith "input_all: maximum input size exceeded";
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
buf := new_buf
in
let rec loop () =
if full_ () then grow_ ();
let available = Bytes.length !buf - !i in
let n = input self !buf !i available in
if n > 0 then (
i := !i + n;
(loop [@tailrec]) ()
)
in
loop ();
if full_ () then
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := IO.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end

View file

@ -1,118 +0,0 @@
open Common_
class type t = object
method output_char : char -> unit
method output : bytes -> int -> int -> unit
method flush : unit -> unit
method close : unit -> unit
end
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
object
method flush () = flush ()
method close () = close ()
method output_char c = output_char c
method output bs i len = output bs i len
end
let dummy : t =
object
method flush () = ()
method close () = ()
method output_char _ = ()
method output _ _ _ = ()
end
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
: t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
IO.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
let of_buffer (buf : Buffer.t) : t =
object
method close () = ()
method flush () = ()
method output_char c = Buffer.add_char buf c
method output bs i len = Buffer.add_subbytes buf bs i len
end
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : #t) c : unit = self#output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
let[@inline] output_string (self : #t) (str : string) : unit =
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
let output_line (self : #t) (str : string) : unit =
output_string self str;
output_char self '\n'
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self#flush ()
let output_int self i =
let s = string_of_int i in
output_string self s
let output_lines self seq = Seq.iter (output_line self) seq
let tee (l : t list) : t =
match l with
| [] -> dummy
| [ oc ] -> oc
| _ ->
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
let output_char c = List.iter (fun oc -> output_char oc c) l in
let close () = List.iter close l in
let flush () = List.iter flush l in
create ~flush ~close ~output ~output_char ()

View file

@ -1,167 +0,0 @@
open Common_
module Trigger = M.Trigger
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb
| Wait_writable of Unix.file_descr * cb
| Sleep of float * bool * cb
(* TODO: provide actions with cancellation, alongside a "select" operation *)
(* | Cancel of event *)
| On_termination : 'a Lwt.t * 'a Exn_bt.result ref * Trigger.t -> t
| Wakeup : 'a Lwt.u * 'a -> t
| Wakeup_exn : _ Lwt.u * exn -> t
| Other of (unit -> unit)
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
(* | Cancel ev -> Lwt_engine.stop_event ev *)
| On_termination (fut, res, trigger) ->
Lwt.on_any fut
(fun x ->
res := Ok x;
Trigger.signal trigger)
(fun exn ->
res := Error (Exn_bt.get_callstack 10 exn);
Trigger.signal trigger)
| Wakeup (prom, x) -> Lwt.wakeup prom x
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
| Other f -> f ()
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old == [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let@ _sp =
Moonpool.Private.Tracing_.with_span
"moonpool-lwt.perform-pending-actions"
in
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let get_runner () : M.Runner.t =
match M.Runner.get_current_runner () with
| Some r -> r
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
let lwt_fut, lwt_prom = Lwt.wait () in
M.Fut.on_result fut (function
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
| Error ebt ->
let exn = Exn_bt.exn ebt in
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with
| Some x -> M.Fut.return x
| None ->
let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
fut
let _dummy_exn_bt : Exn_bt.t =
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
(* suspend fiber, wake it up when [fut] resolves *)
let trigger = M.Trigger.create () in
let res = ref (Error _dummy_exn_bt) in
Perform_action_in_lwt.(schedule Action.(On_termination (fut, res, trigger)));
Trigger.await trigger |> Option.iter Exn_bt.raise;
Exn_bt.unwrap !res
let run_in_lwt f : _ M.Fut.t =
let fut, prom = M.Fut.make () in
Perform_action_in_lwt.schedule
(Action.Other
(fun () ->
let lwt_fut = f () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom @@ Ok x)
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
fut
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
let detach_in_runner ~runner f : _ Lwt.t =
let fut, promise = Lwt.wait () in
M.Runner.run_async runner (fun () ->
match f () with
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
| exception exn ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut
let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in
let _fiber =
Fiber.spawn_top ~on:runner (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
in
Lwt_main.run lwt_fut
let main f =
let@ runner = M.Ws_pool.with_ () in
main_with_runner ~runner f

View file

@ -1,5 +0,0 @@
module M = Moonpool
module Exn_bt = M.Exn_bt
let ( let@ ) = ( @@ )
let _default_buf_size = 4 * 1024

View file

@ -1,12 +1,11 @@
(library
(name moonpool_lwt)
(public_name moonpool-lwt)
(private_modules common_)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
(re_export moonpool.fib)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -1,6 +1,322 @@
include Base
module IO = IO
module IO_out = IO_out
module IO_in = IO_in
module TCP_server = Tcp_server
module TCP_client = Tcp_client
module Exn_bt = Moonpool.Exn_bt
open struct
module WL = Moonpool.Private.Worker_loop_
module M = Moonpool
end
module Fut = Moonpool.Fut
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
ref (fun ebt ->
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
module Scheduler_state = struct
type st = {
tasks: WL.task_full Queue.t;
actions_from_other_threads: (unit -> unit) Queue.t;
(** Other threads ask us to run closures in the lwt thread *)
mutex: Mutex.t;
mutable thread: int;
closed: bool Atomic.t;
cleanup_done: bool Atomic.t;
mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
mutable notification: int;
(** A lwt_unix notification to wake up the event loop *)
has_notified: bool Atomic.t;
}
(** Main state *)
let cur_st : st option Atomic.t = Atomic.make None
let create_new () : st =
{
tasks = Queue.create ();
actions_from_other_threads = Queue.create ();
mutex = Mutex.create ();
thread = Thread.id (Thread.self ());
closed = Atomic.make false;
cleanup_done = Atomic.make false;
as_runner = Moonpool.Runner.dummy;
enter_hook = None;
leave_hook = None;
notification = 0;
has_notified = Atomic.make false;
}
let[@inline] notify_ (self : st) : unit =
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification
let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads;
Mutex.unlock self.mutex;
notify_ self
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
let[@inline] run_on_lwt_thread_ (self : st) (f : unit -> unit) : unit =
if on_lwt_thread_ self then
f ()
else
add_action_from_another_thread_ self f
let cleanup (st : st) =
match Atomic.get cur_st with
| Some st' ->
if st != st' then
failwith
"moonpool-lwt: cleanup failed (state is not the currently active \
one!)";
if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread";
Atomic.set st.closed true;
if not (Atomic.exchange st.cleanup_done true) then (
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Lwt_unix.stop_notification st.notification
);
Atomic.set cur_st None
| None -> failwith "moonpool-lwt: cleanup failed (no current active state)"
end
module Ops = struct
type st = Scheduler_state.st
let around_task _ = default_around_task_
let schedule (self : st) t =
if Atomic.get self.closed then
failwith "moonpool-lwt.schedule: scheduler is closed";
Scheduler_state.run_on_lwt_thread_ self (fun () -> Queue.push t self.tasks)
let get_next_task (self : st) =
if Atomic.get self.closed then raise WL.No_more_tasks;
try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks
let on_exn _ ebt = !on_uncaught_exn ebt
let runner (self : st) = self.as_runner
let cleanup = Scheduler_state.cleanup
let as_runner (self : st) : Moonpool.Runner.t =
Moonpool.Runner.For_runner_implementors.create
~size:(fun () -> 1)
~num_tasks:(fun () ->
Mutex.lock self.mutex;
let n = Queue.length self.tasks in
Mutex.unlock self.mutex;
n)
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
~shutdown:(fun ~wait:_ () -> Atomic.set self.closed true)
()
let before_start (self : st) : unit =
self.as_runner <- as_runner self;
()
let ops : st WL.ops =
{
schedule;
around_task;
get_next_task;
on_exn;
runner;
before_start;
cleanup;
}
let setup st =
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
before_start st
else
failwith "moonpool-lwt: setup failed (state already in place)"
end
(** Resolve [prom] with the result of [lwt_fut] *)
let transfer_lwt_to_fut (lwt_fut : 'a Lwt.t) (prom : 'a Fut.promise) : unit =
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill_idempotent prom (Ok x))
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill_idempotent prom (Error (Exn_bt.make exn bt)))
let[@inline] register_trigger_on_lwt_termination (lwt_fut : _ Lwt.t)
(tr : M.Trigger.t) : unit =
Lwt.on_termination lwt_fut (fun _ -> M.Trigger.signal tr)
let[@inline] await_lwt_terminated (fut : _ Lwt.t) =
match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep -> assert false
module Main_state = struct
let[@inline] get_st () : Scheduler_state.st =
match Atomic.get Scheduler_state.cur_st with
| Some st ->
if Atomic.get st.closed then failwith "moonpool-lwt: scheduler is closed";
st
| None -> failwith "moonpool-lwt: scheduler is not setup"
let[@inline] run_on_lwt_thread f =
Scheduler_state.run_on_lwt_thread_ (get_st ()) f
let[@inline] on_lwt_thread () : bool =
Scheduler_state.on_lwt_thread_ (get_st ())
let[@inline] add_action_from_another_thread f : unit =
Scheduler_state.add_action_from_another_thread_ (get_st ()) f
end
let await_lwt_from_another_thread fut =
let tr = M.Trigger.create () in
Main_state.add_action_from_another_thread (fun () ->
register_trigger_on_lwt_termination fut tr);
M.Trigger.await_exn tr;
await_lwt_terminated fut
let await_lwt (fut : _ Lwt.t) =
if Scheduler_state.on_lwt_thread_ (Main_state.get_st ()) then (
(* can directly access the future *)
match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep ->
let tr = M.Trigger.create () in
register_trigger_on_lwt_termination fut tr;
M.Trigger.await_exn tr;
await_lwt_terminated fut
) else
await_lwt_from_another_thread fut
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
if not (Main_state.on_lwt_thread ()) then
failwith "lwt_of_fut: not on the lwt thread";
let lwt_fut, lwt_prom = Lwt.wait () in
(* in lwt thread, resolve [lwt_fut] *)
let wakeup_using_res = function
| Ok x -> Lwt.wakeup lwt_prom x
| Error ebt ->
let exn = Exn_bt.exn ebt in
Lwt.wakeup_exn lwt_prom exn
in
M.Fut.on_result fut (fun res ->
Main_state.run_on_lwt_thread (fun () ->
(* can safely wakeup from the lwt thread *)
wakeup_using_res res));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
if Main_state.on_lwt_thread () then (
match Lwt.state lwt_fut with
| Return x -> M.Fut.return x
| _ ->
let fut, prom = M.Fut.make () in
transfer_lwt_to_fut lwt_fut prom;
fut
) else (
let fut, prom = M.Fut.make () in
Main_state.add_action_from_another_thread (fun () ->
transfer_lwt_to_fut lwt_fut prom);
fut
)
module Setup_lwt_hooks (ARG : sig
val st : Scheduler_state.st
end) =
struct
open ARG
module FG =
WL.Fine_grained
(struct
include Scheduler_state
let st = st
let ops = Ops.ops
end)
()
let run_in_hook () =
(* execute actions sent from other threads; first transfer them
all atomically to a local queue to reduce contention *)
let local_acts = Queue.create () in
Mutex.lock st.mutex;
Queue.transfer st.actions_from_other_threads local_acts;
Atomic.set st.has_notified false;
Mutex.unlock st.mutex;
Queue.iter (fun f -> f ()) local_acts;
(* run tasks *)
FG.run ~max_tasks:1000 ();
if not (Queue.is_empty st.tasks) then ignore (Lwt.pause () : unit Lwt.t);
()
let setup () =
(* only one thread does this *)
FG.setup ~block_signals:false ();
st.thread <- Thread.self () |> Thread.id;
st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
(* notification used to wake lwt up *)
st.notification <- Lwt_unix.make_notification ~once:false run_in_hook
end
let setup () : Scheduler_state.st =
let st = Scheduler_state.create_new () in
Ops.setup st;
let module Setup_lwt_hooks' = Setup_lwt_hooks (struct
let st = st
end) in
Setup_lwt_hooks'.setup ();
st
let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x
with exn -> Lwt.wakeup_exn lwt_prom exn);
lwt_fut
let spawn_lwt_ignore f = ignore (spawn_lwt f : unit Lwt.t)
let on_lwt_thread = Main_state.on_lwt_thread
let run_in_lwt_and_await (f : unit -> 'a) : 'a =
let st = Main_state.get_st () in
if Scheduler_state.on_lwt_thread_ st then
(* run immediately *)
f ()
else
await_lwt_from_another_thread @@ spawn_lwt f
let lwt_main (f : _ -> 'a) : 'a =
let st = setup () in
(* make sure to cleanup *)
let finally () = Scheduler_state.cleanup st in
Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in
(* make sure the scheduler isn't already sleeping *)
Scheduler_state.notify_ st;
Lwt_main.run fut
let[@inline] lwt_main_runner () =
let st = Main_state.get_st () in
st.as_runner

View file

@ -1,144 +1,73 @@
(** Lwt_engine-based event loop for Moonpool.
In what follows, we mean by "lwt thread" the thread running [Lwt_main.run]
(so, the thread where the Lwt event loop and all Lwt callbacks execute).
In what follows, we mean by "lwt thread" the thread running {!lwt_main}
(which wraps [Lwt_main.run]; so, the thread where the Lwt event loop and all
Lwt callbacks execute).
{b NOTE}: this is experimental and might change in future versions.
@since 0.6 *)
@since 0.6
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
The API has entirely changed since 0.9 , see
https://github.com/c-cube/moonpool/pull/37 *)
module Fut = Moonpool.Fut
(** {2 Basic conversions} *)
val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that completes when
[lwt_fut] does. This must be run from within the Lwt thread. *)
[lwt_fut] does. This can be run from any thread. *)
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
must be called from the Lwt thread, and the result must always be used only
from inside the Lwt thread. *)
from inside the Lwt thread.
@raise Failure if not run from the lwt thread. *)
(** {2 Helpers on the moonpool side} *)
val spawn_lwt : (unit -> 'a) -> 'a Lwt.t
(** This spawns a task that runs in the Lwt scheduler. This function is thread
safe.
@raise Failure if {!lwt_main} was not called. *)
val spawn_lwt_ignore : (unit -> unit) -> unit
(** Like {!spawn_lwt} but ignores the result, like [Lwt.async]. This function is
thread safe. *)
val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing
effect is handled. *)
effect is handled, but it doesn't have to run from inside the Lwt thread. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a
thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a) -> 'a
(** [run_in_lwt_and_await f] runs [f()] in the lwt thread, just like
[spawn_lwt f], and then calls {!await_lwt} on the result. This means [f()]
can use Lwt functions and libraries, use {!await_lwt} on them freely, etc.
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result.
Must be run from inside a moonpool runner so that the await-in effect is
handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called. Must be run from within
a fiber.
@raise Failure if not run within a fiber *)
(** {2 IO} *)
(** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors and rely on a
[Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently
running in some thread).
Calling these functions must be done from a moonpool runner. A function like
[read] will first try to perform the IO action directly (here, call
{!Unix.read}); if the action fails because the FD is not ready, then
[await_readable] is called: it suspends the fiber and subscribes it to Lwt
to be awakened when the FD becomes ready. *)
module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *)
val await_readable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is readable *)
val write_once : Unix.file_descr -> bytes -> int -> int -> int
(** Perform one write into the file descriptor *)
val await_writable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is writable *)
val write : Unix.file_descr -> bytes -> int -> int -> unit
(** Loop around {!write_once} to write the entire slice. *)
val sleep_s : float -> unit
(** Suspend the fiber for [n] seconds. *)
end
module IO_in = IO_in
(** Input channel *)
module IO_out = IO_out
(** Output channel *)
module TCP_server : sig
type t = Lwt_io.server
val establish_lwt :
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t
(** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When
a client connects, a moonpool fiber is started on [runner] to handle it.
*)
val establish :
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t
(** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes
on client sockets. *)
val shutdown : t -> unit
(** Shutdown the server *)
end
module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from the socket in a
non blocking way. *)
val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
(** Open a connection. *)
end
(** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and
returns a lwt future. This must be run from within the thread running
[Lwt_main]. *)
This function must run from within a task running on a moonpool runner so
that it can [await_lwt]. *)
(** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()]
inside a fiber in [runner]. *)
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
(** Exception handler for tasks that raise an uncaught exception. *)
val main : (unit -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *)
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
(** [lwt_main f] sets the moonpool-lwt bridge up, runs lwt main, calls [f],
destroys the bridge, and return the result of [f()]. Only one thread should
call this at a time. *)
val on_lwt_thread : unit -> bool
(** [on_lwt_thread ()] is true if the current thread is the one currently
running {!lwt_main}. This is thread safe.
@raise Failure if {!lwt_main} was not called. *)
val lwt_main_runner : unit -> Moonpool.Runner.t
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
is currently running in some thread. This is thread safe.
@raise Failure if {!lwt_main} was not called. *)
val is_setup : unit -> bool
(** Is the moonpool-lwt bridge setup? This is thread safe. *)

View file

@ -1,53 +0,0 @@
open Common_
open Base
let connect addr : Unix.file_descr =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
IO.await_writable sock;
true
do
()
done;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = connect addr in
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
let with_connect_lwt addr
(f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a =
let sock = connect addr in
let ic =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock)
in
let oc =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock)
in
let finally () =
(try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ());
(try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

View file

@ -1,38 +0,0 @@
open Common_
open Base
type t = Lwt_io.server
let establish_lwt ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let establish ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self

View file

@ -1,46 +0,0 @@
[@@@ifge 4.12]
include Atomic
[@@@else_]
type 'a t = { mutable x: 'a }
let[@inline] make x = { x }
let[@inline] get { x } = x
let[@inline] set r x = r.x <- x
let[@inline never] exchange r x =
(* atomic *)
let y = r.x in
r.x <- x;
(* atomic *)
y
let[@inline never] compare_and_set r seen v =
(* atomic *)
if r.x == seen then (
r.x <- v;
(* atomic *)
true
) else
false
let[@inline never] fetch_and_add r x =
(* atomic *)
let v = r.x in
r.x <- x + r.x;
(* atomic *)
v
let[@inline never] incr r =
(* atomic *)
r.x <- 1 + r.x
(* atomic *)
let[@inline never] decr r =
(* atomic *)
r.x <- r.x - 1
(* atomic *)
[@@@endif]

View file

@ -1,4 +1,3 @@
[@@@ifge 5.0]
[@@@ocaml.alert "-unstable"]
let recommended_number () = Domain.recommended_domain_count ()
@ -10,18 +9,3 @@ let spawn : _ -> t = Domain.spawn
let relax = Domain.cpu_relax
let join = Domain.join
let is_main_domain = Domain.is_main_domain
[@@@ocaml.alert "+unstable"]
[@@@else_]
let recommended_number () = 1
type t = Thread.t
let get_id (self : t) : int = Thread.id self
let spawn f : t = Thread.create f ()
let relax () = Thread.yield ()
let join = Thread.join
let is_main_domain () = true
[@@@endif]

View file

@ -2,9 +2,6 @@
(name moonpool_private)
(public_name moonpool.private)
(synopsis "Private internal utils for Moonpool (do not rely on)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries
threads
either

View file

@ -1,4 +1,4 @@
module A = Atomic_
module A = Atomic
(* terminology:

View file

@ -1,5 +0,0 @@
(library
(name moonpool_sync)
(public_name moonpool.sync)
(synopsis "Cooperative synchronization primitives for Moonpool")
(libraries moonpool picos picos_std.sync picos_std.event))

View file

@ -1,11 +0,0 @@
include Picos_std_event.Event
let[@inline] of_fut (fut : _ Moonpool.Fut.t) : _ t =
from_computation (Moonpool.Fut.Private_.as_computation fut)
module Infix = struct
let[@inline] ( let+ ) x f = map f x
let ( >|= ) = ( let+ )
end
include Infix

View file

@ -1,12 +0,0 @@
include module type of struct
include Picos_std_event.Event
end
val of_fut : 'a Moonpool.Fut.t -> 'a t
module Infix : sig
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
end
include module type of Infix

View file

@ -1,38 +0,0 @@
module Mutex = Picos_std_sync.Mutex
type 'a t = {
mutex: Mutex.t;
mutable content: 'a;
}
let create content : _ t = { mutex = Mutex.create (); content }
let with_ (self : _ t) f =
Mutex.lock self.mutex;
try
let x = f self.content in
Mutex.unlock self.mutex;
x
with e ->
Mutex.unlock self.mutex;
raise e
let[@inline] mutex self = self.mutex
let[@inline] update self f = with_ self (fun x -> self.content <- f x)
let[@inline] update_map l f =
with_ l (fun x ->
let x', y = f x in
l.content <- x';
y)
let get l =
Mutex.lock l.mutex;
let x = l.content in
Mutex.unlock l.mutex;
x
let set l x =
Mutex.lock l.mutex;
l.content <- x;
Mutex.unlock l.mutex

View file

@ -1,56 +0,0 @@
(** Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper around
{!Mutex} that encourages proper management of the critical section in RAII
style:
{[
let (let@) = (@@)
let compute_foo =
(* enter critical section *)
let@ x = Lock.with_ protected_resource in
use_x;
return_foo ()
(* exit critical section *)
in
]}
This lock is based on {!Picos_sync.Mutex} so it is [await]-safe.
@since 0.7 *)
type 'a t
(** A value protected by a cooperative mutex *)
val create : 'a -> 'a t
(** Create a new protected value. *)
val with_ : 'a t -> ('a -> 'b) -> 'b
(** [with_ l f] runs [f x] where [x] is the value protected with the lock [l],
in a critical section. If [f x] fails, [with_lock l f] fails too but the
lock is released. *)
val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], while protected by
the mutex. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] and
returns [y], while protected by the mutex. *)
val mutex : _ t -> Picos_std_sync.Mutex.t
(** Underlying mutex. *)
val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned isn't
protected! *)
val set : 'a t -> 'a -> unit
(** Atomically set the value.
{b NOTE} caution: using {!get} and {!set} as if this were a {!ref} is an
anti pattern and will not protect data against some race conditions. *)

View file

@ -1,9 +0,0 @@
module Mutex = Picos_std_sync.Mutex
module Condition = Picos_std_sync.Condition
module Lock = Lock
module Event = Event
module Semaphore = Picos_std_sync.Semaphore
module Lazy = Picos_std_sync.Lazy
module Latch = Picos_std_sync.Latch
module Ivar = Picos_std_sync.Ivar
module Stream = Picos_std_sync.Stream

View file

@ -10,8 +10,7 @@
t_resource
t_unfair
t_ws_deque
t_ws_wait
t_bounded_queue)
t_ws_wait)
(package moonpool)
(libraries
moonpool

View file

@ -9,9 +9,6 @@
t_sort
t_fork_join
t_fork_join_heavy)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(enabled_if
(and
(= %{system} "linux")

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool
let ( let@ ) = ( @@ )
@ -56,5 +54,3 @@ let main () =
let () =
let@ () = Trace_tef.with_setup () in
main ()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool
module FJ = Moonpool_forkjoin
@ -52,5 +50,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let ( let@ ) = ( @@ )
open Moonpool
@ -44,5 +42,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
@ -328,5 +326,3 @@ let () =
t_for_nested ~min:1 ~chunk_size:100 ();
t_for_nested ~min:4 ~chunk_size:100 ();
]
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
module Q = QCheck
let spf = Printf.sprintf
@ -52,5 +50,3 @@ let () =
run ~min:4 ();
run ~min:1 ();
Printf.printf "done\n%!"
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open! Moonpool
let pool = Ws_pool.create ~num_threads:4 ()
@ -53,5 +51,3 @@ let () =
in
let fut = Fut.both f1 f2 in
assert (Fut.wait_block fut = Ok (2, 20))
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool
let ( let@ ) = ( @@ )
@ -44,5 +42,3 @@ let () =
run ~pool ());
()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool
module FJ = Moonpool_forkjoin
@ -69,5 +67,3 @@ let () =
(* Printf.printf "arr: [%s]\n%!" *)
(* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *)
assert (sorted arr)
[@@@endif]

View file

@ -4,6 +4,7 @@
(>= %{ocaml_version} 5.0))
(package moonpool)
(libraries
t_fibers
moonpool
moonpool.fib
trace

5
test/fiber/lib/dune Normal file
View file

@ -0,0 +1,5 @@
(library
(name t_fibers)
(package moonpool)
(optional)
(libraries moonpool moonpool.fib trace qcheck-core hmap))

177
test/fiber/lib/fib.ml Normal file
View file

@ -0,0 +1,177 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
let ( let@ ) = ( @@ )
module TS = struct
type t = int list
let show (s : t) = String.concat "." @@ List.map string_of_int s
let init = [ 0 ]
let next_ = function
| [] -> [ 0 ]
| n :: tl -> (n + 1) :: tl
let tick (t : t ref) = t := next_ !t
let tick_get t =
tick t;
!t
end
(* more deterministic logging of events *)
module Log_ = struct
let events : (TS.t * string) list A.t = A.make []
let add_event t msg : unit =
while
let old = A.get events in
not (A.compare_and_set events old ((t, msg) :: old))
do
()
done
let logf t fmt = Printf.ksprintf (add_event t) fmt
let print_and_clear () =
let l =
A.exchange events []
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|> List.sort Stdlib.compare
in
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
end
let logf = Log_.logf
let run1 ~runner () =
Printf.printf "============\nstart\n%!";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
assert (i = i')
done);
(let clock0 = !clock in
List.iteri
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.await @@ F.res fib;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
let run2 ~runner () =
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
logf (TS.tick_get clock) "start fibers";
let subs =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Thread.delay 0.002;
(* sync for determinism *)
Chan.pop chans_unblock.(i);
Chan.push chan_progress i;
if i = 7 then (
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.await @@ F.res fib
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()

169
test/fiber/lib/fls.ml Normal file
View file

@ -0,0 +1,169 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(* ### dummy little tracing system with local storage *)
type span_id = int
let k_parent : span_id Hmap.key = Hmap.Key.create ()
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
module Span = struct
let new_id_ : unit -> span_id =
let n = A.make 0 in
fun () -> A.fetch_and_add n 1
type t = {
id: span_id;
parent: span_id option;
msg: string;
}
end
module Tracer = struct
type t = { spans: Span.t list A.t }
let create () : t = { spans = A.make [] }
let get self = A.get self.spans
let add (self : t) span =
while
let old = A.get self.spans in
not (A.compare_and_set self.spans old (span :: old))
do
()
done
let with_span self name f =
let id = Span.new_id_ () in
let parent = FLS.get_in_local_hmap_opt k_parent in
let span = { Span.id; parent; msg = name } in
add self span;
FLS.with_in_local_hmap k_parent id f
end
module Render = struct
type span_tree = {
msg: string; (** message of the span at the root *)
children: span_tree list;
}
type t = { roots: span_tree list }
let build (tracer : Tracer.t) : t =
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
(* everyone is a root at first *)
let all_spans = Tracer.get tracer in
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
(* now consider the parenting relationships *)
let add_span_to_parent (span : Span.t) =
match span.parent with
| None -> ()
| Some p ->
Hashtbl.remove tops span.id;
let l = try Hashtbl.find children p with Not_found -> [] in
Hashtbl.replace children p (span :: l)
in
List.iter add_span_to_parent all_spans;
(* build the tree *)
let rec build_tree (sp : Span.t) : span_tree =
let children = try Hashtbl.find children sp.id with Not_found -> [] in
let children = List.map build_tree children |> List.sort Stdlib.compare in
{ msg = sp.msg; children }
in
let roots =
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|> List.sort Stdlib.compare
in
{ roots }
let pp (oc : out_channel) (self : t) : unit =
let rec pp_tree indent out (t : span_tree) =
let prefix = String.make indent ' ' in
Printf.fprintf out "%s%S\n" prefix t.msg;
List.iter (pp_tree (indent + 2) out) t.children
in
List.iter (pp_tree 2 oc) self.roots
end
let run ~pool ~pool_name () =
let tracer = Tracer.create () in
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
let@ () =
Tracer.with_span tracer
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
in
for j = 1 to 5 do
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
F.yield ()
done
in
let sub_child ~idx ~idx_child ~idx_sub () =
let@ () =
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
in
for i = 1 to 10 do
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
F.yield ()
done;
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
in
let top_child ~idx ~idx_child () =
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
Tracer.with_span tracer
(spf "child.%d.%d.99.await_children" idx idx_child)
in
List.iter F.await subs
in
let top idx =
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
List.iter F.await fibs;
Printf.printf "tracing complete\n";
Printf.printf "spans:\n";
let tree = Render.build tracer in
Render.pp stdout tree;
Printf.printf "done\n%!";
()

View file

@ -1,179 +1,6 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
let ( let@ ) = ( @@ )
let runner = Fifo_pool.create ~num_threads:1 ()
module TS = struct
type t = int list
let show (s : t) = String.concat "." @@ List.map string_of_int s
let init = [ 0 ]
let next_ = function
| [] -> [ 0 ]
| n :: tl -> (n + 1) :: tl
let tick (t : t ref) = t := next_ !t
let tick_get t =
tick t;
!t
end
(* more deterministic logging of events *)
module Log_ = struct
let events : (TS.t * string) list A.t = A.make []
let add_event t msg : unit =
while
let old = A.get events in
not (A.compare_and_set events old ((t, msg) :: old))
do
()
done
let logf t fmt = Printf.ksprintf (add_event t) fmt
let print_and_clear () =
let l =
A.exchange events []
|> List.map (fun (ts, msg) -> List.rev ts, msg)
|> List.sort Stdlib.compare
in
List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l
end
let logf = Log_.logf
let () =
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let chan_progress = Chan.create ~max_size:4 () in
let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in
let subs =
List.init 5 (fun i ->
F.spawn ~protect:false @@ fun _n ->
Thread.delay (float i *. 0.01);
Chan.pop chans.(i);
Chan.push chan_progress i;
F.check_if_cancelled ();
i)
in
logf (TS.tick_get clock) "wait for subs";
F.spawn_ignore (fun () ->
for i = 0 to 4 do
Chan.push chans.(i) ();
let i' = Chan.pop chan_progress in
assert (i = i')
done);
(let clock0 = !clock in
List.iteri
(fun i f ->
let clock = ref (0 :: i :: clock0) in
logf !clock "await fiber %d" i;
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
let res = F.await f in
logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i
(Option.is_some @@ F.Private_.get_cur_opt ());
F.yield ();
logf (TS.tick_get clock) "res %d = %d" i res)
subs);
logf (TS.tick_get clock) "main fiber done"
in
Fut.wait_block_exn @@ F.res fib;
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
let () =
let@ _r = Moonpool_fib.main in
(* same but now, cancel one of the sub-fibers *)
Printf.printf "============\nstart\n";
let clock = ref TS.init in
let fib =
F.spawn_top ~on:runner @@ fun () ->
let@ () =
F.with_on_self_cancel (fun ebt ->
logf (TS.tick_get clock) "main fiber cancelled with %s"
@@ Exn_bt.show ebt)
in
let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in
let chan_progress = Chan.create ~max_size:4 () in
logf (TS.tick_get clock) "start fibers";
let subs =
let clock0 = !clock in
List.init 10 (fun i ->
let clock = ref (0 :: i :: clock0) in
F.spawn ~protect:false @@ fun _n ->
let@ () =
F.with_on_self_cancel (fun _ ->
logf (TS.tick_get clock) "sub-fiber %d was cancelled" i)
in
Thread.delay 0.002;
(* sync for determinism *)
Chan.pop chans_unblock.(i);
Chan.push chan_progress i;
if i = 7 then (
logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i;
failwith "oh no!"
);
F.check_if_cancelled ();
i)
in
let post = TS.tick_get clock in
List.iteri
(fun i fib ->
F.on_result fib (function
| Ok _ -> logf (i :: post) "fiber %d resolved as ok" i
| Error _ -> logf (i :: post) "fiber %d resolved as error" i))
subs;
(* sequentialize the fibers, for determinism *)
F.spawn_ignore (fun () ->
for j = 0 to 9 do
Chan.push chans_unblock.(j) ();
let j' = Chan.pop chan_progress in
assert (j = j')
done);
logf (TS.tick_get clock) "wait for subs";
List.iteri
(fun i f ->
logf (TS.tick_get clock) "await fiber %d" i;
let res = F.await f in
logf (TS.tick_get clock) "res %d = %d" i res)
subs;
logf (TS.tick_get clock) "yield";
F.yield ();
logf (TS.tick_get clock) "yielded";
logf (TS.tick_get clock) "main fiber done"
in
F.on_result fib (function
| Ok () -> logf (TS.tick_get clock) "main fiber result: ok"
| Error ebt ->
logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt));
(try Fut.wait_block_exn @@ F.res fib
with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg);
logf (TS.tick_get clock) "main fiber exited";
Log_.print_and_clear ();
()
let@ runner = Moonpool_fib.main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()

View file

@ -1930,7 +1930,7 @@ spans:
"iter.loop 09"
"iter.loop 10"
done
run test on pool = ws_pool
run test on pool = fifo_pool
tracing complete
spans:
"top_0"

View file

@ -1,177 +1,12 @@
open! Moonpool
module A = Atomic
module F = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(* ### dummy little tracing system with local storage *)
type span_id = int
let k_parent : span_id Hmap.key = Hmap.Key.create ()
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
module Span = struct
let new_id_ : unit -> span_id =
let n = A.make 0 in
fun () -> A.fetch_and_add n 1
type t = {
id: span_id;
parent: span_id option;
msg: string;
}
end
module Tracer = struct
type t = { spans: Span.t list A.t }
let create () : t = { spans = A.make [] }
let get self = A.get self.spans
let add (self : t) span =
while
let old = A.get self.spans in
not (A.compare_and_set self.spans old (span :: old))
do
()
done
let with_span self name f =
let id = Span.new_id_ () in
let parent = FLS.get_in_local_hmap_opt k_parent in
let span = { Span.id; parent; msg = name } in
add self span;
FLS.with_in_local_hmap k_parent id f
end
module Render = struct
type span_tree = {
msg: string; (** message of the span at the root *)
children: span_tree list;
}
type t = { roots: span_tree list }
let build (tracer : Tracer.t) : t =
let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in
let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in
(* everyone is a root at first *)
let all_spans = Tracer.get tracer in
List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans;
(* now consider the parenting relationships *)
let add_span_to_parent (span : Span.t) =
match span.parent with
| None -> ()
| Some p ->
Hashtbl.remove tops span.id;
let l = try Hashtbl.find children p with Not_found -> [] in
Hashtbl.replace children p (span :: l)
in
List.iter add_span_to_parent all_spans;
(* build the tree *)
let rec build_tree (sp : Span.t) : span_tree =
let children = try Hashtbl.find children sp.id with Not_found -> [] in
let children = List.map build_tree children |> List.sort Stdlib.compare in
{ msg = sp.msg; children }
in
let roots =
Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops []
|> List.sort Stdlib.compare
in
{ roots }
let pp (oc : out_channel) (self : t) : unit =
let rec pp_tree indent out (t : span_tree) =
let prefix = String.make indent ' ' in
Printf.fprintf out "%s%S\n" prefix t.msg;
List.iter (pp_tree (indent + 2) out) t.children
in
List.iter (pp_tree 2 oc) self.roots
end
let run ~pool ~pool_name () =
let tracer = Tracer.create () in
let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () =
let@ () =
Tracer.with_span tracer
(spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub)
in
for j = 1 to 5 do
let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in
F.yield ()
done
in
let sub_child ~idx ~idx_child ~idx_sub () =
let@ () =
Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub)
in
for i = 1 to 10 do
let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in
F.yield ()
done;
let subs =
List.init 2 (fun idx_sub_sub ->
F.spawn ~protect:true (fun () ->
sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ()))
in
List.iter F.await subs
in
let top_child ~idx ~idx_child () =
let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in
let subs =
List.init 2 (fun k ->
F.spawn ~protect:true @@ fun () ->
sub_child ~idx ~idx_child ~idx_sub:k ())
in
let@ () =
Tracer.with_span tracer
(spf "child.%d.%d.99.await_children" idx idx_child)
in
List.iter F.await subs
in
let top idx =
let@ () = Tracer.with_span tracer (spf "top_%d" idx) in
let subs =
List.init 5 (fun j ->
F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ())
in
List.iter F.await subs
in
Printf.printf "run test on pool = %s\n" pool_name;
let fibs =
List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx))
in
List.iter F.wait_block_exn fibs;
Printf.printf "tracing complete\n";
Printf.printf "spans:\n";
let tree = Render.build tracer in
Render.pp stdout tree;
Printf.printf "done\n%!";
()
let () =
let@ _ = Moonpool_fib.main in
(let@ pool = Ws_pool.with_ () in
run ~pool ~pool_name:"ws_pool" ());
T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ());
(let@ pool = Fifo_pool.with_ () in
run ~pool ~pool_name:"ws_pool" ());
T_fibers.Fls.run ~pool ~pool_name:"fifo_pool" ());
()

View file

@ -16,7 +16,7 @@
(action
(with-stdout-to
%{targets}
(run ./run_hash.sh -d ../data/ --n-conn=2 -j=4))))
(run ./run_hash.sh -d ../data/ --n-conn=2))))
(rule
(alias runtest)
@ -36,9 +36,12 @@
(= %{system} "linux")
(>= %{ocaml_version} 5.0)))
(action
(with-stdout-to
%{targets}
(run ./run_echo.sh -n 10 --n-conn=2 -j=4))))
(setenv
CI_MODE
1
(with-stdout-to
%{targets}
(run ./run_echo.sh -n 10 --n-conn=2 -v)))))
(rule
(alias runtest)

View file

@ -1,93 +1,117 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ci_mode = Option.is_some @@ Sys.getenv_opt "CI_MODE"
let spf = Printf.sprintf
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ )
let lock_stdout = M.Lock.create ()
let main ~port ~runner ~n ~n_conn () : unit Lwt.t =
let main ~port ~n ~n_conn ~verbose ~msg_per_conn () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let remaining = Atomic.make n in
let all_done = Atomic.make 0 in
let fut_exit, prom_exit = M.Fut.make () in
Printf.printf "connecting to port %d\n%!" port;
let t0 = Unix.gettimeofday () in
Printf.printf
"connecting to port %d (%d msg per conn, %d conns total, %d max at a time)\n\
%!"
port msg_per_conn n n_conn;
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
let rec run_task () =
let token_pool = Lwt_pool.create n_conn (fun () -> Lwt.return_unit) in
let n_msg_total = ref 0 in
let run_task () =
(* Printf.printf "running task\n%!"; *)
let n = Atomic.fetch_and_add remaining (-1) in
if n > 0 then (
(let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client"
in
Trace.message "connecting new client…";
M_lwt.TCP_client.with_connect addr @@ fun ic oc ->
let buf = Bytes.create 32 in
let@ () = Lwt_pool.use token_pool in
for _j = 1 to 10 do
let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__
"write.loop"
in
let@ () = M_lwt.spawn_lwt in
let _sp =
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "connect.client"
in
Trace.message "connecting new client…";
let s = spf "hello %d" _j in
M_lwt.IO_out.output_string oc s;
M_lwt.IO_out.flush oc;
let ic, oc = Lwt_io.open_connection addr |> await_lwt in
(* read back something *)
M_lwt.IO_in.really_input ic buf 0 (String.length s);
(let@ () = M.Lock.with_ lock_stdout in
Printf.printf "read: %s\n%!"
(Bytes.sub_string buf 0 (String.length s)));
Trace.exit_manual_span _sp;
()
done;
Trace.exit_manual_span _sp);
let cleanup () =
Trace.message "closing connection";
Lwt_io.close ic |> await_lwt;
Lwt_io.close oc |> await_lwt
in
(* run another task *) M.Runner.run_async runner run_task
) else (
(* if we're the last to exit, resolve the promise *)
let n_already_done = Atomic.fetch_and_add all_done 1 in
if n_already_done = n_conn - 1 then (
(let@ () = M.Lock.with_ lock_stdout in
Printf.printf "all done\n%!");
M.Fut.fulfill prom_exit @@ Ok ()
)
)
let@ () = Fun.protect ~finally:cleanup in
let buf = Bytes.create 32 in
for _j = 1 to msg_per_conn do
let _sp =
Trace.enter_manual_span
~parent:(Some (Trace.ctx_of_span _sp))
~__FILE__ ~__LINE__ "write.loop"
in
let s = spf "hello %d" _j in
Lwt_io.write oc s |> await_lwt;
Lwt_io.flush oc |> await_lwt;
incr n_msg_total;
(* read back something *)
Lwt_io.read_into_exactly ic buf 0 (String.length s) |> await_lwt;
if verbose then
Printf.printf "read: %s\n%!" (Bytes.sub_string buf 0 (String.length s));
Trace.exit_manual_span _sp;
()
done;
Trace.exit_manual_span _sp
in
(* start the first [n_conn] tasks *)
for _i = 1 to n_conn do
M.Runner.run_async runner run_task
done;
let futs = List.init n (fun _ -> run_task ()) in
Lwt.join futs |> await_lwt;
(* exit when [fut_exit] is resolved *)
M_lwt.lwt_of_fut fut_exit
Printf.printf "all done\n%!";
let elapsed = Unix.gettimeofday () -. t0 in
if not ci_mode then
Printf.printf " sent %d messages in %.4fs (%.2f msg/s)\n%!" !n_msg_total
elapsed
(float !n_msg_total /. elapsed);
()
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 0 in
let j = ref 4 in
let n_conn = ref 100 in
let n = ref 50_000 in
let msg_per_conn = ref 10 in
let verbose = ref false in
let opts =
[
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-n", Arg.Set_int n, " total number of connections";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
( "--msg-per-conn",
Arg.Set_int msg_per_conn,
" messages sent per connection" );
"-v", Arg.Set verbose, " verbose";
( "--n-conn",
Arg.Set_int n_conn,
" maximum number of connections opened simultaneously" );
]
|> Arg.align
in
Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn ()
let main () =
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
M_lwt.lwt_main @@ fun _runner ->
main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose
~msg_per_conn:!msg_per_conn ()
in
print_endline "first run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "second run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "done"

View file

@ -3,6 +3,7 @@ module M_lwt = Moonpool_lwt
module Trace = Trace_core
let ( let@ ) = ( @@ )
let await_lwt = M_lwt.await_lwt
let spf = Printf.sprintf
let str_of_sockaddr = function
@ -10,52 +11,63 @@ let str_of_sockaddr = function
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~runner () : unit Lwt.t =
let main ~port ~verbose ~runner:_ () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in
(* TODO: handle exit?? *)
(* TODO: handle exit?? ctrl-c? *)
Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc =
let handle_client client_addr (ic, oc) : _ Lwt.t =
let@ () = M_lwt.spawn_lwt in
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client"
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in
if verbose then
Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);
let buf = Bytes.create 32 in
let continue = ref true in
while !continue do
Trace.message "read";
let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in
let n = Lwt_io.read_into ic buf 0 (Bytes.length buf) |> await_lwt in
if n = 0 then
continue := false
else (
Trace.messagef (fun k -> k "got %dB" n);
M_lwt.IO_out.output oc buf 0 n;
M_lwt.IO_out.flush oc;
Lwt_io.write_from_exactly oc buf 0 n |> await_lwt;
Lwt_io.flush oc |> await_lwt;
Trace.message "write"
)
done;
if verbose then
Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr);
Trace.exit_manual_span _sp;
Trace.message "exit handle client"
in
let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let _server = M_lwt.TCP_server.establish ~runner addr handle_client in
let _server =
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in
lwt_fut
M_lwt.await_lwt lwt_fut
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 0 in
let j = ref 4 in
let verbose = ref false in
let opts =
[
"-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads";
"-v", Arg.Set verbose, " verbose";
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
]
|> Arg.align
in
@ -63,4 +75,4 @@ let () =
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port ()
M_lwt.lwt_main @@ fun _ -> main ~runner ~port:!port ~verbose:!verbose ()

17
test/lwt/fibers/dune Normal file
View file

@ -0,0 +1,17 @@
(tests
(names t_fls t_main t_fib1)
(enabled_if
(>= %{ocaml_version} 5.0))
(package moonpool-lwt)
(libraries
t_fibers
moonpool
moonpool.fib
moonpool-lwt
hmap
trace
trace-tef
qcheck-core
qcheck-core.runner
;tracy-client.trace
))

View file

@ -0,0 +1,61 @@
============
start
1: wait for subs
1.0.0: await fiber 0
1.0.1: cur fiber[0] is some: true
1.0.2: cur fiber[0] is some: true
1.0.3: res 0 = 0
1.1.0: await fiber 1
1.1.1: cur fiber[1] is some: true
1.1.2: cur fiber[1] is some: true
1.1.3: res 1 = 1
1.2.0: await fiber 2
1.2.1: cur fiber[2] is some: true
1.2.2: cur fiber[2] is some: true
1.2.3: res 2 = 2
1.3.0: await fiber 3
1.3.1: cur fiber[3] is some: true
1.3.2: cur fiber[3] is some: true
1.3.3: res 3 = 3
1.4.0: await fiber 4
1.4.1: cur fiber[4] is some: true
1.4.2: cur fiber[4] is some: true
1.4.3: res 4 = 4
2: main fiber done
3: main fiber exited
============
start
1: start fibers
1.7.1: I'm fiber 7 and I'm about to fail…
1.8.1: sub-fiber 8 was cancelled
1.9.1: sub-fiber 9 was cancelled
2.0: fiber 0 resolved as ok
2.1: fiber 1 resolved as ok
2.2: fiber 2 resolved as ok
2.3: fiber 3 resolved as ok
2.4: fiber 4 resolved as ok
2.5: fiber 5 resolved as ok
2.6: fiber 6 resolved as ok
2.7: fiber 7 resolved as error
2.8: fiber 8 resolved as error
2.9: fiber 9 resolved as error
3: wait for subs
4: await fiber 0
5: res 0 = 0
6: await fiber 1
7: res 1 = 1
8: await fiber 2
9: res 2 = 2
10: await fiber 3
11: res 3 = 3
12: await fiber 4
13: res 4 = 4
14: await fiber 5
15: res 5 = 5
16: await fiber 6
17: res 6 = 6
18: await fiber 7
19: main fiber cancelled with Failure("oh no!")
20: main fiber result: error Failure("oh no!")
21: main fib failed with "oh no!"
22: main fiber exited

View file

@ -0,0 +1,8 @@
module M_lwt = Moonpool_lwt
let ( let@ ) = ( @@ )
let () =
let@ runner = M_lwt.lwt_main in
T_fibers.Fib.run1 ~runner ();
T_fibers.Fib.run2 ~runner ()

File diff suppressed because it is too large Load diff

8
test/lwt/fibers/t_fls.ml Normal file
View file

@ -0,0 +1,8 @@
module M_lwt = Moonpool_lwt
let ( let@ ) = ( @@ )
let () =
(let@ runner = M_lwt.lwt_main in
T_fibers.Fls.run ~pool:runner ~pool_name:"lwt" ());
()

35
test/lwt/fibers/t_main.ml Normal file
View file

@ -0,0 +1,35 @@
open Moonpool
module M_lwt = Moonpool_lwt
module F = Moonpool_fib
let ( let@ ) = ( @@ )
let () =
(* run fibers in the background, await them in the main thread *)
let@ bg = Fifo_pool.with_ ~num_threads:4 () in
let r =
M_lwt.lwt_main @@ fun runner ->
let f1 = F.spawn_top ~on:bg (fun () -> 1) in
let f2 = F.spawn_top ~on:runner (fun () -> 2) in
let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in
let r = F.await f2 + F.await f3 in
assert (r = 13);
r
in
assert (r = 13)
let () =
(* run multiple times to make sure cleanup is correct *)
for _i = 1 to 10 do
try
let _r =
M_lwt.lwt_main @@ fun runner ->
let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in
F.await fib
in
assert false
with Failure msg ->
(* Printf.eprintf "got %S\n%!" msg; *)
assert (msg = "oops")
done

View file

@ -1,4 +1,3 @@
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
@ -8,10 +7,10 @@ module Str_tbl = Hashtbl.Make (struct
let hash = Hashtbl.hash
end)
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ )
let lock_stdout = M.Lock.create ()
let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
let main ~port ~ext ~dir ~n_conn () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
Printf.printf "hash dir=%S\n%!" dir;
@ -20,12 +19,15 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
(* TODO: *)
let run_task () : unit =
let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in
let run_task () : unit Lwt.t =
let@ () = M_lwt.spawn_lwt in
let _sp =
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "run-task"
in
let seen = Str_tbl.create 16 in
M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc ->
let ic, oc = Lwt_io.open_connection addr |> await_lwt in
let rec walk file : unit =
if not (Sys.file_exists file) then
()
@ -33,7 +35,9 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
()
else if Sys.is_directory file then (
let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir"
Trace.enter_manual_span
~parent:(Some (Trace.ctx_of_span _sp))
~__FILE__ ~__LINE__ "walk-dir"
~data:(fun () -> [ "d", `String file ])
in
@ -45,9 +49,8 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
()
else (
Str_tbl.add seen file ();
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file);
let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in
let@ () = M.Lock.with_ lock_stdout in
Lwt_io.write_line oc file |> await_lwt;
let res = Lwt_io.read_line ic |> await_lwt in
Printf.printf "%s\n%!" res
)
in
@ -56,16 +59,14 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
in
(* start the first [n_conn] tasks *)
let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in
Lwt.join (List.map M_lwt.lwt_of_fut futs)
let futs = List.init n_conn (fun _ -> run_task ()) in
Lwt.join futs |> await_lwt
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 1234 in
let j = ref 4 in
let n_conn = ref 100 in
let ext = ref "" in
let dir = ref "." in
@ -73,7 +74,6 @@ let () =
let opts =
[
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-d", Arg.Set_string dir, " directory to hash";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
"--ext", Arg.Set_string ext, " extension to filter files";
@ -82,7 +82,6 @@ let () =
in
Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run
@@ main ~runner ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn ()
M_lwt.lwt_main @@ fun _runner ->
main ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn ()

View file

@ -1,4 +1,7 @@
(* vendored from https://github.com/dbuenzli/uuidm *)
(* vendored from https://github.com/dbuenzli/uuidm
This function is Copyright (c) 2008 The uuidm programmers.
SPDX-License-Identifier: ISC *)
let sha_1 s =
(* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *)
@ -116,28 +119,16 @@ let sha_1 s =
i2s h 16 !h4;
Bytes.unsafe_to_string h
(*---------------------------------------------------------------------------
Copyright (c) 2008 The uuidm programmers
(* ================== *)
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
(* test server that reads a list of files from each client connection, and sends back
to the client the hashes of these files *)
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
---------------------------------------------------------------------------*)
(* server that reads from sockets lists of files, and returns hashes of these files *)
module M = Moonpool
module M_lwt = Moonpool_lwt
module Trace = Trace_core
module Fut = Moonpool.Fut
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
@ -165,7 +156,7 @@ let read_file filename : string =
in
In_channel.with_open_bin filename In_channel.input_all
let main ~port ~runner () : unit Lwt.t =
let main ~port ~runner () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in
@ -173,38 +164,39 @@ let main ~port ~runner () : unit Lwt.t =
(* TODO: handle exit?? *)
Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc =
let handle_client client_addr (ic, oc) =
let@ () = Moonpool_lwt.spawn_lwt in
let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client"
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in
try
while true do
Trace.message "read";
let filename =
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic)
|> String.trim
in
let filename = Lwt_io.read_line ic |> await_lwt |> String.trim in
Trace.messagef (fun k -> k "hash %S" filename);
match read_file filename with
| exception e ->
Printf.eprintf "error while reading %S:\n%s\n" filename
(Printexc.to_string e);
M_lwt.run_in_lwt_and_await (fun () ->
Lwt_io.write_line oc (spf "%s: error" filename));
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
Lwt_io.write_line oc (spf "%s: error" filename) |> await_lwt;
Lwt_io.flush oc |> await_lwt
| content ->
(* got the content, now hash it *)
let hash =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in
(* got the content, now hash it in a background task *)
let hash : _ Fut.t =
let@ () = Moonpool.spawn ~on:runner in
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "hash" ~data:(fun () ->
[ "file", `String filename ])
in
sha_1 content |> to_hex
in
M_lwt.run_in_lwt_and_await (fun () ->
Lwt_io.write_line oc (spf "%s: %s" filename hash));
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
let hash = Fut.await hash in
Lwt_io.write_line oc (spf "%s: %s" filename hash) |> await_lwt;
Lwt_io.flush oc |> await_lwt
done
with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
Trace.exit_manual_span _sp;
@ -212,16 +204,17 @@ let main ~port ~runner () : unit Lwt.t =
in
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in
Printf.printf "listening on port=%d\n%!" port;
let _server =
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in
lwt_fut
lwt_fut |> await_lwt
let () =
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main";
let port = ref 1234 in
let j = ref 4 in
let j = ref 0 in
let opts =
[
@ -231,6 +224,14 @@ let () =
in
Arg.parse opts ignore "echo server";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port ()
let@ runner =
let num_threads =
if !j = 0 then
None
else
Some !j
in
Moonpool.Ws_pool.with_ ?num_threads ()
in
M_lwt.lwt_main @@ fun _main_runner -> main ~runner ~port:!port ()

View file

@ -1,8 +1,22 @@
run echo server on port=12346
listening on port 12346
run echo client -p 12346 -n 10 --n-conn=2 -j=4
run echo client -p 12346 -n 10 --n-conn=2 -v
all done
connecting to port 12346
all done
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
done
first run
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
@ -23,6 +37,26 @@ read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
@ -43,6 +77,26 @@ read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
@ -63,6 +117,26 @@ read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
@ -83,6 +157,26 @@ read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
@ -103,3 +197,14 @@ read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
second run

View file

@ -1,7 +1,6 @@
running hash server on port=12345
listening on port 12345
listening on port=12345
run hash client -p 12345 -d ../data/ --n-conn=2 -j=4
run hash client -p 12345 -d ../data/ --n-conn=2
../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38
../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38
../data/d1/large_10: c31560efa1a5ad6dbf89990d51878f3bd64b13ce

View file

@ -1,111 +0,0 @@
module BQ = Moonpool.Bounded_queue
module Bb_queue = Moonpool.Blocking_queue
module A = Moonpool.Atomic
let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
assert (BQ.pop bq = 2);
assert (BQ.try_pop ~force_lock:true bq = None);
spawn (fun () -> BQ.push bq 3);
assert (BQ.pop bq = 3)
let () =
(* cannot create with size 0 *)
assert (
try
ignore (BQ.create ~max_size:0 ());
false
with _ -> true)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
BQ.close bq;
assert (BQ.pop bq = 2);
assert (
try
ignore (BQ.pop bq);
false
with BQ.Closed -> true);
assert (
try
ignore (BQ.push bq 42);
false
with BQ.Closed -> true)
let () =
let bq = BQ.create ~max_size:2 () in
let side_q = Bb_queue.create () in
BQ.push bq 1;
BQ.push bq 2;
spawn (fun () ->
for i = 3 to 10 do
BQ.push bq i;
Bb_queue.push side_q (`Pushed i)
done);
(* make space for new element *)
assert (BQ.pop bq = 1);
assert (Bb_queue.pop side_q = `Pushed 3);
assert (BQ.pop bq = 2);
assert (BQ.pop bq = 3);
for j = 4 to 10 do
assert (BQ.pop bq = j);
assert (Bb_queue.pop side_q = `Pushed j)
done;
assert (BQ.size bq = 0);
()
let () =
let bq = BQ.create ~max_size:5 () in
let bq1 = BQ.create ~max_size:10 () in
let bq2 = BQ.create ~max_size:10 () in
let bq_res = BQ.create ~max_size:2 () in
(* diamond:
bq -------> bq1
| |
| |
v v
bq2 -----> bq_res *)
spawn (fun () ->
BQ.to_iter bq (BQ.push bq1);
BQ.close bq1);
spawn (fun () ->
BQ.to_iter bq (BQ.push bq2);
BQ.close bq2);
spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res));
spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res));
let n = 100_000 in
(* push into [bq] *)
let sum = A.make 0 in
spawn (fun () ->
for i = 1 to n do
ignore (A.fetch_and_add sum i : int);
BQ.push bq i
done;
BQ.close bq);
let sum' = ref 0 in
for _j = 1 to n do
let x = BQ.pop bq_res in
sum' := x + !sum'
done;
assert (BQ.size bq_res = 0);
assert (A.get sum = !sum')