added Futures.sleep function;

Futures.MVar implemented;
Futures.sequence implemented (quite subtle)
This commit is contained in:
Simon Cruanes 2013-03-19 17:12:26 +01:00
parent a4c8e3e408
commit c8b7b7dfed
2 changed files with 124 additions and 15 deletions

View file

@ -48,6 +48,7 @@ exception SendTwice
module Pool = struct
type t = {
mutable threads : Thread.t list;
mutable stop : bool;
size : int;
max_load : int;
jobs : (unit -> unit) Queue.t;
@ -72,7 +73,7 @@ module Pool = struct
if Queue.is_empty pool.jobs
then begin (* caramba! try again *)
Mutex.unlock pool.mutex;
poll limit end
if not pool.stop then poll limit end
else begin
let job = Queue.pop pool.jobs in
Mutex.unlock pool.mutex;
@ -82,9 +83,9 @@ module Pool = struct
with _ ->
());
match limit with
| None -> poll limit (* I am immortal! *)
| None -> if not pool.stop then poll limit (* I am immortal! *)
| Some 0 -> () (* stop, reached limit *)
| Some n -> poll (Some (n-1)) (* continue serving *)
| Some n -> if not pool.stop then poll (Some (n-1)) (* continue serving *)
end
in
poll limit
@ -100,6 +101,7 @@ module Pool = struct
let create ?(max_load=max_int) ~size =
let pool = {
threads = [];
stop = false;
size;
max_load;
jobs = Queue.create ();
@ -129,6 +131,11 @@ module Pool = struct
(** Kill threads in the pool *)
let finish pool =
Mutex.lock pool.mutex;
pool.stop <- true;
Condition.broadcast pool.condition;
Mutex.unlock pool.mutex;
(* kill immortal threads *)
List.iter (fun t -> Thread.kill t) pool.threads
end
@ -141,21 +148,24 @@ module MVar = struct
type 'a t = {
mutable content : 'a option;
mutex : Mutex.t;
condition : Condition.t;
on_take : Condition.t; (* signal that a value was removed (empty) *)
on_put : Condition.t; (* signal that a value was added (full) *)
}
(** Create an empty box *)
let empty () = {
content = None;
mutex = Mutex.create ();
condition = Condition.create ();
on_take = Condition.create ();
on_put = Condition.create ();
}
(** Create a full box *)
let full x = {
content = Some x;
mutex = Mutex.create ();
condition = Condition.create ();
on_take = Condition.create ();
on_put = Condition.create ();
}
(** Is the box currently empty? *)
@ -165,22 +175,60 @@ module MVar = struct
Mutex.unlock box.mutex;
ans
(* assuming we have a lock on given box, wait it gets a value and return it *)
let rec wait_put box =
match box.content with
| None ->
Condition.wait box.on_put box.mutex;
wait_put box (* try again *)
| Some x -> x
(* same, but waits for the box to become empty *)
let rec wait_take box =
match box.content with
| None -> () (* empty! *)
| Some _ ->
Condition.wait box.on_take box.mutex;
wait_take box (* try again *)
(** Take value out of the box. Wait if necessary *)
let take box =
failwith "not implemented"
Mutex.lock box.mutex;
let x = wait_put box in
box.content <- None;
Condition.broadcast box.on_take;
Mutex.unlock box.mutex;
x
(** Put a value in the box. Waits if the box is already empty *)
(** Put a value in the box. Waits if the box is already full *)
let put box x =
failwith "not impleemnted"
Mutex.lock box.mutex;
wait_take box;
box.content <- Some x;
Condition.broadcast box.on_put;
Mutex.unlock box.mutex
(** Use given function to atomically update content, and return
the previous value and the new one *)
let update box f =
failwith "not implemented"
Mutex.lock box.mutex;
let x = wait_put box in
try
let y = f x in
box.content <- Some y;
Condition.broadcast box.on_put; (* signal write *)
Mutex.unlock box.mutex;
x, y
with e ->
Mutex.unlock box.mutex;
raise e
(** Look at the value, without removing it *)
let peek box =
failwith "not implemented"
Mutex.lock box.mutex;
let x = wait_put box in
Mutex.unlock box.mutex;
x
end
(** {2 Basic Future functions} *)
@ -260,17 +308,28 @@ let is_done future =
let on_success future k =
Mutex.lock future.mutex;
future.handlers <- (OnSuccess k) :: future.handlers;
(match future.content with
| NotKnown ->
future.handlers <- (OnSuccess k) :: future.handlers; (* wait *)
| Success x -> k x
| Failure _ -> ());
Mutex.unlock future.mutex
let on_failure future k =
Mutex.lock future.mutex;
future.handlers <- (OnFailure k) :: future.handlers;
(match future.content with
| NotKnown ->
future.handlers <- (OnFailure k) :: future.handlers; (* wait *)
| Success _ -> ()
| Failure e -> k e);
Mutex.unlock future.mutex
let on_finish future k =
Mutex.lock future.mutex;
future.handlers <- (OnFinish k) :: future.handlers;
(match future.content with
| NotKnown ->
future.handlers <- (OnFinish k) :: future.handlers; (* wait *)
| Success _ | Failure _ -> k ());
Mutex.unlock future.mutex
let flatMap f future =
@ -288,8 +347,46 @@ let flatMap f future =
(fun e -> fail future' e);
future'
let andThen future f =
flatMap (fun _ -> f ()) future
let sequence futures =
failwith "not implemented"
let a = Array.of_list futures in
let n = Array.length a in
let results = Array.make n NotKnown in
let future' = make () in
(* state: how many remain to finish *)
let count = MVar.full (Array.length a) in
(* when all futures returned, collect results for future' *)
let check_at_end () =
let l = Array.to_list results in
try
let l = List.map
(function
| Success x -> x
| Failure e -> raise e
| NotKnown -> assert false)
l in
send future' l
with e ->
fail future' e
in
(* function called whenever a future succeeds *)
let one_succeeded i x =
results.(i) <- Success x;
let _, n = MVar.update count (fun x -> x-1) in
if n = 0 then check_at_end ()
and one_failed i e =
results.(i) <- Failure e;
let _, n = MVar.update count (fun x -> x-1) in
if n = 0 then check_at_end ()
in
(* wait for all to succeed or fail *)
for i = 0 to Array.length a - 1 do
on_success a.(i) (one_succeeded i);
on_failure a.(i) (one_failed i);
done;
future'
let choose futures =
failwith "not implemented"
@ -356,6 +453,11 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd =
| Unix.WSTOPPED i -> i in
(returncode, out', err'))
let sleep ?(pool=default_pool) time =
spawn ~pool
(fun () -> Unix.sleep time; ())
module Infix = struct
let (>>=) x f = flatMap f x
let (>>) a f = andThen a f
end

View file

@ -114,6 +114,9 @@ val on_finish : _ t -> (unit -> unit) -> unit
val flatMap : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *)
val andThen : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second *)
val sequence : 'a t list -> 'a list t
(** Future that waits for all previous sequences to terminate *)
@ -136,6 +139,10 @@ val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string ->
(** Spawn a sub-process with the given command [cmd] (and possibly input);
returns a future containing (returncode, stdout, stderr) *)
val sleep : ?pool:Pool.t -> int -> unit t
(** Future that returns with success in the given amount of seconds *)
module Infix : sig
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
end