diff --git a/dev/moonpool/Moonpool/Atomic/index.html b/dev/moonpool/Moonpool/Atomic/index.html deleted file mode 100644 index 31fe9ab2..00000000 --- a/dev/moonpool/Moonpool/Atomic/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Atomic (moonpool.Moonpool.Atomic)

Module Moonpool.Atomic

Atomic values.

This is either a shim using ref, on pre-OCaml 5, or the standard Atomic module on OCaml 5.

include module type of struct include Stdlib.Atomic end
type !'a t = 'a Stdlib.Atomic.t

An atomic (mutable) reference to a value of type 'a.

val make : 'a -> 'a t

Create an atomic reference.

val get : 'a t -> 'a

Get the current value of the atomic reference.

val set : 'a t -> 'a -> unit

Set a new value for the atomic reference.

val exchange : 'a t -> 'a -> 'a

Set a new value for the atomic reference, and return the current value.

val compare_and_set : 'a t -> 'a -> 'a -> bool

compare_and_set r seen v sets the new value of r to v only if its current value is physically equal to seen -- the comparison and the set occur atomically. Returns true if the comparison succeeded (so the set happened) and false otherwise.

val fetch_and_add : int t -> int -> int

fetch_and_add r n atomically increments the value of r by n, and returns the current value (before the increment).

val incr : int t -> unit

incr r atomically increments the value of r by 1.

val decr : int t -> unit

decr r atomically decrements the value of r by 1.

diff --git a/dev/moonpool/Moonpool/Exn_bt/index.html b/dev/moonpool/Moonpool/Exn_bt/index.html new file mode 100644 index 00000000..da518584 --- /dev/null +++ b/dev/moonpool/Moonpool/Exn_bt/index.html @@ -0,0 +1,2 @@ + +Exn_bt (moonpool.Moonpool.Exn_bt)

Module Moonpool.Exn_bt

Exception with backtrace.

An exception bundled with a backtrace

val exn : t -> exn
val make : exn -> Stdlib.Printexc.raw_backtrace -> t

Trivial builder

val get : exn -> t

get exn is make exn (get_raw_backtrace ())

val get_callstack : int -> exn -> t
val raise : t -> 'a

Raise the exception with its save backtrace

val show : t -> string

Simple printing

type nonrec 'a result = ('a, t) result
diff --git a/dev/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html b/dev/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html index 77ba3cfe..e5404afa 100644 --- a/dev/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html +++ b/dev/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> task -> unit) -> + run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> unit -> - t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool__.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

+ t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool_private.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/dev/moonpool/Moonpool/Fifo_pool/index.html b/dev/moonpool/Moonpool/Fifo_pool/index.html index d70bd877..e1fa2b31 100644 --- a/dev/moonpool/Moonpool/Fifo_pool/index.html +++ b/dev/moonpool/Moonpool/Fifo_pool/index.html @@ -1,9 +1,19 @@ -Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?name:string -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = +Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + task -> + unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + (unit -> 'a) -> + 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> ?around_task:((t -> 'b) * (t -> 'b -> unit)) -> ?num_threads:int -> ?name:string -> - 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter min

    minimum size of the pool. See Pool.create_args. The default is Domain.recommended_domain_count(), ie one worker per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each worker thread in the pool.

  • parameter around_task

    a pair of before, after functions ran around each task. See Pool.create_args.

  • parameter name

    name for the pool, used in tracing (since NEXT_RELEASE)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released. Most parameters are the same as in create.

+ 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter min

    minimum size of the pool. See Pool.create_args. The default is Domain.recommended_domain_count(), ie one worker per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each worker thread in the pool.

  • parameter around_task

    a pair of before, after functions ran around each task. See Pool.create_args.

  • parameter name

    name for the pool, used in tracing (since NEXT_RELEASE)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released. Most parameters are the same as in create.

diff --git a/dev/moonpool/Moonpool/Fut/index.html b/dev/moonpool/Moonpool/Fut/index.html index 1e020ae8..b4b5b1f8 100644 --- a/dev/moonpool/Moonpool/Fut/index.html +++ b/dev/moonpool/Moonpool/Fut/index.html @@ -1,2 +1,11 @@ -Fut (moonpool.Moonpool.Fut)

Module Moonpool.Fut

Futures.

A future of type 'a t represents the result of a computation that will yield a value of type 'a.

Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).

Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).

type 'a or_error = ('a, exn * Stdlib.Printexc.raw_backtrace) result
type 'a t

A future with a result of type 'a.

type 'a promise

A promise, which can be fulfilled exactly once to set the corresponding future

val make : ?name:string -> unit -> 'a t * 'a promise

Make a new future with the associated promise.

  • parameter name

    name for the future, used for tracing. since NEXT_RELEASE.

val on_result : 'a t -> ('a or_error -> unit) -> unit

on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.

exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time.

val fulfill_idempotent : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.

val return : 'a -> 'a t

Already settled future, with a result

val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ t

Already settled future, with a failure

val of_result : 'a or_error -> 'a t
val is_resolved : _ t -> bool

is_resolved fut is true iff fut is resolved.

val peek : 'a t -> 'a or_error option

peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.

exception Not_ready
  • since 0.2
val get_or_fail : 'a t -> 'a or_error

get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).

  • since 0.2
val get_or_fail_exn : 'a t -> 'a

get_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.

  • since 0.2
val is_done : _ t -> bool

Is the future resolved? This is the same as peek fut |> Option.is_some.

  • since 0.2

Combinators

val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t

spaw ~on f runs f() on the given runner on, and return a future that will hold its result.

val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a t

This must be run from inside a runner, and schedules the new task on it as well.

See Runner.get_current_runner to see how the runner is found.

  • since 0.5
  • raises Failure

    if run from outside a runner.

val reify_error : 'a t -> 'a or_error t

reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x

  • since 0.4
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t

map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t

bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t

bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.

  • parameter on

    if provided, f runs on the given runner

  • since 0.4
val join : 'a t t -> 'a t

join fut is fut >>= Fun.id. It joins the inner layer of the future.

  • since 0.2
val both : 'a t -> 'b t -> ('a * 'b) t

both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.

val choose : 'a t -> 'b t -> ('a, 'b) Either.t t

choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.

val choose_same : 'a t -> 'a t -> 'a t

choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.

val join_array : 'a t array -> 'a array t

Wait for all the futures in the array. Fails if any future fails.

val join_list : 'a t list -> 'a list t

Wait for all the futures in the list. Fails if any future fails.

module Advanced : sig ... end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t

map_list ~f l is like join_list @@ List.map f l.

  • since 0.5.1
val wait_array : _ t array -> unit t

wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.

val wait_list : _ t list -> unit t

wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.

val for_ : on:Runner.t -> int -> (int -> unit) -> unit t

for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.

val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t

for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.

  • since 0.2
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t

for_list ~on l f is like for_array ~on (Array.of_list l) f.

  • since 0.2

Await

NOTE This is only available on OCaml 5.

val await : 'a t -> 'a

await fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner.

  • since 0.3

This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x

Blocking

val wait_block : 'a t -> 'a or_error

wait_block fut blocks the current thread until fut is resolved, and returns its value.

NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.

A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.

val wait_block_exn : 'a t -> 'a

Same as wait_block but re-raises the exception if the future failed.

Infix operators

These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.

They were previously present as module Infix_local and val infix, but are now simplified.

module Infix : sig ... end
include module type of Infix
  • since 0.5
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val let+ : 'a t -> ('a -> 'b) -> 'b t
val and+ : 'a t -> 'b t -> ('a * 'b) t
val let* : 'a t -> ('a -> 'b t) -> 'b t
val and* : 'a t -> 'b t -> ('a * 'b) t
module Infix_local = Infix
+Fut (moonpool.Moonpool.Fut)

Module Moonpool.Fut

Futures.

A future of type 'a t represents the result of a computation that will yield a value of type 'a.

Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).

Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).

type 'a or_error = ('a, Exn_bt.t) result
type 'a t

A future with a result of type 'a.

type 'a promise

A promise, which can be fulfilled exactly once to set the corresponding future

val make : ?name:string -> unit -> 'a t * 'a promise

Make a new future with the associated promise.

  • parameter name

    name for the future, used for tracing. since NEXT_RELEASE.

val on_result : 'a t -> ('a or_error -> unit) -> unit

on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.

exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time.

val fulfill_idempotent : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.

val return : 'a -> 'a t

Already settled future, with a result

val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ t

Already settled future, with a failure

val fail_exn_bt : Exn_bt.t -> _ t

Fail from a bundle of exception and backtrace

  • since NEXT_RELEASE
val of_result : 'a or_error -> 'a t
val is_resolved : _ t -> bool

is_resolved fut is true iff fut is resolved.

val peek : 'a t -> 'a or_error option

peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.

exception Not_ready
  • since 0.2
val get_or_fail : 'a t -> 'a or_error

get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).

  • since 0.2
val get_or_fail_exn : 'a t -> 'a

get_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.

  • since 0.2
val is_done : _ t -> bool

Is the future resolved? This is the same as peek fut |> Option.is_some.

  • since 0.2
val is_success : _ t -> bool

Checks if the future is resolved with Ok _ as a result.

  • since NEXT_RELEASE
val is_failed : _ t -> bool

Checks if the future is resolved with Error _ as a result.

  • since NEXT_RELEASE

Combinators

val spawn : + ?name:string -> + ?ls:Task_local_storage.storage -> + on:Runner.t -> + (unit -> 'a) -> + 'a t

spaw ~on f runs f() on the given runner on, and return a future that will hold its result.

val spawn_on_current_runner : + ?name:string -> + ?ls:Task_local_storage.storage -> + (unit -> 'a) -> + 'a t

This must be run from inside a runner, and schedules the new task on it as well.

See Runner.get_current_runner to see how the runner is found.

  • since 0.5
  • raises Failure

    if run from outside a runner.

val reify_error : 'a t -> 'a or_error t

reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x

  • since 0.4
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t

map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t

bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t

bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.

  • parameter on

    if provided, f runs on the given runner

  • since 0.4
val join : 'a t t -> 'a t

join fut is fut >>= Fun.id. It joins the inner layer of the future.

  • since 0.2
val both : 'a t -> 'b t -> ('a * 'b) t

both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.

val choose : 'a t -> 'b t -> ('a, 'b) Either.t t

choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.

val choose_same : 'a t -> 'a t -> 'a t

choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.

val join_array : 'a t array -> 'a array t

Wait for all the futures in the array. Fails if any future fails.

val join_list : 'a t list -> 'a list t

Wait for all the futures in the list. Fails if any future fails.

module Advanced : sig ... end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t

map_list ~f l is like join_list @@ List.map f l.

  • since 0.5.1
val wait_array : _ t array -> unit t

wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.

val wait_list : _ t list -> unit t

wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.

val for_ : on:Runner.t -> int -> (int -> unit) -> unit t

for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.

val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t

for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.

  • since 0.2
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t

for_list ~on l f is like for_array ~on (Array.of_list l) f.

  • since 0.2

Await

NOTE This is only available on OCaml 5.

val await : 'a t -> 'a

await fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).

  • since 0.3

This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x

Blocking

val wait_block : 'a t -> 'a or_error

wait_block fut blocks the current thread until fut is resolved, and returns its value.

NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.

A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.

val wait_block_exn : 'a t -> 'a

Same as wait_block but re-raises the exception if the future failed.

Infix operators

These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.

They were previously present as module Infix_local and val infix, but are now simplified.

module Infix : sig ... end
include module type of Infix
  • since 0.5
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val let+ : 'a t -> ('a -> 'b) -> 'b t
val and+ : 'a t -> 'b t -> ('a * 'b) t
val let* : 'a t -> ('a -> 'b t) -> 'b t
val and* : 'a t -> 'b t -> ('a * 'b) t
module Infix_local = Infix
diff --git a/dev/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html b/dev/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html index 6494de98..e99177a1 100644 --- a/dev/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html +++ b/dev/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> task -> unit) -> + run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> unit -> - t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool__.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

+ t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool_private.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/dev/moonpool/Moonpool/Immediate_runner/index.html b/dev/moonpool/Moonpool/Immediate_runner/index.html index 240bff6b..62fab13e 100644 --- a/dev/moonpool/Moonpool/Immediate_runner/index.html +++ b/dev/moonpool/Moonpool/Immediate_runner/index.html @@ -1,2 +1,12 @@ -Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks immediately in the caller thread.

Whenever a task is submitted to this runner via Runner.run_async r task, the task is run immediately in the caller thread as task(). There are no background threads, no resource, this is just a trivial implementation of the interface.

This can be useful when an implementation needs a runner, but there isn't enough work to justify starting an actual full thread pool.

Another situation is when threads cannot be used at all (e.g. because you plan to call Unix.fork later).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?name:string -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val runner : t

The trivial runner that actually runs tasks at the calling point.

+Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks immediately in the caller thread.

Whenever a task is submitted to this runner via Runner.run_async r task, the task is run immediately in the caller thread as task(). There are no background threads, no resource, this is just a trivial implementation of the interface.

This can be useful when an implementation needs a runner, but there isn't enough work to justify starting an actual full thread pool.

Another situation is when threads cannot be used at all (e.g. because you plan to call Unix.fork later).

NOTE: this does not handle the Suspend effect, so await, fork-join, etc. will NOT work on this runner.

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + task -> + unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + (unit -> 'a) -> + 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val runner : t

The trivial runner that actually runs tasks at the calling point.

diff --git a/dev/moonpool/Moonpool/Lock/index.html b/dev/moonpool/Moonpool/Lock/index.html index c798743c..1a50728a 100644 --- a/dev/moonpool/Moonpool/Lock/index.html +++ b/dev/moonpool/Moonpool/Lock/index.html @@ -1,2 +1,13 @@ -Lock (moonpool.Moonpool.Lock)

Module Moonpool.Lock

Mutex-protected resource.

type 'a t

A value protected by a mutex

val create : 'a -> 'a t

Create a new protected value.

val with_ : 'a t -> ('a -> 'b) -> 'b

with_ l f runs f x where x is the value protected with the lock l, in a critical section. If f x fails, with_lock l f fails too but the lock is released.

val update : 'a t -> ('a -> 'a) -> unit

update l f replaces the content x of l with f x, while protected by the mutex.

val update_map : 'a t -> ('a -> 'a * 'b) -> 'b

update_map l f computes x', y = f (get l), then puts x' in l and returns y, while protected by the mutex.

val mutex : _ t -> Stdlib.Mutex.t

Underlying mutex.

val get : 'a t -> 'a

Atomically get the value in the lock. The value that is returned isn't protected!

val set : 'a t -> 'a -> unit

Atomically set the value.

NOTE caution: using get and set as if this were a ref is an anti pattern and will not protect data against some race conditions.

+Lock (moonpool.Moonpool.Lock)

Module Moonpool.Lock

Mutex-protected resource.

This lock is a synchronous concurrency primitive, as a thin wrapper around Mutex that encourages proper management of the critical section in RAII style:

let (let@) = (@@)
+
+
+…
+let compute_foo =
+  (* enter critical section *)
+  let@ x = Lock.with_ protected_resource in
+  use_x;
+  return_foo ()
+  (* exit critical section *)
+in
+…

This lock does not work well with await. A critical section that contains a call to await might cause deadlocks, or lock starvation, because it will hold onto the lock while it goes to sleep.

type 'a t

A value protected by a mutex

val create : 'a -> 'a t

Create a new protected value.

val with_ : 'a t -> ('a -> 'b) -> 'b

with_ l f runs f x where x is the value protected with the lock l, in a critical section. If f x fails, with_lock l f fails too but the lock is released.

val update : 'a t -> ('a -> 'a) -> unit

update l f replaces the content x of l with f x, while protected by the mutex.

val update_map : 'a t -> ('a -> 'a * 'b) -> 'b

update_map l f computes x', y = f (get l), then puts x' in l and returns y, while protected by the mutex.

val mutex : _ t -> Stdlib.Mutex.t

Underlying mutex.

val get : 'a t -> 'a

Atomically get the value in the lock. The value that is returned isn't protected!

val set : 'a t -> 'a -> unit

Atomically set the value.

NOTE caution: using get and set as if this were a ref is an anti pattern and will not protect data against some race conditions.

diff --git a/dev/moonpool/Moonpool/Runner/For_runner_implementors/index.html b/dev/moonpool/Moonpool/Runner/For_runner_implementors/index.html index 1d311087..7e584aca 100644 --- a/dev/moonpool/Moonpool/Runner/For_runner_implementors/index.html +++ b/dev/moonpool/Moonpool/Runner/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> task -> unit) -> + run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> unit -> - t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool__.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

+ t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool_private.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/dev/moonpool/Moonpool/Runner/index.html b/dev/moonpool/Moonpool/Runner/index.html index eb22ca79..ac6e0122 100644 --- a/dev/moonpool/Moonpool/Runner/index.html +++ b/dev/moonpool/Moonpool/Runner/index.html @@ -1,2 +1,12 @@ -Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?name:string -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
+Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + task -> + unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + (unit -> 'a) -> + 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
diff --git a/dev/moonpool/Moonpool/Task_local_storage/index.html b/dev/moonpool/Moonpool/Task_local_storage/index.html new file mode 100644 index 00000000..973f759b --- /dev/null +++ b/dev/moonpool/Moonpool/Task_local_storage/index.html @@ -0,0 +1,8 @@ + +Task_local_storage (moonpool.Moonpool.Task_local_storage)

Module Moonpool.Task_local_storage

Task-local storage.

This storage is associated to the current task, just like thread-local storage is associated with the current thread. The storage is carried along in case the current task is suspended.

type storage

Underlying storage for a task

type 'a key

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
+    Task_local_storage.new_key ~init:(fun () -> make_foo ()) ()
+
+(* … *)
+
+(* use it: *)
+let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

diff --git a/dev/moonpool/Moonpool/Thread_local_storage/index.html b/dev/moonpool/Moonpool/Thread_local_storage/index.html deleted file mode 100644 index 5ca01d98..00000000 --- a/dev/moonpool/Moonpool/Thread_local_storage/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Thread_local_storage (moonpool.Moonpool.Thread_local_storage)

Module Moonpool.Thread_local_storage

Thread local storage

type 'a key

A TLS key for values of type 'a. This allows the storage of a single value of type 'a per thread.

val new_key : (unit -> 'a) -> 'a key

Allocate a new, generative key. When the key is used for the first time on a thread, the function is called to produce it.

This should only ever be called at toplevel to produce constants, do not use it in a loop.

val get : 'a key -> 'a

Get the value for the current thread.

val set : 'a key -> 'a -> unit

Set the value for the current thread.

diff --git a/dev/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html b/dev/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html index d7deaa26..98481d1f 100644 --- a/dev/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html +++ b/dev/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> task -> unit) -> + run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> unit -> - t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool__.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

+ t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

val k_cur_runner : t option ref Moonpool_private.Thread_local_storage_.key

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/dev/moonpool/Moonpool/Ws_pool/index.html b/dev/moonpool/Moonpool/Ws_pool/index.html index e7f622f5..7a25a527 100644 --- a/dev/moonpool/Moonpool/Ws_pool/index.html +++ b/dev/moonpool/Moonpool/Ws_pool/index.html @@ -1,5 +1,15 @@ -Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?name:string -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?name:string -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = +Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + task -> + unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : + ?name:string -> + ?ls:Task_local_storage.storage -> + t -> + (unit -> 'a) -> + 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> diff --git a/dev/moonpool/Moonpool/index.html b/dev/moonpool/Moonpool/index.html index 4c96aa6c..0b3915b8 100644 --- a/dev/moonpool/Moonpool/index.html +++ b/dev/moonpool/Moonpool/index.html @@ -1,2 +1,21 @@ -Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks immediately in the caller thread.

module Pool = Fifo_pool

Default pool. Please explicitly pick an implementation instead.

val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : ?name:string -> Runner.t -> (unit -> unit) -> unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • since 0.5

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created for the future. (since 0.6)

  • since 0.5
val spawn_on_current_runner : ?name:string -> (unit -> 'a) -> 'a Fut.t

See Fut.spawn_on_current_runner.

  • parameter name

    see spawn. since 0.6.

  • since 0.5
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Fork_join : sig ... end

Fork-join primitives.

module Thread_local_storage : sig ... end

Thread local storage

module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic : sig ... end

Atomic values.

+Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks immediately in the caller thread.

module Exn_bt : sig ... end

Exception with backtrace.

exception Shutdown

Exception raised when trying to run tasks on runners that have been shut down.

  • since NEXT_RELEASE
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : + ?name:string -> + ?ls:Task_local_storage.storage -> + Runner.t -> + (unit -> unit) -> + unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created when the task starts, and will stop when the task is over. (since NEXT_RELEASE)

  • since 0.5
val run_wait_block : + ?name:string -> + ?ls:Task_local_storage.storage -> + Runner.t -> + (unit -> 'a) -> + 'a

run_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

  • since NEXT_RELEASE

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : + ?name:string -> + ?ls:Task_local_storage.storage -> + on:Runner.t -> + (unit -> 'a) -> + 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • parameter name

    if provided and Trace is present in dependencies, a span will be created for the future. (since 0.6)

  • since 0.5
val spawn_on_current_runner : + ?name:string -> + ?ls:Task_local_storage.storage -> + (unit -> 'a) -> + 'a Fut.t

See Fut.spawn_on_current_runner.

  • parameter name

    see spawn. since 0.6.

  • since 0.5
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Task_local_storage : sig ... end

Task-local storage.

module Thread_local_storage = Moonpool_private.Thread_local_storage_
module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic = Moonpool_private.Atomic_

Atomic values.

diff --git a/dev/moonpool/Moonpool__Atomic_/index.html b/dev/moonpool/Moonpool__Atomic_/index.html deleted file mode 100644 index 4e3ab443..00000000 --- a/dev/moonpool/Moonpool__Atomic_/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Moonpool__Atomic_ (moonpool.Moonpool__Atomic_)

Module Moonpool__Atomic_

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Exn_bt/index.html b/dev/moonpool/Moonpool__Exn_bt/index.html new file mode 100644 index 00000000..ba603c34 --- /dev/null +++ b/dev/moonpool/Moonpool__Exn_bt/index.html @@ -0,0 +1,2 @@ + +Moonpool__Exn_bt (moonpool.Moonpool__Exn_bt)

Module Moonpool__Exn_bt

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Task_local_storage/index.html b/dev/moonpool/Moonpool__Task_local_storage/index.html new file mode 100644 index 00000000..843fc635 --- /dev/null +++ b/dev/moonpool/Moonpool__Task_local_storage/index.html @@ -0,0 +1,2 @@ + +Moonpool__Task_local_storage (moonpool.Moonpool__Task_local_storage)

Module Moonpool__Task_local_storage

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Thread_local_storage_/index.html b/dev/moonpool/Moonpool__Thread_local_storage_/index.html deleted file mode 100644 index 6ae7a4cc..00000000 --- a/dev/moonpool/Moonpool__Thread_local_storage_/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Moonpool__Thread_local_storage_ (moonpool.Moonpool__Thread_local_storage_)

Module Moonpool__Thread_local_storage_

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Ws_deque_/index.html b/dev/moonpool/Moonpool__Ws_deque_/index.html deleted file mode 100644 index ffb49ac3..00000000 --- a/dev/moonpool/Moonpool__Ws_deque_/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Moonpool__Ws_deque_ (moonpool.Moonpool__Ws_deque_)

Module Moonpool__Ws_deque_

This module is hidden.

diff --git a/dev/moonpool/Moonpool_fib/Fiber/index.html b/dev/moonpool/Moonpool_fib/Fiber/index.html new file mode 100644 index 00000000..22e96129 --- /dev/null +++ b/dev/moonpool/Moonpool_fib/Fiber/index.html @@ -0,0 +1,2 @@ + +Fiber (moonpool.Moonpool_fib.Fiber)

Module Moonpool_fib.Fiber

Fibers.

A fiber is a lightweight computation that runs cooperatively alongside other fibers. In the context of moonpool, fibers have additional properties:

  • they run in a moonpool runner
  • they form a simple supervision tree, enabling a limited form of structured concurrency
type 'a t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type cancel_callback = Moonpool.Exn_bt.t -> unit
val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises Failure

    if not.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_cancel_callback fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : ?name:string -> on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

spawn_link ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent).

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

diff --git a/dev/moonpool/Moonpool_fib/Fls/index.html b/dev/moonpool/Moonpool_fib/Fls/index.html new file mode 100644 index 00000000..3bcd2826 --- /dev/null +++ b/dev/moonpool/Moonpool_fib/Fls/index.html @@ -0,0 +1,8 @@ + +Fls (moonpool.Moonpool_fib.Fls)

Module Moonpool_fib.Fls

Fiber-local storage.

This storage is associated to the current fiber, just like thread-local storage is associated with the current thread.

include module type of struct include Moonpool.Task_local_storage end

Underlying storage for a task

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
+    Task_local_storage.new_key ~init:(fun () -> make_foo ()) ()
+
+(* … *)
+
+(* use it: *)
+let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

diff --git a/dev/moonpool/Moonpool_fib/Handle/Map/index.html b/dev/moonpool/Moonpool_fib/Handle/Map/index.html new file mode 100644 index 00000000..f9a96e30 --- /dev/null +++ b/dev/moonpool/Moonpool_fib/Handle/Map/index.html @@ -0,0 +1,8 @@ + +Map (moonpool.Moonpool_fib.Handle.Map)

Module Handle.Map

type key = t

The type of the map keys.

type !+'a t

The type of maps from type key to type 'a.

val empty : 'a t

The empty map.

val is_empty : 'a t -> bool

Test whether a map is empty or not.

val mem : key -> 'a t -> bool

mem x m returns true if m contains a binding for x, and false otherwise.

val add : key -> 'a -> 'a t -> 'a t

add key data m returns a map containing the same bindings as m, plus a binding of key to data. If key was already bound in m to a value that is physically equal to data, m is returned unchanged (the result of the function is then physically equal to m). Otherwise, the previous binding of key in m disappears.

  • before 4.03

    Physical equality was not ensured.

val update : key -> ('a option -> 'a option) -> 'a t -> 'a t

update key f m returns a map containing the same bindings as m, except for the binding of key. Depending on the value of y where y is f (find_opt key m), the binding of key is added, removed or updated. If y is None, the binding is removed if it exists; otherwise, if y is Some z then key is associated to z in the resulting map. If key was already bound in m to a value that is physically equal to z, m is returned unchanged (the result of the function is then physically equal to m).

  • since 4.06.0
val singleton : key -> 'a -> 'a t

singleton x y returns the one-element map that contains a binding y for x.

  • since 3.12.0
val remove : key -> 'a t -> 'a t

remove x m returns a map containing the same bindings as m, except for x which is unbound in the returned map. If x was not in m, m is returned unchanged (the result of the function is then physically equal to m).

  • before 4.03

    Physical equality was not ensured.

val merge : + (key -> 'a option -> 'b option -> 'c option) -> + 'a t -> + 'b t -> + 'c t

merge f m1 m2 computes a map whose keys are a subset of the keys of m1 and of m2. The presence of each such binding, and the corresponding value, is determined with the function f. In terms of the find_opt operation, we have find_opt x (merge f m1 m2) = f x (find_opt x m1) (find_opt x m2) for any key x, provided that f x None None = None.

  • since 3.12.0
val union : (key -> 'a -> 'a -> 'a option) -> 'a t -> 'a t -> 'a t

union f m1 m2 computes a map whose keys are a subset of the keys of m1 and of m2. When the same binding is defined in both arguments, the function f is used to combine them. This is a special case of merge: union f m1 m2 is equivalent to merge f' m1 m2, where

  • f' _key None None = None
  • f' _key (Some v) None = Some v
  • f' _key None (Some v) = Some v
  • f' key (Some v1) (Some v2) = f key v1 v2
  • since 4.03.0
val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int

Total ordering between maps. The first argument is a total ordering used to compare data associated with equal keys in the two maps.

val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool

equal cmp m1 m2 tests whether the maps m1 and m2 are equal, that is, contain equal keys and associate them with equal data. cmp is the equality predicate used to compare the data associated with the keys.

val iter : (key -> 'a -> unit) -> 'a t -> unit

iter f m applies f to all bindings in map m. f receives the key as first argument, and the associated value as second argument. The bindings are passed to f in increasing order with respect to the ordering over the type of the keys.

val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b

fold f m init computes (f kN dN ... (f k1 d1 init)...), where k1 ... kN are the keys of all bindings in m (in increasing order), and d1 ... dN are the associated data.

val for_all : (key -> 'a -> bool) -> 'a t -> bool

for_all f m checks if all the bindings of the map satisfy the predicate f.

  • since 3.12.0
val exists : (key -> 'a -> bool) -> 'a t -> bool

exists f m checks if at least one binding of the map satisfies the predicate f.

  • since 3.12.0
val filter : (key -> 'a -> bool) -> 'a t -> 'a t

filter f m returns the map with all the bindings in m that satisfy predicate p. If every binding in m satisfies f, m is returned unchanged (the result of the function is then physically equal to m)

  • since 3.12.0
  • before 4.03

    Physical equality was not ensured.

val filter_map : (key -> 'a -> 'b option) -> 'a t -> 'b t

filter_map f m applies the function f to every binding of m, and builds a map from the results. For each binding (k, v) in the input map:

  • if f k v is None then k is not in the result,
  • if f k v is Some v' then the binding (k, v') is in the output map.

For example, the following function on maps whose values are lists

filter_map
+  (fun _k li -> match li with [] -> None | _::tl -> Some tl)
+  m

drops all bindings of m whose value is an empty list, and pops the first element of each value that is non-empty.

  • since 4.11.0
val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t

partition f m returns a pair of maps (m1, m2), where m1 contains all the bindings of m that satisfy the predicate f, and m2 is the map with all the bindings of m that do not satisfy f.

  • since 3.12.0
val cardinal : 'a t -> int

Return the number of bindings of a map.

  • since 3.12.0
val bindings : 'a t -> (key * 'a) list

Return the list of all bindings of the given map. The returned list is sorted in increasing order of keys with respect to the ordering Ord.compare, where Ord is the argument given to Map.Make.

  • since 3.12.0
val min_binding : 'a t -> key * 'a

Return the binding with the smallest key in a given map (with respect to the Ord.compare ordering), or raise Not_found if the map is empty.

  • since 3.12.0
val min_binding_opt : 'a t -> (key * 'a) option

Return the binding with the smallest key in the given map (with respect to the Ord.compare ordering), or None if the map is empty.

  • since 4.05
val max_binding : 'a t -> key * 'a

Same as min_binding, but returns the binding with the largest key in the given map.

  • since 3.12.0
val max_binding_opt : 'a t -> (key * 'a) option

Same as min_binding_opt, but returns the binding with the largest key in the given map.

  • since 4.05
val choose : 'a t -> key * 'a

Return one binding of the given map, or raise Not_found if the map is empty. Which binding is chosen is unspecified, but equal bindings will be chosen for equal maps.

  • since 3.12.0
val choose_opt : 'a t -> (key * 'a) option

Return one binding of the given map, or None if the map is empty. Which binding is chosen is unspecified, but equal bindings will be chosen for equal maps.

  • since 4.05
val split : key -> 'a t -> 'a t * 'a option * 'a t

split x m returns a triple (l, data, r), where l is the map with all the bindings of m whose key is strictly less than x; r is the map with all the bindings of m whose key is strictly greater than x; data is None if m contains no binding for x, or Some v if m binds v to x.

  • since 3.12.0
val find : key -> 'a t -> 'a

find x m returns the current value of x in m, or raises Not_found if no binding for x exists.

val find_opt : key -> 'a t -> 'a option

find_opt x m returns Some v if the current value of x in m is v, or None if no binding for x exists.

  • since 4.05
val find_first : (key -> bool) -> 'a t -> key * 'a

find_first f m, where f is a monotonically increasing function, returns the binding of m with the lowest key k such that f k, or raises Not_found if no such key exists.

For example, find_first (fun k -> Ord.compare k x >= 0) m will return the first binding k, v of m where Ord.compare k x >= 0 (intuitively: k >= x), or raise Not_found if x is greater than any element of m.

  • since 4.05
val find_first_opt : (key -> bool) -> 'a t -> (key * 'a) option

find_first_opt f m, where f is a monotonically increasing function, returns an option containing the binding of m with the lowest key k such that f k, or None if no such key exists.

  • since 4.05
val find_last : (key -> bool) -> 'a t -> key * 'a

find_last f m, where f is a monotonically decreasing function, returns the binding of m with the highest key k such that f k, or raises Not_found if no such key exists.

  • since 4.05
val find_last_opt : (key -> bool) -> 'a t -> (key * 'a) option

find_last_opt f m, where f is a monotonically decreasing function, returns an option containing the binding of m with the highest key k such that f k, or None if no such key exists.

  • since 4.05
val map : ('a -> 'b) -> 'a t -> 'b t

map f m returns a map with same domain as m, where the associated value a of all bindings of m has been replaced by the result of the application of f to a. The bindings are passed to f in increasing order with respect to the ordering over the type of the keys.

val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t

Same as map, but the function receives as arguments both the key and the associated value for each binding of the map.

Maps and Sequences

val to_seq : 'a t -> (key * 'a) Seq.t

Iterate on the whole map, in ascending order of keys

  • since 4.07
val to_rev_seq : 'a t -> (key * 'a) Seq.t

Iterate on the whole map, in descending order of keys

  • since 4.12
val to_seq_from : key -> 'a t -> (key * 'a) Seq.t

to_seq_from k m iterates on a subset of the bindings of m, in ascending order of keys, from key k or above.

  • since 4.07
val add_seq : (key * 'a) Seq.t -> 'a t -> 'a t

Add the given bindings to the map, in order.

  • since 4.07
val of_seq : (key * 'a) Seq.t -> 'a t

Build a map from the given bindings

  • since 4.07
diff --git a/dev/moonpool/Moonpool_fib/Handle/Set/index.html b/dev/moonpool/Moonpool_fib/Handle/Set/index.html new file mode 100644 index 00000000..b7cd769b --- /dev/null +++ b/dev/moonpool/Moonpool_fib/Handle/Set/index.html @@ -0,0 +1,3 @@ + +Set (moonpool.Moonpool_fib.Handle.Set)

Module Handle.Set

type elt = t

The type of the set elements.

type t

The type of sets.

val empty : t

The empty set.

val is_empty : t -> bool

Test whether a set is empty or not.

val mem : elt -> t -> bool

mem x s tests whether x belongs to the set s.

val add : elt -> t -> t

add x s returns a set containing all elements of s, plus x. If x was already in s, s is returned unchanged (the result of the function is then physically equal to s).

  • before 4.03

    Physical equality was not ensured.

val singleton : elt -> t

singleton x returns the one-element set containing only x.

val remove : elt -> t -> t

remove x s returns a set containing all elements of s, except x. If x was not in s, s is returned unchanged (the result of the function is then physically equal to s).

  • before 4.03

    Physical equality was not ensured.

val union : t -> t -> t

Set union.

val inter : t -> t -> t

Set intersection.

val disjoint : t -> t -> bool

Test if two sets are disjoint.

  • since 4.08.0
val diff : t -> t -> t

Set difference: diff s1 s2 contains the elements of s1 that are not in s2.

val compare : t -> t -> int

Total ordering between sets. Can be used as the ordering function for doing sets of sets.

val equal : t -> t -> bool

equal s1 s2 tests whether the sets s1 and s2 are equal, that is, contain equal elements.

val subset : t -> t -> bool

subset s1 s2 tests whether the set s1 is a subset of the set s2.

val iter : (elt -> unit) -> t -> unit

iter f s applies f in turn to all elements of s. The elements of s are presented to f in increasing order with respect to the ordering over the type of the elements.

val map : (elt -> elt) -> t -> t

map f s is the set whose elements are f a0,f a1... f + aN, where a0,a1...aN are the elements of s.

The elements are passed to f in increasing order with respect to the ordering over the type of the elements.

If no element of s is changed by f, s is returned unchanged. (If each output of f is physically equal to its input, the returned set is physically equal to s.)

  • since 4.04.0
val fold : (elt -> 'a -> 'a) -> t -> 'a -> 'a

fold f s init computes (f xN ... (f x2 (f x1 init))...), where x1 ... xN are the elements of s, in increasing order.

val for_all : (elt -> bool) -> t -> bool

for_all f s checks if all elements of the set satisfy the predicate f.

val exists : (elt -> bool) -> t -> bool

exists f s checks if at least one element of the set satisfies the predicate f.

val filter : (elt -> bool) -> t -> t

filter f s returns the set of all elements in s that satisfy predicate f. If f satisfies every element in s, s is returned unchanged (the result of the function is then physically equal to s).

  • before 4.03

    Physical equality was not ensured.

val filter_map : (elt -> elt option) -> t -> t

filter_map f s returns the set of all v such that f x = Some v for some element x of s.

For example,

filter_map (fun n -> if n mod 2 = 0 then Some (n / 2) else None) s

is the set of halves of the even elements of s.

If no element of s is changed or dropped by f (if f x = Some x for each element x), then s is returned unchanged: the result of the function is then physically equal to s.

  • since 4.11.0
val partition : (elt -> bool) -> t -> t * t

partition f s returns a pair of sets (s1, s2), where s1 is the set of all the elements of s that satisfy the predicate f, and s2 is the set of all the elements of s that do not satisfy f.

val cardinal : t -> int

Return the number of elements of a set.

val elements : t -> elt list

Return the list of all elements of the given set. The returned list is sorted in increasing order with respect to the ordering Ord.compare, where Ord is the argument given to Set.Make.

val min_elt : t -> elt

Return the smallest element of the given set (with respect to the Ord.compare ordering), or raise Not_found if the set is empty.

val min_elt_opt : t -> elt option

Return the smallest element of the given set (with respect to the Ord.compare ordering), or None if the set is empty.

  • since 4.05
val max_elt : t -> elt

Same as min_elt, but returns the largest element of the given set.

val max_elt_opt : t -> elt option

Same as min_elt_opt, but returns the largest element of the given set.

  • since 4.05
val choose : t -> elt

Return one element of the given set, or raise Not_found if the set is empty. Which element is chosen is unspecified, but equal elements will be chosen for equal sets.

val choose_opt : t -> elt option

Return one element of the given set, or None if the set is empty. Which element is chosen is unspecified, but equal elements will be chosen for equal sets.

  • since 4.05
val split : elt -> t -> t * bool * t

split x s returns a triple (l, present, r), where l is the set of elements of s that are strictly less than x; r is the set of elements of s that are strictly greater than x; present is false if s contains no element equal to x, or true if s contains an element equal to x.

val find : elt -> t -> elt

find x s returns the element of s equal to x (according to Ord.compare), or raise Not_found if no such element exists.

  • since 4.01.0
val find_opt : elt -> t -> elt option

find_opt x s returns the element of s equal to x (according to Ord.compare), or None if no such element exists.

  • since 4.05
val find_first : (elt -> bool) -> t -> elt

find_first f s, where f is a monotonically increasing function, returns the lowest element e of s such that f e, or raises Not_found if no such element exists.

For example, find_first (fun e -> Ord.compare e x >= 0) s will return the first element e of s where Ord.compare e x >= 0 (intuitively: e >= x), or raise Not_found if x is greater than any element of s.

  • since 4.05
val find_first_opt : (elt -> bool) -> t -> elt option

find_first_opt f s, where f is a monotonically increasing function, returns an option containing the lowest element e of s such that f e, or None if no such element exists.

  • since 4.05
val find_last : (elt -> bool) -> t -> elt

find_last f s, where f is a monotonically decreasing function, returns the highest element e of s such that f e, or raises Not_found if no such element exists.

  • since 4.05
val find_last_opt : (elt -> bool) -> t -> elt option

find_last_opt f s, where f is a monotonically decreasing function, returns an option containing the highest element e of s such that f e, or None if no such element exists.

  • since 4.05
val of_list : elt list -> t

of_list l creates a set from a list of elements. This is usually more efficient than folding add over the list, except perhaps for lists with many duplicated elements.

  • since 4.02.0

Iterators

val to_seq_from : elt -> t -> elt Seq.t

to_seq_from x s iterates on a subset of the elements of s in ascending order, from x or above.

  • since 4.07
val to_seq : t -> elt Seq.t

Iterate on the whole set, in ascending order

  • since 4.07
val to_rev_seq : t -> elt Seq.t

Iterate on the whole set, in descending order

  • since 4.12
val add_seq : elt Seq.t -> t -> t

Add the given elements to the set, in order.

  • since 4.07
val of_seq : elt Seq.t -> t

Build a set from the given bindings

  • since 4.07
diff --git a/dev/moonpool/Moonpool_fib/Handle/index.html b/dev/moonpool/Moonpool_fib/Handle/index.html new file mode 100644 index 00000000..230d07b2 --- /dev/null +++ b/dev/moonpool/Moonpool_fib/Handle/index.html @@ -0,0 +1,2 @@ + +Handle (moonpool.Moonpool_fib.Handle)

Module Moonpool_fib.Handle

The unique name of a fiber

type t = private int

Unique, opaque identifier for a fiber.

val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t

Generate a fresh, unique identifier

module Set : Set.S with type elt = t
module Map : Map.S with type key = t
diff --git a/dev/moonpool/Moonpool_fib/index.html b/dev/moonpool/Moonpool_fib/index.html new file mode 100644 index 00000000..b1e9b4c2 --- /dev/null +++ b/dev/moonpool/Moonpool_fib/index.html @@ -0,0 +1,2 @@ + +Moonpool_fib (moonpool.Moonpool_fib)

Module Moonpool_fib

module Fiber : sig ... end

Fibers.

module Fls : sig ... end

Fiber-local storage.

module Handle : sig ... end

The unique name of a fiber

diff --git a/dev/moonpool/Moonpool__Fork_join/index.html b/dev/moonpool/Moonpool_fib__Fiber/index.html similarity index 74% rename from dev/moonpool/Moonpool__Fork_join/index.html rename to dev/moonpool/Moonpool_fib__Fiber/index.html index c81785ed..f2bc3f4a 100644 --- a/dev/moonpool/Moonpool__Fork_join/index.html +++ b/dev/moonpool/Moonpool_fib__Fiber/index.html @@ -1,2 +1,2 @@ -Moonpool__Fork_join (moonpool.Moonpool__Fork_join)

Module Moonpool__Fork_join

This module is hidden.

+Moonpool_fib__Fiber (moonpool.Moonpool_fib__Fiber)

Module Moonpool_fib__Fiber

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Domain_/index.html b/dev/moonpool/Moonpool_fib__Fls/index.html similarity index 74% rename from dev/moonpool/Moonpool__Domain_/index.html rename to dev/moonpool/Moonpool_fib__Fls/index.html index 637f6af3..6383dcd8 100644 --- a/dev/moonpool/Moonpool__Domain_/index.html +++ b/dev/moonpool/Moonpool_fib__Fls/index.html @@ -1,2 +1,2 @@ -Moonpool__Domain_ (moonpool.Moonpool__Domain_)

Module Moonpool__Domain_

This module is hidden.

+Moonpool_fib__Fls (moonpool.Moonpool_fib__Fls)

Module Moonpool_fib__Fls

This module is hidden.

diff --git a/dev/moonpool/Moonpool__Util_pool_/index.html b/dev/moonpool/Moonpool_fib__Handle/index.html similarity index 74% rename from dev/moonpool/Moonpool__Util_pool_/index.html rename to dev/moonpool/Moonpool_fib__Handle/index.html index b95889d1..f082300a 100644 --- a/dev/moonpool/Moonpool__Util_pool_/index.html +++ b/dev/moonpool/Moonpool_fib__Handle/index.html @@ -1,2 +1,2 @@ -Moonpool__Util_pool_ (moonpool.Moonpool__Util_pool_)

Module Moonpool__Util_pool_

This module is hidden.

+Moonpool_fib__Handle (moonpool.Moonpool_fib__Handle)

Module Moonpool_fib__Handle

This module is hidden.

diff --git a/dev/moonpool/Moonpool/Fork_join/index.html b/dev/moonpool/Moonpool_forkjoin/index.html similarity index 59% rename from dev/moonpool/Moonpool/Fork_join/index.html rename to dev/moonpool/Moonpool_forkjoin/index.html index 22f8b7dc..2660567a 100644 --- a/dev/moonpool/Moonpool/Fork_join/index.html +++ b/dev/moonpool/Moonpool_forkjoin/index.html @@ -1,5 +1,5 @@ -Fork_join (moonpool.Moonpool.Fork_join)

Module Moonpool.Fork_join

Fork-join primitives.

NOTE These are only available on OCaml 5.0 and above.

  • since 0.3
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b

both f g runs f() and g(), potentially in parallel, and returns their result when both are done. If any of f() and g() fails, then the whole computation fails.

This must be run from within the pool: for example, inside Pool.run or inside a Fut.spawn computation. This is because it relies on an effect handler to be installed.

  • since 0.3

NOTE this is only available on OCaml 5.

val both_ignore : (unit -> _) -> (unit -> _) -> unit

Same as both f g |> ignore.

  • since 0.3

NOTE this is only available on OCaml 5.

val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit

for_ n f is the parallel version of for i=0 to n-1 do f i done.

f is called with parameters low and high and must use them like so:

for j = low to high do (* … actual work *) done 

. If chunk_size=1 then low=high and the loop is not actually needed.

  • parameter chunk_size

    controls the granularity of parallelism. The default chunk size is not specified. See all_array or all_list for more details.

    Example:

    let total_sum = Atomic.make 0
    +Moonpool_forkjoin (moonpool.Moonpool_forkjoin)

    Module Moonpool_forkjoin

    Fork-join primitives.

    NOTE These are only available on OCaml 5.0 and above.

    • since 0.3
    val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b

    both f g runs f() and g(), potentially in parallel, and returns their result when both are done. If any of f() and g() fails, then the whole computation fails.

    This must be run from within the pool: for example, inside Pool.run or inside a Fut.spawn computation. This is because it relies on an effect handler to be installed.

    • since 0.3

    NOTE this is only available on OCaml 5.

    val both_ignore : (unit -> _) -> (unit -> _) -> unit

    Same as both f g |> ignore.

    • since 0.3

    NOTE this is only available on OCaml 5.

    val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit

    for_ n f is the parallel version of for i=0 to n-1 do f i done.

    f is called with parameters low and high and must use them like so:

    for j = low to high do (* … actual work *) done 

    . If chunk_size=1 then low=high and the loop is not actually needed.

    • parameter chunk_size

      controls the granularity of parallelism. The default chunk size is not specified. See all_array or all_list for more details.

      Example:

      let total_sum = Atomic.make 0
       
       let() = for_ ~chunk_size:5 100
         (fun low high ->
      diff --git a/dev/moonpool/Moonpool_private/Atomic_/index.html b/dev/moonpool/Moonpool_private/Atomic_/index.html
      new file mode 100644
      index 00000000..2b8de2fd
      --- /dev/null
      +++ b/dev/moonpool/Moonpool_private/Atomic_/index.html
      @@ -0,0 +1,2 @@
      +
      +Atomic_ (moonpool.Moonpool_private.Atomic_)

      Module Moonpool_private.Atomic_

      include module type of struct include Stdlib.Atomic end
      type !'a t = 'a Stdlib.Atomic.t

      An atomic (mutable) reference to a value of type 'a.

      val make : 'a -> 'a t

      Create an atomic reference.

      val get : 'a t -> 'a

      Get the current value of the atomic reference.

      val set : 'a t -> 'a -> unit

      Set a new value for the atomic reference.

      val exchange : 'a t -> 'a -> 'a

      Set a new value for the atomic reference, and return the current value.

      val compare_and_set : 'a t -> 'a -> 'a -> bool

      compare_and_set r seen v sets the new value of r to v only if its current value is physically equal to seen -- the comparison and the set occur atomically. Returns true if the comparison succeeded (so the set happened) and false otherwise.

      val fetch_and_add : int t -> int -> int

      fetch_and_add r n atomically increments the value of r by n, and returns the current value (before the increment).

      val incr : int t -> unit

      incr r atomically increments the value of r by 1.

      val decr : int t -> unit

      decr r atomically decrements the value of r by 1.

      diff --git a/dev/moonpool/Moonpool_private/Dla_/index.html b/dev/moonpool/Moonpool_private/Dla_/index.html new file mode 100644 index 00000000..0b78e6dd --- /dev/null +++ b/dev/moonpool/Moonpool_private/Dla_/index.html @@ -0,0 +1,2 @@ + +Dla_ (moonpool.Moonpool_private.Dla_)

      Module Moonpool_private.Dla_

      Interface to Domain-local-await.

      This is used to handle the presence or absence of DLA.

      type t = {
      1. release : unit -> unit;
      2. await : unit -> unit;
      }
      val using : prepare_for_await:(unit -> t) -> while_running:(unit -> 'a) -> 'a
      val setup_domain : unit -> unit
      diff --git a/dev/moonpool/Moonpool_private/Domain_/index.html b/dev/moonpool/Moonpool_private/Domain_/index.html new file mode 100644 index 00000000..a8b51dbb --- /dev/null +++ b/dev/moonpool/Moonpool_private/Domain_/index.html @@ -0,0 +1,2 @@ + +Domain_ (moonpool.Moonpool_private.Domain_)

      Module Moonpool_private.Domain_

      type t = unit Stdlib.Domain.t
      val get_id : t -> int
      val spawn : (unit -> unit) -> t
      val relax : unit -> unit
      val join : 'a Stdlib.Domain.t -> 'a
      val is_main_domain : unit -> bool
      diff --git a/dev/moonpool/Moonpool_private/Thread_local_storage_/index.html b/dev/moonpool/Moonpool_private/Thread_local_storage_/index.html new file mode 100644 index 00000000..1e26d1d1 --- /dev/null +++ b/dev/moonpool/Moonpool_private/Thread_local_storage_/index.html @@ -0,0 +1,2 @@ + +Thread_local_storage_ (moonpool.Moonpool_private.Thread_local_storage_)

      Module Moonpool_private.Thread_local_storage_

      Thread local storage

      type 'a key

      A TLS key for values of type 'a. This allows the storage of a single value of type 'a per thread.

      val new_key : (unit -> 'a) -> 'a key

      Allocate a new, generative key. When the key is used for the first time on a thread, the function is called to produce it.

      This should only ever be called at toplevel to produce constants, do not use it in a loop.

      val get : 'a key -> 'a

      Get the value for the current thread.

      val set : 'a key -> 'a -> unit

      Set the value for the current thread.

      diff --git a/dev/moonpool/Moonpool_private/Tracing_/index.html b/dev/moonpool/Moonpool_private/Tracing_/index.html new file mode 100644 index 00000000..fba00eda --- /dev/null +++ b/dev/moonpool/Moonpool_private/Tracing_/index.html @@ -0,0 +1,2 @@ + +Tracing_ (moonpool.Moonpool_private.Tracing_)

      Module Moonpool_private.Tracing_

      val dummy_span : int64
      val enter_span : string -> int64
      val exit_span : int64 -> unit
      val enabled : unit -> bool
      val set_thread_name : string -> unit
      diff --git a/dev/moonpool/Moonpool_private/Ws_deque_/index.html b/dev/moonpool/Moonpool_private/Ws_deque_/index.html new file mode 100644 index 00000000..fb9e01fb --- /dev/null +++ b/dev/moonpool/Moonpool_private/Ws_deque_/index.html @@ -0,0 +1,2 @@ + +Ws_deque_ (moonpool.Moonpool_private.Ws_deque_)

      Module Moonpool_private.Ws_deque_

      Work-stealing deque.

      Adapted from "Dynamic circular work stealing deque", Chase & Lev.

      However note that this one is not dynamic in the sense that there is no resizing. Instead we return false when push fails, which keeps the implementation fairly lightweight.

      type 'a t

      Deque containing values of type 'a

      val create : dummy:'a -> unit -> 'a t

      Create a new deque.

      val push : 'a t -> 'a -> bool

      Push value at the bottom of deque. returns true if it succeeds. This must be called only by the owner thread.

      val pop : 'a t -> 'a option

      Pop value from the bottom of deque. This must be called only by the owner thread.

      val steal : 'a t -> 'a option

      Try to steal from the top of deque. This is thread-safe.

      val size : _ t -> int
      diff --git a/dev/moonpool/Moonpool_private/index.html b/dev/moonpool/Moonpool_private/index.html new file mode 100644 index 00000000..8cea1aa8 --- /dev/null +++ b/dev/moonpool/Moonpool_private/index.html @@ -0,0 +1,2 @@ + +Moonpool_private (moonpool.Moonpool_private)

      Module Moonpool_private

      module Atomic_ : sig ... end
      module Dla_ : sig ... end

      Interface to Domain-local-await.

      module Domain_ : sig ... end
      module Thread_local_storage_ : sig ... end

      Thread local storage

      module Tracing_ : sig ... end
      module Ws_deque_ : sig ... end

      Work-stealing deque.

      diff --git a/dev/moonpool/Moonpool_private__Atomic_/index.html b/dev/moonpool/Moonpool_private__Atomic_/index.html new file mode 100644 index 00000000..ecb36bcb --- /dev/null +++ b/dev/moonpool/Moonpool_private__Atomic_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Atomic_ (moonpool.Moonpool_private__Atomic_)

      Module Moonpool_private__Atomic_

      This module is hidden.

      diff --git a/dev/moonpool/Moonpool_private__Dla_/index.html b/dev/moonpool/Moonpool_private__Dla_/index.html new file mode 100644 index 00000000..6941b183 --- /dev/null +++ b/dev/moonpool/Moonpool_private__Dla_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Dla_ (moonpool.Moonpool_private__Dla_)

      Module Moonpool_private__Dla_

      This module is hidden.

      diff --git a/dev/moonpool/Moonpool_private__Domain_/index.html b/dev/moonpool/Moonpool_private__Domain_/index.html new file mode 100644 index 00000000..204cee77 --- /dev/null +++ b/dev/moonpool/Moonpool_private__Domain_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Domain_ (moonpool.Moonpool_private__Domain_)

      Module Moonpool_private__Domain_

      This module is hidden.

      diff --git a/dev/moonpool/Moonpool_private__Thread_local_storage_/index.html b/dev/moonpool/Moonpool_private__Thread_local_storage_/index.html new file mode 100644 index 00000000..6a84ba9c --- /dev/null +++ b/dev/moonpool/Moonpool_private__Thread_local_storage_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Thread_local_storage_ (moonpool.Moonpool_private__Thread_local_storage_)

      Module Moonpool_private__Thread_local_storage_

      This module is hidden.

      diff --git a/dev/moonpool/Moonpool_private__Tracing_/index.html b/dev/moonpool/Moonpool_private__Tracing_/index.html new file mode 100644 index 00000000..e810b47c --- /dev/null +++ b/dev/moonpool/Moonpool_private__Tracing_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Tracing_ (moonpool.Moonpool_private__Tracing_)

      Module Moonpool_private__Tracing_

      This module is hidden.

      diff --git a/dev/moonpool/Moonpool_private__Ws_deque_/index.html b/dev/moonpool/Moonpool_private__Ws_deque_/index.html new file mode 100644 index 00000000..e9f6acea --- /dev/null +++ b/dev/moonpool/Moonpool_private__Ws_deque_/index.html @@ -0,0 +1,2 @@ + +Moonpool_private__Ws_deque_ (moonpool.Moonpool_private__Ws_deque_)

      Module Moonpool_private__Ws_deque_

      This module is hidden.

      diff --git a/dev/moonpool/_doc-dir/README.md b/dev/moonpool/_doc-dir/README.md index 4353a757..c51361df 100644 --- a/dev/moonpool/_doc-dir/README.md +++ b/dev/moonpool/_doc-dir/README.md @@ -19,7 +19,8 @@ In addition, some concurrency and parallelism primitives are provided: On OCaml 5 (meaning there's actual domains and effects, not just threads), a `Fut.await` primitive is provided. It's simpler and more powerful than the monadic combinators. -- `Moonpool.Fork_join` provides the fork-join parallelism primitives +- `Moonpool_forkjoin`, in the library `moonpool.forkjoin` + provides the fork-join parallelism primitives to use within tasks running in the pool. ## Usage @@ -166,7 +167,8 @@ val expected_sum : int = 5050 ### Fork-join -On OCaml 5, again using effect handlers, the module `Fork_join` +On OCaml 5, again using effect handlers, the sublibrary `moonpool.forkjoin` +provides a module `Moonpool_forkjoin` implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model). It must run on a pool (using `Runner.run_async` or inside a future via `Fut.spawn`). @@ -220,7 +222,7 @@ And a parallel quicksort for larger slices: done; (* sort lower half and upper half in parallel *) - Moonpool.Fork_join.both_ignore + Moonpool_forkjoin.both_ignore (fun () -> quicksort arr i (!low - i)) (fun () -> quicksort arr !low (len - (!low - i))) );; diff --git a/dev/moonpool/index.html b/dev/moonpool/index.html index 5d306dfb..e594054d 100644 --- a/dev/moonpool/index.html +++ b/dev/moonpool/index.html @@ -1,2 +1,2 @@ -index (moonpool.index)

      Package moonpool

      Package info

      changes-files
      readme-files
      +index (moonpool.index)

      Package moonpool

      Package info

      changes-files
      readme-files