Compare commits

..

3 commits
v0.10 ... main

Author SHA1 Message Date
Simon Cruanes
189a95a514
fix: in Lock, prevent flambda from reordering mutex-protected operations
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
inspired by Mutex.protect in the stdlib, which is also `[@inline never]`
for this reason
2025-11-19 12:28:09 -05:00
Simon Cruanes
0959004b11
document how many threads are used for work in Ws_pool 2025-11-19 12:24:33 -05:00
Simon Cruanes
75e528413b
remove mentions of ocaml4 in readme
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-11-14 22:56:37 -05:00
4 changed files with 35 additions and 36 deletions

View file

@ -16,16 +16,13 @@ In addition, some concurrency and parallelism primitives are provided:
- `Moonpool.Chan` provides simple cooperative and thread-safe channels - `Moonpool.Chan` provides simple cooperative and thread-safe channels
to use within pool-bound tasks. They're essentially re-usable futures. to use within pool-bound tasks. They're essentially re-usable futures.
On OCaml 5 (meaning there's actual domains and effects, not just threads), Moonpool now requires OCaml 5 (meaning there's actual domains and effects, not just threads),
a `Fut.await` primitive is provided. It's simpler and more powerful so the `Fut.await` primitive is always provided. It's simpler and more powerful
than the monadic combinators. than the monadic combinators.
- `Moonpool_forkjoin`, in the library `moonpool.forkjoin` - `Moonpool_forkjoin`, in the library `moonpool.forkjoin`
provides the fork-join parallelism primitives provides the fork-join parallelism primitives
to use within tasks running in the pool. to use within tasks running in the pool.
On OCaml 4.xx, there is only one domain; all threads run on it, but the
pool abstraction is still useful to provide preemptive concurrency.
## Usage ## Usage
The user can create several thread pools (implementing the interface `Runner.t`). The user can create several thread pools (implementing the interface `Runner.t`).
@ -182,7 +179,7 @@ scope).
### Fork-join ### Fork-join
On OCaml 5, again using effect handlers, the sublibrary `moonpool.forkjoin` The sub-library `moonpool.forkjoin`
provides a module `Moonpool_forkjoin` provides a module `Moonpool_forkjoin`
implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model). implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model).
It must run on a pool (using `Runner.run_async` or inside a future via `Fut.spawn`). It must run on a pool (using `Runner.run_async` or inside a future via `Fut.spawn`).
@ -296,21 +293,18 @@ You are assuming that, if pool P1 has 5000 tasks, and pool P2 has 10 other tasks
## OCaml versions ## OCaml versions
This works for OCaml >= 4.08. This works for OCaml >= 5.00.
- On OCaml 4.xx, there are no domains, so this is just a library for regular thread pools
with not actual parallelism (except for threads that call C code that releases the runtime lock, that is).
C calls that do release the runtime lock (e.g. to call [Z3](https://github.com/Z3Prover/z3), hash a file, etc.)
will still run in parallel.
- on OCaml 5.xx, there is a fixed pool of domains (using the recommended domain count).
These domains do not do much by themselves, but we schedule new threads on them, and form pools
of threads that contain threads from each domain.
Each domain might thus have multiple threads that belong to distinct pools (and several threads from
the same pool, too — this is useful for threads blocking on IO); Each pool will have threads
running on distinct domains, which enables parallelism.
A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core. Internally, there is a fixed pool of domains (using the recommended domain count).
Multiple threads have to share a single core and do not run in parallel on it[^2]. These domains do not do much by themselves, but we schedule new threads on them, and form pools
We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool. of threads that contain threads from each domain.
Each domain might thus have multiple threads that belong to distinct pools (and several threads from
the same pool, too — this is useful for threads blocking on IO); Each pool will have threads
running on distinct domains, which enables parallelism.
A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core.
Multiple threads have to share a single core and do not run in parallel on it[^2].
We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool.
TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores, TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores,
possibly optionally using `select` in dune. possibly optionally using `select` in dune.
@ -326,3 +320,4 @@ $ opam install moonpool
``` ```
[^2]: ignoring hyperthreading for the sake of the analogy. [^2]: ignoring hyperthreading for the sake of the analogy.

View file

@ -29,10 +29,8 @@ val create : (unit -> t, _) create_args
(** [create ()] makes a new thread pool. (** [create ()] makes a new thread pool.
@param on_init_thread @param on_init_thread
called at the beginning of each new thread in the pool. called at the beginning of each new thread in the pool.
@param min @param num_threads
minimum size of the pool. See {!Pool.create_args}. The default is number of worker threads. See {!Ws_pool.create} for more details.
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each worker thread in the pool. @param on_exit_thread called at the end of each worker thread in the pool.
@param name name for the pool, used in tracing (since 0.6) *) @param name name for the pool, used in tracing (since 0.6) *)

View file

@ -5,13 +5,13 @@ type 'a t = {
let create content : _ t = { mutex = Mutex.create (); content } let create content : _ t = { mutex = Mutex.create (); content }
let with_ (self : _ t) f = let[@inline never] with_ (self : _ t) f =
Mutex.lock self.mutex; Mutex.lock self.mutex;
try match f self.content with
let x = f self.content in | x ->
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
x x
with e -> | exception e ->
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
raise e raise e
@ -24,13 +24,13 @@ let[@inline] update_map l f =
l.content <- x'; l.content <- x';
y) y)
let get l = let[@inline never] get l =
Mutex.lock l.mutex; Mutex.lock l.mutex;
let x = l.content in let x = l.content in
Mutex.unlock l.mutex; Mutex.unlock l.mutex;
x x
let set l x = let[@inline never] set l x =
Mutex.lock l.mutex; Mutex.lock l.mutex;
l.content <- x; l.content <- x;
Mutex.unlock l.mutex Mutex.unlock l.mutex

View file

@ -1,8 +1,8 @@
(** Work-stealing thread pool. (** Work-stealing thread pool.
A pool of threads with a worker-stealing scheduler. The pool contains a A pool of threads with a worker-stealing scheduler. The pool contains a
fixed number of threads that wait for work items to come, process these, and fixed number of worker threads that wait for work items to come, process
loop. these, and loop.
This is good for CPU-intensive tasks that feature a lot of small tasks. Note This is good for CPU-intensive tasks that feature a lot of small tasks. Note
that tasks will not always be processed in the order they are scheduled, so that tasks will not always be processed in the order they are scheduled, so
@ -15,8 +15,8 @@
in it to stop (after they finish their work), and wait for them to stop. in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool (whose size is The threads are distributed across a fixed domain pool (whose size is
determined by {!Domain.recommended_domain_count} on OCaml 5, and simply the determined by {!Domain.recommended_domain_count}. See {!create} for more
single runtime on OCaml 4). *) details. *)
include module type of Runner include module type of Runner
@ -36,8 +36,14 @@ val create : (unit -> t, _) create_args
@param num_threads @param num_threads
size of the pool, ie. number of worker threads. It will be at least [1] size of the pool, ie. number of worker threads. It will be at least [1]
internally, so [0] or negative values make no sense. The default is internally, so [0] or negative values make no sense. The default is
[Domain.recommended_domain_count()], ie one worker thread per CPU core. On [Domain.recommended_domain_count()], ie one worker thread per CPU core.
OCaml 4 the default is [4] (since there is only one domain).
Note that specifying [num_threads=n] means that the degree of parallelism is
at most [n]. This behavior is different than the one of [Domainslib], see
https://github.com/c-cube/moonpool/issues/41 for context.
If you want to use all cores, use [Domain.recommended_domain_count()].
@param on_exit_thread called at the end of each thread in the pool @param on_exit_thread called at the end of each thread in the pool
@param name @param name
a name for this thread pool, used if tracing is enabled (since 0.6) *) a name for this thread pool, used if tracing is enabled (since 0.6) *)