Backoff
Randomized exponential backoff mechanism.
val create : ?lower_wait_log:int -> ?upper_wait_log:int -> unit -> t
create creates a backoff value. upper_wait_log and lower_wait_log override the logarithmic upper and lower bounds on the number of spins executed by once.
val default : t
default is equivalent to create ().
once b executes one random wait and returns a new backoff with logarithm of the current maximum value incremented unless it is already at upper_wait_log of b.
Note that this uses the default Stdlib Random per-domain generator.
reset b returns a backoff equivalent to b except with current value set to the lower_wait_log of b.
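The three operations above compose naturally into a compare-and-set retry loop. The following is a minimal sketch (not taken from this library's documentation), assuming the backoff package is available; push and transfer_push are illustrative helpers:

```ocaml
(* Retry a compare-and-set under contention, backing off a little more
   after each failure. *)
let rec transfer_push stack value backoff =
  let before = Atomic.get stack in
  if not (Atomic.compare_and_set stack before (value :: before)) then
    (* The CAS failed: execute one randomized wait and retry with the
       incremented backoff. *)
    transfer_push stack value (Backoff.once backoff)

let push stack value = transfer_push stack value Backoff.default
```

On success the backoff value is simply discarded; a fresh loop starts again from Backoff.default (or from reset).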
Moonpool_sync.Event
include module type of struct include Picos_std_event.Event end
type !'a t = 'a Picos_std_event.Event.t
An event returning a value of type 'a.
val always : 'a -> 'a t
always value returns an event that can always be committed to resulting in the given value.
choose events returns an event that offers all of the given events and then commits to at most one of them.
wrap event fn returns an event that acts as the given event and then applies the given function to the value in case the event is committed to.
map fn event is equivalent to wrap event fn.
guard thunk returns an event that, when synchronized, calls the thunk, and then behaves like the resulting event.
⚠️ Raising an exception from a guard thunk will result in raising that exception out of the sync. This may result in dropping the result of an event that committed just after the exception was raised. This means that you should treat an unexpected exception raised from sync as a fatal error.
val sync : 'a t -> 'a
sync event synchronizes on the given event.
Synchronizing on an event executes in three phases:
⚠️ sync event does not wait for the canceled concurrent requests to terminate. This means that you should arrange for guaranteed cleanup through other means such as the use of structured concurrency.
val select : 'a t list -> 'a
select events is equivalent to sync (choose events).
ℹ️ The Computation concept of Picos can be seen as a basic single-shot atomic event. This module builds on that concept to provide a composable API to concurrent services exposed through computations.
type 'a request = 'a Picos_std_event.Event.request = {
  request : 'r. (unit -> 'r) Picos.Computation.t -> ('a -> 'r) -> unit;
}
Represents a function that requests a concurrent service to update a computation.
ℹ️ The computation passed to a request may be completed by some other event at any point. All primitive requests should be implemented carefully to take that into account. If the computation is completed by some other event, then the request should be considered as canceled, take no effect, and not leak any resources.
⚠️ Raising an exception from a request function will result in raising that exception out of the sync. This may result in dropping the result of an event that committed just after the exception was raised. This means that you should treat an unexpected exception raised from sync as a fatal error. In addition, you should arrange for concurrent services to report unexpected errors independently of the computation being passed to the service.
from_request { request } creates an event from the request function.
val from_computation : 'a Picos.Computation.t -> 'a t
from_computation source creates an event that can be committed to once the given source computation has completed.
ℹ️ Committing to some other event does not cancel the source computation.
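As a rough sketch of how these combinators compose, the following builds a derived event with map and commits with select. It assumes the Picos_std_event library and a context where sync can run (typically under a Picos-compatible scheduler); demo is a hypothetical helper, not part of the API:

```ocaml
let demo () =
  let open Picos_std_event in
  (* [always] never blocks, so [select] can commit immediately. *)
  let int_event = Event.always 42 in
  let string_event = Event.map string_of_int int_event in
  (* Offer two events; commit to exactly one of them. *)
  Event.select [ string_event; Event.always "fallback" ]
```

Which of the two offered events is committed to is not specified, only that it is at most one of them.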
val of_fut : 'a Moonpool.Fut.t -> 'a t
module Infix : sig ... end

Moonpool_sync.Lock
Mutex-protected resource.
This lock is a synchronous concurrency primitive, as a thin wrapper around Mutex that encourages proper management of the critical section in RAII style:
let (let@) = (@@)

…
let compute_foo =
  (* enter critical section *)
  let@ x = Lock.with_ protected_resource in
  use_x;
  return_foo ()
  (* exit critical section *)
in
…

This lock is based on Picos_sync.Mutex so it is await-safe.
val create : 'a -> 'a t
Create a new protected value.
val with_ : 'a t -> ('a -> 'b) -> 'b
with_ l f runs f x, where x is the value protected by the lock l, in a critical section. If f x raises, with_ l f raises too, but the lock is released.
val update : 'a t -> ('a -> 'a) -> unit
update l f replaces the content x of l with f x, while protected by the mutex.
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
update_map l f computes x', y = f (get l), then puts x' in l and returns y, while protected by the mutex.
val mutex : _ t -> Picos_std_sync.Mutex.t
Underlying mutex.
val get : 'a t -> 'a
Atomically get the value in the lock. The value that is returned isn't protected!
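A small sketch of how these operations might be combined, assuming the Moonpool_sync library is available; incr_and_snapshot is an illustrative helper, not part of the API:

```ocaml
(* A counter that is only ever touched inside a critical section. *)
let counter = Moonpool_sync.Lock.create 0

let incr_and_snapshot () =
  (* Replace the content with its successor, under the lock. *)
  Moonpool_sync.Lock.update counter (fun n -> n + 1);
  (* [update_map] keeps the content and returns a derived value. *)
  Moonpool_sync.Lock.update_map counter (fun n -> n, string_of_int n)
```

Each call holds the lock only for the duration of the passed function, so the two steps are individually, not jointly, atomic.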
Moonpool_sync
module Mutex = Picos_std_sync.Mutex
module Condition = Picos_std_sync.Condition
module Lock : sig ... end
Mutex-protected resource.
module Event : sig ... end
module Semaphore = Picos_std_sync.Semaphore
module Lazy = Picos_std_sync.Lazy
module Latch = Picos_std_sync.Latch
module Ivar = Picos_std_sync.Ivar
module Stream = Picos_std_sync.Stream
Multicore_magic.Atomic_array
Array of (potentially unboxed) atomic locations.
Where available, this uses an undocumented operation exported by the OCaml 5 runtime, caml_atomic_cas_field, which makes it possible to perform sequentially consistent atomic updates of record fields and array elements.
Hopefully a future version of OCaml provides more comprehensive and even more efficient support for both sequentially consistent and relaxed atomic operations on records and arrays.
val make : int -> 'a -> 'a t
make n value creates a new array of n atomic locations having the given value.
val of_array : 'a array -> 'a t
of_array non_atomic_array creates a new array of atomic locations as a copy of the given non_atomic_array.
val init : int -> (int -> 'a) -> 'a t
init n fn is equivalent to of_array (Array.init n fn).
val length : 'a t -> int
length atomic_array returns the length of the atomic_array.
val unsafe_fenceless_get : 'a t -> int -> 'a
unsafe_fenceless_get atomic_array index reads and returns the value at the specified index of the atomic_array.
⚠️ The read is relaxed and may be reordered with respect to other reads and writes in program order.
⚠️ No bounds checking is performed.
val unsafe_fenceless_set : 'a t -> int -> 'a -> unit
unsafe_fenceless_set atomic_array index value writes the given value to the specified index of the atomic_array.
⚠️ The write is relaxed and may be reordered with respect to other reads and (non-initializing) writes in program order.
⚠️ No bounds checking is performed.
val unsafe_compare_and_set : 'a t -> int -> 'a -> 'a -> bool
unsafe_compare_and_set atomic_array index before after atomically updates the specified index of the atomic_array to the after value in case it had the before value and returns a boolean indicating whether that was the case. This operation is sequentially consistent and may not be reordered with respect to other reads and writes in program order.
⚠️ No bounds checking is performed.
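A sketch of the classic read/CAS/retry pattern over an atomic array, assuming the multicore-magic library; incr_slot is an illustrative helper:

```ocaml
(* Lock-free increment of one slot of an atomic int array. The unsafe
   operations skip bounds checks, so [i] must be within bounds. *)
let incr_slot (arr : int Multicore_magic.Atomic_array.t) i =
  let rec retry () =
    let before = Multicore_magic.Atomic_array.unsafe_fenceless_get arr i in
    if not (Multicore_magic.Atomic_array.unsafe_compare_and_set
              arr i before (before + 1))
    then retry ()
  in
  retry ()
```

The relaxed read is acceptable here because the compare-and-set validates the value: a stale read simply causes one extra retry.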
Multicore_magic.Transparent_atomic
A replacement for Stdlib.Atomic with fixes and performance improvements.
Stdlib.Atomic.get is incorrectly subject to CSE optimization in OCaml 5.0.0 and 5.1.0. This can result in code being generated that can produce results that cannot be explained with the OCaml memory model. It can also sometimes result in code being generated where a manual optimization to avoid writing to memory is defeated by the compiler as the compiler eliminates a (repeated) read access. This module implements get such that argument to Stdlib.Atomic.get is passed through Sys.opaque_identity, which prevents the compiler from applying the CSE optimization.
OCaml 5 generates inefficient accesses of 'a Stdlib.Atomic.t arrays assuming that the array might be an array of floating point numbers. That is because the Stdlib.Atomic.t type constructor is opaque, which means that the compiler cannot assume that _ Stdlib.Atomic.t is not the same as float. This module defines the type as private 'a ref, which allows the compiler to know that it cannot be the same as float, which allows the compiler to generate more efficient array accesses. This can both improve performance and reduce size of generated code when using arrays of atomics.
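Because the module mirrors the Stdlib Atomic signature, a local shadowing is usually all that is needed to adopt it. A minimal sketch, assuming the multicore-magic library; flag and set_once are illustrative:

```ocaml
(* Shadow [Atomic] with the transparent replacement; the API is the
   same, with extra fenceless operations. *)
module Atomic = Multicore_magic.Transparent_atomic

let flag = Atomic.make false

let set_once () =
  (* [fenceless_get] avoids the acquire fence; the CAS, when taken,
     still synchronizes. *)
  if not (Atomic.fenceless_get flag) then
    Atomic.compare_and_set flag false true |> ignore
```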
type !'a t = private 'a ref
val make : 'a -> 'a t
val make_contended : 'a -> 'a t
val get : 'a t -> 'a
val fenceless_get : 'a t -> 'a
val set : 'a t -> 'a -> unit
val fenceless_set : 'a t -> 'a -> unit
val exchange : 'a t -> 'a -> 'a
val compare_and_set : 'a t -> 'a -> 'a -> bool
val fetch_and_add : int t -> int -> int
val incr : int t -> unit
val decr : int t -> unit

Multicore_magic
This is a library of magic multicore utilities intended for experts for extracting the best possible performance from multicore OCaml.
Hopefully future releases of multicore OCaml will make this library obsolete!
Depending on the object, either creates a shallow clone of it or returns it as is. When cloned, the clone will have extra padding words added after the last used word.
This is designed to help avoid false sharing. False sharing has a negative impact on multicore performance. Accesses of both atomic and non-atomic locations, whether read-only or read-write, may suffer from false sharing.
The intended use case for this is to pad all long lived objects that are being accessed highly frequently (read or written).
Many kinds of objects can be padded, for example:
let padded_atomic = Multicore_magic.copy_as_padded (Atomic.make 101)

let padded_ref = Multicore_magic.copy_as_padded (ref 42)

let padded_record =
  Multicore_magic.copy_as_padded { number = 76; pointer = [ 1; 2; 3 ] }

let padded_variant = Multicore_magic.copy_as_padded (Some 1)

Padding changes the length of an array. If you need to pad an array, use make_padded_array.
copy_as x by default simply returns x. When ~padded:true is explicitly specified, returns copy_as_padded x.
Creates a padded array. The length of the returned array includes padding. Use length_of_padded_array to get the unpadded length.
Returns the length of an array created by make_padded_array without the padding.
WARNING: This is not guaranteed to work with copy_as_padded.
Returns the length of an array created by make_padded_array without the padding minus 1.
WARNING: This is not guaranteed to work with copy_as_padded.
Atomic operations
val fenceless_get : 'a Stdlib.Atomic.t -> 'a
Get a value from the atomic without performing an acquire fence.
Consider the following prototypical example of a lock-free algorithm:
let rec prototypical_lock_free_algorithm () =
  let expected = Atomic.get atomic in
  let desired = (* computed from expected *) in
  if not (Atomic.compare_and_set atomic expected desired) then
    (* failure, maybe retry *)
  else
    (* success *)

A potential performance problem with the above example is that it performs two acquire fences. Both the Atomic.get and the Atomic.compare_and_set perform an acquire fence. This may have a negative impact on performance.
Assuming the first fence is not necessary, we can rewrite the example using fenceless_get as follows:
let rec prototypical_lock_free_algorithm () =
  let expected = Multicore_magic.fenceless_get atomic in
  let desired = (* computed from expected *) in
  if not (Atomic.compare_and_set atomic expected desired) then
    (* failure, maybe retry *)
  else
    (* success *)

Now only a single acquire fence is performed by Atomic.compare_and_set and performance may be improved.
val fenceless_set : 'a Stdlib.Atomic.t -> 'a -> unit
Set the value of an atomic without performing a full fence.
Consider the following example:
let new_atomic = Atomic.make dummy_value in
(* prepare data_structure referring to new_atomic *)
Atomic.set new_atomic data_structure;
(* publish the data_structure: *)
Atomic.exchange old_atomic data_structure

A potential performance problem with the above example is that it performs two full fences. Both the Atomic.set used to initialize the data structure and the Atomic.exchange used to publish the data structure perform a full fence. The same would also apply in cases where Atomic.compare_and_set or Atomic.set would be used to publish the data structure. This may have a negative impact on performance.
Using fenceless_set we can rewrite the example as follows:
let new_atomic = Atomic.make dummy_value in
(* prepare data_structure referring to new_atomic *)
Multicore_magic.fenceless_set new_atomic data_structure;
(* publish the data_structure: *)
Atomic.exchange old_atomic data_structure

Now only a single full fence is performed by Atomic.exchange and performance may be improved.
val fence : int Stdlib.Atomic.t -> unit
Perform a full acquire-release fence on the given atomic.
fence atomic is equivalent to ignore (Atomic.fetch_and_add atomic 0).
module Transparent_atomic : sig ... end
A replacement for Stdlib.Atomic with fixes and performance improvements.
module Atomic_array : sig ... end
Array of (potentially unboxed) atomic locations.
instantaneous_domain_index () potentially (re)allocates and returns a non-negative integer "index" for the current domain. The indices are guaranteed to be unique among the domains that exist at a point in time. Each call of instantaneous_domain_index () may return a different index.
The intention is that the returned value can be used as an index into a contention avoiding parallelism safe data structure. For example, a naïve scalable increment of one counter from an array of counters could be done as follows:
let incr counters =
  (* Assuming length of [counters] is a power of two and larger than
     the number of domains. *)
  let mask = Array.length counters - 1 in
  let index = instantaneous_domain_index () in
  Atomic.incr counters.(index land mask)

The implementation ensures that the indices are allocated as densely as possible at any given moment. This should allow allocating as many counters as needed and essentially eliminate contention.
On OCaml 4 instantaneous_domain_index () will always return 0.
Awaitable.Awaiter
Low level interface for more flexible waiting.
type 'a awaitable := 'a t
An erased type alias for Awaitable.t.
Represents a single use awaiter of a signal to an awaitable.
val add : 'a awaitable -> Picos.Trigger.t -> t
add awaitable trigger creates a single use awaiter, adds it to the FIFO associated with the awaitable, and returns the awaiter.
val remove : t -> unit
remove awaiter marks the awaiter as having been signaled and removes it from the FIFO associated with the awaitable.
ℹ️ If the associated trigger is used with only one awaiter and Trigger.await on the trigger returns None, there is no need to explicitly remove the awaiter, because it has already been removed.
Picos_std_awaitable.Awaitable
An awaitable atomic location.
This module provides a superset of the Stdlib Atomic API with more or less identical performance. The main difference is that a non-padded awaitable location takes an extra word of memory. Additionally a futex-like API provides the ability to await until an awaitable location is explicitly signaled to potentially have a different value.
Awaitable locations can be used to implement many kinds of synchronization and communication primitives.
val make : ?padded:bool -> 'a -> 'a t
make initial creates a new awaitable atomic location with the given initial value.
val make_contended : 'a -> 'a t
make_contended initial is equivalent to make ~padded:true initial.
val get : 'a t -> 'a
get awaitable is essentially equivalent to Atomic.get awaitable.
val compare_and_set : 'a t -> 'a -> 'a -> bool
compare_and_set awaitable before after is essentially equivalent to Atomic.compare_and_set awaitable before after.
val exchange : 'a t -> 'a -> 'a
exchange awaitable after is essentially equivalent to Atomic.exchange awaitable after.
val set : 'a t -> 'a -> unit
set awaitable value is equivalent to exchange awaitable value |> ignore.
val fetch_and_add : int t -> int -> int
fetch_and_add awaitable delta is essentially equivalent to Atomic.fetch_and_add awaitable delta.
val incr : int t -> unit
incr awaitable is equivalent to fetch_and_add awaitable (+1) |> ignore.
val decr : int t -> unit
decr awaitable is equivalent to fetch_and_add awaitable (-1) |> ignore.
val signal : 'a t -> unit
signal awaitable tries to wake up one fiber awaiting on the awaitable location.
🐌 Generally speaking one should avoid calling signal too frequently, because the queue of awaiters is stored separately from the awaitable location and it takes a bit of effort to locate it. For example, calling signal every time a value is added to an empty data structure might not be optimal. In many cases it is faster to explicitly mark the potential presence of awaiters in the data structure and avoid calling signal when it is definitely known that there are no awaiters.
val broadcast : 'a t -> unit
broadcast awaitable tries to wake up all fibers awaiting on the awaitable location.
🐌 The same advice as with signal applies to broadcast. In addition, it is typically a good idea to avoid potentially waking up large numbers of fibers as it can easily lead to the thundering herd phenomenon.
val await : 'a t -> 'a -> unit
await awaitable before suspends the current fiber until the awaitable is explicitly signaled and has a value other than before.
⚠️ This operation is subject to the ABA problem. An await for value other than A may not return after the awaitable is signaled while having the value B, because at a later point the awaitable has again the value A. Furthermore, by the time an await for value other than A returns, the awaitable might already again have the value A.
⚠️ Atomic operations that change the value of an awaitable do not implicitly wake up awaiters.
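A small sketch of a one-shot gate built from an awaitable location, assuming Picos_std_awaitable and a context where await can suspend; Gate is an illustrative module, not part of the library:

```ocaml
module Gate = struct
  type t = bool Picos_std_awaitable.Awaitable.t

  let create () = Picos_std_awaitable.Awaitable.make false

  let open_ t =
    Picos_std_awaitable.Awaitable.set t true;
    (* Atomic writes do not implicitly wake awaiters; we must do so. *)
    Picos_std_awaitable.Awaitable.broadcast t

  let wait t =
    (* Loop to tolerate spurious wake-ups and the ABA caveat above. *)
    while not (Picos_std_awaitable.Awaitable.get t) do
      Picos_std_awaitable.Awaitable.await t false
    done
end
```

Because the gate only ever transitions from false to true, the ABA problem cannot actually occur here; the re-check loop is still good practice.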
module Awaiter : sig ... end
Low level interface for more flexible waiting.

Picos_std_awaitable
module Awaitable : sig ... end
An awaitable atomic location.
We first open the library to bring the Awaitable module into scope:
# open Picos_std_awaitable

Mutex
Here is a basic mutex implementation using awaitables:
module Mutex = struct
  type t = int Awaitable.t

  let create ?padded () = Awaitable.make ?padded 0

  let lock t =
    if not (Awaitable.compare_and_set t 0 1) then
      while Awaitable.exchange t 2 <> 0 do
        Awaitable.await t 2
      done

  let unlock t =
    let before = Awaitable.fetch_and_add t (-1) in
    if before = 2 then begin
      Awaitable.set t 0;
      Awaitable.signal t
    end
end

The above mutex outperforms most other mutexes under both no/low and high contention scenarios. In no/low contention scenarios the use of fetch_and_add provides low overhead. In high contention scenarios the above mutex allows unfairness, which avoids performance degradation due to the lock convoy phenomenon.
Condition
Let's also implement a condition variable. For that we'll also make use of low level abstractions and operations from the Picos core library:
# open Picos

To implement a condition variable, we'll use the Awaiter API:
module Condition = struct
  type t = unit Awaitable.t

  let create () = Awaitable.make ()

  let wait t mutex =
    let trigger = Trigger.create () in
    let awaiter = Awaitable.Awaiter.add t trigger in
    Mutex.unlock mutex;
    let lock_forbidden mutex =
      let fiber = Fiber.current () in
      let forbid = Fiber.exchange fiber ~forbid:true in
      Mutex.lock mutex;
      Fiber.set fiber ~forbid
    in
    match Trigger.await trigger with
    | None -> lock_forbidden mutex
    | Some exn_bt ->
      Awaitable.Awaiter.remove awaiter;
      lock_forbidden mutex;
      Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)

  let signal = Awaitable.signal
  let broadcast = Awaitable.broadcast
end

Notice that the awaitable location used in the above condition variable implementation is never mutated. We just reuse the signaling mechanism of awaitables.
Picos_std_event.Event
First-class synchronous communication abstraction.
Events describe a thing that might happen in the future, or a concurrent offer or request that might be accepted or succeed, but is cancelable if some other event happens first.
See the Picos_io_select library for an example.
ℹ️ This module intentionally mimics the Event module provided by the OCaml POSIX threads library.
val always : 'a -> 'a t
always value returns an event that can always be committed to resulting in the given value.
choose events returns an event that offers all of the given events and then commits to at most one of them.
wrap event fn returns an event that acts as the given event and then applies the given function to the value in case the event is committed to.
map fn event is equivalent to wrap event fn.
guard thunk returns an event that, when synchronized, calls the thunk, and then behaves like the resulting event.
⚠️ Raising an exception from a guard thunk will result in raising that exception out of the sync. This may result in dropping the result of an event that committed just after the exception was raised. This means that you should treat an unexpected exception raised from sync as a fatal error.
val sync : 'a t -> 'a
sync event synchronizes on the given event.
Synchronizing on an event executes in three phases:
⚠️ sync event does not wait for the canceled concurrent requests to terminate. This means that you should arrange for guaranteed cleanup through other means such as the use of structured concurrency.
val select : 'a t list -> 'a
select events is equivalent to sync (choose events).
ℹ️ The Computation concept of Picos can be seen as a basic single-shot atomic event. This module builds on that concept to provide a composable API to concurrent services exposed through computations.
Represents a function that requests a concurrent service to update a computation.
ℹ️ The computation passed to a request may be completed by some other event at any point. All primitive requests should be implemented carefully to take that into account. If the computation is completed by some other event, then the request should be considered as canceled, take no effect, and not leak any resources.
⚠️ Raising an exception from a request function will result in raising that exception out of the sync. This may result in dropping the result of an event that committed just after the exception was raised. This means that you should treat an unexpected exception raised from sync as a fatal error. In addition, you should arrange for concurrent services to report unexpected errors independently of the computation being passed to the service.
from_request { request } creates an event from the request function.
val from_computation : 'a Picos.Computation.t -> 'a t
from_computation source creates an event that can be committed to once the given source computation has completed.
ℹ️ Committing to some other event does not cancel the source computation.
Picos_std_event
Basic event abstraction for Picos.
module Event : sig ... end
First-class synchronous communication abstraction.
Picos_std_finally
Syntax for avoiding resource leaks for Picos.
A resource is something that is acquired and must be released after it is no longer needed.
⚠️ Beware that the Stdlib Fun.protect ~finally helper does not protect against cancelation propagation when it calls finally (). This means that cancelable operations performed by finally may be terminated and resources might be leaked. So, if you want to avoid resource leaks, you should either use lastly or explicitly protect against cancelation propagation.
We open both this library and a few other libraries
open Picos_io
open Picos_std_finally
open Picos_std_structured
open Picos_std_sync

for the examples.
let@ resource = template in scope is equivalent to template (fun resource -> scope).
ℹ️ You can use this binding operator with any template function that has a type of the form ('r -> 'a) -> 'a.
finally release acquire scope calls acquire () to obtain a resource, calls scope resource, and then calls release resource after the scope exits.
ℹ️ Cancelation propagation will be forbidden during the call of release.
lastly action scope is equivalent to finally action Fun.id scope.
ℹ️ Cancelation propagation will be forbidden during the call of action.
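A minimal sketch of finally in this RAII style, assuming Picos_std_finally is open; with_log_line is an illustrative helper:

```ocaml
open Picos_std_finally

(* Append one line to a file. The channel is released when the scope
   exits, whether normally or via an exception, and cancelation
   propagation is forbidden while [close_out] runs. *)
let with_log_line path line =
  let@ oc = finally close_out (fun () -> open_out path) in
  output_string oc line
```

Note the argument order: the release function comes first, so the acquire thunk sits next to the scope it feeds.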
Either contains a resource or is empty as the resource has been transferred, dropped, or has been borrowed temporarily.
val instantiate : ('r -> unit) -> (unit -> 'r) -> ('r instance -> 'a) -> 'a
instantiate release acquire scope calls acquire () to obtain a resource, stores it as an instance, and then calls scope instance. Then, if scope returns normally, awaits until the instance becomes empty. In case scope raises an exception or the fiber is canceled, the instance will be dropped.
ℹ️ Cancelation propagation will be forbidden during the call of release.
val drop : 'r instance -> unit
drop instance releases the resource, if any, contained by the instance.
val borrow : 'r instance -> ('r -> 'a) -> 'a
borrow instance scope borrows the resource stored in the instance, calls scope resource, and then returns the resource to the instance after scope exits.
transfer source transfers the resource stored in the source instance into a new target instance, calls scope target. Then, if scope returns normally, awaits until the target instance becomes empty. In case scope raises an exception or the fiber is canceled, the target instance will be dropped.
val move : 'r instance -> ('r -> 'a) -> 'a
move instance scope is equivalent to transfer instance (fun instance -> borrow instance scope).
Here is a sketch of a server that recursively forks a fiber to accept and handle a client:
let recursive_server server_fd =
  Flock.join_after @@ fun () ->

  (* recursive server *)
  let rec accept () =
    let@ client_fd =
      finally Unix.close @@ fun () ->
      Unix.accept ~cloexec:true server_fd
      |> fst
    in

    (* fork to accept other clients *)
    Flock.fork accept;

    (* handle this client... omitted *)
    ()
  in
  Flock.fork accept

There is also a way to move instantiated resources to allow forking fibers to handle clients without leaks.
Here is a sketch of a server that accepts in a loop and forks fibers to handle clients:
let looping_server server_fd =
  Flock.join_after @@ fun () ->

  (* loop to accept clients *)
  while true do
    let@ client_fd =
      instantiate Unix.close @@ fun () ->
      Unix.accept ~cloexec:true server_fd
      |> fst
    in

    (* fork to handle this client *)
    Flock.fork @@ fun () ->
    let@ client_fd = move client_fd in

    (* handle client... omitted *)
    ()
  done

You can move an instantiated resource between any two fibers and borrow it before moving it. For example, you can create a resource in a child fiber, use it there, and then move it to the parent fiber:
let move_from_child_to_parent () =
  Flock.join_after @@ fun () ->

  (* for communicating a resource *)
  let shared_ivar = Ivar.create () in

  (* fork a child that creates a resource *)
  Flock.fork begin fun () ->
    let pretend_release () = ()
    and pretend_acquire () = () in

    (* allocate a resource *)
    let@ instance =
      instantiate pretend_release pretend_acquire
    in

    begin
      (* borrow the resource *)
      let@ resource = borrow instance in

      (* use the resource... omitted *)
      ()
    end;

    (* send the resource to the parent *)
    Ivar.fill shared_ivar instance
  end;

  (* await for a resource from the child and own it *)
  let@ resource = Ivar.read shared_ivar |> move in

  (* use the resource... omitted *)
  ()

The above uses an Ivar to communicate the movable resource from the child fiber to the parent fiber. Any concurrency safe mechanism could be used.
Picos_std_structured.Bundle
An explicit dynamic bundle of fibers guaranteed to be joined at the end.
Bundles allow you to conveniently structure or delimit concurrency into nested scopes. After a bundle returns or raises an exception, no fibers forked to the bundle remain.
An unhandled exception, or error, within any fiber of the bundle causes all of the fibers forked to the bundle to be canceled and the bundle to raise the exception or exceptions raised by the fibers forked into the bundle.
val join_after :
  ?callstack:int ->
  ?on_return:[ `Terminate | `Wait ] ->
  (t -> 'a) ->
  'a
join_after scope calls scope with a bundle. A call of join_after returns or raises only after scope has returned or raised and all forked fibers have terminated. If scope raises an exception, error will be called.
The optional on_return argument specifies what to do when the scope returns normally. It defaults to `Wait, which means to just wait for all the fibers to terminate on their own. When explicitly specified as ~on_return:`Terminate, then terminate ?callstack will be called on return. This can be convenient, for example, when dealing with daemon fibers.
val terminate : ?callstack:int -> t -> unit
terminate bundle cancels all of the forked fibers using the Terminate exception. After terminate has been called, no new fibers can be forked to the bundle.
The optional callstack argument specifies the number of callstack entries to capture with the Terminate exception. The default is 0.
ℹ️ Calling terminate at the end of a bundle can be a convenient way to cancel any background fibers started by the bundle.
ℹ️ Calling terminate does not raise the Terminate exception, but blocking operations after terminate will raise the exception to propagate cancelation unless propagation of cancelation is forbidden.
val terminate_after : ?callstack:int -> t -> seconds:float -> unit
terminate_after ~seconds bundle arranges to terminate the bundle after the specified timeout in seconds.
val error : ?callstack:int -> t -> exn -> Stdlib.Printexc.raw_backtrace -> unit
val fork : t -> (unit -> unit) -> unit
fork bundle action is equivalent to fork_as_promise bundle action |> ignore.
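A sketch of how join_after, fork, and ~on_return:`Terminate might fit together, assuming Picos_std_structured is open and a Picos scheduler is running; with_ticker is an illustrative helper:

```ocaml
open Picos_std_structured

(* Run [main] while a background ticker fiber does periodic work.
   ~on_return:`Terminate cancels the ticker when [main] returns,
   instead of waiting for it to finish on its own. *)
let with_ticker main =
  Bundle.join_after ~on_return:`Terminate @@ fun bundle ->
  Bundle.fork bundle (fun () ->
    while true do
      Control.sleep ~seconds:1.0
      (* ...periodic housekeeping omitted... *)
    done);
  main ()
```

Without ~on_return:`Terminate the default `Wait would deadlock here, as the ticker never terminates on its own.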
Picos_std_structured.Control
Basic control operations and exceptions for structured concurrency.
An exception that is used to signal fibers, typically by canceling them, that they should terminate by letting the exception propagate.
ℹ️ Within this library, the Terminate exception does not, by itself, indicate an error. Raising it inside a fiber forked within the structured concurrency constructs of this library simply causes the relevant part of the tree of fibers to be terminated.
⚠️ If Terminate is raised in the main fiber of a Bundle, and no other exceptions are raised within any fiber inside the bundle, the bundle will then, of course, raise the Terminate exception after all of the fibers have been terminated.
exception Errors of (exn * Stdlib.Printexc.raw_backtrace) listAn exception that can be used to collect exceptions, typically indicating errors, from multiple fibers.
ℹ️ The Terminate exception is not considered an error within this library and the structuring constructs do not include it in the list of Errors.
raise_if_canceled () checks whether the current fiber has been canceled and if so raises the exception that the fiber was canceled with.
ℹ️ Within this library fibers are canceled using the Terminate exception.
sleep ~seconds suspends the current fiber for the specified number of seconds.
protect thunk forbids propagation of cancelation for the duration of thunk ().
ℹ️ Many operations are cancelable. In particular, anything that might suspend the current fiber to await for something should typically be cancelable. Operations that release resources may sometimes also be cancelable and calls of such operations should typically be protected to ensure that resources will be properly released. Forbidding propagation of cancelation may also be required when a sequence of cancelable operations must be performed.
ℹ️ With the constructs provided by this library it is not possible to prevent a fiber from being canceled, but it is possible for a fiber to forbid the scheduler from propagating cancelation to the fiber.
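For example, releasing resources may itself involve cancelable operations; wrapping the release in protect ensures every step is attempted even if the fiber has been canceled. A minimal sketch, assuming the Unix module of Picos_io:

```ocaml
(* Sketch: forbid propagation of cancelation while releasing a list of
   file descriptors, so every [close] is attempted even when the
   current fiber has been canceled. *)
let close_all fds =
  Control.protect @@ fun () ->
  List.iter Unix.close fds
```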
block () suspends the current fiber until it is canceled at which point the cancelation exception will be raised.
terminate_after ~seconds thunk arranges to terminate the execution of thunk on the current fiber after the specified timeout in seconds.
Using terminate_after one can attempt any blocking operation that supports cancelation with a timeout. For example, one could try to read an Ivar with a timeout
let peek_in ~seconds ivar =
- match
- Control.terminate_after ~seconds @@ fun () ->
- Ivar.read ivar
- with
- | value -> Some value
- | exception Control.Terminate -> None
or one could try to connect a socket with a timeout
let try_connect_in ~seconds socket sockaddr =
- match
- Control.terminate_after ~seconds @@ fun () ->
- Unix.connect socket sockaddr
- with
- | () -> true
- | exception Control.Terminate -> false
using the Picos_io.Unix module.
The optional callstack argument specifies the number of callstack entries to capture with the Terminate exception. The default is 0.
As an example, terminate_after could be implemented using Bundle as follows:
let terminate_after ?callstack ~seconds thunk =
- Bundle.join_after @@ fun bundle ->
- Bundle.terminate_after ?callstack ~seconds bundle;
- thunk ()
Picos_std_structured.Flock
An implicit dynamic flock of fibers guaranteed to be joined at the end.
Flocks allow you to conveniently structure or delimit concurrency into nested scopes. After a flock returns or raises an exception, no fibers forked to the flock remain.
An unhandled exception, or error, within any fiber of the flock causes all of the fibers forked to the flock to be canceled and the flock to raise the error exception or error exceptions raised by all of the fibers forked into the flock.
ℹ️ This is essentially a very thin convenience wrapper for an implicitly propagated Bundle.
⚠️ All of the operations in this module, except join_after, raise the Invalid_argument exception in case they are called from outside of the dynamic multifiber scope of a flock established by calling join_after.
join_after scope creates a new flock for fibers, calls scope after setting the current flock to the new flock, and restores the previous flock, if any, after scope exits. The flock will be implicitly propagated to all fibers forked into the flock. A call of join_after returns or raises only after scope has returned or raised and all forked fibers have terminated. If scope raises an exception, error will be called.
The optional on_return argument specifies what to do when the scope returns normally. It defaults to `Wait, which means to just wait for all the fibers to terminate on their own. When explicitly specified as ~on_return:`Terminate, then terminate ?callstack will be called on return. This can be convenient, for example, when dealing with daemon fibers.
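The ~on_return:`Terminate variant can be sketched as follows (assuming a Picos scheduler; the daemon fiber never returns on its own and is canceled when the scope returns):

```ocaml
(* Sketch: a daemon fiber terminated automatically when the scope
   returns, thanks to ~on_return:`Terminate. *)
let with_daemon () =
  Flock.join_after ~on_return:`Terminate @@ fun () ->
  Flock.fork (fun () ->
    while true do
      Control.sleep ~seconds:60.0
    done);
  (* Main work of the scope; the daemon is canceled on return
     rather than waited for. *)
  Control.sleep ~seconds:0.1
```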
terminate () cancels all of the forked fibers using the Terminate exception. After terminate has been called, no new fibers can be forked to the current flock.
The optional callstack argument specifies the number of callstack entries to capture with the Terminate exception. The default is 0.
ℹ️ Calling terminate at the end of a flock can be a convenient way to cancel any background fibers started by the flock.
ℹ️ Calling terminate does not raise the Terminate exception, but blocking operations after terminate will raise the exception to propagate cancelation unless propagation of cancelation is forbidden.
terminate_after ~seconds () arranges to terminate the current flock after the specified timeout in seconds.
val error : ?callstack:int -> exn -> Stdlib.Printexc.raw_backtrace -> unit
val fork_as_promise : (unit -> 'a) -> 'a Promise.t
val fork : (unit -> unit) -> unit
fork action is equivalent to fork_as_promise action |> ignore.
Picos_std_structured.Promise
A cancelable promise.
ℹ️ In addition to using a promise to concurrently compute and return a value, a cancelable promise can also represent a concurrent fiber that will continue until it is explicitly canceled.
⚠️ Canceling a promise does not immediately terminate the fiber or wait for the fiber working to complete the promise to terminate. Constructs like Bundle.join_after and Flock.join_after only guarantee that all fibers forked within their scope have terminated before they return or raise. The reason for this design choice in this library is that synchronization is expensive and delaying synchronization to the join operation is typically sufficient and amortizes the cost.
val of_value : 'a -> 'a t
val await : 'a t -> 'a
await promise awaits until the promise has completed and either returns the value that the evaluation of the promise returned, raises the exception that the evaluation of the promise raised, or raises the Terminate exception in case the promise has been canceled.
⚠️ The fiber corresponding to a canceled promise is not guaranteed to have terminated at the point await raises.
val completed : 'a t -> 'a Picos_std_event.Event.t
completed promise returns an event that can be committed to once the promise has completed.
val is_running : 'a t -> bool
is_running promise determines whether the completion of the promise is still pending.
val try_terminate : ?callstack:int -> 'a t -> bool
try_terminate promise tries to terminate the promise by canceling it with the Terminate exception and returns true in case of success and false in case the promise had already completed, i.e. either returned, raised, or been canceled.
The optional callstack argument specifies the number of callstack entries to capture with the Terminate exception. The default is 0.
val terminate : ?callstack:int -> 'a t -> unit
terminate promise is equivalent to try_terminate promise |> ignore.
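A sketch of the typical use: fork a fiber as a promise and cancel it when its result is no longer needed. Since terminate is built on try_terminate, it is safe to call even if the promise has already completed:

```ocaml
(* Sketch, assuming Picos_std_structured and a Picos scheduler. *)
let demo () =
  Flock.join_after @@ fun () ->
  let promise = Flock.fork_as_promise (fun () -> Control.block ()) in
  (* ... decide the result is not needed after all ... *)
  Promise.terminate promise
```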
val terminate_after : ?callstack:int -> 'a t -> seconds:float -> unit
Picos_std_structured.Run
Operations for running fibers in specific patterns.
all actions starts the actions as separate fibers and waits until they all complete or one of them raises an unhandled exception other than Terminate, which is not counted as an error, after which the remaining fibers will be canceled.
⚠️ One of actions may be run on the current fiber.
⚠️ It is not guaranteed that any of the actions in the list are called. In particular, after any action raises an unhandled exception or after the main fiber is canceled, the actions that have not yet started may be skipped entirely.
all is roughly equivalent to
let all actions =
- Bundle.join_after @@ fun bundle ->
- List.iter (Bundle.fork bundle) actions
but treats the list of actions as a single computation.
any actions starts the actions as separate fibers and waits until one of them completes or raises an unhandled exception other than Terminate, which is not counted as an error, after which the rest of the started fibers will be canceled.
⚠️ One of actions may be run on the current fiber.
⚠️ It is not guaranteed that any of the actions in the list are called. In particular, after the first action returns successfully or after any action raises an unhandled exception or after the main fiber is canceled, the actions that have not yet started may be skipped entirely.
any is roughly equivalent to
let any actions =
- Bundle.join_after @@ fun bundle ->
- try
- actions
- |> List.iter @@ fun action ->
- Bundle.fork bundle @@ fun () ->
- action ();
- Bundle.terminate bundle
- with Control.Terminate -> ()
but treats the list of actions as a single computation.
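For instance, any can be used to race an action against a timeout; under the semantics above, the first action to complete cancels the other. A sketch, assuming a Picos scheduler:

```ocaml
(* Sketch: run [action] with a timeout using [Run.any].  Returns
   [None] if the sleep wins the race. *)
let with_timeout ~seconds action =
  let result = ref None in
  Run.any [
    (fun () -> result := Some (action ()));
    (fun () -> Control.sleep ~seconds);
  ];
  !result
```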
Picos_std_structured
Basic structured concurrency primitives for Picos.
This library essentially provides one application programming interface for structuring fibers with any Picos compatible scheduler.
For the examples we open some modules:
open Picos_io
- open Picos_std_event
- open Picos_std_finally
- open Picos_std_structured
- open Picos_std_sync
module Control : sig ... end
Basic control operations and exceptions for structured concurrency.
module Promise : sig ... end
A cancelable promise.
module Bundle : sig ... end
An explicit dynamic bundle of fibers guaranteed to be joined at the end.
module Flock : sig ... end
An implicit dynamic flock of fibers guaranteed to be joined at the end.
module Run : sig ... end
Operations for running fibers in specific patterns.
Consider the following program:
let main () =
- Flock.join_after begin fun () ->
- let promise =
- Flock.fork_as_promise @@ fun () ->
- Control.block ()
- in
-
- Flock.fork begin fun () ->
- Promise.await promise
- end;
-
- Flock.fork begin fun () ->
- let condition = Condition.create ()
- and mutex = Mutex.create () in
- Mutex.protect mutex begin fun () ->
- while true do
- Condition.wait condition mutex
- done
- end
- end;
-
- Flock.fork begin fun () ->
- let sem =
- Semaphore.Binary.make false
- in
- Semaphore.Binary.acquire sem
- end;
-
- Flock.fork begin fun () ->
- let sem =
- Semaphore.Counting.make 0
- in
- Semaphore.Counting.acquire sem
- end;
-
- Flock.fork begin fun () ->
- Event.sync (Event.choose [])
- end;
-
- Flock.fork begin fun () ->
- let latch = Latch.create 1 in
- Latch.await latch
- end;
-
- Flock.fork begin fun () ->
- let ivar = Ivar.create () in
- Ivar.read ivar
- end;
-
- Flock.fork begin fun () ->
- let stream = Stream.create () in
- Stream.read (Stream.tap stream)
- |> ignore
- end;
-
- Flock.fork begin fun () ->
- let@ inn, out = finally
- Unix.close_pair @@ fun () ->
- Unix.socketpair ~cloexec:true
- PF_UNIX SOCK_STREAM 0
- in
- Unix.set_nonblock inn;
- let n =
- Unix.read inn (Bytes.create 1)
- 0 1
- in
- assert (n = 1)
- end;
-
- Flock.fork begin fun () ->
- let a_month =
- 60.0 *. 60.0 *. 24.0 *. 30.0
- in
- Control.sleep ~seconds:a_month
- end;
-
- (* Let the children get stuck *)
- Control.sleep ~seconds:0.1;
-
- Flock.terminate ()
- end
First of all, note that above the Mutex, Condition, and Semaphore modules come from the Picos_std_sync library and the Unix module comes from the Picos_io library. They do not come from the standard OCaml libraries.
The above program creates a flock of fibers and forks several fibers to the flock that all block in various ways. In detail,
Control.block never returns,
Promise.await never returns as the promise won't be completed,
Condition.wait never returns, because the condition is never signaled,
Semaphore.Binary.acquire and Semaphore.Counting.acquire never return, because the counts of the semaphores never change from 0,
Event.sync never returns, because the event can never be committed to,
Latch.await never returns, because the count of the latch never reaches 0,
Ivar.read never returns, because the incremental variable is never filled,
Stream.read never returns, because the stream is never pushed to,
Unix.read never returns, because the socket is never written to, and the
Control.sleep call would return only after about a month.
Fibers forked to a flock can be canceled in various ways. In the above program we call Flock.terminate to cancel all of the fibers and effectively close the flock. This allows the program to return normally immediately and without leaking or leaving anything in an invalid state:
# Picos_mux_random.run_on ~n_domains:2 main
- - : unit = ()
Now, the point of the above example isn't that you should just call terminate when your program gets stuck. 😅
What the above example hopefully demonstrates is that concurrent abstractions like mutexes and condition variables, asynchronous IO libraries, and others can be designed to support cancelation.
Cancelation is a signaling mechanism that allows structured concurrent abstractions, like the Flock abstraction, to (hopefully) gracefully tear down concurrent fibers in case of errors. Indeed, one of the basic ideas behind the Flock abstraction is that in case any fiber forked to the flock raises an unhandled exception, the whole flock will be terminated and the error will be raised from the flock, which allows you to understand what went wrong, instead of having to debug a program that mysteriously gets stuck, for example.
Cancelation can also, with some care, be used as a mechanism to terminate fibers once they are no longer needed. However, just like sleep, for example, cancelation is inherently prone to races, i.e. it is difficult to understand the exact point and state at which a fiber gets canceled and it is usually non-deterministic, and therefore cancelation is not recommended for use as a general synchronization or communication mechanism.
Consider the following program:
let many_errors () =
- Flock.join_after @@ fun () ->
-
- let latch = Latch.create 1 in
-
- let fork_raising exn =
- Flock.fork begin fun () ->
- Control.protect begin fun () ->
- Latch.await latch
- end;
- raise exn
- end
- in
-
- fork_raising Exit;
- fork_raising Not_found;
- fork_raising Control.Terminate;
-
- Latch.decr latch
The above program starts three fibers and uses a latch to ensure that all of them have been started, before two of them raise errors and the third raises Terminate, which is not considered an error in this library. Running the program
# Picos_mux_fifo.run many_errors
- Exception: Errors[Stdlib.Exit; Not_found]
raises a collection of all of the errors.
Let's build a simple TCP echo server and run it with some clients.
We first define a function for the server:
let run_server server_fd =
- Flock.join_after begin fun () ->
- while true do
- let@ client_fd =
- instantiate Unix.close @@ fun () ->
- Unix.accept
- ~cloexec:true server_fd |> fst
- in
-
- (* Fork a fiber for client *)
- Flock.fork begin fun () ->
- let@ client_fd =
- move client_fd
- in
- Unix.set_nonblock client_fd;
-
- let bs = Bytes.create 100 in
- let n =
- Unix.read client_fd bs 0
- (Bytes.length bs)
- in
- Unix.write client_fd bs 0 n
- |> ignore
- end
- done
- end
The server function expects a listening socket. For each accepted client the server forks a new fiber to handle it. The client socket is moved from the server fiber to the client fiber to avoid leaks and to ensure that the socket will be closed.
Let's then define a function for the clients:
let run_client server_addr =
- let@ socket =
- finally Unix.close @@ fun () ->
- Unix.socket ~cloexec:true
- PF_INET SOCK_STREAM 0
- in
- Unix.set_nonblock socket;
- Unix.connect socket server_addr;
-
- let msg = "Hello!" in
- Unix.write_substring
- socket msg 0 (String.length msg)
- |> ignore;
-
- let bytes =
- Bytes.create (String.length msg)
- in
- let n =
- Unix.read socket bytes 0
- (Bytes.length bytes)
- in
-
- Printf.printf "Received: %s\n%!"
- (Bytes.sub_string bytes 0 n)
The client function takes the address of the server and connects a socket to the server address. It then writes a message to the server and reads a reply from the server and prints it.
Here is the main program:
let main () =
- let@ server_fd =
- finally Unix.close @@ fun () ->
- Unix.socket ~cloexec:true
- PF_INET SOCK_STREAM 0
- in
- Unix.set_nonblock server_fd;
- (* Let system determine the port *)
- Unix.bind server_fd Unix.(
- ADDR_INET(inet_addr_loopback, 0));
- Unix.listen server_fd 8;
-
- let server_addr =
- Unix.getsockname server_fd
- in
-
- Flock.join_after ~on_return:`Terminate begin fun () ->
- (* Start server *)
- Flock.fork begin fun () ->
- run_server server_fd
- end;
-
- (* Run clients concurrently *)
- Flock.join_after begin fun () ->
- for _ = 1 to 5 do
- Flock.fork @@ fun () ->
- run_client server_addr
- done
- end
- end
The main program creates a socket for the server and configures it. The server is then started as a fiber in a flock terminated on return. Then the clients are started to run concurrently in an inner flock.
Finally we run the main program with a scheduler:
# Picos_mux_random.run_on ~n_domains:1 main
- Received: Hello!
- Received: Hello!
- Received: Hello!
- Received: Hello!
- Received: Hello!
- - : unit = ()
As an exercise, you might want to refactor the server to avoid moving the file descriptors and use a recursive accept loop instead. You could also terminate the whole flock at the end instead of just terminating the server.
Picos_std_structured__This module is hidden.
Picos_std_structured__BundleThis module is hidden.
Picos_std_structured__ControlThis module is hidden.
Picos_std_structured__FlockThis module is hidden.
Picos_std_structured__PromiseThis module is hidden.
Picos_std_structured__RunThis module is hidden.
Picos_std_sync.Condition
A condition variable.
ℹ️ This intentionally mimics the interface of Stdlib.Condition. Unlike with the standard library condition variable, blocking on this condition variable allows an effects based scheduler to run other fibers on the thread.
val create : ?padded:bool -> unit -> t
create () returns a new condition variable.
wait condition unlocks the mutex, waits for the condition, and locks the mutex before returning or raising due to the operation being canceled.
ℹ️ If the fiber has been canceled and propagation of cancelation is allowed, this may raise the cancelation exception.
val signal : t -> unit
signal condition wakes up one fiber waiting on the condition variable unless there are no such fibers.
val broadcast : t -> unit
broadcast condition wakes up all the fibers waiting on the condition variable.
Picos_std_sync.Ivar
An incremental or single-assignment poisonable variable.
val create : unit -> 'a t
create () returns a new empty incremental variable.
val of_value : 'a -> 'a t
of_value value returns an incremental variable prefilled with the given value.
val try_fill : 'a t -> 'a -> bool
try_fill ivar value attempts to assign the given value to the incremental variable. Returns true on success and false in case the variable had already been poisoned or assigned a value.
val fill : 'a t -> 'a -> unit
fill ivar value is equivalent to try_fill ivar value |> ignore.
val try_poison_at : 'a t -> exn -> Stdlib.Printexc.raw_backtrace -> bool
try_poison_at ivar exn bt attempts to poison the incremental variable with the specified exception and backtrace. Returns true on success and false in case the variable had already been poisoned or assigned a value.
ℹ️ This operation is not cancelable.
val try_poison : ?callstack:int -> 'a t -> exn -> bool
try_poison ivar exn is equivalent to try_poison_at ivar exn (Printexc.get_callstack n) where n defaults to 0.
val poison_at : 'a t -> exn -> Stdlib.Printexc.raw_backtrace -> unit
poison_at ivar exn bt is equivalent to try_poison_at ivar exn bt |> ignore.
val poison : ?callstack:int -> 'a t -> exn -> unit
poison ivar exn is equivalent to poison_at ivar exn (Printexc.get_callstack n) where n defaults to 0.
val peek_opt : 'a t -> 'a option
peek_opt ivar either returns Some value in case the variable has been assigned the value, raises an exception in case the variable has been poisoned, or otherwise returns None, which means that the variable has not yet been poisoned or assigned a value.
val read : 'a t -> 'a
read ivar waits until the variable is either assigned a value or the variable is poisoned and then returns the value or raises the exception.
val read_evt : 'a t -> 'a Picos_std_event.Event.t
read_evt ivar returns an event that can be committed to once the variable has either been assigned a value or has been poisoned.
Picos_std_sync.Latch
A dynamic single-use countdown latch.
Latches are typically used for determining when a finite set of parallel computations is done. If the size of the set is known a priori, then the latch can be initialized with the size as initial count and then each computation just decrements the latch.
If the size is unknown, i.e. it is determined dynamically, then a latch is initialized with a count of one, the a priori known computations are started and then the latch is decremented. When a computation is started, the latch is incremented, and then decremented once the computation has finished.
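The dynamic counting pattern described above can be sketched as follows; here spawn is a hypothetical placeholder for a fiber-starting operation such as Flock.fork:

```ocaml
(* Sketch of the dynamic counting pattern.  The latch starts at one,
   each computation increments it before starting and decrements it
   when done, and the initial count is dropped once all computations
   have been spawned. *)
let run_dynamically spawn computations =
  let latch = Latch.create 1 in
  List.iter
    (fun computation ->
      (* Increment before the computation starts... *)
      Latch.incr latch;
      spawn (fun () ->
        (* ...and decrement once it has finished. *)
        Fun.protect ~finally:(fun () -> Latch.decr latch) computation))
    computations;
  (* Drop the initial count now that all computations are spawned. *)
  Latch.decr latch;
  Latch.await latch
```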
val create : ?padded:bool -> int -> t
create initial creates a new countdown latch with the specified initial count.
val try_decr : t -> bool
try_decr latch attempts to decrement the count of the latch and returns true in case the count of the latch was greater than zero and false in case the count already was zero.
val decr : t -> unit
decr latch is equivalent to:
if not (try_decr latch) then
- invalid_arg "zero count"
ℹ️ This operation is not cancelable.
val try_incr : t -> bool
try_incr latch attempts to increment the count of the latch and returns true on success and false on failure, which means that the latch has already reached zero.
val incr : t -> unit
incr latch is equivalent to:
if not (try_incr latch) then
- invalid_arg "zero count"
val await : t -> unit
await latch returns after the count of the latch has reached zero.
val await_evt : t -> unit Picos_std_event.Event.t
await_evt latch returns an event that can be committed to once the count of the latch has reached zero.
Picos_std_sync.Lazy
A lazy suspension.
ℹ️ This intentionally mimics the interface of Stdlib.Lazy. Unlike with the standard library suspensions, an attempt to force a suspension from multiple fibers, possibly running on different domains, does not raise the Undefined exception.
Synonym for Stdlib.Lazy.Undefined.
val from_fun : (unit -> 'a) -> 'a t
from_fun thunk returns a suspension.
val from_val : 'a -> 'a t
from_val value returns an already forced suspension whose result is the given value.
val is_val : 'a t -> bool
is_val susp determines whether the suspension has already been forced and didn't raise an exception.
val force : 'a t -> 'a
force susp forces the suspension, i.e. computes thunk () using the thunk passed to from_fun, stores the result of the computation to the suspension and reproduces its result. In case the suspension has already been forced, the computation is skipped and the stored result is reproduced.
ℹ️ This will check whether the current fiber has been canceled before starting the computation of thunk (). This allows the suspension to be forced by another fiber. However, if the fiber is canceled and the cancelation exception is raised after the computation has been started, the suspension will then store the cancelation exception.
map fn susp is equivalent to from_fun (fun () -> fn (force susp)).
Picos_std_sync.Mutex
A mutual-exclusion lock or mutex.
ℹ️ This intentionally mimics the interface of Stdlib.Mutex. Unlike with the standard library mutex, blocking on this mutex potentially allows an effects based scheduler to run other fibers on the thread.
🏎️ The optional checked argument taken by most of the operations defaults to true. When explicitly specified as ~checked:false the mutex implementation may avoid having to obtain the current fiber, which can be expensive relative to locking or unlocking an uncontested mutex. Note that specifying ~checked:false on an operation may prevent error checking also on a subsequent operation.
val create : ?padded:bool -> unit -> t
create () returns a new mutex that is initially unlocked.
val lock : ?checked:bool -> t -> unit
lock mutex locks the mutex.
ℹ️ If the fiber has been canceled and propagation of cancelation is allowed, this may raise the cancelation exception before locking the mutex. If ~checked:false was specified, the cancelation exception may or may not be raised.
val try_lock : ?checked:bool -> t -> bool
try_lock mutex locks the mutex in case the mutex is unlocked. Returns true on success and false in case the mutex was locked.
ℹ️ If the fiber has been canceled and propagation of cancelation is allowed, this may raise the cancelation exception before locking the mutex. If ~checked:false was specified, the cancelation exception may or may not be raised.
val unlock : ?checked:bool -> t -> unit
unlock mutex unlocks the mutex.
ℹ️ This operation is not cancelable.
val protect : ?checked:bool -> t -> (unit -> 'a) -> 'a
protect mutex thunk locks the mutex, runs thunk (), and unlocks the mutex after thunk () returns or raises.
ℹ️ If the fiber has been canceled and propagation of cancelation is allowed, this may raise the cancelation exception before locking the mutex. If ~checked:false was specified, the cancelation exception may or may not be raised.
Semaphore.Binary
A binary semaphore.
val make : ?padded:bool -> bool -> t
make initial creates a new binary semaphore with count of 1 in case initial is true and count of 0 otherwise.
val release : t -> unit
release semaphore sets the count of the semaphore to 1.
ℹ️ This operation is not cancelable.
val acquire : t -> unit
acquire semaphore waits until the count of the semaphore is 1 and then atomically changes the count to 0.
val try_acquire : t -> bool
try_acquire semaphore attempts to atomically change the count of the semaphore from 1 to 0.
Semaphore.Counting
A counting semaphore.
val make : ?padded:bool -> int -> t
make initial creates a new counting semaphore with the given initial count.
val release : t -> unit
release semaphore increments the count of the semaphore.
ℹ️ This operation is not cancelable.
val acquire : t -> unit
acquire semaphore waits until the count of the semaphore is greater than 0 and then atomically decrements the count.
val try_acquire : t -> bool
try_acquire semaphore attempts to atomically decrement the count of the semaphore unless the count is already 0.
val get_value : t -> int
get_value semaphore returns the current count of the semaphore. This should only be used for debugging or informational messages.
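A common use of a counting semaphore is to bound concurrency. A sketch, assuming a Picos scheduler; note that release is not cancelable, so the count is restored even if the thunk is canceled:

```ocaml
(* Sketch: allow at most four fibers to run [thunk] concurrently by
   guarding it with a counting semaphore. *)
let sem = Semaphore.Counting.make 4

let with_limit thunk =
  Semaphore.Counting.acquire sem;
  Fun.protect ~finally:(fun () -> Semaphore.Counting.release sem) thunk
```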
Picos_std_sync.Semaphore
Counting and Binary semaphores.
ℹ️ This intentionally mimics the interface of Stdlib.Semaphore. Unlike with the standard library semaphores, blocking on these semaphores allows an effects based scheduler to run other fibers on the thread.
Picos_std_sync.Stream
A lock-free, poisonable, many-to-many stream.
Readers can tap into a stream to get a cursor for reading all the values pushed to the stream starting from the cursor position. Conversely, values pushed to a stream are lost unless a reader has a cursor to the position in the stream.
val create : ?padded:bool -> unit -> 'a t
create () returns a new stream.
val push : 'a t -> 'a -> unit
val poison_at : 'a t -> exn -> Stdlib.Printexc.raw_backtrace -> unit
poison_at stream exn bt marks the stream as poisoned at the current position, which means that subsequent attempts to push to the stream will raise the given exception with backtrace.
ℹ️ This operation is not cancelable.
val poison : ?callstack:int -> 'a t -> exn -> unit
poison stream exn is equivalent to poison_at stream exn (Printexc.get_callstack n) where n defaults to 0.
peek_opt cursor immediately returns Some (value, next) with the value pushed to the position and a cursor to the next position, when the cursor points to a past position in the stream. Otherwise returns None or raises the exception that the stream was poisoned with.
read cursor immediately returns (value, next) with the value pushed to the position and a cursor to the next position, when the cursor points to a past position in the stream. If the cursor points to the current position of the stream, read cursor waits until a value is pushed to the stream or the stream is poisoned, in which case the exception that the stream was poisoned with will be raised.
val read_evt : 'a cursor -> ('a * 'a cursor) Picos_std_event.Event.t
read_evt cursor returns an event that reads from the cursor position.
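A typical consumer taps into the stream and then follows the chain of cursors; a sketch, assuming a Picos scheduler:

```ocaml
(* Sketch: process every value pushed to the stream after tapping in.
   [read] blocks until a value is pushed or the stream is poisoned. *)
let consume stream f =
  let rec loop cursor =
    let value, next = Stream.read cursor in
    f value;
    loop next
  in
  loop (Stream.tap stream)
```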
Picos_std_sync
Basic communication and synchronization primitives for Picos.
This library essentially provides a conventional set of communication and synchronization primitives for concurrent programming with any Picos compatible scheduler.
For the examples we open some modules:
open Picos_std_structured
- open Picos_std_sync
module Mutex : sig ... end
A mutual-exclusion lock or mutex.
module Condition : sig ... end
A condition variable.
module Lazy : sig ... end
A lazy suspension.
module Latch : sig ... end
A dynamic single-use countdown latch.
module Ivar : sig ... end
An incremental or single-assignment poisonable variable.
module Stream : sig ... end
A lock-free, poisonable, many-to-many stream.
Here is an example of a simple bounded (blocking) queue using a mutex and condition variables:
module Bounded_q : sig
- type 'a t
- val create : capacity:int -> 'a t
- val push : 'a t -> 'a -> unit
- val pop : 'a t -> 'a
- end = struct
- type 'a t = {
- mutex : Mutex.t;
- queue : 'a Queue.t;
- capacity : int;
- not_empty : Condition.t;
- not_full : Condition.t;
- }
-
- let create ~capacity =
- if capacity < 0 then
- invalid_arg "negative capacity"
- else {
- mutex = Mutex.create ();
- queue = Queue.create ();
- capacity;
- not_empty = Condition.create ();
- not_full = Condition.create ();
- }
-
- let is_full_unsafe t =
- t.capacity <= Queue.length t.queue
-
- let push t x =
- let was_empty =
- Mutex.protect t.mutex @@ fun () ->
- while is_full_unsafe t do
- Condition.wait t.not_full t.mutex
- done;
- Queue.push x t.queue;
- Queue.length t.queue = 1
- in
- if was_empty then
- Condition.broadcast t.not_empty
-
- let pop t =
- let elem, was_full =
- Mutex.protect t.mutex @@ fun () ->
- while Queue.length t.queue = 0 do
- Condition.wait
- t.not_empty t.mutex
- done;
- let was_full = is_full_unsafe t in
- Queue.pop t.queue, was_full
- in
- if was_full then
- Condition.broadcast t.not_full;
- elem
- end
The above is definitely not the fastest nor the most scalable bounded queue, but we can now demonstrate it with the cooperative Picos_mux_fifo scheduler:
# Picos_mux_fifo.run @@ fun () ->
-
- let bq =
- Bounded_q.create ~capacity:3
- in
-
- Flock.join_after ~on_return:`Terminate begin fun () ->
- Flock.fork begin fun () ->
- while true do
- Printf.printf "Popped %d\n%!"
- (Bounded_q.pop bq)
- done
- end;
-
- for i=1 to 5 do
- Printf.printf "Pushing %d\n%!" i;
- Bounded_q.push bq i
- done;
-
- Printf.printf "All done?\n%!";
-
- Control.yield ();
- end;
-
- Printf.printf "Pushing %d\n%!" 101;
- Bounded_q.push bq 101;
-
- Printf.printf "Popped %d\n%!"
- (Bounded_q.pop bq)
- Pushing 1
- Pushing 2
- Pushing 3
- Pushing 4
- Popped 1
- Popped 2
- Popped 3
- Pushing 5
- All done?
- Popped 4
- Popped 5
- Pushing 101
- Popped 101
- - : unit = ()
Notice how the producer was able to push three elements to the queue after which the fourth push blocked and the consumer was started. Also, after canceling the consumer, the queue could still be used just fine.
The optional padded argument taken by several constructor functions, e.g. Latch.create, Mutex.create, Condition.create, Semaphore.Counting.make, and Semaphore.Binary.make, defaults to false. When explicitly specified as ~padded:true the object is allocated in a way to avoid false sharing. For relatively long lived objects this can improve performance and make performance more stable at the cost of using more memory. It is not recommended to use ~padded:true for short lived objects.
The primitives provided by this library are generally optimized for low contention scenarios and for size. Generally speaking, for best performance and scalability, you should try to avoid high contention scenarios by architecting your program to distribute processing such that sequential bottlenecks are avoided. If high contention is unavoidable, then other communication and synchronization primitive implementations may provide better performance.
Picos_std_sync__This module is hidden.
Picos_std_sync__ConditionThis module is hidden.
Picos_std_sync__IvarThis module is hidden.
Picos_std_sync__LatchThis module is hidden.
Picos_std_sync__LazyThis module is hidden.
Picos_std_sync__List_extThis module is hidden.
Picos_std_sync__MutexThis module is hidden.
Picos_std_sync__QThis module is hidden.
Picos_std_sync__SemaphoreThis module is hidden.
Picos_std_sync__StreamThis module is hidden.
This package contains sample scheduler agnostic libraries for Picos. Many of the modules are intentionally designed to mimic modules from the OCaml Stdlib.
Picos_std_finally Syntax for avoiding resource leaks for Picos.
Picos_std_awaitable Basic futex-like awaitable atomic location for Picos.
Picos_std_event Basic event abstraction for Picos.
Picos_std_structured Basic structured concurrency primitives for Picos.
Picos_std_sync Basic communication and synchronization primitives for Picos.
These libraries are both meant to serve as examples of what can be done and to also provide practical means for programming with fibers. Hopefully there will be many more libraries implemented in Picos like these providing different approaches, patterns, and idioms for structuring concurrent programs.