diff --git a/moonpool-lwt/Moonpool_lwt/index.html b/moonpool-lwt/Moonpool_lwt/index.html index dea9daa8..73fb0bc2 100644 --- a/moonpool-lwt/Moonpool_lwt/index.html +++ b/moonpool-lwt/Moonpool_lwt/index.html @@ -1,2 +1,2 @@ -Moonpool_lwt (moonpool-lwt.Moonpool_lwt)

Module Moonpool_lwt

Lwt_engine-based event loop for Moonpool.

In what follows, we mean by "lwt thread" the thread running Lwt_main.run (so, the thread where the Lwt event loop and all Lwt callbacks execute).

module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls

Basic conversions

val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t

fut_of_lwt lwt_fut makes a thread-safe moonpool future that completes when lwt_fut does. This must be run from within the Lwt thread.

val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t

lwt_of_fut fut makes a lwt future that completes when fut does. This must be called from the Lwt thread, and the result must always be used only from inside the Lwt thread.

Helpers on the moonpool side

val await_lwt : 'a Lwt.t -> 'a

await_lwt fut awaits a Lwt future from inside a task running on a moonpool runner. This must be run from within a Moonpool runner so that the await-ing effect is handled.

val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t

run_in_lwt f runs f() from within the Lwt thread and returns a thread-safe future. This can be run from anywhere.

val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a

run_in_lwt_and_await f runs f in the Lwt thread, and awaits its result. Must be run from inside a moonpool runner so that the await-in effect is handled.

This is similar to Moonpool.await @@ run_in_lwt f.

val get_runner : unit -> Moonpool.Runner.t

Returns the runner from within which this is called. Must be run from within a fiber.

  • raises Failure

    if not run within a fiber

IO

module IO : sig ... end

IO using the Lwt event loop.

module IO_in : sig ... end

Input channel

module IO_out : sig ... end

Output channel

module TCP_server : sig ... end
module TCP_client : sig ... end

Helpers on the lwt side

val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t

detach_in_runner ~runner f runs f in the given moonpool runner, and returns a lwt future. This must be run from within the thread running Lwt_main.

Wrappers around Lwt_main

val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a

main_with_runner ~runner f starts a Lwt-based event loop and runs f() inside a fiber in runner.

val main : (unit -> 'a) -> 'a

Like main_with_runner but with a default choice of runner.

+Moonpool_lwt (moonpool-lwt.Moonpool_lwt)

Module Moonpool_lwt

Lwt_engine-based event loop for Moonpool.

In what follows, we mean by "lwt thread" the thread running Lwt_main.run (so, the thread where the Lwt event loop and all Lwt callbacks execute).

NOTE: this is experimental and might change in future versions.

module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls

Basic conversions

val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t

fut_of_lwt lwt_fut makes a thread-safe moonpool future that completes when lwt_fut does. This must be run from within the Lwt thread.

val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t

lwt_of_fut fut makes a lwt future that completes when fut does. This must be called from the Lwt thread, and the result must always be used only from inside the Lwt thread.

Helpers on the moonpool side

val await_lwt : 'a Lwt.t -> 'a

await_lwt fut awaits a Lwt future from inside a task running on a moonpool runner. This must be run from within a Moonpool runner so that the await-ing effect is handled.

val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t

run_in_lwt f runs f() from within the Lwt thread and returns a thread-safe future. This can be run from anywhere.

val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a

run_in_lwt_and_await f runs f in the Lwt thread, and awaits its result. Must be run from inside a moonpool runner so that the await-in effect is handled.

This is similar to Moonpool.await @@ run_in_lwt f.

val get_runner : unit -> Moonpool.Runner.t

Returns the runner from within which this is called. Must be run from within a fiber.

  • raises Failure

    if not run within a fiber

IO

module IO : sig ... end

IO using the Lwt event loop.

module IO_in : sig ... end

Input channel

module IO_out : sig ... end

Output channel

module TCP_server : sig ... end
module TCP_client : sig ... end

Helpers on the lwt side

val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t

detach_in_runner ~runner f runs f in the given moonpool runner, and returns a lwt future. This must be run from within the thread running Lwt_main.

Wrappers around Lwt_main

val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a

main_with_runner ~runner f starts a Lwt-based event loop and runs f() inside a fiber in runner.

val main : (unit -> 'a) -> 'a

Like main_with_runner but with a default choice of runner.

diff --git a/moonpool-lwt/_doc-dir/CHANGES.md b/moonpool-lwt/_doc-dir/CHANGES.md index 3e855a20..a295fa77 100644 --- a/moonpool-lwt/_doc-dir/CHANGES.md +++ b/moonpool-lwt/_doc-dir/CHANGES.md @@ -1,4 +1,29 @@ +# 0.6 + +- breaking: remove `Immediate_runner` (bug prone and didn't + handle effects). `Moonpool_fib.main` can be used to handle + effects in the main function. +- remove deprecated alias `Moonpool.Pool` + +- feat: add structured concurrency sub-library `moonpool.fib` with + fibers. Fibers can use `await` and spawn other fibers that will + be appropriately cancelled when their parent is. +- feat: add add `moonpool-lwt` as an experimental bridge between moonpool and lwt. + This allows moonpool runners to be used from within Lwt to + perform background computations, and conversely to call Lwt from + moonpool with some precautions. +- feat: task-local storage in the main moonpool runners, available from + fibers and regular tasks. +- feat: add `Exn_bt` to core +- feat: add `Runner.dummy` +- make `moonpool.forkjoin` optional (only on OCaml >= 5.0) +- feat: add `Fut.Advanced.barrier_on_abstract_container_of_futures` +- feat: add `Fut.map_list` + +- refactor: split off domain pool to `moonpool.dpool` +- fix too early exit in Ws_pool + # 0.5.1 - fix `Ws_pool`: workers would exit before processing diff --git a/moonpool/Moonpool/Background_thread/index.html b/moonpool/Moonpool/Background_thread/index.html index 4c5e08f2..5777d930 100644 --- a/moonpool/Moonpool/Background_thread/index.html +++ b/moonpool/Moonpool/Background_thread/index.html @@ -1,5 +1,5 @@ -Background_thread (moonpool.Moonpool.Background_thread)

Module Moonpool.Background_thread

A simple runner with a single background thread.

Because this is guaranteed to have a single worker thread, tasks scheduled in this runner always run asynchronously but in a sequential fashion.

This is similar to Fifo_pool with exactly one thread.

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since NEXT_RELEASE

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = +Background_thread (moonpool.Moonpool.Background_thread)

Module Moonpool.Background_thread

A simple runner with a single background thread.

Because this is guaranteed to have a single worker thread, tasks scheduled in this runner always run asynchronously but in a sequential fashion.

This is similar to Fifo_pool with exactly one thread.

  • since 0.6
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since 0.6

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> diff --git a/moonpool/Moonpool/Exn_bt/index.html b/moonpool/Moonpool/Exn_bt/index.html index da518584..15f1dd6f 100644 --- a/moonpool/Moonpool/Exn_bt/index.html +++ b/moonpool/Moonpool/Exn_bt/index.html @@ -1,2 +1,2 @@ -Exn_bt (moonpool.Moonpool.Exn_bt)

Module Moonpool.Exn_bt

Exception with backtrace.

  • since NEXT_RELEASE

An exception bundled with a backtrace

val exn : t -> exn
val make : exn -> Stdlib.Printexc.raw_backtrace -> t

Trivial builder

val get : exn -> t

get exn is make exn (get_raw_backtrace ())

val get_callstack : int -> exn -> t
val raise : t -> 'a

Raise the exception with its save backtrace

val show : t -> string

Simple printing

type nonrec 'a result = ('a, t) result
+Exn_bt (moonpool.Moonpool.Exn_bt)

Module Moonpool.Exn_bt

Exception with backtrace.

  • since 0.6

An exception bundled with a backtrace

val exn : t -> exn
val make : exn -> Stdlib.Printexc.raw_backtrace -> t

Trivial builder

val get : exn -> t

get exn is make exn (get_raw_backtrace ())

val get_callstack : int -> exn -> t
val raise : t -> 'a

Raise the exception with its save backtrace

val show : t -> string

Simple printing

type nonrec 'a result = ('a, t) result
diff --git a/moonpool/Moonpool/Fifo_pool/index.html b/moonpool/Moonpool/Fifo_pool/index.html index 3c278b1c..bc5bf6e2 100644 --- a/moonpool/Moonpool/Fifo_pool/index.html +++ b/moonpool/Moonpool/Fifo_pool/index.html @@ -1,9 +1,9 @@ -Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since NEXT_RELEASE

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = +Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since 0.6

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> ?around_task:((t -> 'b) * (t -> 'b -> unit)) -> ?num_threads:int -> ?name:string -> - 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter min

    minimum size of the pool. See Pool.create_args. The default is Domain.recommended_domain_count(), ie one worker per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each worker thread in the pool.

  • parameter around_task

    a pair of before, after functions ran around each task. See Pool.create_args.

  • parameter name

    name for the pool, used in tracing (since NEXT_RELEASE)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released. Most parameters are the same as in create.

+ 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter min

    minimum size of the pool. See Pool.create_args. The default is Domain.recommended_domain_count(), ie one worker per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each worker thread in the pool.

  • parameter around_task

    a pair of before, after functions ran around each task. See Pool.create_args.

  • parameter name

    name for the pool, used in tracing (since 0.6)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released. Most parameters are the same as in create.

diff --git a/moonpool/Moonpool/Fut/index.html b/moonpool/Moonpool/Fut/index.html index 448c2088..54e7f1db 100644 --- a/moonpool/Moonpool/Fut/index.html +++ b/moonpool/Moonpool/Fut/index.html @@ -1,2 +1,2 @@ -Fut (moonpool.Moonpool.Fut)

Module Moonpool.Fut

Futures.

A future of type 'a t represents the result of a computation that will yield a value of type 'a.

Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).

Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).

type 'a or_error = ('a, Exn_bt.t) result
type 'a t

A future with a result of type 'a.

type 'a promise

A promise, which can be fulfilled exactly once to set the corresponding future

val make : unit -> 'a t * 'a promise

Make a new future with the associated promise.

val on_result : 'a t -> ('a or_error -> unit) -> unit

on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.

exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time.

val fulfill_idempotent : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.

val return : 'a -> 'a t

Already settled future, with a result

val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ t

Already settled future, with a failure

val fail_exn_bt : Exn_bt.t -> _ t

Fail from a bundle of exception and backtrace

  • since NEXT_RELEASE
val of_result : 'a or_error -> 'a t
val is_resolved : _ t -> bool

is_resolved fut is true iff fut is resolved.

val peek : 'a t -> 'a or_error option

peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.

exception Not_ready
  • since 0.2
val get_or_fail : 'a t -> 'a or_error

get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).

  • since 0.2
val get_or_fail_exn : 'a t -> 'a

get_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.

  • since 0.2
val is_done : _ t -> bool

Is the future resolved? This is the same as peek fut |> Option.is_some.

  • since 0.2
val is_success : _ t -> bool

Checks if the future is resolved with Ok _ as a result.

  • since NEXT_RELEASE
val is_failed : _ t -> bool

Checks if the future is resolved with Error _ as a result.

  • since NEXT_RELEASE
val raise_if_failed : _ t -> unit

raise_if_failed fut raises e if fut failed with e.

  • since NEXT_RELEASE

Combinators

val spawn : on:Runner.t -> (unit -> 'a) -> 'a t

spaw ~on f runs f() on the given runner on, and return a future that will hold its result.

val spawn_on_current_runner : (unit -> 'a) -> 'a t

This must be run from inside a runner, and schedules the new task on it as well.

See Runner.get_current_runner to see how the runner is found.

  • since 0.5
  • raises Failure

    if run from outside a runner.

val reify_error : 'a t -> 'a or_error t

reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x

  • since 0.4
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t

map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t

bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t

bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.

  • parameter on

    if provided, f runs on the given runner

  • since 0.4
val join : 'a t t -> 'a t

join fut is fut >>= Fun.id. It joins the inner layer of the future.

  • since 0.2
val both : 'a t -> 'b t -> ('a * 'b) t

both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.

val choose : 'a t -> 'b t -> ('a, 'b) Either.t t

choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.

val choose_same : 'a t -> 'a t -> 'a t

choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.

val join_array : 'a t array -> 'a array t

Wait for all the futures in the array. Fails if any future fails.

val join_list : 'a t list -> 'a list t

Wait for all the futures in the list. Fails if any future fails.

module Advanced : sig ... end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t

map_list ~f l is like join_list @@ List.map f l.

  • since 0.5.1
val wait_array : _ t array -> unit t

wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.

val wait_list : _ t list -> unit t

wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.

val for_ : on:Runner.t -> int -> (int -> unit) -> unit t

for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.

val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t

for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.

  • since 0.2
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t

for_list ~on l f is like for_array ~on (Array.of_list l) f.

  • since 0.2

Await

NOTE This is only available on OCaml 5.

val await : 'a t -> 'a

await fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).

  • since 0.3

This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x

Blocking

val wait_block : 'a t -> 'a or_error

wait_block fut blocks the current thread until fut is resolved, and returns its value.

NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.

A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.

val wait_block_exn : 'a t -> 'a

Same as wait_block but re-raises the exception if the future failed.

Infix operators

These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.

They were previously present as module Infix_local and val infix, but are now simplified.

  • since 0.5
module Infix : sig ... end
include module type of Infix
  • since 0.5
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val let+ : 'a t -> ('a -> 'b) -> 'b t
val and+ : 'a t -> 'b t -> ('a * 'b) t
val let* : 'a t -> ('a -> 'b t) -> 'b t
val and* : 'a t -> 'b t -> ('a * 'b) t
module Infix_local = Infix
+Fut (moonpool.Moonpool.Fut)

Module Moonpool.Fut

Futures.

A future of type 'a t represents the result of a computation that will yield a value of type 'a.

Typically, the computation is running on a thread pool Runner.t and will proceed on some worker. Once set, a future cannot change. It either succeeds (storing a Ok x with x: 'a), or fail (storing a Error (exn, bt) with an exception and the corresponding backtrace).

Combinators such as map and join_array can be used to produce futures from other futures (in a monadic way). Some combinators take a on argument to specify a runner on which the intermediate computation takes place; for example map ~on:pool ~f fut maps the value in fut using function f, applicatively; the call to f happens on the runner pool (once fut resolves successfully with a value).

type 'a or_error = ('a, Exn_bt.t) result
type 'a t

A future with a result of type 'a.

type 'a promise

A promise, which can be fulfilled exactly once to set the corresponding future

val make : unit -> 'a t * 'a promise

Make a new future with the associated promise.

val on_result : 'a t -> ('a or_error -> unit) -> unit

on_result fut f registers f to be called in the future when fut is set ; or calls f immediately if fut is already set.

exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time.

val fulfill_idempotent : 'a promise -> 'a or_error -> unit

Fullfill the promise, setting the future at the same time. Does nothing if the promise is already fulfilled.

val return : 'a -> 'a t

Already settled future, with a result

val fail : exn -> Stdlib.Printexc.raw_backtrace -> _ t

Already settled future, with a failure

val fail_exn_bt : Exn_bt.t -> _ t

Fail from a bundle of exception and backtrace

  • since 0.6
val of_result : 'a or_error -> 'a t
val is_resolved : _ t -> bool

is_resolved fut is true iff fut is resolved.

val peek : 'a t -> 'a or_error option

peek fut returns Some r if fut is currently resolved with r, and None if fut is not resolved yet.

exception Not_ready
  • since 0.2
val get_or_fail : 'a t -> 'a or_error

get_or_fail fut obtains the result from fut if it's fulfilled (i.e. if peek fut returns Some res, get_or_fail fut returns res).

  • since 0.2
val get_or_fail_exn : 'a t -> 'a

get_or_fail_exn fut obtains the result from fut if it's fulfilled, like get_or_fail. If the result is an Error _, the exception inside is re-raised.

  • since 0.2
val is_done : _ t -> bool

Is the future resolved? This is the same as peek fut |> Option.is_some.

  • since 0.2
val is_success : _ t -> bool

Checks if the future is resolved with Ok _ as a result.

  • since 0.6
val is_failed : _ t -> bool

Checks if the future is resolved with Error _ as a result.

  • since 0.6
val raise_if_failed : _ t -> unit

raise_if_failed fut raises e if fut failed with e.

  • since 0.6

Combinators

val spawn : on:Runner.t -> (unit -> 'a) -> 'a t

spaw ~on f runs f() on the given runner on, and return a future that will hold its result.

val spawn_on_current_runner : (unit -> 'a) -> 'a t

This must be run from inside a runner, and schedules the new task on it as well.

See Runner.get_current_runner to see how the runner is found.

  • since 0.5
  • raises Failure

    if run from outside a runner.

val reify_error : 'a t -> 'a or_error t

reify_error fut turns a failing future into a non-failing one that contain Error (exn, bt). A non-failing future returning x is turned into Ok x

  • since 0.4
val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t

map ?on ~f fut returns a new future fut2 that resolves with f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t

bind ?on ~f fut returns a new future fut2 that resolves like the future f x if fut resolved with x; and fails with e if fut fails with e or f x raises e.

  • parameter on

    if provided, f runs on the given runner

val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t

bind_reify_error ?on ~f fut returns a new future fut2 that resolves like the future f (Ok x) if fut resolved with x; and resolves like the future f (Error (exn, bt)) if fut fails with exn and backtrace bt.

  • parameter on

    if provided, f runs on the given runner

  • since 0.4
val join : 'a t t -> 'a t

join fut is fut >>= Fun.id. It joins the inner layer of the future.

  • since 0.2
val both : 'a t -> 'b t -> ('a * 'b) t

both a b succeeds with x, y if a succeeds with x and b succeeds with y, or fails if any of them fails.

val choose : 'a t -> 'b t -> ('a, 'b) Either.t t

choose a b succeeds Left x or Right y if a succeeds with x or b succeeds with y, or fails if both of them fails. If they both succeed, it is not specified which result is used.

val choose_same : 'a t -> 'a t -> 'a t

choose_same a b succeeds with the value of one of a or b if they succeed, or fails if both fail. If they both succeed, it is not specified which result is used.

val join_array : 'a t array -> 'a array t

Wait for all the futures in the array. Fails if any future fails.

val join_list : 'a t list -> 'a list t

Wait for all the futures in the list. Fails if any future fails.

module Advanced : sig ... end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t

map_list ~f l is like join_list @@ List.map f l.

  • since 0.5.1
val wait_array : _ t array -> unit t

wait_array arr waits for all futures in arr to resolve. It discards the individual results of futures in arr. It fails if any future fails.

val wait_list : _ t list -> unit t

wait_list l waits for all futures in l to resolve. It discards the individual results of futures in l. It fails if any future fails.

val for_ : on:Runner.t -> int -> (int -> unit) -> unit t

for_ ~on n f runs f 0, f 1, …, f (n-1) on the runner, and returns a future that resolves when all the tasks have resolved, or fails as soon as one task has failed.

val for_array : on:Runner.t -> 'a array -> (int -> 'a -> unit) -> unit t

for_array ~on arr f runs f 0 arr.(0), …, f (n-1) arr.(n-1) in the runner (where n = Array.length arr), and returns a future that resolves when all the tasks are done, or fails if any of them fails.

  • since 0.2
val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t

for_list ~on l f is like for_array ~on (Array.of_list l) f.

  • since 0.2

Await

NOTE This is only available on OCaml 5.

val await : 'a t -> 'a

await fut suspends the current tasks until fut is fulfilled, then resumes the task on this same runner (but possibly on a different thread/domain).

  • since 0.3

This must only be run from inside the runner itself. The runner must support Suspend_. NOTE: only on OCaml 5.x

Blocking

val wait_block : 'a t -> 'a or_error

wait_block fut blocks the current thread until fut is resolved, and returns its value.

NOTE: A word of warning: this will monopolize the calling thread until the future resolves. This can also easily cause deadlocks, if enough threads in a pool call wait_block on futures running on the same pool or a pool depending on it.

A good rule to avoid deadlocks is to run this from outside of any pool, or to have an acyclic order between pools where wait_block is only called from a pool on futures evaluated in a pool that comes lower in the hierarchy. If this rule is broken, it is possible for all threads in a pool to wait for futures that can only make progress on these same threads, hence the deadlock.

val wait_block_exn : 'a t -> 'a

Same as wait_block but re-raises the exception if the future failed.

Infix operators

These combinators run on either the current pool (if present), or on the same thread that just fulfilled the previous future if not.

They were previously present as module Infix_local and val infix, but are now simplified.

  • since 0.5
module Infix : sig ... end
include module type of Infix
  • since 0.5
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val let+ : 'a t -> ('a -> 'b) -> 'b t
val and+ : 'a t -> 'b t -> ('a * 'b) t
val let* : 'a t -> ('a -> 'b t) -> 'b t
val and* : 'a t -> 'b t -> ('a * 'b) t
module Infix_local = Infix
diff --git a/moonpool/Moonpool/Immediate_runner/index.html b/moonpool/Moonpool/Immediate_runner/index.html index d690cf9b..fb0ec3a5 100644 --- a/moonpool/Moonpool/Immediate_runner/index.html +++ b/moonpool/Moonpool/Immediate_runner/index.html @@ -1,2 +1,2 @@ -Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks in the caller thread.

This is removed since NEXT_RELEASE, and replaced by Moonpool_fib.Main.

  • deprecated use Moonpool_fib.Main
+Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks in the caller thread.

This is removed since 0.6, and replaced by Moonpool_fib.Main.

  • deprecated use Moonpool_fib.Main
diff --git a/moonpool/Moonpool/Runner/index.html b/moonpool/Moonpool/Runner/index.html index 44a06c03..8dd17a6d 100644 --- a/moonpool/Moonpool/Runner/index.html +++ b/moonpool/Moonpool/Runner/index.html @@ -1,2 +1,2 @@ -Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

  • since 0.3
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since NEXT_RELEASE

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

+Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

  • since 0.3
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since 0.6

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

diff --git a/moonpool/Moonpool/Task_local_storage/index.html b/moonpool/Moonpool/Task_local_storage/index.html index 426c74c4..984ab8ea 100644 --- a/moonpool/Moonpool/Task_local_storage/index.html +++ b/moonpool/Moonpool/Task_local_storage/index.html @@ -1,5 +1,5 @@ -Task_local_storage (moonpool.Moonpool.Task_local_storage)

Module Moonpool.Task_local_storage

Task-local storage.

This storage is associated to the current task, just like thread-local storage is associated with the current thread. The storage is carried along in case the current task is suspended.

  • since NEXT_RELEASE
type t = Moonpool__.Types_.local_storage

Underlying storage for a task. This is mutable and not thread-safe.

val dummy : t
type 'a key

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
+Task_local_storage (moonpool.Moonpool.Task_local_storage)

Module Moonpool.Task_local_storage

Task-local storage.

This storage is associated to the current task, just like thread-local storage is associated with the current thread. The storage is carried along in case the current task is suspended.

  • since 0.6
type t = Moonpool__.Types_.local_storage

Underlying storage for a task. This is mutable and not thread-safe.

val dummy : t
type 'a key

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
     Task_local_storage.new_key ~init:(fun () -> make_foo ()) ()
 
 (* … *)
diff --git a/moonpool/Moonpool/Ws_pool/index.html b/moonpool/Moonpool/Ws_pool/index.html
index 5afb3e60..261af041 100644
--- a/moonpool/Moonpool/Ws_pool/index.html
+++ b/moonpool/Moonpool/Ws_pool/index.html
@@ -1,9 +1,9 @@
 
-Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since NEXT_RELEASE

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = +Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

  • since 0.6

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> ?around_task:((t -> 'b) * (t -> 'b -> unit)) -> ?num_threads:int -> ?name:string -> - 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter num_threads

    size of the pool, ie. number of worker threads. It will be at least 1 internally, so 0 or negative values make no sense. The default is Domain.recommended_domain_count(), ie one worker thread per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each thread in the pool

  • parameter around_task

    a pair of before, after, where before pool is called before a task is processed, on the worker thread about to run it, and returns x; and after pool x is called by the same thread after the task is over. (since 0.2)

  • parameter name

    a name for this thread pool, used if tracing is enabled (since NEXT_RELEASE)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released.

Most parameters are the same as in create.

  • since 0.3
+ 'a

Arguments used in create. See create for explanations.

val create : (unit -> t, _) create_args

create () makes a new thread pool.

  • parameter on_init_thread

    called at the beginning of each new thread in the pool.

  • parameter num_threads

    size of the pool, ie. number of worker threads. It will be at least 1 internally, so 0 or negative values make no sense. The default is Domain.recommended_domain_count(), ie one worker thread per CPU core. On OCaml 4 the default is 4 (since there is only one domain).

  • parameter on_exit_thread

    called at the end of each thread in the pool

  • parameter around_task

    a pair of before, after, where before pool is called before a task is processed, on the worker thread about to run it, and returns x; and after pool x is called by the same thread after the task is over. (since 0.2)

  • parameter name

    a name for this thread pool, used if tracing is enabled (since 0.6)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args

with_ () f calls f pool, where pool is obtained via create. When f pool returns or fails, pool is shutdown and its resources are released.

Most parameters are the same as in create.

  • since 0.3
diff --git a/moonpool/Moonpool/index.html b/moonpool/Moonpool/index.html index ab4b9899..58aa74ac 100644 --- a/moonpool/Moonpool/index.html +++ b/moonpool/Moonpool/index.html @@ -1,2 +1,2 @@ -Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Background_thread : sig ... end

A simple runner with a single background thread.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks in the caller thread.

module Exn_bt : sig ... end

Exception with backtrace.

exception Shutdown

Exception raised when trying to run tasks on runners that have been shut down.

  • since NEXT_RELEASE
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : ?ls:Task_local_storage.t -> Runner.t -> (unit -> unit) -> unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • since 0.5
val run_wait_block : ?ls:Task_local_storage.t -> Runner.t -> (unit -> 'a) -> 'a

run_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

  • since NEXT_RELEASE

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • since 0.5
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Task_local_storage : sig ... end

Task-local storage.

module Thread_local_storage = Moonpool_private.Thread_local_storage_
module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic = Moonpool_private.Atomic_

Atomic values.

+Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Background_thread : sig ... end

A simple runner with a single background thread.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks in the caller thread.

module Exn_bt : sig ... end

Exception with backtrace.

exception Shutdown

Exception raised when trying to run tasks on runners that have been shut down.

  • since 0.6
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : ?ls:Task_local_storage.t -> Runner.t -> (unit -> unit) -> unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • since 0.5
val run_wait_block : ?ls:Task_local_storage.t -> Runner.t -> (unit -> 'a) -> 'a

run_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

  • since 0.6

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • since 0.5
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Task_local_storage : sig ... end

Task-local storage.

module Thread_local_storage = Moonpool_private.Thread_local_storage_
module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic = Moonpool_private.Atomic_

Atomic values.

diff --git a/moonpool/Moonpool_dpool/index.html b/moonpool/Moonpool_dpool/index.html index a26ad039..0237fd66 100644 --- a/moonpool/Moonpool_dpool/index.html +++ b/moonpool/Moonpool_dpool/index.html @@ -1,2 +1,2 @@ -Moonpool_dpool (moonpool.Moonpool_dpool)

Module Moonpool_dpool

Static pool of domains.

These domains are shared between all the pools in moonpool. The rationale is that we should not have more domains than cores, so it's easier to reserve exactly that many domain slots, and run more flexible thread pools on top (each domain being shared by potentially multiple threads from multiple pools).

The pool should not contain actual domains if it's not in use, ie if no runner is presently actively using one or more of the domain slots.

NOTE: Interface is still experimental.

  • since NEXT_RELEASE
val max_number_of_domains : unit -> int

Number of domains in the pool when all domains are active.

Low level interface for resouce handling

Be very cautious with this interface, or resource leaks might occur.

val run_on : int -> (unit -> unit) -> unit

run_on i f runs f() on the domain with index i. Precondition: 0 <= i < n_domains(). The thread must call decr_on with i once it's done.

val decr_on : int -> unit

Signal that a thread is stopping on the domain with index i.

val run_on_and_wait : int -> (unit -> 'a) -> 'a

run_on_and_wait i f runs f() on the domain with index i, and blocks until the result of f() is returned back.

+Moonpool_dpool (moonpool.Moonpool_dpool)

Module Moonpool_dpool

Static pool of domains.

These domains are shared between all the pools in moonpool. The rationale is that we should not have more domains than cores, so it's easier to reserve exactly that many domain slots, and run more flexible thread pools on top (each domain being shared by potentially multiple threads from multiple pools).

The pool should not contain actual domains if it's not in use, ie if no runner is presently actively using one or more of the domain slots.

NOTE: Interface is still experimental.

  • since 0.6
val max_number_of_domains : unit -> int

Number of domains in the pool when all domains are active.

Low level interface for resouce handling

Be very cautious with this interface, or resource leaks might occur.

val run_on : int -> (unit -> unit) -> unit

run_on i f runs f() on the domain with index i. Precondition: 0 <= i < n_domains(). The thread must call decr_on with i once it's done.

val decr_on : int -> unit

Signal that a thread is stopping on the domain with index i.

val run_on_and_wait : int -> (unit -> 'a) -> 'a

run_on_and_wait i f runs f() on the domain with index i, and blocks until the result of f() is returned back.

diff --git a/moonpool/Moonpool_fib/Main/index.html b/moonpool/Moonpool_fib/Main/index.html index f59ef805..dd3f049b 100644 --- a/moonpool/Moonpool_fib/Main/index.html +++ b/moonpool/Moonpool_fib/Main/index.html @@ -1,2 +1,2 @@ -Main (moonpool.Moonpool_fib.Main)

Module Moonpool_fib.Main

Main thread.

This is evolved from Moonpool.Immediate_runner, but unlike it, this API assumes you run it in a thread (possibly the main thread) which will block until the initial computation is done.

This means it's reasonable to use Main.main (fun () -> do_everything) at the beginning of the program. Other Moonpool pools can be created for background tasks, etc. to do the heavy lifting, and the main thread (inside this immediate runner) can coordinate tasks via Fiber.await.

Aside from the fact that this blocks the caller thread, it is fairly similar to Background_thread in that there's a single worker to process tasks/fibers.

This handles effects, including the ones in Fiber.

  • since NEXT_RELEASE
val main : (Moonpool.Runner.t -> 'a) -> 'a

main f runs f() in a scope that handles effects, including Fiber.await.

This scope can run background tasks as well, in a cooperative fashion.

+Main (moonpool.Moonpool_fib.Main)

Module Moonpool_fib.Main

Main thread.

This is evolved from Moonpool.Immediate_runner, but unlike it, this API assumes you run it in a thread (possibly the main thread) which will block until the initial computation is done.

This means it's reasonable to use Main.main (fun () -> do_everything) at the beginning of the program. Other Moonpool pools can be created for background tasks, etc. to do the heavy lifting, and the main thread (inside this immediate runner) can coordinate tasks via Fiber.await.

Aside from the fact that this blocks the caller thread, it is fairly similar to Background_thread in that there's a single worker to process tasks/fibers.

This handles effects, including the ones in Fiber.

  • since 0.6
val main : (Moonpool.Runner.t -> 'a) -> 'a

main f runs f() in a scope that handles effects, including Fiber.await.

This scope can run background tasks as well, in a cooperative fashion.

diff --git a/moonpool/Moonpool_fib/index.html b/moonpool/Moonpool_fib/index.html index 90bc0690..11cee127 100644 --- a/moonpool/Moonpool_fib/index.html +++ b/moonpool/Moonpool_fib/index.html @@ -1,2 +1,2 @@ -Moonpool_fib (moonpool.Moonpool_fib)

Module Moonpool_fib

Fibers for moonpool.

See Fiber for the most important explanations.

  • since NEXT_RELEASE.
module Fiber : sig ... end

Fibers.

module Fls : sig ... end

Fiber-local storage.

module Handle : sig ... end

The unique name of a fiber.

module Main : sig ... end

Main thread.

include module type of struct include Fiber end
type cancel_callback = Moonpool.Exn_bt.t -> unit

A callback used in case of cancellation

type 'a t = 'a Fiber.{Private_}1.t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type any = Fiber.{Private_}1.any =
  1. | Any : _ t -> any

Type erased fiber

val return : 'a -> 'a t
val fail : Moonpool.Exn_bt.t -> _ t
val self : unit -> any

self () is the current fiber. Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val wait_block_exn : 'a t -> 'a

wait_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val wait_block : 'a t -> 'a Moonpool.Fut.or_error

wait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises e

    if the current fiber is cancelled with exception e

  • raises Failure

    if not run from a fiber.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

type cancel_handle = Fiber.cancel_handle

An opaque handle for a single cancel callback in a fiber

val add_on_cancel : _ t -> cancel_callback -> cancel_handle

add_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.

val remove_on_cancel : _ t -> cancel_handle -> unit

remove_on_cancel fib h removes the cancel callback associated with handle h.

val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a

with_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t

spawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter on

    if provided, start the fiber on the given runner. If not provided, use the parent's runner.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent). Default is true.

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val spawn_ignore : ?protect:bool -> (unit -> _) -> unit

spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.

include module type of struct include Main end
val main : (Moonpool.Runner.t -> 'a) -> 'a

main f runs f() in a scope that handles effects, including Fiber.await.

This scope can run background tasks as well, in a cooperative fashion.

+Moonpool_fib (moonpool.Moonpool_fib)

Module Moonpool_fib

Fibers for moonpool.

See Fiber for the most important explanations.

  • since 0.6.
module Fiber : sig ... end

Fibers.

module Fls : sig ... end

Fiber-local storage.

module Handle : sig ... end

The unique name of a fiber.

module Main : sig ... end

Main thread.

include module type of struct include Fiber end
type cancel_callback = Moonpool.Exn_bt.t -> unit

A callback used in case of cancellation

type 'a t = 'a Fiber.{Private_}1.t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type any = Fiber.{Private_}1.any =
  1. | Any : _ t -> any

Type erased fiber

val return : 'a -> 'a t
val fail : Moonpool.Exn_bt.t -> _ t
val self : unit -> any

self () is the current fiber. Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val wait_block_exn : 'a t -> 'a

wait_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val wait_block : 'a t -> 'a Moonpool.Fut.or_error

wait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises e

    if the current fiber is cancelled with exception e

  • raises Failure

    if not run from a fiber.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

type cancel_handle = Fiber.cancel_handle

An opaque handle for a single cancel callback in a fiber

val add_on_cancel : _ t -> cancel_callback -> cancel_handle

add_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.

val remove_on_cancel : _ t -> cancel_handle -> unit

remove_on_cancel fib h removes the cancel callback associated with handle h.

val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_on_cancel fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a

with_on_self_cancel cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber; and f() terminates, cb is removed from the list.

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

val spawn : ?on:Moonpool.Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t

spawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter on

    if provided, start the fiber on the given runner. If not provided, use the parent's runner.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent). Default is true.

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val spawn_ignore : ?protect:bool -> (unit -> _) -> unit

spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.

include module type of struct include Main end
val main : (Moonpool.Runner.t -> 'a) -> 'a

main f runs f() in a scope that handles effects, including Fiber.await.

This scope can run background tasks as well, in a cooperative fashion.

diff --git a/moonpool/_doc-dir/CHANGES.md b/moonpool/_doc-dir/CHANGES.md index 3e855a20..a295fa77 100644 --- a/moonpool/_doc-dir/CHANGES.md +++ b/moonpool/_doc-dir/CHANGES.md @@ -1,4 +1,29 @@ +# 0.6 + +- breaking: remove `Immediate_runner` (bug prone and didn't + handle effects). `Moonpool_fib.main` can be used to handle + effects in the main function. +- remove deprecated alias `Moonpool.Pool` + +- feat: add structured concurrency sub-library `moonpool.fib` with + fibers. Fibers can use `await` and spawn other fibers that will + be appropriately cancelled when their parent is. +- feat: add add `moonpool-lwt` as an experimental bridge between moonpool and lwt. + This allows moonpool runners to be used from within Lwt to + perform background computations, and conversely to call Lwt from + moonpool with some precautions. +- feat: task-local storage in the main moonpool runners, available from + fibers and regular tasks. +- feat: add `Exn_bt` to core +- feat: add `Runner.dummy` +- make `moonpool.forkjoin` optional (only on OCaml >= 5.0) +- feat: add `Fut.Advanced.barrier_on_abstract_container_of_futures` +- feat: add `Fut.map_list` + +- refactor: split off domain pool to `moonpool.dpool` +- fix too early exit in Ws_pool + # 0.5.1 - fix `Ws_pool`: workers would exit before processing