parametrize Futures.Pool with lifetime of transient threads; also give them time to

process jobs by increasing max_load temporarily
This commit is contained in:
Simon Cruanes 2013-03-20 10:54:26 +01:00
parent e35f3c61dd
commit 1d2e514d45
2 changed files with 16 additions and 9 deletions

View file

@ -50,7 +50,8 @@ module Pool = struct
mutable threads : Thread.t list; mutable threads : Thread.t list;
mutable stop : bool; mutable stop : bool;
size : int; size : int;
max_load : int; mutable max_load : int; (* number of jobs waiting before a transient thread is spawned *)
transient_lifetime : int; (* lifetime (in jobs) of a transient thread *)
jobs : (unit -> unit) Queue.t; jobs : (unit -> unit) Queue.t;
mutex : Mutex.t; mutex : Mutex.t;
condition : Condition.t; condition : Condition.t;
@ -93,7 +94,11 @@ module Pool = struct
Condition.wait pool.condition pool.mutex; Condition.wait pool.condition pool.mutex;
check limit check limit
in in
enter limit enter limit;
(* transient thread: restore old value of pool.max_load *)
Mutex.lock pool.mutex;
pool.max_load <- pool.max_load - pool.transient_lifetime;
Mutex.unlock pool.mutex
(** Add a thread to the pool, that will serve at most [limit] jobs *) (** Add a thread to the pool, that will serve at most [limit] jobs *)
let add_thread ?limit pool = let add_thread ?limit pool =
@ -103,12 +108,13 @@ module Pool = struct
then pool.threads <- t :: pool.threads then pool.threads <- t :: pool.threads
(** Create a pool with the given number of threads. *) (** Create a pool with the given number of threads. *)
let create ?(max_load=max_int) ~size = let create ?(max_load=max_int) ?(transient_lifetime=10) ~size =
let pool = { let pool = {
threads = []; threads = [];
stop = false; stop = false;
size; size;
max_load; max_load;
transient_lifetime;
jobs = Queue.create (); jobs = Queue.create ();
mutex = Mutex.create (); mutex = Mutex.create ();
condition = Condition.create (); condition = Condition.create ();
@ -119,8 +125,6 @@ module Pool = struct
done; done;
pool pool
let transient_thread_lifetime = 10
(** Schedule a function to run in the pool *) (** Schedule a function to run in the pool *)
let schedule pool f = let schedule pool f =
Mutex.lock pool.mutex; Mutex.lock pool.mutex;
@ -128,7 +132,9 @@ module Pool = struct
(* grow set of threads, if needed *) (* grow set of threads, if needed *)
(if Queue.length pool.jobs > pool.max_load (if Queue.length pool.jobs > pool.max_load
then begin then begin
add_thread ~limit:transient_thread_lifetime pool add_thread ~limit:pool.transient_lifetime pool;
(* increase max load, to give the new thread some time to process jobs *)
pool.max_load <- pool.max_load + pool.transient_lifetime;
end); end);
Condition.signal pool.condition; (* wake up one thread *) Condition.signal pool.condition; (* wake up one thread *)
Mutex.unlock pool.mutex; Mutex.unlock pool.mutex;
@ -144,7 +150,7 @@ module Pool = struct
List.iter (fun t -> Thread.kill t) pool.threads List.iter (fun t -> Thread.kill t) pool.threads
end end
let default_pool = Pool.create ~max_load:500 ~size:3 let default_pool = Pool.create ~max_load:50 ?transient_lifetime:None ~size:3
(** Default pool of threads (growable) *) (** Default pool of threads (growable) *)
(** {2 MVar: a zero-or-one element thread-safe box} *) (** {2 MVar: a zero-or-one element thread-safe box} *)

View file

@ -36,9 +36,10 @@ module Pool : sig
type t type t
(** A pool of threads *) (** A pool of threads *)
val create : ?max_load:int -> size:int -> t val create : ?max_load:int -> ?transient_lifetime:int -> size:int -> t
(** Create a pool with the given number of threads. If the load goes (** Create a pool with the given number of threads. If the load goes
above the given threshold (default max_int), a new thread is spawned. *) above the given threshold (default max_int), a new thread is spawned
only to die after having processed [transient_lifetime] jobs. *)
val load : t -> int val load : t -> int
(** Current number of waiting jobs *) (** Current number of waiting jobs *)