diff --git a/futures.ml b/futures.ml index 13322751..b7b5f786 100644 --- a/futures.ml +++ b/futures.ml @@ -50,7 +50,8 @@ module Pool = struct mutable threads : Thread.t list; mutable stop : bool; size : int; - max_load : int; + mutable max_load : int; (* number of jobs waiting before a transient thread is spawned *) + transient_lifetime : int; (* lifetime (in jobs) of a transient thread *) jobs : (unit -> unit) Queue.t; mutex : Mutex.t; condition : Condition.t; @@ -93,7 +94,11 @@ module Pool = struct Condition.wait pool.condition pool.mutex; check limit in - enter limit + enter limit; + (* transient thread: restore old value of pool.max_load *) + Mutex.lock pool.mutex; + pool.max_load <- pool.max_load - pool.transient_lifetime; + Mutex.unlock pool.mutex (** Add a thread to the pool, that will serve at most [limit] jobs *) let add_thread ?limit pool = @@ -103,12 +108,13 @@ module Pool = struct then pool.threads <- t :: pool.threads (** Create a pool with the given number of threads. *) - let create ?(max_load=max_int) ~size = + let create ?(max_load=max_int) ?(transient_lifetime=10) ~size = let pool = { threads = []; stop = false; size; max_load; + transient_lifetime; jobs = Queue.create (); mutex = Mutex.create (); condition = Condition.create (); @@ -119,8 +125,6 @@ module Pool = struct done; pool - let transient_thread_lifetime = 10 - (** Schedule a function to run in the pool *) let schedule pool f = Mutex.lock pool.mutex; @@ -128,7 +132,9 @@ module Pool = struct (* grow set of threads, if needed *) (if Queue.length pool.jobs > pool.max_load then begin - add_thread ~limit:transient_thread_lifetime pool + add_thread ~limit:pool.transient_lifetime pool; + (* increase max load, to give the new thread some time to process jobs *) + pool.max_load <- pool.max_load + pool.transient_lifetime; end); Condition.signal pool.condition; (* wake up one thread *) Mutex.unlock pool.mutex; @@ -144,7 +150,7 @@ module Pool = struct List.iter (fun t -> Thread.kill t) pool.threads end -let default_pool = Pool.create ~max_load:500 ~size:3 +let default_pool = Pool.create ~max_load:50 ?transient_lifetime:None ~size:3 (** Default pool of threads (growable) *) (** {2 MVar: a zero-or-one element thread-safe box} *) diff --git a/futures.mli b/futures.mli index 520dfaa2..baf88344 100644 --- a/futures.mli +++ b/futures.mli @@ -36,9 +36,10 @@ module Pool : sig type t (** A pool of threads *) - val create : ?max_load:int -> size:int -> t + val create : ?max_load:int -> ?transient_lifetime:int -> size:int -> t (** Create a pool with the given number of threads. If the load goes - above the given threshold (default max_int), a new thread is spawned. *) + above the given threshold (default max_int), a new thread is spawned + only to die after having processed [transient_lifetime] jobs. *) val load : t -> int (** Current number of waiting jobs *)