new module CCThread, utils for threading (+ blocking queue)

This commit is contained in:
Simon Cruanes 2015-08-31 22:30:33 +02:00
parent deab575bb3
commit 767999271c
4 changed files with 214 additions and 1 deletions

View file

@ -180,6 +180,7 @@ In the library `containers.thread`, for preemptive system threads:
monadic futures, and MVars (concurrent boxes) monadic futures, and MVars (concurrent boxes)
- `CCLock`, values protected by locks - `CCLock`, values protected by locks
- `CCSemaphore`, a simple implementation of semaphores - `CCSemaphore`, a simple implementation of semaphores
- `CCThread` basic wrappers for `Thread`
### Misc ### Misc

2
_oasis
View file

@ -130,7 +130,7 @@ Library "containers_misc"
Library "containers_thread" Library "containers_thread"
Path: src/threads/ Path: src/threads/
Modules: CCFuture, CCLock, CCSemaphore Modules: CCFuture, CCLock, CCSemaphore, CCThread
FindlibName: thread FindlibName: thread
FindlibParent: containers FindlibParent: containers
Build$: flag(thread) Build$: flag(thread)

142
src/threads/CCThread.ml Normal file
View file

@ -0,0 +1,142 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Threads} *)
type t = Thread.t
let spawn f = Thread.create f ()
let detach f = ignore (Thread.create f ())
module Array = struct
let spawn n f =
Array.init n (fun i -> Thread.create f i)
let join a = Array.iter Thread.join a
end
(*$R
let l = CCLock.create 0 in
let a = Array.spawn 101 (fun i -> CCLock.update l ((+) i)) in
Array.join a;
let n = Sequence.(1 -- 100 |> fold (+) 0) in
assert_equal ~printer:CCInt.to_string n (CCLock.get l)
*)
module Queue = struct
type 'a t = {
q : 'a Queue.t;
lock : Mutex.t;
cond : Condition.t;
capacity : int;
mutable size : int;
}
let create n =
if n < 1 then invalid_arg "CCThread.Queue.create";
let q = {
q=Queue.create();
lock=Mutex.create();
cond=Condition.create();
capacity=n;
size=0;
} in
q
let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1
let decr_size_ q = q.size <- q.size - 1
let with_lock_ q f =
Mutex.lock q.lock;
try
let x = f () in
Mutex.unlock q.lock;
x
with e ->
Mutex.unlock q.lock;
raise e
let push q x =
with_lock_ q
(fun () ->
while q.size = q.capacity do
Condition.wait q.cond q.lock
done;
assert (q.size < q.capacity);
Queue.push x q.q;
if q.size = 0 then Condition.signal q.cond;
incr_size_ q;
)
let take q =
with_lock_ q
(fun () ->
while q.size = 0 do
Condition.wait q.cond q.lock
done;
let x = Queue.take q.q in
if q.size = q.capacity then Condition.signal q.cond;
decr_size_ q;
x
)
(*$R
let q = Queue.create 1 in
let t1 = spawn (fun () -> Queue.push q 1; Queue.push q 2) in
let t2 = spawn (fun () -> Queue.push q 3; Queue.push q 4) in
let l = CCLock.create [] in
let t3 = spawn (fun () -> for i = 1 to 4 do
let x = Queue.take q in
CCLock.update l (fun l -> x :: l)
done)
in
Thread.join t1; Thread.join t2; Thread.join t3;
assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l))
*)
(* TODO: more efficient versions (push or pop several items at once when possible) *)
let push_list q l = List.iter (push q) l
let rec take_list q n =
if n=0 then []
else
let x = take q in
x :: take_list q (n-1)
let try_take q =
with_lock_ q
(fun () ->
if q.size > 0
then (
decr_size_ q;
Some (Queue.take q.q)
) else None
)
let try_push q x =
with_lock_ q
(fun () ->
if q.size < q.capacity
then (
incr_size_ q;
Queue.push x q.q;
Condition.signal q.cond;
true
) else false
)
let peek q =
with_lock_ q
(fun () ->
try Some (Queue.peek q.q) with Queue.Empty -> None
)
let size q = with_lock_ q (fun () -> q.size)
let capacity q = q.capacity
end

70
src/threads/CCThread.mli Normal file
View file

@ -0,0 +1,70 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Threads}
{b status: unstable}
@since NEXT_RELEASE *)
type t = Thread.t
val spawn : (unit -> 'a) -> t
(** [spawn f] creates a new thread that runs [f ()] *)
val detach : (unit -> 'a) -> unit
(** [detach f] is the same as [ignore (spawn f)] *)
(** {2 Array of threads} *)
module Array : sig
val spawn : int -> (int -> 'a) -> t array
(** [A.spawn n f] creates an array [res] of length [n], such that
[res.(i) = spawn (fun () -> f i)] *)
val join : t array -> unit
(** [A.join a] joins every thread in [a] *)
end
(** {2 Blocking Queue}
This queue has a limited size. Pushing a value on the queue when it
is full will block *)
module Queue : sig
type 'a t
(** Safe-thread queue for values of type ['a] *)
val create : int -> 'a t
(** Create a new queue of size [n]
@raise Invalid_argument if [n < 1] *)
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], blocking if the queue is full *)
val take : 'a t -> 'a
(** Take the first element, blocking if needed *)
val push_list : 'a t -> 'a list -> unit
(** Push items of the list, one by one *)
val take_list : 'a t -> int -> 'a list
(** [take_list n q] takes [n] elements out of [q] *)
val try_take : 'a t -> 'a option
(** Take the first element if the queue is not empty, return [None]
otherwise *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] pushes [x] into [q] if [q] is not full, in which
case it returns [true].
If it fails because [q] is full, it returns [false] *)
val peek : 'a t -> 'a option
(** [peek q] returns [Some x] if [x] is the first element of [q],
otherwise it returns [None] *)
val size : _ t -> int
(** Number of elements currently in the queue *)
val capacity : _ t -> int
(** Number of values the queue can hold *)
end