From 9f34a7f6e342a3db3c4fba55c10028561014cf20 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 3 Nov 2016 21:38:17 +0100 Subject: [PATCH] remove containers.thread --- README.adoc | 9 - _oasis | 24 +- doc/intro.txt | 16 - src/threads/CCBlockingQueue.ml | 191 ----------- src/threads/CCBlockingQueue.mli | 50 --- src/threads/CCLock.ml | 176 ----------- src/threads/CCLock.mli | 87 ----- src/threads/CCPool.ml | 545 -------------------------------- src/threads/CCPool.mli | 167 ---------- src/threads/CCSemaphore.ml | 117 ------- src/threads/CCSemaphore.mli | 31 -- src/threads/CCThread.ml | 85 ----- src/threads/CCThread.mli | 58 ---- src/threads/CCTimer.ml | 195 ------------ src/threads/CCTimer.mli | 43 --- 15 files changed, 3 insertions(+), 1791 deletions(-) delete mode 100644 src/threads/CCBlockingQueue.ml delete mode 100644 src/threads/CCBlockingQueue.mli delete mode 100644 src/threads/CCLock.ml delete mode 100644 src/threads/CCLock.mli delete mode 100644 src/threads/CCPool.ml delete mode 100644 src/threads/CCPool.mli delete mode 100644 src/threads/CCSemaphore.ml delete mode 100644 src/threads/CCSemaphore.mli delete mode 100644 src/threads/CCThread.ml delete mode 100644 src/threads/CCThread.mli delete mode 100644 src/threads/CCTimer.ml delete mode 100644 src/threads/CCTimer.mli diff --git a/README.adoc b/README.adoc index 04738814..42776c22 100644 --- a/README.adoc +++ b/README.adoc @@ -188,15 +188,6 @@ Iterators: - `CCKTree`, an abstract lazy tree structure -=== Thread - -In the library `containers.thread`, for preemptive system threads: - -- `CCFuture`, a set of tools for preemptive threading, including a thread pool, - monadic futures, and MVars (concurrent boxes) -- `CCLock`, values protected by locks -- `CCSemaphore`, a simple implementation of semaphores -- `CCThread` basic wrappers for `Thread` === Misc diff --git a/_oasis b/_oasis index 259f985b..efe0ea9b 100644 --- a/_oasis +++ b/_oasis @@ -21,17 +21,10 @@ Description: extend the stdlib (e.g. CCList provides safe map/fold_right/append, and additional functions on lists). - It also features optional libraries for dealing with strings, and - helpers for unix and threads. - Flag "unix" Description: Build the containers.unix library (depends on Unix) Default: false -Flag "thread" - Description: Build modules that depend on threads - Default: true - Flag "bench" Description: Build and run benchmarks Default: true @@ -79,17 +72,6 @@ Library "containers_iter" FindlibParent: containers FindlibName: iter -Library "containers_thread" - Path: src/threads/ - Modules: CCPool, CCLock, CCSemaphore, CCThread, CCBlockingQueue, - CCTimer - FindlibName: thread - FindlibParent: containers - Build$: flag(thread) - Install$: flag(thread) - BuildDepends: containers, threads - XMETARequires: containers, threads - Library "containers_top" Path: src/top/ Modules: Containers_top @@ -109,7 +91,7 @@ Document containers "-docflags '-colorize-code -short-functors -charset utf-8'" XOCamlbuildLibraries: containers, containers.iter, containers.data, - containers.thread, containers.unix, containers.sexp + containers.unix, containers.sexp Executable run_benchs Path: benchs/ @@ -119,7 +101,7 @@ Executable run_benchs MainIs: run_benchs.ml BuildDepends: containers, qcheck, containers.data, containers.iter, - containers.thread, sequence, gen, benchmark, hamt + sequence, gen, benchmark, hamt Executable run_bench_hash Path: benchs/ @@ -138,7 +120,7 @@ Executable run_qtest MainIs: run_qtest.ml Build$: flag(tests) && flag(bigarray) && flag(unix) BuildDepends: containers, containers.iter, - containers.sexp, containers.unix, containers.thread, + containers.sexp, containers.unix, containers.data, sequence, gen, unix, oUnit, qcheck diff --git a/doc/intro.txt b/doc/intro.txt index 2a973e9f..7e96e213 100644 --- a/doc/intro.txt +++ b/doc/intro.txt @@ -129,22 +129,6 @@ Moved to its own repository. Moved to its own repository -{4 Thread Helpers} - -{b findlib name}: containers.thread - -Modules related to the use of [Thread]. - -{!modules: -CCBlockingQueue -CCLock -CCPool -CCSemaphore -CCThread -CCTimer -} - - {2 Index} {!indexlist} diff --git a/src/threads/CCBlockingQueue.ml b/src/threads/CCBlockingQueue.ml deleted file mode 100644 index d767b4ab..00000000 --- a/src/threads/CCBlockingQueue.ml +++ /dev/null @@ -1,191 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Blocking Queue} *) - -type 'a t = { - q : 'a Queue.t; - lock : Mutex.t; - cond : Condition.t; - capacity : int; - mutable size : int; -} - -let create n = - if n < 1 then invalid_arg "BloquingQueue.create"; - let q = { - q=Queue.create(); - lock=Mutex.create(); - cond=Condition.create(); - capacity=n; - size=0; - } in - q - -let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 -let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1 - -let finally_ f x ~h = - try - let res = f x in - ignore (h ()); - res - with e -> - ignore (h()); - raise e - -let with_lock_ q f = - Mutex.lock q.lock; - finally_ f () ~h:(fun () -> Mutex.unlock q.lock) - -let push q x = - with_lock_ q - (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - assert (q.size < q.capacity); - Queue.push x q.q; - (* if there are blocked receivers, awake one of them *) - incr_size_ q; - Condition.broadcast q.cond) - -let take q = - with_lock_ q - (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let x = Queue.take q.q in - (* if there are blocked senders, awake one of them *) - decr_size_ q; - Condition.broadcast q.cond; - x) - -(*$R - let q = create 1 in - let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in - let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in - let l = CCLock.create [] in - let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do - let x = take q in - CCLock.update l (fun l -> x :: l) - done) - in - Thread.join t1; Thread.join t2; Thread.join t3; - assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) -*) - -let push_list q l = - (* push elements until it's not possible. - Assumes the lock is acquired. *) - let rec push_ q l = match l with - | [] -> l - | _::_ when q.size = q.capacity -> l (* no room remaining *) - | x :: tl -> - Queue.push x q.q; - incr_size_ q; - push_ q tl - in - (* push chunks of [l] in [q] until [l] is empty *) - let rec aux q l = match l with - | [] -> () - | _::_ -> - let l = with_lock_ q - (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - let l = push_ q l in - Condition.broadcast q.cond; - l) - in - aux q l - in aux q l - -let take_list q n = - (* take at most [n] elements of [q] and prepend them to [acc] *) - let rec pop_ acc q n = - if n=0 || Queue.is_empty q.q then acc, n - else ( (* take next element *) - let x = Queue.take q.q in - decr_size_ q; - pop_ (x::acc) q (n-1) - ) - in - (* call [pop_] until [n] elements have been gathered *) - let rec aux acc q n = - if n=0 then List.rev acc - else - let acc, n = with_lock_ q - (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let acc, n = pop_ acc q n in - Condition.broadcast q.cond; - acc, n - ) - in - aux acc q n - in - aux [] q n - -(*$R - let n = 1000 in - let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in - let q = create 2 in - let senders = CCThread.Arr.spawn 3 - (fun i -> - if i=1 - then push_list q lists.(i) (* test push_list *) - else List.iter (push q) lists.(i) - ) - in - let res = CCLock.create [] in - let receivers = CCThread.Arr.spawn 3 - (fun i -> - if i=1 then - let l = take_list q n in - CCLock.update res (fun acc -> l @ acc) - else - for _j = 1 to n do - let x = take q in - CCLock.update res (fun acc -> x::acc) - done - ) - in - CCThread.Arr.join senders; CCThread.Arr.join receivers; - let l = CCLock.get res |> List.sort Pervasives.compare in - assert_equal CCList.(1 -- 3*n) l -*) - -let try_take q = - with_lock_ q - (fun () -> - if q.size = 0 then None - else ( - decr_size_ q; - Some (Queue.take q.q) - )) - -let try_push q x = - with_lock_ q - (fun () -> - if q.size = q.capacity then false - else ( - incr_size_ q; - Queue.push x q.q; - Condition.signal q.cond; - true - )) - -let peek q = - with_lock_ q - (fun () -> - try Some (Queue.peek q.q) - with Queue.Empty -> None) - -let size q = with_lock_ q (fun () -> q.size) - -let capacity q = q.capacity diff --git a/src/threads/CCBlockingQueue.mli b/src/threads/CCBlockingQueue.mli deleted file mode 100644 index 003110b1..00000000 --- a/src/threads/CCBlockingQueue.mli +++ /dev/null @@ -1,50 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Blocking Queue} - - This queue has a limited size. Pushing a value on the queue when it - is full will block. - - @since 0.16 *) - -type 'a t -(** Safe-thread queue for values of type ['a] *) - -val create : int -> 'a t -(** Create a new queue of size [n]. Using [n=max_int] amounts to using - an infinite queue (2^61 items is a lot to fit in memory); using [n=1] - amounts to using a box with 0 or 1 elements inside. - @raise Invalid_argument if [n < 1] *) - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] into [q], blocking if the queue is full *) - -val take : 'a t -> 'a -(** Take the first element, blocking if needed *) - -val push_list : 'a t -> 'a list -> unit -(** Push items of the list, one by one *) - -val take_list : 'a t -> int -> 'a list -(** [take_list n q] takes [n] elements out of [q] *) - -val try_take : 'a t -> 'a option -(** Take the first element if the queue is not empty, return [None] - otherwise *) - -val try_push : 'a t -> 'a -> bool -(** [try_push q x] pushes [x] into [q] if [q] is not full, in which - case it returns [true]. - If it fails because [q] is full, it returns [false] *) - -val peek : 'a t -> 'a option -(** [peek q] returns [Some x] if [x] is the first element of [q], - otherwise it returns [None] *) - -val size : _ t -> int -(** Number of elements currently in the queue *) - -val capacity : _ t -> int -(** Number of values the queue can hold *) - diff --git a/src/threads/CCLock.ml b/src/threads/CCLock.ml deleted file mode 100644 index cd9aa456..00000000 --- a/src/threads/CCLock.ml +++ /dev/null @@ -1,176 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Utils around Mutex} *) - -type 'a t = { - mutex : Mutex.t; - mutable content : 'a; -} - -type 'a lock = 'a t - -let create content = { - mutex = Mutex.create(); - content; -} - -let with_lock l f = - Mutex.lock l.mutex; - try - let x = f l.content in - Mutex.unlock l.mutex; - x - with e -> - Mutex.unlock l.mutex; - raise e - -(*$R - let l = create 0 in - let try_incr l = - update l (fun x -> Thread.yield(); x+1) - in - for i = 1 to 10 do ignore (Thread.create try_incr l) done; - Thread.delay 0.10 ; - assert_equal 10 (get l) -*) - -module LockRef = struct - type 'a t = 'a lock - let get t = t.content - let set t x = t.content <- x - let update t f = t.content <- f t.content -end - -let with_lock_as_ref l ~f = - Mutex.lock l.mutex; - try - let x = f l in - Mutex.unlock l.mutex; - x - with e -> - Mutex.unlock l.mutex; - raise e - -(*$R - let l = create 0 in - let test_it l = - with_lock_as_ref l - ~f:(fun r -> - (* increment and decrement *) - for j = 0 to 100 do - let x = LockRef.get r in - LockRef.set r (x+10); - if j mod 5=0 then Thread.yield (); - let y = LockRef.get r in - LockRef.set r (y - 10); - done - ) - in - for i = 1 to 100 do ignore (Thread.create test_it l) done; - Thread.delay 0.10; - assert_equal 0 (get l) -*) - -let mutex l = l.mutex - -let update l f = - with_lock l (fun x -> l.content <- f x) - -(*$T - let l = create 5 in update l (fun x->x+1); get l = 6 - *) - -let update_map l f = - with_lock l - (fun x -> - let x', y = f x in - l.content <- x'; - y) - -(*$T - let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6 - *) - -let get l = - Mutex.lock l.mutex; - let x = l.content in - Mutex.unlock l.mutex; - x - -let set l x = - Mutex.lock l.mutex; - l.content <- x; - Mutex.unlock l.mutex - -(*$T - let l = create 0 in set l 4; get l = 4 - let l = create 0 in set l 4; set l 5; get l = 5 -*) - -let incr l = update l Pervasives.succ - -let decr l = update l Pervasives.pred - - -(*$R - let l = create 0 in - let a = Array.init 100 (fun _ -> Thread.create (fun _ -> incr l) ()) in - Array.iter Thread.join a; - assert_equal ~printer:CCInt.to_string 100 (get l) -*) - -(*$T - let l = create 0 in incr l ; get l = 1 - let l = create 0 in decr l ; get l = ~-1 - *) - -let incr_then_get l = - Mutex.lock l.mutex; - l.content <- l.content + 1; - let x = l.content in - Mutex.unlock l.mutex; - x - -let get_then_incr l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- l.content + 1; - Mutex.unlock l.mutex; - x - -let decr_then_get l = - Mutex.lock l.mutex; - l.content <- l.content - 1; - let x = l.content in - Mutex.unlock l.mutex; - x - -let get_then_decr l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- l.content - 1; - Mutex.unlock l.mutex; - x - -(*$T - let l = create 0 in 1 = incr_then_get l && 1 = get l - let l = create 0 in 0 = get_then_incr l && 1 = get l - let l = create 10 in 9 = decr_then_get l && 9 = get l - let l = create 10 in 10 = get_then_decr l && 9 = get l -*) - -let get_then_set l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- true; - Mutex.unlock l.mutex; - x - -let get_then_clear l = - Mutex.lock l.mutex; - let x = l.content in - l.content <- false; - Mutex.unlock l.mutex; - x - diff --git a/src/threads/CCLock.mli b/src/threads/CCLock.mli deleted file mode 100644 index 75e4b07c..00000000 --- a/src/threads/CCLock.mli +++ /dev/null @@ -1,87 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Utils around Mutex} - - A value wrapped into a Mutex, for more safety. - - @since 0.8 *) - -type 'a t -(** A value surrounded with a lock *) - -val create : 'a -> 'a t -(** Create a new protected value *) - -val with_lock : 'a t -> ('a -> 'b) -> 'b -(** [with_lock l f] runs [f x] where [x] is the value protected with - the lock [l], in a critical section. If [f x] fails, [with_lock l f] - fails too but the lock is released *) - -(** Type allowing to manipulate the lock as a reference - @since 0.13 *) -module LockRef : sig - type 'a t - - val get : 'a t -> 'a - - val set : 'a t -> 'a -> unit - - val update : 'a t -> ('a -> 'a) -> unit -end - -val with_lock_as_ref : 'a t -> f:('a LockRef.t -> 'b) -> 'b -(** [with_lock_as_ref l f] calls [f] with a reference-like object - that allows to manipulate the value of [l] safely. - The object passed to [f] must not escape the function call - @since 0.13 *) - -val update : 'a t -> ('a -> 'a) -> unit -(** [update l f] replaces the content [x] of [l] with [f x], atomically *) - -val update_map : 'a t -> ('a -> 'a * 'b) -> 'b -(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] - and returns [y] - @since 0.16 *) - -val mutex : _ t -> Mutex.t -(** Underlying mutex *) - -val get : 'a t -> 'a -(** Get the value in the lock. The value that is returned isn't protected! *) - -val set : 'a t -> 'a -> unit -(** Atomically set the value - @since 0.13 *) - -val incr : int t -> unit -(** Atomically increment the value - @since 0.13 *) - -val decr : int t -> unit -(** Atomically decrement the value - @since 0.13 *) - -val incr_then_get : int t -> int -(** [incr_then_get x] increments [x], and return its new value - @since 0.16 *) - -val get_then_incr : int t -> int -(** [get_then_incr x] increments [x], and return its previous value - @since 0.16 *) - -val decr_then_get : int t -> int -(** [decr_then_get x] decrements [x], and return its new value - @since 0.16 *) - -val get_then_decr : int t -> int -(** [get_then_decr x] decrements [x], and return its previous value - @since 0.16 *) - -val get_then_set : bool t -> bool -(** [get_then_set b] sets [b] to [true], and return the old value - @since 0.16 *) - -val get_then_clear : bool t -> bool -(** [get_then_clear b] sets [b] to [false], and return the old value - @since 0.16 *) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml deleted file mode 100644 index 401863ca..00000000 --- a/src/threads/CCPool.ml +++ /dev/null @@ -1,545 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Thread Pool, and Futures} *) - -type +'a state = - | Done of 'a - | Waiting - | Failed of exn - -module type PARAM = sig - val min_size : int - (** Minimum number of threads in the pool *) - - val max_size : int - (** Maximum number of threads in the pool *) -end - -exception Stopped - -(*$inject - module P = Make(struct let min_size = 0 let max_size = 30 end) - module Fut = P.Fut - open Fut.Infix -*) - -(** {2 Thread pool} *) -module Make(P : PARAM) = struct - type job = - | Job1 : ('a -> _) * 'a -> job - | Job2 : ('a -> 'b -> _) * 'a * 'b -> job - | Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job - | Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job - - type t = { - mutable stop : bool; (* indicate that threads should stop *) - mutable exn_handler: (exn -> unit); - mutex : Mutex.t; - cond : Condition.t; - jobs : job Queue.t; (* waiting jobs *) - mutable cur_size : int; (* total number of threads *) - mutable cur_idle : int; (* number of idle threads *) - } (** Dynamic, growable thread pool *) - - let nop_ _ = () - - (* singleton pool *) - let pool = { - stop = false; - exn_handler = nop_; - cond = Condition.create(); - cur_size = 0; - cur_idle = 0; - jobs = Queue.create (); - mutex = Mutex.create (); - } - - let set_exn_handler f = pool.exn_handler <- f - - let with_lock_ t f = - Mutex.lock t.mutex; - try - let x = f t in - Mutex.unlock t.mutex; - x - with e -> - Mutex.unlock t.mutex; - raise e - - let incr_size_ p = p.cur_size <- p.cur_size + 1 - let decr_size_ p = p.cur_size <- p.cur_size - 1 - - (* next thing a thread should do *) - type command = - | Process of job - | Wait (* wait on condition *) - | Die (* thread has no work to do *) - - (* thread: seek what to do next (including dying). - Assumes the pool is locked. *) - let get_next_ pool = - if pool.stop - || (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then ( - (* die: the thread would be idle otherwise *) - assert (pool.cur_size > 0); - decr_size_ pool; - Die - ) - else if Queue.is_empty pool.jobs then Wait - else ( - let job = Queue.pop pool.jobs in - Process job - ) - - (* Thread: entry point. They seek jobs in the queue *) - let rec serve pool = - let cmd = with_lock_ pool get_next_ in - run_cmd cmd - - (* run a command *) - and run_cmd = function - | Die -> () - | Wait -> - with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) - | Process (Job1 (f, x)) -> - begin try ignore (f x) with e -> pool.exn_handler e end; serve pool - | Process (Job2 (f, x, y)) -> - begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool - | Process (Job3 (f, x, y, z)) -> - begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool - | Process (Job4 (f, x, y, z, w)) -> - begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool - - (* create a new worker thread *) - let launch_worker_ pool = ignore (Thread.create serve pool) - - (* launch the minimum required number of threads *) - let () = - for _i = 1 to P.min_size do launch_worker_ pool done - - (* heuristic criterion for starting a new thread. *) - let can_start_thread_ p = p.cur_size < P.max_size - - let run_job job = - (* acquire lock and push job in queue, or start thread directly - if the queue is empty *) - with_lock_ pool - (fun pool -> - if pool.stop then raise Stopped; - if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 - then ( - (* create the thread now, on [job], as it will not break order of - jobs. We do not want to wait for the busy threads to do our task - if we are allowed to spawn a new thread. *) - incr_size_ pool; - ignore (Thread.create run_cmd (Process job)) - ) else ( - (* cannot start thread, push and wait for some worker to pick it up *) - Queue.push job pool.jobs; - Condition.signal pool.cond; (* wake up *) - (* might want to process in the background, if all threads are busy *) - if pool.cur_idle = 0 && can_start_thread_ pool then ( - incr_size_ pool; - launch_worker_ pool; - ) - )) - - (* run the function on the argument in the given pool *) - let run1 f x = run_job (Job1 (f, x)) - - let run f = run1 f () - - let run2 f x y = run_job (Job2 (f, x, y)) - - let run3 f x y z = run_job (Job3 (f, x, y, z)) - - let run4 f x y z w = run_job (Job4 (f, x, y, z, w)) - - let active () = not pool.stop - - (* kill threads in the pool *) - let stop () = - with_lock_ pool - (fun p -> - p.stop <- true; - Queue.clear p.jobs) - - (* stop threads if pool is GC'd *) - let () = Gc.finalise (fun _ -> stop ()) pool - - (** {6 Futures} *) - module Fut = struct - type 'a handler = 'a state -> unit - - (** A proper future, with a delayed computation *) - type 'a cell = { - mutable state : 'a state; - mutable handlers : 'a handler list; (* handlers *) - f_mutex : Mutex.t; - condition : Condition.t; - } - - (** A future value of type 'a *) - type 'a t = - | Return of 'a - | FailNow of exn - | Run of 'a cell - - type 'a future = 'a t - - (** {2 Basic Future functions} *) - - let return x = Return x - - let fail e = FailNow e - - let create_cell () = { - state = Waiting; - handlers = []; - f_mutex = Mutex.create (); - condition = Condition.create (); - } - - let with_lock_ cell f = - Mutex.lock cell.f_mutex; - try - let x = f cell in - Mutex.unlock cell.f_mutex; - x - with e -> - Mutex.unlock cell.f_mutex; - raise e - - (* TODO: exception handler for handler errors *) - - let set_done_ cell x = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> (* set state and signal *) - cell.state <- Done x; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) - - let set_fail_ cell e = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> - cell.state <- Failed e; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) - - (* calls [f x], and put result or exception in [cell] *) - let run_and_set1 cell f x = - try - let y = f x in - set_done_ cell y - with e -> - set_fail_ cell e - - let run_and_set2 cell f x y = - try - let z = f x y in - set_done_ cell z - with e -> - set_fail_ cell e - - let make1 f x = - let cell = create_cell() in - run3 run_and_set1 cell f x; - Run cell - - let make f = make1 f () - - (*$R - List.iter - (fun n -> - let l = Sequence.(1 -- n) |> Sequence.to_list in - let l = List.rev_map (fun i -> - Fut.make - (fun () -> - Thread.delay 0.05; - 1 - )) l in - let l' = List.map Fut.get l in - OUnit.assert_equal n (List.fold_left (+) 0 l'); - ) - [ 10; 300; ] - *) - - let make2 f x y = - let cell = create_cell() in - run4 run_and_set2 cell f x y; - Run cell - - let get = function - | Return x -> x - | FailNow e -> raise e - | Run cell -> - let rec get_ cell = match cell.state with - | Waiting -> - Condition.wait cell.condition cell.f_mutex; (* wait *) - get_ cell - | Done x -> x - | Failed e -> raise e - in - with_lock_ cell get_ - - (* access the result without locking *) - let get_nolock_ = function - | Return x - | Run {state=Done x; _} -> x - | FailNow _ - | Run {state=(Failed _ | Waiting); _} -> assert false - - let state = function - | Return x -> Done x - | FailNow e -> Failed e - | Run cell -> - with_lock_ cell (fun cell -> cell.state) - - let is_done = function - | Return _ - | FailNow _ -> true - | Run cell -> - with_lock_ cell (fun c -> c.state <> Waiting) - - (** {2 Combinators *) - - let add_handler_ cell f = - with_lock_ cell - (fun cell -> match cell.state with - | Waiting -> cell.handlers <- f :: cell.handlers - | Done _ | Failed _ -> f cell.state) - - let on_finish fut k = match fut with - | Return x -> k (Done x) - | FailNow e -> k (Failed e) - | Run cell -> add_handler_ cell k - - let on_success fut k = - on_finish fut - (function - | Done x -> k x - | _ -> ()) - - let on_failure fut k = - on_finish fut - (function - | Failed e -> k e - | _ -> ()) - - let map_cell_ ~async f cell ~into:cell' = - add_handler_ cell - (function - | Done x -> - if async - then run3 run_and_set1 cell' f x - else run_and_set1 cell' f x - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let map_ ~async f fut = match fut with - | Return x -> - if async - then make1 f x - else Return (f x) - | FailNow e -> FailNow e - | Run cell -> map_cell_ ~async f cell ~into:(create_cell()) - - let map f fut = map_ ~async:false f fut - - let map_async f fut = map_ ~async:true f fut - - (*$R - let a = Fut.make (fun () -> 1) in - let b = Fut.map (fun x -> x+1) a in - let c = Fut.map (fun x -> x-1) b in - OUnit.assert_equal 1 (Fut.get c) - *) - - let app_ ~async f x = match f, x with - | Return f, Return x -> - if async - then make1 f x - else Return (f x) - | FailNow e, _ - | _, FailNow e -> FailNow e - | Return f, Run x -> - map_cell_ ~async (fun x -> f x) x ~into:(create_cell()) - | Run f, Return x -> - map_cell_ ~async (fun f -> f x) f ~into:(create_cell()) - | Run f, Run x -> - let cell' = create_cell () in - add_handler_ f - (function - | Done f -> ignore (map_cell_ ~async f x ~into:cell') - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' - - let app f x = app_ ~async:false f x - - let app_async f x = app_ ~async:true f x - - let flat_map f fut = match fut with - | Return x -> f x - | FailNow e -> FailNow e - | Run cell -> - let cell' = create_cell() in - add_handler_ cell - (function - | Done x -> - let fut' = f x in - on_finish fut' - (function - | Done y -> set_done_ cell' y - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ) - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ); - Run cell' - - let and_then fut f = flat_map (fun _ -> f ()) fut - - type _ array_or_list = - | A_ : 'a array -> 'a array_or_list - | L_ : 'a list -> 'a array_or_list - - let iter_aol - : type a. a array_or_list -> (a -> unit) -> unit - = fun aol f -> match aol with - | A_ a -> Array.iter f a - | L_ l -> List.iter f l - - (* [sequence_ l f] returns a future that waits for every element of [l] - to return of fail, and call [f ()] to obtain the result (as a closure) - in case every element succeeded (otherwise a failure is - returned automatically) *) - let sequence_ - : type a res. a t array_or_list -> (unit -> res) -> res t - = fun aol f -> - let n = match aol with - | A_ a -> Array.length a - | L_ l -> List.length l - in - assert (n>0); - let cell = create_cell() in - let n_err = CCLock.create 0 in (* number of failed threads *) - let n_ok = CCLock.create 0 in (* number of succeeding threads *) - iter_aol aol - (fun fut -> - on_finish fut - (function - | Failed e -> - let x = CCLock.incr_then_get n_err in - (* if first failure, then seal [cell]'s fate now *) - if x=1 then set_fail_ cell e - | Done _ -> - let x = CCLock.incr_then_get n_ok in - (* if [n] successes, then [cell] succeeds. Otherwise, some - job has not finished or some job has failed. *) - if x = n then ( - let res = f () in - set_done_ cell res - ) - | Waiting -> assert false)); - Run cell - - (* map an array of futures to a future array *) - let sequence_a a = match a with - | [||] -> return [||] - | _ -> - sequence_ (A_ a) - (fun () -> Array.map get_nolock_ a) - - let map_a f a = sequence_a (Array.map f a) - - let sequence_l l = match l with - | [] -> return [] - | _ :: _ -> - sequence_ (L_ l) (fun () -> List.map get_nolock_ l) - - (* reverse twice *) - let map_l f l = - let l = List.rev_map f l in - sequence_ (L_ l) - (fun () -> List.rev_map get_nolock_ l) - - (*$R - let l = CCList.(1 -- 50) in - let l' = l - |> List.map - (fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10)) - |> Fut.sequence_l - |> Fut.map (List.fold_left (+) 0) - in - let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in - OUnit.assert_equal expected (Fut.get l') - *) - - (*$R - let l = CCList.(1 -- 50) in - let l' = l - |> List.map - (fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x)) - |> Fut.sequence_l - |> Fut.map (List.fold_left (+) 0) - in - OUnit.assert_raises Exit (fun () -> Fut.get l') - *) - - let choose_ - : type a. a t array_or_list -> a t - = fun aol -> - let cell = create_cell() in - let is_done = CCLock.create false in - iter_aol aol - (fun fut -> - on_finish fut - (fun res -> match res with - | Waiting -> assert false - | Done x -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_done_ cell x - | Failed e -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_fail_ cell e)); - Run cell - - let choose_a a = choose_ (A_ a) - - let choose_l l = choose_ (L_ l) - - let sleep time = make1 Thread.delay time - - (*$R - let start = Unix.gettimeofday () in - let pause = 0.2 and n = 10 in - let l = CCList.(1 -- n) - |> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause)) - in - List.iter Fut.get l; - let stop = Unix.gettimeofday () in - OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); - *) - - module Infix = struct - let (>>=) x f = flat_map f x - let (>>) a f = and_then a f - let (>|=) a f = map f a - let (<*>) = app - end - - include Infix - end -end diff --git a/src/threads/CCPool.mli b/src/threads/CCPool.mli deleted file mode 100644 index 9697c6d1..00000000 --- a/src/threads/CCPool.mli +++ /dev/null @@ -1,167 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Thread Pool, and Futures} - - Renamed and heavily updated from [CCFuture] - @since 0.16 *) - -type +'a state = - | Done of 'a - | Waiting - | Failed of exn - -module type PARAM = sig - val min_size : int - (** Minimum number of threads in the pool *) - - val max_size : int - (** Maximum number of threads in the pool *) -end - -exception Stopped - -(** {2 Create a new Pool} *) -module Make(P : PARAM) : sig - val run : (unit -> _) -> unit - (** [run f] schedules [f] for being executed in the thread pool *) - - val run1 : ('a -> _) -> 'a -> unit - (** [run1 f x] is similar to [run (fun () -> f x)] *) - - val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit - - val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit - - val set_exn_handler : (exn -> unit) -> unit - - val active : unit -> bool - (** [active ()] is true as long as [stop()] has not been called yet *) - - val stop : unit -> unit - (** After calling [stop ()], Most functions will raise Stopped. - This has the effect of preventing new tasks from being executed. *) - - (** {6 Futures} - - The futures are registration points for callbacks, storing a {!state}, - that are executed in the pool using {!run}. *) - module Fut : sig - type 'a t - (** A future value of type 'a *) - - type 'a future = 'a t - - (** {2 Constructors} *) - - val return : 'a -> 'a t - (** Future that is already computed *) - - val fail : exn -> 'a t - (** Future that fails immediately *) - - val make : (unit -> 'a) -> 'a t - (** Create a future, representing a value that will be computed by - the function. If the function raises, the future will fail. *) - - val make1 : ('a -> 'b) -> 'a -> 'b t - - val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t - - (** {2 Basics} *) - - val get : 'a t -> 'a - (** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned. - raise e if the future failed with e *) - - val state : 'a t -> 'a state - (** State of the future *) - - val is_done : 'a t -> bool - (** Is the future evaluated (success/failure)? *) - - (** {2 Combinators} *) - - val on_success : 'a t -> ('a -> unit) -> unit - (** Attach a handler to be called upon success. - The handler should not call functions on the future. - Might be evaluated now if the future is already done. *) - - val on_failure : _ t -> (exn -> unit) -> unit - (** Attach a handler to be called upon failure. - The handler should not call any function on the future. - Might be evaluated now if the future is already done. *) - - val on_finish : 'a t -> ('a state -> unit) -> unit - (** Attach a handler to be called when the future is evaluated. - The handler should not call functions on the future. - Might be evaluated now if the future is already done. *) - - val flat_map : ('a -> 'b t) -> 'a t -> 'b t - (** Monadic combination of futures *) - - val and_then : 'a t -> (unit -> 'b t) -> 'b t - (** Wait for the first future to succeed, then launch the second *) - - val sequence_a : 'a t array -> 'a array t - (** Future that waits for all previous futures to terminate. If any future - in the array fails, [sequence_a l] fails too. *) - - val map_a : ('a -> 'b t) -> 'a array -> 'b array t - (** [map_l f a] maps [f] on every element of [a], and will return - the array of every result if all calls succeed, or an error otherwise. *) - - val sequence_l : 'a t list -> 'a list t - (** Future that waits for all previous futures to terminate. If any future - in the list fails, [sequence_l l] fails too. *) - - val map_l : ('a -> 'b t) -> 'a list -> 'b list t - (** [map_l f l] maps [f] on every element of [l], and will return - the list of every result if all calls succeed, or an error otherwise. *) - - val choose_a : 'a t array -> 'a t - (** Choose among those futures (the first to terminate). Behaves like - the first future that terminates, by failing if the future fails *) - - val choose_l : 'a t list -> 'a t - (** Choose among those futures (the first to terminate). Behaves like - the first future that terminates, by failing if the future fails *) - - val map : ('a -> 'b) -> 'a t -> 'b t - (** Maps the value inside the future. The function doesn't run in its - own task; if it can take time, use {!flat_map} or {!map_async} *) - - val map_async : ('a -> 'b) -> 'a t -> 'b t - (** Maps the value inside the future, to be computed in a separated job. *) - - val app : ('a -> 'b) t -> 'a t -> 'b t - (** [app f x] applies the result of [f] to the result of [x] *) - - val app_async : ('a -> 'b) t -> 'a t -> 'b t - (** [app f x] applies the result of [f] to the result of [x], in - a separated job scheduled in the pool *) - - val sleep : float -> unit t - (** Future that returns with success in the given amount of seconds. Blocks - the thread! If you need to wait on many events, consider - using {!CCTimer}. *) - - module Infix : sig - val (>>=) : 'a t -> ('a -> 'b t) -> 'b t - val (>>) : 'a t -> (unit -> 'b t) -> 'b t - val (>|=) : 'a t -> ('a -> 'b) -> 'b t - val (<*>) : ('a -> 'b) t -> 'a t -> 'b t - end - - val (>>=) : 'a t -> ('a -> 'b t) -> 'b t - - val (>>) : 'a t -> (unit -> 'b t) -> 'b t - - val (>|=) : 'a t -> ('a -> 'b) -> 'b t - (** Alias to {!map} *) - - val (<*>): ('a -> 'b) t -> 'a t -> 'b t - (** Alias to {!app} *) - end -end diff --git a/src/threads/CCSemaphore.ml b/src/threads/CCSemaphore.ml deleted file mode 100644 index 17d0b6de..00000000 --- a/src/threads/CCSemaphore.ml +++ /dev/null @@ -1,117 +0,0 @@ -(** {1 Semaphores} *) - -type t = { - mutable n : int; - mutex : Mutex.t; - cond : Condition.t; -} - -let create n = - if n <= 0 then invalid_arg "Semaphore.create"; - { n; - mutex=Mutex.create(); - cond=Condition.create(); - } - -let get t = t.n - -(* assume [t.mutex] locked, try to acquire [t] *) -let acquire_once_locked_ m t = - while t.n < m do - Condition.wait t.cond t.mutex; - done; - assert (t.n >= m); - t.n <- t.n - m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - -let acquire m t = - Mutex.lock t.mutex; - acquire_once_locked_ m t - -(* assume [t.mutex] locked, try to release [t] *) -let release_once_locked_ m t = - t.n <- t.n + m; - Condition.broadcast t.cond; - Mutex.unlock t.mutex - -let release m t = - Mutex.lock t.mutex; - release_once_locked_ m t; - () - -(*$R - let s = create 1 in - let r = CCLock.create false in - let _ = Thread.create (fun s -> acquire 5 s; CCLock.set r true) s in - Thread.yield (); - assert_equal false (CCLock.get r); - release 4 s; - Thread.delay 0.2; - assert_equal true (CCLock.get r); - assert_equal 0 (get s) -*) - -let with_acquire ~n t ~f = - Mutex.lock t.mutex; - acquire_once_locked_ n t; - try - let x = f() in - release_once_locked_ n t; - x - with e -> - release_once_locked_ n t; - raise e - -(*$R - let s = create 5 in - let n = CCLock.create 0 in - let a = Array.init 100 (fun i -> - Thread.create (fun _ -> - with_acquire ~n:(1 + (i mod 5)) s - ~f:(fun () -> CCLock.incr n) - ) ()) - in - Array.iter Thread.join a; - assert_equal ~printer:CCInt.to_string 5 (get s); - assert_equal ~printer:CCInt.to_string 100 (CCLock.get n) -*) - -let wait_until_at_least ~n t ~f = - Mutex.lock t.mutex; - while t.n < n do - Condition.wait t.cond t.mutex; - done; - assert (t.n >= n); - Mutex.unlock t.mutex; - f () - -(*$R - let output s = () in - let s = create 2 in - let res = CCLock.create false in - let id = Thread.create - (fun () -> - output "start"; - wait_until_at_least ~n:5 s - ~f:(fun () -> - assert (get s >= 5); - output "modify now"; - CCLock.set res true) - ) () - in - output "launched thread"; - Thread.yield(); - assert_bool "start" (not (CCLock.get res)); - output "release 2"; - release 2 s; - Thread.yield(); - assert_bool "after release 2" (not (CCLock.get res)); - output "release 1"; - release 1 s; - (* should work now *) - Thread.delay 0.2; - Thread.join id; - output "check"; - assert_bool "after release 1" (CCLock.get res) -*) diff --git a/src/threads/CCSemaphore.mli b/src/threads/CCSemaphore.mli deleted file mode 100644 index 5734d31c..00000000 --- a/src/threads/CCSemaphore.mli +++ /dev/null @@ -1,31 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Semaphores} - - @since 0.13 *) - -type t -(** A semaphore *) - -val create : int -> t -(** [create n] creates a semaphore with initial value [n] - @raise Invalid_argument if [n <= 0] *) - -val get : t -> int -(** Current value *) - -val acquire : int -> t -> unit -(** [acquire n s] blocks until [get s >= n], then atomically - sets [s := !s - n] *) - -val release : int -> t -> unit -(** [release n s] atomically sets [s := !s + n] *) - -val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a -(** [with_acquire ~n s ~f] first acquires [s] with [n] units, - calls [f ()], and then release [s] with [n] units. - Safely release the semaphore even if [f ()] fails *) - -val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a -(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()] - and returns its result. Doesn't modify the semaphore. *) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml deleted file mode 100644 index eb274097..00000000 --- a/src/threads/CCThread.ml +++ /dev/null @@ -1,85 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Threads} *) - -type t = Thread.t - -let spawn f = Thread.create f () - -let spawn1 f x = Thread.create f x - -let spawn2 f x y = Thread.create (fun () -> f x y) () - -let detach f = ignore (Thread.create f ()) - -let finally_ f x ~h = - try - let res = f x in - ignore (h ()); - res - with e -> - ignore (h()); - raise e - -module Arr = struct - let spawn n f = - Array.init n (fun i -> Thread.create f i) - - let join a = Array.iter Thread.join a -end - -(*$R - let l = CCLock.create 0 in - let a = Arr.spawn 101 (fun i -> CCLock.update l ((+) i)) in - Arr.join a; - let n = Sequence.(1 -- 100 |> fold (+) 0) in - assert_equal ~printer:CCInt.to_string n (CCLock.get l) -*) - -module Barrier = struct - type t = { - lock: Mutex.t; - cond: Condition.t; - mutable activated: bool; - } - - let create () = { - lock=Mutex.create(); - cond=Condition.create(); - activated=false; - } - - let with_lock_ b f = - Mutex.lock b.lock; - finally_ f () ~h:(fun () -> Mutex.unlock b.lock) - - let reset b = with_lock_ b (fun () -> b.activated <- false) - - let wait b = - with_lock_ b - (fun () -> - while not b.activated do - Condition.wait b.cond b.lock - done) - - let activate b = - with_lock_ b - (fun () -> - if not b.activated then ( - b.activated <- true; - Condition.broadcast b.cond)) - - let activated b = with_lock_ b (fun () -> b.activated) -end - -(*$R - let b = Barrier.create () in - let res = CCLock.create 0 in - let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) - and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in - Thread.delay 0.2; - assert_equal 0 (CCLock.get res); - Barrier.activate b; - Thread.join t1; Thread.join t2; - assert_equal 2 (CCLock.get res) -*) diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli deleted file mode 100644 index fe54e6f8..00000000 --- a/src/threads/CCThread.mli +++ /dev/null @@ -1,58 +0,0 @@ -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Threads} - - {b status: unstable} - @since 0.13 *) - -type t = Thread.t - -val spawn : (unit -> _) -> t -(** [spawn f] creates a new thread that runs [f ()] *) - -val spawn1 : ('a -> _) -> 'a -> t -(** [spawn1 f x] is like [spawn (fun () -> f x)]. - @since 0.16 *) - -val spawn2 : ('a -> 'b -> _) -> 'a -> 'b -> t -(** [spawn2 f x y] is like [spawn (fun () -> f x y)]. - @since 0.16 *) - -val detach : (unit -> 'a) -> unit -(** [detach f] is the same as [ignore (spawn f)] *) - -(** {2 Array of threads} *) -module Arr : sig - val spawn : int -> (int -> 'a) -> t array - (** [A.spawn n f] creates an array [res] of length [n], such that - [res.(i) = spawn (fun () -> f i)] *) - - val join : t array -> unit - (** [A.join a] joins every thread in [a] *) -end - -(** {2 Single-Use Barrier} *) - -module Barrier : sig - type t - (** Barrier, used to synchronize threads *) - - val create : unit -> t - (** Create a barrier *) - - val reset : t -> unit - (** Reset to initial (non-triggered) state *) - - val wait : t -> unit - (** [wait b] waits for barrier [b] to be activated by [activate b]. - All threads calling this wait until [activate b] is called. - If [b] is already activated, [wait b] does nothing *) - - val activate : t -> unit - (** [activate b] unblocks all threads that were waiting on [b] *) - - val activated : t -> bool - (** [activated b] returns [true] iff [activate b] was called, and [reset b] - was not called since. In other words, [activated b = true] means - [wait b] will not block. *) -end diff --git a/src/threads/CCTimer.ml b/src/threads/CCTimer.ml deleted file mode 100644 index 3fd93934..00000000 --- a/src/threads/CCTimer.ml +++ /dev/null @@ -1,195 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Event timer} *) - -type job = - | Job : float * (unit -> 'a) -> job - -module TaskHeap = CCHeap.Make(struct - type t = job - let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2 -end) - -exception Stopped - -type t = { - mutable stop : bool; - mutable tasks : TaskHeap.t; - mutable exn_handler : (exn -> unit); - t_mutex : Mutex.t; - fifo_in : Unix.file_descr; - fifo_out : Unix.file_descr; -} - -let set_exn_handler timer f = timer.exn_handler <- f - -let standby_wait = 10. -(* when no task is scheduled, this is the amount of time that is waited - in a row for something to happen. This is also the maximal delay - between the call to {!stop} and the actual termination of the - thread. *) - -let epsilon = 0.0001 -(* accepted time diff for actions. *) - -let with_lock_ t f = - Mutex.lock t.t_mutex; - try - let x = f t in - Mutex.unlock t.t_mutex; - x - with e -> - Mutex.unlock t.t_mutex; - raise e - -type command = - | Quit - | Run : (unit -> _) -> command - | Wait of float - -let pop_task_ t = - let tasks, _ = TaskHeap.take_exn t.tasks in - t.tasks <- tasks - -let call_ timer f = - try ignore (f ()) - with e -> timer.exn_handler e - -(* check next task *) -let next_task_ timer = match TaskHeap.find_min timer.tasks with - | _ when timer.stop -> Quit - | None -> Wait standby_wait - | Some Job (time, f) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time then ( - (* now! *) - pop_task_ timer; - Run f - ) else Wait (time -. now) - -(* The main thread function: wait for next event, run it, and loop *) -let serve timer = - let buf = Bytes.make 1 '_' in - (* acquire lock, call [process_task] and do as it commands *) - let rec next () = match with_lock_ timer next_task_ with - | Quit -> () - | Run f -> - call_ timer f; (* call outside of any lock *) - next () - | Wait delay -> wait delay - (* wait for [delay] seconds, or until something happens on [fifo_in] *) - and wait delay = - let read = Thread.wait_timed_read timer.fifo_in delay in - (* remove char from fifo, so that next write can happen *) - if read then ignore (Unix.read timer.fifo_in buf 0 1); - next () - in - next () - -let nop_handler_ _ = () - -let create () = - let fifo_in, fifo_out = Unix.pipe () in - let timer = { - stop = false; - tasks = TaskHeap.empty; - exn_handler = nop_handler_; - t_mutex = Mutex.create (); - fifo_in; - fifo_out; - } in - (* start a thread to process tasks *) - let _t = Thread.create serve timer in - timer - -let underscore_ = Bytes.make 1 '_' - -(* awake the thread *) -let awaken_ timer = - ignore (Unix.single_write timer.fifo_out underscore_ 0 1) - -(** [at s t ~f] will run [f ()] at the Unix echo [t] *) -let at timer time ~f = - if timer.stop then raise Stopped; - let now = Unix.gettimeofday () in - if now >= time - then call_ timer f - else - with_lock_ timer - (fun timer -> - if timer.stop then raise Stopped; - (* time of the next scheduled event *) - let next_time = match TaskHeap.find_min timer.tasks with - | None -> max_float - | Some Job (d, _) -> d - in - (* insert task *) - timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; - (* see if the timer thread needs to be awaken earlier *) - if time < next_time then awaken_ timer - ) - -let after timer delay ~f = - assert (delay >= 0.); - let now = Unix.gettimeofday () in - at timer (now +. delay) ~f - -exception ExitEvery - -let every ?delay timer d ~f = - let rec run () = - try - ignore (f ()); - schedule() - with ExitEvery -> () (* stop *) - and schedule () = after timer d ~f:run in - match delay with - | None -> run() - | Some d -> after timer d ~f:run - -(*$R - let start = Unix.gettimeofday() in - let timer = create() in - let res = CCLock.create 0 in - let stop = ref 0. in - every timer 0.1 - ~f:(fun () -> - if CCLock.incr_then_get res > 5 then ( - stop := Unix.gettimeofday(); - raise ExitEvery - )); - Thread.delay 0.7; - OUnit.assert_equal ~printer:CCInt.to_string 6 (CCLock.get res); - OUnit.assert_bool "estimate delay" (abs_float (!stop -. start -. 0.5) < 0.1); -*) - -let active timer = not timer.stop - -(** Stop the given timer, cancelling pending tasks *) -let stop timer = - with_lock_ timer - (fun timer -> - if not timer.stop then ( - timer.stop <- true; - (* empty heap of tasks *) - timer.tasks <- TaskHeap.empty; - (* tell the thread to stop *) - awaken_ timer; - ) - ) - -(*$R - (* scenario: n := 1; n := n*4 ; n := n+2; res := n *) - let timer = create () in - let n = CCLock.create 1 in - let res = CCLock.create 0 in - after timer 0.3 - ~f:(fun () -> CCLock.update n (fun x -> x+2)); - ignore (Thread.create - (fun _ -> Thread.delay 0.4; CCLock.set res (CCLock.get n)) ()); - after timer 0.2 - ~f:(fun () -> CCLock.update n (fun x -> x * 4)); - Thread.delay 0.6 ; - OUnit.assert_equal 6 (CCLock.get res); -*) diff --git a/src/threads/CCTimer.mli b/src/threads/CCTimer.mli deleted file mode 100644 index f0068cf8..00000000 --- a/src/threads/CCTimer.mli +++ /dev/null @@ -1,43 +0,0 @@ - -(* This file is free software, part of containers. See file "license" for more details. *) - -(** {1 Event timer} - - Used to be part of [CCFuture] - @since 0.16 *) - -type t -(** A scheduler for events. It runs in its own thread. *) - -val create : unit -> t -(** A new timer. *) - -val set_exn_handler : t -> (exn -> unit) -> unit -(** [set_exn_handler timer f] registers [f] so that any exception - raised by a task scheduled in [timer] is given to [f] *) - -exception Stopped - -val after : t -> float -> f:(unit -> _) -> unit -(** Call the callback [f] after the given number of seconds. - @raise Stopped if the timer was stopped *) - -val at : t -> float -> f:(unit -> _) -> unit -(** Create a future that evaluates to [()] at the given Unix timestamp - @raise Stopped if the timer was stopped *) - -exception ExitEvery - -val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit -(** [every timer n ~f] calls [f ()] every [n] seconds. - [f()] can raise ExitEvery to stop the cycle. - @param delay if provided, the first call to [f ()] is delayed by - that many seconds. - @raise Stopped if the timer was stopped *) - -val stop : t -> unit -(** Stop the given timer, cancelling pending tasks. Idempotent. - From now on, calling most other operations on the timer will raise Stopped. *) - -val active : t -> bool -(** Returns [true] until [stop t] has been called. *)