From 9a66e90a02b6cb24e63882637603cd5e4ac15df5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 19 Mar 2013 18:09:56 +0100 Subject: [PATCH] bugfix in the scheduler --- futures.ml | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/futures.ml b/futures.ml index eedf00a6..13322751 100644 --- a/futures.ml +++ b/futures.ml @@ -66,15 +66,12 @@ module Pool = struct (* Internal function, which is run by the threads of the pool *) let serve pool limit = - (* loop, to get the next job *) - let rec poll limit = - Mutex.lock pool.mutex; - Condition.wait pool.condition pool.mutex; + (* loop, to get the next job (in locked environment) *) + let rec check limit = if Queue.is_empty pool.jobs - then begin (* caramba! try again *) - Mutex.unlock pool.mutex; - if not pool.stop then poll limit end + then wait limit (* wait for someone to add a job *) else begin + (* process one job *) let job = Queue.pop pool.jobs in Mutex.unlock pool.mutex; (* run the job *) @@ -83,12 +80,20 @@ module Pool = struct with _ -> ()); match limit with - | None -> if not pool.stop then poll limit (* I am immortal! *) + | None -> if not pool.stop then enter limit (* I am immortal! *) | Some 0 -> () (* stop, reached limit *) - | Some n -> if not pool.stop then poll (Some (n-1)) (* continue serving *) + | Some n -> if not pool.stop then enter (Some (n-1)) (* continue serving *) end + (* enter the loop *) + and enter limit = + Mutex.lock pool.mutex; + check limit + (* wait for someone to push an item on the queue *) + and wait limit = + Condition.wait pool.condition pool.mutex; + check limit in - poll limit + enter limit (** Add a thread to the pool, that will serve at most [limit] jobs *) let add_thread ?limit pool =