refactored heavily CCFuture (much simpler, cleaner, basic API and thread pool)

This commit is contained in:
Simon Cruanes 2015-01-25 23:08:31 +01:00
parent 4a66f1cdb7
commit feec3bf46f
5 changed files with 539 additions and 722 deletions

21
_oasis
View file

@ -121,7 +121,7 @@ Library "containers_misc"
Library "containers_thread"
Path: src/threads/
Modules: CCFuture
Modules: CCFuture, CCLock
FindlibName: thread
FindlibParent: containers
Build$: flag(thread)
@ -172,22 +172,19 @@ Executable bench_hash
MainIs: bench_hash.ml
BuildDepends: containers, containers.misc
Executable test_levenshtein
Path: tests/
Install: false
CompiledObject: native
Build$: flag(tests)
MainIs: test_levenshtein.ml
BuildDepends: containers, qcheck, containers.string
Executable test_threads
Executable run_test_future
Path: tests/threads/
Install: false
CompiledObject: best
Build$: flag(tests) && flag(thread)
MainIs: test_future.ml
MainIs: run_test_future.ml
BuildDepends: containers, threads, sequence, oUnit, containers.thread
Test future
Command: echo "run test future" ; ./run_test_future.native
TestTools: run_test_future
Run$: flag(tests) && flag(thread)
PreBuildCommand: make qtest-gen
Executable run_qtest
@ -208,7 +205,7 @@ Executable run_tests
MainIs: run_tests.ml
Build$: flag(tests) && flag(misc)
BuildDepends: containers, containers.data, oUnit, sequence, gen,
qcheck, containers.misc
qcheck, containers.misc, containers.string
Test all
Command: make test-all

File diff suppressed because it is too large Load diff

View file

@ -25,77 +25,40 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(** {1 Futures for concurrency} *)
type 'a state =
| Done of 'a
| Waiting
| Failed of exn
type 'a t
(** A future value of type 'a *)
exception SendTwice
(** Exception raised when a future is evaluated several time *)
type 'a future = 'a t
(** {2 MVar: a zero-or-one element thread-safe box} *)
(** {2 Constructors} *)
module MVar : sig
type 'a t
val return : 'a -> 'a t
(** Future that is already computed *)
val empty : unit -> 'a t
(** Create an empty box *)
val fail : exn -> 'a t
(** Future that fails immediately *)
val full : 'a -> 'a t
(** Create a full box *)
val make : (unit -> 'a) -> 'a t
(** Create a future, representing a value that will be computed by
the function. If the function raises, the future will fail. *)
val is_empty : _ t -> bool
(** Is the box currently empty? *)
val make1 : ('a -> 'b) -> 'a -> 'b t
val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t
val take : 'a t -> 'a
(** Take value out of the box. Wait if necessary *)
val put : 'a t -> 'a -> unit
(** Put a value in the box. Waits if the box is already empty *)
val update : 'a t -> ('a -> 'a) -> 'a * 'a
(** Use given function to atomically update content, and return
the previous value and the new one *)
val peek : 'a t -> 'a
(** Look at the value, without removing it *)
end
(** {2 Thread pool} *)
module Pool : sig
type t
(** A pool of threads *)
val create : ?timeout:float -> size:int -> t
(** Create a pool with at most the given number of threads. [timeout]
is the time after which idle threads are killed. *)
val size : t -> int
(** Current size of the pool *)
val run : t -> (unit -> unit) -> unit
(** Run the function in the pool *)
val finish : t -> unit
(** Kill threads in the pool *)
end
val default_pool : Pool.t
(** Pool of threads that is used by default. Growable if needed. *)
(** {2 Basic low-level Future functions} *)
val make : Pool.t -> 'a t
(** Create a future, representing a value that is not known yet. *)
(** {2 Basics} *)
val get : 'a t -> 'a
(** Blocking get: wait for the future to be evaluated, and get the value,
or the exception that failed the future is returned *)
or the exception that failed the future is returned.
@raise e if the exception failed with e *)
val send : 'a t -> 'a -> unit
(** Send a result to the future. Will raise SendTwice if [send] has
already been called on this future before *)
val fail : 'a t -> exn -> unit
(** Fail the future by raising an exception inside it *)
val state : 'a t -> 'a state
(** State of the future *)
val is_done : 'a t -> bool
(** Is the future evaluated (success/failure)? *)
@ -108,64 +71,68 @@ val on_success : 'a t -> ('a -> unit) -> unit
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure *)
val on_finish : _ t -> (unit -> unit) -> unit
val on_finish : 'a t -> ('a state -> unit) -> unit
(** Attach a handler to be called when the future is evaluated *)
val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *)
val andThen : ?pool:Pool.t -> 'a t -> (unit -> 'b t) -> 'b t
val and_then : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second *)
val sequence : ?pool:Pool.t -> 'a t list -> 'a list t
(** Future that waits for all previous sequences to terminate *)
val sequence : 'a t list -> 'a list t
(** Future that waits for all previous sequences to terminate. If any future
in the list fails, [sequence l] fails too. *)
val choose : ?pool:Pool.t -> 'a t list -> 'a t
(** Choose among those futures (the first to terminate) *)
val choose : 'a t list -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails *)
val map : ?pool:Pool.t -> ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future. The function doesn't run in its
own task; if it can take time, use {!flat_map} *)
(** {2 Future constructors} *)
(** {2 Helpers} *)
val return : 'a -> 'a t
(** Future that is already computed *)
val spawn : ?pool:Pool.t -> (unit -> 'a) -> 'a t
(** Spawn a thread that wraps the given computation *)
val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string ->
val spawn_process : ?stdin:string -> cmd:string -> unit ->
(int * string * string) t
(** Spawn a sub-process with the given command [cmd] (and possibly input);
returns a future containing (returncode, stdout, stderr) *)
val sleep : ?pool:Pool.t -> float -> unit t
val sleep : float -> unit t
(** Future that returns with success in the given amount of seconds *)
(** {2 Event timer} *)
module Timer : sig
type t
(** A scheduler for events *)
(** A scheduler for events. It runs in its own thread. *)
val create : ?pool:Pool.t -> unit -> t
(** A timer that runs tasks in the given thread pool *)
val create : unit -> t
(** A new timer. *)
val schedule_at : t -> float -> (unit -> unit) -> unit
(** [schedule_at s t act] will run [act] at the Unix echo [t] *)
val after : t -> float -> unit future
(** Create a future that waits for the given number of seconds, then
awakens with [()] *)
val schedule_in : t -> float -> (unit -> unit) -> unit
(** [schedule_in s d act] will run [act] in [d] seconds *)
val at : t -> float -> unit future
(** Create a future that evaluates to [()] at the given Unix timestamp *)
val stop : t -> unit
(** Stop the given timer, cancelling pending tasks *)
end
module Infix : sig
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
end
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
(** {2 Low level} *)
val stop_pool : unit -> unit
(** Stop the thread pool *)

View file

@ -0,0 +1,87 @@
(** Test Future *)
open OUnit
module Future = CCFuture
open Future.Infix
let test_parallel n () =
let l = Sequence.(1 -- n) |> Sequence.to_list in
let l = List.map (fun i ->
Future.make
(fun () ->
Thread.delay 0.1;
1
)) l in
let l' = List.map Future.get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
()
let test_map () =
let a = Future.make (fun () -> 1) in
let b = Future.map (fun x -> x+1) a in
let c = Future.map (fun x -> x-1) b in
OUnit.assert_equal 1 (Future.get c)
let test_sequence_ok () =
let l = CCList.(1 -- 10) in
let l' = l
|> List.map
(fun x -> Future.make (fun () -> Thread.delay 0.2; x*10))
|> Future.sequence
|> Future.map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (Future.get l')
let test_sequence_fail () =
let l = CCList.(1 -- 10) in
let l' = l
|> List.map
(fun x -> Future.make (fun () -> Thread.delay 0.2; if x = 5 then raise Exit; x))
|> Future.sequence
|> Future.map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> Future.get l')
let test_time () =
let start = Unix.gettimeofday () in
let l = CCList.(1 -- 10)
|> List.map (fun _ -> Future.make (fun () -> Thread.delay 0.5))
in
List.iter Future.get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < 10. *. 0.5);
()
let test_timer () =
let timer = Future.Timer.create () in
let n = CCLock.create 1 in
let get = Future.make (fun () -> Thread.delay 0.7; CCLock.get n) in
let _ =
Future.Timer.after timer 0.5
>>= fun () -> CCLock.update n (fun x -> x+2); Future.return()
in
let _ =
Future.Timer.after timer 0.2
>>= fun () -> CCLock.update n (fun x -> x * 4); Future.return()
in
OUnit.assert_equal 6 (Future.get get);
()
let suite =
"test_future" >:::
[
"test_parallel_10" >:: test_parallel 10;
"test_parallel_300" >:: test_parallel 300;
"test_time" >:: test_time;
"test_map" >:: test_map;
"test_sequence_ok" >:: test_sequence_ok;
"test_sequence_fail" >:: test_sequence_fail;
"test_timer" >:: test_timer;
]
let () =
let _ = OUnit.run_test_tt_main suite in
()

View file

@ -1,52 +0,0 @@
(** Test Future *)
open OUnit
module Future = CCFuture
let test_mvar () =
let box = Future.MVar.empty () in
let f = Future.spawn (fun () -> Future.MVar.take box + 1) in
Thread.delay 0.1;
OUnit.assert_bool "still waiting" (not (Future.is_done f));
Future.MVar.put box 1;
OUnit.assert_equal 2 (Future.get f);
()
let test_parallel () =
let l = Sequence.(1 -- 300) in
let l = Sequence.map (fun _ -> Future.spawn (fun () -> Thread.delay 0.1; 1)) l in
let l = Sequence.to_list l in
let l' = List.map Future.get l in
OUnit.assert_equal 300 (List.fold_left (+) 0 l');
()
let test_time () =
let start = Unix.gettimeofday () in
let f1 = Future.spawn (fun () -> Thread.delay 0.5) in
let f2 = Future.spawn (fun () -> Thread.delay 0.5) in
Future.get f1;
Future.get f2;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "parallelism" (stop -. start < 0.75);
()
let test_timer () =
let timer = Future.Timer.create () in
let mvar = Future.MVar.full 1 in
Future.Timer.schedule_in timer 0.5
(fun () -> ignore (Future.MVar.update mvar (fun x -> x + 2)));
Future.Timer.schedule_in timer 0.2
(fun () -> ignore (Future.MVar.update mvar (fun x -> x * 4)));
Thread.delay 0.7;
OUnit.assert_equal 6 (Future.MVar.peek mvar);
()
let suite =
"test_future" >:::
[ "test_mvar" >:: test_mvar;
"test_parallel" >:: test_parallel;
"test_time" >:: test_time;
"test_timer" >:: test_timer;
]