diff --git a/_oasis b/_oasis index 15078c4f..33814989 100644 --- a/_oasis +++ b/_oasis @@ -121,7 +121,7 @@ Library "containers_misc" Library "containers_thread" Path: src/threads/ - Modules: CCFuture + Modules: CCFuture, CCLock FindlibName: thread FindlibParent: containers Build$: flag(thread) @@ -172,22 +172,19 @@ Executable bench_hash MainIs: bench_hash.ml BuildDepends: containers, containers.misc -Executable test_levenshtein - Path: tests/ - Install: false - CompiledObject: native - Build$: flag(tests) - MainIs: test_levenshtein.ml - BuildDepends: containers, qcheck, containers.string - -Executable test_threads +Executable run_test_future Path: tests/threads/ Install: false CompiledObject: best Build$: flag(tests) && flag(thread) - MainIs: test_future.ml + MainIs: run_test_future.ml BuildDepends: containers, threads, sequence, oUnit, containers.thread +Test future + Command: echo "run test future" ; ./run_test_future.native + TestTools: run_test_future + Run$: flag(tests) && flag(thread) + PreBuildCommand: make qtest-gen Executable run_qtest @@ -208,7 +205,7 @@ Executable run_tests MainIs: run_tests.ml Build$: flag(tests) && flag(misc) BuildDepends: containers, containers.data, oUnit, sequence, gen, - qcheck, containers.misc + qcheck, containers.misc, containers.string Test all Command: make test-all diff --git a/src/threads/CCFuture.ml b/src/threads/CCFuture.ml index b1564738..998daff7 100644 --- a/src/threads/CCFuture.ml +++ b/src/threads/CCFuture.ml @@ -25,476 +25,358 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Futures for concurrency} *) -(** {2 MVar: a zero-or-one element thread-safe box} *) - -module MVar = struct - type 'a t = { - mutable content : 'a option; - mutex : Mutex.t; - on_take : Condition.t; (* signal that a value was removed (empty) *) - on_put : Condition.t; (* signal that a value was added (full) *) - } - - (** Create an empty box *) - let empty () = { - content = None; - mutex = Mutex.create (); - on_take = Condition.create (); - on_put = Condition.create (); - } - - (** Create a full box *) - let full x = { - content = Some x; - mutex = Mutex.create (); - on_take = Condition.create (); - on_put = Condition.create (); - } - - (** Is the box currently empty? *) - let is_empty box = - Mutex.lock box.mutex; - let ans = box.content <> None in - Mutex.unlock box.mutex; - ans - - (* assuming we have a lock on given box, wait it gets a value and return it *) - let rec wait_put box = - match box.content with - | None -> - Condition.wait box.on_put box.mutex; - wait_put box (* try again *) - | Some x -> x - - (* same, but waits for the box to become empty *) - let rec wait_take box = - match box.content with - | None -> () (* empty! *) - | Some _ -> - Condition.wait box.on_take box.mutex; - wait_take box (* try again *) - - (** Take value out of the box. Wait if necessary *) - let take box = - Mutex.lock box.mutex; - let x = wait_put box in - box.content <- None; - Condition.broadcast box.on_take; - Mutex.unlock box.mutex; - x - - (** Put a value in the box. Waits if the box is already full *) - let put box x = - Mutex.lock box.mutex; - wait_take box; - box.content <- Some x; - Condition.broadcast box.on_put; - Mutex.unlock box.mutex - - (** Use given function to atomically update content, and return - the previous value and the new one *) - let update box f = - Mutex.lock box.mutex; - let x = wait_put box in - try - let y = f x in - box.content <- Some y; - Condition.broadcast box.on_put; (* signal write *) - Mutex.unlock box.mutex; - x, y - with e -> - Mutex.unlock box.mutex; - raise e - - (** Look at the value, without removing it *) - let peek box = - Mutex.lock box.mutex; - let x = wait_put box in - Mutex.unlock box.mutex; - x -end +type 'a state = + | Done of 'a + | Waiting + | Failed of exn (** {2 Thread pool} *) module Pool = struct + type job = + | Job : ('a -> unit) * 'a -> job + type t = { mutable stop : bool; (* indicate that threads should stop *) mutex : Mutex.t; jobs : job Queue.t; (* waiting jobs *) - mutable threads : waiting_thread list; (* waiting threads *) - mutable cur_size : int; + mutable cur_size : int; (* total number of threads *) max_size : int; - timeout : float; (* idle time after which to discard threads *) } (** Dynamic, growable thread pool *) - and job = unit -> unit - and command = - | Perform of job - | Quit - (** Command sent to a thread *) - and waiting_thread = float * command MVar.t - (** Cleanup waiting threads. precond: pool is locked *) - let cleanup_waiting pool = - let l = pool.threads in - let now = Unix.gettimeofday () in - (* filter threads that have been waiting for too long *) - let l' = List.filter - (fun (time, box) -> - if time +. pool.timeout < now - then (MVar.put box Quit; false) - else true) - l in - pool.threads <- l' + let with_lock_ t f = + Mutex.lock t.mutex; + try + let x = f t in + Mutex.unlock t.mutex; + x + with e -> + Mutex.unlock t.mutex; + raise e - (** Function that the threads run. They also take a MVar to - get commands *) - let serve pool box = - (* wait for a job to come *) - let rec wait_job () = - match MVar.take box with - | Quit -> (Mutex.lock pool.mutex; quit ()) (* exit *) - | Perform job -> - run_job job - (* run the given job *) - and run_job job = - (try job () with _ -> ()); - next () (* loop *) - (* process next task *) - and next () = - Mutex.lock pool.mutex; - if pool.stop then quit () (* stop the pool *) - else if Queue.is_empty pool.jobs - then begin - let now = Unix.gettimeofday () in - (* cleanup waiting threads *) - cleanup_waiting pool; - if pool.cur_size > 1 && List.length pool.threads + 1 = pool.cur_size - then - (* all other threads are waiting, we may need to kill them later *) - (Mutex.unlock pool.mutex; delay ()) - else begin - (* add oneself to the list of waiting threads *) - pool.threads <- (now, box) :: pool.threads; - Mutex.unlock pool.mutex; - wait_job () - end - end else - let job = Queue.pop pool.jobs in - Mutex.unlock pool.mutex; - run_job job - (* delay [pool.timeout], so that in case no job is submitted we - still kill old cached threads *) - and delay () = - Thread.delay pool.timeout; - next () - (* stop the thread (assume we have pool.mutex) *) - and quit () = - pool.cur_size <- pool.cur_size - 1; - Mutex.unlock pool.mutex - in wait_job () + type command = + | Process of job + | Die (* thread has no work to do *) - let size pool = - Mutex.lock pool.mutex; - let n = pool.cur_size in - Mutex.unlock pool.mutex; - n + let die pool = + assert (pool.cur_size > 0); + pool.cur_size <- pool.cur_size - 1; + Die - (** Add a thread to the pool, starting with the first job *) - let add_thread pool job = - let box = MVar.full job in - ignore (Thread.create (serve pool) box) + (** thread: entry point. They seek jobs in the queue *) + let rec serve pool = + match with_lock_ pool get_next with + | Die -> () + | Process (Job (f, x)) -> + f x; + serve pool + + (* thread: seek what to do next (including dying) *) + and get_next pool = + if pool.stop then die pool + else if Queue.is_empty pool.jobs then die pool + else ( + let job = Queue.pop pool.jobs in + Process job + ) (** Create a pool with at most the given number of threads. [timeout] is the time after which idle threads are killed. *) - let create ?(timeout=30.) ~size = + let create ~max_size () = let pool = { stop = false; cur_size = 0; - max_size=size; - timeout; - threads = []; + max_size; jobs = Queue.create (); mutex = Mutex.create (); } in pool - (** Run the job in the given pool *) - let run pool job = - assert (not (pool.stop)); - Mutex.lock pool.mutex; - begin match pool.threads with - | [] when pool.cur_size = pool.max_size -> - (* max capacity reached, push task in queue *) - Queue.push job pool.jobs - | [] -> - (* spawn a thread for the given task *) - add_thread pool (Perform job); - pool.cur_size <- pool.cur_size + 1; - | (_,box)::l' -> - (* use the first thread *) - MVar.put box (Perform job); - pool.threads <- l'; - end; - Mutex.unlock pool.mutex + exception PoolStopped - (** Kill threads in the pool *) - let finish pool = - Mutex.lock pool.mutex; - pool.stop <- true; - (* kill waiting threads *) - List.iter (fun (_,box) -> MVar.put box Quit) pool.threads; - pool.threads <- []; - Mutex.unlock pool.mutex + let run_job pool job = + (* heuristic criterion for starting a new thread. We try to assess + whether there are many busy threads and many waiting tasks. + If there are many threads, it's less likely to start a new one *) + let should_start_thread p = + let num_q = Queue.length p.jobs in + let num_busy = p.cur_size in + let reached_max = p.cur_size = p.max_size in + num_q > 0 && not reached_max && (num_q > 2 * num_busy) + in + (* acquire lock and push job in queue *) + with_lock_ pool + (fun pool -> + if pool.stop then raise PoolStopped; + Queue.push job pool.jobs; + (* maybe start a thread *) + if should_start_thread pool then ( + pool.cur_size <- pool.cur_size + 1; + ignore (Thread.create serve pool) + ) + ) + + (* Run the function on the argument in the given pool *) + let run pool f x = run_job pool (Job (f, x)) + + (* Kill threads in the pool *) + let stop pool = + with_lock_ pool + (fun p -> + p.stop <- true; + Queue.clear p.jobs + ) end -let default_pool = Pool.create ?timeout:None ~size:100 - (** Default pool of threads, should be ok for most uses. *) +let pool = Pool.create ~max_size:50 () +(** Default pool of threads, should be ok for most uses. *) (** {2 Futures} *) -type 'a t = { - mutable content : 'a result; +type 'a handler = 'a state -> unit + +(** A proper future, with a delayed computation *) +type 'a cell = { + mutable state : 'a state; mutable handlers : 'a handler list; (* handlers *) - pool : Pool.t; mutex : Mutex.t; condition : Condition.t; -} (** A future value of type 'a *) -and 'a result = - | NotKnown - | Success of 'a - | Failure of exn - (** Result of a computation *) -and 'a handler = - | OnSuccess of ('a -> unit) - | OnFailure of (exn -> unit) - | OnFinish of (unit -> unit) +} -exception SendTwice - (** Exception raised when a future is evaluated several time *) +(** A future value of type 'a *) +type 'a t = + | Return of 'a + | FailNow of exn + | Run of 'a cell + +type 'a future = 'a t (** {2 Basic Future functions} *) -let make pool = - { content = NotKnown; - handlers = []; - pool; - mutex = Mutex.create (); - condition = Condition.create (); - } +let return x = Return x -let get future = - (* check whether it's finished: precond: mutex is locked *) - let rec check () = - match future.content with - | NotKnown -> - poll () (* wait *) - | Success x -> - Mutex.unlock future.mutex; - x (* return success *) - | Failure e -> - Mutex.unlock future.mutex; - raise e (* raise exception *) - (* poll, to wait for the result to arrive. Precond: mutex is acquired. *) - and poll () = - Condition.wait future.condition future.mutex; - check () (* we have been signaled, check! *) - in - Mutex.lock future.mutex; - check () +let fail e = FailNow e -let send future x = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> (* set content and signal *) - future.content <- Success x; - Condition.broadcast future.condition; - List.iter - (function - | OnSuccess f -> Pool.run future.pool (fun () -> f x) - | OnFinish f -> Pool.run future.pool (fun () -> f ()) - | OnFailure _ -> ()) - future.handlers; - Mutex.unlock future.mutex - | _ -> - Mutex.unlock future.mutex; - raise SendTwice (* already set! *) +let create_cell () = { + state = Waiting; + handlers = []; + mutex = Mutex.create (); + condition = Condition.create (); +} -let fail future e = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> (* set content and signal *) - future.content <- Failure e; - Condition.broadcast future.condition; - List.iter - (function - | OnSuccess _ -> () - | OnFinish f -> f () - | OnFailure f -> f e) - future.handlers; - Mutex.unlock future.mutex - | _ -> - Mutex.unlock future.mutex; - raise SendTwice (* already set! *) +let with_lock_ cell f = + Mutex.lock cell.mutex; + try + let x = f cell in + Mutex.unlock cell.mutex; + x + with e -> + Mutex.unlock cell.mutex; + raise e -let is_done future = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> - Mutex.unlock future.mutex; - false - | _ -> - Mutex.unlock future.mutex; - true +let set_done_ cell x = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> (* set state and signal *) + cell.state <- Done x; + Condition.broadcast cell.condition; + List.iter (fun f -> f cell.state) cell.handlers + | _ -> assert false + ) + +let set_fail_ cell e = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> + cell.state <- Failed e; + Condition.broadcast cell.condition; + List.iter (fun f -> f cell.state) cell.handlers + | _ -> assert false + ) + +let run_and_set1 cell f x = + try + let y = f x in + set_done_ cell y + with e -> + set_fail_ cell e + +let run_and_set2 cell f x y = + try + let z = f x y in + set_done_ cell z + with e -> + set_fail_ cell e + +let make1 f x = + let cell = create_cell() in + Pool.run pool (run_and_set1 cell f) x; + Run cell + +let make f = make1 f () + +let make2 f x y = + let cell = create_cell() in + Pool.run pool (run_and_set2 cell f x) y; + Run cell + +let get = function + | Return x -> x + | FailNow e -> raise e + | Run cell -> + let rec get_cell cell = match cell.state with + | Waiting -> + Condition.wait cell.condition cell.mutex; (* wait *) + get_cell cell + | Done x -> Mutex.unlock cell.mutex; x + | Failed e -> Mutex.unlock cell.mutex; raise e + in + Mutex.lock cell.mutex; + get_cell cell + +let state = function + | Return x -> Done x + | FailNow e -> Failed e + | Run cell -> + with_lock_ cell (fun cell -> cell.state) + +let is_done = function + | Return _ + | FailNow _ -> true + | Run cell -> + with_lock_ cell (fun c -> c.state <> Waiting) (** {2 Combinators *) -let on_success future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) - | Success x -> Pool.run future.pool (fun () -> k x) - | Failure _ -> ()); - Mutex.unlock future.mutex +let add_handler_ cell f = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> cell.handlers <- f :: cell.handlers + | Done _ | Failed _ -> f cell.state + ) -let on_failure future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnFailure k) :: future.handlers; (* wait *) - | Success _ -> () - | Failure e -> Pool.run future.pool (fun () -> k e)); - Mutex.unlock future.mutex +let on_finish fut k = match fut with + | Return x -> k (Done x) + | FailNow e -> k (Failed e) + | Run cell -> add_handler_ cell k -let on_finish future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnFinish k) :: future.handlers; (* wait *) - | Success _ | Failure _ -> Pool.run future.pool (fun () -> k ())); - Mutex.unlock future.mutex +let on_success fut k = + on_finish fut + (function + | Done x -> k x + | _ -> () + ) -let flatMap ?pool f future = - let pool = match pool with | Some p -> p | None -> future.pool in - let future' = make pool in - (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) - on_success future - (fun x -> - try - let future'' = f x in - on_success future'' (fun x -> send future' x); - on_failure future'' (fun e -> fail future' e); - with e -> - fail future' e); - on_failure future - (fun e -> fail future' e); - future' +let on_failure fut k = + on_finish fut + (function + | Failed e -> k e + | _ -> () + ) -let andThen ?pool future f = - flatMap ?pool (fun _ -> f ()) future - -let sequence ?(pool=default_pool) futures = - let a = Array.of_list futures in - let n = Array.length a in - let results = Array.make n NotKnown in - let future' = make default_pool in - (* state: how many remain to finish *) - let count = MVar.full (Array.length a) in - (* when all futures returned, collect results for future' *) - let check_at_end () = - let l = Array.to_list results in - try - let l = List.map +let map f fut = match fut with + | Return x -> make1 f x + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell (function - | Success x -> x - | Failure e -> raise e - | NotKnown -> assert false) - l in - send future' l - with e -> - fail future' e - in - (* function called whenever a future succeeds *) - let one_succeeded i x = - results.(i) <- Success x; - let _, n = MVar.update count (fun x -> x-1) in - if n = 0 then check_at_end () - and one_failed i e = - results.(i) <- Failure e; - let _, n = MVar.update count (fun x -> x-1) in - if n = 0 then check_at_end () + | Done x -> run_and_set1 cell' f x + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ); + Run cell' + +let flat_map f fut = match fut with + | Return x -> f x + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> + let fut' = f x in + on_finish fut' + (function + | Done y -> set_done_ cell' y + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ) + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ); + Run cell' + +let and_then fut f = flat_map (fun _ -> f ()) fut + +let sequence futures = + let n = List.length futures in + let state = CCLock.create (`WaitFor n) in + let results = Array.make n None in + let cell = create_cell() in + (* when all futures returned, collect results for future' *) + let send_result () = + let l = Array.map + (function + | None -> assert false + | Some x -> x + ) results + in + set_done_ cell (Array.to_list l) in (* wait for all to succeed or fail *) - for i = 0 to Array.length a - 1 do - on_success a.(i) (one_succeeded i); - on_failure a.(i) (one_failed i); - done; - future' + List.iteri + (fun i fut -> + on_finish fut + (fun res -> + CCLock.update state + (fun st -> match res, st with + | Done _, `Failed -> st + | Done x, `WaitFor 1 -> results.(i) <- Some x; send_result (); `Done + | Done x, `WaitFor n -> results.(i) <- Some x; `WaitFor (n-1) + | Failed _, `Failed -> st + | Failed e, `WaitFor _ -> set_fail_ cell e; `Failed + | _, `Done -> assert false + | Waiting, _ -> assert false + ) + ) + ) futures; + Run cell -let choose ?(pool=default_pool) futures = - let future' = make default_pool in - let one_finished = MVar.full false in - (* handlers. The first handler to be called will update [one_finished] - to true, see that it was false (hence know it is the first) - and propagate its result to [future'] *) - let one_succeeded x = - let one_finished, _ = MVar.update one_finished (fun _ -> true) in - if not one_finished then send future' x - and one_failed e = - let one_finished, _ = MVar.update one_finished (fun _ -> true) in - if not one_finished then fail future' e - in +let choose futures = + let cell = create_cell() in + let state = ref `Waiting in (* add handlers to all futures *) List.iter - (fun future -> - on_success future one_succeeded; - on_failure future one_failed; ) - futures; - future' + (fun fut -> + on_finish fut + (fun res -> match res, !state with + | Done x, `Waiting -> state := `Done; set_done_ cell x + | Failed e, `Waiting -> state := `Done; set_fail_ cell e + | Waiting, _ -> assert false + | _, `Done -> () + ) + ) futures; + Run cell -let map ?(pool=default_pool) f future = - let future' = make pool in - on_success future (fun x -> let y = f x in send future' y); - on_failure future (fun e -> fail future' e); - future' - -(** {2 Future constructors} *) - -let return x = - { content = Success x; - handlers = []; - pool = default_pool; - mutex = Mutex.create (); - condition = Condition.create (); - } - -let spawn ?(pool=default_pool) f = - let future = make pool in - (* schedule computation *) - Pool.run pool - (fun () -> - try - let x = f () in - send future x - with e -> - fail future e); - future - -(** slurp the entire content of the file_descr into a string *) +(** slurp the entire state of the file_descr into a string *) let slurp i_chan = let buf_size = 128 in - let content = Buffer.create 120 + let state = Buffer.create 120 and buf = String.make 128 'a' in let rec next () = let num = input i_chan buf 0 buf_size in if num = 0 - then Buffer.contents content (* EOF *) - else (Buffer.add_substring content buf 0 num; next ()) + then Buffer.contents state (* EOF *) + else ( + Buffer.add_substring state buf 0 num; + next () + ) in next () (** Spawn a sub-process with the given command [cmd] (and possibly input); returns a future containing (returncode, stdout, stderr) *) -let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = - spawn ~pool +let spawn_process ?(stdin="") ~cmd () = + make (fun () -> (* spawn subprocess *) let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in @@ -512,162 +394,84 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = | Unix.WEXITED i -> i | Unix.WSIGNALED i -> i | Unix.WSTOPPED i -> i in - (returncode, out', err')) + (returncode, out', err') + ) -(* TODO a global scheduler for timed events *) - -let sleep ?(pool=default_pool) time = - spawn ~pool - (fun () -> Thread.delay time; ()) +let sleep time = make (fun () -> Thread.delay time) (** {2 Event timer} *) -(** {3 Mutable heap (taken from heap.ml to avoid dependencies)} *) -module Heap = struct - type 'a t = { - mutable tree : 'a tree; - cmp : 'a -> 'a -> int; - } (** A splay tree heap with the given comparison function *) - and 'a tree = - | Empty - | Node of ('a tree * 'a * 'a tree) - (** A splay tree containing values of type 'a *) - - let empty ~cmp = { - tree = Empty; - cmp; - } - - let is_empty h = - match h.tree with - | Empty -> true - | Node _ -> false - - let clear h = - h.tree <- Empty - - (** Partition the tree into (elements <= pivot, elements > pivot) *) - let rec partition ~cmp pivot tree = - match tree with - | Empty -> Empty, Empty - | Node (a, x, b) -> - if cmp x pivot <= 0 - then begin - match b with - | Empty -> (tree, Empty) - | Node (b1, y, b2) -> - if cmp y pivot <= 0 - then - let small, big = partition ~cmp pivot b2 in - Node (Node (a, x, b1), y, small), big - else - let small, big = partition ~cmp pivot b1 in - Node (a, x, small), Node (big, y, b2) - end else begin - match a with - | Empty -> (Empty, tree) - | Node (a1, y, a2) -> - if cmp y pivot <= 0 - then - let small, big = partition ~cmp pivot a2 in - Node (a1, y, small), Node (big, x, b) - else - let small, big = partition ~cmp pivot a1 in - small, Node (big, y, Node (a2, x, b)) - end - - (** Insert the element in the tree *) - let insert h x = - let small, big = partition ~cmp:h.cmp x h.tree in - let tree' = Node (small, x, big) in - h.tree <- tree' - - (** Access minimum value *) - let min h = - let rec min tree = - match tree with - | Empty -> raise Not_found - | Node (Empty, x, _) -> x - | Node (l, _, _) -> min l - in min h.tree - - (** Get minimum value and remove it from the tree *) - let pop h = - let rec delete_min tree = match tree with - | Empty -> raise Not_found - | Node (Empty, x, b) -> x, b - | Node (Node (Empty, x, b), y, c) -> - x, Node (b, y, c) (* rebalance *) - | Node (Node (a, x, b), y, c) -> - let m, a' = delete_min a in - m, Node (a', x, Node (b, y, c)) - in - let m, tree' = delete_min h.tree in - h.tree <- tree'; - m -end - module Timer = struct + module TaskHeap = CCHeap.Make(struct + type t = (float * unit cell) + let leq (f1,_)(f2,_) = f1 <= f2 + end) + type t = { mutable stop : bool; mutable thread : Thread.t option; (* thread dedicated to the timer *) - pool : Pool.t; - tasks : (float * (unit -> unit)) Heap.t; - mutex : Mutex.t; + mutable tasks : TaskHeap.t; + t_mutex : Mutex.t; fifo_in : Unix.file_descr; fifo_out : Unix.file_descr; } (** A timer for events *) - let cmp_tasks (f1,_) (f2,_) = - compare f1 f2 - - let standby_wait = 30. (* when no task is scheduled *) + let standby_wait = 10. (* when no task is scheduled *) let epsilon = 0.0001 (* accepted time diff for actions *) + let with_lock_ t f = + Mutex.lock t.t_mutex; + try + let x = f t in + Mutex.unlock t.t_mutex; + x + with e -> + Mutex.unlock t.t_mutex; + raise e + + type command = + | Loop + | Wait of float + + let pop_task_ t = + let tasks, _ = TaskHeap.take_exn t.tasks in + t.tasks <- tasks + (** Wait for next event, run it, and loop *) let serve timer = let buf = String.make 1 '_' in - (* process next task *) - let rec next () = - Mutex.lock timer.mutex; - (* what is the next task? *) - let next_task = - try Some (Heap.min timer.tasks) - with Not_found -> None in - match next_task with - | _ when timer.stop -> Mutex.unlock timer.mutex (* stop *) - | None -> - Mutex.unlock timer.mutex; - wait standby_wait (* wait for a task *) - | Some (time, task) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time - then begin (* run task in the pool *) - Pool.run timer.pool task; - ignore (Heap.pop timer.tasks); - Mutex.unlock timer.mutex; - (* process next task, if any *) - next () - end else (* too early, wait *) - (Mutex.unlock timer.mutex; - wait (time -. now)) + (* acquire lock, call [process_task] and do as it commands *) + let rec next () = match with_lock_ timer process_task with + | Loop -> next () + | Wait delay -> wait delay + (* check next task *) + and process_task timer = match TaskHeap.find_min timer.tasks with + | None -> Wait standby_wait + | Some (time, cell) -> + let now = Unix.gettimeofday () in + if now +. epsilon > time then ( + (* now! *) + pop_task_ timer; + set_done_ cell (); + Loop + ) else Wait (time -. now) (* wait for [delay] seconds, or until something happens on fifo_in *) and wait delay = let read = Thread.wait_timed_read timer.fifo_in delay in - (if read then ignore (Unix.read timer.fifo_in buf 0 1)); (* remove char *) + if read + then ignore (Unix.read timer.fifo_in buf 0 1); (* remove char *) next () in next () (** A timer that runs in the given thread pool *) - let create ?(pool=default_pool) () = + let create () = let fifo_in, fifo_out = Unix.pipe () in let timer = { stop = false; - pool; thread = None; - tasks = Heap.empty ~cmp:cmp_tasks; - mutex = Mutex.create (); + tasks = TaskHeap.empty; + t_mutex = Mutex.create (); fifo_in; fifo_out; } in @@ -677,45 +481,59 @@ module Timer = struct timer (** [timerule_at s t act] will run [act] at the Unix echo [t] *) - let schedule_at timer time task = - Mutex.lock timer.mutex; - (* time of the next scheduled event *) - let next_time = - try let time, _ = Heap.min timer.tasks in time - with Not_found -> max_float - in - (* insert task *) - Heap.insert timer.tasks (time, task); - (* see if the timer thread needs to be awaken earlier *) - (if time < next_time - then ignore (Unix.single_write timer.fifo_out "_" 0 1)); - Mutex.unlock timer.mutex; - () + let at timer time = + let now = Unix.gettimeofday () in + if now >= time + then return () + else ( + let cell = create_cell() in + with_lock_ timer + (fun timer -> + (* time of the next scheduled event *) + let next_time = match TaskHeap.find_min timer.tasks with + | None -> max_float + | Some (f, _) -> f + in + (* insert task *) + timer.tasks <- TaskHeap.insert (time, cell) timer.tasks; + (* see if the timer thread needs to be awaken earlier *) + if time < next_time + then ignore (Unix.single_write timer.fifo_out "_" 0 1) + ); + Run cell + ) - (** [schedule_in s d act] will run [act] in [d] seconds *) - let schedule_in timer delay task = + let after timer delay = assert (delay >= 0.); - schedule_at timer (Unix.gettimeofday () +. delay) task + let now = Unix.gettimeofday () in + at timer (now +. delay) (** Stop the given timer, cancelling pending tasks *) let stop timer = - Mutex.lock timer.mutex; - (if timer.stop then (Mutex.unlock timer.mutex; assert false)); - timer.stop <- true; - (* empty heap of tasks *) - Heap.clear timer.tasks; - (* kill the thread *) - (match timer.thread with - | None -> () - | Some t -> - Thread.kill t; - timer.thread <- None); - Mutex.unlock timer.mutex + with_lock_ timer + (fun timer -> + if not timer.stop then ( + timer.stop <- true; + (* empty heap of tasks *) + timer.tasks <- TaskHeap.empty; + (* kill the thread *) + match timer.thread with + | None -> () + | Some t -> + Thread.kill t; + timer.thread <- None + ) + ) end module Infix = struct - let (>>=) x f = flatMap f x - let (>>) a f = andThen a f + let (>>=) x f = flat_map f x + let (>>) a f = and_then a f + let (>|=) a f = map f a end include Infix + +(** {2 Low Level } *) + +let stop_pool () = Pool.stop pool diff --git a/src/threads/CCFuture.mli b/src/threads/CCFuture.mli index e39be681..fb7ac806 100644 --- a/src/threads/CCFuture.mli +++ b/src/threads/CCFuture.mli @@ -25,147 +25,114 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Futures for concurrency} *) +type 'a state = + | Done of 'a + | Waiting + | Failed of exn + type 'a t - (** A future value of type 'a *) +(** A future value of type 'a *) -exception SendTwice - (** Exception raised when a future is evaluated several time *) +type 'a future = 'a t -(** {2 MVar: a zero-or-one element thread-safe box} *) +(** {2 Constructors} *) -module MVar : sig - type 'a t +val return : 'a -> 'a t +(** Future that is already computed *) - val empty : unit -> 'a t - (** Create an empty box *) +val fail : exn -> 'a t +(** Future that fails immediately *) - val full : 'a -> 'a t - (** Create a full box *) +val make : (unit -> 'a) -> 'a t +(** Create a future, representing a value that will be computed by + the function. If the function raises, the future will fail. *) - val is_empty : _ t -> bool - (** Is the box currently empty? *) +val make1 : ('a -> 'b) -> 'a -> 'b t +val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t - val take : 'a t -> 'a - (** Take value out of the box. Wait if necessary *) - - val put : 'a t -> 'a -> unit - (** Put a value in the box. Waits if the box is already empty *) - - val update : 'a t -> ('a -> 'a) -> 'a * 'a - (** Use given function to atomically update content, and return - the previous value and the new one *) - - val peek : 'a t -> 'a - (** Look at the value, without removing it *) -end - -(** {2 Thread pool} *) -module Pool : sig - type t - (** A pool of threads *) - - val create : ?timeout:float -> size:int -> t - (** Create a pool with at most the given number of threads. [timeout] - is the time after which idle threads are killed. *) - - val size : t -> int - (** Current size of the pool *) - - val run : t -> (unit -> unit) -> unit - (** Run the function in the pool *) - - val finish : t -> unit - (** Kill threads in the pool *) -end - -val default_pool : Pool.t - (** Pool of threads that is used by default. Growable if needed. *) - -(** {2 Basic low-level Future functions} *) - -val make : Pool.t -> 'a t - (** Create a future, representing a value that is not known yet. *) +(** {2 Basics} *) val get : 'a t -> 'a - (** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned *) +(** Blocking get: wait for the future to be evaluated, and get the value, + or the exception that failed the future is returned. + @raise e if the exception failed with e *) -val send : 'a t -> 'a -> unit - (** Send a result to the future. Will raise SendTwice if [send] has - already been called on this future before *) - -val fail : 'a t -> exn -> unit - (** Fail the future by raising an exception inside it *) +val state : 'a t -> 'a state +(** State of the future *) val is_done : 'a t -> bool - (** Is the future evaluated (success/failure)? *) +(** Is the future evaluated (success/failure)? *) (** {2 Combinators} *) val on_success : 'a t -> ('a -> unit) -> unit - (** Attach a handler to be called upon success *) +(** Attach a handler to be called upon success *) val on_failure : _ t -> (exn -> unit) -> unit - (** Attach a handler to be called upon failure *) +(** Attach a handler to be called upon failure *) -val on_finish : _ t -> (unit -> unit) -> unit - (** Attach a handler to be called when the future is evaluated *) +val on_finish : 'a t -> ('a state -> unit) -> unit +(** Attach a handler to be called when the future is evaluated *) -val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t - (** Monadic combination of futures *) +val flat_map : ('a -> 'b t) -> 'a t -> 'b t +(** Monadic combination of futures *) -val andThen : ?pool:Pool.t -> 'a t -> (unit -> 'b t) -> 'b t - (** Wait for the first future to succeed, then launch the second *) +val and_then : 'a t -> (unit -> 'b t) -> 'b t +(** Wait for the first future to succeed, then launch the second *) -val sequence : ?pool:Pool.t -> 'a t list -> 'a list t - (** Future that waits for all previous sequences to terminate *) +val sequence : 'a t list -> 'a list t +(** Future that waits for all previous sequences to terminate. If any future + in the list fails, [sequence l] fails too. *) -val choose : ?pool:Pool.t -> 'a t list -> 'a t - (** Choose among those futures (the first to terminate) *) +val choose : 'a t list -> 'a t +(** Choose among those futures (the first to terminate). Behaves like + the first future that terminates, by failing if the future fails *) -val map : ?pool:Pool.t -> ('a -> 'b) -> 'a t -> 'b t - (** Maps the value inside the future *) +val map : ('a -> 'b) -> 'a t -> 'b t +(** Maps the value inside the future. The function doesn't run in its + own task; if it can take time, use {!flat_map} *) -(** {2 Future constructors} *) +(** {2 Helpers} *) -val return : 'a -> 'a t - (** Future that is already computed *) - -val spawn : ?pool:Pool.t -> (unit -> 'a) -> 'a t - (** Spawn a thread that wraps the given computation *) - -val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string -> +val spawn_process : ?stdin:string -> cmd:string -> unit -> (int * string * string) t - (** Spawn a sub-process with the given command [cmd] (and possibly input); +(** Spawn a sub-process with the given command [cmd] (and possibly input); returns a future containing (returncode, stdout, stderr) *) -val sleep : ?pool:Pool.t -> float -> unit t - (** Future that returns with success in the given amount of seconds *) +val sleep : float -> unit t +(** Future that returns with success in the given amount of seconds *) (** {2 Event timer} *) module Timer : sig type t - (** A scheduler for events *) + (** A scheduler for events. It runs in its own thread. *) - val create : ?pool:Pool.t -> unit -> t - (** A timer that runs tasks in the given thread pool *) + val create : unit -> t + (** A new timer. *) - val schedule_at : t -> float -> (unit -> unit) -> unit - (** [schedule_at s t act] will run [act] at the Unix echo [t] *) + val after : t -> float -> unit future + (** Create a future that waits for the given number of seconds, then + awakens with [()] *) - val schedule_in : t -> float -> (unit -> unit) -> unit - (** [schedule_in s d act] will run [act] in [d] seconds *) + val at : t -> float -> unit future + (** Create a future that evaluates to [()] at the given Unix timestamp *) val stop : t -> unit - (** Stop the given timer, cancelling pending tasks *) + (** Stop the given timer, cancelling pending tasks *) end - module Infix : sig val (>>=) : 'a t -> ('a -> 'b t) -> 'b t val (>>) : 'a t -> (unit -> 'b t) -> 'b t + val (>|=) : 'a t -> ('a -> 'b) -> 'b t end val (>>=) : 'a t -> ('a -> 'b t) -> 'b t val (>>) : 'a t -> (unit -> 'b t) -> 'b t +val (>|=) : 'a t -> ('a -> 'b) -> 'b t + +(** {2 Low level} *) + +val stop_pool : unit -> unit +(** Stop the thread pool *) diff --git a/tests/threads/run_test_future.ml b/tests/threads/run_test_future.ml new file mode 100644 index 00000000..9c94b7fc --- /dev/null +++ b/tests/threads/run_test_future.ml @@ -0,0 +1,87 @@ + +(** Test Future *) + +open OUnit + +module Future = CCFuture +open Future.Infix + +let test_parallel n () = + let l = Sequence.(1 -- n) |> Sequence.to_list in + let l = List.map (fun i -> + Future.make + (fun () -> + Thread.delay 0.1; + 1 + )) l in + let l' = List.map Future.get l in + OUnit.assert_equal n (List.fold_left (+) 0 l'); + () + +let test_map () = + let a = Future.make (fun () -> 1) in + let b = Future.map (fun x -> x+1) a in + let c = Future.map (fun x -> x-1) b in + OUnit.assert_equal 1 (Future.get c) + +let test_sequence_ok () = + let l = CCList.(1 -- 10) in + let l' = l + |> List.map + (fun x -> Future.make (fun () -> Thread.delay 0.2; x*10)) + |> Future.sequence + |> Future.map (List.fold_left (+) 0) + in + let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in + OUnit.assert_equal expected (Future.get l') + +let test_sequence_fail () = + let l = CCList.(1 -- 10) in + let l' = l + |> List.map + (fun x -> Future.make (fun () -> Thread.delay 0.2; if x = 5 then raise Exit; x)) + |> Future.sequence + |> Future.map (List.fold_left (+) 0) + in + OUnit.assert_raises Exit (fun () -> Future.get l') + +let test_time () = + let start = Unix.gettimeofday () in + let l = CCList.(1 -- 10) + |> List.map (fun _ -> Future.make (fun () -> Thread.delay 0.5)) + in + List.iter Future.get l; + let stop = Unix.gettimeofday () in + OUnit.assert_bool "some_parallelism" (stop -. start < 10. *. 0.5); + () + +let test_timer () = + let timer = Future.Timer.create () in + let n = CCLock.create 1 in + let get = Future.make (fun () -> Thread.delay 0.7; CCLock.get n) in + let _ = + Future.Timer.after timer 0.5 + >>= fun () -> CCLock.update n (fun x -> x+2); Future.return() + in + let _ = + Future.Timer.after timer 0.2 + >>= fun () -> CCLock.update n (fun x -> x * 4); Future.return() + in + OUnit.assert_equal 6 (Future.get get); + () + +let suite = + "test_future" >::: + [ + "test_parallel_10" >:: test_parallel 10; + "test_parallel_300" >:: test_parallel 300; + "test_time" >:: test_time; + "test_map" >:: test_map; + "test_sequence_ok" >:: test_sequence_ok; + "test_sequence_fail" >:: test_sequence_fail; + "test_timer" >:: test_timer; + ] + +let () = + let _ = OUnit.run_test_tt_main suite in + () diff --git a/tests/threads/test_future.ml b/tests/threads/test_future.ml deleted file mode 100644 index cabb7f39..00000000 --- a/tests/threads/test_future.ml +++ /dev/null @@ -1,52 +0,0 @@ - -(** Test Future *) - -open OUnit - -module Future = CCFuture - -let test_mvar () = - let box = Future.MVar.empty () in - let f = Future.spawn (fun () -> Future.MVar.take box + 1) in - Thread.delay 0.1; - OUnit.assert_bool "still waiting" (not (Future.is_done f)); - Future.MVar.put box 1; - OUnit.assert_equal 2 (Future.get f); - () - -let test_parallel () = - let l = Sequence.(1 -- 300) in - let l = Sequence.map (fun _ -> Future.spawn (fun () -> Thread.delay 0.1; 1)) l in - let l = Sequence.to_list l in - let l' = List.map Future.get l in - OUnit.assert_equal 300 (List.fold_left (+) 0 l'); - () - -let test_time () = - let start = Unix.gettimeofday () in - let f1 = Future.spawn (fun () -> Thread.delay 0.5) in - let f2 = Future.spawn (fun () -> Thread.delay 0.5) in - Future.get f1; - Future.get f2; - let stop = Unix.gettimeofday () in - OUnit.assert_bool "parallelism" (stop -. start < 0.75); - () - -let test_timer () = - let timer = Future.Timer.create () in - let mvar = Future.MVar.full 1 in - Future.Timer.schedule_in timer 0.5 - (fun () -> ignore (Future.MVar.update mvar (fun x -> x + 2))); - Future.Timer.schedule_in timer 0.2 - (fun () -> ignore (Future.MVar.update mvar (fun x -> x * 4))); - Thread.delay 0.7; - OUnit.assert_equal 6 (Future.MVar.peek mvar); - () - -let suite = - "test_future" >::: - [ "test_mvar" >:: test_mvar; - "test_parallel" >:: test_parallel; - "test_time" >:: test_time; - "test_timer" >:: test_timer; - ]