update of the thread pool. It is growable, up to a certain limit, and threads are cached;

idle threads are killed after some (soft) time limit;
This commit is contained in:
Simon Cruanes 2013-03-20 17:53:49 +01:00
parent 43f926dcb3
commit 174f10e4f2
2 changed files with 158 additions and 133 deletions

246
future.ml
View file

@ -44,115 +44,6 @@ and 'a handler =
exception SendTwice
(** Exception raised when a future is evaluated several time *)
(** {2 Thread pool} *)
module Pool = struct
type t = {
mutable threads : Thread.t list;
mutable stop : bool;
size : int;
mutable max_load : int; (* number of jobs waiting before a transient thread is spawned *)
transient_lifetime : int; (* lifetime (in jobs) of a transient thread *)
jobs : (unit -> unit) Queue.t;
mutex : Mutex.t;
condition : Condition.t;
} (** A pool of threads *)
(* TODO option to allow the pool to grow on demand? *)
let load pool =
Mutex.lock pool.mutex;
let n = Queue.length pool.jobs in
Mutex.unlock pool.mutex;
n
(* Internal function, which is run by the threads of the pool *)
let serve pool limit =
(* loop, to get the next job (in locked environment) *)
let rec check limit =
if Queue.is_empty pool.jobs
then wait limit (* wait for someone to add a job *)
else begin
(* process one job *)
let job = Queue.pop pool.jobs in
Mutex.unlock pool.mutex;
(* run the job *)
(try
job ()
with _ ->
());
match limit with
| None -> if not pool.stop then enter limit (* I am immortal! *)
| Some 0 -> () (* stop, reached limit *)
| Some n -> if not pool.stop then enter (Some (n-1)) (* continue serving *)
end
(* enter the loop *)
and enter limit =
Mutex.lock pool.mutex;
check limit
(* wait for someone to push an item on the queue *)
and wait limit =
Condition.wait pool.condition pool.mutex;
check limit
in
enter limit;
(* transient thread: restore old value of pool.max_load *)
Mutex.lock pool.mutex;
pool.max_load <- pool.max_load - pool.transient_lifetime;
Mutex.unlock pool.mutex
(** Add a thread to the pool, that will serve at most [limit] jobs *)
let add_thread ?limit pool =
let t = Thread.create (serve pool) limit in
(* transient threads are not stored *)
if limit = None
then pool.threads <- t :: pool.threads
(** Create a pool with the given number of threads. *)
let create ?(max_load=max_int) ?(transient_lifetime=10) ~size =
let pool = {
threads = [];
stop = false;
size;
max_load;
transient_lifetime;
jobs = Queue.create ();
mutex = Mutex.create ();
condition = Condition.create ();
} in
(* start persistent threads *)
for i = 0 to size - 1 do
add_thread pool
done;
pool
(** Schedule a function to run in the pool *)
let schedule pool f =
Mutex.lock pool.mutex;
Queue.push f pool.jobs;
(* grow set of threads, if needed *)
(if Queue.length pool.jobs > pool.max_load
then begin
add_thread ~limit:pool.transient_lifetime pool;
(* increase max load, to give the new thread some time to process jobs *)
pool.max_load <- pool.max_load + pool.transient_lifetime;
end);
Condition.signal pool.condition; (* wake up one thread *)
Mutex.unlock pool.mutex;
()
(** Kill threads in the pool *)
let finish pool =
Mutex.lock pool.mutex;
pool.stop <- true;
Condition.broadcast pool.condition;
Mutex.unlock pool.mutex;
(* kill immortal threads *)
List.iter (fun t -> Thread.kill t) pool.threads
end
let default_pool = Pool.create ~max_load:50 ?transient_lifetime:None ~size:3
(** Default pool of threads (growable) *)
(** {2 MVar: a zero-or-one element thread-safe box} *)
module MVar = struct
@ -242,6 +133,141 @@ module MVar = struct
x
end
(** {2 Thread pool} *)
module Pool = struct
type t = {
mutable stop : bool; (* indicate that threads should stop *)
mutex : Mutex.t;
jobs : job Queue.t; (* waiting jobs *)
mutable threads : waiting_thread list; (* waiting threads *)
mutable cur_size : int;
max_size : int;
timeout : float; (* idle time after which to discard threads *)
} (** Dynamic, growable thread pool *)
and job = unit -> unit
and command =
| Perform of job
| Quit
(** Command sent to a thread *)
and waiting_thread = float * command MVar.t
(** Cleanup waiting threads. precond: pool is locked *)
let cleanup_waiting pool =
let l = pool.threads in
let now = Unix.gettimeofday () in
(* filter threads that have been waiting for too long *)
let l' = List.filter
(fun (time, box) ->
if time +. pool.timeout < now
then (MVar.put box Quit; false)
else true)
l in
pool.threads <- l'
(** Function that the threads run. They also take a MVar to
get commands *)
let serve pool box =
(* wait for a job to come *)
let rec wait_job () =
match MVar.take box with
| Quit -> (Mutex.lock pool.mutex; quit ()) (* exit *)
| Perform job ->
run_job job
(* run the given job *)
and run_job job =
(try job () with _ -> ());
next () (* loop *)
(* process next task *)
and next () =
Mutex.lock pool.mutex;
if pool.stop then quit () (* stop the pool *)
else if Queue.is_empty pool.jobs
then begin
let now = Unix.gettimeofday () in
(* cleanup waiting threads *)
cleanup_waiting pool;
if pool.cur_size > 1 && List.length pool.threads + 1 = pool.cur_size
then
(* all other threads are waiting, we may need to kill them later *)
(Mutex.unlock pool.mutex; delay ())
else begin
(* add oneself to the list of waiting threads *)
pool.threads <- (now, box) :: pool.threads;
Mutex.unlock pool.mutex;
wait_job ()
end
end else
let job = Queue.pop pool.jobs in
Mutex.unlock pool.mutex;
run_job job
(* delay [pool.timeout], so that in case no job is submitted we
still kill old cached threads *)
and delay () =
Thread.delay pool.timeout;
next ()
(* stop the thread (assume we have pool.mutex) *)
and quit () =
pool.cur_size <- pool.cur_size - 1;
Mutex.unlock pool.mutex
in wait_job ()
let size pool =
Mutex.lock pool.mutex;
let n = pool.cur_size in
Mutex.unlock pool.mutex;
n
(** Add a thread to the pool, starting with the first job *)
let add_thread pool job =
let box = MVar.full job in
ignore (Thread.create (serve pool) box)
(** Create a pool with at most the given number of threads. [timeout]
is the time after which idle threads are killed. *)
let create ?(timeout=30.) ~size =
let pool = {
stop = false;
cur_size = 0;
max_size=size;
timeout;
threads = [];
jobs = Queue.create ();
mutex = Mutex.create ();
} in
pool
(** Run the job in the given pool *)
let run pool job =
assert (not (pool.stop));
Mutex.lock pool.mutex;
begin match pool.threads with
| [] when pool.cur_size = pool.max_size ->
(* max capacity reached, push task in queue *)
Queue.push job pool.jobs
| [] ->
(* spawn a thread for the given task *)
add_thread pool (Perform job);
pool.cur_size <- pool.cur_size + 1;
| (_,box)::l' ->
(* use the first thread *)
MVar.put box (Perform job);
pool.threads <- l';
end;
Mutex.unlock pool.mutex
(** Kill threads in the pool *)
let finish pool =
Mutex.lock pool.mutex;
pool.stop <- true;
(* kill waiting threads *)
List.iter (fun (_,box) -> MVar.put box Quit) pool.threads;
pool.threads <- [];
Mutex.unlock pool.mutex
end
let default_pool = Pool.create ?timeout:None ~size:100
(** Default pool of threads, should be ok for most uses. *)
(** {2 Basic Future functions} *)
let make () =
@ -438,7 +464,7 @@ let return x =
let spawn ?(pool=default_pool) f =
let future = make () in
(* schedule computation *)
Pool.schedule pool
Pool.run pool
(fun () ->
try
let x = f () in

View file

@ -31,29 +31,6 @@ type 'a t
exception SendTwice
(** Exception raised when a future is evaluated several time *)
(** {2 Thread pool} *)
module Pool : sig
type t
(** A pool of threads *)
val create : ?max_load:int -> ?transient_lifetime:int -> size:int -> t
(** Create a pool with the given number of threads. If the load goes
above the given threshold (default max_int), a new thread is spawned
only to die after having processed [transient_lifetime] jobs. *)
val load : t -> int
(** Current number of waiting jobs *)
val schedule : t -> (unit -> unit) -> unit
(** Schedule a function to run in the pool *)
val finish : t -> unit
(** Kill threads in the pool *)
end
val default_pool : Pool.t
(** Pool of threads that is used by default. Growable if needed. *)
(** {2 MVar: a zero-or-one element thread-safe box} *)
module MVar : sig
@ -82,6 +59,28 @@ module MVar : sig
(** Look at the value, without removing it *)
end
(** {2 Thread pool} *)
module Pool : sig
type t
(** A pool of threads *)
val create : ?timeout:float -> size:int -> t
(** Create a pool with at most the given number of threads. [timeout]
is the time after which idle threads are killed. *)
val size : t -> int
(** Current size of the pool *)
val run : t -> (unit -> unit) -> unit
(** Run the function in the pool *)
val finish : t -> unit
(** Kill threads in the pool *)
end
val default_pool : Pool.t
(** Pool of threads that is used by default. Growable if needed. *)
(** {2 Basic low-level Future functions} *)
val make : unit -> 'a t