From 0e2650200855289f16062bc69eba4eeab3faf719 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 17 Jan 2018 20:06:32 -0600 Subject: [PATCH] wip: fix behavior of CCPool when `min_size>0` problem is a deadlock occurs when some threads die (too early?) when P.min_size>0 --- src/threads/CCPool.ml | 101 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index af8ab6ba..05603848 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -20,8 +20,9 @@ exception Stopped (*$inject module P = Make(struct let min_size = 0 let max_size = 30 end) + module P2 = Make(struct let min_size = 1 let max_size = 15 end) module Fut = P.Fut - open Fut.Infix + module Fut2 = P2.Fut *) (** {2 Thread pool} *) @@ -81,14 +82,16 @@ module Make(P : PARAM) = struct (* thread: seek what to do next (including dying). Assumes the pool is locked. *) let get_next_ pool = - if pool.stop - || (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then ( + (*Printf.printf "get_next (cur=%d, min=%d, idle=%d, stop=%B)\n%!" pool.cur_size P.min_size pool.cur_idle pool.stop;*) + if pool.stop || (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then ( (* die: the thread would be idle otherwise *) assert (pool.cur_size > 0); + (*Printf.printf "time… to die (cur=%d, min=%d, idle=%d, stop=%B)\n%!" pool.cur_size P.min_size pool.cur_idle pool.stop;*) decr_size_ pool; Die - ) else if Queue.is_empty pool.jobs then Wait - else ( + ) else if Queue.is_empty pool.jobs then ( + Wait + ) else ( let job = Queue.pop pool.jobs in Process job ) @@ -120,7 +123,11 @@ module Make(P : PARAM) = struct begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool (* create a new worker thread *) - let launch_worker_ pool = ignore (Thread.create serve pool) + let launch_worker_ pool = + with_lock_ pool + (fun pool -> + incr_size_ pool; + ignore (Thread.create serve pool)) (* launch the minimum required number of threads *) let () = @@ -137,8 +144,7 @@ module Make(P : PARAM) = struct with_lock_ pool (fun pool -> if pool.stop then raise Stopped; - if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 - then ( + if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 then ( (* create the thread now, on [job], as it will not break order of jobs. We do not want to wait for the busy threads to do our task if we are allowed to spawn a new thread. *) @@ -147,10 +153,11 @@ module Make(P : PARAM) = struct ) else ( (* cannot start thread, push and wait for some worker to pick it up *) Queue.push job pool.jobs; - Condition.signal pool.cond; (* wake up some worker, if any *) + Condition.broadcast pool.cond; (* wake up some worker, if any *) (* might want to process in the background, if all threads are busy *) - if pool.cur_idle = 0 && can_start_thread_ pool then ( - incr_size_ pool; + if not (Queue.is_empty pool.jobs) + && pool.cur_idle = 0 + && can_start_thread_ pool then ( launch_worker_ pool; ) )) @@ -283,6 +290,23 @@ module Make(P : PARAM) = struct [ 10; 300; ] *) + (*$R + List.iter + (fun n -> + let l = Sequence.(1 -- n) |> Sequence.to_list in + let l = List.rev_map (fun i -> + Fut2.make + (fun () -> + Thread.delay 0.01; + 1 + )) l in + let l' = List.map Fut2.get l in + OUnit.assert_equal n (List.fold_left (+) 0 l'); + ) + [ 10; 300; ] + *) + + let make2 f x y = let cell = create_cell() in run4 run_and_set2 cell f x y; @@ -379,6 +403,13 @@ module Make(P : PARAM) = struct OUnit.assert_equal 1 (Fut.get c) *) + (*$R + let a = Fut2.make (fun () -> 1) in + let b = Fut2.map (fun x -> x+1) a in + let c = Fut2.map (fun x -> x-1) b in + OUnit.assert_equal 1 (Fut2.get c) + *) + let app_ ~async f x = match f, x with | Return f, Return x -> if async @@ -525,6 +556,43 @@ module Make(P : PARAM) = struct OUnit.assert_equal 10_000 (List.length l'); *) + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut2.make (fun () -> Thread.delay 0.1; x*10)) + |> Fut2.sequence_l + |> Fut2.map (List.fold_left (+) 0) + in + let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in + OUnit.assert_equal expected (Fut2.get l') + *) + + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut2.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x)) + |> Fut2.sequence_l + |> Fut2.map (List.fold_left (+) 0) + in + OUnit.assert_raises Exit (fun () -> Fut2.get l') + *) + + (*$R + let rec fib x = if x<2 then 1 else fib (x-1)+fib(x-2) in + let l = + CCList.(1--10_000) + |> List.rev_map + (fun x-> Fut2.make (fun () -> Thread.yield(); fib (x mod 20))) + |> Fut2.(map_l (fun x->x>|= fun x->x+1)) + in + OUnit.assert_bool "not done" (Fut2.state l = Waiting); + let l' = Fut2.get l in + OUnit.assert_equal 10_000 (List.length l'); + *) + + let choose_ : type a. a t array_or_list -> a t = fun aol -> @@ -560,6 +628,17 @@ module Make(P : PARAM) = struct OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); *) + (*$R + let start = Unix.gettimeofday () in + let pause = 0.2 and n = 10 in + let l = CCList.(1 -- n) + |> List.map (fun _ -> Fut2.make (fun () -> Thread.delay pause)) + in + List.iter Fut2.get l; + let stop = Unix.gettimeofday () in + OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); + *) + module Infix = struct let (>>=) x f = flat_map f x let (>>) a f = and_then a f