diff --git a/futures.ml b/futures.ml index eedf00a6..13322751 100644 --- a/futures.ml +++ b/futures.ml @@ -66,15 +66,12 @@ module Pool = struct (* Internal function, which is run by the threads of the pool *) let serve pool limit = - (* loop, to get the next job *) - let rec poll limit = - Mutex.lock pool.mutex; - Condition.wait pool.condition pool.mutex; + (* loop, to get the next job (in locked environment) *) + let rec check limit = if Queue.is_empty pool.jobs - then begin (* caramba! try again *) - Mutex.unlock pool.mutex; - if not pool.stop then poll limit end + then wait limit (* wait for someone to add a job *) else begin + (* process one job *) let job = Queue.pop pool.jobs in Mutex.unlock pool.mutex; (* run the job *) @@ -83,12 +80,20 @@ module Pool = struct with _ -> ()); match limit with - | None -> if not pool.stop then poll limit (* I am immortal! *) + | None -> if not pool.stop then enter limit (* I am immortal! *) | Some 0 -> () (* stop, reached limit *) - | Some n -> if not pool.stop then poll (Some (n-1)) (* continue serving *) + | Some n -> if not pool.stop then enter (Some (n-1)) (* continue serving *) end + (* enter the loop *) + and enter limit = + Mutex.lock pool.mutex; + check limit + (* wait for someone to push an item on the queue *) + and wait limit = + Condition.wait pool.condition pool.mutex; + check limit in - poll limit + enter limit (** Add a thread to the pool, that will serve at most [limit] jobs *) let add_thread ?limit pool =