diff --git a/future.ml b/future.ml index b7b5f786..e964ddea 100644 --- a/future.ml +++ b/future.ml @@ -44,115 +44,6 @@ and 'a handler = exception SendTwice (** Exception raised when a future is evaluated several time *) -(** {2 Thread pool} *) -module Pool = struct - type t = { - mutable threads : Thread.t list; - mutable stop : bool; - size : int; - mutable max_load : int; (* number of jobs waiting before a transient thread is spawned *) - transient_lifetime : int; (* lifetime (in jobs) of a transient thread *) - jobs : (unit -> unit) Queue.t; - mutex : Mutex.t; - condition : Condition.t; - } (** A pool of threads *) - - (* TODO option to allow the pool to grow on demand? *) - - let load pool = - Mutex.lock pool.mutex; - let n = Queue.length pool.jobs in - Mutex.unlock pool.mutex; - n - - (* Internal function, which is run by the threads of the pool *) - let serve pool limit = - (* loop, to get the next job (in locked environment) *) - let rec check limit = - if Queue.is_empty pool.jobs - then wait limit (* wait for someone to add a job *) - else begin - (* process one job *) - let job = Queue.pop pool.jobs in - Mutex.unlock pool.mutex; - (* run the job *) - (try - job () - with _ -> - ()); - match limit with - | None -> if not pool.stop then enter limit (* I am immortal! *) - | Some 0 -> () (* stop, reached limit *) - | Some n -> if not pool.stop then enter (Some (n-1)) (* continue serving *) - end - (* enter the loop *) - and enter limit = - Mutex.lock pool.mutex; - check limit - (* wait for someone to push an item on the queue *) - and wait limit = - Condition.wait pool.condition pool.mutex; - check limit - in - enter limit; - (* transient thread: restore old value of pool.max_load *) - Mutex.lock pool.mutex; - pool.max_load <- pool.max_load - pool.transient_lifetime; - Mutex.unlock pool.mutex - - (** Add a thread to the pool, that will serve at most [limit] jobs *) - let add_thread ?limit pool = - let t = Thread.create (serve pool) limit in - (* transient threads are not stored *) - if limit = None - then pool.threads <- t :: pool.threads - - (** Create a pool with the given number of threads. *) - let create ?(max_load=max_int) ?(transient_lifetime=10) ~size = - let pool = { - threads = []; - stop = false; - size; - max_load; - transient_lifetime; - jobs = Queue.create (); - mutex = Mutex.create (); - condition = Condition.create (); - } in - (* start persistent threads *) - for i = 0 to size - 1 do - add_thread pool - done; - pool - - (** Schedule a function to run in the pool *) - let schedule pool f = - Mutex.lock pool.mutex; - Queue.push f pool.jobs; - (* grow set of threads, if needed *) - (if Queue.length pool.jobs > pool.max_load - then begin - add_thread ~limit:pool.transient_lifetime pool; - (* increase max load, to give the new thread some time to process jobs *) - pool.max_load <- pool.max_load + pool.transient_lifetime; - end); - Condition.signal pool.condition; (* wake up one thread *) - Mutex.unlock pool.mutex; - () - - (** Kill threads in the pool *) - let finish pool = - Mutex.lock pool.mutex; - pool.stop <- true; - Condition.broadcast pool.condition; - Mutex.unlock pool.mutex; - (* kill immortal threads *) - List.iter (fun t -> Thread.kill t) pool.threads -end - -let default_pool = Pool.create ~max_load:50 ?transient_lifetime:None ~size:3 - (** Default pool of threads (growable) *) - (** {2 MVar: a zero-or-one element thread-safe box} *) module MVar = struct @@ -242,6 +133,141 @@ module MVar = struct x end +(** {2 Thread pool} *) +module Pool = struct + type t = { + mutable stop : bool; (* indicate that threads should stop *) + mutex : Mutex.t; + jobs : job Queue.t; (* waiting jobs *) + mutable threads : waiting_thread list; (* waiting threads *) + mutable cur_size : int; + max_size : int; + timeout : float; (* idle time after which to discard threads *) + } (** Dynamic, growable thread pool *) + and job = unit -> unit + and command = + | Perform of job + | Quit + (** Command sent to a thread *) + and waiting_thread = float * command MVar.t + + (** Cleanup waiting threads. precond: pool is locked *) + let cleanup_waiting pool = + let l = pool.threads in + let now = Unix.gettimeofday () in + (* filter threads that have been waiting for too long *) + let l' = List.filter + (fun (time, box) -> + if time +. pool.timeout < now + then (MVar.put box Quit; false) + else true) + l in + pool.threads <- l' + + (** Function that the threads run. They also take a MVar to + get commands *) + let serve pool box = + (* wait for a job to come *) + let rec wait_job () = + match MVar.take box with + | Quit -> (Mutex.lock pool.mutex; quit ()) (* exit *) + | Perform job -> + run_job job + (* run the given job *) + and run_job job = + (try job () with _ -> ()); + next () (* loop *) + (* process next task *) + and next () = + Mutex.lock pool.mutex; + if pool.stop then quit () (* stop the pool *) + else if Queue.is_empty pool.jobs + then begin + let now = Unix.gettimeofday () in + (* cleanup waiting threads *) + cleanup_waiting pool; + if pool.cur_size > 1 && List.length pool.threads + 1 = pool.cur_size + then + (* all other threads are waiting, we may need to kill them later *) + (Mutex.unlock pool.mutex; delay ()) + else begin + (* add oneself to the list of waiting threads *) + pool.threads <- (now, box) :: pool.threads; + Mutex.unlock pool.mutex; + wait_job () + end + end else + let job = Queue.pop pool.jobs in + Mutex.unlock pool.mutex; + run_job job + (* delay [pool.timeout], so that in case no job is submitted we + still kill old cached threads *) + and delay () = + Thread.delay pool.timeout; + next () + (* stop the thread (assume we have pool.mutex) *) + and quit () = + pool.cur_size <- pool.cur_size - 1; + Mutex.unlock pool.mutex + in wait_job () + + let size pool = + Mutex.lock pool.mutex; + let n = pool.cur_size in + Mutex.unlock pool.mutex; + n + + (** Add a thread to the pool, starting with the first job *) + let add_thread pool job = + let box = MVar.full job in + ignore (Thread.create (serve pool) box) + + (** Create a pool with at most the given number of threads. [timeout] + is the time after which idle threads are killed. *) + let create ?(timeout=30.) ~size = + let pool = { + stop = false; + cur_size = 0; + max_size=size; + timeout; + threads = []; + jobs = Queue.create (); + mutex = Mutex.create (); + } in + pool + + (** Run the job in the given pool *) + let run pool job = + assert (not (pool.stop)); + Mutex.lock pool.mutex; + begin match pool.threads with + | [] when pool.cur_size = pool.max_size -> + (* max capacity reached, push task in queue *) + Queue.push job pool.jobs + | [] -> + (* spawn a thread for the given task *) + add_thread pool (Perform job); + pool.cur_size <- pool.cur_size + 1; + | (_,box)::l' -> + (* use the first thread *) + MVar.put box (Perform job); + pool.threads <- l'; + end; + Mutex.unlock pool.mutex + + (** Kill threads in the pool *) + let finish pool = + Mutex.lock pool.mutex; + pool.stop <- true; + (* kill waiting threads *) + List.iter (fun (_,box) -> MVar.put box Quit) pool.threads; + pool.threads <- []; + Mutex.unlock pool.mutex +end + +let default_pool = Pool.create ?timeout:None ~size:100 + (** Default pool of threads, should be ok for most uses. *) + (** {2 Basic Future functions} *) let make () = @@ -438,7 +464,7 @@ let return x = let spawn ?(pool=default_pool) f = let future = make () in (* schedule computation *) - Pool.schedule pool + Pool.run pool (fun () -> try let x = f () in diff --git a/future.mli b/future.mli index baf88344..24a3d1af 100644 --- a/future.mli +++ b/future.mli @@ -31,29 +31,6 @@ type 'a t exception SendTwice (** Exception raised when a future is evaluated several time *) -(** {2 Thread pool} *) -module Pool : sig - type t - (** A pool of threads *) - - val create : ?max_load:int -> ?transient_lifetime:int -> size:int -> t - (** Create a pool with the given number of threads. If the load goes - above the given threshold (default max_int), a new thread is spawned - only to die after having processed [transient_lifetime] jobs. *) - - val load : t -> int - (** Current number of waiting jobs *) - - val schedule : t -> (unit -> unit) -> unit - (** Schedule a function to run in the pool *) - - val finish : t -> unit - (** Kill threads in the pool *) -end - -val default_pool : Pool.t - (** Pool of threads that is used by default. Growable if needed. *) - (** {2 MVar: a zero-or-one element thread-safe box} *) module MVar : sig @@ -82,6 +59,28 @@ module MVar : sig (** Look at the value, without removing it *) end +(** {2 Thread pool} *) +module Pool : sig + type t + (** A pool of threads *) + + val create : ?timeout:float -> size:int -> t + (** Create a pool with at most the given number of threads. [timeout] + is the time after which idle threads are killed. *) + + val size : t -> int + (** Current size of the pool *) + + val run : t -> (unit -> unit) -> unit + (** Run the function in the pool *) + + val finish : t -> unit + (** Kill threads in the pool *) +end + +val default_pool : Pool.t + (** Pool of threads that is used by default. Growable if needed. *) + (** {2 Basic low-level Future functions} *) val make : unit -> 'a t