mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 11:15:31 -05:00
bugfix in the scheduler
This commit is contained in:
parent
81e7ee2c04
commit
9a66e90a02
1 changed files with 15 additions and 10 deletions
25
futures.ml
25
futures.ml
|
|
@ -66,15 +66,12 @@ module Pool = struct
|
|||
|
||||
(* Internal function, which is run by the threads of the pool *)
|
||||
let serve pool limit =
|
||||
(* loop, to get the next job *)
|
||||
let rec poll limit =
|
||||
Mutex.lock pool.mutex;
|
||||
Condition.wait pool.condition pool.mutex;
|
||||
(* loop, to get the next job (in locked environment) *)
|
||||
let rec check limit =
|
||||
if Queue.is_empty pool.jobs
|
||||
then begin (* caramba! try again *)
|
||||
Mutex.unlock pool.mutex;
|
||||
if not pool.stop then poll limit end
|
||||
then wait limit (* wait for someone to add a job *)
|
||||
else begin
|
||||
(* process one job *)
|
||||
let job = Queue.pop pool.jobs in
|
||||
Mutex.unlock pool.mutex;
|
||||
(* run the job *)
|
||||
|
|
@ -83,12 +80,20 @@ module Pool = struct
|
|||
with _ ->
|
||||
());
|
||||
match limit with
|
||||
| None -> if not pool.stop then poll limit (* I am immortal! *)
|
||||
| None -> if not pool.stop then enter limit (* I am immortal! *)
|
||||
| Some 0 -> () (* stop, reached limit *)
|
||||
| Some n -> if not pool.stop then poll (Some (n-1)) (* continue serving *)
|
||||
| Some n -> if not pool.stop then enter (Some (n-1)) (* continue serving *)
|
||||
end
|
||||
(* enter the loop *)
|
||||
and enter limit =
|
||||
Mutex.lock pool.mutex;
|
||||
check limit
|
||||
(* wait for someone to push an item on the queue *)
|
||||
and wait limit =
|
||||
Condition.wait pool.condition pool.mutex;
|
||||
check limit
|
||||
in
|
||||
poll limit
|
||||
enter limit
|
||||
|
||||
(** Add a thread to the pool, that will serve at most [limit] jobs *)
|
||||
let add_thread ?limit pool =
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue