mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 11:15:31 -05:00
276 lines
6.4 KiB
OCaml
276 lines
6.4 KiB
OCaml
(* This file is free software, part of containers. See file "license" for more details. *)
|
|
|
|
(** {1 Threads} *)
|
|
|
|
type t = Thread.t
|
|
|
|
let spawn f = Thread.create f ()
|
|
|
|
let spawn1 f x = Thread.create f x
|
|
|
|
let spawn2 f x y = Thread.create (fun () -> f x y) ()
|
|
|
|
let detach f = ignore (Thread.create f ())
|
|
|
|
module Arr = struct
|
|
let spawn n f =
|
|
Array.init n (fun i -> Thread.create f i)
|
|
|
|
let join a = Array.iter Thread.join a
|
|
end
|
|
|
|
(*$R
|
|
let l = CCLock.create 0 in
|
|
let a = Arr.spawn 101 (fun i -> CCLock.update l ((+) i)) in
|
|
Arr.join a;
|
|
let n = Sequence.(1 -- 100 |> fold (+) 0) in
|
|
assert_equal ~printer:CCInt.to_string n (CCLock.get l)
|
|
*)
|
|
|
|
module Barrier = struct
|
|
type t = {
|
|
lock: Mutex.t;
|
|
cond: Condition.t;
|
|
mutable activated: bool;
|
|
}
|
|
|
|
let create () = {
|
|
lock=Mutex.create();
|
|
cond=Condition.create();
|
|
activated=false;
|
|
}
|
|
|
|
let with_lock_ b f =
|
|
Mutex.lock b.lock;
|
|
try
|
|
let x = f () in
|
|
Mutex.unlock b.lock;
|
|
x
|
|
with e ->
|
|
Mutex.unlock b.lock;
|
|
raise e
|
|
|
|
let reset b = with_lock_ b (fun () -> b.activated <- false)
|
|
|
|
let wait b =
|
|
with_lock_ b
|
|
(fun () ->
|
|
while not b.activated do
|
|
Condition.wait b.cond b.lock
|
|
done
|
|
)
|
|
|
|
let activate b =
|
|
with_lock_ b
|
|
(fun () ->
|
|
if not b.activated then (
|
|
b.activated <- true;
|
|
Condition.broadcast b.cond
|
|
)
|
|
)
|
|
|
|
let activated b = with_lock_ b (fun () -> b.activated)
|
|
end
|
|
|
|
(*$R
|
|
let b = Barrier.create () in
|
|
let res = CCLock.create 0 in
|
|
let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res)
|
|
and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in
|
|
Thread.delay 0.2;
|
|
assert_equal 0 (CCLock.get res);
|
|
Barrier.activate b;
|
|
Thread.join t1; Thread.join t2;
|
|
assert_equal 2 (CCLock.get res)
|
|
*)
|
|
|
|
module Queue = struct
|
|
type 'a t = {
|
|
q : 'a Queue.t;
|
|
lock : Mutex.t;
|
|
cond : Condition.t;
|
|
capacity : int;
|
|
mutable size : int;
|
|
}
|
|
|
|
let create n =
|
|
if n < 1 then invalid_arg "CCThread.Queue.create";
|
|
let q = {
|
|
q=Queue.create();
|
|
lock=Mutex.create();
|
|
cond=Condition.create();
|
|
capacity=n;
|
|
size=0;
|
|
} in
|
|
q
|
|
|
|
let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1
|
|
let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1
|
|
|
|
let with_lock_ q f =
|
|
Mutex.lock q.lock;
|
|
try
|
|
let x = f () in
|
|
Mutex.unlock q.lock;
|
|
x
|
|
with e ->
|
|
Mutex.unlock q.lock;
|
|
raise e
|
|
|
|
let push q x =
|
|
with_lock_ q
|
|
(fun () ->
|
|
while q.size = q.capacity do
|
|
Condition.wait q.cond q.lock
|
|
done;
|
|
assert (q.size < q.capacity);
|
|
Queue.push x q.q;
|
|
(* if there are blocked receivers, awake one of them *)
|
|
incr_size_ q;
|
|
Condition.broadcast q.cond;
|
|
)
|
|
|
|
let take q =
|
|
with_lock_ q
|
|
(fun () ->
|
|
while q.size = 0 do
|
|
Condition.wait q.cond q.lock
|
|
done;
|
|
let x = Queue.take q.q in
|
|
(* if there are blocked senders, awake one of them *)
|
|
decr_size_ q;
|
|
Condition.broadcast q.cond;
|
|
x
|
|
)
|
|
|
|
(*$R
|
|
let q = Queue.create 1 in
|
|
let t1 = spawn (fun () -> Queue.push q 1; Queue.push q 2) in
|
|
let t2 = spawn (fun () -> Queue.push q 3; Queue.push q 4) in
|
|
let l = CCLock.create [] in
|
|
let t3 = spawn (fun () -> for i = 1 to 4 do
|
|
let x = Queue.take q in
|
|
CCLock.update l (fun l -> x :: l)
|
|
done)
|
|
in
|
|
Thread.join t1; Thread.join t2; Thread.join t3;
|
|
assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l))
|
|
*)
|
|
|
|
let push_list q l =
|
|
let is_empty_ = function [] -> true | _::_ -> false in
|
|
(* push elements until it's not possible *)
|
|
let rec push_ q l = match l with
|
|
| [] -> l
|
|
| _::_ when q.size = q.capacity -> l (* no room remaining *)
|
|
| x :: tl ->
|
|
Queue.push x q.q;
|
|
incr_size_ q;
|
|
push_ q tl
|
|
in
|
|
(* push chunks of [l] in [q] until [l] is empty *)
|
|
let rec aux q l =
|
|
if not (is_empty_ l)
|
|
then
|
|
let l = with_lock_ q
|
|
(fun () ->
|
|
while q.size = q.capacity do
|
|
Condition.wait q.cond q.lock
|
|
done;
|
|
let l = push_ q l in
|
|
Condition.broadcast q.cond;
|
|
l
|
|
)
|
|
in
|
|
aux q l
|
|
in aux q l
|
|
|
|
let take_list q n =
|
|
(* take at most [n] elements of [q] and prepend them to [acc] *)
|
|
let rec pop_ acc q n =
|
|
if n=0 || Queue.is_empty q.q then acc, n
|
|
else ( (* take next element *)
|
|
let x = Queue.take q.q in
|
|
decr_size_ q;
|
|
pop_ (x::acc) q (n-1)
|
|
)
|
|
in
|
|
(* call [pop_] until [n] elements have been gathered *)
|
|
let rec aux acc q n =
|
|
if n=0 then List.rev acc
|
|
else
|
|
let acc, n = with_lock_ q
|
|
(fun () ->
|
|
while q.size = 0 do
|
|
Condition.wait q.cond q.lock
|
|
done;
|
|
let acc, n = pop_ acc q n in
|
|
Condition.broadcast q.cond;
|
|
acc, n
|
|
)
|
|
in
|
|
aux acc q n
|
|
in
|
|
aux [] q n
|
|
|
|
(*$R
|
|
let n = 1000 in
|
|
let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in
|
|
let q = Queue.create 2 in
|
|
let senders = Arr.spawn 3
|
|
(fun i ->
|
|
if i=1
|
|
then Queue.push_list q lists.(i) (* test push_list *)
|
|
else List.iter (Queue.push q) lists.(i)
|
|
)
|
|
in
|
|
let res = CCLock.create [] in
|
|
let receivers = Arr.spawn 3
|
|
(fun i ->
|
|
if i=1 then
|
|
let l = Queue.take_list q n in
|
|
CCLock.update res (fun acc -> l @ acc)
|
|
else
|
|
for _j = 1 to n do
|
|
let x = Queue.take q in
|
|
CCLock.update res (fun acc -> x::acc)
|
|
done
|
|
)
|
|
in
|
|
Arr.join senders; Arr.join receivers;
|
|
let l = CCLock.get res |> List.sort Pervasives.compare in
|
|
assert_equal CCList.(1 -- 3*n) l
|
|
*)
|
|
|
|
let try_take q =
|
|
with_lock_ q
|
|
(fun () ->
|
|
if q.size > 0
|
|
then (
|
|
decr_size_ q;
|
|
Some (Queue.take q.q)
|
|
) else None
|
|
)
|
|
|
|
let try_push q x =
|
|
with_lock_ q
|
|
(fun () ->
|
|
if q.size < q.capacity
|
|
then (
|
|
incr_size_ q;
|
|
Queue.push x q.q;
|
|
Condition.signal q.cond;
|
|
true
|
|
) else false
|
|
)
|
|
|
|
let peek q =
|
|
with_lock_ q
|
|
(fun () ->
|
|
try Some (Queue.peek q.q) with Queue.Empty -> None
|
|
)
|
|
|
|
let size q = with_lock_ q (fun () -> q.size)
|
|
|
|
let capacity q = q.capacity
|
|
end
|