rename CCFuture into CCPool, expose the thread pool

This commit is contained in:
Simon Cruanes 2016-01-26 01:02:29 +01:00
parent 6b03a28cba
commit ec70f865e4
6 changed files with 650 additions and 773 deletions

3
_oasis
View file

@ -114,7 +114,8 @@ Library "containers_bigarray"
Library "containers_thread" Library "containers_thread"
Path: src/threads/ Path: src/threads/
Modules: CCFuture, CCLock, CCSemaphore, CCThread, CCBlockingQueue Modules: CCPool, CCLock, CCSemaphore, CCThread, CCBlockingQueue,
CCTimer
FindlibName: thread FindlibName: thread
FindlibParent: containers FindlibParent: containers
Build$: flag(thread) Build$: flag(thread)

View file

@ -149,10 +149,11 @@ Moved to its own repository
{!modules: {!modules:
CCBlockingQueue CCBlockingQueue
CCFuture
CCLock CCLock
CCPool
CCSemaphore CCSemaphore
CCThread CCThread
CCTimer
} }

View file

@ -1,622 +0,0 @@
(*
Copyright (c) 2013, Simon Cruanes
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. Redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Futures for concurrency} *)
(** Observable state of a future: [Done x] once a value [x] has been stored,
    [Waiting] as long as no result is available, and [Failed e] when the
    computation ended with the exception [e]. *)
type 'a state =
| Done of 'a
| Waiting
| Failed of exn
(** {2 Thread pool} *)
module Pool = struct
  (** A job packs a function together with the arguments it is to be
      applied to. The argument types are hidden inside the constructor. *)
  type job =
    | Job1 : ('a -> unit) * 'a -> job
    | Job2 : ('a -> 'b -> unit) * 'a * 'b -> job
    | Job3 : ('a -> 'b -> 'c -> unit) * 'a * 'b * 'c -> job
    | Job4 : ('a -> 'b -> 'c -> 'd -> unit) * 'a * 'b * 'c * 'd -> job

  type t = {
    mutable stop : bool; (* indicate that threads should stop *)
    mutex : Mutex.t; (* protects the mutable fields and the queue *)
    jobs : job Queue.t; (* waiting jobs *)
    mutable cur_size : int; (* total number of threads *)
    max_size : int; (* upper bound on [cur_size] *)
  } (** Dynamic, growable thread pool *)

  (** [with_lock_ t f] calls [f t] while holding [t.mutex]. The mutex is
      released whether [f] returns normally or raises. *)
  let with_lock_ t f =
    Mutex.lock t.mutex;
    try
      let x = f t in
      Mutex.unlock t.mutex;
      x
    with e ->
      Mutex.unlock t.mutex;
      raise e

  (** What a worker thread has to do next. *)
  type command =
    | Process of job
    | Die (* thread has no work to do *)

  (* Record that the calling worker is going away.
     Must be called with the pool locked. *)
  let die pool =
    assert (pool.cur_size > 0);
    pool.cur_size <- pool.cur_size - 1;
    Die

  (* thread: seek what to do next (including dying).
     Must be called with the pool locked. *)
  let get_next pool =
    if pool.stop || Queue.is_empty pool.jobs then die pool
    else Process (Queue.pop pool.jobs)

  (* Apply the function stored in a job to its arguments. *)
  let exec_job_ : job -> unit = function
    | Job1 (f, x) -> f x
    | Job2 (f, x, y) -> f x y
    | Job3 (f, x, y, z) -> f x y z
    | Job4 (f, x, y, z, w) -> f x y z w

  (** Thread: entry point. A worker pops jobs until the queue is empty or
      the pool has been stopped, then exits. *)
  let rec serve pool = match with_lock_ pool get_next with
    | Die -> ()
    | Process job ->
      begin
        try exec_job_ job
        with e ->
          (* the worker is about to die because of [e]: give its slot back
             first, otherwise [cur_size] would stay too high forever and
             the pool would slowly lose capacity *)
          with_lock_ pool (fun p -> p.cur_size <- p.cur_size - 1);
          raise e
      end;
      serve pool

  (** [create ~max_size ()] makes an empty pool that runs at most
      [max_size] threads at a time. No thread is started before a job is
      submitted, and a thread exits as soon as it finds the queue empty. *)
  let create ~max_size () = {
    stop = false;
    cur_size = 0;
    max_size;
    jobs = Queue.create ();
    mutex = Mutex.create ();
  }

  exception PoolStopped

  (** [run_job pool job] pushes [job] on the queue of [pool] and possibly
      starts a new worker thread.
      @raise PoolStopped if [stop pool] was called before. *)
  let run_job pool job =
    (* heuristic criterion for starting a new thread. We try to assess
       whether there are many busy threads and many waiting tasks.
       If there are many threads, it is less likely to start a new one.
       [>=] (rather than [=]) keeps the bound effective even when
       [max_size] is zero or negative. *)
    let should_start_thread p =
      let num_q = Queue.length p.jobs in
      let num_busy = p.cur_size in
      let reached_max = p.cur_size >= p.max_size in
      num_q > 0 && not reached_max && (num_q > 2 * num_busy)
    in
    (* acquire lock and push job in queue *)
    with_lock_ pool
      (fun pool ->
         if pool.stop then raise PoolStopped;
         Queue.push job pool.jobs;
         (* maybe start a thread *)
         if should_start_thread pool then (
           pool.cur_size <- pool.cur_size + 1;
           ignore (Thread.create serve pool)
         ))

  (* run the function on the argument(s) in the given pool *)
  let run1 pool f x = run_job pool (Job1 (f, x))

  let run2 pool f x y = run_job pool (Job2 (f, x, y))

  let run3 pool f x y z = run_job pool (Job3 (f, x, y, z))

  let run4 pool f x y z w = run_job pool (Job4 (f, x, y, z, w))

  (** [stop pool] drops every job that is still queued and makes any later
      submission raise [PoolStopped]. Workers exit once they are done with
      the job they are currently running. *)
  let stop pool =
    with_lock_ pool
      (fun p ->
         p.stop <- true;
         Queue.clear p.jobs)
end
(*$inject
open Infix
*)
let pool = Pool.create ~max_size:50 ()
(** Default pool of threads, should be ok for most uses.
    It is created with [max_size = 50]; [make1] and [make2] submit their
    jobs to it and [stop_pool] stops it. *)
(** {2 Futures} *)
(** A handler is a callback that receives the final state of a future. *)
type 'a handler = 'a state -> unit
(** A proper future, with a delayed computation *)
type 'a cell = {
mutable state : 'a state; (* [Waiting] until a result is stored *)
mutable handlers : 'a handler list; (* handlers *)
mutex : Mutex.t; (* held while [state] and [handlers] are accessed *)
condition : Condition.t; (* broadcast when [state] leaves [Waiting] *)
}
(** A future value of type 'a *)
type 'a t =
| Return of 'a (* value already known *)
| FailNow of exn (* failure already known *)
| Run of 'a cell (* result will be stored in the cell *)
type 'a future = 'a t
(** {2 Basic Future functions} *)
(** [return x] is a future that already holds the value [x]. *)
let return x = Return x
(** [fail e] is a future that has already failed with [e]. *)
let fail e = FailNow e
(* Fresh cell in the [Waiting] state, without any handler, with its own
   mutex and condition variable. *)
let create_cell () = {
state = Waiting;
handlers = [];
mutex = Mutex.create ();
condition = Condition.create ();
}
(* [with_lock_ cell f] evaluates [f cell] with [cell.mutex] held. The mutex
   is given back on both the normal and the exceptional path; an exception
   raised by [f] is re-raised after the unlock. *)
let with_lock_ cell f =
  Mutex.lock cell.mutex;
  let release () = Mutex.unlock cell.mutex in
  try
    let res = f cell in
    release ();
    res
  with exn ->
    release ();
    raise exn
(* Common part of [set_done_] and [set_fail_]: under the lock of [cell],
   move from [Waiting] to [st], wake every thread blocked on the condition
   and call the registered handlers with the new state. Resolving a cell
   twice is a programming error and trips [assert false]. *)
let resolve_ cell st =
  with_lock_ cell
    (fun c ->
       match c.state with
       | Waiting ->
         c.state <- st;
         Condition.broadcast c.condition;
         List.iter (fun handler -> handler c.state) c.handlers
       | Done _ | Failed _ -> assert false)

(* [set_done_ cell x] stores the successful result [x] in [cell]. *)
let set_done_ cell x = resolve_ cell (Done x)

(* [set_fail_ cell e] stores the failure [e] in [cell]. *)
let set_fail_ cell e = resolve_ cell (Failed e)
(* [run_and_set1 cell f x] computes [f x] and stores the outcome in [cell]:
   the value on success, the exception if [f] raises.
   Only exceptions raised by [f] itself are turned into a failure. An
   exception coming out of [set_done_] (for instance from a handler) is
   left to propagate: the cell is already resolved at that point, and
   calling [set_fail_] on it would only replace the real error by an
   assertion failure. *)
let run_and_set1 cell f x =
  match f x with
  | y -> set_done_ cell y
  | exception e -> set_fail_ cell e

(* Same as [run_and_set1] for a function of two arguments. *)
let run_and_set2 cell f x y =
  match f x y with
  | z -> set_done_ cell z
  | exception e -> set_fail_ cell e
(** [make1 f x] schedules the computation of [f x] on the default [pool]
    and returns the future that will receive its result (or the exception
    raised by [f]). *)
let make1 f x =
let cell = create_cell() in
(* the job run by the pool is [run_and_set1 cell f x] *)
Pool.run3 pool run_and_set1 cell f x;
Run cell
(** [make f] is [make1 f ()]. *)
let make f = make1 f ()
(*$R
List.iter
(fun n ->
let l = Sequence.(1 -- n) |> Sequence.to_list in
let l = List.map (fun i ->
make
(fun () ->
Thread.delay 0.1;
1
)) l in
let l' = List.map get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
)
[ 10; 300 ]
*)
(** [make2 f x y] schedules the computation of [f x y] on the default
    [pool] and returns the future that will receive its outcome. *)
let make2 f x y =
let cell = create_cell() in
(* the job run by the pool is [run_and_set2 cell f x y] *)
Pool.run4 pool run_and_set2 cell f x y;
Run cell
(** [get fut] blocks until [fut] is resolved, then returns its value or
    re-raises the exception it failed with. *)
let get fut = match fut with
  | Return x -> x
  | FailNow e -> raise e
  | Run cell ->
    (* runs with the mutex of the cell held; [Condition.wait] needs it *)
    let rec await c = match c.state with
      | Done x -> x
      | Failed e -> raise e
      | Waiting ->
        Condition.wait c.condition c.mutex; (* wait *)
        await c
    in
    with_lock_ cell await
(** [state fut] is the current state of [fut], read under the lock of its
    cell when there is one. *)
let state fut = match fut with
  | Run cell -> with_lock_ cell (fun c -> c.state)
  | Return x -> Done x
  | FailNow e -> Failed e

(** [is_done fut] is [true] iff [fut] holds a value or a failure. *)
let is_done fut = match fut with
  | Return _ | FailNow _ -> true
  | Run cell ->
    with_lock_ cell
      (fun c -> match c.state with
         | Waiting -> false
         | Done _ | Failed _ -> true)
(** {2 Combinators} *)
(* [add_handler_ cell f] registers [f] on [cell]. If the cell is already
   resolved, [f] is called right away (with the lock of the cell held);
   otherwise it is stored for the day the cell gets resolved. *)
let add_handler_ cell f =
  with_lock_ cell
    (fun c ->
       match c.state with
       | Done _ | Failed _ -> f c.state
       | Waiting -> c.handlers <- f :: c.handlers)
(** [on_finish fut k] calls [k] with the final state of [fut], immediately
    if that state is already known. *)
let on_finish fut k =
  match fut with
  | Run cell -> add_handler_ cell k
  | Return x -> k (Done x)
  | FailNow e -> k (Failed e)
(** [on_success fut k] calls [k x] if [fut] ends with the value [x];
    nothing happens on failure. *)
let on_success fut k =
  let handler = function
    | Done x -> k x
    | Waiting | Failed _ -> ()
  in
  on_finish fut handler

(** [on_failure fut k] calls [k e] if [fut] ends with the exception [e];
    nothing happens on success. *)
let on_failure fut k =
  let handler = function
    | Failed e -> k e
    | Waiting | Done _ -> ()
  in
  on_finish fut handler
(** [map f fut] is a future holding [f x] once [fut] holds [x]. A failure
    of [fut], or an exception raised by [f], fails the result. When [fut]
    is already computed, [f] is scheduled on the pool through [make1]. *)
let map f fut =
  match fut with
  | FailNow e -> FailNow e
  | Return x -> make1 f x
  | Run src ->
    let dst = create_cell () in
    (* runs when [src] is resolved *)
    let forward = function
      | Waiting -> assert false
      | Failed e -> set_fail_ dst e
      | Done x -> run_and_set1 dst f x
    in
    add_handler_ src forward;
    Run dst
(*$R
let a = make (fun () -> 1) in
let b = map (fun x -> x+1) a in
let c = map (fun x -> x-1) b in
OUnit.assert_equal 1 (get c)
*)
(** [flat_map f fut] waits for [fut] to yield [x], then behaves like the
    future [f x]. A failure of [fut] fails the result. In the [Run] case an
    exception raised by [f] also fails the result: without this the
    resulting future would never be resolved, and the exception would
    escape through whoever completed [fut]. *)
let flat_map f fut = match fut with
  | Return x -> f x
  | FailNow e -> FailNow e
  | Run cell ->
    let cell' = create_cell () in
    (* copy the final state of the inner future into [cell'] *)
    let forward = function
      | Done y -> set_done_ cell' y
      | Failed e -> set_fail_ cell' e
      | Waiting -> assert false
    in
    add_handler_ cell
      (function
        | Done x ->
          begin match f x with
            | fut' -> on_finish fut' forward
            | exception e -> set_fail_ cell' e
          end
        | Failed e -> set_fail_ cell' e
        | Waiting -> assert false);
    Run cell'
(** [and_then fut f] waits for [fut] to succeed, drops its value and
    continues with the future [f ()]. *)
let and_then fut f = flat_map (fun _ -> f ()) fut
(** [sequence futures] is a future that holds the list of results of
    [futures], in the same order, once all of them have succeeded. It
    fails with the first failure that is reported.
    [sequence []] is an already computed empty list: with no future to
    wait for, nothing would ever resolve the result otherwise. *)
let sequence futures =
  let n = List.length futures in
  if n = 0 then return []
  else (
    (* [`WaitFor k]: [k] results are still missing *)
    let state = CCLock.create (`WaitFor n) in
    let results = Array.make n None in
    let cell = create_cell() in
    (* when all futures returned, collect results for [cell] *)
    let send_result () =
      let l = Array.map
          (function
            | None -> assert false
            | Some x -> x
          ) results
      in
      set_done_ cell (Array.to_list l)
    in
    (* wait for all to succeed or fail *)
    List.iteri
      (fun i fut ->
         on_finish fut
           (fun res ->
              CCLock.update state
                (fun st -> match res, st with
                   | Done _, `Failed -> st
                   | Done x, `WaitFor 1 ->
                     results.(i) <- Some x; send_result (); `Done
                   | Done x, `WaitFor n -> results.(i) <- Some x; `WaitFor (n-1)
                   | Failed _, `Failed -> st
                   | Failed e, `WaitFor _ -> set_fail_ cell e; `Failed
                   | _, `Done -> assert false
                   | Waiting, _ -> assert false
                )
           )
      ) futures;
    Run cell
  )
(*$R
let l = CCList.(1 -- 10) in
let l' = l
|> List.map
(fun x -> make (fun () -> Thread.delay 0.2; x*10))
|> sequence
|> map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (get l')
*)
(*$R
let l = CCList.(1 -- 10) in
let l' = l
|> List.map
(fun x -> make (fun () -> Thread.delay 0.2; if x = 5 then raise Exit; x))
|> sequence
|> map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> get l')
*)
(** [choose futures] behaves like the first future of the list that
    terminates, be it with a value or with a failure. With an empty list
    the result is never resolved.
    The flag that remembers whether a winner was already picked lives in a
    [CCLock]: handlers may run concurrently in several threads, and a plain
    reference would let two of them resolve the cell. *)
let choose futures =
  let cell = create_cell() in
  let state = CCLock.create `Waiting in
  (* add handlers to all futures *)
  List.iter
    (fun fut ->
       on_finish fut
         (fun res ->
            CCLock.update state
              (fun st -> match res, st with
                 | Done x, `Waiting -> set_done_ cell x; `Done
                 | Failed e, `Waiting -> set_fail_ cell e; `Done
                 | Waiting, _ -> assert false
                 | _, `Done -> `Done
              )
         )
    ) futures;
  Run cell
(** slurp the entire remaining content of the input channel [ic] into
    bytes (delegated to [CCIO.read_all_bytes]) *)
let slurp ic = CCIO.read_all_bytes ic
(** [read_chan ic] is a future computed on the pool that holds everything
    read from [ic]. *)
let read_chan ic = make1 slurp ic
(** Outcome of a sub-process: its status code and everything it wrote on
    its standard output and standard error. *)
type subprocess_res = <
errcode : int;
stdout : Bytes.t;
stderr : Bytes.t;
>
(** Spawn a sub-process with the given command [cmd] (and possibly input);
returns a future containing (returncode, stdout, stderr) *)
let spawn_process ?(stdin="") cmd : subprocess_res t =
make
(fun () ->
(* spawn subprocess *)
let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in
output_string inp stdin;
(* send stdin to command *)
flush inp;
close_out inp;
(* read output of process *)
(* NOTE(review): stdout is drained completely before stderr is read;
   presumably a child that writes a lot on stderr first could block --
   to be confirmed *)
let out' = slurp out in
let err' = slurp err in
(* wait for termination *)
let status = Unix.close_process_full (out,inp,err) in
(* get return code *)
(* NOTE(review): exit codes and signal numbers end up in the same
   integer, so the caller cannot tell them apart *)
let returncode = match status with
| Unix.WEXITED i -> i
| Unix.WSIGNALED i -> i
| Unix.WSTOPPED i -> i in
object
method errcode = returncode
method stdout = out'
method stderr = err'
end
)
(** [sleep time] is a future that succeeds once a job calling
    [Thread.delay time] has finished on the pool. *)
let sleep time = make (fun () -> Thread.delay time)
(*$R
let start = Unix.gettimeofday () in
let pause = 0.2 and n = 10 in
let l = CCList.(1 -- n)
|> List.map (fun _ -> make (fun () -> Thread.delay pause))
in
List.iter get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
*)
(** {2 Event timer} *)
module Timer = struct
  (* pending tasks; the heap gives the smallest deadline first *)
  module TaskHeap = CCHeap.Make(struct
      type t = (float * unit cell)
      let leq (f1,_)(f2,_) = f1 <= f2
    end)

  type t = {
    mutable stop : bool; (* set by [stop], never reset *)
    mutable thread : Thread.t option; (* thread dedicated to the timer *)
    mutable tasks : TaskHeap.t;
    t_mutex : Mutex.t;
    fifo_in : Unix.file_descr; (* read end of the wake-up pipe *)
    fifo_out : Unix.file_descr; (* write end of the wake-up pipe *)
  } (** A timer for events *)

  let standby_wait = 10. (* when no task is scheduled *)

  let epsilon = 0.0001 (* accepted time diff for actions *)

  (* [with_lock_ t f] calls [f t] with [t.t_mutex] held, and releases the
     mutex whether [f] returns or raises. *)
  let with_lock_ t f =
    Mutex.lock t.t_mutex;
    try
      let x = f t in
      Mutex.unlock t.t_mutex;
      x
    with e ->
      Mutex.unlock t.t_mutex;
      raise e

  (* what the timer thread does after having looked at the heap *)
  type command =
    | Loop (* look at the heap again right away *)
    | Wait of float (* sleep that many seconds, or until woken up *)
    | Quit (* the timer was stopped: leave the loop *)

  (* remove the task with the smallest deadline *)
  let pop_task_ t =
    let tasks, _ = TaskHeap.take_exn t.tasks in
    t.tasks <- tasks

  (** Wait for next event, run it, and loop. The loop ends when the [stop]
      flag of the timer is seen; both ends of the pipe are closed then. *)
  let serve timer =
    let buf = Bytes.make 1 '_' in
    (* acquire lock, call [process_task] and do as it commands *)
    let rec next () = match with_lock_ timer process_task with
      | Loop -> next ()
      | Wait delay -> wait delay
      | Quit ->
        (* [stop] and [at] only write to the pipe under the lock and
           before/without the flag being set, so nobody uses it anymore *)
        Unix.close timer.fifo_in;
        Unix.close timer.fifo_out
    (* check next task *)
    and process_task timer =
      if timer.stop then Quit
      else match TaskHeap.find_min timer.tasks with
        | None -> Wait standby_wait
        | Some (time, cell) ->
          let now = Unix.gettimeofday () in
          if now +. epsilon > time then (
            (* now! *)
            pop_task_ timer;
            set_done_ cell ();
            Loop
          ) else Wait (time -. now)
    (* wait for [delay] seconds, or until something happens on fifo_in *)
    and wait delay =
      let read = Thread.wait_timed_read timer.fifo_in delay in
      if read
      then ignore (Unix.read timer.fifo_in buf 0 1); (* remove char *)
      next ()
    in
    next ()

  (** [create ()] makes a timer without any task and starts the thread
      that serves it. *)
  let create () =
    let fifo_in, fifo_out = Unix.pipe () in
    let timer = {
      stop = false;
      thread = None;
      tasks = TaskHeap.empty;
      t_mutex = Mutex.create ();
      fifo_in;
      fifo_out;
    } in
    (* start a thread to process tasks *)
    let t = Thread.create serve timer in
    timer.thread <- Some t;
    timer

  (* the byte written on the pipe to wake the timer thread up *)
  let underscore_ = Bytes.make 1 '_'

  (** [at timer time] is a future that evaluates to [()] at the Unix
      timestamp [time], or immediately if [time] is already past. On a
      timer that was stopped the task is dropped and the future is never
      resolved. *)
  let at timer time =
    let now = Unix.gettimeofday () in
    if now >= time
    then return ()
    else (
      let cell = create_cell() in
      with_lock_ timer
        (fun timer ->
           if not timer.stop then (
             (* time of the next scheduled event *)
             let next_time = match TaskHeap.find_min timer.tasks with
               | None -> max_float
               | Some (f, _) -> f
             in
             (* insert task *)
             timer.tasks <- TaskHeap.insert (time, cell) timer.tasks;
             (* see if the timer thread needs to be awaken earlier *)
             if time < next_time
             then ignore (Unix.single_write timer.fifo_out underscore_ 0 1)
           )
        );
      Run cell
    )

  (** [after timer delay] is a future that evaluates to [()] once [delay]
      seconds have elapsed. [delay] must not be negative. *)
  let after timer delay =
    assert (delay >= 0.);
    let now = Unix.gettimeofday () in
    at timer (now +. delay)

  (** Stop the given timer, cancelling pending tasks. The timer thread is
      not killed ([Thread.kill] is not implemented and raises): it is
      woken up through the pipe and exits by itself when it sees the
      [stop] flag. Calling [stop] again has no effect. *)
  let stop timer =
    with_lock_ timer
      (fun timer ->
         if not timer.stop then (
           timer.stop <- true;
           (* empty heap of tasks *)
           timer.tasks <- TaskHeap.empty;
           match timer.thread with
           | None -> ()
           | Some _ ->
             timer.thread <- None;
             (* wake the thread up so that it notices the flag *)
             ignore (Unix.single_write timer.fifo_out underscore_ 0 1)
         )
      )
end
(*$R
let timer = Timer.create () in
let n = CCLock.create 1 in
let getter = make (fun () -> Thread.delay 0.8; CCLock.get n) in
let _ =
Timer.after timer 0.6
>>= fun () -> CCLock.update n (fun x -> x+2); return()
in
let _ =
Timer.after timer 0.4
>>= fun () -> CCLock.update n (fun x -> x * 4); return()
in
OUnit.assert_equal 6 (get getter);
*)
(** Infix aliases of the combinators, with the future as left operand. *)
module Infix = struct
(* [x >>= f] is [flat_map f x] *)
let (>>=) x f = flat_map f x
(* [a >> f] is [and_then a f] *)
let (>>) a f = and_then a f
(* [a >|= f] is [map f a] *)
let (>|=) a f = map f a
end
include Infix
(** {2 Low Level } *)
(** [stop_pool ()] stops the default [pool]: queued jobs are dropped and
    later submissions raise [Pool.PoolStopped]. *)
let stop_pool () = Pool.stop pool

View file

@ -1,149 +0,0 @@
(*
Copyright (c) 2013, Simon Cruanes
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. Redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Futures for concurrency} *)
type 'a state =
| Done of 'a
| Waiting
| Failed of exn
type 'a t
(** A future value of type 'a *)
type 'a future = 'a t
(** {2 Constructors} *)
val return : 'a -> 'a t
(** Future that is already computed *)
val fail : exn -> 'a t
(** Future that fails immediately *)
val make : (unit -> 'a) -> 'a t
(** Create a future, representing a value that will be computed by
the function. If the function raises, the future will fail. *)
val make1 : ('a -> 'b) -> 'a -> 'b t
val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t
(** {2 Basics} *)
val get : 'a t -> 'a
(** Blocking get: wait for the future to be evaluated, and return its value.
If the future failed with an exception [e], [e] is raised instead. *)
val state : 'a t -> 'a state
(** State of the future *)
val is_done : 'a t -> bool
(** Is the future evaluated (success/failure)? *)
(** {2 Combinators} *)
val on_success : 'a t -> ('a -> unit) -> unit
(** Attach a handler to be called upon success *)
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure *)
val on_finish : 'a t -> ('a state -> unit) -> unit
(** Attach a handler to be called when the future is evaluated *)
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *)
val and_then : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second *)
val sequence : 'a t list -> 'a list t
(** Future that waits for all the futures of the list to terminate. If any future
in the list fails, [sequence l] fails too. *)
val choose : 'a t list -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future. The function doesn't run in its
own task; if it can take time, use {!flat_map} *)
(** {2 Helpers} *)
val read_chan : in_channel -> Bytes.t t
(** Read the whole channel *)
type subprocess_res = <
errcode : int;
stdout : Bytes.t;
stderr : Bytes.t;
>
val spawn_process : ?stdin:string -> string -> subprocess_res t
(** Spawn a sub-process with the given command (and possibly input);
returns a future containing [(returncode, stdout, stderr)] *)
val sleep : float -> unit t
(** Future that returns with success in the given amount of seconds. Blocks
the thread! If you need to wait on many events, consider
using {!Timer} *)
(** {2 Event timer} *)
module Timer : sig
type t
(** A scheduler for events. It runs in its own thread. *)
val create : unit -> t
(** A new timer. *)
val after : t -> float -> unit future
(** Create a future that waits for the given number of seconds, then
awakens with [()] *)
val at : t -> float -> unit future
(** Create a future that evaluates to [()] at the given Unix timestamp *)
val stop : t -> unit
(** Stop the given timer, cancelling pending tasks *)
end
module Infix : sig
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
end
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
(** {2 Low level} *)
val stop_pool : unit -> unit
(** Stop the thread pool *)

496
src/threads/CCPool.ml Normal file
View file

@ -0,0 +1,496 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Thread Pool, and Futures} *)
(** State of a future: [Done x] once a value [x] has been stored,
    [Waiting] as long as no result is available, [Failed e] when the
    computation ended with the exception [e]. *)
type +'a state =
| Done of 'a
| Waiting
| Failed of exn
(** Static configuration of a pool built with [Make]. *)
module type PARAM = sig
val max_size : int
(** Maximum number of threads in the pool *)
end
(** Raised when a job is submitted to a pool that has been stopped. *)
exception Stopped
(*$inject
module P = Make(struct let max_size = 30 end)
module Fut = P.Fut
open Fut.Infix
*)
(** {2 Thread pool} *)
module Make(P : PARAM) = struct
(* A job packs a function together with the arguments it is to be applied
   to; the result of the function is discarded by the worker. *)
type job =
| Job1 : ('a -> _) * 'a -> job
| Job2 : ('a -> 'b -> _) * 'a * 'b -> job
| Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job
| Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job
type t = {
mutable stop : bool; (* indicate that threads should stop *)
mutable exn_handler: (exn -> unit); (* receives exceptions raised by jobs and by future handlers *)
mutex : Mutex.t; (* held while the queue and [cur_size] are modified *)
jobs : job Queue.t; (* waiting jobs *)
mutable cur_size : int; (* total number of threads *)
} (** Dynamic, growable thread pool *)
(* default exception handler: ignore the exception *)
let nop_ _ = ()
(* singleton pool: one per application of the functor. It starts without
   any thread and with the handler that ignores exceptions. *)
let pool = {
stop = false;
exn_handler = nop_;
cur_size = 0;
jobs = Queue.create ();
mutex = Mutex.create ();
}
(* [set_exn_handler f] makes [f] the function called on exceptions raised
   by jobs. The field is written without taking the lock of the pool. *)
let set_exn_handler f = pool.exn_handler <- f
(* [with_lock_ t f] evaluates [f t] with [t.mutex] held. The mutex is given
   back on both the normal and the exceptional path; an exception raised
   by [f] is re-raised after the unlock. *)
let with_lock_ t f =
  Mutex.lock t.mutex;
  let release () = Mutex.unlock t.mutex in
  try
    let res = f t in
    release ();
    res
  with exn ->
    release ();
    raise exn
(* next thing a thread should do *)
type command =
  | Process of job
  | Die (* thread has no work to do *)

(* thread: decide what to do next. A job is taken from the queue when the
   pool is running and the queue is not empty; otherwise the thread is
   told to die and is removed from the count right away.
   Assumes the pool is locked. *)
let get_next_ pool =
  let keep_working = not pool.stop && not (Queue.is_empty pool.jobs) in
  if keep_working then Process (Queue.pop pool.jobs)
  else (
    (* die: the thread would be idle otherwise *)
    assert (pool.cur_size > 0);
    pool.cur_size <- pool.cur_size - 1;
    Die
  )
(* Thread: entry point. They seek jobs in the queue *)
let rec serve pool =
  run_cmd (with_lock_ pool get_next_)

(* run a command: either return (the thread ends) or run the job and go
   back to [serve]. Exceptions raised by the job go to the exception
   handler of the pool and do not stop the thread. *)
and run_cmd cmd =
  let guarded (job : job) =
    try
      begin match job with
        | Job1 (f, x) -> ignore (f x)
        | Job2 (f, x, y) -> ignore (f x y)
        | Job3 (f, x, y, z) -> ignore (f x y z)
        | Job4 (f, x, y, z, w) -> ignore (f x y z w)
      end
    with e -> pool.exn_handler e
  in
  match cmd with
  | Die -> ()
  | Process job ->
    guarded job;
    serve pool
(* heuristic criterion for starting a new thread. *)
(* a new thread may be started as long as the pool has fewer than
   [P.max_size] threads; assumes the pool is locked *)
let should_start_thread p = p.cur_size < P.max_size
(* count one more thread; assumes the pool is locked *)
let incr_size_ p = p.cur_size <- p.cur_size +1
(* [run_job job] submits [job] to the pool. Under the lock: when nothing
   is queued and the pool may still grow, a thread is started directly on
   [job]; otherwise [job] is queued and, if the pool may still grow, an
   extra thread is started to help empty the queue.
   Raises [Stopped] if the pool was stopped. *)
let run_job job =
  with_lock_ pool
    (fun p ->
       if p.stop then raise Stopped;
       let nothing_queued = Queue.is_empty p.jobs in
       if nothing_queued && should_start_thread p then (
         incr_size_ p;
         (* create the thread now, on [job], as it will not
            break order *)
         ignore (Thread.create run_cmd (Process job))
       ) else (
         assert (p.cur_size > 0);
         Queue.push job p.jobs;
         (* might want to process in the background *)
         if should_start_thread p then (
           incr_size_ p;
           ignore (Thread.create serve p)
         )
       ))
(* run the function on the argument in the given pool *)
(* [run1 f x] schedules the call [f x] *)
let run1 f x = run_job (Job1 (f, x))
(* [run f] schedules the call [f ()] *)
let run f = run1 f ()
let run2 f x y = run_job (Job2 (f, x, y))
let run3 f x y z = run_job (Job3 (f, x, y, z))
let run4 f x y z w = run_job (Job4 (f, x, y, z, w))
(* [active ()] is [true] until [stop ()] is called; the flag is read
   without taking the lock *)
let active () = not pool.stop
(* stop the pool: raise the [stop] flag and drop the queued jobs. Threads
   are not interrupted; each one exits the next time it asks for a job. *)
let stop () =
with_lock_ pool
(fun p ->
p.stop <- true;
Queue.clear p.jobs)
(** {6 Futures} *)
module Fut = struct
(** A handler is a callback that receives the final state of a future. *)
type 'a handler = 'a state -> unit
(** A proper future, with a delayed computation *)
type 'a cell = {
mutable state : 'a state; (* [Waiting] until a result is stored *)
mutable handlers : 'a handler list; (* handlers *)
f_mutex : Mutex.t; (* held while [state] and [handlers] are accessed *)
condition : Condition.t; (* broadcast when [state] leaves [Waiting] *)
}
(** A future value of type 'a *)
type 'a t =
| Return of 'a (* value already known *)
| FailNow of exn (* failure already known *)
| Run of 'a cell (* result will be stored in the cell *)
type 'a future = 'a t
(** {2 Basic Future functions} *)
(** [return x] is a future that already holds the value [x]. *)
let return x = Return x
(** [fail e] is a future that has already failed with [e]. *)
let fail e = FailNow e
(* Fresh cell in the [Waiting] state, without any handler, with its own
   mutex and condition variable. *)
let create_cell () = {
state = Waiting;
handlers = [];
f_mutex = Mutex.create ();
condition = Condition.create ();
}
(* [with_lock_ cell f] evaluates [f cell] with [cell.f_mutex] held. The
   mutex is given back on both the normal and the exceptional path; an
   exception raised by [f] is re-raised after the unlock. *)
let with_lock_ cell f =
  Mutex.lock cell.f_mutex;
  let release () = Mutex.unlock cell.f_mutex in
  try
    let res = f cell in
    release ();
    res
  with exn ->
    release ();
    raise exn
(* TODO: exception handler for handler errors *)
(* Common part of [set_done_] and [set_fail_]: under the lock of [cell],
   move from [Waiting] to [st], wake every thread blocked on the condition
   and call the registered handlers with the new state. An exception
   raised by a handler is passed to the exception handler of the pool and
   the remaining handlers still run. Resolving a cell twice is a
   programming error and trips [assert false]. *)
let resolve_ cell st =
  with_lock_ cell
    (fun c ->
       match c.state with
       | Waiting ->
         c.state <- st;
         Condition.broadcast c.condition;
         List.iter
           (fun h -> try h c.state with e -> pool.exn_handler e)
           c.handlers
       | Done _ | Failed _ -> assert false)

(* [set_done_ cell x] stores the successful result [x] in [cell]. *)
let set_done_ cell x = resolve_ cell (Done x)

(* [set_fail_ cell e] stores the failure [e] in [cell]. *)
let set_fail_ cell e = resolve_ cell (Failed e)
(* calls [f x], and put result or exception in [cell] *)
(* [run_and_set1 cell f x] calls [f x] and puts the result, or the
   exception raised by [f], in [cell].
   Only exceptions raised by [f] itself are turned into a failure. An
   exception coming out of [set_done_] means the cell is already resolved;
   it is left to propagate, since calling [set_fail_] on that cell would
   only replace the real error by an assertion failure. *)
let run_and_set1 cell f x =
  match f x with
  | y -> set_done_ cell y
  | exception e -> set_fail_ cell e

(* Same as [run_and_set1] for a function of two arguments. *)
let run_and_set2 cell f x y =
  match f x y with
  | z -> set_done_ cell z
  | exception e -> set_fail_ cell e
(** [make1 f x] schedules the computation of [f x] on the pool and returns
    the future that will receive its result (or the exception raised by
    [f]). The submission raises [Stopped] if the pool was stopped. *)
let make1 f x =
let cell = create_cell() in
(* the job run by the pool is [run_and_set1 cell f x] *)
run3 run_and_set1 cell f x;
Run cell
(** [make f] is [make1 f ()]. *)
let make f = make1 f ()
(*$R
List.iter
(fun n ->
let l = Sequence.(1 -- n) |> Sequence.to_list in
let l = List.rev_map (fun i ->
Fut.make
(fun () ->
Thread.delay 0.1;
1
)) l in
let l' = List.map Fut.get l in
OUnit.assert_equal n (List.fold_left (+) 0 l');
)
[ 10; 300; ]
*)
(** [make2 f x y] schedules the computation of [f x y] on the pool and
    returns the future that will receive its outcome. *)
let make2 f x y =
let cell = create_cell() in
(* the job run by the pool is [run_and_set2 cell f x y] *)
run4 run_and_set2 cell f x y;
Run cell
(** [get fut] blocks until [fut] is resolved, then returns its value or
    re-raises the exception it failed with. *)
let get fut = match fut with
  | Return x -> x
  | FailNow e -> raise e
  | Run cell ->
    (* runs with the mutex of the cell held; [Condition.wait] needs it *)
    let rec await c = match c.state with
      | Done x -> x
      | Failed e -> raise e
      | Waiting ->
        Condition.wait c.condition c.f_mutex; (* wait *)
        await c
    in
    with_lock_ cell await
(* access the result without locking *)
(* [get_nolock_ fut] reads the value of a future that is known to have
   succeeded, without taking any lock. Any other state is a programming
   error and trips [assert false]. *)
let get_nolock_ fut = match fut with
  | Return x -> x
  | Run cell ->
    begin match cell.state with
      | Done x -> x
      | Failed _ | Waiting -> assert false
    end
  | FailNow _ -> assert false
(** [state fut] is the current state of [fut], read under the lock of its
    cell when there is one. *)
let state fut = match fut with
  | Run cell -> with_lock_ cell (fun c -> c.state)
  | Return x -> Done x
  | FailNow e -> Failed e

(** [is_done fut] is [true] iff [fut] holds a value or a failure. *)
let is_done fut = match fut with
  | Return _ | FailNow _ -> true
  | Run cell ->
    with_lock_ cell
      (fun c -> match c.state with
         | Waiting -> false
         | Done _ | Failed _ -> true)
(** {2 Combinators} *)
(* [add_handler_ cell f] registers [f] on [cell]. If the cell is already
   resolved, [f] is called right away (with the lock of the cell held);
   otherwise it is stored for the day the cell gets resolved. *)
let add_handler_ cell f =
  with_lock_ cell
    (fun c ->
       match c.state with
       | Done _ | Failed _ -> f c.state
       | Waiting -> c.handlers <- f :: c.handlers)
(** [on_finish fut k] calls [k] with the final state of [fut], immediately
    if that state is already known. *)
let on_finish fut k =
  match fut with
  | Run cell -> add_handler_ cell k
  | Return x -> k (Done x)
  | FailNow e -> k (Failed e)
(** [on_success fut k] calls [k x] if [fut] ends with the value [x];
    nothing happens on failure. *)
let on_success fut k =
  let handler = function
    | Done x -> k x
    | Waiting | Failed _ -> ()
  in
  on_finish fut handler

(** [on_failure fut k] calls [k e] if [fut] ends with the exception [e];
    nothing happens on success. *)
let on_failure fut k =
  let handler = function
    | Failed e -> k e
    | Waiting | Done _ -> ()
  in
  on_finish fut handler
(** [map f fut] is a future holding [f x] once [fut] holds [x]; [f] is not
    scheduled as a job of its own. When [fut] is already computed, [f] is
    applied on the spot. A failure of [fut] fails the result. *)
let map f fut =
  match fut with
  | FailNow e -> FailNow e
  | Return x -> Return (f x)
  | Run src ->
    let dst = create_cell () in
    (* runs when [src] is resolved *)
    let forward = function
      | Waiting -> assert false
      | Failed e -> set_fail_ dst e
      | Done x -> run_and_set1 dst f x
    in
    add_handler_ src forward;
    Run dst
(*$R
let a = Fut.make (fun () -> 1) in
let b = Fut.map (fun x -> x+1) a in
let c = Fut.map (fun x -> x-1) b in
OUnit.assert_equal 1 (Fut.get c)
*)
(* same as {!map}, but schedules the computation of [f] in the pool *)
(** [map_async f fut] is like [map f fut], except that the call to [f] is
    submitted to the pool as a job of its own. *)
let map_async f fut =
  match fut with
  | FailNow e -> FailNow e
  | Return x -> make1 f x
  | Run src ->
    let dst = create_cell () in
    (* runs when [src] is resolved *)
    let forward = function
      | Waiting -> assert false
      | Failed e -> set_fail_ dst e
      | Done x -> run3 run_and_set1 dst f x
    in
    add_handler_ src forward;
    Run dst
(** [flat_map f fut] waits for [fut] to yield [x], then behaves like the
    future [f x]. A failure of [fut] fails the result. In the [Run] case an
    exception raised by [f] also fails the result: it used to end up in
    the exception handler of the pool, which left the resulting future
    waiting forever. *)
let flat_map f fut = match fut with
  | Return x -> f x
  | FailNow e -> FailNow e
  | Run cell ->
    let cell' = create_cell () in
    (* copy the final state of the inner future into [cell'] *)
    let forward = function
      | Done y -> set_done_ cell' y
      | Failed e -> set_fail_ cell' e
      | Waiting -> assert false
    in
    add_handler_ cell
      (function
        | Done x ->
          begin match f x with
            | fut' -> on_finish fut' forward
            | exception e -> set_fail_ cell' e
          end
        | Failed e -> set_fail_ cell' e
        | Waiting -> assert false);
    Run cell'
(** [and_then fut f] waits for [fut] to succeed, drops its value and
    continues with the future [f ()]. *)
let and_then fut f = flat_map (fun _ -> f ()) fut
(* Either an array or a list: lets [sequence_] and [choose_] be written
   once for both kinds of collection. *)
type _ array_or_list =
| A_ : 'a array -> 'a array_or_list
| L_ : 'a list -> 'a array_or_list
(* [iter_aol aol f] applies [f] to every element of [aol], with
   [Array.iter] or [List.iter] depending on the constructor. *)
let iter_aol
: type a. a array_or_list -> (a -> unit) -> unit
= fun aol f -> match aol with
| A_ a -> Array.iter f a
| L_ l -> List.iter f l
(* [sequence_ l f] returns a future that waits for every element of [l]
to return or fail, and calls [f ()] to obtain the result (as a closure)
in case every element succeeded (otherwise a failure is
returned automatically) *)
let sequence_
  : type a res. a t array_or_list -> (unit -> res) -> res t
  = fun aol mk_result ->
    let total = match aol with
      | L_ l -> List.length l
      | A_ a -> Array.length a
    in
    (* with no element nothing would ever resolve the result *)
    assert (total > 0);
    let out = create_cell () in
    let failures = CCLock.create 0 in (* number of failed futures *)
    let successes = CCLock.create 0 in (* number of succeeding futures *)
    let on_result = function
      | Waiting -> assert false
      | Failed e ->
        (* only the first failure decides the fate of [out] *)
        if CCLock.incr_then_get failures = 1 then set_fail_ out e
      | Done _ ->
        (* [out] succeeds with the last of [total] successes; before that,
           some future has not finished yet or some future has failed *)
        if CCLock.incr_then_get successes = total
        then set_done_ out (mk_result ())
    in
    iter_aol aol (fun fut -> on_finish fut on_result);
    Run out
(* map an array of futures to a future array *)
(* [sequence_a a] succeeds with the array of the results of [a] once every
   future of [a] has succeeded. The empty array is answered directly,
   since [sequence_] requires at least one element. *)
let sequence_a a = match a with
| [||] -> return [||]
| _ ->
sequence_ (A_ a)
(* only called once every future is [Done], hence the lock-free read *)
(fun () -> Array.map get_nolock_ a)
(* [map_a f a] applies [f] to every element, then waits for all futures *)
let map_a f a = sequence_a (Array.map f a)
(* same as [sequence_a], on lists *)
let sequence_l l = match l with
| [] -> return []
| _ :: _ ->
sequence_ (L_ l) (fun () -> List.map get_nolock_ l)
(* reverse twice *)
(* [map_l f l] applies [f] to every element of [l] and waits for all the
   resulting futures; the results come in the order of [l] (the list is
   reversed once by each [List.rev_map]).
   The empty list is answered directly: [sequence_] asserts that it is
   given at least one future. *)
let map_l f l = match l with
  | [] -> return []
  | _ :: _ ->
    let futs = List.rev_map f l in
    sequence_ (L_ futs)
      (fun () -> List.rev_map get_nolock_ futs)
(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
OUnit.assert_equal expected (Fut.get l')
*)
(*$R
let l = CCList.(1 -- 50) in
let l' = l
|> List.map
(fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x))
|> Fut.sequence_l
|> Fut.map (List.fold_left (+) 0)
in
OUnit.assert_raises Exit (fun () -> Fut.get l')
*)
(* [choose_ aol] behaves like the first future of [aol] that terminates,
   be it with a value or with a failure. With no element the result is
   never resolved.
   [is_done] tells whether a winner was already picked. It must be read
   and raised in one atomic step ([CCLock.get_then_set]): clearing a flag
   that starts at [false] always answers [false], so every future that
   terminated used to try to resolve [cell] and the second one tripped the
   assertion of [set_done_] / [set_fail_]. *)
let choose_
  : type a. a t array_or_list -> a t
  = fun aol ->
    let cell = create_cell () in
    let is_done = CCLock.create false in
    iter_aol aol
      (fun fut ->
         on_finish fut
           (fun res -> match res with
              | Waiting -> assert false
              | Done x ->
                let was_done = CCLock.get_then_set is_done in
                if not was_done then set_done_ cell x
              | Failed e ->
                let was_done = CCLock.get_then_set is_done in
                if not was_done then set_fail_ cell e));
    Run cell
(* first future of the array to terminate *)
let choose_a a = choose_ (A_ a)
(* first future of the list to terminate *)
let choose_l l = choose_ (L_ l)
(* [sleep time] succeeds once a job calling [Thread.delay time] has
   finished on the pool *)
let sleep time = make1 Thread.delay time
(*$R
let start = Unix.gettimeofday () in
let pause = 0.2 and n = 10 in
let l = CCList.(1 -- n)
|> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause))
in
List.iter Fut.get l;
let stop = Unix.gettimeofday () in
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
*)
(** Infix aliases of the combinators, with the future as left operand. *)
module Infix = struct
(* [x >>= f] is [flat_map f x] *)
let (>>=) x f = flat_map f x
(* [a >> f] is [and_then a f] *)
let (>>) a f = and_then a f
(* [a >|= f] is [map f a] *)
let (>|=) a f = map f a
end
include Infix
end
end

150
src/threads/CCPool.mli Normal file
View file

@ -0,0 +1,150 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Thread Pool, and Futures}
Renamed and heavily updated from [CCFuture]
@since NEXT_RELEASE *)
type +'a state =
| Done of 'a
| Waiting
| Failed of exn
module type PARAM = sig
val max_size : int
(** Maximum number of threads in the pool *)
end
exception Stopped
(** {2 Create a new Pool} *)
module Make(P : PARAM) : sig
val run : (unit -> _) -> unit
(** [run f] schedules [f] for being executed in the thread pool *)
val run1 : ('a -> _) -> 'a -> unit
(** [run1 f x] is similar to [run (fun () -> f x)] *)
val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit
val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit
val set_exn_handler : (exn -> unit) -> unit
val active : unit -> bool
(** [active ()] is true as long as [stop()] has not been called yet *)
val stop : unit -> unit
(** After calling [stop ()], most functions will raise {!Stopped}.
This has the effect of preventing new tasks from being executed. *)
(** {6 Futures}
The futures are registration points for callbacks, storing a {!state},
that are executed in the pool using {!run}. *)
module Fut : sig
type 'a t
(** A future value of type 'a *)
type 'a future = 'a t
(** {2 Constructors} *)
val return : 'a -> 'a t
(** Future that is already computed *)
val fail : exn -> 'a t
(** Future that fails immediately *)
val make : (unit -> 'a) -> 'a t
(** Create a future, representing a value that will be computed by
the function. If the function raises, the future will fail. *)
val make1 : ('a -> 'b) -> 'a -> 'b t
val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t
(** {2 Basics} *)
val get : 'a t -> 'a
(** Blocking get: wait for the future to be evaluated, and return its value.
If the future failed with an exception [e], [e] is raised instead. *)
val state : 'a t -> 'a state
(** State of the future *)
val is_done : 'a t -> bool
(** Is the future evaluated (success/failure)? *)
(** {2 Combinators} *)
val on_success : 'a t -> ('a -> unit) -> unit
(** Attach a handler to be called upon success.
The handler should not call functions on the future.
Might be evaluated now if the future is already done. *)
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure.
The handler should not call any function on the future.
Might be evaluated now if the future is already done. *)
val on_finish : 'a t -> ('a state -> unit) -> unit
(** Attach a handler to be called when the future is evaluated.
The handler should not call functions on the future.
Might be evaluated now if the future is already done. *)
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *)
val and_then : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second *)
val sequence_a : 'a t array -> 'a array t
(** Future that waits for all previous futures to terminate. If any future
in the array fails, [sequence_a a] fails too. *)
val map_a : ('a -> 'b t) -> 'a array -> 'b array t
(** [map_a f a] maps [f] on every element of [a], and will return
the array of every result if all calls succeed, or an error otherwise. *)
val sequence_l : 'a t list -> 'a list t
(** Future that waits for all previous futures to terminate. If any future
in the list fails, [sequence_l l] fails too. *)
val map_l : ('a -> 'b t) -> 'a list -> 'b list t
(** [map_l f l] maps [f] on every element of [l], and will return
the list of every result if all calls succeed, or an error otherwise. *)
val choose_a : 'a t array -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails *)
val choose_l : 'a t list -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future. The function doesn't run in its
own task; if it can take time, use {!flat_map} or {!map_async} *)
val map_async : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future, to be computed in a separated job. *)
val sleep : float -> unit t
(** Future that returns with success in the given amount of seconds. Blocks
the thread! If you need to wait on many events, consider
using {!CCTimer}. *)
module Infix : sig
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
end
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
end
end