Futures.MVar module (not implemented) for sharing values;

handlers to attach on a future to react upon failure/success/termination;
Futures.map implemented using handlers, flatMap also uses handlers (more lightweight);
growable thread pool (transient threads are added when the number of waiting jobs is too high)
This commit is contained in:
Simon Cruanes 2013-03-19 16:11:38 +01:00
parent 23029332df
commit 9708c4b01c
2 changed files with 192 additions and 23 deletions

View file

@ -27,6 +27,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
type 'a t = { type 'a t = {
mutable content : 'a result; mutable content : 'a result;
mutable handlers : 'a handler list; (* handlers *)
mutex : Mutex.t; mutex : Mutex.t;
condition : Condition.t; condition : Condition.t;
} (** A future value of type 'a *) } (** A future value of type 'a *)
@ -35,6 +36,10 @@ and 'a result =
| Success of 'a | Success of 'a
| Failure of exn | Failure of exn
(** Result of a computation *) (** Result of a computation *)
and 'a handler =
| OnSuccess of ('a -> unit)
| OnFailure of (exn -> unit)
| OnFinish of (unit -> unit)
exception SendTwice exception SendTwice
(** Exception raised when a future is evaluated several time *) (** Exception raised when a future is evaluated several time *)
@ -44,6 +49,7 @@ module Pool = struct
type t = { type t = {
mutable threads : Thread.t list; mutable threads : Thread.t list;
size : int; size : int;
max_load : int;
jobs : (unit -> unit) Queue.t; jobs : (unit -> unit) Queue.t;
mutex : Mutex.t; mutex : Mutex.t;
condition : Condition.t; condition : Condition.t;
@ -51,16 +57,22 @@ module Pool = struct
(* TODO option to allow the pool to grow on demand? *) (* TODO option to allow the pool to grow on demand? *)
let load pool =
Mutex.lock pool.mutex;
let n = Queue.length pool.jobs in
Mutex.unlock pool.mutex;
n
(* Internal function, which is run by the threads of the pool *) (* Internal function, which is run by the threads of the pool *)
let serve pool = let serve pool limit =
(* loop, to get the next job *) (* loop, to get the next job *)
let rec poll () = let rec poll limit =
Mutex.lock pool.mutex; Mutex.lock pool.mutex;
Condition.wait pool.condition pool.mutex; Condition.wait pool.condition pool.mutex;
if Queue.is_empty pool.jobs if Queue.is_empty pool.jobs
then begin (* caramba! try again *) then begin (* caramba! try again *)
Mutex.unlock pool.mutex; Mutex.unlock pool.mutex;
poll () end poll limit end
else begin else begin
let job = Queue.pop pool.jobs in let job = Queue.pop pool.jobs in
Mutex.unlock pool.mutex; Mutex.unlock pool.mutex;
@ -69,30 +81,48 @@ module Pool = struct
job () job ()
with _ -> with _ ->
()); ());
poll () (* recurse *) match limit with
| None -> poll limit (* I am immortal! *)
| Some 0 -> () (* stop, reached limit *)
| Some n -> poll (Some (n-1)) (* continue serving *)
end end
in in
poll () poll limit
(** Add a thread to the pool, that will serve at most [limit] jobs *)
let add_thread ?limit pool =
let t = Thread.create (serve pool) limit in
(* transient threads are not stored *)
if limit = None
then pool.threads <- t :: pool.threads
(** Create a pool with the given number of threads. *) (** Create a pool with the given number of threads. *)
let create ~size = let create ?(max_load=max_int) ~size =
let pool = { let pool = {
threads = []; threads = [];
size; size;
max_load;
jobs = Queue.create (); jobs = Queue.create ();
mutex = Mutex.create (); mutex = Mutex.create ();
condition = Condition.create (); condition = Condition.create ();
} in } in
(* start threads *) (* start persistent threads *)
for i = 0 to size - 1 do for i = 0 to size - 1 do
pool.threads <- (Thread.create serve pool) :: pool.threads; add_thread pool
done; done;
pool pool
let transient_thread_lifetime = 10
(** Schedule a function to run in the pool *) (** Schedule a function to run in the pool *)
let schedule pool f = let schedule pool f =
Mutex.lock pool.mutex; Mutex.lock pool.mutex;
Queue.push f pool.jobs; Queue.push f pool.jobs;
(* grow set of threads, if needed *)
(if Queue.length pool.jobs > pool.max_load
then begin
add_thread ~limit:transient_thread_lifetime pool
end);
Condition.signal pool.condition; (* wake up one thread *) Condition.signal pool.condition; (* wake up one thread *)
Mutex.unlock pool.mutex; Mutex.unlock pool.mutex;
() ()
@ -102,13 +132,62 @@ module Pool = struct
List.iter (fun t -> Thread.kill t) pool.threads List.iter (fun t -> Thread.kill t) pool.threads
end end
let default_pool = Pool.create 3 let default_pool = Pool.create ~max_load:500 ~size:3
(** Default pool of threads *) (** Default pool of threads (growable) *)
(** {2 MVar: a zero-or-one element thread-safe box} *)
module MVar = struct
type 'a t = {
mutable content : 'a option;
mutex : Mutex.t;
condition : Condition.t;
}
(** Create an empty box *)
let empty () = {
content = None;
mutex = Mutex.create ();
condition = Condition.create ();
}
(** Create a full box *)
let full x = {
content = Some x;
mutex = Mutex.create ();
condition = Condition.create ();
}
(** Is the box currently empty? *)
let is_empty box =
Mutex.lock box.mutex;
let ans = box.content <> None in
Mutex.unlock box.mutex;
ans
(** Take value out of the box. Wait if necessary *)
let take box =
failwith "not implemented"
(** Put a value in the box. Waits if the box is already empty *)
let put box x =
failwith "not impleemnted"
(** Use given function to atomically update content, and return
the previous value and the new one *)
let update box f =
failwith "not implemented"
(** Look at the value, without removing it *)
let peek box =
failwith "not implemented"
end
(** {2 Basic Future functions} *) (** {2 Basic Future functions} *)
let make () = let make () =
{ content = NotKnown; { content = NotKnown;
handlers = [];
mutex = Mutex.create (); mutex = Mutex.create ();
condition = Condition.create (); condition = Condition.create ();
} }
@ -139,6 +218,12 @@ let send future x =
| NotKnown -> (* set content and signal *) | NotKnown -> (* set content and signal *)
future.content <- Success x; future.content <- Success x;
Condition.broadcast future.condition; Condition.broadcast future.condition;
List.iter
(function
| OnSuccess f -> f x
| OnFinish f -> f ()
| OnFailure _ -> ())
future.handlers;
Mutex.unlock future.mutex Mutex.unlock future.mutex
| _ -> | _ ->
Mutex.unlock future.mutex; Mutex.unlock future.mutex;
@ -150,6 +235,12 @@ let fail future e =
| NotKnown -> (* set content and signal *) | NotKnown -> (* set content and signal *)
future.content <- Failure e; future.content <- Failure e;
Condition.broadcast future.condition; Condition.broadcast future.condition;
List.iter
(function
| OnSuccess _ -> ()
| OnFinish f -> f ()
| OnFailure f -> f e)
future.handlers;
Mutex.unlock future.mutex Mutex.unlock future.mutex
| _ -> | _ ->
Mutex.unlock future.mutex; Mutex.unlock future.mutex;
@ -167,25 +258,53 @@ let is_done future =
(** {2 Combinators *) (** {2 Combinators *)
let flatMap ?(pool=default_pool) f future = let on_success future k =
Mutex.lock future.mutex;
future.handlers <- (OnSuccess k) :: future.handlers;
Mutex.unlock future.mutex
let on_failure future k =
Mutex.lock future.mutex;
future.handlers <- (OnFailure k) :: future.handlers;
Mutex.unlock future.mutex
let on_finish future k =
Mutex.lock future.mutex;
future.handlers <- (OnFinish k) :: future.handlers;
Mutex.unlock future.mutex
let flatMap f future =
let future' = make () in let future' = make () in
(* schedule the task that waits for [future] to return [x], then (* if [future] succeeds with [x], we spawn a new job to compute [f x] *)
computes [f x] and send the result to [future'] *) on_success future
Pool.schedule pool (fun x ->
(fun () ->
try try
let x = get future in
let future'' = f x in let future'' = f x in
let y = get future'' in on_success future'' (fun x -> send future' x);
send future' y on_failure future'' (fun e -> fail future' e);
with e -> (* failure occurred *) with e ->
fail future' e); fail future' e);
on_failure future
(fun e -> fail future' e);
future'
let sequence futures =
failwith "not implemented"
let choose futures =
failwith "not implemented"
let map f future =
let future' = make () in
on_success future (fun x -> let y = f x in send future' y);
on_failure future (fun e -> fail future' e);
future' future'
(** {2 Future constructors} *) (** {2 Future constructors} *)
let return x = let return x =
{ content = Success x; { content = Success x;
handlers = [];
mutex = Mutex.create (); mutex = Mutex.create ();
condition = Condition.create (); condition = Condition.create ();
} }

View file

@ -36,8 +36,12 @@ module Pool : sig
type t type t
(** A pool of threads *) (** A pool of threads *)
val create : size:int -> t val create : ?max_load:int -> size:int -> t
(** Create a pool with the given number of threads. *) (** Create a pool with the given number of threads. If the load goes
above the given threshold (default max_int), a new thread is spawned. *)
val load : t -> int
(** Current number of waiting jobs *)
val schedule : t -> (unit -> unit) -> unit val schedule : t -> (unit -> unit) -> unit
(** Schedule a function to run in the pool *) (** Schedule a function to run in the pool *)
@ -47,7 +51,35 @@ module Pool : sig
end end
val default_pool : Pool.t val default_pool : Pool.t
(** Pool of threads that is used by default *) (** Pool of threads that is used by default. Growable if needed. *)
(** {2 MVar: a zero-or-one element thread-safe box} *)
module MVar : sig
type 'a t
val empty : unit -> 'a t
(** Create an empty box *)
val full : 'a -> 'a t
(** Create a full box *)
val is_empty : _ t -> bool
(** Is the box currently empty? *)
val take : 'a t -> 'a
(** Take value out of the box. Wait if necessary *)
val put : 'a t -> 'a -> unit
(** Put a value in the box. Waits if the box is already empty *)
val update : 'a t -> ('a -> 'a) -> 'a * 'a
(** Use given function to atomically update content, and return
the previous value and the new one *)
val peek : 'a t -> 'a
(** Look at the value, without removing it *)
end
(** {2 Basic low-level Future functions} *) (** {2 Basic low-level Future functions} *)
@ -70,9 +102,27 @@ val is_done : 'a t -> bool
(** {2 Combinators *) (** {2 Combinators *)
val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t val on_success : 'a t -> ('a -> unit) -> unit
(** Attach a handler to be called upon success *)
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure *)
val on_finish : _ t -> (unit -> unit) -> unit
(** Attach a handler to be called when the future is evaluated *)
val flatMap : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *) (** Monadic combination of futures *)
val sequence : 'a t list -> 'a list t
(** Future that waits for all previous sequences to terminate *)
val choose : 'a t list -> 'a t
(** Choose among those futures (the first to terminate) *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future *)
(** {2 Future constructors} *) (** {2 Future constructors} *)
val return : 'a -> 'a t val return : 'a -> 'a t