diff --git a/_oasis b/_oasis index 29a479c7..dbc3fdea 100644 --- a/_oasis +++ b/_oasis @@ -114,7 +114,8 @@ Library "containers_bigarray" Library "containers_thread" Path: src/threads/ - Modules: CCFuture, CCLock, CCSemaphore, CCThread, CCBlockingQueue + Modules: CCPool, CCLock, CCSemaphore, CCThread, CCBlockingQueue, + CCTimer FindlibName: thread FindlibParent: containers Build$: flag(thread) diff --git a/doc/intro.txt b/doc/intro.txt index 5fbc2bf0..2344eccb 100644 --- a/doc/intro.txt +++ b/doc/intro.txt @@ -149,10 +149,11 @@ Moved to its own repository {!modules: CCBlockingQueue -CCFuture CCLock +CCPool CCSemaphore CCThread +CCTimer } diff --git a/src/threads/CCFuture.ml b/src/threads/CCFuture.ml deleted file mode 100644 index 9d5d0424..00000000 --- a/src/threads/CCFuture.ml +++ /dev/null @@ -1,622 +0,0 @@ -(* -Copyright (c) 2013, Simon Cruanes -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -Redistributions of source code must retain the above copyright notice, this -list of conditions and the following disclaimer. Redistributions in binary -form must reproduce the above copyright notice, this list of conditions and the -following disclaimer in the documentation and/or other materials provided with -the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*) - -(** {1 Futures for concurrency} *) - -type 'a state = - | Done of 'a - | Waiting - | Failed of exn - -(** {2 Thread pool} *) -module Pool = struct - type job = - | Job1 : ('a -> unit) * 'a -> job - | Job2 : ('a -> 'b -> unit) * 'a * 'b -> job - | Job3 : ('a -> 'b -> 'c -> unit) * 'a * 'b * 'c -> job - | Job4 : ('a -> 'b -> 'c -> 'd -> unit) * 'a * 'b * 'c * 'd -> job - - type t = { - mutable stop : bool; (* indicate that threads should stop *) - mutex : Mutex.t; - jobs : job Queue.t; (* waiting jobs *) - mutable cur_size : int; (* total number of threads *) - max_size : int; - } (** Dynamic, growable thread pool *) - - let with_lock_ t f = - Mutex.lock t.mutex; - try - let x = f t in - Mutex.unlock t.mutex; - x - with e -> - Mutex.unlock t.mutex; - raise e - - type command = - | Process of job - | Die (* thread has no work to do *) - - let die pool = - assert (pool.cur_size > 0); - pool.cur_size <- pool.cur_size - 1; - Die - - (** Thread: entry point. They seek jobs in the queue *) - let rec serve pool = match with_lock_ pool get_next with - | Die -> () - | Process (Job1 (f, x)) -> ignore (f x); serve pool - | Process (Job2 (f, x, y)) -> ignore (f x y); serve pool - | Process (Job3 (f, x, y, z)) -> ignore (f x y z); serve pool - | Process (Job4 (f, x, y, z, w)) -> ignore (f x y z w); serve pool - - (* thread: seek what to do next (including dying) *) - and get_next pool = - if pool.stop then die pool - else if Queue.is_empty pool.jobs then die pool - else ( - let job = Queue.pop pool.jobs in - Process job - ) - - (** Create a pool with at most the given number of threads. [timeout] - is the time after which idle threads are killed. *) - let create ~max_size () = - let pool = { - stop = false; - cur_size = 0; - max_size; - jobs = Queue.create (); - mutex = Mutex.create (); - } in - pool - - exception PoolStopped - - let run_job pool job = - (* heuristic criterion for starting a new thread. We try to assess - whether there are many busy threads and many waiting tasks. - If there are many threads, it's less likely to start a new one *) - let should_start_thread p = - let num_q = Queue.length p.jobs in - let num_busy = p.cur_size in - let reached_max = p.cur_size = p.max_size in - num_q > 0 && not reached_max && (num_q > 2 * num_busy) - in - (* acquire lock and push job in queue *) - with_lock_ pool - (fun pool -> - if pool.stop then raise PoolStopped; - Queue.push job pool.jobs; - (* maybe start a thread *) - if should_start_thread pool then ( - pool.cur_size <- pool.cur_size + 1; - ignore (Thread.create serve pool) - )) - - (* run the function on the argument in the given pool *) - let run1 pool f x = run_job pool (Job1 (f, x)) - - let run2 pool f x y = run_job pool (Job2 (f, x, y)) - - let run3 pool f x y z = run_job pool (Job3 (f, x, y, z)) - - let run4 pool f x y z w = run_job pool (Job4 (f, x, y, z, w)) - - (* kill threads in the pool *) - let stop pool = - with_lock_ pool - (fun p -> - p.stop <- true; - Queue.clear p.jobs) -end - -(*$inject - open Infix -*) - -let pool = Pool.create ~max_size:50 () -(** Default pool of threads, should be ok for most uses. *) - -(** {2 Futures} *) - -type 'a handler = 'a state -> unit - -(** A proper future, with a delayed computation *) -type 'a cell = { - mutable state : 'a state; - mutable handlers : 'a handler list; (* handlers *) - mutex : Mutex.t; - condition : Condition.t; -} - -(** A future value of type 'a *) -type 'a t = - | Return of 'a - | FailNow of exn - | Run of 'a cell - -type 'a future = 'a t - -(** {2 Basic Future functions} *) - -let return x = Return x - -let fail e = FailNow e - -let create_cell () = { - state = Waiting; - handlers = []; - mutex = Mutex.create (); - condition = Condition.create (); -} - -let with_lock_ cell f = - Mutex.lock cell.mutex; - try - let x = f cell in - Mutex.unlock cell.mutex; - x - with e -> - Mutex.unlock cell.mutex; - raise e - -let set_done_ cell x = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> (* set state and signal *) - cell.state <- Done x; - Condition.broadcast cell.condition; - List.iter (fun f -> f cell.state) cell.handlers - | _ -> assert false) - -let set_fail_ cell e = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> - cell.state <- Failed e; - Condition.broadcast cell.condition; - List.iter (fun f -> f cell.state) cell.handlers - | _ -> assert false) - -let run_and_set1 cell f x = - try - let y = f x in - set_done_ cell y - with e -> - set_fail_ cell e - -let run_and_set2 cell f x y = - try - let z = f x y in - set_done_ cell z - with e -> - set_fail_ cell e - -let make1 f x = - let cell = create_cell() in - Pool.run3 pool run_and_set1 cell f x; - Run cell - -let make f = make1 f () - -(*$R - List.iter - (fun n -> - let l = Sequence.(1 -- n) |> Sequence.to_list in - let l = List.map (fun i -> - make - (fun () -> - Thread.delay 0.1; - 1 - )) l in - let l' = List.map get l in - OUnit.assert_equal n (List.fold_left (+) 0 l'); - ) - [ 10; 300 ] -*) - -let make2 f x y = - let cell = create_cell() in - Pool.run4 pool run_and_set2 cell f x y; - Run cell - -let get = function - | Return x -> x - | FailNow e -> raise e - | Run cell -> - let rec get_cell cell = match cell.state with - | Waiting -> - Condition.wait cell.condition cell.mutex; (* wait *) - get_cell cell - | Done x -> Mutex.unlock cell.mutex; x - | Failed e -> Mutex.unlock cell.mutex; raise e - in - Mutex.lock cell.mutex; - get_cell cell - -let state = function - | Return x -> Done x - | FailNow e -> Failed e - | Run cell -> - with_lock_ cell (fun cell -> cell.state) - -let is_done = function - | Return _ - | FailNow _ -> true - | Run cell -> - with_lock_ cell (fun c -> c.state <> Waiting) - -(** {2 Combinators *) - -let add_handler_ cell f = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> cell.handlers <- f :: cell.handlers - | Done _ | Failed _ -> f cell.state - ) - -let on_finish fut k = match fut with - | Return x -> k (Done x) - | FailNow e -> k (Failed e) - | Run cell -> add_handler_ cell k - -let on_success fut k = - on_finish fut - (function - | Done x -> k x - | _ -> () - ) - -let on_failure fut k = - on_finish fut - (function - | Failed e -> k e - | _ -> () - ) - -let map f fut = match fut with - | Return x -> make1 f x - | FailNow e -> FailNow e - | Run cell -> - let cell' = create_cell() in - add_handler_ cell - (function - | Done x -> run_and_set1 cell' f x - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ); - Run cell' - -(*$R - let a = make (fun () -> 1) in - let b = map (fun x -> x+1) a in - let c = map (fun x -> x-1) b in - OUnit.assert_equal 1 (get c) -*) - -let flat_map f fut = match fut with - | Return x -> f x - | FailNow e -> FailNow e - | Run cell -> - let cell' = create_cell() in - add_handler_ cell - (function - | Done x -> - let fut' = f x in - on_finish fut' - (function - | Done y -> set_done_ cell' y - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ) - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ); - Run cell' - -let and_then fut f = flat_map (fun _ -> f ()) fut - -let sequence futures = - let n = List.length futures in - let state = CCLock.create (`WaitFor n) in - let results = Array.make n None in - let cell = create_cell() in - (* when all futures returned, collect results for future' *) - let send_result () = - let l = Array.map - (function - | None -> assert false - | Some x -> x - ) results - in - set_done_ cell (Array.to_list l) - in - (* wait for all to succeed or fail *) - List.iteri - (fun i fut -> - on_finish fut - (fun res -> - CCLock.update state - (fun st -> match res, st with - | Done _, `Failed -> st - | Done x, `WaitFor 1 -> results.(i) <- Some x; send_result (); `Done - | Done x, `WaitFor n -> results.(i) <- Some x; `WaitFor (n-1) - | Failed _, `Failed -> st - | Failed e, `WaitFor _ -> set_fail_ cell e; `Failed - | _, `Done -> assert false - | Waiting, _ -> assert false - ) - ) - ) futures; - Run cell - -(*$R - let l = CCList.(1 -- 10) in - let l' = l - |> List.map - (fun x -> make (fun () -> Thread.delay 0.2; x*10)) - |> sequence - |> map (List.fold_left (+) 0) - in - let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in - OUnit.assert_equal expected (get l') -*) - -(*$R - let l = CCList.(1 -- 10) in - let l' = l - |> List.map - (fun x -> make (fun () -> Thread.delay 0.2; if x = 5 then raise Exit; x)) - |> sequence - |> map (List.fold_left (+) 0) - in - OUnit.assert_raises Exit (fun () -> get l') -*) - -let choose futures = - let cell = create_cell() in - let state = ref `Waiting in - (* add handlers to all futures *) - List.iter - (fun fut -> - on_finish fut - (fun res -> match res, !state with - | Done x, `Waiting -> state := `Done; set_done_ cell x - | Failed e, `Waiting -> state := `Done; set_fail_ cell e - | Waiting, _ -> assert false - | _, `Done -> () - ) - ) futures; - Run cell - -(** slurp the entire state of the file_descr into a string *) -let slurp ic = CCIO.read_all_bytes ic - -let read_chan ic = make1 slurp ic - -type subprocess_res = < - errcode : int; - stdout : Bytes.t; - stderr : Bytes.t; -> - -(** Spawn a sub-process with the given command [cmd] (and possibly input); - returns a future containing (returncode, stdout, stderr) *) -let spawn_process ?(stdin="") cmd : subprocess_res t = - make - (fun () -> - (* spawn subprocess *) - let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in - output_string inp stdin; - (* send stdin to command *) - flush inp; - close_out inp; - (* read output of process *) - let out' = slurp out in - let err' = slurp err in - (* wait for termination *) - let status = Unix.close_process_full (out,inp,err) in - (* get return code *) - let returncode = match status with - | Unix.WEXITED i -> i - | Unix.WSIGNALED i -> i - | Unix.WSTOPPED i -> i in - object - method errcode = returncode - method stdout = out' - method stderr = err' - end - ) - -let sleep time = make (fun () -> Thread.delay time) - -(*$R - let start = Unix.gettimeofday () in - let pause = 0.2 and n = 10 in - let l = CCList.(1 -- n) - |> List.map (fun _ -> make (fun () -> Thread.delay pause)) - in - List.iter get l; - let stop = Unix.gettimeofday () in - OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); -*) - -(** {2 Event timer} *) - -module Timer = struct - module TaskHeap = CCHeap.Make(struct - type t = (float * unit cell) - let leq (f1,_)(f2,_) = f1 <= f2 - end) - - type t = { - mutable stop : bool; - mutable thread : Thread.t option; (* thread dedicated to the timer *) - mutable tasks : TaskHeap.t; - t_mutex : Mutex.t; - fifo_in : Unix.file_descr; - fifo_out : Unix.file_descr; - } (** A timer for events *) - - let standby_wait = 10. (* when no task is scheduled *) - let epsilon = 0.0001 (* accepted time diff for actions *) - - let with_lock_ t f = - Mutex.lock t.t_mutex; - try - let x = f t in - Mutex.unlock t.t_mutex; - x - with e -> - Mutex.unlock t.t_mutex; - raise e - - type command = - | Loop - | Wait of float - - let pop_task_ t = - let tasks, _ = TaskHeap.take_exn t.tasks in - t.tasks <- tasks - - (** Wait for next event, run it, and loop *) - let serve timer = - let buf = Bytes.make 1 '_' in - (* acquire lock, call [process_task] and do as it commands *) - let rec next () = match with_lock_ timer process_task with - | Loop -> next () - | Wait delay -> wait delay - (* check next task *) - and process_task timer = match TaskHeap.find_min timer.tasks with - | None -> Wait standby_wait - | Some (time, cell) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time then ( - (* now! *) - pop_task_ timer; - set_done_ cell (); - Loop - ) else Wait (time -. now) - (* wait for [delay] seconds, or until something happens on fifo_in *) - and wait delay = - let read = Thread.wait_timed_read timer.fifo_in delay in - if read - then ignore (Unix.read timer.fifo_in buf 0 1); (* remove char *) - next () - in - next () - - (** A timer that runs in the given thread pool *) - let create () = - let fifo_in, fifo_out = Unix.pipe () in - let timer = { - stop = false; - thread = None; - tasks = TaskHeap.empty; - t_mutex = Mutex.create (); - fifo_in; - fifo_out; - } in - (* start a thread to process tasks *) - let t = Thread.create serve timer in - timer.thread <- Some t; - timer - - let underscore_ = Bytes.make 1 '_' - - (** [timerule_at s t act] will run [act] at the Unix echo [t] *) - let at timer time = - let now = Unix.gettimeofday () in - if now >= time - then return () - else ( - let cell = create_cell() in - with_lock_ timer - (fun timer -> - (* time of the next scheduled event *) - let next_time = match TaskHeap.find_min timer.tasks with - | None -> max_float - | Some (f, _) -> f - in - (* insert task *) - timer.tasks <- TaskHeap.insert (time, cell) timer.tasks; - (* see if the timer thread needs to be awaken earlier *) - if time < next_time - then ignore (Unix.single_write timer.fifo_out underscore_ 0 1) - ); - Run cell - ) - - let after timer delay = - assert (delay >= 0.); - let now = Unix.gettimeofday () in - at timer (now +. delay) - - (** Stop the given timer, cancelling pending tasks *) - let stop timer = - with_lock_ timer - (fun timer -> - if not timer.stop then ( - timer.stop <- true; - (* empty heap of tasks *) - timer.tasks <- TaskHeap.empty; - (* kill the thread *) - match timer.thread with - | None -> () - | Some t -> - Thread.kill t; - timer.thread <- None - ) - ) -end - -(*$R - let timer = Timer.create () in - let n = CCLock.create 1 in - let getter = make (fun () -> Thread.delay 0.8; CCLock.get n) in - let _ = - Timer.after timer 0.6 - >>= fun () -> CCLock.update n (fun x -> x+2); return() - in - let _ = - Timer.after timer 0.4 - >>= fun () -> CCLock.update n (fun x -> x * 4); return() - in - OUnit.assert_equal 6 (get getter); -*) - -module Infix = struct - let (>>=) x f = flat_map f x - let (>>) a f = and_then a f - let (>|=) a f = map f a -end - -include Infix - -(** {2 Low Level } *) - -let stop_pool () = Pool.stop pool diff --git a/src/threads/CCFuture.mli b/src/threads/CCFuture.mli deleted file mode 100644 index 0d2a1fb2..00000000 --- a/src/threads/CCFuture.mli +++ /dev/null @@ -1,149 +0,0 @@ -(* -Copyright (c) 2013, Simon Cruanes -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -Redistributions of source code must retain the above copyright notice, this -list of conditions and the following disclaimer. Redistributions in binary -form must reproduce the above copyright notice, this list of conditions and the -following disclaimer in the documentation and/or other materials provided with -the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*) - -(** {1 Futures for concurrency} *) - -type 'a state = - | Done of 'a - | Waiting - | Failed of exn - -type 'a t -(** A future value of type 'a *) - -type 'a future = 'a t - -(** {2 Constructors} *) - -val return : 'a -> 'a t -(** Future that is already computed *) - -val fail : exn -> 'a t -(** Future that fails immediately *) - -val make : (unit -> 'a) -> 'a t -(** Create a future, representing a value that will be computed by - the function. If the function raises, the future will fail. *) - -val make1 : ('a -> 'b) -> 'a -> 'b t -val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t - -(** {2 Basics} *) - -val get : 'a t -> 'a -(** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned. - raise e if the future failed with e *) - -val state : 'a t -> 'a state -(** State of the future *) - -val is_done : 'a t -> bool -(** Is the future evaluated (success/failure)? *) - -(** {2 Combinators} *) - -val on_success : 'a t -> ('a -> unit) -> unit -(** Attach a handler to be called upon success *) - -val on_failure : _ t -> (exn -> unit) -> unit -(** Attach a handler to be called upon failure *) - -val on_finish : 'a t -> ('a state -> unit) -> unit -(** Attach a handler to be called when the future is evaluated *) - -val flat_map : ('a -> 'b t) -> 'a t -> 'b t -(** Monadic combination of futures *) - -val and_then : 'a t -> (unit -> 'b t) -> 'b t -(** Wait for the first future to succeed, then launch the second *) - -val sequence : 'a t list -> 'a list t -(** Future that waits for all previous sequences to terminate. If any future - in the list fails, [sequence l] fails too. *) - -val choose : 'a t list -> 'a t -(** Choose among those futures (the first to terminate). Behaves like - the first future that terminates, by failing if the future fails *) - -val map : ('a -> 'b) -> 'a t -> 'b t -(** Maps the value inside the future. The function doesn't run in its - own task; if it can take time, use {!flat_map} *) - -(** {2 Helpers} *) - -val read_chan : in_channel -> Bytes.t t -(** Read the whole channel *) - -type subprocess_res = < - errcode : int; - stdout : Bytes.t; - stderr : Bytes.t; -> - -val spawn_process : ?stdin:string -> string -> subprocess_res t -(** Spawn a sub-process with the given command (and possibly input); - returns a future containing [(returncode, stdout, stderr)] *) - -val sleep : float -> unit t -(** Future that returns with success in the given amount of seconds. Blocks - the thread! If you need to wait on many events, consider - using {!Timer} *) - -(** {2 Event timer} *) - -module Timer : sig - type t - (** A scheduler for events. It runs in its own thread. *) - - val create : unit -> t - (** A new timer. *) - - val after : t -> float -> unit future - (** Create a future that waits for the given number of seconds, then - awakens with [()] *) - - val at : t -> float -> unit future - (** Create a future that evaluates to [()] at the given Unix timestamp *) - - val stop : t -> unit - (** Stop the given timer, cancelling pending tasks *) -end - -module Infix : sig - val (>>=) : 'a t -> ('a -> 'b t) -> 'b t - val (>>) : 'a t -> (unit -> 'b t) -> 'b t - val (>|=) : 'a t -> ('a -> 'b) -> 'b t -end - -val (>>=) : 'a t -> ('a -> 'b t) -> 'b t -val (>>) : 'a t -> (unit -> 'b t) -> 'b t -val (>|=) : 'a t -> ('a -> 'b) -> 'b t - -(** {2 Low level} *) - -val stop_pool : unit -> unit -(** Stop the thread pool *) - diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml new file mode 100644 index 00000000..f33ca001 --- /dev/null +++ b/src/threads/CCPool.ml @@ -0,0 +1,496 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Thread Pool, and Futures} *) + +type +'a state = + | Done of 'a + | Waiting + | Failed of exn + +module type PARAM = sig + val max_size : int + (** Maximum number of threads in the pool *) +end + +exception Stopped + +(*$inject + module P = Make(struct let max_size = 30 end) + module Fut = P.Fut + open Fut.Infix +*) + +(** {2 Thread pool} *) +module Make(P : PARAM) = struct + type job = + | Job1 : ('a -> _) * 'a -> job + | Job2 : ('a -> 'b -> _) * 'a * 'b -> job + | Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job + | Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job + + type t = { + mutable stop : bool; (* indicate that threads should stop *) + mutable exn_handler: (exn -> unit); + mutex : Mutex.t; + jobs : job Queue.t; (* waiting jobs *) + mutable cur_size : int; (* total number of threads *) + } (** Dynamic, growable thread pool *) + + let nop_ _ = () + + (* singleton pool *) + let pool = { + stop = false; + exn_handler = nop_; + cur_size = 0; + jobs = Queue.create (); + mutex = Mutex.create (); + } + + let set_exn_handler f = pool.exn_handler <- f + + let with_lock_ t f = + Mutex.lock t.mutex; + try + let x = f t in + Mutex.unlock t.mutex; + x + with e -> + Mutex.unlock t.mutex; + raise e + + (* next thing a thread should do *) + type command = + | Process of job + | Die (* thread has no work to do *) + + (* thread: seek what to do next (including dying). + Assumes the pool is locked. *) + let get_next_ pool = + if pool.stop || Queue.is_empty pool.jobs then ( + (* die: the thread would be idle otherwise *) + assert (pool.cur_size > 0); + pool.cur_size <- pool.cur_size - 1; + Die + ) else ( + let job = Queue.pop pool.jobs in + Process job + ) + + (* Thread: entry point. They seek jobs in the queue *) + let rec serve pool = + let cmd = with_lock_ pool get_next_ in + run_cmd cmd + + (* run a command *) + and run_cmd = function + | Die -> () + | Process (Job1 (f, x)) -> + begin try ignore (f x) with e -> pool.exn_handler e end; serve pool + | Process (Job2 (f, x, y)) -> + begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool + | Process (Job3 (f, x, y, z)) -> + begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool + | Process (Job4 (f, x, y, z, w)) -> + begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool + + (* heuristic criterion for starting a new thread. *) + let should_start_thread p = p.cur_size < P.max_size + + let incr_size_ p = p.cur_size <- p.cur_size +1 + + let run_job job = + (* acquire lock and push job in queue, or start thread directly + if the queue is empty *) + with_lock_ pool + (fun pool -> + if pool.stop then raise Stopped; + if Queue.is_empty pool.jobs && should_start_thread pool + then ( + pool.cur_size <- pool.cur_size + 1; + (* create the thread now, on [job], as it will not + break order *) + ignore (Thread.create run_cmd (Process job)) + ) else ( + assert (pool.cur_size > 0); + Queue.push job pool.jobs; + (* might want to process in the background *) + if should_start_thread pool then ( + incr_size_ pool; + ignore (Thread.create serve pool); + ) + )) + + (* run the function on the argument in the given pool *) + let run1 f x = run_job (Job1 (f, x)) + + let run f = run1 f () + + let run2 f x y = run_job (Job2 (f, x, y)) + + let run3 f x y z = run_job (Job3 (f, x, y, z)) + + let run4 f x y z w = run_job (Job4 (f, x, y, z, w)) + + let active () = not pool.stop + + (* kill threads in the pool *) + let stop () = + with_lock_ pool + (fun p -> + p.stop <- true; + Queue.clear p.jobs) + + (** {6 Futures} *) + module Fut = struct + type 'a handler = 'a state -> unit + + (** A proper future, with a delayed computation *) + type 'a cell = { + mutable state : 'a state; + mutable handlers : 'a handler list; (* handlers *) + f_mutex : Mutex.t; + condition : Condition.t; + } + + (** A future value of type 'a *) + type 'a t = + | Return of 'a + | FailNow of exn + | Run of 'a cell + + type 'a future = 'a t + + (** {2 Basic Future functions} *) + + let return x = Return x + + let fail e = FailNow e + + let create_cell () = { + state = Waiting; + handlers = []; + f_mutex = Mutex.create (); + condition = Condition.create (); + } + + let with_lock_ cell f = + Mutex.lock cell.f_mutex; + try + let x = f cell in + Mutex.unlock cell.f_mutex; + x + with e -> + Mutex.unlock cell.f_mutex; + raise e + + (* TODO: exception handler for handler errors *) + + let set_done_ cell x = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> (* set state and signal *) + cell.state <- Done x; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) + + let set_fail_ cell e = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> + cell.state <- Failed e; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) + + (* calls [f x], and put result or exception in [cell] *) + let run_and_set1 cell f x = + try + let y = f x in + set_done_ cell y + with e -> + set_fail_ cell e + + let run_and_set2 cell f x y = + try + let z = f x y in + set_done_ cell z + with e -> + set_fail_ cell e + + let make1 f x = + let cell = create_cell() in + run3 run_and_set1 cell f x; + Run cell + + let make f = make1 f () + + (*$R + List.iter + (fun n -> + let l = Sequence.(1 -- n) |> Sequence.to_list in + let l = List.rev_map (fun i -> + Fut.make + (fun () -> + Thread.delay 0.1; + 1 + )) l in + let l' = List.map Fut.get l in + OUnit.assert_equal n (List.fold_left (+) 0 l'); + ) + [ 10; 300; ] + *) + + let make2 f x y = + let cell = create_cell() in + run4 run_and_set2 cell f x y; + Run cell + + let get = function + | Return x -> x + | FailNow e -> raise e + | Run cell -> + let rec get_ cell = match cell.state with + | Waiting -> + Condition.wait cell.condition cell.f_mutex; (* wait *) + get_ cell + | Done x -> x + | Failed e -> raise e + in + with_lock_ cell get_ + + (* access the result without locking *) + let get_nolock_ = function + | Return x + | Run {state=Done x; _} -> x + | FailNow _ + | Run {state=(Failed _ | Waiting); _} -> assert false + + let state = function + | Return x -> Done x + | FailNow e -> Failed e + | Run cell -> + with_lock_ cell (fun cell -> cell.state) + + let is_done = function + | Return _ + | FailNow _ -> true + | Run cell -> + with_lock_ cell (fun c -> c.state <> Waiting) + + (** {2 Combinators *) + + let add_handler_ cell f = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> cell.handlers <- f :: cell.handlers + | Done _ | Failed _ -> f cell.state) + + let on_finish fut k = match fut with + | Return x -> k (Done x) + | FailNow e -> k (Failed e) + | Run cell -> add_handler_ cell k + + let on_success fut k = + on_finish fut + (function + | Done x -> k x + | _ -> ()) + + let on_failure fut k = + on_finish fut + (function + | Failed e -> k e + | _ -> ()) + + let map f fut = match fut with + | Return x -> Return (f x) + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> run_and_set1 cell' f x + | Failed e -> set_fail_ cell' e + | Waiting -> assert false); + Run cell' + + (*$R + let a = Fut.make (fun () -> 1) in + let b = Fut.map (fun x -> x+1) a in + let c = Fut.map (fun x -> x-1) b in + OUnit.assert_equal 1 (Fut.get c) + *) + + (* same as {!map}, but schedules the computation of [f] in the pool *) + let map_async f fut = match fut with + | Return x -> make1 f x + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> run3 run_and_set1 cell' f x + | Failed e -> set_fail_ cell' e + | Waiting -> assert false); + Run cell' + + let flat_map f fut = match fut with + | Return x -> f x + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> + let fut' = f x in + on_finish fut' + (function + | Done y -> set_done_ cell' y + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ) + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ); + Run cell' + + let and_then fut f = flat_map (fun _ -> f ()) fut + + type _ array_or_list = + | A_ : 'a array -> 'a array_or_list + | L_ : 'a list -> 'a array_or_list + + let iter_aol + : type a. a array_or_list -> (a -> unit) -> unit + = fun aol f -> match aol with + | A_ a -> Array.iter f a + | L_ l -> List.iter f l + + (* [sequence_ l f] returns a future that waits for every element of [l] + to return of fail, and call [f ()] to obtain the result (as a closure) + in case every element succeeded (otherwise a failure is + returned automatically) *) + let sequence_ + : type a res. a t array_or_list -> (unit -> res) -> res t + = fun aol f -> + let n = match aol with + | A_ a -> Array.length a + | L_ l -> List.length l + in + assert (n>0); + let cell = create_cell() in + let n_err = CCLock.create 0 in (* number of failed threads *) + let n_ok = CCLock.create 0 in (* number of succeeding threads *) + iter_aol aol + (fun fut -> + on_finish fut + (function + | Failed e -> + let x = CCLock.incr_then_get n_err in + (* if first failure, then seal [cell]'s fate now *) + if x=1 then set_fail_ cell e + | Done _ -> + let x = CCLock.incr_then_get n_ok in + (* if [n] successes, then [cell] succeeds. Otherwise, some + job has not finished or some job has failed. *) + if x = n then ( + let res = f () in + set_done_ cell res + ) + | Waiting -> assert false)); + Run cell + + (* map an array of futures to a future array *) + let sequence_a a = match a with + | [||] -> return [||] + | _ -> + sequence_ (A_ a) + (fun () -> Array.map get_nolock_ a) + + let map_a f a = sequence_a (Array.map f a) + + let sequence_l l = match l with + | [] -> return [] + | _ :: _ -> + sequence_ (L_ l) (fun () -> List.map get_nolock_ l) + + (* reverse twice *) + let map_l f l = + let l = List.rev_map f l in + sequence_ (L_ l) + (fun () -> List.rev_map get_nolock_ l) + + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10)) + |> Fut.sequence_l + |> Fut.map (List.fold_left (+) 0) + in + let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in + OUnit.assert_equal expected (Fut.get l') + *) + + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x)) + |> Fut.sequence_l + |> Fut.map (List.fold_left (+) 0) + in + OUnit.assert_raises Exit (fun () -> Fut.get l') + *) + + let choose_ + : type a. a t array_or_list -> a t + = fun aol -> + let cell = create_cell() in + let is_done = CCLock.create false in + iter_aol aol + (fun fut -> + on_finish fut + (fun res -> match res with + | Waiting -> assert false + | Done x -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_done_ cell x + | Failed e -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_fail_ cell e)); + Run cell + + let choose_a a = choose_ (A_ a) + + let choose_l l = choose_ (L_ l) + + let sleep time = make1 Thread.delay time + + (*$R + let start = Unix.gettimeofday () in + let pause = 0.2 and n = 10 in + let l = CCList.(1 -- n) + |> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause)) + in + List.iter Fut.get l; + let stop = Unix.gettimeofday () in + OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); + *) + + module Infix = struct + let (>>=) x f = flat_map f x + let (>>) a f = and_then a f + let (>|=) a f = map f a + end + + include Infix + end +end diff --git a/src/threads/CCPool.mli b/src/threads/CCPool.mli new file mode 100644 index 00000000..94657788 --- /dev/null +++ b/src/threads/CCPool.mli @@ -0,0 +1,150 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Thread Pool, and Futures} + + Renamed and heavily updated from [CCFuture] + @since NEXT_RELEASE *) + +type +'a state = + | Done of 'a + | Waiting + | Failed of exn + +module type PARAM = sig + val max_size : int + (** Maximum number of threads in the pool *) +end + +exception Stopped + +(** {2 Create a new Pool} *) +module Make(P : PARAM) : sig + val run : (unit -> _) -> unit + (** [run f] schedules [f] for being executed in the thread pool *) + + val run1 : ('a -> _) -> 'a -> unit + (** [run1 f x] is similar to [run (fun () -> f x)] *) + + val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit + + val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit + + val set_exn_handler : (exn -> unit) -> unit + + val active : unit -> bool + (** [active ()] is true as long as [stop()] has not been called yet *) + + val stop : unit -> unit + (** After calling [stop ()], Most functions will raise Stopped. + This has the effect of preventing new tasks from being executed. *) + + (** {6 Futures} + + The futures are registration points for callbacks, storing a {!state}, + that are executed in the pool using {!run}. *) + module Fut : sig + type 'a t + (** A future value of type 'a *) + + type 'a future = 'a t + + (** {2 Constructors} *) + + val return : 'a -> 'a t + (** Future that is already computed *) + + val fail : exn -> 'a t + (** Future that fails immediately *) + + val make : (unit -> 'a) -> 'a t + (** Create a future, representing a value that will be computed by + the function. If the function raises, the future will fail. *) + + val make1 : ('a -> 'b) -> 'a -> 'b t + + val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t + + (** {2 Basics} *) + + val get : 'a t -> 'a + (** Blocking get: wait for the future to be evaluated, and get the value, + or the exception that failed the future is returned. + raise e if the future failed with e *) + + val state : 'a t -> 'a state + (** State of the future *) + + val is_done : 'a t -> bool + (** Is the future evaluated (success/failure)? *) + + (** {2 Combinators} *) + + val on_success : 'a t -> ('a -> unit) -> unit + (** Attach a handler to be called upon success. + The handler should not call functions on the future. + Might be evaluated now if the future is already done. *) + + val on_failure : _ t -> (exn -> unit) -> unit + (** Attach a handler to be called upon failure. + The handler should not call any function on the future. + Might be evaluated now if the future is already done. *) + + val on_finish : 'a t -> ('a state -> unit) -> unit + (** Attach a handler to be called when the future is evaluated. + The handler should not call functions on the future. + Might be evaluated now if the future is already done. *) + + val flat_map : ('a -> 'b t) -> 'a t -> 'b t + (** Monadic combination of futures *) + + val and_then : 'a t -> (unit -> 'b t) -> 'b t + (** Wait for the first future to succeed, then launch the second *) + + val sequence_a : 'a t array -> 'a array t + (** Future that waits for all previous futures to terminate. If any future + in the array fails, [sequence_a l] fails too. *) + + val map_a : ('a -> 'b t) -> 'a array -> 'b array t + (** [map_l f a] maps [f] on every element of [a], and will return + the array of every result if all calls succeed, or an error otherwise. *) + + val sequence_l : 'a t list -> 'a list t + (** Future that waits for all previous futures to terminate. If any future + in the list fails, [sequence_l l] fails too. *) + + val map_l : ('a -> 'b t) -> 'a list -> 'b list t + (** [map_l f l] maps [f] on every element of [l], and will return + the list of every result if all calls succeed, or an error otherwise. *) + + val choose_a : 'a t array -> 'a t + (** Choose among those futures (the first to terminate). Behaves like + the first future that terminates, by failing if the future fails *) + + val choose_l : 'a t list -> 'a t + (** Choose among those futures (the first to terminate). Behaves like + the first future that terminates, by failing if the future fails *) + + val map : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future. The function doesn't run in its + own task; if it can take time, use {!flat_map} or {!map_async} *) + + val map_async : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future, to be computed in a separated job. *) + + val sleep : float -> unit t + (** Future that returns with success in the given amount of seconds. Blocks + the thread! If you need to wait on many events, consider + using {!CCTimer}. *) + + module Infix : sig + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + val (>|=) : 'a t -> ('a -> 'b) -> 'b t + end + + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + val (>|=) : 'a t -> ('a -> 'b) -> 'b t + end +end