From c34156bfc18135401a418866e31adffbd8621d8a Mon Sep 17 00:00:00 2001 From: c-cube Date: Tue, 5 Mar 2024 01:52:16 +0000 Subject: [PATCH] deploy: 184690b21c16254d22949de2c461926ee9654ae1 --- moonpool-lwt/_doc-dir/README.md | 51 +++++++++++++++++++ .../For_runner_implementors/index.html | 2 +- .../Moonpool/Background_thread/index.html | 2 +- .../For_runner_implementors/index.html | 2 +- moonpool/Moonpool/Fifo_pool/index.html | 2 +- .../For_runner_implementors/index.html | 8 --- moonpool/Moonpool/Immediate_runner/index.html | 2 +- .../Runner/For_runner_implementors/index.html | 2 +- moonpool/Moonpool/Runner/index.html | 2 +- .../Task_local_storage/Direct/index.html | 2 + .../Moonpool/Task_local_storage/index.html | 4 +- .../For_runner_implementors/index.html | 2 +- moonpool/Moonpool/Ws_pool/index.html | 2 +- moonpool/Moonpool/index.html | 10 +--- .../Moonpool__Immediate_runner/index.html | 2 - moonpool/Moonpool_fib/Fiber/index.html | 6 +-- moonpool/Moonpool_fib/Fls/index.html | 4 +- moonpool/Moonpool_fib/Handle/index.html | 2 +- moonpool/Moonpool_fib/Main/index.html | 2 + moonpool/Moonpool_fib/index.html | 2 +- moonpool/Moonpool_fib__/index.html | 2 + moonpool/Moonpool_fib__Main/index.html | 2 + moonpool/_doc-dir/README.md | 51 +++++++++++++++++++ moonpool/index.html | 2 +- 24 files changed, 128 insertions(+), 40 deletions(-) delete mode 100644 moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html create mode 100644 moonpool/Moonpool/Task_local_storage/Direct/index.html delete mode 100644 moonpool/Moonpool__Immediate_runner/index.html create mode 100644 moonpool/Moonpool_fib/Main/index.html create mode 100644 moonpool/Moonpool_fib__/index.html create mode 100644 moonpool/Moonpool_fib__Main/index.html diff --git a/moonpool-lwt/_doc-dir/README.md b/moonpool-lwt/_doc-dir/README.md index c51361df..5a37e6a0 100644 --- a/moonpool-lwt/_doc-dir/README.md +++ b/moonpool-lwt/_doc-dir/README.md @@ -165,6 +165,57 @@ val expected_sum : int = 5050 - : unit = () ``` +### Errors + +We have a `Exn_bt.t` type that comes in handy in many places. It bundles together +an exception and the backtrace associated with the place the exception was caught. + +### Fibers + +On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`) +which provides _lightweight fibers_ +that can run on any Moonpool runner. +These fibers are a sort of lightweight thread, dispatched on the runner's +background thread(s). +Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber +is done. + +```ocaml +# #require "moonpool.fib";; + +# (* convenient alias *) + module F = Moonpool_fib;; +module F = Moonpool_fib +# F.main (fun _runner -> + let f1 = F.spawn (fun () -> fib 10) in + let f2 = F.spawn (fun () -> fib 15) in + F.await f1 + F.await f2);; +- : int = 1076 +``` + +Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is +the sub-fiber's _parent_. +When a parent fails, all its children are cancelled (forced to fail). +This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency). + +Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible +to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result +as a regular future, too. +However, this resolution is only done after all the children of the fiber have +resolved — the lifetime of fibers forms a well-nested tree in that sense. + +When a fiber is suspended because it `await`s another fiber (or future), the scheduler's +thread on which it was running becomes available again and can go on process another task. +When the fiber resumes, it will automatically be re-scheduled on the same runner it started on. +This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1. + +In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber). +This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only +put persistent data in storage to avoid confusing aliasing). +The storage is convenient for carrying around context for cross-cutting concerns such +as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing +scope). + ### Fork-join On OCaml 5, again using effect handlers, the sublibrary `moonpool.forkjoin` diff --git a/moonpool/Moonpool/Background_thread/For_runner_implementors/index.html b/moonpool/Moonpool/Background_thread/For_runner_implementors/index.html index 681bb222..6d61ea48 100644 --- a/moonpool/Moonpool/Background_thread/For_runner_implementors/index.html +++ b/moonpool/Moonpool/Background_thread/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.t -> task -> unit) -> unit -> t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/moonpool/Moonpool/Background_thread/index.html b/moonpool/Moonpool/Background_thread/index.html index 5eb626c0..ec647226 100644 --- a/moonpool/Moonpool/Background_thread/index.html +++ b/moonpool/Moonpool/Background_thread/index.html @@ -1,5 +1,5 @@ -Background_thread (moonpool.Moonpool.Background_thread)

Module Moonpool.Background_thread

A simple runner with a single background thread.

Because this is guaranteed to have a single worker thread, tasks scheduled in this runner always run asynchronously but in a sequential fashion.

This is similar to Fifo_pool with exactly one thread.

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = +Background_thread (moonpool.Moonpool.Background_thread)

Module Moonpool.Background_thread

A simple runner with a single background thread.

Because this is guaranteed to have a single worker thread, tasks scheduled in this runner always run asynchronously but in a sequential fashion.

This is similar to Fifo_pool with exactly one thread.

  • since NEXT_RELEASE
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> diff --git a/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html b/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html index a0601776..1567d526 100644 --- a/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html +++ b/moonpool/Moonpool/Fifo_pool/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.t -> task -> unit) -> unit -> t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/moonpool/Moonpool/Fifo_pool/index.html b/moonpool/Moonpool/Fifo_pool/index.html index 173e7644..769e26ee 100644 --- a/moonpool/Moonpool/Fifo_pool/index.html +++ b/moonpool/Moonpool/Fifo_pool/index.html @@ -1,5 +1,5 @@ -Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = +Fifo_pool (moonpool.Moonpool.Fifo_pool)

Module Moonpool.Fifo_pool

A simple thread pool in FIFO order.

FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.

Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.

This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> diff --git a/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html b/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html deleted file mode 100644 index 31922564..00000000 --- a/moonpool/Moonpool/Immediate_runner/For_runner_implementors/index.html +++ /dev/null @@ -1,8 +0,0 @@ - -For_runner_implementors (moonpool.Moonpool.Immediate_runner.For_runner_implementors)

Module Immediate_runner.For_runner_implementors

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val create : - size:(unit -> int) -> - num_tasks:(unit -> int) -> - shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> - unit -> - t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/moonpool/Moonpool/Immediate_runner/index.html b/moonpool/Moonpool/Immediate_runner/index.html index 9c7ded75..d690cf9b 100644 --- a/moonpool/Moonpool/Immediate_runner/index.html +++ b/moonpool/Moonpool/Immediate_runner/index.html @@ -1,2 +1,2 @@ -Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks immediately in the caller thread.

Whenever a task is submitted to this runner via Runner.run_async r task, the task is run immediately in the caller thread as task(). There are no background threads, no resource, this is just a trivial implementation of the interface.

This can be useful when an implementation needs a runner, but there isn't enough work to justify starting an actual full thread pool.

Another situation is when threads cannot be used at all (e.g. because you plan to call Unix.fork later).

NOTE: this does not handle the Suspend effect, so await, fork-join, etc. will NOT work on this runner.

  • since 0.5
include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val runner : t

The trivial runner that actually runs tasks at the calling point.

+Immediate_runner (moonpool.Moonpool.Immediate_runner)

Module Moonpool.Immediate_runner

Runner that runs tasks in the caller thread.

This is removed since NEXT_RELEASE, and replaced by Moonpool_fib.Main.

  • deprecated use Moonpool_fib.Main
diff --git a/moonpool/Moonpool/Runner/For_runner_implementors/index.html b/moonpool/Moonpool/Runner/For_runner_implementors/index.html index 0f9c5247..66a85997 100644 --- a/moonpool/Moonpool/Runner/For_runner_implementors/index.html +++ b/moonpool/Moonpool/Runner/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.t -> task -> unit) -> unit -> t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/moonpool/Moonpool/Runner/index.html b/moonpool/Moonpool/Runner/index.html index 07c5fc7b..cc0ab1b0 100644 --- a/moonpool/Moonpool/Runner/index.html +++ b/moonpool/Moonpool/Runner/index.html @@ -1,2 +1,2 @@ -Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

  • since 0.3
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
+Runner (moonpool.Moonpool.Runner)

Module Moonpool.Runner

Interface for runners.

This provides an abstraction for running tasks in the background, which is implemented by various thread pools.

  • since 0.3
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

diff --git a/moonpool/Moonpool/Task_local_storage/Direct/index.html b/moonpool/Moonpool/Task_local_storage/Direct/index.html new file mode 100644 index 00000000..77e678f6 --- /dev/null +++ b/moonpool/Moonpool/Task_local_storage/Direct/index.html @@ -0,0 +1,2 @@ + +Direct (moonpool.Moonpool.Task_local_storage.Direct)

Module Task_local_storage.Direct

Direct access to values from a storage handle

val get : t -> 'a key -> 'a

Access a key

val set : t -> 'a key -> 'a -> unit
val create : unit -> t
val copy : t -> t
diff --git a/moonpool/Moonpool/Task_local_storage/index.html b/moonpool/Moonpool/Task_local_storage/index.html index 7c5d437b..426c74c4 100644 --- a/moonpool/Moonpool/Task_local_storage/index.html +++ b/moonpool/Moonpool/Task_local_storage/index.html @@ -1,8 +1,8 @@ -Task_local_storage (moonpool.Moonpool.Task_local_storage)

Module Moonpool.Task_local_storage

Task-local storage.

This storage is associated to the current task, just like thread-local storage is associated with the current thread. The storage is carried along in case the current task is suspended.

  • since NEXT_RELEASE
type storage

Underlying storage for a task

type 'a key

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
+Task_local_storage (moonpool.Moonpool.Task_local_storage)

Module Moonpool.Task_local_storage

Task-local storage.

This storage is associated to the current task, just like thread-local storage is associated with the current thread. The storage is carried along in case the current task is suspended.

  • since NEXT_RELEASE
type t = Moonpool__.Types_.local_storage

Underlying storage for a task. This is mutable and not thread-safe.

val dummy : t
type 'a key

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
     Task_local_storage.new_key ~init:(fun () -> make_foo ()) ()
 
 (* … *)
 
 (* use it: *)
-let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val get_opt : 'a key -> 'a option

get_opt k gets the current task's value for key k, or None if not run from inside the task.

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

+let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val get_opt : 'a key -> 'a option

get_opt k gets the current task's value for key k, or None if not run from inside the task.

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

val get_current : unit -> t option

Access the current storage, or None if not run from within a task.

module Direct : sig ... end

Direct access to values from a storage handle

diff --git a/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html b/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html index 6ed2b7b2..05657a32 100644 --- a/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html +++ b/moonpool/Moonpool/Ws_pool/For_runner_implementors/index.html @@ -3,6 +3,6 @@ size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.t -> task -> unit) -> unit -> t

Create a new runner.

NOTE: the runner should support DLA and Suspend_ on OCaml 5.x, so that Fork_join and other 5.x features work properly.

Key that should be used by each runner to store itself in TLS on every thread it controls, so that tasks running on these threads can access the runner. This is necessary for get_current_runner to work.

diff --git a/moonpool/Moonpool/Ws_pool/index.html b/moonpool/Moonpool/Ws_pool/index.html index fea8b129..6f06b554 100644 --- a/moonpool/Moonpool/Ws_pool/index.html +++ b/moonpool/Moonpool/Ws_pool/index.html @@ -1,5 +1,5 @@ -Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
type ('a, 'b) create_args = +Ws_pool (moonpool.Moonpool.Ws_pool)

Module Moonpool.Ws_pool

Work-stealing thread pool.

A pool of threads with a worker-stealing scheduler. The pool contains a fixed number of threads that wait for work items to come, process these, and loop.

This is good for CPU-intensive tasks that feature a lot of small tasks. Note that tasks will not always be processed in the order they are scheduled, so this is not great for workloads where the latency of individual tasks matter (for that see Fifo_pool).

This implements Runner.t since 0.3.

If a pool is no longer needed, shutdown can be used to signal all threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simply the single runtime on OCaml 4).

include module type of Runner
type task = unit -> unit
type t

A runner.

If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.

The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and simple the single runtime on OCaml 4).

val size : t -> int

Number of threads/workers.

val num_tasks : t -> int

Current number of tasks. This is at best a snapshot, useful for metrics and debugging.

val shutdown : t -> unit

Shutdown the runner and wait for it to terminate. Idempotent.

val shutdown_without_waiting : t -> unit

Shutdown the pool, and do not wait for it to terminate. Idempotent.

exception Shutdown
val run_async : ?ls:Task_local_storage.t -> t -> task -> unit

run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.

  • parameter ls

    if provided, run the task with this initial local storage

  • raises Shutdown

    if the runner was shut down before run_async was called.

val run_wait_block : ?ls:Task_local_storage.t -> t -> (unit -> 'a) -> 'a

run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

val dummy : t

Runner that fails when scheduling tasks on it. Calling run_async on it will raise Failure.

Implementing runners

module For_runner_implementors : sig ... end

This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.

val get_current_runner : unit -> t option

Access the current runner. This returns Some r if the call happens on a thread that belongs in a runner.

  • since 0.5
val get_current_storage : unit -> Task_local_storage.t option

get_current_storage runner gets the local storage for the currently running task.

type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) -> diff --git a/moonpool/Moonpool/index.html b/moonpool/Moonpool/index.html index d561ab56..ab4b9899 100644 --- a/moonpool/Moonpool/index.html +++ b/moonpool/Moonpool/index.html @@ -1,10 +1,2 @@ -Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Background_thread : sig ... end

A simple runner with a single background thread.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks immediately in the caller thread.

module Exn_bt : sig ... end

Exception with backtrace.

exception Shutdown

Exception raised when trying to run tasks on runners that have been shut down.

  • since NEXT_RELEASE
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> unit) -> - unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • since 0.5
val run_wait_block : - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> 'a) -> - 'a

run_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

  • since NEXT_RELEASE

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • since 0.5
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Task_local_storage : sig ... end

Task-local storage.

module Thread_local_storage = Moonpool_private.Thread_local_storage_
module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic = Moonpool_private.Atomic_

Atomic values.

+Moonpool (moonpool.Moonpool)

Module Moonpool

Moonpool

A pool within a bigger pool (ie the ocean). Here, we're talking about pools of Thread.t that are dispatched over several Domain.t to enable parallelism.

We provide several implementations of pools with distinct scheduling strategies, alongside some concurrency primitives such as guarding locks (Lock.t) and futures (Fut.t).

module Ws_pool : sig ... end

Work-stealing thread pool.

module Fifo_pool : sig ... end

A simple thread pool in FIFO order.

module Background_thread : sig ... end

A simple runner with a single background thread.

module Runner : sig ... end

Interface for runners.

module Immediate_runner : sig ... end

Runner that runs tasks in the caller thread.

module Exn_bt : sig ... end

Exception with backtrace.

exception Shutdown

Exception raised when trying to run tasks on runners that have been shut down.

  • since NEXT_RELEASE
val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t

Similar to Thread.create, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.)

val run_async : ?ls:Task_local_storage.t -> Runner.t -> (unit -> unit) -> unit

run_async runner task schedules the task to run on the given runner. This means task() will be executed at some point in the future, possibly in another thread.

  • since 0.5
val run_wait_block : ?ls:Task_local_storage.t -> Runner.t -> (unit -> 'a) -> 'a

run_wait_block runner f schedules f for later execution on the runner, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.

NOTE be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).

  • raises Shutdown

    if the runner was already shut down

  • since NEXT_RELEASE

Number of threads recommended to saturate the CPU. For IO pools this makes little sense (you might want more threads than this because many of them will be blocked most of the time).

  • since 0.5
val spawn : on:Runner.t -> (unit -> 'a) -> 'a Fut.t

spawn ~on f runs f() on the runner (a thread pool typically) and returns a future result for it. See Fut.spawn.

  • since 0.5
val spawn_on_current_runner : (unit -> 'a) -> 'a Fut.t
val await : 'a Fut.t -> 'a

Await a future. See await. Only on OCaml >= 5.0.

  • since 0.5
module Lock : sig ... end

Mutex-protected resource.

module Fut : sig ... end

Futures.

module Chan : sig ... end

Channels.

module Task_local_storage : sig ... end

Task-local storage.

module Thread_local_storage = Moonpool_private.Thread_local_storage_
module Blocking_queue : sig ... end

A simple blocking queue.

module Bounded_queue : sig ... end

A blocking queue of finite size.

module Atomic = Moonpool_private.Atomic_

Atomic values.

diff --git a/moonpool/Moonpool__Immediate_runner/index.html b/moonpool/Moonpool__Immediate_runner/index.html deleted file mode 100644 index 6b7ebfd3..00000000 --- a/moonpool/Moonpool__Immediate_runner/index.html +++ /dev/null @@ -1,2 +0,0 @@ - -Moonpool__Immediate_runner (moonpool.Moonpool__Immediate_runner)

Module Moonpool__Immediate_runner

This module is hidden.

diff --git a/moonpool/Moonpool_fib/Fiber/index.html b/moonpool/Moonpool_fib/Fiber/index.html index 4c2cb315..02630a87 100644 --- a/moonpool/Moonpool_fib/Fiber/index.html +++ b/moonpool/Moonpool_fib/Fiber/index.html @@ -1,6 +1,2 @@ -Fiber (moonpool.Moonpool_fib.Fiber)

Module Moonpool_fib.Fiber

Fibers.

A fiber is a lightweight computation that runs cooperatively alongside other fibers. In the context of moonpool, fibers have additional properties:

  • they run in a moonpool runner
  • they form a simple supervision tree, enabling a limited form of structured concurrency
type 'a t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type cancel_callback = Moonpool.Exn_bt.t -> unit
type any =
  1. | Any : _ t -> any

Type erased fiber

val self : unit -> any

self () is the current fiber. Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val wait_block_exn : 'a t -> 'a

wait_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val wait_block : 'a t -> 'a Moonpool.Fut.or_error

wait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises Failure

    if not.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

type cancel_handle

An opaque handle for a single cancel callback in a fiber

val add_on_cancel : _ t -> cancel_callback -> cancel_handle

add_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.

val remove_on_cancel : _ t -> cancel_handle -> unit

remove_on_cancel fib h removes the cancel callback associated with handle h.

val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_cancel_callback fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val with_self_cancel_callback : cancel_callback -> (unit -> 'a) -> 'a

with_self_cancel_callback cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

spawn_link ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent). Default is true.

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

spawn_top_or_link ~on ~protect f runs f() in a new fiber. If this is run from inside a fiber, this behaves like spawn_link ~protect f (links to the parent); otherwise it behaves like spawn_top ~on f.

  • parameter protect

    if false, failure of the new fiber will also cancel the parent (in case there is a parent). Default true. See spawn_link.

+Fiber (moonpool.Moonpool_fib.Fiber)

Module Moonpool_fib.Fiber

Fibers.

A fiber is a lightweight computation that runs cooperatively alongside other fibers. In the context of moonpool, fibers have additional properties:

  • they run in a moonpool runner
  • they form a simple supervision tree, enabling a limited form of structured concurrency
type cancel_callback = Moonpool.Exn_bt.t -> unit

A callback used in case of cancellation

type 'a t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type any =
  1. | Any : _ t -> any

Type erased fiber

val return : 'a -> 'a t
val fail : Moonpool.Exn_bt.t -> _ t
val self : unit -> any

self () is the current fiber. Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val wait_block_exn : 'a t -> 'a

wait_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val wait_block : 'a t -> 'a Moonpool.Fut.or_error

wait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises Failure

    if not.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

type cancel_handle

An opaque handle for a single cancel callback in a fiber

val add_on_cancel : _ t -> cancel_callback -> cancel_handle

add_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.

val remove_on_cancel : _ t -> cancel_handle -> unit

remove_on_cancel fib h removes the cancel callback associated with handle h.

val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_cancel_callback fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val with_self_cancel_callback : cancel_callback -> (unit -> 'a) -> 'a

with_self_cancel_callback cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

val spawn : ?protect:bool -> (unit -> 'a) -> 'a t

spawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent). Default is true.

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val spawn_ignore : ?protect:bool -> (unit -> _) -> unit

spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.

diff --git a/moonpool/Moonpool_fib/Fls/index.html b/moonpool/Moonpool_fib/Fls/index.html index 4263a41d..ec3eb449 100644 --- a/moonpool/Moonpool_fib/Fls/index.html +++ b/moonpool/Moonpool_fib/Fls/index.html @@ -1,8 +1,8 @@ -Fls (moonpool.Moonpool_fib.Fls)

Module Moonpool_fib.Fls

Fiber-local storage.

This storage is associated to the current fiber, just like thread-local storage is associated with the current thread.

See Moonpool.Task_local_storage for more general information, as this is based on it.

NOTE: it's important to note that, while each fiber has its own storage, spawning a sub-fiber f2 from a fiber f1 will only do a shallow copy of the storage. Values inside f1's storage will be physically shared with f2.

include module type of struct include Moonpool.Task_local_storage end

Underlying storage for a task

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
+Fls (moonpool.Moonpool_fib.Fls)

Module Moonpool_fib.Fls

Fiber-local storage.

This storage is associated to the current fiber, just like thread-local storage is associated with the current thread.

See Moonpool.Task_local_storage for more general information, as this is based on it.

NOTE: it's important to note that, while each fiber has its own storage, spawning a sub-fiber f2 from a fiber f1 will only do a shallow copy of the storage. Values inside f1's storage will be physically shared with f2. It is thus recommended to store only persistent values in the local storage.

include module type of struct include Moonpool.Task_local_storage end
type t = Moonpool__.Types_.local_storage

Underlying storage for a task. This is mutable and not thread-safe.

val dummy : t

A key used to access a particular (typed) storage slot on every task.

val new_key : init:(unit -> 'a) -> unit -> 'a key

new_key ~init () makes a new key. Keys are expensive and should never be allocated dynamically or in a loop. The correct pattern is, at toplevel:

  let k_foo : foo Task_ocal_storage.key =
     Task_local_storage.new_key ~init:(fun () -> make_foo ()) ()
 
 (* … *)
 
 (* use it: *)
-let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val get_opt : 'a key -> 'a option

get_opt k gets the current task's value for key k, or None if not run from inside the task.

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

+let … = Task_local_storage.get k_foo
val get : 'a key -> 'a

get k gets the value for the current task for key k. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val get_opt : 'a key -> 'a option

get_opt k gets the current task's value for key k, or None if not run from inside the task.

val set : 'a key -> 'a -> unit

set k v sets the storage for k to v. Must be run from inside a task running on a runner.

  • raises Failure

    otherwise

val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b

with_value k v f sets k to v for the duration of the call to f(). When f() returns (or fails), k is restored to its old value.

val get_current : unit -> t option

Access the current storage, or None if not run from within a task.

Direct access to values from a storage handle

diff --git a/moonpool/Moonpool_fib/Handle/index.html b/moonpool/Moonpool_fib/Handle/index.html index 230d07b2..b3dc859a 100644 --- a/moonpool/Moonpool_fib/Handle/index.html +++ b/moonpool/Moonpool_fib/Handle/index.html @@ -1,2 +1,2 @@ -Handle (moonpool.Moonpool_fib.Handle)

Module Moonpool_fib.Handle

The unique name of a fiber

type t = private int

Unique, opaque identifier for a fiber.

val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t

Generate a fresh, unique identifier

module Set : Set.S with type elt = t
module Map : Map.S with type key = t
+Handle (moonpool.Moonpool_fib.Handle)

Module Moonpool_fib.Handle

The unique name of a fiber.

Each fiber has a unique handle that can be used to refer to it in maps or sets.

type t = private int

Unique, opaque identifier for a fiber.

val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t

Generate a fresh, unique identifier

module Set : Set.S with type elt = t
module Map : Map.S with type key = t
diff --git a/moonpool/Moonpool_fib/Main/index.html b/moonpool/Moonpool_fib/Main/index.html new file mode 100644 index 00000000..f59ef805 --- /dev/null +++ b/moonpool/Moonpool_fib/Main/index.html @@ -0,0 +1,2 @@ + +Main (moonpool.Moonpool_fib.Main)

Module Moonpool_fib.Main

Main thread.

This is evolved from Moonpool.Immediate_runner, but unlike it, this API assumes you run it in a thread (possibly the main thread) which will block until the initial computation is done.

This means it's reasonable to use Main.main (fun () -> do_everything) at the beginning of the program. Other Moonpool pools can be created for background tasks, etc. to do the heavy lifting, and the main thread (inside this immediate runner) can coordinate tasks via Fiber.await.

Aside from the fact that this blocks the caller thread, it is fairly similar to Background_thread in that there's a single worker to process tasks/fibers.

This handles effects, including the ones in Fiber.

  • since NEXT_RELEASE
val main : (Moonpool.Runner.t -> 'a) -> 'a

main f runs f() in a scope that handles effects, including Fiber.await.

This scope can run background tasks as well, in a cooperative fashion.

diff --git a/moonpool/Moonpool_fib/index.html b/moonpool/Moonpool_fib/index.html index b1e9b4c2..902f8a4b 100644 --- a/moonpool/Moonpool_fib/index.html +++ b/moonpool/Moonpool_fib/index.html @@ -1,2 +1,2 @@ -Moonpool_fib (moonpool.Moonpool_fib)

Module Moonpool_fib

module Fiber : sig ... end

Fibers.

module Fls : sig ... end

Fiber-local storage.

module Handle : sig ... end

The unique name of a fiber

+Moonpool_fib (moonpool.Moonpool_fib)

Module Moonpool_fib

Fiber for moonpool

module Fiber : sig ... end

Fibers.

module Fls : sig ... end

Fiber-local storage.

module Handle : sig ... end

The unique name of a fiber.

module Main : sig ... end

Main thread.

include module type of struct include Fiber end
type cancel_callback = Moonpool.Exn_bt.t -> unit

A callback used in case of cancellation

type 'a t = 'a Fiber.{Private_}1.t

A fiber returning a value of type 'a.

val res : 'a t -> 'a Moonpool.Fut.t

Future result of the fiber.

type 'a callback = 'a Moonpool.Exn_bt.result -> unit

Callbacks that are called when a fiber is done.

type any = Fiber.{Private_}1.any =
  1. | Any : _ t -> any

Type erased fiber

val return : 'a -> 'a t
val fail : Moonpool.Exn_bt.t -> _ t
val self : unit -> any

self () is the current fiber. Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val peek : 'a t -> 'a Moonpool.Fut.or_error option

Peek inside the future result

val is_done : _ t -> bool

Has the fiber completed?

val is_cancelled : _ t -> bool

Has the fiber completed with a failure?

val is_success : _ t -> bool

Has the fiber completed with a value?

val await : 'a t -> 'a

await fib is like Fut.await (res fib)

val wait_block_exn : 'a t -> 'a

wait_block_exn fib is Fut.wait_block_exn (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val wait_block : 'a t -> 'a Moonpool.Fut.or_error

wait_block fib is Fut.wait_block (res fib). NOTE: See Fut.wait_block for warnings about deadlocks.

val check_if_cancelled : unit -> unit

Check if the current fiber is cancelled, in which case this raises. Must be run from inside a fiber.

  • raises Failure

    if not.

val yield : unit -> unit

Yield control to the scheduler from the current fiber.

  • raises Failure

    if not run from inside a fiber.

type cancel_handle = Fiber.cancel_handle

An opaque handle for a single cancel callback in a fiber

val add_on_cancel : _ t -> cancel_callback -> cancel_handle

add_on_cancel fib cb adds cb to the list of cancel callbacks for fib. If fib is already cancelled, cb is called immediately.

val remove_on_cancel : _ t -> cancel_handle -> unit

remove_on_cancel fib h removes the cancel callback associated with handle h.

val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a

with_cancel_callback fib cb (fun () -> <e>) evaluates e in a scope in which, if the fiber fib is cancelled, cb() is called. If e returns without the fiber being cancelled, this callback is removed.

val with_self_cancel_callback : cancel_callback -> (unit -> 'a) -> 'a

with_self_cancel_callback cb f calls f() in a scope where cb is added to the cancel callbacks of the current fiber

val on_result : 'a t -> 'a callback -> unit

Wait for fiber to be done and call the callback with the result. If the fiber is done already then the callback is invoked immediately with its result.

val spawn_top : on:Moonpool.Runner.t -> (unit -> 'a) -> 'a t

spawn_top ~on f spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of f().

val spawn : ?protect:bool -> (unit -> 'a) -> 'a t

spawn ~protect f spawns a sub-fiber f_child from a running fiber parent. The sub-fiber f_child is attached to the current fiber and fails if the current fiber parent fails.

  • parameter protect

    if true, when f_child fails, it does not affect parent. If false, f_child failing also causes parent to fail (and therefore all other children of parent). Default is true.

    Must be run from inside a fiber.

  • raises Failure

    if not run from inside a fiber.

val spawn_ignore : ?protect:bool -> (unit -> _) -> unit

spawn_ignore f is ignore (spawn f). The fiber will still affect termination of the parent, ie. the parent will exit only after this new fiber exits.

val main : (Moonpool.Runner.t -> 'a) -> 'a
diff --git a/moonpool/Moonpool_fib__/index.html b/moonpool/Moonpool_fib__/index.html new file mode 100644 index 00000000..3101f692 --- /dev/null +++ b/moonpool/Moonpool_fib__/index.html @@ -0,0 +1,2 @@ + +Moonpool_fib__ (moonpool.Moonpool_fib__)

Module Moonpool_fib__

This module is hidden.

diff --git a/moonpool/Moonpool_fib__Main/index.html b/moonpool/Moonpool_fib__Main/index.html new file mode 100644 index 00000000..4602b8de --- /dev/null +++ b/moonpool/Moonpool_fib__Main/index.html @@ -0,0 +1,2 @@ + +Moonpool_fib__Main (moonpool.Moonpool_fib__Main)

Module Moonpool_fib__Main

This module is hidden.

diff --git a/moonpool/_doc-dir/README.md b/moonpool/_doc-dir/README.md index c51361df..5a37e6a0 100644 --- a/moonpool/_doc-dir/README.md +++ b/moonpool/_doc-dir/README.md @@ -165,6 +165,57 @@ val expected_sum : int = 5050 - : unit = () ``` +### Errors + +We have a `Exn_bt.t` type that comes in handy in many places. It bundles together +an exception and the backtrace associated with the place the exception was caught. + +### Fibers + +On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`) +which provides _lightweight fibers_ +that can run on any Moonpool runner. +These fibers are a sort of lightweight thread, dispatched on the runner's +background thread(s). +Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber +is done. + +```ocaml +# #require "moonpool.fib";; + +# (* convenient alias *) + module F = Moonpool_fib;; +module F = Moonpool_fib +# F.main (fun _runner -> + let f1 = F.spawn (fun () -> fib 10) in + let f2 = F.spawn (fun () -> fib 15) in + F.await f1 + F.await f2);; +- : int = 1076 +``` + +Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is +the sub-fiber's _parent_. +When a parent fails, all its children are cancelled (forced to fail). +This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency). + +Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible +to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result +as a regular future, too. +However, this resolution is only done after all the children of the fiber have +resolved — the lifetime of fibers forms a well-nested tree in that sense. + +When a fiber is suspended because it `await`s another fiber (or future), the scheduler's +thread on which it was running becomes available again and can go on process another task. +When the fiber resumes, it will automatically be re-scheduled on the same runner it started on. +This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1. + +In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber). +This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only +put persistent data in storage to avoid confusing aliasing). +The storage is convenient for carrying around context for cross-cutting concerns such +as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing +scope). + ### Fork-join On OCaml 5, again using effect handlers, the sublibrary `moonpool.forkjoin` diff --git a/moonpool/index.html b/moonpool/index.html index e594054d..b4548584 100644 --- a/moonpool/index.html +++ b/moonpool/index.html @@ -1,2 +1,2 @@ -index (moonpool.index)

Package moonpool

Package info

changes-files
readme-files
+index (moonpool.index)

Package moonpool

Package info

changes-files
readme-files