{0 Lwt manual }
|
|
|
|
{1 Introduction }
|
|
|
|
When writing a program, a common developer's task is to handle I/O
|
|
operations. Indeed, most software interacts with several different
|
|
resources, such as:
|
|
|
|
{ul
|
|
{- the kernel, by doing system calls,}
|
|
{- the user, by reading the keyboard, the mouse, or any input device,}
|
|
{- a graphical server, to build graphical user interface,}
|
|
{- other computers, by using the network,}
|
|
{- …and so on.}}
|
|
|
|
When this list contains only one item, it is pretty easy to
|
|
handle. However as this list grows it becomes harder and harder to
|
|
make everything work together. Several choices have been proposed
|
|
to solve this problem:
|
|
|
|
{ul
|
|
{- using a main loop, and integrating all components we are
|
|
interacting with into this main loop,}
|
|
{- using preemptive system threads.}}
|
|
|
|
Both solutions have their advantages and their drawbacks. For the
|
|
first one, it may work, but it becomes very complicated to write
|
|
a piece of asynchronous sequential code. The typical example is
|
|
graphical user interfaces freezing and not redrawing themselves
|
|
because they are waiting for some blocking part of the code to
|
|
complete.
|
|
|
|
If you have already written code using preemptive threads, you know
that getting it right is difficult. Moreover, system
threads consume non-negligible resources, so you can only launch
a limited number of them at the same time. Thus, this is not a
general solution.
|
|
|
|
[Lwt] offers a third alternative. It provides promises, which are
|
|
very fast: a promise is just a reference that will be filled asynchronously,
|
|
and calling a function that returns a promise does not require a new stack,
|
|
new process, or anything else. It is just a normal, fast, function call.
|
|
Promises compose nicely, allowing us to write highly asynchronous programs.
|
|
|
|
In the first part, we will explain the concepts of [Lwt], then we will
|
|
describe the main modules [Lwt] consists of.
|
|
|
|
{2 Finding examples }
|
|
|
|
Additional sources of examples:
|
|
|
|
{ul
|
|
{- {{: https://github.com/dkim/rwo-lwt#readme }Concurrent Programming with Lwt}}
|
|
{- {{: https://mirage.io/wiki/tutorial-lwt }Mirage Lwt Tutorial}}
|
|
{- {{: http://www.baturin.org/code/lwt-counter-server/ }Simple Server with Lwt}}}
|
|
|
|
{1 The Lwt core library }
|
|
|
|
In this section we describe the basics of [Lwt]. It is advised to
|
|
start [utop] and try the given code examples.
|
|
|
|
{2 Lwt concepts }
|
|
|
|
Let's take a classic function of the [Stdlib] module:
|
|
|
|
{[
|
|
# Stdlib.input_char;;
|
|
- : in_channel -> char = <fun>
|
|
]}
|
|
|
|
This function will wait for a character to come on the given input
|
|
channel, and then return it. The problem with this function is that it is
|
|
blocking: while it is being executed, the whole program will be
|
|
blocked, and other events will not be handled until it returns.
|
|
|
|
Now, let's look at the [Lwt] equivalent:
|
|
|
|
{[
|
|
# Lwt_io.read_char;;
|
|
- : Lwt_io.input_channel -> char Lwt.t = <fun>
|
|
]}
|
|
|
|
As you can see, it does not return just a character, but something of
|
|
type [char Lwt.t]. The type ['a Lwt.t] is the type
|
|
of promises that can be fulfilled later with a value of type ['a].
|
|
[Lwt_io.read_char] will try to read a character from the
|
|
given input channel and {e immediately} return a promise, without
|
|
blocking, whether a character is available or not. If a character is
|
|
not available, the promise will just not be fulfilled {e yet}.
|
|
|
|
Now, let's see what we can do with an [Lwt] promise. The following
code creates a pipe, then creates a promise that is fulfilled with the result of
reading the input side:
|
|
|
|
{[
|
|
# let ic, oc = Lwt_io.pipe ();;
|
|
val ic : Lwt_io.input_channel = <abstr>
|
|
val oc : Lwt_io.output_channel = <abstr>
|
|
# let p = Lwt_io.read_char ic;;
|
|
val p : char Lwt.t = <abstr>
|
|
]}
|
|
|
|
We can now look at the state of our newly created promise:
|
|
|
|
{[
|
|
# Lwt.state p;;
|
|
- : char Lwt.state = Lwt.Sleep
|
|
]}
|
|
|
|
A promise may be in one of the following states:
|
|
|
|
{ul
|
|
{- [Return x], which means that the promise has been fulfilled
|
|
with the value [x]. This usually implies that the asynchronous
|
|
operation, that you started by calling the function that returned the
|
|
promise, has completed successfully.}
|
|
{- [Fail exn], which means that the promise has been rejected
|
|
with the exception [exn]. This usually means that the asynchronous
|
|
operation associated with the promise has failed.}
|
|
{- [Sleep], which means that the promise has not yet been
fulfilled or rejected, so it is {e pending}.}}
|
|
|
|
The above promise [p] is pending because there is nothing yet
|
|
to read from the pipe. Let's write something:
|
|
|
|
{[
|
|
# Lwt_io.write_char oc 'a';;
|
|
- : unit Lwt.t = <abstr>
|
|
# Lwt.state p;;
|
|
- : char Lwt.state = Lwt.Return 'a'
|
|
]}
|
|
|
|
So, after we write something, the reading promise has been fulfilled
|
|
with the value ['a'].
|
|
|
|
{2 Primitives for promise creation }
|
|
|
|
There are several primitives for creating [Lwt] promises. These
|
|
functions are located in the module [Lwt].
|
|
|
|
Here are the main primitives:
|
|
|
|
{ul
|
|
{- [Lwt.return : 'a -> 'a Lwt.t]
|
|
creates a promise which is already fulfilled with the given value}
|
|
{- [Lwt.fail : exn -> 'a Lwt.t]
|
|
creates a promise which is already rejected with the given exception}
|
|
{- [Lwt.wait : unit -> 'a Lwt.t * 'a Lwt.u]
|
|
creates a pending promise, and returns it, paired with a resolver (of
|
|
type ['a Lwt.u]), which must be used to resolve (fulfill or reject)
|
|
the promise.}}
|
|
|
|
To resolve a pending promise, use one of the following
|
|
functions:
|
|
|
|
{ul
|
|
{- [Lwt.wakeup : 'a Lwt.u -> 'a -> unit]
|
|
fulfills the promise with a value.}
|
|
{- [Lwt.wakeup_exn : 'a Lwt.u -> exn -> unit]
|
|
rejects the promise with an exception.}}
|
|
|
|
Note that it is an error to try to resolve the same promise twice. [Lwt]
|
|
will raise [Invalid_argument] if you try to do so.
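
The rule above can be checked directly. The following is a minimal sketch that
needs only the [lwt] package. The helper name [second_wakeup_fails] is
illustrative and not part of [Lwt].

{[
(* Returns [true] if resolving an already fulfilled promise raises
   [Invalid_argument]. *)
let second_wakeup_fails () =
  let p, r = Lwt.wait () in
  Lwt.wakeup r 1;
  assert (Lwt.state p = Lwt.Return 1);
  match Lwt.wakeup r 2 with
  | () -> false
  | exception Invalid_argument _ -> true

let () = assert (second_wakeup_fails ())
]}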
|
|
|
|
With this information, try to guess the result of each of the
|
|
following expressions:
|
|
|
|
{[
|
|
# Lwt.state (Lwt.return 42);;
|
|
# Lwt.state (Lwt.fail Exit);;
|
|
# let p, r = Lwt.wait ();;
|
|
# Lwt.state p;;
|
|
# Lwt.wakeup r 42;;
|
|
# Lwt.state p;;
|
|
# let p, r = Lwt.wait ();;
|
|
# Lwt.state p;;
|
|
# Lwt.wakeup_exn r Exit;;
|
|
# Lwt.state p;;
|
|
]}
|
|
|
|
{3 Primitives for promise composition }
|
|
|
|
The most important operation you need to know is [bind]:
|
|
|
|
{[
|
|
val bind : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
|
|
]}
|
|
|
|
[bind p f] creates a promise which waits for [p] to become
fulfilled, then passes the resulting value to [f]. If [p] is a
|
|
pending promise, then [bind p f] will be a pending promise too,
|
|
until [p] is resolved. If [p] is rejected, then the resulting
|
|
promise will be rejected with the same exception. For example, consider the
|
|
following expression:
|
|
|
|
{[
|
|
Lwt.bind
|
|
(Lwt_io.read_line Lwt_io.stdin)
|
|
(fun str -> Lwt_io.printlf "You typed %S" str)
|
|
]}
|
|
|
|
This code will first wait for the user to enter a line of text, then
|
|
print a message on the standard output.
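
The behavior of [bind] on a pending promise can also be observed without any
I/O. This is a small sketch. [double_later] is a hypothetical helper, not a
function provided by [Lwt].

{[
(* [double_later p] stays pending as long as [p] is pending, then holds
   twice the value of [p]. *)
let double_later p = Lwt.bind p (fun x -> Lwt.return (2 * x))

let () =
  let p, r = Lwt.wait () in
  let q = double_later p in
  assert (Lwt.state q = Lwt.Sleep);      (* [p] is pending, so [q] is too *)
  Lwt.wakeup r 21;                       (* fulfilling [p] runs the callback *)
  assert (Lwt.state q = Lwt.Return 42)
]}

The same composition can be written with the infix operator [>>=] from
[Lwt.Infix].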
|
|
|
|
Similarly to [bind], there is a function to handle the case
|
|
when [p] is rejected:
|
|
|
|
{[
|
|
val catch : (unit -> 'a Lwt.t) -> (exn -> 'a Lwt.t) -> 'a Lwt.t
|
|
]}
|
|
|
|
[catch f g] will call [f ()], then wait for it to become
|
|
resolved, and if it was rejected with an exception [exn], call
|
|
[g exn] to handle it. Note that [catch] handles both exceptions raised with
[Stdlib.raise] and rejections created with [Lwt.fail].
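
As an illustration, here is a sketch of a handler that recovers from one
exception and passes every other one on. [safe_div] is a made-up example
function. The division raises inside the first argument of [catch], which is
why that argument is a function rather than a promise.

{[
(* Integer division that recovers from [Division_by_zero] with [0] and
   re-rejects with any other exception. *)
let safe_div a b =
  Lwt.catch
    (fun () -> Lwt.return (a / b))   (* [a / b] may raise here *)
    (function
      | Division_by_zero -> Lwt.return 0
      | exn -> Lwt.fail exn)

let () =
  assert (Lwt.state (safe_div 10 2) = Lwt.Return 5);
  assert (Lwt.state (safe_div 1 0) = Lwt.Return 0)
]}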
|
|
|
|
{3 Cancelable promises }
|
|
|
|
In some cases, we may want to cancel a promise, for example because it
has not resolved after a timeout. This can be done with cancelable
|
|
promises. To create a cancelable promise, you must use the
|
|
[Lwt.task] function:
|
|
|
|
{[
|
|
val task : unit -> 'a Lwt.t * 'a Lwt.u
|
|
]}
|
|
|
|
It has the same semantics as [Lwt.wait], except that the
|
|
pending promise can be canceled with [Lwt.cancel]:
|
|
|
|
{[
|
|
val cancel : 'a Lwt.t -> unit
|
|
]}
|
|
|
|
The promise will then be rejected with the exception
|
|
[Lwt.Canceled]. To execute a function when the promise is
|
|
canceled, you must use [Lwt.on_cancel]:
|
|
|
|
{[
|
|
val on_cancel : 'a Lwt.t -> (unit -> unit) -> unit
|
|
]}
|
|
|
|
Note that canceling a promise does not automatically cancel the
|
|
asynchronous operation that is going to resolve it. It does, however,
|
|
prevent any further chained operations from running. The asynchronous
|
|
operation associated with a promise can only be canceled if its implementation
|
|
has taken care to set an [on_cancel] callback on the promise that
|
|
it returned to you. In practice, most operations (such as system calls)
|
|
can't be canceled once they are started anyway, so promise cancellation is
|
|
useful mainly for interrupting future operations once you know that a chain of
|
|
asynchronous operations will not be needed.
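
The following sketch puts [task], [on_cancel] and [cancel] together. The
function [cancel_demo] and the flag [hook_ran] are illustrative names. A real
operation would release its resources in the hook.

{[
(* Creates a cancelable promise with a cleanup hook, cancels it, and
   reports whether the hook ran and whether the promise was rejected
   with [Lwt.Canceled]. *)
let cancel_demo () =
  let p, _resolver = Lwt.task () in
  let hook_ran = ref false in
  Lwt.on_cancel p (fun () -> hook_ran := true);
  Lwt.cancel p;
  let rejected =
    match Lwt.state p with
    | Lwt.Fail Lwt.Canceled -> true
    | _ -> false
  in
  (!hook_ran, rejected)

let () = assert (cancel_demo () = (true, true))
]}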
|
|
|
|
It is also possible to cancel a promise which has not been
|
|
created directly by you with [Lwt.task]. In this case, the deepest
|
|
cancelable promise that the given promise depends on will be canceled.
|
|
|
|
For example, consider the following code:
|
|
|
|
{[
|
|
# let p, r = Lwt.task ();;
|
|
val p : '_a Lwt.t = <abstr>
|
|
val r : '_a Lwt.u = <abstr>
|
|
# let p' = Lwt.bind p (fun x -> Lwt.return (x + 1));;
|
|
val p' : int Lwt.t = <abstr>
|
|
]}
|
|
|
|
Here, canceling [p'] will in fact cancel [p], rejecting
|
|
it with [Lwt.Canceled]. [Lwt.bind] will then propagate the
|
|
exception forward to [p']:
|
|
|
|
{[
|
|
# Lwt.cancel p';;
|
|
- : unit = ()
|
|
# Lwt.state p;;
|
|
- : int Lwt.state = Lwt.Fail Lwt.Canceled
|
|
# Lwt.state p';;
|
|
- : int Lwt.state = Lwt.Fail Lwt.Canceled
|
|
]}
|
|
|
|
It is possible to prevent a promise from being canceled
|
|
by using the function [Lwt.protected]:
|
|
|
|
{[
|
|
val protected : 'a Lwt.t -> 'a Lwt.t
|
|
]}
|
|
|
|
Canceling [(protected p)] will have no effect on [p].
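
A short sketch of this guarantee, using an illustrative helper named
[protected_demo]: the underlying promise stays pending after the wrapper is
canceled, and it can still be fulfilled afterwards.

{[
(* Cancels the protected wrapper [q] and checks that [p] is unaffected. *)
let protected_demo () =
  let p, r = Lwt.task () in
  let q = Lwt.protected p in
  Lwt.cancel q;
  let still_pending = (Lwt.state p = Lwt.Sleep) in
  Lwt.wakeup r 1;
  (still_pending, Lwt.state p = Lwt.Return 1)

let () = assert (protected_demo () = (true, true))
]}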
|
|
|
|
{3 Primitives for concurrent composition }
|
|
|
|
We now show how to compose several promises concurrently. The
|
|
main functions for this are in the [Lwt] module: [join],
|
|
[choose] and [pick].
|
|
|
|
The first one, [join], takes a list of promises and returns a promise
that waits for all of them to resolve:
|
|
|
|
{[
|
|
val join : unit Lwt.t list -> unit Lwt.t
|
|
]}
|
|
|
|
Moreover, if at least one promise is rejected, [join l] will be rejected
|
|
with the same exception as the first one, after all the promises are resolved.
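
For instance, in the sketch below (with the illustrative name [join_demo]),
the joined promise stays pending until the second of two promises is
fulfilled.

{[
(* Returns the state of the joined promise after the first and after the
   second wakeup. *)
let join_demo () =
  let p1, r1 = Lwt.wait () in
  let p2, r2 = Lwt.wait () in
  let all = Lwt.join [p1; p2] in
  Lwt.wakeup r1 ();
  let after_first = Lwt.state all in    (* [p2] is still pending *)
  Lwt.wakeup r2 ();
  (after_first, Lwt.state all)

let () = assert (join_demo () = (Lwt.Sleep, Lwt.Return ()))
]}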
|
|
|
|
Conversely, [choose] waits for at least {e one} promise to become
|
|
resolved, then resolves with the same value or exception:
|
|
|
|
{[
|
|
val choose : 'a Lwt.t list -> 'a Lwt.t
|
|
]}
|
|
|
|
For example:
|
|
|
|
{[
|
|
# let p1, r1 = Lwt.wait ();;
|
|
val p1 : '_a Lwt.t = <abstr>
|
|
val r1 : '_a Lwt.u = <abstr>
|
|
# let p2, r2 = Lwt.wait ();;
|
|
val p2 : '_a Lwt.t = <abstr>
|
|
val r2 : '_a Lwt.u = <abstr>
|
|
# let p3 = Lwt.choose [p1; p2];;
|
|
val p3 : '_a Lwt.t = <abstr>
|
|
# Lwt.state p3;;
|
|
- : '_a Lwt.state = Lwt.Sleep
|
|
# Lwt.wakeup r2 42;;
|
|
- : unit = ()
|
|
# Lwt.state p3;;
|
|
- : int Lwt.state = Lwt.Return 42
|
|
]}
|
|
|
|
The last one, [pick], is the same as [choose], except that it tries to cancel
all other promises when one resolves. Promises created via [Lwt.wait ()] are not cancelable
and are thus not canceled.
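
To see the difference with [choose], the following sketch uses cancelable
promises created with [Lwt.task]. [pick_demo] is an illustrative name. Once
one promise is fulfilled, the other one ends up rejected with [Lwt.Canceled].

{[
(* Fulfills [p2] and checks that [pick] returns its value and cancels [p1]. *)
let pick_demo () =
  let p1, _r1 = Lwt.task () in
  let p2, r2 = Lwt.task () in
  let winner = Lwt.pick [p1; p2] in
  Lwt.wakeup r2 "second";
  let loser_canceled =
    match Lwt.state p1 with
    | Lwt.Fail Lwt.Canceled -> true
    | _ -> false
  in
  (Lwt.state winner = Lwt.Return "second", loser_canceled)

let () = assert (pick_demo () = (true, true))
]}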
|
|
|
|
{3 Rules }
|
|
|
|
A callback, like the [f] that you might pass to [Lwt.bind], is
|
|
an ordinary OCaml function. [Lwt] just handles ordering calls to these
|
|
functions.
|
|
|
|
[Lwt] uses some preemptive threading internally, but all of your code
|
|
runs in the main thread, except when you explicitly opt into additional
|
|
threads with [Lwt_preemptive].
|
|
|
|
This simplifies reasoning about critical sections: all the code in one
|
|
callback cannot be interrupted by any of the code in another callback.
|
|
However, it also carries the danger that if a single callback takes a very
|
|
long time, it will not give [Lwt] a chance to run your other callbacks.
|
|
In particular:
|
|
|
|
{ul
|
|
{- do not write functions that may take time to complete, without splitting
|
|
them up using [Lwt.pause] or performing some [Lwt] I/O,}
|
|
{- do not do I/O that may block, otherwise the whole program will
|
|
hang inside that callback. You must instead use the asynchronous I/O
|
|
operations provided by [Lwt].}}
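
As a sketch of the first rule, a long loop can yield to the scheduler at
regular intervals with [Lwt.pause]. The function [sum_to] and its [chunk]
parameter are made up for this example, and the right chunk size depends on
how expensive each iteration is. The example uses [Lwt_main.run] from the
[lwt.unix] package to drive the paused promises.

{[
(* Sums the integers from [n] down to 1, pausing every [chunk] iterations
   so that other callbacks get a chance to run. *)
let sum_to ?(chunk = 10_000) n =
  let rec loop n acc =
    if n = 0 then Lwt.return acc
    else if n mod chunk = 0 then
      Lwt.bind (Lwt.pause ()) (fun () -> loop (n - 1) (acc + n))
    else loop (n - 1) (acc + n)
  in
  loop n 0

let () = assert (Lwt_main.run (sum_to ~chunk:100 1_000) = 500_500)
]}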
|
|
|
|
{2 The syntax extension }
|
|
|
|
[Lwt] offers a PPX syntax extension which increases code readability and
|
|
makes coding using [Lwt] easier. The syntax extension is documented
|
|
in {!Ppx_lwt}.
|
|
|
|
To use the PPX syntax extension, add the [lwt_ppx] package when
|
|
compiling:
|
|
|
|
{[
|
|
$ ocamlfind ocamlc -package lwt_ppx -linkpkg -o foo foo.ml
|
|
]}
|
|
|
|
Or, in [utop]:
|
|
|
|
{[
|
|
# #require "lwt_ppx";;
|
|
]}
|
|
|
|
[lwt_ppx] is distributed in a separate opam package of that same name.
|
|
|
|
For a brief overview of the syntax, see the Correspondence table below.
|
|
|
|
{3 Correspondence table }
|
|
|
|
{table
|
|
{tr
|
|
{th Without Lwt}
|
|
{th With Lwt}}
|
|
{tr
|
|
{td {[let pattern_1 = expr_1
and pattern_2 = expr_2
…
and pattern_n = expr_n in
expr]}}
{td {[let%lwt pattern_1 = expr_1
and pattern_2 = expr_2
…
and pattern_n = expr_n in
expr]}}}
|
|
{tr
|
|
{td {[try expr with
| pattern_1 -> expr_1
| pattern_2 -> expr_2
…
| pattern_n -> expr_n]}}
{td {[try%lwt expr with
| pattern_1 -> expr_1
| pattern_2 -> expr_2
…
| pattern_n -> expr_n]}}}
|
|
{tr
|
|
{td {[match expr with
| pattern_1 -> expr_1
| pattern_2 -> expr_2
…
| pattern_n -> expr_n]}}
{td {[match%lwt expr with
| pattern_1 -> expr_1
| pattern_2 -> expr_2
…
| pattern_n -> expr_n]}}}
|
|
{tr
|
|
{td {[for ident = expr_init to expr_final do
|
|
expr
|
|
done]}}
|
|
{td {[for%lwt ident = expr_init to expr_final do
|
|
expr
|
|
done]}}}
|
|
{tr
|
|
{td {[while expr do expr done]}}
|
|
{td {[while%lwt expr do expr done]}}}
|
|
{tr
|
|
{td {[if expr then expr else expr]}}
|
|
{td {[if%lwt expr then expr else expr]}}}
|
|
{tr
|
|
{td {[assert expr]}}
|
|
{td {[assert%lwt expr]}}}
|
|
{tr
|
|
{td {[raise exn]}}
|
|
{td {[[%lwt raise exn]]}}}}
|
|
|
|
{2 Backtrace support }
|
|
|
|
If an exception is raised inside a callback called by Lwt, the backtrace
|
|
provided by OCaml will not be very useful. It will end inside the Lwt
|
|
scheduler instead of continuing into the code that started the operations that
|
|
led to the callback call. To avoid this, and get good backtraces from Lwt, use
|
|
the syntax extension. The [let%lwt] construct will properly propagate
|
|
backtraces.
|
|
|
|
As always, to get backtraces from an OCaml program, you need to either declare
|
|
the environment variable [OCAMLRUNPARAM=b] or call
|
|
[Printexc.record_backtrace true] at the start of your program, and be
|
|
sure to compile it with [-g]. Most modern build systems add [-g] by
|
|
default.
|
|
|
|
{2 [let*] syntax }
|
|
|
|
To use Lwt with the [let*] syntax introduced in OCaml 4.08, you can open
|
|
the [Lwt.Syntax] module:

{[
open Lwt.Syntax
|
|
]}
|
|
|
|
Then, you can write
|
|
|
|
{[
|
|
let* () = Lwt_io.printl "Hello," in
|
|
let* () = Lwt_io.printl "world!" in
|
|
Lwt.return ()
|
|
]}
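
Here is a slightly larger sketch, assuming a version of [Lwt] that ships the
[Lwt.Syntax] module with the [and*] and [let+] operators alongside [let*].
The functions [add_async] and [incr_async] are illustrative only.

{[
open Lwt.Syntax

(* [let*] sequences like [Lwt.bind], and [and*] waits for both promises. *)
let add_async pa pb =
  let* a = pa
  and* b = pb in
  Lwt.return (a + b)

(* [let+] maps a plain function over the result, like [Lwt.map]. *)
let incr_async p =
  let+ x = p in
  x + 1

let () =
  assert (Lwt.state (add_async (Lwt.return 1) (Lwt.return 2)) = Lwt.Return 3);
  assert (Lwt.state (incr_async (Lwt.return 41)) = Lwt.Return 42)
]}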
|
|
|
|
{2 Other modules of the core library }
|
|
|
|
The core library contains several modules that only depend on
[Lwt]. The following naming convention is used in [Lwt]: when a
function takes as argument a promise-returning function that is
applied sequentially, its name is suffixed with “[_s]”. When
that function is applied concurrently, the name is suffixed with
“[_p]”. For example, in the [Lwt_list] module we have:
|
|
|
|
{[
|
|
val map_s : ('a -> 'b Lwt.t) -> 'a list -> 'b list Lwt.t
|
|
val map_p : ('a -> 'b Lwt.t) -> 'a list -> 'b list Lwt.t
|
|
]}
|
|
|
|
{3 Mutexes }
|
|
|
|
[Lwt_mutex] provides mutexes for [Lwt]. Its use is almost the
|
|
same as the [Mutex] module of the thread library shipped with
|
|
OCaml. In general, programs using [Lwt] do not need a lot of
|
|
mutexes, because callbacks run without preempting each other. They are
|
|
only useful for synchronising or sequencing complex operations spread over
|
|
multiple callback calls.
|
|
|
|
{3 Lists }
|
|
|
|
The [Lwt_list] module defines iteration and scanning functions
|
|
over lists, similar to the ones of the [List] module, but using
|
|
functions that return a promise. For example:
|
|
|
|
{[
|
|
val iter_s : ('a -> unit Lwt.t) -> 'a list -> unit Lwt.t
|
|
val iter_p : ('a -> unit Lwt.t) -> 'a list -> unit Lwt.t
|
|
]}
|
|
|
|
In [iter_s f l], [iter_s] will call [f] on each element
of [l], waiting for resolution between each element. In
contrast, in [iter_p f l], [iter_p] will call [f] on all
elements of [l], and only then wait for all the promises to resolve.
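
The difference becomes visible when every call returns a promise that stays
pending. In the sketch below, [started_by] is a hypothetical helper that
records which elements the callback has been applied to.

{[
(* Applies [iter] with a callback whose promises never resolve and returns
   the sorted list of elements the callback was called on. *)
let started_by iter =
  let started = ref [] in
  let f x =
    started := x :: !started;
    fst (Lwt.wait ())               (* a promise that stays pending *)
  in
  let (_ : unit Lwt.t) = iter f [1; 2; 3] in
  List.sort compare !started

let () =
  (* [iter_s] waits for the first promise before going further. *)
  assert (started_by Lwt_list.iter_s = [1]);
  (* [iter_p] calls the function on every element up front. *)
  assert (started_by Lwt_list.iter_p = [1; 2; 3])
]}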
|
|
|
|
{3 Data streams }
|
|
|
|
[Lwt] streams are used in a lot of places in [Lwt] and its
|
|
submodules. They offer a high-level interface to manipulate data flows.
|
|
|
|
A stream is an object which returns elements sequentially and
lazily. Lazily means that the source of the stream is queried for new
elements only when they are needed. This module contains a lot of stream
transformation, iteration, and scanning functions.
|
|
|
|
The common way of creating a stream is by using
|
|
[Lwt_stream.from] or by using [Lwt_stream.create]:
|
|
|
|
{[
|
|
val from : (unit -> 'a option Lwt.t) -> 'a Lwt_stream.t
|
|
val create : unit -> 'a Lwt_stream.t * ('a option -> unit)
|
|
]}
|
|
|
|
As for streams of the standard library, [from] takes as
|
|
argument a function which is used to create new elements.
|
|
|
|
[create] returns the stream together with a function used to push new
elements into it. Pushing [None] ends the stream.
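
A sketch of [from], using a made-up generator named [countdown]: the function
passed to [from] is called each time the stream needs a new element, and
returning [None] ends the stream.

{[
(* A stream that yields [n], [n - 1], ..., [1] and then ends. *)
let countdown n =
  let remaining = ref n in
  Lwt_stream.from (fun () ->
    if !remaining = 0 then Lwt.return_none
    else begin
      let x = !remaining in
      decr remaining;
      Lwt.return_some x
    end)

let () =
  assert (Lwt_main.run (Lwt_stream.to_list (countdown 3)) = [3; 2; 1])
]}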
|
|
|
|
For example:
|
|
|
|
{[
|
|
# let stream, push = Lwt_stream.create ();;
|
|
val stream : '_a Lwt_stream.t = <abstr>
|
|
val push : '_a option -> unit = <fun>
|
|
# push (Some 1);;
|
|
- : unit = ()
|
|
# push (Some 2);;
|
|
- : unit = ()
|
|
# push (Some 3);;
|
|
- : unit = ()
|
|
# Lwt.state (Lwt_stream.next stream);;
|
|
- : int Lwt.state = Lwt.Return 1
|
|
# Lwt.state (Lwt_stream.next stream);;
|
|
- : int Lwt.state = Lwt.Return 2
|
|
# Lwt.state (Lwt_stream.next stream);;
|
|
- : int Lwt.state = Lwt.Return 3
|
|
# Lwt.state (Lwt_stream.next stream);;
|
|
- : int Lwt.state = Lwt.Sleep
|
|
]}
|
|
|
|
Note that streams are consumable. Once you take an element from a
|
|
stream, it is removed from the stream. So, if you want to iterate two times
|
|
over a stream, you may consider “cloning” it, with
|
|
[Lwt_stream.clone]. A cloned stream will return the same
|
|
elements in the same order. Consuming one will not consume the other.
|
|
For example:
|
|
|
|
{[
|
|
# let s = Lwt_stream.of_list [1; 2];;
|
|
val s : int Lwt_stream.t = <abstr>
|
|
# let s' = Lwt_stream.clone s;;
|
|
val s' : int Lwt_stream.t = <abstr>
|
|
# Lwt.state (Lwt_stream.next s);;
|
|
- : int Lwt.state = Lwt.Return 1
|
|
# Lwt.state (Lwt_stream.next s);;
|
|
- : int Lwt.state = Lwt.Return 2
|
|
# Lwt.state (Lwt_stream.next s');;
|
|
- : int Lwt.state = Lwt.Return 1
|
|
# Lwt.state (Lwt_stream.next s');;
|
|
- : int Lwt.state = Lwt.Return 2
|
|
]}
|
|
|
|
{3 Mailbox variables }
|
|
|
|
The [Lwt_mvar] module provides mailbox variables. A mailbox
|
|
variable, also called an “mvar”, is a cell which may contain 0 or 1
element. If it contains no elements, we say that the mvar is empty;
|
|
if it contains one, we say that it is full. Adding an element to a
|
|
full mvar will block until one is taken. Taking an element from an
|
|
empty mvar will block until one is added.
|
|
|
|
Mailbox variables are commonly used to pass messages between chains of
|
|
callbacks being executed concurrently.
|
|
|
|
Note that a mailbox variable can be seen as a pushable stream with a
|
|
limited memory.
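
As a minimal sketch of the blocking behavior described above, the function
below (with the illustrative name [mvar_demo]) takes from an empty mvar and
then puts a value into it.

{[
(* Taking from an empty mvar gives a pending promise, which is fulfilled
   once a value is put into the mvar. *)
let mvar_demo () =
  let box = Lwt_mvar.create_empty () in
  let taker = Lwt_mvar.take box in
  let was_pending = (Lwt.state taker = Lwt.Sleep) in
  let (_ : unit Lwt.t) = Lwt_mvar.put box "hello" in
  (was_pending, Lwt_main.run taker)

let () = assert (mvar_demo () = (true, "hello"))
]}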
|
|
|
|
{1 Running an Lwt program }
|
|
|
|
An [Lwt] computation you have created will give you something of type
|
|
[Lwt.t], a promise. However, even though you have the promise, the
|
|
computation may not have run yet, and the promise might still be pending.
|
|
|
|
For example if your program is just:
|
|
|
|
{[
|
|
let _ = Lwt_io.printl "Hello, world!"
|
|
]}
|
|
|
|
you have no guarantee that the promise for writing ["Hello, world!"]
|
|
on the terminal will be resolved before the program exits. In order
|
|
to wait for the promise to resolve, you have to call the function
|
|
[Lwt_main.run]:
|
|
|
|
{[
|
|
val Lwt_main.run : 'a Lwt.t -> 'a
|
|
]}
|
|
|
|
This function waits for the given promise to resolve and returns
|
|
its result. In fact it does more than that; it also runs the
|
|
scheduler which is responsible for making asynchronous computations progress
|
|
when events are received from the outside world.
|
|
|
|
So basically, when you write an [Lwt] program, you must call
|
|
[Lwt_main.run] on your top-level, outer-most promise. For instance:
|
|
|
|
{[
|
|
let () = Lwt_main.run (Lwt_io.printl "Hello, world!")
|
|
]}
|
|
|
|
Note that you must not make nested calls to [Lwt_main.run]. It
|
|
cannot be used anywhere else to get the result of a promise.
|
|
|
|
{1 The [lwt.unix] library }
|
|
|
|
The package [lwt.unix] contains all [Unix]-dependent
|
|
modules of [Lwt]. Among all its features, it implements Lwt-friendly,
|
|
non-blocking versions of functions of the OCaml standard and Unix libraries.
|
|
|
|
{2 Unix primitives }
|
|
|
|
Module [Lwt_unix] provides non-blocking system calls. For example,
|
|
the [Lwt] counterpart of [Unix.read] is:
|
|
|
|
{[
|
|
val read : file_descr -> bytes -> int -> int -> int Lwt.t
|
|
]}
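
The following sketch shows [Lwt_unix.read] in use, assuming a recent [Lwt]
in which the buffer argument has type [bytes]. The function [round_trip] is
illustrative: it writes a short message to a pipe and reads it back.

{[
(* Writes [msg] to a fresh pipe and reads it back through [Lwt_unix]. *)
let round_trip msg =
  let open Lwt.Infix in
  let rd, wr = Lwt_unix.pipe () in
  Lwt_unix.write_string wr msg 0 (String.length msg) >>= fun _written ->
  let buf = Bytes.create (String.length msg) in
  Lwt_unix.read rd buf 0 (Bytes.length buf) >>= fun n ->
  Lwt_unix.close rd >>= fun () ->
  Lwt_unix.close wr >|= fun () ->
  Bytes.sub_string buf 0 n

let () = assert (Lwt_main.run (round_trip "ping") = "ping")
]}

Like [Unix.read], a single call may return fewer bytes than requested, so
code reading larger amounts of data should loop or use [Lwt_io] instead.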
|
|
|
|
[Lwt_io] provides features similar to buffered channels of
|
|
the standard library (of type [in_channel] or
|
|
[out_channel]), but with non-blocking semantics.
|
|
|
|
[Lwt_gc] allows you to register a finalizer that returns a
|
|
promise. At the end of the program, [Lwt] will wait for all these
|
|
finalizers to resolve.
|
|
|
|
{2 The Lwt scheduler }
|
|
|
|
Operations doing I/O have to be resumed when some events are received by
|
|
the process, so they can resolve their associated pending promises.
|
|
For example, when you read from a file descriptor, you
|
|
may have to wait for the file descriptor to become readable if no
|
|
data are immediately available on it.
|
|
|
|
[Lwt] contains a scheduler which is responsible for managing
|
|
multiple operations waiting for events, and restarting them when needed.
|
|
This scheduler is implemented by the two modules [Lwt_engine]
|
|
and [Lwt_main]. [Lwt_engine] is a low-level module. It
|
|
provides a signature for custom I/O multiplexers as well as two built-in
|
|
implementations, [libev] and [select]. The signature is given by the
|
|
class [Lwt_engine.t].
|
|
|
|
[libev] is used by default on Linux, because it supports any
|
|
number of file descriptors, while [select] supports only 1024. [libev]
|
|
is also much more efficient. On Windows, [Unix.select] is used because
|
|
[libev] does not work properly. The user may change the backend in use at
|
|
any time.
|
|
|
|
If you see an [Invalid_argument] error on [Unix.select], it
|
|
may be because the 1024 file descriptor limit was exceeded. Try
|
|
switching to [libev], if possible.
|
|
|
|
The engine can also be used directly in order to integrate other
|
|
libraries with [Lwt]. For example, [GTK] needs to be notified
|
|
when some events are received. If you use [Lwt] with [GTK]
|
|
you need to use the [Lwt] scheduler to monitor [GTK]
|
|
sources. This is what is done by the [Lwt_glib] library.
|
|
|
|
The [Lwt_main] module contains the {e main loop} of
|
|
[Lwt]. It is run by calling the function [Lwt_main.run]:
|
|
|
|
{[
|
|
val Lwt_main.run : 'a Lwt.t -> 'a
|
|
]}
|
|
|
|
This function continuously runs the scheduler until the promise passed
|
|
as argument is resolved.
|
|
|
|
To make sure [Lwt] is compiled with [libev] support,
|
|
tell opam that the library is available on the system by installing the
|
|
{{: http://opam.ocaml.org/packages/conf-libev/conf-libev.4-11/ }conf-libev}
|
|
package. You may get the actual library with your system package manager:
|
|
|
|
{ul
|
|
{- [brew install libev] on macOS,}
{- [apt-get install libev-dev] on Debian/Ubuntu, or}
{- [yum install libev-devel] on CentOS, which requires setting
[export C_INCLUDE_PATH=/usr/include/libev/] and
[export LIBRARY_PATH=/usr/lib64/] before calling
[opam install conf-libev].}}
|
|
|
|
|
|
{2 Logging }
|
|
|
|
For logging, we recommend the [logs] package from opam, which includes an
|
|
Lwt-aware module [Logs_lwt].
|
|
|
|
{1 The Lwt.react library }
|
|
|
|
The [Lwt_react] module provides helpers for using the [react]
|
|
library with [Lwt]. It extends the [React] module by adding
|
|
[Lwt]-specific functions. It can be used as a replacement of
|
|
[React]. For example you can add at the beginning of your
|
|
program:
|
|
|
|
{[
|
|
open Lwt_react
|
|
]}
|
|
|
|
instead of:
|
|
|
|
{[
|
|
open React
|
|
]}
|
|
|
|
or:
|
|
|
|
{[
|
|
module React = Lwt_react
|
|
]}
|
|
|
|
Among the added functionalities we have [Lwt_react.E.next], which
|
|
takes an event and returns a promise which will be pending until the next
|
|
occurrence of this event. For example:
|
|
|
|
{[
|
|
# open Lwt_react;;
|
|
# let event, push = E.create ();;
|
|
val event : '_a React.event = <abstr>
|
|
val push : '_a -> unit = <fun>
|
|
# let p = E.next event;;
|
|
val p : '_a Lwt.t = <abstr>
|
|
# Lwt.state p;;
|
|
- : '_a Lwt.state = Lwt.Sleep
|
|
# push 42;;
|
|
- : unit = ()
|
|
# Lwt.state p;;
|
|
- : int Lwt.state = Lwt.Return 42
|
|
]}
|
|
|
|
Another interesting feature is the ability to limit events
|
|
(resp. signals) from occurring (resp. changing) too often. For example,
|
|
suppose you are writing a program which displays something on the screen
|
|
each time a signal changes. If at some point the signal changes 1000
|
|
times per second, you probably don't want to render it 1000 times per
|
|
second. For that you use [Lwt_react.S.limit]:
|
|
|
|
{[
|
|
val limit : (unit -> unit Lwt.t) -> 'a React.signal -> 'a React.signal
|
|
]}
|
|
|
|
[Lwt_react.S.limit f signal] returns a signal which varies as
|
|
[signal] except that two consecutive updates are separated by a
|
|
call to [f]. For example if [f] returns a promise which is pending
|
|
for 0.1 seconds, then there will be no more than 10 changes per
|
|
second:
|
|
|
|
{[
|
|
open Lwt_react
|
|
|
|
let draw x =
|
|
(* Draw the screen *)
|
|
…
|
|
|
|
let () =
|
|
(* The signal we are interested in: *)
|
|
let signal = … in
|
|
|
|
(* The limited signal: *)
|
|
let signal' = S.limit (fun () -> Lwt_unix.sleep 0.1) signal in
|
|
|
|
(* Redraw the screen each time the limited signal changes: *)
|
|
S.notify_p draw signal'
|
|
]}
|
|
|
|
{1 Other libraries }
|
|
|
|
{2 Parallelise computations to other cores }
|
|
|
|
If you have some compute-intensive steps within your program, you can execute
|
|
them on a separate core. You can get performance benefits from the
|
|
parallelisation. In addition, whilst your compute-intensive function is running
|
|
on a different core, your normal I/O-bound tasks continue running on the
|
|
original core.
|
|
|
|
The module {!Lwt_domain} from the [lwt_domain] package provides all the
|
|
necessary helpers to achieve this. It is based on the [Domainslib] library
|
|
and uses similar concepts (such as tasks and pools).
|
|
|
|
First, you need to create a task pool:
|
|
|
|
{[
|
|
val setup_pool : ?name:string -> int -> pool
|
|
]}
|
|
|
|
Then you simply detach the function calls to the created pool:
|
|
|
|
{[
|
|
val detach : pool -> ('a -> 'b) -> 'a -> 'b Lwt.t
|
|
]}
|
|
|
|
The returned promise resolves as soon as the function returns.
|
|
|
|
{2 Detaching computation to preemptive threads }
|
|
|
|
It may happen that you want to run a function which will take time to
|
|
compute or that you want to use a blocking function that cannot be
|
|
used in a non-blocking way. For these situations, [Lwt] allows you to
|
|
{e detach} the computation to a preemptive thread.
|
|
|
|
This is done by the module [Lwt_preemptive] of the
|
|
[lwt.unix] package which maintains a pool of system
|
|
threads. The main function is:
|
|
|
|
{[
|
|
val detach : ('a -> 'b) -> 'a -> 'b Lwt.t
|
|
]}
|
|
|
|
[detach f x] will execute [f x] in another thread and
|
|
return a pending promise, usable from the main thread, which will be fulfilled
|
|
with the result of the preemptive thread.
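
For example, a blocking function can be wrapped as follows. This is a
sketch: [slow_double] is a stand-in for a real blocking call, and the
program is assumed to be linked with [lwt.unix] and the threads library.

{[
(* A stand-in for a blocking call: sleeps in the calling system thread. *)
let slow_double n =
  Unix.sleepf 0.01;
  2 * n

(* The main thread keeps running the scheduler while [slow_double] blocks
   in a worker thread. *)
let () = assert (Lwt_main.run (Lwt_preemptive.detach slow_double 21) = 42)
]}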
|
|
|
|
If you want to trigger some [Lwt] operations from your detached thread,
|
|
you have to call back into the main thread using
|
|
[Lwt_preemptive.run_in_main]:
|
|
|
|
{[
|
|
val run_in_main : (unit -> 'a Lwt.t) -> 'a
|
|
]}
|
|
|
|
This is roughly the equivalent of [Lwt_main.run], but for detached
|
|
threads, rather than for the whole process. Note that you must not call
|
|
[Lwt_main.run] in a detached thread.
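
A sketch of the two functions working together, with an illustrative
function named [worker]: it runs in a preemptive thread but performs its
sleep through the scheduler of the main thread.

{[
(* Called from a detached thread. The [Lwt] operation itself runs in the
   main thread, and [run_in_main] blocks the worker until it resolves. *)
let worker () =
  Lwt_preemptive.run_in_main (fun () ->
    Lwt.map (fun () -> 42) (Lwt_unix.sleep 0.01))

let () = assert (Lwt_main.run (Lwt_preemptive.detach worker ()) = 42)
]}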
|
|
|
|
{2 SSL support }
|
|
|
|
The library [Lwt_ssl] allows use of SSL asynchronously.
|
|
|
|
{1 Writing stubs using [Lwt] }
|
|
|
|
{2 Thread-safe notifications }
|
|
|
|
If you want to notify the main thread from another thread, you can use the [Lwt]
thread-safe notification system. First you need to create a notification identifier
(which is just an integer) from the OCaml side using the
[Lwt_unix.make_notification] function, then you can send it either from
OCaml code with the [Lwt_unix.send_notification] function, or from C code using
the function [lwt_unix_send_notification] (defined in [lwt_unix.h]).
|
|
|
|
Notifications are received and processed asynchronously by the main thread.
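
On the OCaml side, this can be sketched as follows, assuming the program is
linked with the threads library. The function [wait_for_thread] is
illustrative. It returns a promise that is fulfilled by the notification
handler, which runs in the main thread.

{[
(* Registers a one-shot notification, sends it from another system thread,
   and returns a promise fulfilled when the notification is processed. *)
let wait_for_thread () =
  let p, r = Lwt.wait () in
  let id =
    Lwt_unix.make_notification ~once:true (fun () -> Lwt.wakeup r "notified")
  in
  let (_ : Thread.t) =
    Thread.create (fun () -> Lwt_unix.send_notification id) ()
  in
  p

let () = assert (Lwt_main.run (wait_for_thread ()) = "notified")
]}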
|
|
|
|
{2 Jobs }
|
|
|
|
For operations that cannot be executed asynchronously, [Lwt]
|
|
uses a system of jobs that can be executed in a different thread. A
|
|
job is composed of three functions:
|
|
|
|
{ul
|
|
{- A stub function to create the job. It must allocate a new job
|
|
structure and fill its [worker] and [result] fields. This
|
|
function is executed in the main thread.
|
|
The return type for the OCaml external must be of the form ['a job].}
|
|
{- A function which executes the job. This one may be executed asynchronously
|
|
in another thread. This function must not:
|
|
{ul
|
|
{- access or allocate OCaml block values (tuples, strings, …),}
|
|
{- call OCaml code.}}}
|
|
{- A function which reads the result of the job, frees resources and
|
|
returns the result as an OCaml value. This function is executed in
|
|
the main thread.}}
|
|
|
|
With [Lwt < 2.3.3], 4 functions (including 3 stubs) were
|
|
required. It is still possible to use this mode but it is
|
|
deprecated.
|
|
|
|
As an example, here is the implementation of [Lwt_unix.mkdir]. On the C
|
|
side we have:
|
|
|
|
{@c[/**/
|
|
/* Headers needed by the code below. */
#include <errno.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <caml/alloc.h>
#include <caml/mlvalues.h>
#include <caml/unixsupport.h>

#include "lwt_unix.h"

/* Structure holding information for calling [mkdir]. */
struct job_mkdir {
  /* Information used by lwt.
     It must be the first field of the structure. */
  struct lwt_unix_job job;
  /* This field stores the result of the call. */
  int result;
  /* This field stores the value of [errno] after the call. */
  int errno_copy;
  /* Pointer to a copy of the path parameter. */
  char* path;
  /* Copy of the mode parameter. */
  int mode;
  /* Buffer for storing the path. */
  char data[];
};

/* The function calling [mkdir]. */
static void worker_mkdir(struct job_mkdir* job)
{
  /* Perform the blocking call. */
  job->result = mkdir(job->path, job->mode);
  /* Save the value of errno. */
  job->errno_copy = errno;
}

/* The function building the caml result. */
static value result_mkdir(struct job_mkdir* job)
{
  /* Check for errors. */
  if (job->result < 0) {
    /* Save the value of errno so we can use it
       once the job has been freed. */
    int error = job->errno_copy;
    /* Copy the contents of job->path into a caml string. */
    value string_argument = caml_copy_string(job->path);
    /* Free the job structure. */
    lwt_unix_free_job(&job->job);
    /* Raise the error. */
    unix_error(error, "mkdir", string_argument);
  }
  /* Free the job structure. */
  lwt_unix_free_job(&job->job);
  /* Return the result. */
  return Val_unit;
}

/* The stub creating the job structure. */
CAMLprim value lwt_unix_mkdir_job(value path, value mode)
{
  /* Get the length of the path parameter, including the final NUL. */
  mlsize_t len_path = caml_string_length(path) + 1;
  /* Allocate a new job. */
  struct job_mkdir* job =
    (struct job_mkdir*)lwt_unix_new_plus(struct job_mkdir, len_path);
  /* Set the offset of the path parameter inside the job structure. */
  job->path = job->data;
  /* Copy the path parameter inside the job structure. */
  memcpy(job->path, String_val(path), len_path);
  /* Initialize function fields. */
  job->job.worker = (lwt_unix_job_worker)worker_mkdir;
  job->job.result = (lwt_unix_job_result)result_mkdir;
  /* Copy the mode parameter. */
  job->mode = Int_val(mode);
  /* Wrap the structure into a caml value. */
  return lwt_unix_alloc_job(&job->job);
}
|
|
]}
|
|
|
|
and on the OCaml side:
|
|
|
|
{[
|
|
(* The stub for creating the job. *)
external mkdir_job : string -> int -> unit Lwt_unix.job = "lwt_unix_mkdir_job"

(* The OCaml function. *)
let mkdir name perms = Lwt_unix.run_job (mkdir_job name perms)
|
|
]}
|