diff --git a/README.md b/README.md index 285b34bf..c6d66cc4 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,7 @@ In the library `containers.thread`, for preemptive system threads: monadic futures, and MVars (concurrent boxes) - `CCLock`, values protected by locks - `CCSemaphore`, a simple implementation of semaphores +- `CCThread` basic wrappers for `Thread` ### Misc diff --git a/_oasis b/_oasis index 08da485a..5ad879cd 100644 --- a/_oasis +++ b/_oasis @@ -130,7 +130,7 @@ Library "containers_misc" Library "containers_thread" Path: src/threads/ - Modules: CCFuture, CCLock, CCSemaphore + Modules: CCFuture, CCLock, CCSemaphore, CCThread FindlibName: thread FindlibParent: containers Build$: flag(thread) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml new file mode 100644 index 00000000..b509d24a --- /dev/null +++ b/src/threads/CCThread.ml @@ -0,0 +1,142 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Threads} *) + +type t = Thread.t + +let spawn f = Thread.create f () + +let detach f = ignore (Thread.create f ()) + +module Array = struct + let spawn n f = + Array.init n (fun i -> Thread.create f i) + + let join a = Array.iter Thread.join a +end + +(*$R + let l = CCLock.create 0 in + let a = Array.spawn 101 (fun i -> CCLock.update l ((+) i)) in + Array.join a; + let n = Sequence.(1 -- 100 |> fold (+) 0) in + assert_equal ~printer:CCInt.to_string n (CCLock.get l) +*) + +module Queue = struct + type 'a t = { + q : 'a Queue.t; + lock : Mutex.t; + cond : Condition.t; + capacity : int; + mutable size : int; + } + + let create n = + if n < 1 then invalid_arg "CCThread.Queue.create"; + let q = { + q=Queue.create(); + lock=Mutex.create(); + cond=Condition.create(); + capacity=n; + size=0; + } in + q + + let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 + let decr_size_ q = q.size <- q.size - 1 + + let with_lock_ q f = + Mutex.lock q.lock; + try + let x = f () in + Mutex.unlock q.lock; + x + with e -> + Mutex.unlock q.lock; + raise e + + let push q x = + with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + assert (q.size < q.capacity); + Queue.push x q.q; + if q.size = 0 then Condition.signal q.cond; + incr_size_ q; + ) + + let take q = + with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let x = Queue.take q.q in + if q.size = q.capacity then Condition.signal q.cond; + decr_size_ q; + x + ) + + (*$R + let q = Queue.create 1 in + let t1 = spawn (fun () -> Queue.push q 1; Queue.push q 2) in + let t2 = spawn (fun () -> Queue.push q 3; Queue.push q 4) in + let l = CCLock.create [] in + let t3 = spawn (fun () -> for i = 1 to 4 do + let x = Queue.take q in + CCLock.update l (fun l -> x :: l) + done) + in + Thread.join t1; Thread.join t2; Thread.join t3; + assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) + *) + + (* TODO: more efficient versions (push or pop several items at once when possible) *) + + let push_list q l = List.iter (push q) l + + let rec take_list q n = + if n=0 then [] + else + let x = take q in + x :: take_list q (n-1) + + let try_take q = + with_lock_ q + (fun () -> + if q.size > 0 + then ( + decr_size_ q; + Some (Queue.take q.q) + ) else None + ) + + let try_push q x = + with_lock_ q + (fun () -> + if q.size < q.capacity + then ( + incr_size_ q; + Queue.push x q.q; + Condition.signal q.cond; + true + ) else false + ) + + let peek q = + with_lock_ q + (fun () -> + try Some (Queue.peek q.q) with Queue.Empty -> None + ) + + let size q = with_lock_ q (fun () -> q.size) + + let capacity q = q.capacity +end + + + diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli new file mode 100644 index 00000000..c7e02743 --- /dev/null +++ b/src/threads/CCThread.mli @@ -0,0 +1,70 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Threads} + + {b status: unstable} + @since NEXT_RELEASE *) + +type t = Thread.t + +val spawn : (unit -> 'a) -> t +(** [spawn f] creates a new thread that runs [f ()] *) + +val detach : (unit -> 'a) -> unit +(** [detach f] is the same as [ignore (spawn f)] *) + +(** {2 Array of threads} *) +module Array : sig + val spawn : int -> (int -> 'a) -> t array + (** [A.spawn n f] creates an array [res] of length [n], such that + [res.(i) = spawn (fun () -> f i)] *) + + val join : t array -> unit + (** [A.join a] joins every thread in [a] *) +end + +(** {2 Blocking Queue} + + This queue has a limited size. Pushing a value on the queue when it + is full will block *) +module Queue : sig + type 'a t + (** Safe-thread queue for values of type ['a] *) + + val create : int -> 'a t + (** Create a new queue of size [n] + @raise Invalid_argument if [n < 1] *) + + val push : 'a t -> 'a -> unit + (** [push q x] pushes [x] into [q], blocking if the queue is full *) + + val take : 'a t -> 'a + (** Take the first element, blocking if needed *) + + val push_list : 'a t -> 'a list -> unit + (** Push items of the list, one by one *) + + val take_list : 'a t -> int -> 'a list + (** [take_list n q] takes [n] elements out of [q] *) + + val try_take : 'a t -> 'a option + (** Take the first element if the queue is not empty, return [None] + otherwise *) + + val try_push : 'a t -> 'a -> bool + (** [try_push q x] pushes [x] into [q] if [q] is not full, in which + case it returns [true]. + If it fails because [q] is full, it returns [false] *) + + val peek : 'a t -> 'a option + (** [peek q] returns [Some x] if [x] is the first element of [q], + otherwise it returns [None] *) + + val size : _ t -> int + (** Number of elements currently in the queue *) + + val capacity : _ t -> int + (** Number of values the queue can hold *) +end +