From 4821772dcff586f4c10bcc0ea35675f7d8830c06 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 25 Jan 2017 00:07:38 +0100 Subject: [PATCH] Revert "remove containers.thread" This reverts commit 9f34a7f6e342a3db3c4fba55c10028561014cf20. --- README.adoc | 9 + _oasis | 24 +- doc/intro.txt | 16 + src/threads/CCBlockingQueue.ml | 191 +++++++++++ src/threads/CCBlockingQueue.mli | 50 +++ src/threads/CCLock.ml | 176 +++++++++++ src/threads/CCLock.mli | 87 +++++ src/threads/CCPool.ml | 545 ++++++++++++++++++++++++++++++++ src/threads/CCPool.mli | 167 ++++++++++ src/threads/CCSemaphore.ml | 117 +++++++ src/threads/CCSemaphore.mli | 31 ++ src/threads/CCThread.ml | 85 +++++ src/threads/CCThread.mli | 58 ++++ src/threads/CCTimer.ml | 195 ++++++++++++ src/threads/CCTimer.mli | 43 +++ 15 files changed, 1791 insertions(+), 3 deletions(-) create mode 100644 src/threads/CCBlockingQueue.ml create mode 100644 src/threads/CCBlockingQueue.mli create mode 100644 src/threads/CCLock.ml create mode 100644 src/threads/CCLock.mli create mode 100644 src/threads/CCPool.ml create mode 100644 src/threads/CCPool.mli create mode 100644 src/threads/CCSemaphore.ml create mode 100644 src/threads/CCSemaphore.mli create mode 100644 src/threads/CCThread.ml create mode 100644 src/threads/CCThread.mli create mode 100644 src/threads/CCTimer.ml create mode 100644 src/threads/CCTimer.mli diff --git a/README.adoc b/README.adoc index 39d8f03f..0bb2bf97 100644 --- a/README.adoc +++ b/README.adoc @@ -181,6 +181,15 @@ Iterators: - `CCKTree`, an abstract lazy tree structure +=== Thread + +In the library `containers.thread`, for preemptive system threads: + +- `CCFuture`, a set of tools for preemptive threading, including a thread pool, + monadic futures, and MVars (concurrent boxes) +- `CCLock`, values protected by locks +- `CCSemaphore`, a simple implementation of semaphores +- `CCThread` basic wrappers for `Thread` === Misc diff --git a/_oasis b/_oasis index 1360c5d3..646621b2 100644 --- a/_oasis +++ b/_oasis @@ -21,10 +21,17 @@ Description: extend the stdlib (e.g. CCList provides safe map/fold_right/append, and additional functions on lists). + It also features optional libraries for dealing with strings, and + helpers for unix and threads. + Flag "unix" Description: Build the containers.unix library (depends on Unix) Default: false +Flag "thread" + Description: Build modules that depend on threads + Default: true + Flag "bench" Description: Build and run benchmarks Default: true @@ -73,6 +80,17 @@ Library "containers_iter" FindlibParent: containers FindlibName: iter +Library "containers_thread" + Path: src/threads/ + Modules: CCPool, CCLock, CCSemaphore, CCThread, CCBlockingQueue, + CCTimer + FindlibName: thread + FindlibParent: containers + Build$: flag(thread) + Install$: flag(thread) + BuildDepends: containers, threads + XMETARequires: containers, threads + Library "containers_top" Path: src/top/ Modules: Containers_top @@ -92,7 +110,7 @@ Document containers "-docflags '-colorize-code -short-functors -charset utf-8'" XOCamlbuildLibraries: containers, containers.iter, containers.data, - containers.unix, containers.sexp + containers.thread, containers.unix, containers.sexp Executable run_benchs Path: benchs/ @@ -101,7 +119,7 @@ Executable run_benchs Build$: flag(bench) MainIs: run_benchs.ml BuildDepends: containers, qcheck, - containers.data, containers.iter, + containers.data, containers.iter, containers.thread, sequence, gen, benchmark Executable run_bench_hash @@ -121,7 +139,7 @@ Executable run_qtest MainIs: run_qtest.ml Build$: flag(tests) && flag(unix) BuildDepends: containers, containers.iter, - containers.sexp, containers.unix, + containers.sexp, containers.unix, containers.thread, containers.data, sequence, gen, unix, oUnit, qcheck diff --git a/doc/intro.txt b/doc/intro.txt index 7b445629..461a5ece 100644 --- a/doc/intro.txt +++ b/doc/intro.txt @@ -143,6 +143,22 @@ Moved to its own repository. Moved to its own repository +{4 Thread Helpers} + +{b findlib name}: containers.thread + +Modules related to the use of [Thread]. + +{!modules: +CCBlockingQueue +CCLock +CCPool +CCSemaphore +CCThread +CCTimer +} + + {2 Index} {!indexlist} diff --git a/src/threads/CCBlockingQueue.ml b/src/threads/CCBlockingQueue.ml new file mode 100644 index 00000000..d767b4ab --- /dev/null +++ b/src/threads/CCBlockingQueue.ml @@ -0,0 +1,191 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Blocking Queue} *) + +type 'a t = { + q : 'a Queue.t; + lock : Mutex.t; + cond : Condition.t; + capacity : int; + mutable size : int; +} + +let create n = + if n < 1 then invalid_arg "BloquingQueue.create"; + let q = { + q=Queue.create(); + lock=Mutex.create(); + cond=Condition.create(); + capacity=n; + size=0; + } in + q + +let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 +let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1 + +let finally_ f x ~h = + try + let res = f x in + ignore (h ()); + res + with e -> + ignore (h()); + raise e + +let with_lock_ q f = + Mutex.lock q.lock; + finally_ f () ~h:(fun () -> Mutex.unlock q.lock) + +let push q x = + with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + assert (q.size < q.capacity); + Queue.push x q.q; + (* if there are blocked receivers, awake one of them *) + incr_size_ q; + Condition.broadcast q.cond) + +let take q = + with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let x = Queue.take q.q in + (* if there are blocked senders, awake one of them *) + decr_size_ q; + Condition.broadcast q.cond; + x) + +(*$R + let q = create 1 in + let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in + let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in + let l = CCLock.create [] in + let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do + let x = take q in + CCLock.update l (fun l -> x :: l) + done) + in + Thread.join t1; Thread.join t2; Thread.join t3; + assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) +*) + +let push_list q l = + (* push elements until it's not possible. + Assumes the lock is acquired. *) + let rec push_ q l = match l with + | [] -> l + | _::_ when q.size = q.capacity -> l (* no room remaining *) + | x :: tl -> + Queue.push x q.q; + incr_size_ q; + push_ q tl + in + (* push chunks of [l] in [q] until [l] is empty *) + let rec aux q l = match l with + | [] -> () + | _::_ -> + let l = with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + let l = push_ q l in + Condition.broadcast q.cond; + l) + in + aux q l + in aux q l + +let take_list q n = + (* take at most [n] elements of [q] and prepend them to [acc] *) + let rec pop_ acc q n = + if n=0 || Queue.is_empty q.q then acc, n + else ( (* take next element *) + let x = Queue.take q.q in + decr_size_ q; + pop_ (x::acc) q (n-1) + ) + in + (* call [pop_] until [n] elements have been gathered *) + let rec aux acc q n = + if n=0 then List.rev acc + else + let acc, n = with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let acc, n = pop_ acc q n in + Condition.broadcast q.cond; + acc, n + ) + in + aux acc q n + in + aux [] q n + +(*$R + let n = 1000 in + let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in + let q = create 2 in + let senders = CCThread.Arr.spawn 3 + (fun i -> + if i=1 + then push_list q lists.(i) (* test push_list *) + else List.iter (push q) lists.(i) + ) + in + let res = CCLock.create [] in + let receivers = CCThread.Arr.spawn 3 + (fun i -> + if i=1 then + let l = take_list q n in + CCLock.update res (fun acc -> l @ acc) + else + for _j = 1 to n do + let x = take q in + CCLock.update res (fun acc -> x::acc) + done + ) + in + CCThread.Arr.join senders; CCThread.Arr.join receivers; + let l = CCLock.get res |> List.sort Pervasives.compare in + assert_equal CCList.(1 -- 3*n) l +*) + +let try_take q = + with_lock_ q + (fun () -> + if q.size = 0 then None + else ( + decr_size_ q; + Some (Queue.take q.q) + )) + +let try_push q x = + with_lock_ q + (fun () -> + if q.size = q.capacity then false + else ( + incr_size_ q; + Queue.push x q.q; + Condition.signal q.cond; + true + )) + +let peek q = + with_lock_ q + (fun () -> + try Some (Queue.peek q.q) + with Queue.Empty -> None) + +let size q = with_lock_ q (fun () -> q.size) + +let capacity q = q.capacity diff --git a/src/threads/CCBlockingQueue.mli b/src/threads/CCBlockingQueue.mli new file mode 100644 index 00000000..003110b1 --- /dev/null +++ b/src/threads/CCBlockingQueue.mli @@ -0,0 +1,50 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Blocking Queue} + + This queue has a limited size. Pushing a value on the queue when it + is full will block. + + @since 0.16 *) + +type 'a t +(** Safe-thread queue for values of type ['a] *) + +val create : int -> 'a t +(** Create a new queue of size [n]. Using [n=max_int] amounts to using + an infinite queue (2^61 items is a lot to fit in memory); using [n=1] + amounts to using a box with 0 or 1 elements inside. + @raise Invalid_argument if [n < 1] *) + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], blocking if the queue is full *) + +val take : 'a t -> 'a +(** Take the first element, blocking if needed *) + +val push_list : 'a t -> 'a list -> unit +(** Push items of the list, one by one *) + +val take_list : 'a t -> int -> 'a list +(** [take_list n q] takes [n] elements out of [q] *) + +val try_take : 'a t -> 'a option +(** Take the first element if the queue is not empty, return [None] + otherwise *) + +val try_push : 'a t -> 'a -> bool +(** [try_push q x] pushes [x] into [q] if [q] is not full, in which + case it returns [true]. + If it fails because [q] is full, it returns [false] *) + +val peek : 'a t -> 'a option +(** [peek q] returns [Some x] if [x] is the first element of [q], + otherwise it returns [None] *) + +val size : _ t -> int +(** Number of elements currently in the queue *) + +val capacity : _ t -> int +(** Number of values the queue can hold *) + diff --git a/src/threads/CCLock.ml b/src/threads/CCLock.ml new file mode 100644 index 00000000..cd9aa456 --- /dev/null +++ b/src/threads/CCLock.ml @@ -0,0 +1,176 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Utils around Mutex} *) + +type 'a t = { + mutex : Mutex.t; + mutable content : 'a; +} + +type 'a lock = 'a t + +let create content = { + mutex = Mutex.create(); + content; +} + +let with_lock l f = + Mutex.lock l.mutex; + try + let x = f l.content in + Mutex.unlock l.mutex; + x + with e -> + Mutex.unlock l.mutex; + raise e + +(*$R + let l = create 0 in + let try_incr l = + update l (fun x -> Thread.yield(); x+1) + in + for i = 1 to 10 do ignore (Thread.create try_incr l) done; + Thread.delay 0.10 ; + assert_equal 10 (get l) +*) + +module LockRef = struct + type 'a t = 'a lock + let get t = t.content + let set t x = t.content <- x + let update t f = t.content <- f t.content +end + +let with_lock_as_ref l ~f = + Mutex.lock l.mutex; + try + let x = f l in + Mutex.unlock l.mutex; + x + with e -> + Mutex.unlock l.mutex; + raise e + +(*$R + let l = create 0 in + let test_it l = + with_lock_as_ref l + ~f:(fun r -> + (* increment and decrement *) + for j = 0 to 100 do + let x = LockRef.get r in + LockRef.set r (x+10); + if j mod 5=0 then Thread.yield (); + let y = LockRef.get r in + LockRef.set r (y - 10); + done + ) + in + for i = 1 to 100 do ignore (Thread.create test_it l) done; + Thread.delay 0.10; + assert_equal 0 (get l) +*) + +let mutex l = l.mutex + +let update l f = + with_lock l (fun x -> l.content <- f x) + +(*$T + let l = create 5 in update l (fun x->x+1); get l = 6 + *) + +let update_map l f = + with_lock l + (fun x -> + let x', y = f x in + l.content <- x'; + y) + +(*$T + let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6 + *) + +let get l = + Mutex.lock l.mutex; + let x = l.content in + Mutex.unlock l.mutex; + x + +let set l x = + Mutex.lock l.mutex; + l.content <- x; + Mutex.unlock l.mutex + +(*$T + let l = create 0 in set l 4; get l = 4 + let l = create 0 in set l 4; set l 5; get l = 5 +*) + +let incr l = update l Pervasives.succ + +let decr l = update l Pervasives.pred + + +(*$R + let l = create 0 in + let a = Array.init 100 (fun _ -> Thread.create (fun _ -> incr l) ()) in + Array.iter Thread.join a; + assert_equal ~printer:CCInt.to_string 100 (get l) +*) + +(*$T + let l = create 0 in incr l ; get l = 1 + let l = create 0 in decr l ; get l = ~-1 + *) + +let incr_then_get l = + Mutex.lock l.mutex; + l.content <- l.content + 1; + let x = l.content in + Mutex.unlock l.mutex; + x + +let get_then_incr l = + Mutex.lock l.mutex; + let x = l.content in + l.content <- l.content + 1; + Mutex.unlock l.mutex; + x + +let decr_then_get l = + Mutex.lock l.mutex; + l.content <- l.content - 1; + let x = l.content in + Mutex.unlock l.mutex; + x + +let get_then_decr l = + Mutex.lock l.mutex; + let x = l.content in + l.content <- l.content - 1; + Mutex.unlock l.mutex; + x + +(*$T + let l = create 0 in 1 = incr_then_get l && 1 = get l + let l = create 0 in 0 = get_then_incr l && 1 = get l + let l = create 10 in 9 = decr_then_get l && 9 = get l + let l = create 10 in 10 = get_then_decr l && 9 = get l +*) + +let get_then_set l = + Mutex.lock l.mutex; + let x = l.content in + l.content <- true; + Mutex.unlock l.mutex; + x + +let get_then_clear l = + Mutex.lock l.mutex; + let x = l.content in + l.content <- false; + Mutex.unlock l.mutex; + x + diff --git a/src/threads/CCLock.mli b/src/threads/CCLock.mli new file mode 100644 index 00000000..75e4b07c --- /dev/null +++ b/src/threads/CCLock.mli @@ -0,0 +1,87 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Utils around Mutex} + + A value wrapped into a Mutex, for more safety. + + @since 0.8 *) + +type 'a t +(** A value surrounded with a lock *) + +val create : 'a -> 'a t +(** Create a new protected value *) + +val with_lock : 'a t -> ('a -> 'b) -> 'b +(** [with_lock l f] runs [f x] where [x] is the value protected with + the lock [l], in a critical section. If [f x] fails, [with_lock l f] + fails too but the lock is released *) + +(** Type allowing to manipulate the lock as a reference + @since 0.13 *) +module LockRef : sig + type 'a t + + val get : 'a t -> 'a + + val set : 'a t -> 'a -> unit + + val update : 'a t -> ('a -> 'a) -> unit +end + +val with_lock_as_ref : 'a t -> f:('a LockRef.t -> 'b) -> 'b +(** [with_lock_as_ref l f] calls [f] with a reference-like object + that allows to manipulate the value of [l] safely. + The object passed to [f] must not escape the function call + @since 0.13 *) + +val update : 'a t -> ('a -> 'a) -> unit +(** [update l f] replaces the content [x] of [l] with [f x], atomically *) + +val update_map : 'a t -> ('a -> 'a * 'b) -> 'b +(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l] + and returns [y] + @since 0.16 *) + +val mutex : _ t -> Mutex.t +(** Underlying mutex *) + +val get : 'a t -> 'a +(** Get the value in the lock. The value that is returned isn't protected! *) + +val set : 'a t -> 'a -> unit +(** Atomically set the value + @since 0.13 *) + +val incr : int t -> unit +(** Atomically increment the value + @since 0.13 *) + +val decr : int t -> unit +(** Atomically decrement the value + @since 0.13 *) + +val incr_then_get : int t -> int +(** [incr_then_get x] increments [x], and return its new value + @since 0.16 *) + +val get_then_incr : int t -> int +(** [get_then_incr x] increments [x], and return its previous value + @since 0.16 *) + +val decr_then_get : int t -> int +(** [decr_then_get x] decrements [x], and return its new value + @since 0.16 *) + +val get_then_decr : int t -> int +(** [get_then_decr x] decrements [x], and return its previous value + @since 0.16 *) + +val get_then_set : bool t -> bool +(** [get_then_set b] sets [b] to [true], and return the old value + @since 0.16 *) + +val get_then_clear : bool t -> bool +(** [get_then_clear b] sets [b] to [false], and return the old value + @since 0.16 *) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml new file mode 100644 index 00000000..401863ca --- /dev/null +++ b/src/threads/CCPool.ml @@ -0,0 +1,545 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Thread Pool, and Futures} *) + +type +'a state = + | Done of 'a + | Waiting + | Failed of exn + +module type PARAM = sig + val min_size : int + (** Minimum number of threads in the pool *) + + val max_size : int + (** Maximum number of threads in the pool *) +end + +exception Stopped + +(*$inject + module P = Make(struct let min_size = 0 let max_size = 30 end) + module Fut = P.Fut + open Fut.Infix +*) + +(** {2 Thread pool} *) +module Make(P : PARAM) = struct + type job = + | Job1 : ('a -> _) * 'a -> job + | Job2 : ('a -> 'b -> _) * 'a * 'b -> job + | Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job + | Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job + + type t = { + mutable stop : bool; (* indicate that threads should stop *) + mutable exn_handler: (exn -> unit); + mutex : Mutex.t; + cond : Condition.t; + jobs : job Queue.t; (* waiting jobs *) + mutable cur_size : int; (* total number of threads *) + mutable cur_idle : int; (* number of idle threads *) + } (** Dynamic, growable thread pool *) + + let nop_ _ = () + + (* singleton pool *) + let pool = { + stop = false; + exn_handler = nop_; + cond = Condition.create(); + cur_size = 0; + cur_idle = 0; + jobs = Queue.create (); + mutex = Mutex.create (); + } + + let set_exn_handler f = pool.exn_handler <- f + + let with_lock_ t f = + Mutex.lock t.mutex; + try + let x = f t in + Mutex.unlock t.mutex; + x + with e -> + Mutex.unlock t.mutex; + raise e + + let incr_size_ p = p.cur_size <- p.cur_size + 1 + let decr_size_ p = p.cur_size <- p.cur_size - 1 + + (* next thing a thread should do *) + type command = + | Process of job + | Wait (* wait on condition *) + | Die (* thread has no work to do *) + + (* thread: seek what to do next (including dying). + Assumes the pool is locked. *) + let get_next_ pool = + if pool.stop + || (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then ( + (* die: the thread would be idle otherwise *) + assert (pool.cur_size > 0); + decr_size_ pool; + Die + ) + else if Queue.is_empty pool.jobs then Wait + else ( + let job = Queue.pop pool.jobs in + Process job + ) + + (* Thread: entry point. They seek jobs in the queue *) + let rec serve pool = + let cmd = with_lock_ pool get_next_ in + run_cmd cmd + + (* run a command *) + and run_cmd = function + | Die -> () + | Wait -> + with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) + | Process (Job1 (f, x)) -> + begin try ignore (f x) with e -> pool.exn_handler e end; serve pool + | Process (Job2 (f, x, y)) -> + begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool + | Process (Job3 (f, x, y, z)) -> + begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool + | Process (Job4 (f, x, y, z, w)) -> + begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool + + (* create a new worker thread *) + let launch_worker_ pool = ignore (Thread.create serve pool) + + (* launch the minimum required number of threads *) + let () = + for _i = 1 to P.min_size do launch_worker_ pool done + + (* heuristic criterion for starting a new thread. *) + let can_start_thread_ p = p.cur_size < P.max_size + + let run_job job = + (* acquire lock and push job in queue, or start thread directly + if the queue is empty *) + with_lock_ pool + (fun pool -> + if pool.stop then raise Stopped; + if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 + then ( + (* create the thread now, on [job], as it will not break order of + jobs. We do not want to wait for the busy threads to do our task + if we are allowed to spawn a new thread. *) + incr_size_ pool; + ignore (Thread.create run_cmd (Process job)) + ) else ( + (* cannot start thread, push and wait for some worker to pick it up *) + Queue.push job pool.jobs; + Condition.signal pool.cond; (* wake up *) + (* might want to process in the background, if all threads are busy *) + if pool.cur_idle = 0 && can_start_thread_ pool then ( + incr_size_ pool; + launch_worker_ pool; + ) + )) + + (* run the function on the argument in the given pool *) + let run1 f x = run_job (Job1 (f, x)) + + let run f = run1 f () + + let run2 f x y = run_job (Job2 (f, x, y)) + + let run3 f x y z = run_job (Job3 (f, x, y, z)) + + let run4 f x y z w = run_job (Job4 (f, x, y, z, w)) + + let active () = not pool.stop + + (* kill threads in the pool *) + let stop () = + with_lock_ pool + (fun p -> + p.stop <- true; + Queue.clear p.jobs) + + (* stop threads if pool is GC'd *) + let () = Gc.finalise (fun _ -> stop ()) pool + + (** {6 Futures} *) + module Fut = struct + type 'a handler = 'a state -> unit + + (** A proper future, with a delayed computation *) + type 'a cell = { + mutable state : 'a state; + mutable handlers : 'a handler list; (* handlers *) + f_mutex : Mutex.t; + condition : Condition.t; + } + + (** A future value of type 'a *) + type 'a t = + | Return of 'a + | FailNow of exn + | Run of 'a cell + + type 'a future = 'a t + + (** {2 Basic Future functions} *) + + let return x = Return x + + let fail e = FailNow e + + let create_cell () = { + state = Waiting; + handlers = []; + f_mutex = Mutex.create (); + condition = Condition.create (); + } + + let with_lock_ cell f = + Mutex.lock cell.f_mutex; + try + let x = f cell in + Mutex.unlock cell.f_mutex; + x + with e -> + Mutex.unlock cell.f_mutex; + raise e + + (* TODO: exception handler for handler errors *) + + let set_done_ cell x = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> (* set state and signal *) + cell.state <- Done x; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) + + let set_fail_ cell e = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> + cell.state <- Failed e; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) + + (* calls [f x], and put result or exception in [cell] *) + let run_and_set1 cell f x = + try + let y = f x in + set_done_ cell y + with e -> + set_fail_ cell e + + let run_and_set2 cell f x y = + try + let z = f x y in + set_done_ cell z + with e -> + set_fail_ cell e + + let make1 f x = + let cell = create_cell() in + run3 run_and_set1 cell f x; + Run cell + + let make f = make1 f () + + (*$R + List.iter + (fun n -> + let l = Sequence.(1 -- n) |> Sequence.to_list in + let l = List.rev_map (fun i -> + Fut.make + (fun () -> + Thread.delay 0.05; + 1 + )) l in + let l' = List.map Fut.get l in + OUnit.assert_equal n (List.fold_left (+) 0 l'); + ) + [ 10; 300; ] + *) + + let make2 f x y = + let cell = create_cell() in + run4 run_and_set2 cell f x y; + Run cell + + let get = function + | Return x -> x + | FailNow e -> raise e + | Run cell -> + let rec get_ cell = match cell.state with + | Waiting -> + Condition.wait cell.condition cell.f_mutex; (* wait *) + get_ cell + | Done x -> x + | Failed e -> raise e + in + with_lock_ cell get_ + + (* access the result without locking *) + let get_nolock_ = function + | Return x + | Run {state=Done x; _} -> x + | FailNow _ + | Run {state=(Failed _ | Waiting); _} -> assert false + + let state = function + | Return x -> Done x + | FailNow e -> Failed e + | Run cell -> + with_lock_ cell (fun cell -> cell.state) + + let is_done = function + | Return _ + | FailNow _ -> true + | Run cell -> + with_lock_ cell (fun c -> c.state <> Waiting) + + (** {2 Combinators *) + + let add_handler_ cell f = + with_lock_ cell + (fun cell -> match cell.state with + | Waiting -> cell.handlers <- f :: cell.handlers + | Done _ | Failed _ -> f cell.state) + + let on_finish fut k = match fut with + | Return x -> k (Done x) + | FailNow e -> k (Failed e) + | Run cell -> add_handler_ cell k + + let on_success fut k = + on_finish fut + (function + | Done x -> k x + | _ -> ()) + + let on_failure fut k = + on_finish fut + (function + | Failed e -> k e + | _ -> ()) + + let map_cell_ ~async f cell ~into:cell' = + add_handler_ cell + (function + | Done x -> + if async + then run3 run_and_set1 cell' f x + else run_and_set1 cell' f x + | Failed e -> set_fail_ cell' e + | Waiting -> assert false); + Run cell' + + let map_ ~async f fut = match fut with + | Return x -> + if async + then make1 f x + else Return (f x) + | FailNow e -> FailNow e + | Run cell -> map_cell_ ~async f cell ~into:(create_cell()) + + let map f fut = map_ ~async:false f fut + + let map_async f fut = map_ ~async:true f fut + + (*$R + let a = Fut.make (fun () -> 1) in + let b = Fut.map (fun x -> x+1) a in + let c = Fut.map (fun x -> x-1) b in + OUnit.assert_equal 1 (Fut.get c) + *) + + let app_ ~async f x = match f, x with + | Return f, Return x -> + if async + then make1 f x + else Return (f x) + | FailNow e, _ + | _, FailNow e -> FailNow e + | Return f, Run x -> + map_cell_ ~async (fun x -> f x) x ~into:(create_cell()) + | Run f, Return x -> + map_cell_ ~async (fun f -> f x) f ~into:(create_cell()) + | Run f, Run x -> + let cell' = create_cell () in + add_handler_ f + (function + | Done f -> ignore (map_cell_ ~async f x ~into:cell') + | Failed e -> set_fail_ cell' e + | Waiting -> assert false); + Run cell' + + let app f x = app_ ~async:false f x + + let app_async f x = app_ ~async:true f x + + let flat_map f fut = match fut with + | Return x -> f x + | FailNow e -> FailNow e + | Run cell -> + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> + let fut' = f x in + on_finish fut' + (function + | Done y -> set_done_ cell' y + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ) + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ); + Run cell' + + let and_then fut f = flat_map (fun _ -> f ()) fut + + type _ array_or_list = + | A_ : 'a array -> 'a array_or_list + | L_ : 'a list -> 'a array_or_list + + let iter_aol + : type a. a array_or_list -> (a -> unit) -> unit + = fun aol f -> match aol with + | A_ a -> Array.iter f a + | L_ l -> List.iter f l + + (* [sequence_ l f] returns a future that waits for every element of [l] + to return of fail, and call [f ()] to obtain the result (as a closure) + in case every element succeeded (otherwise a failure is + returned automatically) *) + let sequence_ + : type a res. a t array_or_list -> (unit -> res) -> res t + = fun aol f -> + let n = match aol with + | A_ a -> Array.length a + | L_ l -> List.length l + in + assert (n>0); + let cell = create_cell() in + let n_err = CCLock.create 0 in (* number of failed threads *) + let n_ok = CCLock.create 0 in (* number of succeeding threads *) + iter_aol aol + (fun fut -> + on_finish fut + (function + | Failed e -> + let x = CCLock.incr_then_get n_err in + (* if first failure, then seal [cell]'s fate now *) + if x=1 then set_fail_ cell e + | Done _ -> + let x = CCLock.incr_then_get n_ok in + (* if [n] successes, then [cell] succeeds. Otherwise, some + job has not finished or some job has failed. *) + if x = n then ( + let res = f () in + set_done_ cell res + ) + | Waiting -> assert false)); + Run cell + + (* map an array of futures to a future array *) + let sequence_a a = match a with + | [||] -> return [||] + | _ -> + sequence_ (A_ a) + (fun () -> Array.map get_nolock_ a) + + let map_a f a = sequence_a (Array.map f a) + + let sequence_l l = match l with + | [] -> return [] + | _ :: _ -> + sequence_ (L_ l) (fun () -> List.map get_nolock_ l) + + (* reverse twice *) + let map_l f l = + let l = List.rev_map f l in + sequence_ (L_ l) + (fun () -> List.rev_map get_nolock_ l) + + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10)) + |> Fut.sequence_l + |> Fut.map (List.fold_left (+) 0) + in + let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in + OUnit.assert_equal expected (Fut.get l') + *) + + (*$R + let l = CCList.(1 -- 50) in + let l' = l + |> List.map + (fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x)) + |> Fut.sequence_l + |> Fut.map (List.fold_left (+) 0) + in + OUnit.assert_raises Exit (fun () -> Fut.get l') + *) + + let choose_ + : type a. a t array_or_list -> a t + = fun aol -> + let cell = create_cell() in + let is_done = CCLock.create false in + iter_aol aol + (fun fut -> + on_finish fut + (fun res -> match res with + | Waiting -> assert false + | Done x -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_done_ cell x + | Failed e -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_fail_ cell e)); + Run cell + + let choose_a a = choose_ (A_ a) + + let choose_l l = choose_ (L_ l) + + let sleep time = make1 Thread.delay time + + (*$R + let start = Unix.gettimeofday () in + let pause = 0.2 and n = 10 in + let l = CCList.(1 -- n) + |> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause)) + in + List.iter Fut.get l; + let stop = Unix.gettimeofday () in + OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause); + *) + + module Infix = struct + let (>>=) x f = flat_map f x + let (>>) a f = and_then a f + let (>|=) a f = map f a + let (<*>) = app + end + + include Infix + end +end diff --git a/src/threads/CCPool.mli b/src/threads/CCPool.mli new file mode 100644 index 00000000..9697c6d1 --- /dev/null +++ b/src/threads/CCPool.mli @@ -0,0 +1,167 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Thread Pool, and Futures} + + Renamed and heavily updated from [CCFuture] + @since 0.16 *) + +type +'a state = + | Done of 'a + | Waiting + | Failed of exn + +module type PARAM = sig + val min_size : int + (** Minimum number of threads in the pool *) + + val max_size : int + (** Maximum number of threads in the pool *) +end + +exception Stopped + +(** {2 Create a new Pool} *) +module Make(P : PARAM) : sig + val run : (unit -> _) -> unit + (** [run f] schedules [f] for being executed in the thread pool *) + + val run1 : ('a -> _) -> 'a -> unit + (** [run1 f x] is similar to [run (fun () -> f x)] *) + + val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit + + val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit + + val set_exn_handler : (exn -> unit) -> unit + + val active : unit -> bool + (** [active ()] is true as long as [stop()] has not been called yet *) + + val stop : unit -> unit + (** After calling [stop ()], Most functions will raise Stopped. + This has the effect of preventing new tasks from being executed. *) + + (** {6 Futures} + + The futures are registration points for callbacks, storing a {!state}, + that are executed in the pool using {!run}. *) + module Fut : sig + type 'a t + (** A future value of type 'a *) + + type 'a future = 'a t + + (** {2 Constructors} *) + + val return : 'a -> 'a t + (** Future that is already computed *) + + val fail : exn -> 'a t + (** Future that fails immediately *) + + val make : (unit -> 'a) -> 'a t + (** Create a future, representing a value that will be computed by + the function. If the function raises, the future will fail. *) + + val make1 : ('a -> 'b) -> 'a -> 'b t + + val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t + + (** {2 Basics} *) + + val get : 'a t -> 'a + (** Blocking get: wait for the future to be evaluated, and get the value, + or the exception that failed the future is returned. + raise e if the future failed with e *) + + val state : 'a t -> 'a state + (** State of the future *) + + val is_done : 'a t -> bool + (** Is the future evaluated (success/failure)? *) + + (** {2 Combinators} *) + + val on_success : 'a t -> ('a -> unit) -> unit + (** Attach a handler to be called upon success. + The handler should not call functions on the future. + Might be evaluated now if the future is already done. *) + + val on_failure : _ t -> (exn -> unit) -> unit + (** Attach a handler to be called upon failure. + The handler should not call any function on the future. + Might be evaluated now if the future is already done. *) + + val on_finish : 'a t -> ('a state -> unit) -> unit + (** Attach a handler to be called when the future is evaluated. + The handler should not call functions on the future. + Might be evaluated now if the future is already done. *) + + val flat_map : ('a -> 'b t) -> 'a t -> 'b t + (** Monadic combination of futures *) + + val and_then : 'a t -> (unit -> 'b t) -> 'b t + (** Wait for the first future to succeed, then launch the second *) + + val sequence_a : 'a t array -> 'a array t + (** Future that waits for all previous futures to terminate. If any future + in the array fails, [sequence_a l] fails too. *) + + val map_a : ('a -> 'b t) -> 'a array -> 'b array t + (** [map_l f a] maps [f] on every element of [a], and will return + the array of every result if all calls succeed, or an error otherwise. *) + + val sequence_l : 'a t list -> 'a list t + (** Future that waits for all previous futures to terminate. If any future + in the list fails, [sequence_l l] fails too. *) + + val map_l : ('a -> 'b t) -> 'a list -> 'b list t + (** [map_l f l] maps [f] on every element of [l], and will return + the list of every result if all calls succeed, or an error otherwise. *) + + val choose_a : 'a t array -> 'a t + (** Choose among those futures (the first to terminate). Behaves like + the first future that terminates, by failing if the future fails *) + + val choose_l : 'a t list -> 'a t + (** Choose among those futures (the first to terminate). Behaves like + the first future that terminates, by failing if the future fails *) + + val map : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future. The function doesn't run in its + own task; if it can take time, use {!flat_map} or {!map_async} *) + + val map_async : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future, to be computed in a separated job. *) + + val app : ('a -> 'b) t -> 'a t -> 'b t + (** [app f x] applies the result of [f] to the result of [x] *) + + val app_async : ('a -> 'b) t -> 'a t -> 'b t + (** [app f x] applies the result of [f] to the result of [x], in + a separated job scheduled in the pool *) + + val sleep : float -> unit t + (** Future that returns with success in the given amount of seconds. Blocks + the thread! If you need to wait on many events, consider + using {!CCTimer}. *) + + module Infix : sig + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + val (>|=) : 'a t -> ('a -> 'b) -> 'b t + val (<*>) : ('a -> 'b) t -> 'a t -> 'b t + end + + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + + val (>|=) : 'a t -> ('a -> 'b) -> 'b t + (** Alias to {!map} *) + + val (<*>): ('a -> 'b) t -> 'a t -> 'b t + (** Alias to {!app} *) + end +end diff --git a/src/threads/CCSemaphore.ml b/src/threads/CCSemaphore.ml new file mode 100644 index 00000000..17d0b6de --- /dev/null +++ b/src/threads/CCSemaphore.ml @@ -0,0 +1,117 @@ +(** {1 Semaphores} *) + +type t = { + mutable n : int; + mutex : Mutex.t; + cond : Condition.t; +} + +let create n = + if n <= 0 then invalid_arg "Semaphore.create"; + { n; + mutex=Mutex.create(); + cond=Condition.create(); + } + +let get t = t.n + +(* assume [t.mutex] locked, try to acquire [t] *) +let acquire_once_locked_ m t = + while t.n < m do + Condition.wait t.cond t.mutex; + done; + assert (t.n >= m); + t.n <- t.n - m; + Condition.broadcast t.cond; + Mutex.unlock t.mutex + +let acquire m t = + Mutex.lock t.mutex; + acquire_once_locked_ m t + +(* assume [t.mutex] locked, try to release [t] *) +let release_once_locked_ m t = + t.n <- t.n + m; + Condition.broadcast t.cond; + Mutex.unlock t.mutex + +let release m t = + Mutex.lock t.mutex; + release_once_locked_ m t; + () + +(*$R + let s = create 1 in + let r = CCLock.create false in + let _ = Thread.create (fun s -> acquire 5 s; CCLock.set r true) s in + Thread.yield (); + assert_equal false (CCLock.get r); + release 4 s; + Thread.delay 0.2; + assert_equal true (CCLock.get r); + assert_equal 0 (get s) +*) + +let with_acquire ~n t ~f = + Mutex.lock t.mutex; + acquire_once_locked_ n t; + try + let x = f() in + release_once_locked_ n t; + x + with e -> + release_once_locked_ n t; + raise e + +(*$R + let s = create 5 in + let n = CCLock.create 0 in + let a = Array.init 100 (fun i -> + Thread.create (fun _ -> + with_acquire ~n:(1 + (i mod 5)) s + ~f:(fun () -> CCLock.incr n) + ) ()) + in + Array.iter Thread.join a; + assert_equal ~printer:CCInt.to_string 5 (get s); + assert_equal ~printer:CCInt.to_string 100 (CCLock.get n) +*) + +let wait_until_at_least ~n t ~f = + Mutex.lock t.mutex; + while t.n < n do + Condition.wait t.cond t.mutex; + done; + assert (t.n >= n); + Mutex.unlock t.mutex; + f () + +(*$R + let output s = () in + let s = create 2 in + let res = CCLock.create false in + let id = Thread.create + (fun () -> + output "start"; + wait_until_at_least ~n:5 s + ~f:(fun () -> + assert (get s >= 5); + output "modify now"; + CCLock.set res true) + ) () + in + output "launched thread"; + Thread.yield(); + assert_bool "start" (not (CCLock.get res)); + output "release 2"; + release 2 s; + Thread.yield(); + assert_bool "after release 2" (not (CCLock.get res)); + output "release 1"; + release 1 s; + (* should work now *) + Thread.delay 0.2; + Thread.join id; + output "check"; + assert_bool "after release 1" (CCLock.get res) +*) diff --git a/src/threads/CCSemaphore.mli b/src/threads/CCSemaphore.mli new file mode 100644 index 00000000..5734d31c --- /dev/null +++ b/src/threads/CCSemaphore.mli @@ -0,0 +1,31 @@ +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Semaphores} + + @since 0.13 *) + +type t +(** A semaphore *) + +val create : int -> t +(** [create n] creates a semaphore with initial value [n] + @raise Invalid_argument if [n <= 0] *) + +val get : t -> int +(** Current value *) + +val acquire : int -> t -> unit +(** [acquire n s] blocks until [get s >= n], then atomically + sets [s := !s - n] *) + +val release : int -> t -> unit +(** [release n s] atomically sets [s := !s + n] *) + +val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a +(** [with_acquire ~n s ~f] first acquires [s] with [n] units, + calls [f ()], and then release [s] with [n] units. + Safely release the semaphore even if [f ()] fails *) + +val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a +(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()] + and returns its result. Doesn't modify the semaphore. *) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml new file mode 100644 index 00000000..eb274097 --- /dev/null +++ b/src/threads/CCThread.ml @@ -0,0 +1,85 @@ +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Threads} *) + +type t = Thread.t + +let spawn f = Thread.create f () + +let spawn1 f x = Thread.create f x + +let spawn2 f x y = Thread.create (fun () -> f x y) () + +let detach f = ignore (Thread.create f ()) + +let finally_ f x ~h = + try + let res = f x in + ignore (h ()); + res + with e -> + ignore (h()); + raise e + +module Arr = struct + let spawn n f = + Array.init n (fun i -> Thread.create f i) + + let join a = Array.iter Thread.join a +end + +(*$R + let l = CCLock.create 0 in + let a = Arr.spawn 101 (fun i -> CCLock.update l ((+) i)) in + Arr.join a; + let n = Sequence.(1 -- 100 |> fold (+) 0) in + assert_equal ~printer:CCInt.to_string n (CCLock.get l) +*) + +module Barrier = struct + type t = { + lock: Mutex.t; + cond: Condition.t; + mutable activated: bool; + } + + let create () = { + lock=Mutex.create(); + cond=Condition.create(); + activated=false; + } + + let with_lock_ b f = + Mutex.lock b.lock; + finally_ f () ~h:(fun () -> Mutex.unlock b.lock) + + let reset b = with_lock_ b (fun () -> b.activated <- false) + + let wait b = + with_lock_ b + (fun () -> + while not b.activated do + Condition.wait b.cond b.lock + done) + + let activate b = + with_lock_ b + (fun () -> + if not b.activated then ( + b.activated <- true; + Condition.broadcast b.cond)) + + let activated b = with_lock_ b (fun () -> b.activated) +end + +(*$R + let b = Barrier.create () in + let res = CCLock.create 0 in + let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) + and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in + Thread.delay 0.2; + assert_equal 0 (CCLock.get res); + Barrier.activate b; + Thread.join t1; Thread.join t2; + assert_equal 2 (CCLock.get res) +*) diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli new file mode 100644 index 00000000..fe54e6f8 --- /dev/null +++ b/src/threads/CCThread.mli @@ -0,0 +1,58 @@ +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Threads} + + {b status: unstable} + @since 0.13 *) + +type t = Thread.t + +val spawn : (unit -> _) -> t +(** [spawn f] creates a new thread that runs [f ()] *) + +val spawn1 : ('a -> _) -> 'a -> t +(** [spawn1 f x] is like [spawn (fun () -> f x)]. + @since 0.16 *) + +val spawn2 : ('a -> 'b -> _) -> 'a -> 'b -> t +(** [spawn2 f x y] is like [spawn (fun () -> f x y)]. + @since 0.16 *) + +val detach : (unit -> 'a) -> unit +(** [detach f] is the same as [ignore (spawn f)] *) + +(** {2 Array of threads} *) +module Arr : sig + val spawn : int -> (int -> 'a) -> t array + (** [A.spawn n f] creates an array [res] of length [n], such that + [res.(i) = spawn (fun () -> f i)] *) + + val join : t array -> unit + (** [A.join a] joins every thread in [a] *) +end + +(** {2 Single-Use Barrier} *) + +module Barrier : sig + type t + (** Barrier, used to synchronize threads *) + + val create : unit -> t + (** Create a barrier *) + + val reset : t -> unit + (** Reset to initial (non-triggered) state *) + + val wait : t -> unit + (** [wait b] waits for barrier [b] to be activated by [activate b]. + All threads calling this wait until [activate b] is called. + If [b] is already activated, [wait b] does nothing *) + + val activate : t -> unit + (** [activate b] unblocks all threads that were waiting on [b] *) + + val activated : t -> bool + (** [activated b] returns [true] iff [activate b] was called, and [reset b] + was not called since. In other words, [activated b = true] means + [wait b] will not block. *) +end diff --git a/src/threads/CCTimer.ml b/src/threads/CCTimer.ml new file mode 100644 index 00000000..3fd93934 --- /dev/null +++ b/src/threads/CCTimer.ml @@ -0,0 +1,195 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Event timer} *) + +type job = + | Job : float * (unit -> 'a) -> job + +module TaskHeap = CCHeap.Make(struct + type t = job + let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2 +end) + +exception Stopped + +type t = { + mutable stop : bool; + mutable tasks : TaskHeap.t; + mutable exn_handler : (exn -> unit); + t_mutex : Mutex.t; + fifo_in : Unix.file_descr; + fifo_out : Unix.file_descr; +} + +let set_exn_handler timer f = timer.exn_handler <- f + +let standby_wait = 10. +(* when no task is scheduled, this is the amount of time that is waited + in a row for something to happen. This is also the maximal delay + between the call to {!stop} and the actual termination of the + thread. *) + +let epsilon = 0.0001 +(* accepted time diff for actions. *) + +let with_lock_ t f = + Mutex.lock t.t_mutex; + try + let x = f t in + Mutex.unlock t.t_mutex; + x + with e -> + Mutex.unlock t.t_mutex; + raise e + +type command = + | Quit + | Run : (unit -> _) -> command + | Wait of float + +let pop_task_ t = + let tasks, _ = TaskHeap.take_exn t.tasks in + t.tasks <- tasks + +let call_ timer f = + try ignore (f ()) + with e -> timer.exn_handler e + +(* check next task *) +let next_task_ timer = match TaskHeap.find_min timer.tasks with + | _ when timer.stop -> Quit + | None -> Wait standby_wait + | Some Job (time, f) -> + let now = Unix.gettimeofday () in + if now +. epsilon > time then ( + (* now! *) + pop_task_ timer; + Run f + ) else Wait (time -. now) + +(* The main thread function: wait for next event, run it, and loop *) +let serve timer = + let buf = Bytes.make 1 '_' in + (* acquire lock, call [process_task] and do as it commands *) + let rec next () = match with_lock_ timer next_task_ with + | Quit -> () + | Run f -> + call_ timer f; (* call outside of any lock *) + next () + | Wait delay -> wait delay + (* wait for [delay] seconds, or until something happens on [fifo_in] *) + and wait delay = + let read = Thread.wait_timed_read timer.fifo_in delay in + (* remove char from fifo, so that next write can happen *) + if read then ignore (Unix.read timer.fifo_in buf 0 1); + next () + in + next () + +let nop_handler_ _ = () + +let create () = + let fifo_in, fifo_out = Unix.pipe () in + let timer = { + stop = false; + tasks = TaskHeap.empty; + exn_handler = nop_handler_; + t_mutex = Mutex.create (); + fifo_in; + fifo_out; + } in + (* start a thread to process tasks *) + let _t = Thread.create serve timer in + timer + +let underscore_ = Bytes.make 1 '_' + +(* awake the thread *) +let awaken_ timer = + ignore (Unix.single_write timer.fifo_out underscore_ 0 1) + +(** [at s t ~f] will run [f ()] at the Unix echo [t] *) +let at timer time ~f = + if timer.stop then raise Stopped; + let now = Unix.gettimeofday () in + if now >= time + then call_ timer f + else + with_lock_ timer + (fun timer -> + if timer.stop then raise Stopped; + (* time of the next scheduled event *) + let next_time = match TaskHeap.find_min timer.tasks with + | None -> max_float + | Some Job (d, _) -> d + in + (* insert task *) + timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; + (* see if the timer thread needs to be awaken earlier *) + if time < next_time then awaken_ timer + ) + +let after timer delay ~f = + assert (delay >= 0.); + let now = Unix.gettimeofday () in + at timer (now +. delay) ~f + +exception ExitEvery + +let every ?delay timer d ~f = + let rec run () = + try + ignore (f ()); + schedule() + with ExitEvery -> () (* stop *) + and schedule () = after timer d ~f:run in + match delay with + | None -> run() + | Some d -> after timer d ~f:run + +(*$R + let start = Unix.gettimeofday() in + let timer = create() in + let res = CCLock.create 0 in + let stop = ref 0. in + every timer 0.1 + ~f:(fun () -> + if CCLock.incr_then_get res > 5 then ( + stop := Unix.gettimeofday(); + raise ExitEvery + )); + Thread.delay 0.7; + OUnit.assert_equal ~printer:CCInt.to_string 6 (CCLock.get res); + OUnit.assert_bool "estimate delay" (abs_float (!stop -. start -. 0.5) < 0.1); +*) + +let active timer = not timer.stop + +(** Stop the given timer, cancelling pending tasks *) +let stop timer = + with_lock_ timer + (fun timer -> + if not timer.stop then ( + timer.stop <- true; + (* empty heap of tasks *) + timer.tasks <- TaskHeap.empty; + (* tell the thread to stop *) + awaken_ timer; + ) + ) + +(*$R + (* scenario: n := 1; n := n*4 ; n := n+2; res := n *) + let timer = create () in + let n = CCLock.create 1 in + let res = CCLock.create 0 in + after timer 0.3 + ~f:(fun () -> CCLock.update n (fun x -> x+2)); + ignore (Thread.create + (fun _ -> Thread.delay 0.4; CCLock.set res (CCLock.get n)) ()); + after timer 0.2 + ~f:(fun () -> CCLock.update n (fun x -> x * 4)); + Thread.delay 0.6 ; + OUnit.assert_equal 6 (CCLock.get res); +*) diff --git a/src/threads/CCTimer.mli b/src/threads/CCTimer.mli new file mode 100644 index 00000000..f0068cf8 --- /dev/null +++ b/src/threads/CCTimer.mli @@ -0,0 +1,43 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Event timer} + + Used to be part of [CCFuture] + @since 0.16 *) + +type t +(** A scheduler for events. It runs in its own thread. *) + +val create : unit -> t +(** A new timer. *) + +val set_exn_handler : t -> (exn -> unit) -> unit +(** [set_exn_handler timer f] registers [f] so that any exception + raised by a task scheduled in [timer] is given to [f] *) + +exception Stopped + +val after : t -> float -> f:(unit -> _) -> unit +(** Call the callback [f] after the given number of seconds. + @raise Stopped if the timer was stopped *) + +val at : t -> float -> f:(unit -> _) -> unit +(** Create a future that evaluates to [()] at the given Unix timestamp + @raise Stopped if the timer was stopped *) + +exception ExitEvery + +val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit +(** [every timer n ~f] calls [f ()] every [n] seconds. + [f()] can raise ExitEvery to stop the cycle. + @param delay if provided, the first call to [f ()] is delayed by + that many seconds. + @raise Stopped if the timer was stopped *) + +val stop : t -> unit +(** Stop the given timer, cancelling pending tasks. Idempotent. + From now on, calling most other operations on the timer will raise Stopped. *) + +val active : t -> bool +(** Returns [true] until [stop t] has been called. *)