ocaml-containers/src/threads/CCPool.ml
2017-09-13 18:39:14 +02:00

568 lines
16 KiB
OCaml

(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Thread Pool, and Futures} *)
type +'a state =
| Done of 'a
| Waiting
| Failed of exn
module type PARAM = sig
val min_size : int
(** Minimum number of threads in the pool *)
val max_size : int
(** Maximum number of threads in the pool *)
end
exception Stopped
(*$inject
module P = Make(struct let min_size = 0 let max_size = 30 end)
module Fut = P.Fut
open Fut.Infix
*)
(** {2 Thread pool} *)
module Make(P : PARAM) = struct
type job =
| Job1 : ('a -> _) * 'a -> job
| Job2 : ('a -> 'b -> _) * 'a * 'b -> job
| Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job
| Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job
type t = {
mutable stop : bool; (* indicate that threads should stop *)
mutable exn_handler: (exn -> unit);
mutex : Mutex.t;
cond : Condition.t;
jobs : job Queue.t; (* waiting jobs *)
mutable cur_size : int; (* total number of threads *)
mutable cur_idle : int; (* number of idle threads *)
} (** Dynamic, growable thread pool *)
let nop_ _ = ()
(* singleton pool *)
let pool = {
stop = false;
exn_handler = nop_;
cond = Condition.create();
cur_size = 0;
cur_idle = 0;
jobs = Queue.create ();
mutex = Mutex.create ();
}
let set_exn_handler f = pool.exn_handler <- f
let with_lock_ t f =
Mutex.lock t.mutex;
try
let x = f t in
Mutex.unlock t.mutex;
x
with e ->
Mutex.unlock t.mutex;
raise e
let incr_size_ p = p.cur_size <- p.cur_size + 1
let decr_size_ p = p.cur_size <- p.cur_size - 1
let incr_idle_ p = p.cur_idle <- p.cur_idle + 1
let decr_idle_ p = p.cur_idle <- p.cur_idle - 1
(* next thing a thread should do *)
type command =
| Process of job
| Wait (* wait on condition *)
| Die (* thread has no work to do *)
(* thread: seek what to do next (including dying).
Assumes the pool is locked. *)
let get_next_ pool =
if pool.stop
|| (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then (
(* die: the thread would be idle otherwise *)
assert (pool.cur_size > 0);
decr_size_ pool;
Die
) else if Queue.is_empty pool.jobs then Wait
else (
let job = Queue.pop pool.jobs in
Process job
)
(* Thread: entry point. They seek jobs in the queue *)
let rec serve pool =
assert (pool.cur_size <= P.max_size);
assert (pool.cur_size > 0);
let cmd = with_lock_ pool get_next_ in
run_cmd cmd
(* run a command *)
and run_cmd = function
| Die -> ()
| Wait ->
with_lock_ pool
(fun p ->
incr_idle_ pool;
Condition.wait p.cond p.mutex;
decr_idle_ pool);
serve pool
| Process (Job1 (f, x)) ->
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
| Process (Job2 (f, x, y)) ->
begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool
| Process (Job3 (f, x, y, z)) ->
begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool
| Process (Job4 (f, x, y, z, w)) ->
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
(* create a new worker thread *)
let launch_worker_ pool = ignore (Thread.create serve pool)
(* launch the minimum required number of threads *)
let () =
if P.min_size < 0 then invalid_arg "CCPool: min_size must be >= 0";
if P.min_size > P.max_size then invalid_arg "CCPool: min_size must be <= max_size";
for _i = 1 to P.min_size do launch_worker_ pool done
(* heuristic criterion for starting a new thread. *)
let can_start_thread_ p = p.cur_size < P.max_size
let run_job job =
(* acquire lock and push job in queue, or start thread directly
if the queue is empty *)
with_lock_ pool
(fun pool ->
if pool.stop then raise Stopped;
if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0
then (
(* create the thread now, on [job], as it will not break order of
jobs. We do not want to wait for the busy threads to do our task
if we are allowed to spawn a new thread. *)
incr_size_ pool;
ignore (Thread.create run_cmd (Process job))
) else (
(* cannot start thread, push and wait for some worker to pick it up *)
Queue.push job pool.jobs;
Condition.signal pool.cond; (* wake up some worker, if any *)
(* might want to process in the background, if all threads are busy *)
if pool.cur_idle = 0 && can_start_thread_ pool then (
incr_size_ pool;
launch_worker_ pool;
)
))
(* run the function on the argument in the given pool *)
let run1 f x = run_job (Job1 (f, x))
let run f = run1 f ()
let run2 f x y = run_job (Job2 (f, x, y))
let run3 f x y z = run_job (Job3 (f, x, y, z))
let run4 f x y z w = run_job (Job4 (f, x, y, z, w))
let active () = not pool.stop
(* kill threads in the pool *)
let stop () =
with_lock_ pool
(fun p ->
p.stop <- true;
Queue.clear p.jobs)
(* stop threads if pool is GC'd *)
let () = Gc.finalise (fun _ -> stop ()) pool
(** {6 Futures} *)
module Fut = struct
type 'a handler = 'a state -> unit
(** A proper future, with a delayed computation *)
type 'a cell = {
mutable state : 'a state;
mutable handlers : 'a handler list; (* handlers *)
f_mutex : Mutex.t;
condition : Condition.t;
}
(** A future value of type 'a *)
type 'a t =
| Return of 'a
| FailNow of exn
| Run of 'a cell
type 'a future = 'a t
(** {2 Basic Future functions} *)
let return x = Return x
let fail e = FailNow e
let create_cell () = {
state = Waiting;
handlers = [];
f_mutex = Mutex.create ();
condition = Condition.create ();
}
let with_lock_ cell f =
Mutex.lock cell.f_mutex;
try
let x = f cell in
Mutex.unlock cell.f_mutex;
x
with e ->
Mutex.unlock cell.f_mutex;
raise e
(* TODO: exception handler for handler errors *)
let set_done_ cell x =
with_lock_ cell
(fun cell -> match cell.state with
| Waiting -> (* set state and signal *)
cell.state <- Done x;
Condition.broadcast cell.condition;
List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers
| _ -> assert false)
let set_fail_ cell e =
with_lock_ cell
(fun cell -> match cell.state with
| Waiting ->
cell.state <- Failed e;
Condition.broadcast cell.condition;
List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers
| _ -> assert false)
(* calls [f x], and put result or exception in [cell] *)
let run_and_set1 cell f x =
try
let y = f x in
set_done_ cell y
with e ->
set_fail_ cell e
let run_and_set2 cell f x y =
try
let z = f x y in
set_done_ cell z
with e ->
set_fail_ cell e
let make1 f x =
let cell = create_cell() in
run3 run_and_set1 cell f x;
Run cell
let make f = make1 f ()
(*$R
List.iter
(fun n ->
let l = Sequence.(1 -- n) |> Sequence.to_list in
let l = List.rev_map (fun i ->
Fut.make
(fun () ->
Thread.delay 0.01;
1
)) l in
let l' = List.map Fut.get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
)
[ 10; 300; ]
*)
let make2 f x y =
let cell = create_cell() in
run4 run_and_set2 cell f x y;
Run cell
let get = function
| Return x -> x
| FailNow e -> raise e
| Run cell ->
let rec get_ cell = match cell.state with
| Waiting ->
Condition.wait cell.condition cell.f_mutex; (* wait *)
get_ cell
| Done x -> x
| Failed e -> raise e
in
with_lock_ cell get_
(* access the result without locking *)
let get_nolock_ = function
| Return x
| Run {state=Done x; _} -> x
| FailNow _
| Run {state=(Failed _ | Waiting); _} -> assert false
let state = function
| Return x -> Done x
| FailNow e -> Failed e
| Run cell ->
with_lock_ cell (fun cell -> cell.state)
let is_done = function
| Return _
| FailNow _ -> true
| Run cell ->
with_lock_ cell (fun c -> c.state <> Waiting)
(** {2 Combinators *)
let add_handler_ cell f =
with_lock_ cell
(fun cell -> match cell.state with
| Waiting -> cell.handlers <- f :: cell.handlers
| Done _ | Failed _ -> f cell.state)
let on_finish fut k = match fut with
| Return x -> k (Done x)
| FailNow e -> k (Failed e)
| Run cell -> add_handler_ cell k
let on_success fut k =
on_finish fut
(function
| Done x -> k x
| _ -> ())
let on_failure fut k =
on_finish fut
(function
| Failed e -> k e
| _ -> ())
let map_cell_ ~async f cell ~into:cell' =
add_handler_ cell
(function
| Done x ->
if async
then run3 run_and_set1 cell' f x
else run_and_set1 cell' f x
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let map_ ~async f fut = match fut with
| Return x ->
if async
then make1 f x
else Return (f x)
| FailNow e -> FailNow e
| Run cell -> map_cell_ ~async f cell ~into:(create_cell())
let map f fut = map_ ~async:false f fut
let map_async f fut = map_ ~async:true f fut
(*$R
let a = Fut.make (fun () -> 1) in
let b = Fut.map (fun x -> x+1) a in
let c = Fut.map (fun x -> x-1) b in
OUnit.assert_equal 1 (Fut.get c)
*)
let app_ ~async f x = match f, x with
| Return f, Return x ->
if async
then make1 f x
else Return (f x)
| FailNow e, _
| _, FailNow e -> FailNow e
| Return f, Run x ->
map_cell_ ~async (fun x -> f x) x ~into:(create_cell())
| Run f, Return x ->
map_cell_ ~async (fun f -> f x) f ~into:(create_cell())
| Run f, Run x ->
let cell' = create_cell () in
add_handler_ f
(function
| Done f -> ignore (map_cell_ ~async f x ~into:cell')
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let app f x = app_ ~async:false f x
let app_async f x = app_ ~async:true f x
let flat_map f fut = match fut with
| Return x -> f x
| FailNow e -> FailNow e
| Run cell ->
let cell' = create_cell() in
add_handler_ cell
(function
| Done x ->
let fut' = f x in
on_finish fut'
(function
| Done y -> set_done_ cell' y
| Failed e -> set_fail_ cell' e
| Waiting -> assert false
)
| Failed e -> set_fail_ cell' e
| Waiting -> assert false
);
Run cell'
let and_then fut f = flat_map (fun _ -> f ()) fut
type _ array_or_list =
| A_ : 'a array -> 'a array_or_list
| L_ : 'a list -> 'a array_or_list
let iter_aol
: type a. a array_or_list -> (a -> unit) -> unit
= fun aol f -> match aol with
| A_ a -> Array.iter f a
| L_ l -> List.iter f l
(* [sequence_ l f] returns a future that waits for every element of [l]
to return of fail, and call [f ()] to obtain the result (as a closure)
in case every element succeeded (otherwise a failure is
returned automatically) *)
let sequence_
: type a res. a t array_or_list -> (unit -> res) -> res t
= fun aol f ->
let n = match aol with
| A_ a -> Array.length a
| L_ l -> List.length l
in
assert (n>0);
let cell = create_cell() in
let n_err = CCLock.create 0 in (* number of failed threads *)
let n_ok = CCLock.create 0 in (* number of succeeding threads *)
iter_aol aol
(fun fut ->
on_finish fut
(function
| Failed e ->
let x = CCLock.incr_then_get n_err in
(* if first failure, then seal [cell]'s fate now *)
if x=1 then set_fail_ cell e
| Done _ ->
let x = CCLock.incr_then_get n_ok in
(* if [n] successes, then [cell] succeeds. Otherwise, some
job has not finished or some job has failed. *)
if x = n then (
let res = f () in
set_done_ cell res
)
| Waiting -> assert false));
Run cell
(* map an array of futures to a future array *)
let sequence_a a = match a with
| [||] -> return [||]
| _ ->
sequence_ (A_ a)
(fun () -> Array.map get_nolock_ a)
let map_a f a = sequence_a (Array.map f a)
let sequence_l l = match l with
| [] -> return []
| _ :: _ ->
sequence_ (L_ l) (fun () -> List.map get_nolock_ l)
(* reverse twice *)
let map_l f l =
let l = List.rev_map f l in
sequence_ (L_ l)
(fun () -> List.rev_map get_nolock_ l)
(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (Fut.get l')
*)
(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> Fut.get l')
*)
(*$R
let rec fib x = if x<2 then 1 else fib (x-1)+fib(x-2) in
let l =
CCList.(1--10_000)
|> List.rev_map
(fun x-> Fut.make (fun () -> Thread.yield(); fib (x mod 20)))
|> Fut.(map_l (fun x->x>|= fun x->x+1))
in
OUnit.assert_bool "not done" (Fut.state l = Waiting);
let l' = Fut.get l in
OUnit.assert_equal 10_000 (List.length l');
*)
let choose_
: type a. a t array_or_list -> a t
= fun aol ->
let cell = create_cell() in
let is_done = CCLock.create false in
iter_aol aol
(fun fut ->
on_finish fut
(fun res -> match res with
| Waiting -> assert false
| Done x ->
let was_done = CCLock.get_then_clear is_done in
if not was_done then set_done_ cell x
| Failed e ->
let was_done = CCLock.get_then_clear is_done in
if not was_done then set_fail_ cell e));
Run cell
let choose_a a = choose_ (A_ a)
let choose_l l = choose_ (L_ l)
let sleep time = make1 Thread.delay time
(*$R
let start = Unix.gettimeofday () in
let pause = 0.2 and n = 10 in
let l = CCList.(1 -- n)
|> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause))
in
List.iter Fut.get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
*)
module Infix = struct
let (>>=) x f = flat_map f x
let (>>) a f = and_then a f
let (>|=) a f = map f a
let (<*>) = app
end
include Infix
end
end