mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 03:05:28 -05:00
parent
cebee407ea
commit
4821772dcf
15 changed files with 1791 additions and 3 deletions
|
|
@ -181,6 +181,15 @@ Iterators:
|
|||
- `CCKTree`, an abstract lazy tree structure
|
||||
|
||||
|
||||
=== Thread
|
||||
|
||||
In the library `containers.thread`, for preemptive system threads:
|
||||
|
||||
- `CCFuture`, a set of tools for preemptive threading, including a thread pool,
|
||||
monadic futures, and MVars (concurrent boxes)
|
||||
- `CCLock`, values protected by locks
|
||||
- `CCSemaphore`, a simple implementation of semaphores
|
||||
- `CCThread` basic wrappers for `Thread`
|
||||
|
||||
=== Misc
|
||||
|
||||
|
|
|
|||
24
_oasis
24
_oasis
|
|
@ -21,10 +21,17 @@ Description:
|
|||
extend the stdlib (e.g. CCList provides safe map/fold_right/append, and
|
||||
additional functions on lists).
|
||||
|
||||
It also features optional libraries for dealing with strings, and
|
||||
helpers for unix and threads.
|
||||
|
||||
Flag "unix"
|
||||
Description: Build the containers.unix library (depends on Unix)
|
||||
Default: false
|
||||
|
||||
Flag "thread"
|
||||
Description: Build modules that depend on threads
|
||||
Default: true
|
||||
|
||||
Flag "bench"
|
||||
Description: Build and run benchmarks
|
||||
Default: true
|
||||
|
|
@ -73,6 +80,17 @@ Library "containers_iter"
|
|||
FindlibParent: containers
|
||||
FindlibName: iter
|
||||
|
||||
Library "containers_thread"
|
||||
Path: src/threads/
|
||||
Modules: CCPool, CCLock, CCSemaphore, CCThread, CCBlockingQueue,
|
||||
CCTimer
|
||||
FindlibName: thread
|
||||
FindlibParent: containers
|
||||
Build$: flag(thread)
|
||||
Install$: flag(thread)
|
||||
BuildDepends: containers, threads
|
||||
XMETARequires: containers, threads
|
||||
|
||||
Library "containers_top"
|
||||
Path: src/top/
|
||||
Modules: Containers_top
|
||||
|
|
@ -92,7 +110,7 @@ Document containers
|
|||
"-docflags '-colorize-code -short-functors -charset utf-8'"
|
||||
XOCamlbuildLibraries:
|
||||
containers, containers.iter, containers.data,
|
||||
containers.unix, containers.sexp
|
||||
containers.thread, containers.unix, containers.sexp
|
||||
|
||||
Executable run_benchs
|
||||
Path: benchs/
|
||||
|
|
@ -101,7 +119,7 @@ Executable run_benchs
|
|||
Build$: flag(bench)
|
||||
MainIs: run_benchs.ml
|
||||
BuildDepends: containers, qcheck,
|
||||
containers.data, containers.iter,
|
||||
containers.data, containers.iter, containers.thread,
|
||||
sequence, gen, benchmark
|
||||
|
||||
Executable run_bench_hash
|
||||
|
|
@ -121,7 +139,7 @@ Executable run_qtest
|
|||
MainIs: run_qtest.ml
|
||||
Build$: flag(tests) && flag(unix)
|
||||
BuildDepends: containers, containers.iter,
|
||||
containers.sexp, containers.unix,
|
||||
containers.sexp, containers.unix, containers.thread,
|
||||
containers.data,
|
||||
sequence, gen, unix, oUnit, qcheck
|
||||
|
||||
|
|
|
|||
|
|
@ -143,6 +143,22 @@ Moved to its own repository.
|
|||
|
||||
Moved to its own repository
|
||||
|
||||
{4 Thread Helpers}
|
||||
|
||||
{b findlib name}: containers.thread
|
||||
|
||||
Modules related to the use of [Thread].
|
||||
|
||||
{!modules:
|
||||
CCBlockingQueue
|
||||
CCLock
|
||||
CCPool
|
||||
CCSemaphore
|
||||
CCThread
|
||||
CCTimer
|
||||
}
|
||||
|
||||
|
||||
{2 Index}
|
||||
|
||||
{!indexlist}
|
||||
|
|
|
|||
191
src/threads/CCBlockingQueue.ml
Normal file
191
src/threads/CCBlockingQueue.ml
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Blocking Queue} *)
|
||||
|
||||
type 'a t = {
|
||||
q : 'a Queue.t;
|
||||
lock : Mutex.t;
|
||||
cond : Condition.t;
|
||||
capacity : int;
|
||||
mutable size : int;
|
||||
}
|
||||
|
||||
let create n =
|
||||
if n < 1 then invalid_arg "BloquingQueue.create";
|
||||
let q = {
|
||||
q=Queue.create();
|
||||
lock=Mutex.create();
|
||||
cond=Condition.create();
|
||||
capacity=n;
|
||||
size=0;
|
||||
} in
|
||||
q
|
||||
|
||||
let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1
|
||||
let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1
|
||||
|
||||
let finally_ f x ~h =
|
||||
try
|
||||
let res = f x in
|
||||
ignore (h ());
|
||||
res
|
||||
with e ->
|
||||
ignore (h());
|
||||
raise e
|
||||
|
||||
let with_lock_ q f =
|
||||
Mutex.lock q.lock;
|
||||
finally_ f () ~h:(fun () -> Mutex.unlock q.lock)
|
||||
|
||||
let push q x =
|
||||
with_lock_ q
|
||||
(fun () ->
|
||||
while q.size = q.capacity do
|
||||
Condition.wait q.cond q.lock
|
||||
done;
|
||||
assert (q.size < q.capacity);
|
||||
Queue.push x q.q;
|
||||
(* if there are blocked receivers, awake one of them *)
|
||||
incr_size_ q;
|
||||
Condition.broadcast q.cond)
|
||||
|
||||
let take q =
|
||||
with_lock_ q
|
||||
(fun () ->
|
||||
while q.size = 0 do
|
||||
Condition.wait q.cond q.lock
|
||||
done;
|
||||
let x = Queue.take q.q in
|
||||
(* if there are blocked senders, awake one of them *)
|
||||
decr_size_ q;
|
||||
Condition.broadcast q.cond;
|
||||
x)
|
||||
|
||||
(*$R
|
||||
let q = create 1 in
|
||||
let t1 = CCThread.spawn (fun () -> push q 1; push q 2) in
|
||||
let t2 = CCThread.spawn (fun () -> push q 3; push q 4) in
|
||||
let l = CCLock.create [] in
|
||||
let t3 = CCThread.spawn (fun () -> for i = 1 to 4 do
|
||||
let x = take q in
|
||||
CCLock.update l (fun l -> x :: l)
|
||||
done)
|
||||
in
|
||||
Thread.join t1; Thread.join t2; Thread.join t3;
|
||||
assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l))
|
||||
*)
|
||||
|
||||
let push_list q l =
|
||||
(* push elements until it's not possible.
|
||||
Assumes the lock is acquired. *)
|
||||
let rec push_ q l = match l with
|
||||
| [] -> l
|
||||
| _::_ when q.size = q.capacity -> l (* no room remaining *)
|
||||
| x :: tl ->
|
||||
Queue.push x q.q;
|
||||
incr_size_ q;
|
||||
push_ q tl
|
||||
in
|
||||
(* push chunks of [l] in [q] until [l] is empty *)
|
||||
let rec aux q l = match l with
|
||||
| [] -> ()
|
||||
| _::_ ->
|
||||
let l = with_lock_ q
|
||||
(fun () ->
|
||||
while q.size = q.capacity do
|
||||
Condition.wait q.cond q.lock
|
||||
done;
|
||||
let l = push_ q l in
|
||||
Condition.broadcast q.cond;
|
||||
l)
|
||||
in
|
||||
aux q l
|
||||
in aux q l
|
||||
|
||||
let take_list q n =
|
||||
(* take at most [n] elements of [q] and prepend them to [acc] *)
|
||||
let rec pop_ acc q n =
|
||||
if n=0 || Queue.is_empty q.q then acc, n
|
||||
else ( (* take next element *)
|
||||
let x = Queue.take q.q in
|
||||
decr_size_ q;
|
||||
pop_ (x::acc) q (n-1)
|
||||
)
|
||||
in
|
||||
(* call [pop_] until [n] elements have been gathered *)
|
||||
let rec aux acc q n =
|
||||
if n=0 then List.rev acc
|
||||
else
|
||||
let acc, n = with_lock_ q
|
||||
(fun () ->
|
||||
while q.size = 0 do
|
||||
Condition.wait q.cond q.lock
|
||||
done;
|
||||
let acc, n = pop_ acc q n in
|
||||
Condition.broadcast q.cond;
|
||||
acc, n
|
||||
)
|
||||
in
|
||||
aux acc q n
|
||||
in
|
||||
aux [] q n
|
||||
|
||||
(*$R
|
||||
let n = 1000 in
|
||||
let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in
|
||||
let q = create 2 in
|
||||
let senders = CCThread.Arr.spawn 3
|
||||
(fun i ->
|
||||
if i=1
|
||||
then push_list q lists.(i) (* test push_list *)
|
||||
else List.iter (push q) lists.(i)
|
||||
)
|
||||
in
|
||||
let res = CCLock.create [] in
|
||||
let receivers = CCThread.Arr.spawn 3
|
||||
(fun i ->
|
||||
if i=1 then
|
||||
let l = take_list q n in
|
||||
CCLock.update res (fun acc -> l @ acc)
|
||||
else
|
||||
for _j = 1 to n do
|
||||
let x = take q in
|
||||
CCLock.update res (fun acc -> x::acc)
|
||||
done
|
||||
)
|
||||
in
|
||||
CCThread.Arr.join senders; CCThread.Arr.join receivers;
|
||||
let l = CCLock.get res |> List.sort Pervasives.compare in
|
||||
assert_equal CCList.(1 -- 3*n) l
|
||||
*)
|
||||
|
||||
let try_take q =
|
||||
with_lock_ q
|
||||
(fun () ->
|
||||
if q.size = 0 then None
|
||||
else (
|
||||
decr_size_ q;
|
||||
Some (Queue.take q.q)
|
||||
))
|
||||
|
||||
let try_push q x =
|
||||
with_lock_ q
|
||||
(fun () ->
|
||||
if q.size = q.capacity then false
|
||||
else (
|
||||
incr_size_ q;
|
||||
Queue.push x q.q;
|
||||
Condition.signal q.cond;
|
||||
true
|
||||
))
|
||||
|
||||
let peek q =
|
||||
with_lock_ q
|
||||
(fun () ->
|
||||
try Some (Queue.peek q.q)
|
||||
with Queue.Empty -> None)
|
||||
|
||||
let size q = with_lock_ q (fun () -> q.size)
|
||||
|
||||
let capacity q = q.capacity
|
||||
50
src/threads/CCBlockingQueue.mli
Normal file
50
src/threads/CCBlockingQueue.mli
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Blocking Queue}
|
||||
|
||||
This queue has a limited size. Pushing a value on the queue when it
|
||||
is full will block.
|
||||
|
||||
@since 0.16 *)
|
||||
|
||||
type 'a t
|
||||
(** Safe-thread queue for values of type ['a] *)
|
||||
|
||||
val create : int -> 'a t
|
||||
(** Create a new queue of size [n]. Using [n=max_int] amounts to using
|
||||
an infinite queue (2^61 items is a lot to fit in memory); using [n=1]
|
||||
amounts to using a box with 0 or 1 elements inside.
|
||||
@raise Invalid_argument if [n < 1] *)
|
||||
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** [push q x] pushes [x] into [q], blocking if the queue is full *)
|
||||
|
||||
val take : 'a t -> 'a
|
||||
(** Take the first element, blocking if needed *)
|
||||
|
||||
val push_list : 'a t -> 'a list -> unit
|
||||
(** Push items of the list, one by one *)
|
||||
|
||||
val take_list : 'a t -> int -> 'a list
|
||||
(** [take_list n q] takes [n] elements out of [q] *)
|
||||
|
||||
val try_take : 'a t -> 'a option
|
||||
(** Take the first element if the queue is not empty, return [None]
|
||||
otherwise *)
|
||||
|
||||
val try_push : 'a t -> 'a -> bool
|
||||
(** [try_push q x] pushes [x] into [q] if [q] is not full, in which
|
||||
case it returns [true].
|
||||
If it fails because [q] is full, it returns [false] *)
|
||||
|
||||
val peek : 'a t -> 'a option
|
||||
(** [peek q] returns [Some x] if [x] is the first element of [q],
|
||||
otherwise it returns [None] *)
|
||||
|
||||
val size : _ t -> int
|
||||
(** Number of elements currently in the queue *)
|
||||
|
||||
val capacity : _ t -> int
|
||||
(** Number of values the queue can hold *)
|
||||
|
||||
176
src/threads/CCLock.ml
Normal file
176
src/threads/CCLock.ml
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Utils around Mutex} *)
|
||||
|
||||
type 'a t = {
|
||||
mutex : Mutex.t;
|
||||
mutable content : 'a;
|
||||
}
|
||||
|
||||
type 'a lock = 'a t
|
||||
|
||||
let create content = {
|
||||
mutex = Mutex.create();
|
||||
content;
|
||||
}
|
||||
|
||||
let with_lock l f =
|
||||
Mutex.lock l.mutex;
|
||||
try
|
||||
let x = f l.content in
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock l.mutex;
|
||||
raise e
|
||||
|
||||
(*$R
|
||||
let l = create 0 in
|
||||
let try_incr l =
|
||||
update l (fun x -> Thread.yield(); x+1)
|
||||
in
|
||||
for i = 1 to 10 do ignore (Thread.create try_incr l) done;
|
||||
Thread.delay 0.10 ;
|
||||
assert_equal 10 (get l)
|
||||
*)
|
||||
|
||||
module LockRef = struct
|
||||
type 'a t = 'a lock
|
||||
let get t = t.content
|
||||
let set t x = t.content <- x
|
||||
let update t f = t.content <- f t.content
|
||||
end
|
||||
|
||||
let with_lock_as_ref l ~f =
|
||||
Mutex.lock l.mutex;
|
||||
try
|
||||
let x = f l in
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock l.mutex;
|
||||
raise e
|
||||
|
||||
(*$R
|
||||
let l = create 0 in
|
||||
let test_it l =
|
||||
with_lock_as_ref l
|
||||
~f:(fun r ->
|
||||
(* increment and decrement *)
|
||||
for j = 0 to 100 do
|
||||
let x = LockRef.get r in
|
||||
LockRef.set r (x+10);
|
||||
if j mod 5=0 then Thread.yield ();
|
||||
let y = LockRef.get r in
|
||||
LockRef.set r (y - 10);
|
||||
done
|
||||
)
|
||||
in
|
||||
for i = 1 to 100 do ignore (Thread.create test_it l) done;
|
||||
Thread.delay 0.10;
|
||||
assert_equal 0 (get l)
|
||||
*)
|
||||
|
||||
let mutex l = l.mutex
|
||||
|
||||
let update l f =
|
||||
with_lock l (fun x -> l.content <- f x)
|
||||
|
||||
(*$T
|
||||
let l = create 5 in update l (fun x->x+1); get l = 6
|
||||
*)
|
||||
|
||||
let update_map l f =
|
||||
with_lock l
|
||||
(fun x ->
|
||||
let x', y = f x in
|
||||
l.content <- x';
|
||||
y)
|
||||
|
||||
(*$T
|
||||
let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6
|
||||
*)
|
||||
|
||||
let get l =
|
||||
Mutex.lock l.mutex;
|
||||
let x = l.content in
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
let set l x =
|
||||
Mutex.lock l.mutex;
|
||||
l.content <- x;
|
||||
Mutex.unlock l.mutex
|
||||
|
||||
(*$T
|
||||
let l = create 0 in set l 4; get l = 4
|
||||
let l = create 0 in set l 4; set l 5; get l = 5
|
||||
*)
|
||||
|
||||
let incr l = update l Pervasives.succ
|
||||
|
||||
let decr l = update l Pervasives.pred
|
||||
|
||||
|
||||
(*$R
|
||||
let l = create 0 in
|
||||
let a = Array.init 100 (fun _ -> Thread.create (fun _ -> incr l) ()) in
|
||||
Array.iter Thread.join a;
|
||||
assert_equal ~printer:CCInt.to_string 100 (get l)
|
||||
*)
|
||||
|
||||
(*$T
|
||||
let l = create 0 in incr l ; get l = 1
|
||||
let l = create 0 in decr l ; get l = ~-1
|
||||
*)
|
||||
|
||||
let incr_then_get l =
|
||||
Mutex.lock l.mutex;
|
||||
l.content <- l.content + 1;
|
||||
let x = l.content in
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
let get_then_incr l =
|
||||
Mutex.lock l.mutex;
|
||||
let x = l.content in
|
||||
l.content <- l.content + 1;
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
let decr_then_get l =
|
||||
Mutex.lock l.mutex;
|
||||
l.content <- l.content - 1;
|
||||
let x = l.content in
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
let get_then_decr l =
|
||||
Mutex.lock l.mutex;
|
||||
let x = l.content in
|
||||
l.content <- l.content - 1;
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
(*$T
|
||||
let l = create 0 in 1 = incr_then_get l && 1 = get l
|
||||
let l = create 0 in 0 = get_then_incr l && 1 = get l
|
||||
let l = create 10 in 9 = decr_then_get l && 9 = get l
|
||||
let l = create 10 in 10 = get_then_decr l && 9 = get l
|
||||
*)
|
||||
|
||||
let get_then_set l =
|
||||
Mutex.lock l.mutex;
|
||||
let x = l.content in
|
||||
l.content <- true;
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
let get_then_clear l =
|
||||
Mutex.lock l.mutex;
|
||||
let x = l.content in
|
||||
l.content <- false;
|
||||
Mutex.unlock l.mutex;
|
||||
x
|
||||
|
||||
87
src/threads/CCLock.mli
Normal file
87
src/threads/CCLock.mli
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Utils around Mutex}
|
||||
|
||||
A value wrapped into a Mutex, for more safety.
|
||||
|
||||
@since 0.8 *)
|
||||
|
||||
type 'a t
|
||||
(** A value surrounded with a lock *)
|
||||
|
||||
val create : 'a -> 'a t
|
||||
(** Create a new protected value *)
|
||||
|
||||
val with_lock : 'a t -> ('a -> 'b) -> 'b
|
||||
(** [with_lock l f] runs [f x] where [x] is the value protected with
|
||||
the lock [l], in a critical section. If [f x] fails, [with_lock l f]
|
||||
fails too but the lock is released *)
|
||||
|
||||
(** Type allowing to manipulate the lock as a reference
|
||||
@since 0.13 *)
|
||||
module LockRef : sig
|
||||
type 'a t
|
||||
|
||||
val get : 'a t -> 'a
|
||||
|
||||
val set : 'a t -> 'a -> unit
|
||||
|
||||
val update : 'a t -> ('a -> 'a) -> unit
|
||||
end
|
||||
|
||||
val with_lock_as_ref : 'a t -> f:('a LockRef.t -> 'b) -> 'b
|
||||
(** [with_lock_as_ref l f] calls [f] with a reference-like object
|
||||
that allows to manipulate the value of [l] safely.
|
||||
The object passed to [f] must not escape the function call
|
||||
@since 0.13 *)
|
||||
|
||||
val update : 'a t -> ('a -> 'a) -> unit
|
||||
(** [update l f] replaces the content [x] of [l] with [f x], atomically *)
|
||||
|
||||
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
|
||||
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
|
||||
and returns [y]
|
||||
@since 0.16 *)
|
||||
|
||||
val mutex : _ t -> Mutex.t
|
||||
(** Underlying mutex *)
|
||||
|
||||
val get : 'a t -> 'a
|
||||
(** Get the value in the lock. The value that is returned isn't protected! *)
|
||||
|
||||
val set : 'a t -> 'a -> unit
|
||||
(** Atomically set the value
|
||||
@since 0.13 *)
|
||||
|
||||
val incr : int t -> unit
|
||||
(** Atomically increment the value
|
||||
@since 0.13 *)
|
||||
|
||||
val decr : int t -> unit
|
||||
(** Atomically decrement the value
|
||||
@since 0.13 *)
|
||||
|
||||
val incr_then_get : int t -> int
|
||||
(** [incr_then_get x] increments [x], and return its new value
|
||||
@since 0.16 *)
|
||||
|
||||
val get_then_incr : int t -> int
|
||||
(** [get_then_incr x] increments [x], and return its previous value
|
||||
@since 0.16 *)
|
||||
|
||||
val decr_then_get : int t -> int
|
||||
(** [decr_then_get x] decrements [x], and return its new value
|
||||
@since 0.16 *)
|
||||
|
||||
val get_then_decr : int t -> int
|
||||
(** [get_then_decr x] decrements [x], and return its previous value
|
||||
@since 0.16 *)
|
||||
|
||||
val get_then_set : bool t -> bool
|
||||
(** [get_then_set b] sets [b] to [true], and return the old value
|
||||
@since 0.16 *)
|
||||
|
||||
val get_then_clear : bool t -> bool
|
||||
(** [get_then_clear b] sets [b] to [false], and return the old value
|
||||
@since 0.16 *)
|
||||
545
src/threads/CCPool.ml
Normal file
545
src/threads/CCPool.ml
Normal file
|
|
@ -0,0 +1,545 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Thread Pool, and Futures} *)
|
||||
|
||||
type +'a state =
|
||||
| Done of 'a
|
||||
| Waiting
|
||||
| Failed of exn
|
||||
|
||||
module type PARAM = sig
|
||||
val min_size : int
|
||||
(** Minimum number of threads in the pool *)
|
||||
|
||||
val max_size : int
|
||||
(** Maximum number of threads in the pool *)
|
||||
end
|
||||
|
||||
exception Stopped
|
||||
|
||||
(*$inject
|
||||
module P = Make(struct let min_size = 0 let max_size = 30 end)
|
||||
module Fut = P.Fut
|
||||
open Fut.Infix
|
||||
*)
|
||||
|
||||
(** {2 Thread pool} *)
|
||||
module Make(P : PARAM) = struct
|
||||
type job =
|
||||
| Job1 : ('a -> _) * 'a -> job
|
||||
| Job2 : ('a -> 'b -> _) * 'a * 'b -> job
|
||||
| Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job
|
||||
| Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job
|
||||
|
||||
type t = {
|
||||
mutable stop : bool; (* indicate that threads should stop *)
|
||||
mutable exn_handler: (exn -> unit);
|
||||
mutex : Mutex.t;
|
||||
cond : Condition.t;
|
||||
jobs : job Queue.t; (* waiting jobs *)
|
||||
mutable cur_size : int; (* total number of threads *)
|
||||
mutable cur_idle : int; (* number of idle threads *)
|
||||
} (** Dynamic, growable thread pool *)
|
||||
|
||||
let nop_ _ = ()
|
||||
|
||||
(* singleton pool *)
|
||||
let pool = {
|
||||
stop = false;
|
||||
exn_handler = nop_;
|
||||
cond = Condition.create();
|
||||
cur_size = 0;
|
||||
cur_idle = 0;
|
||||
jobs = Queue.create ();
|
||||
mutex = Mutex.create ();
|
||||
}
|
||||
|
||||
let set_exn_handler f = pool.exn_handler <- f
|
||||
|
||||
let with_lock_ t f =
|
||||
Mutex.lock t.mutex;
|
||||
try
|
||||
let x = f t in
|
||||
Mutex.unlock t.mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock t.mutex;
|
||||
raise e
|
||||
|
||||
let incr_size_ p = p.cur_size <- p.cur_size + 1
|
||||
let decr_size_ p = p.cur_size <- p.cur_size - 1
|
||||
|
||||
(* next thing a thread should do *)
|
||||
type command =
|
||||
| Process of job
|
||||
| Wait (* wait on condition *)
|
||||
| Die (* thread has no work to do *)
|
||||
|
||||
(* thread: seek what to do next (including dying).
|
||||
Assumes the pool is locked. *)
|
||||
let get_next_ pool =
|
||||
if pool.stop
|
||||
|| (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then (
|
||||
(* die: the thread would be idle otherwise *)
|
||||
assert (pool.cur_size > 0);
|
||||
decr_size_ pool;
|
||||
Die
|
||||
)
|
||||
else if Queue.is_empty pool.jobs then Wait
|
||||
else (
|
||||
let job = Queue.pop pool.jobs in
|
||||
Process job
|
||||
)
|
||||
|
||||
(* Thread: entry point. They seek jobs in the queue *)
|
||||
let rec serve pool =
|
||||
let cmd = with_lock_ pool get_next_ in
|
||||
run_cmd cmd
|
||||
|
||||
(* run a command *)
|
||||
and run_cmd = function
|
||||
| Die -> ()
|
||||
| Wait ->
|
||||
with_lock_ pool (fun p -> Condition.wait p.cond p.mutex)
|
||||
| Process (Job1 (f, x)) ->
|
||||
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
|
||||
| Process (Job2 (f, x, y)) ->
|
||||
begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool
|
||||
| Process (Job3 (f, x, y, z)) ->
|
||||
begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool
|
||||
| Process (Job4 (f, x, y, z, w)) ->
|
||||
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
|
||||
|
||||
(* create a new worker thread *)
|
||||
let launch_worker_ pool = ignore (Thread.create serve pool)
|
||||
|
||||
(* launch the minimum required number of threads *)
|
||||
let () =
|
||||
for _i = 1 to P.min_size do launch_worker_ pool done
|
||||
|
||||
(* heuristic criterion for starting a new thread. *)
|
||||
let can_start_thread_ p = p.cur_size < P.max_size
|
||||
|
||||
let run_job job =
|
||||
(* acquire lock and push job in queue, or start thread directly
|
||||
if the queue is empty *)
|
||||
with_lock_ pool
|
||||
(fun pool ->
|
||||
if pool.stop then raise Stopped;
|
||||
if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0
|
||||
then (
|
||||
(* create the thread now, on [job], as it will not break order of
|
||||
jobs. We do not want to wait for the busy threads to do our task
|
||||
if we are allowed to spawn a new thread. *)
|
||||
incr_size_ pool;
|
||||
ignore (Thread.create run_cmd (Process job))
|
||||
) else (
|
||||
(* cannot start thread, push and wait for some worker to pick it up *)
|
||||
Queue.push job pool.jobs;
|
||||
Condition.signal pool.cond; (* wake up *)
|
||||
(* might want to process in the background, if all threads are busy *)
|
||||
if pool.cur_idle = 0 && can_start_thread_ pool then (
|
||||
incr_size_ pool;
|
||||
launch_worker_ pool;
|
||||
)
|
||||
))
|
||||
|
||||
(* run the function on the argument in the given pool *)
|
||||
let run1 f x = run_job (Job1 (f, x))
|
||||
|
||||
let run f = run1 f ()
|
||||
|
||||
let run2 f x y = run_job (Job2 (f, x, y))
|
||||
|
||||
let run3 f x y z = run_job (Job3 (f, x, y, z))
|
||||
|
||||
let run4 f x y z w = run_job (Job4 (f, x, y, z, w))
|
||||
|
||||
let active () = not pool.stop
|
||||
|
||||
(* kill threads in the pool *)
|
||||
let stop () =
|
||||
with_lock_ pool
|
||||
(fun p ->
|
||||
p.stop <- true;
|
||||
Queue.clear p.jobs)
|
||||
|
||||
(* stop threads if pool is GC'd *)
|
||||
let () = Gc.finalise (fun _ -> stop ()) pool
|
||||
|
||||
(** {6 Futures} *)
|
||||
module Fut = struct
|
||||
type 'a handler = 'a state -> unit
|
||||
|
||||
(** A proper future, with a delayed computation *)
|
||||
type 'a cell = {
|
||||
mutable state : 'a state;
|
||||
mutable handlers : 'a handler list; (* handlers *)
|
||||
f_mutex : Mutex.t;
|
||||
condition : Condition.t;
|
||||
}
|
||||
|
||||
(** A future value of type 'a *)
|
||||
type 'a t =
|
||||
| Return of 'a
|
||||
| FailNow of exn
|
||||
| Run of 'a cell
|
||||
|
||||
type 'a future = 'a t
|
||||
|
||||
(** {2 Basic Future functions} *)
|
||||
|
||||
let return x = Return x
|
||||
|
||||
let fail e = FailNow e
|
||||
|
||||
let create_cell () = {
|
||||
state = Waiting;
|
||||
handlers = [];
|
||||
f_mutex = Mutex.create ();
|
||||
condition = Condition.create ();
|
||||
}
|
||||
|
||||
let with_lock_ cell f =
|
||||
Mutex.lock cell.f_mutex;
|
||||
try
|
||||
let x = f cell in
|
||||
Mutex.unlock cell.f_mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock cell.f_mutex;
|
||||
raise e
|
||||
|
||||
(* TODO: exception handler for handler errors *)
|
||||
|
||||
let set_done_ cell x =
|
||||
with_lock_ cell
|
||||
(fun cell -> match cell.state with
|
||||
| Waiting -> (* set state and signal *)
|
||||
cell.state <- Done x;
|
||||
Condition.broadcast cell.condition;
|
||||
List.iter
|
||||
(fun f -> try f cell.state with e -> pool.exn_handler e)
|
||||
cell.handlers
|
||||
| _ -> assert false)
|
||||
|
||||
let set_fail_ cell e =
|
||||
with_lock_ cell
|
||||
(fun cell -> match cell.state with
|
||||
| Waiting ->
|
||||
cell.state <- Failed e;
|
||||
Condition.broadcast cell.condition;
|
||||
List.iter
|
||||
(fun f -> try f cell.state with e -> pool.exn_handler e)
|
||||
cell.handlers
|
||||
| _ -> assert false)
|
||||
|
||||
(* calls [f x], and put result or exception in [cell] *)
|
||||
let run_and_set1 cell f x =
|
||||
try
|
||||
let y = f x in
|
||||
set_done_ cell y
|
||||
with e ->
|
||||
set_fail_ cell e
|
||||
|
||||
let run_and_set2 cell f x y =
|
||||
try
|
||||
let z = f x y in
|
||||
set_done_ cell z
|
||||
with e ->
|
||||
set_fail_ cell e
|
||||
|
||||
let make1 f x =
|
||||
let cell = create_cell() in
|
||||
run3 run_and_set1 cell f x;
|
||||
Run cell
|
||||
|
||||
let make f = make1 f ()
|
||||
|
||||
(*$R
|
||||
List.iter
|
||||
(fun n ->
|
||||
let l = Sequence.(1 -- n) |> Sequence.to_list in
|
||||
let l = List.rev_map (fun i ->
|
||||
Fut.make
|
||||
(fun () ->
|
||||
Thread.delay 0.05;
|
||||
1
|
||||
)) l in
|
||||
let l' = List.map Fut.get l in
|
||||
OUnit.assert_equal n (List.fold_left (+) 0 l');
|
||||
)
|
||||
[ 10; 300; ]
|
||||
*)
|
||||
|
||||
let make2 f x y =
|
||||
let cell = create_cell() in
|
||||
run4 run_and_set2 cell f x y;
|
||||
Run cell
|
||||
|
||||
let get = function
|
||||
| Return x -> x
|
||||
| FailNow e -> raise e
|
||||
| Run cell ->
|
||||
let rec get_ cell = match cell.state with
|
||||
| Waiting ->
|
||||
Condition.wait cell.condition cell.f_mutex; (* wait *)
|
||||
get_ cell
|
||||
| Done x -> x
|
||||
| Failed e -> raise e
|
||||
in
|
||||
with_lock_ cell get_
|
||||
|
||||
(* access the result without locking *)
|
||||
let get_nolock_ = function
|
||||
| Return x
|
||||
| Run {state=Done x; _} -> x
|
||||
| FailNow _
|
||||
| Run {state=(Failed _ | Waiting); _} -> assert false
|
||||
|
||||
let state = function
|
||||
| Return x -> Done x
|
||||
| FailNow e -> Failed e
|
||||
| Run cell ->
|
||||
with_lock_ cell (fun cell -> cell.state)
|
||||
|
||||
let is_done = function
|
||||
| Return _
|
||||
| FailNow _ -> true
|
||||
| Run cell ->
|
||||
with_lock_ cell (fun c -> c.state <> Waiting)
|
||||
|
||||
(** {2 Combinators *)
|
||||
|
||||
let add_handler_ cell f =
|
||||
with_lock_ cell
|
||||
(fun cell -> match cell.state with
|
||||
| Waiting -> cell.handlers <- f :: cell.handlers
|
||||
| Done _ | Failed _ -> f cell.state)
|
||||
|
||||
let on_finish fut k = match fut with
|
||||
| Return x -> k (Done x)
|
||||
| FailNow e -> k (Failed e)
|
||||
| Run cell -> add_handler_ cell k
|
||||
|
||||
let on_success fut k =
|
||||
on_finish fut
|
||||
(function
|
||||
| Done x -> k x
|
||||
| _ -> ())
|
||||
|
||||
let on_failure fut k =
|
||||
on_finish fut
|
||||
(function
|
||||
| Failed e -> k e
|
||||
| _ -> ())
|
||||
|
||||
let map_cell_ ~async f cell ~into:cell' =
|
||||
add_handler_ cell
|
||||
(function
|
||||
| Done x ->
|
||||
if async
|
||||
then run3 run_and_set1 cell' f x
|
||||
else run_and_set1 cell' f x
|
||||
| Failed e -> set_fail_ cell' e
|
||||
| Waiting -> assert false);
|
||||
Run cell'
|
||||
|
||||
let map_ ~async f fut = match fut with
|
||||
| Return x ->
|
||||
if async
|
||||
then make1 f x
|
||||
else Return (f x)
|
||||
| FailNow e -> FailNow e
|
||||
| Run cell -> map_cell_ ~async f cell ~into:(create_cell())
|
||||
|
||||
let map f fut = map_ ~async:false f fut
|
||||
|
||||
let map_async f fut = map_ ~async:true f fut
|
||||
|
||||
(*$R
|
||||
let a = Fut.make (fun () -> 1) in
|
||||
let b = Fut.map (fun x -> x+1) a in
|
||||
let c = Fut.map (fun x -> x-1) b in
|
||||
OUnit.assert_equal 1 (Fut.get c)
|
||||
*)
|
||||
|
||||
let app_ ~async f x = match f, x with
|
||||
| Return f, Return x ->
|
||||
if async
|
||||
then make1 f x
|
||||
else Return (f x)
|
||||
| FailNow e, _
|
||||
| _, FailNow e -> FailNow e
|
||||
| Return f, Run x ->
|
||||
map_cell_ ~async (fun x -> f x) x ~into:(create_cell())
|
||||
| Run f, Return x ->
|
||||
map_cell_ ~async (fun f -> f x) f ~into:(create_cell())
|
||||
| Run f, Run x ->
|
||||
let cell' = create_cell () in
|
||||
add_handler_ f
|
||||
(function
|
||||
| Done f -> ignore (map_cell_ ~async f x ~into:cell')
|
||||
| Failed e -> set_fail_ cell' e
|
||||
| Waiting -> assert false);
|
||||
Run cell'
|
||||
|
||||
let app f x = app_ ~async:false f x
|
||||
|
||||
let app_async f x = app_ ~async:true f x
|
||||
|
||||
let flat_map f fut = match fut with
|
||||
| Return x -> f x
|
||||
| FailNow e -> FailNow e
|
||||
| Run cell ->
|
||||
let cell' = create_cell() in
|
||||
add_handler_ cell
|
||||
(function
|
||||
| Done x ->
|
||||
let fut' = f x in
|
||||
on_finish fut'
|
||||
(function
|
||||
| Done y -> set_done_ cell' y
|
||||
| Failed e -> set_fail_ cell' e
|
||||
| Waiting -> assert false
|
||||
)
|
||||
| Failed e -> set_fail_ cell' e
|
||||
| Waiting -> assert false
|
||||
);
|
||||
Run cell'
|
||||
|
||||
let and_then fut f = flat_map (fun _ -> f ()) fut
|
||||
|
||||
type _ array_or_list =
|
||||
| A_ : 'a array -> 'a array_or_list
|
||||
| L_ : 'a list -> 'a array_or_list
|
||||
|
||||
let iter_aol
|
||||
: type a. a array_or_list -> (a -> unit) -> unit
|
||||
= fun aol f -> match aol with
|
||||
| A_ a -> Array.iter f a
|
||||
| L_ l -> List.iter f l
|
||||
|
||||
(* [sequence_ l f] returns a future that waits for every element of [l]
|
||||
to return of fail, and call [f ()] to obtain the result (as a closure)
|
||||
in case every element succeeded (otherwise a failure is
|
||||
returned automatically) *)
|
||||
let sequence_
|
||||
: type a res. a t array_or_list -> (unit -> res) -> res t
|
||||
= fun aol f ->
|
||||
let n = match aol with
|
||||
| A_ a -> Array.length a
|
||||
| L_ l -> List.length l
|
||||
in
|
||||
assert (n>0);
|
||||
let cell = create_cell() in
|
||||
let n_err = CCLock.create 0 in (* number of failed threads *)
|
||||
let n_ok = CCLock.create 0 in (* number of succeeding threads *)
|
||||
iter_aol aol
|
||||
(fun fut ->
|
||||
on_finish fut
|
||||
(function
|
||||
| Failed e ->
|
||||
let x = CCLock.incr_then_get n_err in
|
||||
(* if first failure, then seal [cell]'s fate now *)
|
||||
if x=1 then set_fail_ cell e
|
||||
| Done _ ->
|
||||
let x = CCLock.incr_then_get n_ok in
|
||||
(* if [n] successes, then [cell] succeeds. Otherwise, some
|
||||
job has not finished or some job has failed. *)
|
||||
if x = n then (
|
||||
let res = f () in
|
||||
set_done_ cell res
|
||||
)
|
||||
| Waiting -> assert false));
|
||||
Run cell
|
||||
|
||||
(* map an array of futures to a future array *)
|
||||
let sequence_a a = match a with
|
||||
| [||] -> return [||]
|
||||
| _ ->
|
||||
sequence_ (A_ a)
|
||||
(fun () -> Array.map get_nolock_ a)
|
||||
|
||||
let map_a f a = sequence_a (Array.map f a)
|
||||
|
||||
let sequence_l l = match l with
|
||||
| [] -> return []
|
||||
| _ :: _ ->
|
||||
sequence_ (L_ l) (fun () -> List.map get_nolock_ l)
|
||||
|
||||
(* reverse twice *)
|
||||
let map_l f l =
|
||||
let l = List.rev_map f l in
|
||||
sequence_ (L_ l)
|
||||
(fun () -> List.rev_map get_nolock_ l)
|
||||
|
||||
(*$R
|
||||
let l = CCList.(1 -- 50) in
|
||||
let l' = l
|
||||
|> List.map
|
||||
(fun x -> Fut.make (fun () -> Thread.delay 0.1; x*10))
|
||||
|> Fut.sequence_l
|
||||
|> Fut.map (List.fold_left (+) 0)
|
||||
in
|
||||
let expected = List.fold_left (fun acc x -> acc + 10 * x) 0 l in
|
||||
OUnit.assert_equal expected (Fut.get l')
|
||||
*)
|
||||
|
||||
(*$R
|
||||
let l = CCList.(1 -- 50) in
|
||||
let l' = l
|
||||
|> List.map
|
||||
(fun x -> Fut.make (fun () -> Thread.delay 0.1; if x = 5 then raise Exit; x))
|
||||
|> Fut.sequence_l
|
||||
|> Fut.map (List.fold_left (+) 0)
|
||||
in
|
||||
OUnit.assert_raises Exit (fun () -> Fut.get l')
|
||||
*)
|
||||
|
||||
let choose_
|
||||
: type a. a t array_or_list -> a t
|
||||
= fun aol ->
|
||||
let cell = create_cell() in
|
||||
let is_done = CCLock.create false in
|
||||
iter_aol aol
|
||||
(fun fut ->
|
||||
on_finish fut
|
||||
(fun res -> match res with
|
||||
| Waiting -> assert false
|
||||
| Done x ->
|
||||
let was_done = CCLock.get_then_clear is_done in
|
||||
if not was_done then set_done_ cell x
|
||||
| Failed e ->
|
||||
let was_done = CCLock.get_then_clear is_done in
|
||||
if not was_done then set_fail_ cell e));
|
||||
Run cell
|
||||
|
||||
let choose_a a = choose_ (A_ a)
|
||||
|
||||
let choose_l l = choose_ (L_ l)
|
||||
|
||||
let sleep time = make1 Thread.delay time
|
||||
|
||||
(*$R
|
||||
let start = Unix.gettimeofday () in
|
||||
let pause = 0.2 and n = 10 in
|
||||
let l = CCList.(1 -- n)
|
||||
|> List.map (fun _ -> Fut.make (fun () -> Thread.delay pause))
|
||||
in
|
||||
List.iter Fut.get l;
|
||||
let stop = Unix.gettimeofday () in
|
||||
OUnit.assert_bool "some_parallelism" (stop -. start < float_of_int n *. pause);
|
||||
*)
|
||||
|
||||
module Infix = struct
|
||||
let (>>=) x f = flat_map f x
|
||||
let (>>) a f = and_then a f
|
||||
let (>|=) a f = map f a
|
||||
let (<*>) = app
|
||||
end
|
||||
|
||||
include Infix
|
||||
end
|
||||
end
|
||||
167
src/threads/CCPool.mli
Normal file
167
src/threads/CCPool.mli
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Thread Pool, and Futures}
|
||||
|
||||
Renamed and heavily updated from [CCFuture]
|
||||
@since 0.16 *)
|
||||
|
||||
type +'a state =
|
||||
| Done of 'a
|
||||
| Waiting
|
||||
| Failed of exn
|
||||
|
||||
module type PARAM = sig
|
||||
val min_size : int
|
||||
(** Minimum number of threads in the pool *)
|
||||
|
||||
val max_size : int
|
||||
(** Maximum number of threads in the pool *)
|
||||
end
|
||||
|
||||
exception Stopped
|
||||
|
||||
(** {2 Create a new Pool} *)
|
||||
module Make(P : PARAM) : sig
|
||||
val run : (unit -> _) -> unit
|
||||
(** [run f] schedules [f] for being executed in the thread pool *)
|
||||
|
||||
val run1 : ('a -> _) -> 'a -> unit
|
||||
(** [run1 f x] is similar to [run (fun () -> f x)] *)
|
||||
|
||||
val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit
|
||||
|
||||
val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit
|
||||
|
||||
val set_exn_handler : (exn -> unit) -> unit
|
||||
|
||||
val active : unit -> bool
|
||||
(** [active ()] is true as long as [stop()] has not been called yet *)
|
||||
|
||||
val stop : unit -> unit
|
||||
(** After calling [stop ()], Most functions will raise Stopped.
|
||||
This has the effect of preventing new tasks from being executed. *)
|
||||
|
||||
(** {6 Futures}
|
||||
|
||||
The futures are registration points for callbacks, storing a {!state},
|
||||
that are executed in the pool using {!run}. *)
|
||||
module Fut : sig
|
||||
type 'a t
|
||||
(** A future value of type 'a *)
|
||||
|
||||
type 'a future = 'a t
|
||||
|
||||
(** {2 Constructors} *)
|
||||
|
||||
val return : 'a -> 'a t
|
||||
(** Future that is already computed *)
|
||||
|
||||
val fail : exn -> 'a t
|
||||
(** Future that fails immediately *)
|
||||
|
||||
val make : (unit -> 'a) -> 'a t
|
||||
(** Create a future, representing a value that will be computed by
|
||||
the function. If the function raises, the future will fail. *)
|
||||
|
||||
val make1 : ('a -> 'b) -> 'a -> 'b t
|
||||
|
||||
val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t
|
||||
|
||||
(** {2 Basics} *)
|
||||
|
||||
val get : 'a t -> 'a
|
||||
(** Blocking get: wait for the future to be evaluated, and get the value,
|
||||
or the exception that failed the future is returned.
|
||||
raise e if the future failed with e *)
|
||||
|
||||
val state : 'a t -> 'a state
|
||||
(** State of the future *)
|
||||
|
||||
val is_done : 'a t -> bool
|
||||
(** Is the future evaluated (success/failure)? *)
|
||||
|
||||
(** {2 Combinators} *)
|
||||
|
||||
val on_success : 'a t -> ('a -> unit) -> unit
|
||||
(** Attach a handler to be called upon success.
|
||||
The handler should not call functions on the future.
|
||||
Might be evaluated now if the future is already done. *)
|
||||
|
||||
val on_failure : _ t -> (exn -> unit) -> unit
|
||||
(** Attach a handler to be called upon failure.
|
||||
The handler should not call any function on the future.
|
||||
Might be evaluated now if the future is already done. *)
|
||||
|
||||
val on_finish : 'a t -> ('a state -> unit) -> unit
|
||||
(** Attach a handler to be called when the future is evaluated.
|
||||
The handler should not call functions on the future.
|
||||
Might be evaluated now if the future is already done. *)
|
||||
|
||||
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
|
||||
(** Monadic combination of futures *)
|
||||
|
||||
val and_then : 'a t -> (unit -> 'b t) -> 'b t
|
||||
(** Wait for the first future to succeed, then launch the second *)
|
||||
|
||||
val sequence_a : 'a t array -> 'a array t
|
||||
(** Future that waits for all previous futures to terminate. If any future
|
||||
in the array fails, [sequence_a l] fails too. *)
|
||||
|
||||
val map_a : ('a -> 'b t) -> 'a array -> 'b array t
|
||||
(** [map_l f a] maps [f] on every element of [a], and will return
|
||||
the array of every result if all calls succeed, or an error otherwise. *)
|
||||
|
||||
val sequence_l : 'a t list -> 'a list t
|
||||
(** Future that waits for all previous futures to terminate. If any future
|
||||
in the list fails, [sequence_l l] fails too. *)
|
||||
|
||||
val map_l : ('a -> 'b t) -> 'a list -> 'b list t
|
||||
(** [map_l f l] maps [f] on every element of [l], and will return
|
||||
the list of every result if all calls succeed, or an error otherwise. *)
|
||||
|
||||
val choose_a : 'a t array -> 'a t
|
||||
(** Choose among those futures (the first to terminate). Behaves like
|
||||
the first future that terminates, by failing if the future fails *)
|
||||
|
||||
val choose_l : 'a t list -> 'a t
|
||||
(** Choose among those futures (the first to terminate). Behaves like
|
||||
the first future that terminates, by failing if the future fails *)
|
||||
|
||||
val map : ('a -> 'b) -> 'a t -> 'b t
|
||||
(** Maps the value inside the future. The function doesn't run in its
|
||||
own task; if it can take time, use {!flat_map} or {!map_async} *)
|
||||
|
||||
val map_async : ('a -> 'b) -> 'a t -> 'b t
|
||||
(** Maps the value inside the future, to be computed in a separated job. *)
|
||||
|
||||
val app : ('a -> 'b) t -> 'a t -> 'b t
|
||||
(** [app f x] applies the result of [f] to the result of [x] *)
|
||||
|
||||
val app_async : ('a -> 'b) t -> 'a t -> 'b t
|
||||
(** [app f x] applies the result of [f] to the result of [x], in
|
||||
a separated job scheduled in the pool *)
|
||||
|
||||
val sleep : float -> unit t
|
||||
(** Future that returns with success in the given amount of seconds. Blocks
|
||||
the thread! If you need to wait on many events, consider
|
||||
using {!CCTimer}. *)
|
||||
|
||||
module Infix : sig
|
||||
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
|
||||
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
|
||||
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
|
||||
val (<*>) : ('a -> 'b) t -> 'a t -> 'b t
|
||||
end
|
||||
|
||||
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
|
||||
|
||||
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
|
||||
|
||||
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
|
||||
(** Alias to {!map} *)
|
||||
|
||||
val (<*>): ('a -> 'b) t -> 'a t -> 'b t
|
||||
(** Alias to {!app} *)
|
||||
end
|
||||
end
|
||||
117
src/threads/CCSemaphore.ml
Normal file
117
src/threads/CCSemaphore.ml
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
(** {1 Semaphores} *)
|
||||
|
||||
type t = {
|
||||
mutable n : int;
|
||||
mutex : Mutex.t;
|
||||
cond : Condition.t;
|
||||
}
|
||||
|
||||
let create n =
|
||||
if n <= 0 then invalid_arg "Semaphore.create";
|
||||
{ n;
|
||||
mutex=Mutex.create();
|
||||
cond=Condition.create();
|
||||
}
|
||||
|
||||
let get t = t.n
|
||||
|
||||
(* assume [t.mutex] locked, try to acquire [t] *)
|
||||
let acquire_once_locked_ m t =
|
||||
while t.n < m do
|
||||
Condition.wait t.cond t.mutex;
|
||||
done;
|
||||
assert (t.n >= m);
|
||||
t.n <- t.n - m;
|
||||
Condition.broadcast t.cond;
|
||||
Mutex.unlock t.mutex
|
||||
|
||||
let acquire m t =
|
||||
Mutex.lock t.mutex;
|
||||
acquire_once_locked_ m t
|
||||
|
||||
(* assume [t.mutex] locked, try to release [t] *)
|
||||
let release_once_locked_ m t =
|
||||
t.n <- t.n + m;
|
||||
Condition.broadcast t.cond;
|
||||
Mutex.unlock t.mutex
|
||||
|
||||
let release m t =
|
||||
Mutex.lock t.mutex;
|
||||
release_once_locked_ m t;
|
||||
()
|
||||
|
||||
(*$R
|
||||
let s = create 1 in
|
||||
let r = CCLock.create false in
|
||||
let _ = Thread.create (fun s -> acquire 5 s; CCLock.set r true) s in
|
||||
Thread.yield ();
|
||||
assert_equal false (CCLock.get r);
|
||||
release 4 s;
|
||||
Thread.delay 0.2;
|
||||
assert_equal true (CCLock.get r);
|
||||
assert_equal 0 (get s)
|
||||
*)
|
||||
|
||||
let with_acquire ~n t ~f =
|
||||
Mutex.lock t.mutex;
|
||||
acquire_once_locked_ n t;
|
||||
try
|
||||
let x = f() in
|
||||
release_once_locked_ n t;
|
||||
x
|
||||
with e ->
|
||||
release_once_locked_ n t;
|
||||
raise e
|
||||
|
||||
(*$R
|
||||
let s = create 5 in
|
||||
let n = CCLock.create 0 in
|
||||
let a = Array.init 100 (fun i ->
|
||||
Thread.create (fun _ ->
|
||||
with_acquire ~n:(1 + (i mod 5)) s
|
||||
~f:(fun () -> CCLock.incr n)
|
||||
) ())
|
||||
in
|
||||
Array.iter Thread.join a;
|
||||
assert_equal ~printer:CCInt.to_string 5 (get s);
|
||||
assert_equal ~printer:CCInt.to_string 100 (CCLock.get n)
|
||||
*)
|
||||
|
||||
let wait_until_at_least ~n t ~f =
|
||||
Mutex.lock t.mutex;
|
||||
while t.n < n do
|
||||
Condition.wait t.cond t.mutex;
|
||||
done;
|
||||
assert (t.n >= n);
|
||||
Mutex.unlock t.mutex;
|
||||
f ()
|
||||
|
||||
(*$R
|
||||
let output s = () in
|
||||
let s = create 2 in
|
||||
let res = CCLock.create false in
|
||||
let id = Thread.create
|
||||
(fun () ->
|
||||
output "start";
|
||||
wait_until_at_least ~n:5 s
|
||||
~f:(fun () ->
|
||||
assert (get s >= 5);
|
||||
output "modify now";
|
||||
CCLock.set res true)
|
||||
) ()
|
||||
in
|
||||
output "launched thread";
|
||||
Thread.yield();
|
||||
assert_bool "start" (not (CCLock.get res));
|
||||
output "release 2";
|
||||
release 2 s;
|
||||
Thread.yield();
|
||||
assert_bool "after release 2" (not (CCLock.get res));
|
||||
output "release 1";
|
||||
release 1 s;
|
||||
(* should work now *)
|
||||
Thread.delay 0.2;
|
||||
Thread.join id;
|
||||
output "check";
|
||||
assert_bool "after release 1" (CCLock.get res)
|
||||
*)
|
||||
31
src/threads/CCSemaphore.mli
Normal file
31
src/threads/CCSemaphore.mli
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Semaphores}
|
||||
|
||||
@since 0.13 *)
|
||||
|
||||
type t
|
||||
(** A semaphore *)
|
||||
|
||||
val create : int -> t
|
||||
(** [create n] creates a semaphore with initial value [n]
|
||||
@raise Invalid_argument if [n <= 0] *)
|
||||
|
||||
val get : t -> int
|
||||
(** Current value *)
|
||||
|
||||
val acquire : int -> t -> unit
|
||||
(** [acquire n s] blocks until [get s >= n], then atomically
|
||||
sets [s := !s - n] *)
|
||||
|
||||
val release : int -> t -> unit
|
||||
(** [release n s] atomically sets [s := !s + n] *)
|
||||
|
||||
val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a
|
||||
(** [with_acquire ~n s ~f] first acquires [s] with [n] units,
|
||||
calls [f ()], and then release [s] with [n] units.
|
||||
Safely release the semaphore even if [f ()] fails *)
|
||||
|
||||
val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a
|
||||
(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()]
|
||||
and returns its result. Doesn't modify the semaphore. *)
|
||||
85
src/threads/CCThread.ml
Normal file
85
src/threads/CCThread.ml
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Threads} *)
|
||||
|
||||
type t = Thread.t
|
||||
|
||||
let spawn f = Thread.create f ()
|
||||
|
||||
let spawn1 f x = Thread.create f x
|
||||
|
||||
let spawn2 f x y = Thread.create (fun () -> f x y) ()
|
||||
|
||||
let detach f = ignore (Thread.create f ())
|
||||
|
||||
let finally_ f x ~h =
|
||||
try
|
||||
let res = f x in
|
||||
ignore (h ());
|
||||
res
|
||||
with e ->
|
||||
ignore (h());
|
||||
raise e
|
||||
|
||||
module Arr = struct
|
||||
let spawn n f =
|
||||
Array.init n (fun i -> Thread.create f i)
|
||||
|
||||
let join a = Array.iter Thread.join a
|
||||
end
|
||||
|
||||
(*$R
|
||||
let l = CCLock.create 0 in
|
||||
let a = Arr.spawn 101 (fun i -> CCLock.update l ((+) i)) in
|
||||
Arr.join a;
|
||||
let n = Sequence.(1 -- 100 |> fold (+) 0) in
|
||||
assert_equal ~printer:CCInt.to_string n (CCLock.get l)
|
||||
*)
|
||||
|
||||
module Barrier = struct
|
||||
type t = {
|
||||
lock: Mutex.t;
|
||||
cond: Condition.t;
|
||||
mutable activated: bool;
|
||||
}
|
||||
|
||||
let create () = {
|
||||
lock=Mutex.create();
|
||||
cond=Condition.create();
|
||||
activated=false;
|
||||
}
|
||||
|
||||
let with_lock_ b f =
|
||||
Mutex.lock b.lock;
|
||||
finally_ f () ~h:(fun () -> Mutex.unlock b.lock)
|
||||
|
||||
let reset b = with_lock_ b (fun () -> b.activated <- false)
|
||||
|
||||
let wait b =
|
||||
with_lock_ b
|
||||
(fun () ->
|
||||
while not b.activated do
|
||||
Condition.wait b.cond b.lock
|
||||
done)
|
||||
|
||||
let activate b =
|
||||
with_lock_ b
|
||||
(fun () ->
|
||||
if not b.activated then (
|
||||
b.activated <- true;
|
||||
Condition.broadcast b.cond))
|
||||
|
||||
let activated b = with_lock_ b (fun () -> b.activated)
|
||||
end
|
||||
|
||||
(*$R
|
||||
let b = Barrier.create () in
|
||||
let res = CCLock.create 0 in
|
||||
let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res)
|
||||
and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in
|
||||
Thread.delay 0.2;
|
||||
assert_equal 0 (CCLock.get res);
|
||||
Barrier.activate b;
|
||||
Thread.join t1; Thread.join t2;
|
||||
assert_equal 2 (CCLock.get res)
|
||||
*)
|
||||
58
src/threads/CCThread.mli
Normal file
58
src/threads/CCThread.mli
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Threads}
|
||||
|
||||
{b status: unstable}
|
||||
@since 0.13 *)
|
||||
|
||||
type t = Thread.t
|
||||
|
||||
val spawn : (unit -> _) -> t
|
||||
(** [spawn f] creates a new thread that runs [f ()] *)
|
||||
|
||||
val spawn1 : ('a -> _) -> 'a -> t
|
||||
(** [spawn1 f x] is like [spawn (fun () -> f x)].
|
||||
@since 0.16 *)
|
||||
|
||||
val spawn2 : ('a -> 'b -> _) -> 'a -> 'b -> t
|
||||
(** [spawn2 f x y] is like [spawn (fun () -> f x y)].
|
||||
@since 0.16 *)
|
||||
|
||||
val detach : (unit -> 'a) -> unit
|
||||
(** [detach f] is the same as [ignore (spawn f)] *)
|
||||
|
||||
(** {2 Array of threads} *)
|
||||
module Arr : sig
|
||||
val spawn : int -> (int -> 'a) -> t array
|
||||
(** [A.spawn n f] creates an array [res] of length [n], such that
|
||||
[res.(i) = spawn (fun () -> f i)] *)
|
||||
|
||||
val join : t array -> unit
|
||||
(** [A.join a] joins every thread in [a] *)
|
||||
end
|
||||
|
||||
(** {2 Single-Use Barrier} *)
|
||||
|
||||
module Barrier : sig
|
||||
type t
|
||||
(** Barrier, used to synchronize threads *)
|
||||
|
||||
val create : unit -> t
|
||||
(** Create a barrier *)
|
||||
|
||||
val reset : t -> unit
|
||||
(** Reset to initial (non-triggered) state *)
|
||||
|
||||
val wait : t -> unit
|
||||
(** [wait b] waits for barrier [b] to be activated by [activate b].
|
||||
All threads calling this wait until [activate b] is called.
|
||||
If [b] is already activated, [wait b] does nothing *)
|
||||
|
||||
val activate : t -> unit
|
||||
(** [activate b] unblocks all threads that were waiting on [b] *)
|
||||
|
||||
val activated : t -> bool
|
||||
(** [activated b] returns [true] iff [activate b] was called, and [reset b]
|
||||
was not called since. In other words, [activated b = true] means
|
||||
[wait b] will not block. *)
|
||||
end
|
||||
195
src/threads/CCTimer.ml
Normal file
195
src/threads/CCTimer.ml
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Event timer} *)
|
||||
|
||||
type job =
|
||||
| Job : float * (unit -> 'a) -> job
|
||||
|
||||
module TaskHeap = CCHeap.Make(struct
|
||||
type t = job
|
||||
let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2
|
||||
end)
|
||||
|
||||
exception Stopped
|
||||
|
||||
type t = {
|
||||
mutable stop : bool;
|
||||
mutable tasks : TaskHeap.t;
|
||||
mutable exn_handler : (exn -> unit);
|
||||
t_mutex : Mutex.t;
|
||||
fifo_in : Unix.file_descr;
|
||||
fifo_out : Unix.file_descr;
|
||||
}
|
||||
|
||||
let set_exn_handler timer f = timer.exn_handler <- f
|
||||
|
||||
let standby_wait = 10.
|
||||
(* when no task is scheduled, this is the amount of time that is waited
|
||||
in a row for something to happen. This is also the maximal delay
|
||||
between the call to {!stop} and the actual termination of the
|
||||
thread. *)
|
||||
|
||||
let epsilon = 0.0001
|
||||
(* accepted time diff for actions. *)
|
||||
|
||||
let with_lock_ t f =
|
||||
Mutex.lock t.t_mutex;
|
||||
try
|
||||
let x = f t in
|
||||
Mutex.unlock t.t_mutex;
|
||||
x
|
||||
with e ->
|
||||
Mutex.unlock t.t_mutex;
|
||||
raise e
|
||||
|
||||
type command =
|
||||
| Quit
|
||||
| Run : (unit -> _) -> command
|
||||
| Wait of float
|
||||
|
||||
let pop_task_ t =
|
||||
let tasks, _ = TaskHeap.take_exn t.tasks in
|
||||
t.tasks <- tasks
|
||||
|
||||
let call_ timer f =
|
||||
try ignore (f ())
|
||||
with e -> timer.exn_handler e
|
||||
|
||||
(* check next task *)
|
||||
let next_task_ timer = match TaskHeap.find_min timer.tasks with
|
||||
| _ when timer.stop -> Quit
|
||||
| None -> Wait standby_wait
|
||||
| Some Job (time, f) ->
|
||||
let now = Unix.gettimeofday () in
|
||||
if now +. epsilon > time then (
|
||||
(* now! *)
|
||||
pop_task_ timer;
|
||||
Run f
|
||||
) else Wait (time -. now)
|
||||
|
||||
(* The main thread function: wait for next event, run it, and loop *)
|
||||
let serve timer =
|
||||
let buf = Bytes.make 1 '_' in
|
||||
(* acquire lock, call [process_task] and do as it commands *)
|
||||
let rec next () = match with_lock_ timer next_task_ with
|
||||
| Quit -> ()
|
||||
| Run f ->
|
||||
call_ timer f; (* call outside of any lock *)
|
||||
next ()
|
||||
| Wait delay -> wait delay
|
||||
(* wait for [delay] seconds, or until something happens on [fifo_in] *)
|
||||
and wait delay =
|
||||
let read = Thread.wait_timed_read timer.fifo_in delay in
|
||||
(* remove char from fifo, so that next write can happen *)
|
||||
if read then ignore (Unix.read timer.fifo_in buf 0 1);
|
||||
next ()
|
||||
in
|
||||
next ()
|
||||
|
||||
let nop_handler_ _ = ()
|
||||
|
||||
let create () =
|
||||
let fifo_in, fifo_out = Unix.pipe () in
|
||||
let timer = {
|
||||
stop = false;
|
||||
tasks = TaskHeap.empty;
|
||||
exn_handler = nop_handler_;
|
||||
t_mutex = Mutex.create ();
|
||||
fifo_in;
|
||||
fifo_out;
|
||||
} in
|
||||
(* start a thread to process tasks *)
|
||||
let _t = Thread.create serve timer in
|
||||
timer
|
||||
|
||||
let underscore_ = Bytes.make 1 '_'
|
||||
|
||||
(* awake the thread *)
|
||||
let awaken_ timer =
|
||||
ignore (Unix.single_write timer.fifo_out underscore_ 0 1)
|
||||
|
||||
(** [at s t ~f] will run [f ()] at the Unix echo [t] *)
|
||||
let at timer time ~f =
|
||||
if timer.stop then raise Stopped;
|
||||
let now = Unix.gettimeofday () in
|
||||
if now >= time
|
||||
then call_ timer f
|
||||
else
|
||||
with_lock_ timer
|
||||
(fun timer ->
|
||||
if timer.stop then raise Stopped;
|
||||
(* time of the next scheduled event *)
|
||||
let next_time = match TaskHeap.find_min timer.tasks with
|
||||
| None -> max_float
|
||||
| Some Job (d, _) -> d
|
||||
in
|
||||
(* insert task *)
|
||||
timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
|
||||
(* see if the timer thread needs to be awaken earlier *)
|
||||
if time < next_time then awaken_ timer
|
||||
)
|
||||
|
||||
let after timer delay ~f =
|
||||
assert (delay >= 0.);
|
||||
let now = Unix.gettimeofday () in
|
||||
at timer (now +. delay) ~f
|
||||
|
||||
exception ExitEvery
|
||||
|
||||
let every ?delay timer d ~f =
|
||||
let rec run () =
|
||||
try
|
||||
ignore (f ());
|
||||
schedule()
|
||||
with ExitEvery -> () (* stop *)
|
||||
and schedule () = after timer d ~f:run in
|
||||
match delay with
|
||||
| None -> run()
|
||||
| Some d -> after timer d ~f:run
|
||||
|
||||
(*$R
|
||||
let start = Unix.gettimeofday() in
|
||||
let timer = create() in
|
||||
let res = CCLock.create 0 in
|
||||
let stop = ref 0. in
|
||||
every timer 0.1
|
||||
~f:(fun () ->
|
||||
if CCLock.incr_then_get res > 5 then (
|
||||
stop := Unix.gettimeofday();
|
||||
raise ExitEvery
|
||||
));
|
||||
Thread.delay 0.7;
|
||||
OUnit.assert_equal ~printer:CCInt.to_string 6 (CCLock.get res);
|
||||
OUnit.assert_bool "estimate delay" (abs_float (!stop -. start -. 0.5) < 0.1);
|
||||
*)
|
||||
|
||||
let active timer = not timer.stop
|
||||
|
||||
(** Stop the given timer, cancelling pending tasks *)
|
||||
let stop timer =
|
||||
with_lock_ timer
|
||||
(fun timer ->
|
||||
if not timer.stop then (
|
||||
timer.stop <- true;
|
||||
(* empty heap of tasks *)
|
||||
timer.tasks <- TaskHeap.empty;
|
||||
(* tell the thread to stop *)
|
||||
awaken_ timer;
|
||||
)
|
||||
)
|
||||
|
||||
(*$R
|
||||
(* scenario: n := 1; n := n*4 ; n := n+2; res := n *)
|
||||
let timer = create () in
|
||||
let n = CCLock.create 1 in
|
||||
let res = CCLock.create 0 in
|
||||
after timer 0.3
|
||||
~f:(fun () -> CCLock.update n (fun x -> x+2));
|
||||
ignore (Thread.create
|
||||
(fun _ -> Thread.delay 0.4; CCLock.set res (CCLock.get n)) ());
|
||||
after timer 0.2
|
||||
~f:(fun () -> CCLock.update n (fun x -> x * 4));
|
||||
Thread.delay 0.6 ;
|
||||
OUnit.assert_equal 6 (CCLock.get res);
|
||||
*)
|
||||
43
src/threads/CCTimer.mli
Normal file
43
src/threads/CCTimer.mli
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
|
||||
(* This file is free software, part of containers. See file "license" for more details. *)
|
||||
|
||||
(** {1 Event timer}
|
||||
|
||||
Used to be part of [CCFuture]
|
||||
@since 0.16 *)
|
||||
|
||||
type t
|
||||
(** A scheduler for events. It runs in its own thread. *)
|
||||
|
||||
val create : unit -> t
|
||||
(** A new timer. *)
|
||||
|
||||
val set_exn_handler : t -> (exn -> unit) -> unit
|
||||
(** [set_exn_handler timer f] registers [f] so that any exception
|
||||
raised by a task scheduled in [timer] is given to [f] *)
|
||||
|
||||
exception Stopped
|
||||
|
||||
val after : t -> float -> f:(unit -> _) -> unit
|
||||
(** Call the callback [f] after the given number of seconds.
|
||||
@raise Stopped if the timer was stopped *)
|
||||
|
||||
val at : t -> float -> f:(unit -> _) -> unit
|
||||
(** Create a future that evaluates to [()] at the given Unix timestamp
|
||||
@raise Stopped if the timer was stopped *)
|
||||
|
||||
exception ExitEvery
|
||||
|
||||
val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit
|
||||
(** [every timer n ~f] calls [f ()] every [n] seconds.
|
||||
[f()] can raise ExitEvery to stop the cycle.
|
||||
@param delay if provided, the first call to [f ()] is delayed by
|
||||
that many seconds.
|
||||
@raise Stopped if the timer was stopped *)
|
||||
|
||||
val stop : t -> unit
|
||||
(** Stop the given timer, cancelling pending tasks. Idempotent.
|
||||
From now on, calling most other operations on the timer will raise Stopped. *)
|
||||
|
||||
val active : t -> bool
|
||||
(** Returns [true] until [stop t] has been called. *)
|
||||
Loading…
Add table
Reference in a new issue