diff --git a/_oasis b/_oasis index 26cffeba..29a479c7 100644 --- a/_oasis +++ b/_oasis @@ -114,7 +114,7 @@ Library "containers_bigarray" Library "containers_thread" Path: src/threads/ - Modules: CCFuture, CCLock, CCSemaphore, CCThread + Modules: CCFuture, CCLock, CCSemaphore, CCThread, CCBlockingQueue FindlibName: thread FindlibParent: containers Build$: flag(thread) diff --git a/benchs/run_benchs.ml b/benchs/run_benchs.ml index 555ca079..dea99e05 100644 --- a/benchs/run_benchs.ml +++ b/benchs/run_benchs.ml @@ -954,7 +954,7 @@ module Deque = struct end module Thread = struct - module Q = CCThread.Queue + module Q = CCBlockingQueue module type TAKE_PUSH = sig val take : 'a Q.t -> 'a diff --git a/doc/intro.txt b/doc/intro.txt index 6855b114..5fbc2bf0 100644 --- a/doc/intro.txt +++ b/doc/intro.txt @@ -148,6 +148,7 @@ Moved to its own repository {4 Others} {!modules: +CCBlockingQueue CCFuture CCLock CCSemaphore diff --git a/src/threads/CCBlockingQueue.ml b/src/threads/CCBlockingQueue.ml new file mode 100644 index 00000000..d767b4ab --- /dev/null +++ b/src/threads/CCBlockingQueue.ml @@ -0,0 +1,191 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Blocking Queue} *) + +type 'a t = { + q : 'a Queue.t; + lock : Mutex.t; + cond : Condition.t; + capacity : int; + mutable size : int; +} + +let create n = + if n < 1 then invalid_arg "BloquingQueue.create"; + let q = { + q=Queue.create(); + lock=Mutex.create(); + cond=Condition.create(); + capacity=n; + size=0; + } in + q + +let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 +let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1 + +let finally_ f x ~h = + try + let res = f x in + ignore (h ()); + res + with e -> + ignore (h()); + raise e + +let with_lock_ q f = + Mutex.lock q.lock; + finally_ f () ~h:(fun () -> Mutex.unlock q.lock) + +let push q x = + with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + assert (q.size < q.capacity); + Queue.push x q.q; + (* if there are blocked receivers, awake one of them *) + incr_size_ q; + Condition.broadcast q.cond) + +let take q = + with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let x = Queue.take q.q in + (* if there are blocked senders, awake one of them *) + decr_size_ q; + Condition.broadcast q.cond; + x) + +(*$R + let q = create 1 in + let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in + let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in + let l = CCLock.create [] in + let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do + let x = take q in + CCLock.update l (fun l -> x :: l) + done) + in + Thread.join t1; Thread.join t2; Thread.join t3; + assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) +*) + +let push_list q l = + (* push elements until it's not possible. + Assumes the lock is acquired. *) + let rec push_ q l = match l with + | [] -> l + | _::_ when q.size = q.capacity -> l (* no room remaining *) + | x :: tl -> + Queue.push x q.q; + incr_size_ q; + push_ q tl + in + (* push chunks of [l] in [q] until [l] is empty *) + let rec aux q l = match l with + | [] -> () + | _::_ -> + let l = with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + let l = push_ q l in + Condition.broadcast q.cond; + l) + in + aux q l + in aux q l + +let take_list q n = + (* take at most [n] elements of [q] and prepend them to [acc] *) + let rec pop_ acc q n = + if n=0 || Queue.is_empty q.q then acc, n + else ( (* take next element *) + let x = Queue.take q.q in + decr_size_ q; + pop_ (x::acc) q (n-1) + ) + in + (* call [pop_] until [n] elements have been gathered *) + let rec aux acc q n = + if n=0 then List.rev acc + else + let acc, n = with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let acc, n = pop_ acc q n in + Condition.broadcast q.cond; + acc, n + ) + in + aux acc q n + in + aux [] q n + +(*$R + let n = 1000 in + let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in + let q = create 2 in + let senders = CCThread.Arr.spawn 3 + (fun i -> + if i=1 + then push_list q lists.(i) (* test push_list *) + else List.iter (push q) lists.(i) + ) + in + let res = CCLock.create [] in + let receivers = CCThread.Arr.spawn 3 + (fun i -> + if i=1 then + let l = take_list q n in + CCLock.update res (fun acc -> l @ acc) + else + for _j = 1 to n do + let x = take q in + CCLock.update res (fun acc -> x::acc) + done + ) + in + CCThread.Arr.join senders; CCThread.Arr.join receivers; + let l = CCLock.get res |> List.sort Pervasives.compare in + assert_equal CCList.(1 -- 3*n) l +*) + +let try_take q = + with_lock_ q + (fun () -> + if q.size = 0 then None + else ( + decr_size_ q; + Some (Queue.take q.q) + )) + +let try_push q x = + with_lock_ q + (fun () -> + if q.size = q.capacity then false + else ( + incr_size_ q; + Queue.push x q.q; + Condition.signal q.cond; + true + )) + +let peek q = + with_lock_ q + (fun () -> + try Some (Queue.peek q.q) + with Queue.Empty -> None) + +let size q = with_lock_ q (fun () -> q.size) + +let capacity q = q.capacity diff --git a/src/threads/CCBlockingQueue.mli b/src/threads/CCBlockingQueue.mli new file mode 100644 index 00000000..fabd441d --- /dev/null +++ b/src/threads/CCBlockingQueue.mli @@ -0,0 +1,50 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Blocking Queue} + + This queue has a limited size. Pushing a value on the queue when it + is full will block. + + @since NEXT_RELEASE *) + +type 'a t +(** Safe-thread queue for values of type ['a] *) + +val create : int -> 'a t +(** Create a new queue of size [n]. Using [n=max_int] amounts to using + an infinite queue (2^61 items is a lot to fit in memory); using [n=1] + amounts to using a box with 0 or 1 elements inside. + @raise Invalid_argument if [n < 1] *) + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], blocking if the queue is full *) + +val take : 'a t -> 'a +(** Take the first element, blocking if needed *) + +val push_list : 'a t -> 'a list -> unit +(** Push items of the list, one by one *) + +val take_list : 'a t -> int -> 'a list +(** [take_list n q] takes [n] elements out of [q] *) + +val try_take : 'a t -> 'a option +(** Take the first element if the queue is not empty, return [None] + otherwise *) + +val try_push : 'a t -> 'a -> bool +(** [try_push q x] pushes [x] into [q] if [q] is not full, in which + case it returns [true]. + If it fails because [q] is full, it returns [false] *) + +val peek : 'a t -> 'a option +(** [peek q] returns [Some x] if [x] is the first element of [q], + otherwise it returns [None] *) + +val size : _ t -> int +(** Number of elements currently in the queue *) + +val capacity : _ t -> int +(** Number of values the queue can hold *) + diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml index 95f3ead7..eb274097 100644 --- a/src/threads/CCThread.ml +++ b/src/threads/CCThread.ml @@ -83,183 +83,3 @@ end Thread.join t1; Thread.join t2; assert_equal 2 (CCLock.get res) *) - -module Queue = struct - type 'a t = { - q : 'a Queue.t; - lock : Mutex.t; - cond : Condition.t; - capacity : int; - mutable size : int; - } - - let create n = - if n < 1 then invalid_arg "CCThread.Queue.create"; - let q = { - q=Queue.create(); - lock=Mutex.create(); - cond=Condition.create(); - capacity=n; - size=0; - } in - q - - let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 - let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1 - - let with_lock_ q f = - Mutex.lock q.lock; - finally_ f () ~h:(fun () -> Mutex.unlock q.lock) - - let push q x = - with_lock_ q - (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - assert (q.size < q.capacity); - Queue.push x q.q; - (* if there are blocked receivers, awake one of them *) - incr_size_ q; - Condition.broadcast q.cond) - - let take q = - with_lock_ q - (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let x = Queue.take q.q in - (* if there are blocked senders, awake one of them *) - decr_size_ q; - Condition.broadcast q.cond; - x) - - (*$R - let q = Queue.create 1 in - let t1 = spawn (fun () -> Queue.push q 1; Queue.push q 2) in - let t2 = spawn (fun () -> Queue.push q 3; Queue.push q 4) in - let l = CCLock.create [] in - let t3 = spawn (fun () -> for i = 1 to 4 do - let x = Queue.take q in - CCLock.update l (fun l -> x :: l) - done) - in - Thread.join t1; Thread.join t2; Thread.join t3; - assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) - *) - - let push_list q l = - let is_empty_ = function [] -> true | _::_ -> false in - (* push elements until it's not possible *) - let rec push_ q l = match l with - | [] -> l - | _::_ when q.size = q.capacity -> l (* no room remaining *) - | x :: tl -> - Queue.push x q.q; - incr_size_ q; - push_ q tl - in - (* push chunks of [l] in [q] until [l] is empty *) - let rec aux q l = - if not (is_empty_ l) - then - let l = with_lock_ q - (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - let l = push_ q l in - Condition.broadcast q.cond; - l) - in - aux q l - in aux q l - - let take_list q n = - (* take at most [n] elements of [q] and prepend them to [acc] *) - let rec pop_ acc q n = - if n=0 || Queue.is_empty q.q then acc, n - else ( (* take next element *) - let x = Queue.take q.q in - decr_size_ q; - pop_ (x::acc) q (n-1) - ) - in - (* call [pop_] until [n] elements have been gathered *) - let rec aux acc q n = - if n=0 then List.rev acc - else - let acc, n = with_lock_ q - (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let acc, n = pop_ acc q n in - Condition.broadcast q.cond; - acc, n - ) - in - aux acc q n - in - aux [] q n - - (*$R - let n = 1000 in - let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in - let q = Queue.create 2 in - let senders = Arr.spawn 3 - (fun i -> - if i=1 - then Queue.push_list q lists.(i) (* test push_list *) - else List.iter (Queue.push q) lists.(i) - ) - in - let res = CCLock.create [] in - let receivers = Arr.spawn 3 - (fun i -> - if i=1 then - let l = Queue.take_list q n in - CCLock.update res (fun acc -> l @ acc) - else - for _j = 1 to n do - let x = Queue.take q in - CCLock.update res (fun acc -> x::acc) - done - ) - in - Arr.join senders; Arr.join receivers; - let l = CCLock.get res |> List.sort Pervasives.compare in - assert_equal CCList.(1 -- 3*n) l - *) - - let try_take q = - with_lock_ q - (fun () -> - if q.size > 0 - then ( - decr_size_ q; - Some (Queue.take q.q) - ) else None - ) - - let try_push q x = - with_lock_ q - (fun () -> - if q.size < q.capacity - then ( - incr_size_ q; - Queue.push x q.q; - Condition.signal q.cond; - true - ) else false - ) - - let peek q = - with_lock_ q - (fun () -> try Some (Queue.peek q.q) with Queue.Empty -> None) - - let size q = with_lock_ q (fun () -> q.size) - - let capacity q = q.capacity -end diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli index 1ea3ff8a..d33f8fd4 100644 --- a/src/threads/CCThread.mli +++ b/src/threads/CCThread.mli @@ -56,48 +56,3 @@ module Barrier : sig was not called since. In other words, [activated b = true] means [wait b] will not block. *) end - -(** {2 Blocking Queue} - - This queue has a limited size. Pushing a value on the queue when it - is full will block *) -module Queue : sig - type 'a t - (** Safe-thread queue for values of type ['a] *) - - val create : int -> 'a t - (** Create a new queue of size [n]. Using [n=max_int] amounts to using - an infinite queue (2^61 items is a lot to fit in memory). - @raise Invalid_argument if [n < 1] *) - - val push : 'a t -> 'a -> unit - (** [push q x] pushes [x] into [q], blocking if the queue is full *) - - val take : 'a t -> 'a - (** Take the first element, blocking if needed *) - - val push_list : 'a t -> 'a list -> unit - (** Push items of the list, one by one *) - - val take_list : 'a t -> int -> 'a list - (** [take_list n q] takes [n] elements out of [q] *) - - val try_take : 'a t -> 'a option - (** Take the first element if the queue is not empty, return [None] - otherwise *) - - val try_push : 'a t -> 'a -> bool - (** [try_push q x] pushes [x] into [q] if [q] is not full, in which - case it returns [true]. - If it fails because [q] is full, it returns [false] *) - - val peek : 'a t -> 'a option - (** [peek q] returns [Some x] if [x] is the first element of [q], - otherwise it returns [None] *) - - val size : _ t -> int - (** Number of elements currently in the queue *) - - val capacity : _ t -> int - (** Number of values the queue can hold *) -end