mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 03:05:28 -05:00
move CCThread.Queue into CCBlockingQueue
- fix the module - fix benchs
This commit is contained in:
parent
02a9639d02
commit
9097cb11ab
7 changed files with 244 additions and 227 deletions
2
_oasis
2
_oasis
|
|
@ -114,7 +114,7 @@ Library "containers_bigarray"
|
||||||
|
|
||||||
Library "containers_thread"
|
Library "containers_thread"
|
||||||
Path: src/threads/
|
Path: src/threads/
|
||||||
Modules: CCFuture, CCLock, CCSemaphore, CCThread
|
Modules: CCFuture, CCLock, CCSemaphore, CCThread, CCBlockingQueue
|
||||||
FindlibName: thread
|
FindlibName: thread
|
||||||
FindlibParent: containers
|
FindlibParent: containers
|
||||||
Build$: flag(thread)
|
Build$: flag(thread)
|
||||||
|
|
|
||||||
|
|
@ -954,7 +954,7 @@ module Deque = struct
|
||||||
end
|
end
|
||||||
|
|
||||||
module Thread = struct
|
module Thread = struct
|
||||||
module Q = CCThread.Queue
|
module Q = CCBlockingQueue
|
||||||
|
|
||||||
module type TAKE_PUSH = sig
|
module type TAKE_PUSH = sig
|
||||||
val take : 'a Q.t -> 'a
|
val take : 'a Q.t -> 'a
|
||||||
|
|
|
||||||
|
|
@ -148,6 +148,7 @@ Moved to its own repository
|
||||||
{4 Others}
|
{4 Others}
|
||||||
|
|
||||||
{!modules:
|
{!modules:
|
||||||
|
CCBlockingQueue
|
||||||
CCFuture
|
CCFuture
|
||||||
CCLock
|
CCLock
|
||||||
CCSemaphore
|
CCSemaphore
|
||||||
|
|
|
||||||
191
src/threads/CCBlockingQueue.ml
Normal file
191
src/threads/CCBlockingQueue.ml
Normal file
|
|
@ -0,0 +1,191 @@
|
||||||
|
|
||||||
|
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||||
|
|
||||||
|
(** {1 Blocking Queue} *)
|
||||||
|
|
||||||
|
type 'a t = {
|
||||||
|
q : 'a Queue.t;
|
||||||
|
lock : Mutex.t;
|
||||||
|
cond : Condition.t;
|
||||||
|
capacity : int;
|
||||||
|
mutable size : int;
|
||||||
|
}
|
||||||
|
|
||||||
|
let create n =
|
||||||
|
if n < 1 then invalid_arg "BloquingQueue.create";
|
||||||
|
let q = {
|
||||||
|
q=Queue.create();
|
||||||
|
lock=Mutex.create();
|
||||||
|
cond=Condition.create();
|
||||||
|
capacity=n;
|
||||||
|
size=0;
|
||||||
|
} in
|
||||||
|
q
|
||||||
|
|
||||||
|
let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1
|
||||||
|
let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1
|
||||||
|
|
||||||
|
let finally_ f x ~h =
|
||||||
|
try
|
||||||
|
let res = f x in
|
||||||
|
ignore (h ());
|
||||||
|
res
|
||||||
|
with e ->
|
||||||
|
ignore (h());
|
||||||
|
raise e
|
||||||
|
|
||||||
|
let with_lock_ q f =
|
||||||
|
Mutex.lock q.lock;
|
||||||
|
finally_ f () ~h:(fun () -> Mutex.unlock q.lock)
|
||||||
|
|
||||||
|
let push q x =
|
||||||
|
with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
while q.size = q.capacity do
|
||||||
|
Condition.wait q.cond q.lock
|
||||||
|
done;
|
||||||
|
assert (q.size < q.capacity);
|
||||||
|
Queue.push x q.q;
|
||||||
|
(* if there are blocked receivers, awake one of them *)
|
||||||
|
incr_size_ q;
|
||||||
|
Condition.broadcast q.cond)
|
||||||
|
|
||||||
|
let take q =
|
||||||
|
with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
while q.size = 0 do
|
||||||
|
Condition.wait q.cond q.lock
|
||||||
|
done;
|
||||||
|
let x = Queue.take q.q in
|
||||||
|
(* if there are blocked senders, awake one of them *)
|
||||||
|
decr_size_ q;
|
||||||
|
Condition.broadcast q.cond;
|
||||||
|
x)
|
||||||
|
|
||||||
|
(*$R
|
||||||
|
let q = create 1 in
|
||||||
|
let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in
|
||||||
|
let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in
|
||||||
|
let l = CCLock.create [] in
|
||||||
|
let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do
|
||||||
|
let x = take q in
|
||||||
|
CCLock.update l (fun l -> x :: l)
|
||||||
|
done)
|
||||||
|
in
|
||||||
|
Thread.join t1; Thread.join t2; Thread.join t3;
|
||||||
|
assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l))
|
||||||
|
*)
|
||||||
|
|
||||||
|
let push_list q l =
|
||||||
|
(* push elements until it's not possible.
|
||||||
|
Assumes the lock is acquired. *)
|
||||||
|
let rec push_ q l = match l with
|
||||||
|
| [] -> l
|
||||||
|
| _::_ when q.size = q.capacity -> l (* no room remaining *)
|
||||||
|
| x :: tl ->
|
||||||
|
Queue.push x q.q;
|
||||||
|
incr_size_ q;
|
||||||
|
push_ q tl
|
||||||
|
in
|
||||||
|
(* push chunks of [l] in [q] until [l] is empty *)
|
||||||
|
let rec aux q l = match l with
|
||||||
|
| [] -> ()
|
||||||
|
| _::_ ->
|
||||||
|
let l = with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
while q.size = q.capacity do
|
||||||
|
Condition.wait q.cond q.lock
|
||||||
|
done;
|
||||||
|
let l = push_ q l in
|
||||||
|
Condition.broadcast q.cond;
|
||||||
|
l)
|
||||||
|
in
|
||||||
|
aux q l
|
||||||
|
in aux q l
|
||||||
|
|
||||||
|
let take_list q n =
|
||||||
|
(* take at most [n] elements of [q] and prepend them to [acc] *)
|
||||||
|
let rec pop_ acc q n =
|
||||||
|
if n=0 || Queue.is_empty q.q then acc, n
|
||||||
|
else ( (* take next element *)
|
||||||
|
let x = Queue.take q.q in
|
||||||
|
decr_size_ q;
|
||||||
|
pop_ (x::acc) q (n-1)
|
||||||
|
)
|
||||||
|
in
|
||||||
|
(* call [pop_] until [n] elements have been gathered *)
|
||||||
|
let rec aux acc q n =
|
||||||
|
if n=0 then List.rev acc
|
||||||
|
else
|
||||||
|
let acc, n = with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
while q.size = 0 do
|
||||||
|
Condition.wait q.cond q.lock
|
||||||
|
done;
|
||||||
|
let acc, n = pop_ acc q n in
|
||||||
|
Condition.broadcast q.cond;
|
||||||
|
acc, n
|
||||||
|
)
|
||||||
|
in
|
||||||
|
aux acc q n
|
||||||
|
in
|
||||||
|
aux [] q n
|
||||||
|
|
||||||
|
(*$R
|
||||||
|
let n = 1000 in
|
||||||
|
let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in
|
||||||
|
let q = create 2 in
|
||||||
|
let senders = CCThread.Arr.spawn 3
|
||||||
|
(fun i ->
|
||||||
|
if i=1
|
||||||
|
then push_list q lists.(i) (* test push_list *)
|
||||||
|
else List.iter (push q) lists.(i)
|
||||||
|
)
|
||||||
|
in
|
||||||
|
let res = CCLock.create [] in
|
||||||
|
let receivers = CCThread.Arr.spawn 3
|
||||||
|
(fun i ->
|
||||||
|
if i=1 then
|
||||||
|
let l = take_list q n in
|
||||||
|
CCLock.update res (fun acc -> l @ acc)
|
||||||
|
else
|
||||||
|
for _j = 1 to n do
|
||||||
|
let x = take q in
|
||||||
|
CCLock.update res (fun acc -> x::acc)
|
||||||
|
done
|
||||||
|
)
|
||||||
|
in
|
||||||
|
CCThread.Arr.join senders; CCThread.Arr.join receivers;
|
||||||
|
let l = CCLock.get res |> List.sort Pervasives.compare in
|
||||||
|
assert_equal CCList.(1 -- 3*n) l
|
||||||
|
*)
|
||||||
|
|
||||||
|
let try_take q =
|
||||||
|
with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
if q.size = 0 then None
|
||||||
|
else (
|
||||||
|
decr_size_ q;
|
||||||
|
Some (Queue.take q.q)
|
||||||
|
))
|
||||||
|
|
||||||
|
let try_push q x =
|
||||||
|
with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
if q.size = q.capacity then false
|
||||||
|
else (
|
||||||
|
incr_size_ q;
|
||||||
|
Queue.push x q.q;
|
||||||
|
Condition.signal q.cond;
|
||||||
|
true
|
||||||
|
))
|
||||||
|
|
||||||
|
let peek q =
|
||||||
|
with_lock_ q
|
||||||
|
(fun () ->
|
||||||
|
try Some (Queue.peek q.q)
|
||||||
|
with Queue.Empty -> None)
|
||||||
|
|
||||||
|
let size q = with_lock_ q (fun () -> q.size)
|
||||||
|
|
||||||
|
let capacity q = q.capacity
|
||||||
50
src/threads/CCBlockingQueue.mli
Normal file
50
src/threads/CCBlockingQueue.mli
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
|
||||||
|
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||||
|
|
||||||
|
(** {1 Blocking Queue}
|
||||||
|
|
||||||
|
This queue has a limited size. Pushing a value on the queue when it
|
||||||
|
is full will block.
|
||||||
|
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
type 'a t
|
||||||
|
(** Safe-thread queue for values of type ['a] *)
|
||||||
|
|
||||||
|
val create : int -> 'a t
|
||||||
|
(** Create a new queue of size [n]. Using [n=max_int] amounts to using
|
||||||
|
an infinite queue (2^61 items is a lot to fit in memory); using [n=1]
|
||||||
|
amounts to using a box with 0 or 1 elements inside.
|
||||||
|
@raise Invalid_argument if [n < 1] *)
|
||||||
|
|
||||||
|
val push : 'a t -> 'a -> unit
|
||||||
|
(** [push q x] pushes [x] into [q], blocking if the queue is full *)
|
||||||
|
|
||||||
|
val take : 'a t -> 'a
|
||||||
|
(** Take the first element, blocking if needed *)
|
||||||
|
|
||||||
|
val push_list : 'a t -> 'a list -> unit
|
||||||
|
(** Push items of the list, one by one *)
|
||||||
|
|
||||||
|
val take_list : 'a t -> int -> 'a list
|
||||||
|
(** [take_list n q] takes [n] elements out of [q] *)
|
||||||
|
|
||||||
|
val try_take : 'a t -> 'a option
|
||||||
|
(** Take the first element if the queue is not empty, return [None]
|
||||||
|
otherwise *)
|
||||||
|
|
||||||
|
val try_push : 'a t -> 'a -> bool
|
||||||
|
(** [try_push q x] pushes [x] into [q] if [q] is not full, in which
|
||||||
|
case it returns [true].
|
||||||
|
If it fails because [q] is full, it returns [false] *)
|
||||||
|
|
||||||
|
val peek : 'a t -> 'a option
|
||||||
|
(** [peek q] returns [Some x] if [x] is the first element of [q],
|
||||||
|
otherwise it returns [None] *)
|
||||||
|
|
||||||
|
val size : _ t -> int
|
||||||
|
(** Number of elements currently in the queue *)
|
||||||
|
|
||||||
|
val capacity : _ t -> int
|
||||||
|
(** Number of values the queue can hold *)
|
||||||
|
|
||||||
|
|
@ -83,183 +83,3 @@ end
|
||||||
Thread.join t1; Thread.join t2;
|
Thread.join t1; Thread.join t2;
|
||||||
assert_equal 2 (CCLock.get res)
|
assert_equal 2 (CCLock.get res)
|
||||||
*)
|
*)
|
||||||
|
|
||||||
module Queue = struct
|
|
||||||
type 'a t = {
|
|
||||||
q : 'a Queue.t;
|
|
||||||
lock : Mutex.t;
|
|
||||||
cond : Condition.t;
|
|
||||||
capacity : int;
|
|
||||||
mutable size : int;
|
|
||||||
}
|
|
||||||
|
|
||||||
let create n =
|
|
||||||
if n < 1 then invalid_arg "CCThread.Queue.create";
|
|
||||||
let q = {
|
|
||||||
q=Queue.create();
|
|
||||||
lock=Mutex.create();
|
|
||||||
cond=Condition.create();
|
|
||||||
capacity=n;
|
|
||||||
size=0;
|
|
||||||
} in
|
|
||||||
q
|
|
||||||
|
|
||||||
let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1
|
|
||||||
let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1
|
|
||||||
|
|
||||||
let with_lock_ q f =
|
|
||||||
Mutex.lock q.lock;
|
|
||||||
finally_ f () ~h:(fun () -> Mutex.unlock q.lock)
|
|
||||||
|
|
||||||
let push q x =
|
|
||||||
with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
while q.size = q.capacity do
|
|
||||||
Condition.wait q.cond q.lock
|
|
||||||
done;
|
|
||||||
assert (q.size < q.capacity);
|
|
||||||
Queue.push x q.q;
|
|
||||||
(* if there are blocked receivers, awake one of them *)
|
|
||||||
incr_size_ q;
|
|
||||||
Condition.broadcast q.cond)
|
|
||||||
|
|
||||||
let take q =
|
|
||||||
with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
while q.size = 0 do
|
|
||||||
Condition.wait q.cond q.lock
|
|
||||||
done;
|
|
||||||
let x = Queue.take q.q in
|
|
||||||
(* if there are blocked senders, awake one of them *)
|
|
||||||
decr_size_ q;
|
|
||||||
Condition.broadcast q.cond;
|
|
||||||
x)
|
|
||||||
|
|
||||||
(*$R
|
|
||||||
let q = Queue.create 1 in
|
|
||||||
let t1 = spawn (fun () -> Queue.push q 1; Queue.push q 2) in
|
|
||||||
let t2 = spawn (fun () -> Queue.push q 3; Queue.push q 4) in
|
|
||||||
let l = CCLock.create [] in
|
|
||||||
let t3 = spawn (fun () -> for i = 1 to 4 do
|
|
||||||
let x = Queue.take q in
|
|
||||||
CCLock.update l (fun l -> x :: l)
|
|
||||||
done)
|
|
||||||
in
|
|
||||||
Thread.join t1; Thread.join t2; Thread.join t3;
|
|
||||||
assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l))
|
|
||||||
*)
|
|
||||||
|
|
||||||
let push_list q l =
|
|
||||||
let is_empty_ = function [] -> true | _::_ -> false in
|
|
||||||
(* push elements until it's not possible *)
|
|
||||||
let rec push_ q l = match l with
|
|
||||||
| [] -> l
|
|
||||||
| _::_ when q.size = q.capacity -> l (* no room remaining *)
|
|
||||||
| x :: tl ->
|
|
||||||
Queue.push x q.q;
|
|
||||||
incr_size_ q;
|
|
||||||
push_ q tl
|
|
||||||
in
|
|
||||||
(* push chunks of [l] in [q] until [l] is empty *)
|
|
||||||
let rec aux q l =
|
|
||||||
if not (is_empty_ l)
|
|
||||||
then
|
|
||||||
let l = with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
while q.size = q.capacity do
|
|
||||||
Condition.wait q.cond q.lock
|
|
||||||
done;
|
|
||||||
let l = push_ q l in
|
|
||||||
Condition.broadcast q.cond;
|
|
||||||
l)
|
|
||||||
in
|
|
||||||
aux q l
|
|
||||||
in aux q l
|
|
||||||
|
|
||||||
let take_list q n =
|
|
||||||
(* take at most [n] elements of [q] and prepend them to [acc] *)
|
|
||||||
let rec pop_ acc q n =
|
|
||||||
if n=0 || Queue.is_empty q.q then acc, n
|
|
||||||
else ( (* take next element *)
|
|
||||||
let x = Queue.take q.q in
|
|
||||||
decr_size_ q;
|
|
||||||
pop_ (x::acc) q (n-1)
|
|
||||||
)
|
|
||||||
in
|
|
||||||
(* call [pop_] until [n] elements have been gathered *)
|
|
||||||
let rec aux acc q n =
|
|
||||||
if n=0 then List.rev acc
|
|
||||||
else
|
|
||||||
let acc, n = with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
while q.size = 0 do
|
|
||||||
Condition.wait q.cond q.lock
|
|
||||||
done;
|
|
||||||
let acc, n = pop_ acc q n in
|
|
||||||
Condition.broadcast q.cond;
|
|
||||||
acc, n
|
|
||||||
)
|
|
||||||
in
|
|
||||||
aux acc q n
|
|
||||||
in
|
|
||||||
aux [] q n
|
|
||||||
|
|
||||||
(*$R
|
|
||||||
let n = 1000 in
|
|
||||||
let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in
|
|
||||||
let q = Queue.create 2 in
|
|
||||||
let senders = Arr.spawn 3
|
|
||||||
(fun i ->
|
|
||||||
if i=1
|
|
||||||
then Queue.push_list q lists.(i) (* test push_list *)
|
|
||||||
else List.iter (Queue.push q) lists.(i)
|
|
||||||
)
|
|
||||||
in
|
|
||||||
let res = CCLock.create [] in
|
|
||||||
let receivers = Arr.spawn 3
|
|
||||||
(fun i ->
|
|
||||||
if i=1 then
|
|
||||||
let l = Queue.take_list q n in
|
|
||||||
CCLock.update res (fun acc -> l @ acc)
|
|
||||||
else
|
|
||||||
for _j = 1 to n do
|
|
||||||
let x = Queue.take q in
|
|
||||||
CCLock.update res (fun acc -> x::acc)
|
|
||||||
done
|
|
||||||
)
|
|
||||||
in
|
|
||||||
Arr.join senders; Arr.join receivers;
|
|
||||||
let l = CCLock.get res |> List.sort Pervasives.compare in
|
|
||||||
assert_equal CCList.(1 -- 3*n) l
|
|
||||||
*)
|
|
||||||
|
|
||||||
let try_take q =
|
|
||||||
with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
if q.size > 0
|
|
||||||
then (
|
|
||||||
decr_size_ q;
|
|
||||||
Some (Queue.take q.q)
|
|
||||||
) else None
|
|
||||||
)
|
|
||||||
|
|
||||||
let try_push q x =
|
|
||||||
with_lock_ q
|
|
||||||
(fun () ->
|
|
||||||
if q.size < q.capacity
|
|
||||||
then (
|
|
||||||
incr_size_ q;
|
|
||||||
Queue.push x q.q;
|
|
||||||
Condition.signal q.cond;
|
|
||||||
true
|
|
||||||
) else false
|
|
||||||
)
|
|
||||||
|
|
||||||
let peek q =
|
|
||||||
with_lock_ q
|
|
||||||
(fun () -> try Some (Queue.peek q.q) with Queue.Empty -> None)
|
|
||||||
|
|
||||||
let size q = with_lock_ q (fun () -> q.size)
|
|
||||||
|
|
||||||
let capacity q = q.capacity
|
|
||||||
end
|
|
||||||
|
|
|
||||||
|
|
@ -56,48 +56,3 @@ module Barrier : sig
|
||||||
was not called since. In other words, [activated b = true] means
|
was not called since. In other words, [activated b = true] means
|
||||||
[wait b] will not block. *)
|
[wait b] will not block. *)
|
||||||
end
|
end
|
||||||
|
|
||||||
(** {2 Blocking Queue}
|
|
||||||
|
|
||||||
This queue has a limited size. Pushing a value on the queue when it
|
|
||||||
is full will block *)
|
|
||||||
module Queue : sig
|
|
||||||
type 'a t
|
|
||||||
(** Safe-thread queue for values of type ['a] *)
|
|
||||||
|
|
||||||
val create : int -> 'a t
|
|
||||||
(** Create a new queue of size [n]. Using [n=max_int] amounts to using
|
|
||||||
an infinite queue (2^61 items is a lot to fit in memory).
|
|
||||||
@raise Invalid_argument if [n < 1] *)
|
|
||||||
|
|
||||||
val push : 'a t -> 'a -> unit
|
|
||||||
(** [push q x] pushes [x] into [q], blocking if the queue is full *)
|
|
||||||
|
|
||||||
val take : 'a t -> 'a
|
|
||||||
(** Take the first element, blocking if needed *)
|
|
||||||
|
|
||||||
val push_list : 'a t -> 'a list -> unit
|
|
||||||
(** Push items of the list, one by one *)
|
|
||||||
|
|
||||||
val take_list : 'a t -> int -> 'a list
|
|
||||||
(** [take_list n q] takes [n] elements out of [q] *)
|
|
||||||
|
|
||||||
val try_take : 'a t -> 'a option
|
|
||||||
(** Take the first element if the queue is not empty, return [None]
|
|
||||||
otherwise *)
|
|
||||||
|
|
||||||
val try_push : 'a t -> 'a -> bool
|
|
||||||
(** [try_push q x] pushes [x] into [q] if [q] is not full, in which
|
|
||||||
case it returns [true].
|
|
||||||
If it fails because [q] is full, it returns [false] *)
|
|
||||||
|
|
||||||
val peek : 'a t -> 'a option
|
|
||||||
(** [peek q] returns [Some x] if [x] is the first element of [q],
|
|
||||||
otherwise it returns [None] *)
|
|
||||||
|
|
||||||
val size : _ t -> int
|
|
||||||
(** Number of elements currently in the queue *)
|
|
||||||
|
|
||||||
val capacity : _ t -> int
|
|
||||||
(** Number of values the queue can hold *)
|
|
||||||
end
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue