mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 03:05:28 -05:00
refactor(pool): less locking, fix deadlock, more parallelism
This commit is contained in:
parent
40c05cc7e3
commit
f41887c367
1 changed files with 61 additions and 42 deletions
|
|
@ -48,7 +48,7 @@ module Make(P : PARAM) = struct
|
||||||
exn_handler = nop_;
|
exn_handler = nop_;
|
||||||
cond = Condition.create();
|
cond = Condition.create();
|
||||||
cur_size = 0;
|
cur_size = 0;
|
||||||
cur_idle = 0;
|
cur_idle = 0; (* invariant: cur_idle <= cur_size *)
|
||||||
jobs = Queue.create ();
|
jobs = Queue.create ();
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
}
|
}
|
||||||
|
|
@ -79,61 +79,76 @@ module Make(P : PARAM) = struct
|
||||||
(* thread: seek what to do next (including dying).
|
(* thread: seek what to do next (including dying).
|
||||||
Assumes the pool is locked. *)
|
Assumes the pool is locked. *)
|
||||||
let get_next_ pool =
|
let get_next_ pool =
|
||||||
Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;
|
(*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*)
|
||||||
if pool.stop then (
|
if pool.stop then (
|
||||||
decr_size_ pool;
|
|
||||||
Die
|
Die
|
||||||
) else (
|
) else (
|
||||||
match Queue.take pool.jobs with
|
match Queue.take pool.jobs with
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
if pool.cur_idle > 0 then (
|
if pool.cur_idle > 0 then (
|
||||||
(* die: there's already at least one idle thread *)
|
(* die: there's already at least one idle thread *)
|
||||||
Printf.printf "DIE (idle>0)\n%!";
|
(*Printf.printf "DIE (idle>0)\n%!";*)
|
||||||
decr_size_ pool;
|
|
||||||
Die
|
Die
|
||||||
) else (
|
) else (
|
||||||
incr_idle_ pool;
|
(*Printf.printf "WAIT\n%!";*)
|
||||||
Wait
|
Wait
|
||||||
)
|
)
|
||||||
| job ->
|
| job ->
|
||||||
Process job
|
Process job
|
||||||
)
|
)
|
||||||
|
|
||||||
|
(* heuristic criterion for starting a new thread. *)
|
||||||
|
let[@inline] can_start_thread_ p = p.cur_size < P.max_size
|
||||||
|
|
||||||
(* Thread: entry point. They seek jobs in the queue *)
|
(* Thread: entry point. They seek jobs in the queue *)
|
||||||
let rec serve pool =
|
let rec serve pool =
|
||||||
assert (pool.cur_size <= P.max_size);
|
assert (pool.cur_size <= P.max_size);
|
||||||
assert (pool.cur_size > 0);
|
assert (pool.cur_size > 0);
|
||||||
let cmd = with_lock_ pool get_next_ in
|
Mutex.lock pool.mutex;
|
||||||
run_cmd cmd
|
let cmd = get_next_ pool in
|
||||||
|
maybe_start_runner_ pool;
|
||||||
|
run_cmd pool cmd
|
||||||
|
|
||||||
(* run a command *)
|
(* run a command *)
|
||||||
and run_cmd = function
|
and run_cmd pool = function
|
||||||
| Die -> ()
|
| Die ->
|
||||||
|
decr_size_ pool;
|
||||||
|
Mutex.unlock pool.mutex;
|
||||||
|
()
|
||||||
| Wait ->
|
| Wait ->
|
||||||
Printf.printf "WAIT\n%!";
|
(*Printf.printf "WAIT\n%!";*)
|
||||||
with_lock_ pool
|
incr_idle_ pool;
|
||||||
(fun p ->
|
Condition.wait pool.cond pool.mutex;
|
||||||
Condition.wait p.cond p.mutex;
|
decr_idle_ pool;
|
||||||
decr_idle_ pool);
|
Mutex.unlock pool.mutex;
|
||||||
serve pool
|
serve pool
|
||||||
| Process (Job1 (f, x)) ->
|
| Process job ->
|
||||||
|
Mutex.unlock pool.mutex;
|
||||||
|
run_job pool job
|
||||||
|
|
||||||
|
(* execute the job *)
|
||||||
|
and run_job pool job =
|
||||||
|
match job with
|
||||||
|
| Job1 (f, x) ->
|
||||||
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
|
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
|
||||||
| Process (Job2 (f, x, y)) ->
|
| Job2 (f, x, y) ->
|
||||||
begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool
|
begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool
|
||||||
| Process (Job3 (f, x, y, z)) ->
|
| Job3 (f, x, y, z) ->
|
||||||
begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool
|
begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool
|
||||||
| Process (Job4 (f, x, y, z, w)) ->
|
| Job4 (f, x, y, z, w) ->
|
||||||
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
|
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
|
||||||
|
|
||||||
(* create a new worker thread *)
|
and maybe_start_runner_ pool : unit =
|
||||||
let launch_worker_ pool =
|
if not (Queue.is_empty pool.jobs) && can_start_thread_ pool then (
|
||||||
with_lock_ pool
|
(* there's room for another thread to start processing jobs,
|
||||||
(fun pool ->
|
starting with [Queue.pop pool.jobs] *)
|
||||||
incr_size_ pool;
|
let job' = Queue.pop pool.jobs in
|
||||||
ignore (Thread.create serve pool))
|
launch_worker_on_ pool job';
|
||||||
|
)
|
||||||
|
|
||||||
(* heuristic criterion for starting a new thread. *)
|
and[@inline] launch_worker_on_ pool job =
|
||||||
let can_start_thread_ p = p.cur_size < P.max_size
|
incr_size_ pool;
|
||||||
|
ignore (Thread.create (run_job pool) job)
|
||||||
|
|
||||||
let run_job job =
|
let run_job job =
|
||||||
(* acquire lock and push job in queue, or start thread directly
|
(* acquire lock and push job in queue, or start thread directly
|
||||||
|
|
@ -142,21 +157,26 @@ module Make(P : PARAM) = struct
|
||||||
(fun pool ->
|
(fun pool ->
|
||||||
if pool.stop then raise Stopped;
|
if pool.stop then raise Stopped;
|
||||||
if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 then (
|
if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 then (
|
||||||
(* create the thread now, on [job], as it will not break order of
|
(* create the thread now, on [job], since no other job in
|
||||||
jobs. We do not want to wait for the busy threads to do our task
|
the queue takes precedence.
|
||||||
if we are allowed to spawn a new thread. *)
|
We do not want to wait for the busy threads to do our task
|
||||||
incr_size_ pool;
|
if we are allowed to spawn a new thread, and no thread is
|
||||||
ignore (Thread.create run_cmd (Process job))
|
just idle waiting for new jobs. *)
|
||||||
) else (
|
launch_worker_on_ pool job;
|
||||||
(* cannot start thread, push and wait for some worker to pick it up *)
|
) else if pool.cur_idle > 0 then (
|
||||||
|
(* at least one idle thread, wake it up *)
|
||||||
Queue.push job pool.jobs;
|
Queue.push job pool.jobs;
|
||||||
Condition.broadcast pool.cond; (* wake up some worker, if any *)
|
Condition.broadcast pool.cond; (* wake up some worker *)
|
||||||
(* might want to process in the background, if all threads are busy *)
|
) else (
|
||||||
if not (Queue.is_empty pool.jobs)
|
Queue.push job pool.jobs;
|
||||||
&& pool.cur_idle = 0
|
|
||||||
&& can_start_thread_ pool then (
|
(* we might still be able to start another thread to help the
|
||||||
launch_worker_ pool;
|
active ones (none is idle). This thread is not necessarily
|
||||||
)
|
going to process [job], but rather the first job in the queue *)
|
||||||
|
if can_start_thread_ pool then (
|
||||||
|
let job' = Queue.pop pool.jobs in
|
||||||
|
launch_worker_on_ pool job';
|
||||||
|
);
|
||||||
))
|
))
|
||||||
|
|
||||||
(* run the function on the argument in the given pool *)
|
(* run the function on the argument in the given pool *)
|
||||||
|
|
@ -174,7 +194,6 @@ module Make(P : PARAM) = struct
|
||||||
|
|
||||||
(* kill threads in the pool *)
|
(* kill threads in the pool *)
|
||||||
let stop () =
|
let stop () =
|
||||||
Printf.printf "STOP\n%!";
|
|
||||||
with_lock_ pool
|
with_lock_ pool
|
||||||
(fun p ->
|
(fun p ->
|
||||||
p.stop <- true;
|
p.stop <- true;
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue