mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
189 lines
15 KiB
HTML
189 lines
15 KiB
HTML
<!DOCTYPE html>
|
||
<html xmlns="http://www.w3.org/1999/xhtml"><head><title>Picos_std_structured (picos_std.Picos_std_structured)</title><meta charset="utf-8"/><link rel="stylesheet" href="../../_odoc-theme/odoc.css"/><meta name="generator" content="odoc 3.1.0"/><meta name="viewport" content="width=device-width,initial-scale=1.0"/><script src="../../highlight.pack.js"></script><script>hljs.initHighlightingOnLoad();</script></head><body class="odoc"><nav class="odoc-nav"><a href="../index.html">Up</a> – <a href="../../index.html">Index</a> » <a href="../index.html">picos_std</a> » Picos_std_structured</nav><header class="odoc-preamble"><h1>Module <code><span>Picos_std_structured</span></code></h1><p>Basic structured concurrency primitives for <a href="../../picos/Picos/index.html"><code>Picos</code></a>.</p><p>This library essentially provides one application programming interface for structuring fibers with any Picos compatible scheduler.</p><p>For the <a href="#examples" title="examples">examples</a> we open some modules:</p><pre class="language-ocaml"><code> open Picos_io
|
||
open Picos_std_event
|
||
open Picos_std_finally
|
||
open Picos_std_structured
|
||
open Picos_std_sync</code></pre></header><div class="odoc-tocs"><nav class="odoc-toc odoc-local-toc"><ul><li><a href="#modules">Modules</a></li><li><a href="#examples">Examples</a><ul><li><a href="#understanding-cancelation">Understanding cancelation</a></li><li><a href="#errors-and-cancelation">Errors and cancelation</a></li><li><a href="#a-simple-echo-server-and-clients">A simple echo server and clients</a></li></ul></li></ul></nav></div><div class="odoc-content"><h2 id="modules"><a href="#modules" class="anchor"></a>Modules</h2><div class="odoc-spec"><div class="spec module anchored" id="module-Control"><a href="#module-Control" class="anchor"></a><code><span><span class="keyword">module</span> <a href="Control/index.html">Control</a></span><span> : <span class="keyword">sig</span> ... <span class="keyword">end</span></span></code></div><div class="spec-doc"><p>Basic control operations and exceptions for structured concurrency.</p></div></div><div class="odoc-spec"><div class="spec module anchored" id="module-Promise"><a href="#module-Promise" class="anchor"></a><code><span><span class="keyword">module</span> <a href="Promise/index.html">Promise</a></span><span> : <span class="keyword">sig</span> ... <span class="keyword">end</span></span></code></div><div class="spec-doc"><p>A cancelable promise.</p></div></div><div class="odoc-spec"><div class="spec module anchored" id="module-Bundle"><a href="#module-Bundle" class="anchor"></a><code><span><span class="keyword">module</span> <a href="Bundle/index.html">Bundle</a></span><span> : <span class="keyword">sig</span> ... <span class="keyword">end</span></span></code></div><div class="spec-doc"><p>An explicit dynamic bundle of fibers guaranteed to be joined at the end.</p></div></div><div class="odoc-spec"><div class="spec module anchored" id="module-Flock"><a href="#module-Flock" class="anchor"></a><code><span><span class="keyword">module</span> <a href="Flock/index.html">Flock</a></span><span> : <span class="keyword">sig</span> ... <span class="keyword">end</span></span></code></div><div class="spec-doc"><p>An implicit dynamic flock of fibers guaranteed to be joined at the end.</p></div></div><div class="odoc-spec"><div class="spec module anchored" id="module-Run"><a href="#module-Run" class="anchor"></a><code><span><span class="keyword">module</span> <a href="Run/index.html">Run</a></span><span> : <span class="keyword">sig</span> ... <span class="keyword">end</span></span></code></div><div class="spec-doc"><p>Operations for running fibers in specific patterns.</p></div></div><h2 id="examples"><a href="#examples" class="anchor"></a>Examples</h2><h3 id="understanding-cancelation"><a href="#understanding-cancelation" class="anchor"></a>Understanding cancelation</h3><p>Consider the following program:</p><pre class="language-ocaml"><code> let main () =
|
||
Flock.join_after begin fun () ->
|
||
let promise =
|
||
Flock.fork_as_promise @@ fun () ->
|
||
Control.block ()
|
||
in
|
||
|
||
Flock.fork begin fun () ->
|
||
Promise.await promise
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let condition = Condition.create ()
|
||
and mutex = Mutex.create () in
|
||
Mutex.protect mutex begin fun () ->
|
||
while true do
|
||
Condition.wait condition mutex
|
||
done
|
||
end
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let sem =
|
||
Semaphore.Binary.make false
|
||
in
|
||
Semaphore.Binary.acquire sem
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let sem =
|
||
Semaphore.Counting.make 0
|
||
in
|
||
Semaphore.Counting.acquire sem
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
Event.sync (Event.choose [])
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let latch = Latch.create 1 in
|
||
Latch.await latch
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let ivar = Ivar.create () in
|
||
Ivar.read ivar
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let stream = Stream.create () in
|
||
Stream.read (Stream.tap stream)
|
||
|> ignore
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let@ inn, out = finally
|
||
Unix.close_pair @@ fun () ->
|
||
Unix.socketpair ~cloexec:true
|
||
PF_UNIX SOCK_STREAM 0
|
||
in
|
||
Unix.set_nonblock inn;
|
||
let n =
|
||
Unix.read inn (Bytes.create 1)
|
||
0 1
|
||
in
|
||
assert (n = 1)
|
||
end;
|
||
|
||
Flock.fork begin fun () ->
|
||
let a_month =
|
||
60.0 *. 60.0 *. 24.0 *. 30.0
|
||
in
|
||
Control.sleep ~seconds:a_month
|
||
end;
|
||
|
||
(* Let the children get stuck *)
|
||
Control.sleep ~seconds:0.1;
|
||
|
||
Flock.terminate ()
|
||
end</code></pre><p>First of all, note that above the <a href="../Picos_std_sync/Mutex/index.html" title="Picos_std_sync.Mutex"><code>Mutex</code></a>, <a href="../Picos_std_sync/Condition/index.html" title="Picos_std_sync.Condition"><code>Condition</code></a>, and <a href="../Picos_std_sync/Semaphore/index.html" title="Picos_std_sync.Semaphore"><code>Semaphore</code></a> modules come from the <a href="../Picos_std_sync/index.html"><code>Picos_std_sync</code></a> library and the <span class="xref-unresolved" title="Picos_io.Unix"><code>Unix</code></span> module comes from the <code>Picos_io</code> library. They do not come from the standard OCaml libraries.</p><p>The above program creates a <a href="Flock/index.html" title="Flock">flock</a> of fibers and <a href="Flock/index.html#val-fork" title="Flock.fork">forks</a> several fibers to the flock that all block in various ways. In detail,</p><ul><li><a href="Control/index.html#val-block"><code>Control.block</code></a> never returns,</li><li><a href="Promise/index.html#val-await"><code>Promise.await</code></a> never returns as the promise won't be completed,</li><li><a href="../Picos_std_sync/Condition/index.html#val-wait" title="Picos_std_sync.Condition.wait"><code>Condition.wait</code></a> never returns, because the condition is never signaled,</li><li><a href="../Picos_std_sync/Semaphore/Binary/index.html#val-acquire" title="Picos_std_sync.Semaphore.Binary.acquire"><code>Semaphore.Binary.acquire</code></a> and <a href="../Picos_std_sync/Semaphore/Counting/index.html#val-acquire" title="Picos_std_sync.Semaphore.Counting.acquire"><code>Semaphore.Counting.acquire</code></a> never return, because the counts of the semaphores never change from <code>0</code>,</li><li><a href="../Picos_std_event/Event/index.html#val-sync" title="Picos_std_event.Event.sync"><code>Event.sync</code></a> never returns, because the event can never be committed to,</li><li><a href="../Picos_std_sync/Latch/index.html#val-await" title="Picos_std_sync.Latch.await"><code>Latch.await</code></a> never returns, because the count of the latch never reaches <code>0</code>,</li><li><a href="../Picos_std_sync/Ivar/index.html#val-read" title="Picos_std_sync.Ivar.read"><code>Ivar.read</code></a> never returns, because the incremental variable is never filled,</li><li><a href="../Picos_std_sync/Stream/index.html#val-read" title="Picos_std_sync.Stream.read"><code>Stream.read</code></a> never returns, because the stream is never pushed to,</li><li><span class="xref-unresolved" title="Picos_io.Unix.read"><code>Unix.read</code></span> never returns, because the socket is never written to, and the</li><li><a href="Control/index.html#val-sleep"><code>Control.sleep</code></a> call would return only after about a month.</li></ul><p>Fibers forked to a flock can be canceled in various ways. In the above program we call <a href="Flock/index.html#val-terminate"><code>Flock.terminate</code></a> to cancel all of the fibers and effectively close the flock. This allows the program to return normally immediately and without leaking or leaving anything in an invalid state:</p><pre class="language-ocaml"><code> # Picos_mux_random.run_on ~n_domains:2 main
|
||
- : unit = ()</code></pre><p>Now, the point of the above example isn't that you should just call <a href="Flock/index.html#val-terminate" title="Flock.terminate"><code>terminate</code></a> when your program gets stuck. 😅</p><p>What the above example hopefully demonstrates is that concurrent abstractions like mutexes and condition variables, asynchronous IO libraries, and others can be designed to support cancelation.</p><p>Cancelation is a signaling mechanism that allows structured concurrent abstractions, like the <a href="Flock/index.html"><code>Flock</code></a> abstraction, to (hopefully) gracefully tear down concurrent fibers in case of errors. Indeed, one of the basic ideas behind the <a href="Flock/index.html"><code>Flock</code></a> abstraction is that in case any fiber forked to the flock raises an unhandled exception, the whole flock will be terminated and the error will raised from the flock, which allows you to understand what went wrong, instead of having to debug a program that mysteriously gets stuck, for example.</p><p>Cancelation can also, with some care, be used as a mechanism to terminate fibers once they are no longer needed. However, just like sleep, for example, cancelation is inherently prone to races, i.e. it is difficult to understand the exact point and state at which a fiber gets canceled and it is usually non-deterministic, and therefore cancelation is not recommended for use as a general synchronization or communication mechanism.</p><h3 id="errors-and-cancelation"><a href="#errors-and-cancelation" class="anchor"></a>Errors and cancelation</h3><p>Consider the following program:</p><pre class="language-ocaml"><code> let many_errors () =
|
||
Flock.join_after @@ fun () ->
|
||
|
||
let latch = Latch.create 1 in
|
||
|
||
let fork_raising exn =
|
||
Flock.fork begin fun () ->
|
||
Control.protect begin fun () ->
|
||
Latch.await latch
|
||
end;
|
||
raise exn
|
||
end
|
||
in
|
||
|
||
fork_raising Exit;
|
||
fork_raising Not_found;
|
||
fork_raising Control.Terminate;
|
||
|
||
Latch.decr latch</code></pre><p>The above program starts three fibers and uses a <a href="../Picos_std_sync/Latch/index.html" title="Picos_std_sync.Latch">latch</a> to ensure that all of them have been started, before two of them raise errors and the third raises <a href="Control/index.html#exception-Terminate" title="Control.Terminate"><code>Terminate</code></a>, which is not considered an error in this library. Running the program</p><pre class="language-ocaml"><code> # Picos_mux_fifo.run many_errors
|
||
Exception: Errors[Stdlib.Exit; Not_found]</code></pre><p>raises a collection of all of the <a href="Control/index.html#exception-Errors" title="Control.Errors">errors</a>.</p><h3 id="a-simple-echo-server-and-clients"><a href="#a-simple-echo-server-and-clients" class="anchor"></a>A simple echo server and clients</h3><p>Let's build a simple TCP echo server and run it with some clients.</p><p>We first define a function for the server:</p><pre class="language-ocaml"><code> let run_server server_fd =
|
||
Flock.join_after begin fun () ->
|
||
while true do
|
||
let@ client_fd =
|
||
instantiate Unix.close @@ fun () ->
|
||
Unix.accept
|
||
~cloexec:true server_fd |> fst
|
||
in
|
||
|
||
(* Fork a fiber for client *)
|
||
Flock.fork begin fun () ->
|
||
let@ client_fd =
|
||
move client_fd
|
||
in
|
||
Unix.set_nonblock client_fd;
|
||
|
||
let bs = Bytes.create 100 in
|
||
let n =
|
||
Unix.read client_fd bs 0
|
||
(Bytes.length bs)
|
||
in
|
||
Unix.write client_fd bs 0 n
|
||
|> ignore
|
||
end
|
||
done
|
||
end</code></pre><p>The server function expects a listening socket. For each accepted client the server forks a new fiber to handle it. The client socket is <span class="xref-unresolved" title="Finally.move">moved</span> from the server fiber to the client fiber to avoid leaks and to ensure that the socket will be closed.</p><p>Let's then define a function for the clients:</p><pre class="language-ocaml"><code> let run_client server_addr =
|
||
let@ socket =
|
||
finally Unix.close @@ fun () ->
|
||
Unix.socket ~cloexec:true
|
||
PF_INET SOCK_STREAM 0
|
||
in
|
||
Unix.set_nonblock socket;
|
||
Unix.connect socket server_addr;
|
||
|
||
let msg = "Hello!" in
|
||
Unix.write_substring
|
||
socket msg 0 (String.length msg)
|
||
|> ignore;
|
||
|
||
let bytes =
|
||
Bytes.create (String.length msg)
|
||
in
|
||
let n =
|
||
Unix.read socket bytes 0
|
||
(Bytes.length bytes)
|
||
in
|
||
|
||
Printf.printf "Received: %s\n%!"
|
||
(Bytes.sub_string bytes 0 n)</code></pre><p>The client function takes the address of the server and connects a socket to the server address. It then writes a message to the server and reads a reply from the server and prints it.</p><p>Here is the main program:</p><pre class="language-ocaml"><code> let main () =
|
||
let@ server_fd =
|
||
finally Unix.close @@ fun () ->
|
||
Unix.socket ~cloexec:true
|
||
PF_INET SOCK_STREAM 0
|
||
in
|
||
Unix.set_nonblock server_fd;
|
||
(* Let system determine the port *)
|
||
Unix.bind server_fd Unix.(
|
||
ADDR_INET(inet_addr_loopback, 0));
|
||
Unix.listen server_fd 8;
|
||
|
||
let server_addr =
|
||
Unix.getsockname server_fd
|
||
in
|
||
|
||
Flock.join_after ~on_return:`Terminate begin fun () ->
|
||
(* Start server *)
|
||
Flock.fork begin fun () ->
|
||
run_server server_fd
|
||
end;
|
||
|
||
(* Run clients concurrently *)
|
||
Flock.join_after begin fun () ->
|
||
for _ = 1 to 5 do
|
||
Flock.fork @@ fun () ->
|
||
run_client server_addr
|
||
done
|
||
end
|
||
end</code></pre><p>The main program creates a socket for the server and configures it. The server is then started as a fiber in a flock terminated on return. Then the clients are started to run concurrently in an inner flock.</p><p>Finally we run the main program with a scheduler:</p><pre class="language-ocaml"><code> # Picos_mux_random.run_on ~n_domains:1 main
|
||
Received: Hello!
|
||
Received: Hello!
|
||
Received: Hello!
|
||
Received: Hello!
|
||
Received: Hello!
|
||
- : unit = ()</code></pre><p>As an exercise, you might want to refactor the server to avoid <span class="xref-unresolved" title="Finally.move">moving</span> the file descriptors and use a <span class="xref-unresolved" title="Finally.let@">recursive</span> accept loop instead. You could also <a href="Flock/index.html#val-terminate" title="Flock.terminate">terminate the whole flock</a> at the end instead of just terminating the server.</p></div></body></html>
|