delete containers-thread

use moonpool instead!
This commit is contained in:
Simon Cruanes 2023-12-05 13:03:59 -05:00
parent 98ceaac8de
commit bf2375f042
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
16 changed files with 4 additions and 1550 deletions

View file

@ -19,10 +19,10 @@ jobs:
dune-cache: false
- name: Deps
run: opam install odig containers containers-data containers-thread
run: opam install odig containers containers-data
- name: Build
run: opam exec -- odig odoc --cache-dir=_doc/ containers containers-data containers-thread
run: opam exec -- odig odoc --cache-dir=_doc/ containers containers-data
- name: Deploy
uses: peaceiris/actions-gh-pages@v3

View file

@ -31,12 +31,12 @@ jobs:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
- run: opam install -t containers containers-data containers-thread --deps-only
- run: opam install -t containers containers-data --deps-only
if: matrix.os == 'ubuntu-latest'
- run: |
opam install -t containers --deps-only ;
opam install containers-data containers-thread --deps-only # no test deps
opam install containers-data --deps-only # no test deps
if: matrix.os != 'ubuntu-latest'
- run: opam exec -- dune build '@install'

View file

@ -1,27 +0,0 @@
opam-version: "2.0"
version: "3.12"
author: "Simon Cruanes"
maintainer: "simon.cruanes.2007@m4x.org"
license: "BSD-2-Clause"
synopsis: "An extension of containers for threading. DEPRECATED: use moonpool, domainslib, etc."
build: [
["dune" "build" "-p" name "-j" jobs]
["dune" "build" "@doc" "-p" name ] {with-doc}
["dune" "runtest" "-p" name "-j" jobs] {with-test}
]
depends: [
"ocaml" { >= "4.08.0" }
"dune" { >= "2.0" }
"base-threads"
"dune-configurator"
"containers" { = version }
"iter" { with-test }
"qcheck-core" {>= "0.18" & with-test}
"uutf" { with-test }
"odoc" { with-doc }
]
tags: [ "containers" "thread" "semaphore" "blocking queue" ]
homepage: "https://github.com/c-cube/ocaml-containers/"
doc: "https://c-cube.github.io/ocaml-containers"
dev-repo: "git+https://github.com/c-cube/ocaml-containers.git"
bug-reports: "https://github.com/c-cube/ocaml-containers/issues/"

View file

@ -1,154 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Blocking Queue} *)
type 'a t = {
q: 'a Queue.t;
lock: Mutex.t;
cond: Condition.t;
capacity: int;
mutable size: int;
}
let create n =
if n < 1 then invalid_arg "BloquingQueue.create";
let q =
{
q = Queue.create ();
lock = Mutex.create ();
cond = Condition.create ();
capacity = n;
size = 0;
}
in
q
let incr_size_ q =
assert (q.size < q.capacity);
q.size <- q.size + 1
let decr_size_ q =
assert (q.size > 0);
q.size <- q.size - 1
let finally_ f x ~h =
try
let res = f x in
ignore (h ());
res
with e ->
ignore (h ());
raise e
let with_lock_ q f =
Mutex.lock q.lock;
finally_ f () ~h:(fun () -> Mutex.unlock q.lock)
let push q x =
with_lock_ q (fun () ->
while q.size = q.capacity do
Condition.wait q.cond q.lock
done;
assert (q.size < q.capacity);
Queue.push x q.q;
(* if there are blocked receivers, awake one of them *)
incr_size_ q;
Condition.broadcast q.cond)
let take q =
with_lock_ q (fun () ->
while q.size = 0 do
Condition.wait q.cond q.lock
done;
let x = Queue.take q.q in
(* if there are blocked senders, awake one of them *)
decr_size_ q;
Condition.broadcast q.cond;
x)
let push_list q l =
(* push elements until it's not possible.
Assumes the lock is acquired. *)
let rec push_ q l =
match l with
| [] -> l
| _ :: _ when q.size = q.capacity -> l (* no room remaining *)
| x :: tl ->
Queue.push x q.q;
incr_size_ q;
push_ q tl
in
(* push chunks of [l] in [q] until [l] is empty *)
let rec aux q l =
match l with
| [] -> ()
| _ :: _ ->
let l =
with_lock_ q (fun () ->
while q.size = q.capacity do
Condition.wait q.cond q.lock
done;
let l = push_ q l in
Condition.broadcast q.cond;
l)
in
aux q l
in
aux q l
let take_list q n =
(* take at most [n] elements of [q] and prepend them to [acc] *)
let rec pop_ acc q n =
if n = 0 || Queue.is_empty q.q then
acc, n
else (
(* take next element *)
let x = Queue.take q.q in
decr_size_ q;
pop_ (x :: acc) q (n - 1)
)
in
(* call [pop_] until [n] elements have been gathered *)
let rec aux acc q n =
if n = 0 then
List.rev acc
else (
let acc, n =
with_lock_ q (fun () ->
while q.size = 0 do
Condition.wait q.cond q.lock
done;
let acc, n = pop_ acc q n in
Condition.broadcast q.cond;
acc, n)
in
aux acc q n
)
in
aux [] q n
let try_take q =
with_lock_ q (fun () ->
if q.size = 0 then
None
else (
decr_size_ q;
Some (Queue.take q.q)
))
let try_push q x =
with_lock_ q (fun () ->
if q.size = q.capacity then
false
else (
incr_size_ q;
Queue.push x q.q;
Condition.signal q.cond;
true
))
let peek q =
with_lock_ q (fun () -> try Some (Queue.peek q.q) with Queue.Empty -> None)
let size q = with_lock_ q (fun () -> q.size)
let capacity q = q.capacity

View file

@ -1,51 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Blocking Queue}
This queue has a limited size. Pushing a value on the queue when it
is full will block.
@since 0.16 *)
[@@@deprecated
"use moonpool or domainslib or saturn, libraries designed for multicore"]
type 'a t
(** Safe-thread queue for values of type ['a] *)
val create : int -> 'a t
(** Create a new queue of size [n]. Using [n=max_int] amounts to using
an infinite queue (2^61 items is a lot to fit in memory); using [n=1]
amounts to using a box with 0 or 1 elements inside.
@raise Invalid_argument if [n < 1]. *)
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], blocking if the queue is full. *)
val take : 'a t -> 'a
(** Take the first element, blocking if needed. *)
val push_list : 'a t -> 'a list -> unit
(** Push items of the list, one by one. *)
val take_list : 'a t -> int -> 'a list
(** [take_list n q] takes [n] elements out of [q]. *)
val try_take : 'a t -> 'a option
(** Take the first element if the queue is not empty, return [None]
otherwise. *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] pushes [x] into [q] if [q] is not full, in which
case it returns [true].
If it fails because [q] is full, it returns [false]. *)
val peek : 'a t -> 'a option
(** [peek q] returns [Some x] if [x] is the first element of [q],
otherwise it returns [None]. *)
val size : _ t -> int
(** Number of elements currently in the queue. *)
val capacity : _ t -> int
(** Number of values the queue can hold. *)

View file

@ -1,113 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Utils around Mutex} *)
type 'a t = { mutex: Mutex.t; mutable content: 'a }
type 'a lock = 'a t
let create content = { mutex = Mutex.create (); content }
let with_lock l f =
Mutex.lock l.mutex;
try
let x = f l.content in
Mutex.unlock l.mutex;
x
with e ->
Mutex.unlock l.mutex;
raise e
let try_with_lock l f =
if Mutex.try_lock l.mutex then (
try
let x = f l.content in
Mutex.unlock l.mutex;
Some x
with e ->
Mutex.unlock l.mutex;
raise e
) else
None
module LockRef = struct
type 'a t = 'a lock
let get t = t.content
let set t x = t.content <- x
let update t f = t.content <- f t.content
end
let with_lock_as_ref l ~f =
Mutex.lock l.mutex;
try
let x = f l in
Mutex.unlock l.mutex;
x
with e ->
Mutex.unlock l.mutex;
raise e
let mutex l = l.mutex
let update l f = with_lock l (fun x -> l.content <- f x)
let update_map l f =
with_lock l (fun x ->
let x', y = f x in
l.content <- x';
y)
let get l =
Mutex.lock l.mutex;
let x = l.content in
Mutex.unlock l.mutex;
x
let set l x =
Mutex.lock l.mutex;
l.content <- x;
Mutex.unlock l.mutex
let incr l = update l Stdlib.succ
let decr l = update l Stdlib.pred
let incr_then_get l =
Mutex.lock l.mutex;
l.content <- l.content + 1;
let x = l.content in
Mutex.unlock l.mutex;
x
let get_then_incr l =
Mutex.lock l.mutex;
let x = l.content in
l.content <- l.content + 1;
Mutex.unlock l.mutex;
x
let decr_then_get l =
Mutex.lock l.mutex;
l.content <- l.content - 1;
let x = l.content in
Mutex.unlock l.mutex;
x
let get_then_decr l =
Mutex.lock l.mutex;
let x = l.content in
l.content <- l.content - 1;
Mutex.unlock l.mutex;
x
let get_then_set l =
Mutex.lock l.mutex;
let x = l.content in
l.content <- true;
Mutex.unlock l.mutex;
x
let get_then_clear l =
Mutex.lock l.mutex;
let x = l.content in
l.content <- false;
Mutex.unlock l.mutex;
x

View file

@ -1,94 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Utils around Mutex}
A value wrapped into a Mutex, for more safety.
@since 0.8 *)
[@@@deprecated
"use moonpool or domainslib or saturn, libraries designed for multicore"]
type 'a t
(** A value surrounded with a lock *)
val create : 'a -> 'a t
(** Create a new protected value. *)
val with_lock : 'a t -> ('a -> 'b) -> 'b
(** [with_lock l f] runs [f x] where [x] is the value protected with
the lock [l], in a critical section. If [f x] fails, [with_lock l f]
fails too but the lock is released. *)
val try_with_lock : 'a t -> ('a -> 'b) -> 'b option
(** [try_with_lock l f] runs [f x] in a critical section if [l] is not
locked. [x] is the value protected by the lock [l]. If [f x]
fails, [try_with_lock l f] fails too but the lock is released.
@since 0.22 *)
(** Type allowing to manipulate the lock as a reference.
@since 0.13 *)
module LockRef : sig
type 'a t
val get : 'a t -> 'a
val set : 'a t -> 'a -> unit
val update : 'a t -> ('a -> 'a) -> unit
end
val with_lock_as_ref : 'a t -> f:('a LockRef.t -> 'b) -> 'b
(** [with_lock_as_ref l f] calls [f] with a reference-like object
that allows to manipulate the value of [l] safely.
The object passed to [f] must not escape the function call.
@since 0.13 *)
val update : 'a t -> ('a -> 'a) -> unit
(** [update l f] replaces the content [x] of [l] with [f x], atomically. *)
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
and returns [y].
@since 0.16 *)
val mutex : _ t -> Mutex.t
(** Underlying mutex. *)
val get : 'a t -> 'a
(** Atomically get the value in the lock. The value that is returned
isn't protected! *)
val set : 'a t -> 'a -> unit
(** Atomically set the value.
@since 0.13 *)
val incr : int t -> unit
(** Atomically increment the value.
@since 0.13 *)
val decr : int t -> unit
(** Atomically decrement the value.
@since 0.13 *)
val incr_then_get : int t -> int
(** [incr_then_get x] increments [x], and returns its new value.
@since 0.16 *)
val get_then_incr : int t -> int
(** [get_then_incr x] increments [x], and returns its previous value.
@since 0.16 *)
val decr_then_get : int t -> int
(** [decr_then_get x] decrements [x], and returns its new value.
@since 0.16 *)
val get_then_decr : int t -> int
(** [get_then_decr x] decrements [x], and returns its previous value.
@since 0.16 *)
val get_then_set : bool t -> bool
(** [get_then_set b] sets [b] to [true], and returns the old value.
@since 0.16 *)
val get_then_clear : bool t -> bool
(** [get_then_clear b] sets [b] to [false], and returns the old value.
@since 0.16 *)

View file

@ -1,529 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Thread Pool, and Futures} *)
type +'a state = Done of 'a | Waiting | Failed of exn
module type PARAM = sig
val max_size : int
(** Maximum number of threads in the pool *)
end
exception Stopped
(** {2 Thread pool} *)
module Make (P : PARAM) = struct
type job =
| Job1 : ('a -> _) * 'a -> job
| Job2 : ('a -> 'b -> _) * 'a * 'b -> job
| Job3 : ('a -> 'b -> 'c -> _) * 'a * 'b * 'c -> job
| Job4 : ('a -> 'b -> 'c -> 'd -> _) * 'a * 'b * 'c * 'd -> job
type t = {
mutable stop: bool; (* indicate that threads should stop *)
mutable exn_handler: exn -> unit;
mutex: Mutex.t;
cond: Condition.t;
jobs: job Queue.t; (* waiting jobs *)
mutable cur_size: int; (* total number of threads *)
mutable cur_idle: int; (* number of idle threads *)
}
(** Dynamic, growable thread pool *)
let nop_ _ = ()
(* singleton pool *)
let pool =
{
stop = false;
exn_handler = nop_;
cond = Condition.create ();
cur_size = 0;
cur_idle = 0;
(* invariant: cur_idle <= cur_size *)
jobs = Queue.create ();
mutex = Mutex.create ();
}
let set_exn_handler f = pool.exn_handler <- f
let[@inline] with_lock_ t f =
Mutex.lock t.mutex;
try
let x = f t in
Mutex.unlock t.mutex;
x
with e ->
Mutex.unlock t.mutex;
raise e
let incr_size_ p = p.cur_size <- p.cur_size + 1
let decr_size_ p = p.cur_size <- p.cur_size - 1
let incr_idle_ p = p.cur_idle <- p.cur_idle + 1
let decr_idle_ p = p.cur_idle <- p.cur_idle - 1
(* next thing a thread should do *)
type command =
| Process of job
| Wait
(* wait on condition *)
| Die
(* thread has no work to do *)
(* thread: seek what to do next (including dying).
Assumes the pool is locked. *)
let get_next_ pool =
(*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*)
if pool.stop then
Die
else (
match Queue.take pool.jobs with
| exception Queue.Empty ->
if pool.cur_idle > 0 then
(* die: there's already at least one idle thread *)
(*Printf.printf "DIE (idle>0)\n%!";*)
Die
else
(*Printf.printf "WAIT\n%!";*)
Wait
| job -> Process job
)
(* heuristic criterion for starting a new thread. *)
let[@inline] can_start_thread_ p = p.cur_size < P.max_size
(* Thread: entry point. They seek jobs in the queue *)
let rec serve pool =
assert (pool.cur_size <= P.max_size);
assert (pool.cur_size > 0);
Mutex.lock pool.mutex;
let cmd = get_next_ pool in
maybe_start_runner_ pool;
run_cmd pool cmd
(* run a command *)
and run_cmd pool = function
| Die ->
decr_size_ pool;
Mutex.unlock pool.mutex;
()
| Wait ->
(*Printf.printf "WAIT\n%!";*)
incr_idle_ pool;
Condition.wait pool.cond pool.mutex;
decr_idle_ pool;
Mutex.unlock pool.mutex;
serve pool
| Process job ->
Mutex.unlock pool.mutex;
run_job pool job
(* execute the job *)
and run_job pool job =
match job with
| Job1 (f, x) ->
(try ignore (f x) with e -> pool.exn_handler e);
serve pool
| Job2 (f, x, y) ->
(try ignore (f x y) with e -> pool.exn_handler e);
serve pool
| Job3 (f, x, y, z) ->
(try ignore (f x y z) with e -> pool.exn_handler e);
serve pool
| Job4 (f, x, y, z, w) ->
(try ignore (f x y z w) with e -> pool.exn_handler e);
serve pool
and maybe_start_runner_ pool : unit =
if (not (Queue.is_empty pool.jobs)) && can_start_thread_ pool then (
(* there's room for another thread to start processing jobs,
starting with [Queue.pop pool.jobs] *)
let job' = Queue.pop pool.jobs in
launch_worker_on_ pool job'
)
and[@inline] launch_worker_on_ pool job =
incr_size_ pool;
ignore (Thread.create (run_job pool) job)
let run_job job =
(* acquire lock and push job in queue, or start thread directly
if the queue is empty *)
with_lock_ pool (fun pool ->
if pool.stop then raise Stopped;
if
Queue.is_empty pool.jobs && can_start_thread_ pool
&& pool.cur_idle = 0
then
(* create the thread now, on [job], since no other job in
the queue takes precedence.
We do not want to wait for the busy threads to do our task
if we are allowed to spawn a new thread, and no thread is
just idle waiting for new jobs. *)
launch_worker_on_ pool job
else if pool.cur_idle > 0 then (
(* at least one idle thread, wake it up *)
Queue.push job pool.jobs;
Condition.broadcast pool.cond (* wake up some worker *)
) else (
Queue.push job pool.jobs;
(* we might still be able to start another thread to help the
active ones (none is idle). This thread is not necessarily
going to process [job], but rather the first job in the queue *)
if can_start_thread_ pool then (
let job' = Queue.pop pool.jobs in
launch_worker_on_ pool job'
)
))
(* run the function on the argument in the given pool *)
let run1 f x = run_job (Job1 (f, x))
let run f = run1 f ()
let run2 f x y = run_job (Job2 (f, x, y))
let run3 f x y z = run_job (Job3 (f, x, y, z))
let run4 f x y z w = run_job (Job4 (f, x, y, z, w))
let active () = not pool.stop
(* kill threads in the pool *)
let stop () =
with_lock_ pool (fun p ->
p.stop <- true;
Queue.clear p.jobs;
Condition.broadcast p.cond (* wait up idlers *))
(* stop threads if pool is GC'd *)
let () = Gc.finalise (fun _ -> stop ()) pool
(** {6 Futures} *)
module Fut = struct
type 'a handler = 'a state -> unit
type 'a cell = {
mutable state: 'a state;
mutable handlers: 'a handler list; (* handlers *)
f_mutex: Mutex.t;
condition: Condition.t;
}
(** A proper future, with a delayed computation *)
(** A future value of type 'a *)
type 'a t = Return of 'a | FailNow of exn | Run of 'a cell
type 'a future = 'a t
(** {2 Basic Future functions} *)
let return x = Return x
let fail e = FailNow e
let create_cell () =
{
state = Waiting;
handlers = [];
f_mutex = Mutex.create ();
condition = Condition.create ();
}
let with_lock_ cell f =
Mutex.lock cell.f_mutex;
try
let x = f cell in
Mutex.unlock cell.f_mutex;
x
with e ->
Mutex.unlock cell.f_mutex;
raise e
(* TODO: exception handler for handler errors *)
let set_done_ cell x =
with_lock_ cell (fun cell ->
match cell.state with
| Waiting ->
(* set state and signal *)
cell.state <- Done x;
Condition.broadcast cell.condition;
List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers
| _ -> assert false)
let set_fail_ cell e =
with_lock_ cell (fun cell ->
match cell.state with
| Waiting ->
cell.state <- Failed e;
Condition.broadcast cell.condition;
List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers
| _ -> assert false)
(* calls [f x], and put result or exception in [cell] *)
let run_and_set1 cell f x =
try
let y = f x in
set_done_ cell y
with e -> set_fail_ cell e
let run_and_set2 cell f x y =
try
let z = f x y in
set_done_ cell z
with e -> set_fail_ cell e
let make1 f x =
let cell = create_cell () in
run3 run_and_set1 cell f x;
Run cell
let make f = make1 f ()
let make2 f x y =
let cell = create_cell () in
run4 run_and_set2 cell f x y;
Run cell
let get = function
| Return x -> x
| FailNow e -> raise e
| Run cell ->
let rec get_ cell =
match cell.state with
| Waiting ->
Condition.wait cell.condition cell.f_mutex;
(* wait *)
get_ cell
| Done x -> x
| Failed e -> raise e
in
with_lock_ cell get_
(* access the result without locking *)
let get_nolock_ = function
| Return x | Run { state = Done x; _ } -> x
| FailNow _ | Run { state = Failed _ | Waiting; _ } -> assert false
let state = function
| Return x -> Done x
| FailNow e -> Failed e
| Run cell -> with_lock_ cell (fun cell -> cell.state)
let is_not_waiting = function
| Waiting -> false
| Failed _ | Done _ -> true
let is_done = function
| Return _ | FailNow _ -> true
| Run cell -> with_lock_ cell (fun c -> is_not_waiting c.state)
(** {2 Combinators *)
let add_handler_ cell f =
with_lock_ cell (fun cell ->
match cell.state with
| Waiting -> cell.handlers <- f :: cell.handlers
| Done _ | Failed _ -> f cell.state)
let on_finish fut k =
match fut with
| Return x -> k (Done x)
| FailNow e -> k (Failed e)
| Run cell -> add_handler_ cell k
let on_success fut k =
on_finish fut (function
| Done x -> k x
| _ -> ())
let on_failure fut k =
on_finish fut (function
| Failed e -> k e
| _ -> ())
let map_cell_ ~async f cell ~into:cell' =
add_handler_ cell (function
| Done x ->
if async then
run3 run_and_set1 cell' f x
else
run_and_set1 cell' f x
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let map_ ~async f fut =
match fut with
| Return x ->
if async then
make1 f x
else
Return (f x)
| FailNow e -> FailNow e
| Run cell -> map_cell_ ~async f cell ~into:(create_cell ())
let map f fut = map_ ~async:false f fut
let map_async f fut = map_ ~async:true f fut
let app_ ~async f x =
match f, x with
| Return f, Return x ->
if async then
make1 f x
else
Return (f x)
| FailNow e, _ | _, FailNow e -> FailNow e
| Return f, Run x ->
map_cell_ ~async (fun x -> f x) x ~into:(create_cell ())
| Run f, Return x ->
map_cell_ ~async (fun f -> f x) f ~into:(create_cell ())
| Run f, Run x ->
let cell' = create_cell () in
add_handler_ f (function
| Done f -> ignore (map_cell_ ~async f x ~into:cell')
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let app f x = app_ ~async:false f x
let app_async f x = app_ ~async:true f x
let monoid_product f x y =
match x, y with
| Return x, Return y -> Return (f x y)
| FailNow e, _ | _, FailNow e -> FailNow e
| Return x, Run y ->
map_cell_ ~async:false (fun y -> f x y) y ~into:(create_cell ())
| Run x, Return y ->
map_cell_ ~async:false (fun x -> f x y) x ~into:(create_cell ())
| Run x, Run y ->
let cell' = create_cell () in
add_handler_ x (function
| Done x ->
ignore (map_cell_ ~async:false (fun y -> f x y) y ~into:cell')
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let flat_map f fut =
match fut with
| Return x -> f x
| FailNow e -> FailNow e
| Run cell ->
let cell' = create_cell () in
add_handler_ cell (function
| Done x ->
let fut' = f x in
on_finish fut' (function
| Done y -> set_done_ cell' y
| Failed e -> set_fail_ cell' e
| Waiting -> assert false)
| Failed e -> set_fail_ cell' e
| Waiting -> assert false);
Run cell'
let and_then fut f = flat_map (fun _ -> f ()) fut
type _ array_or_list =
| A_ : 'a array -> 'a array_or_list
| L_ : 'a list -> 'a array_or_list
let iter_aol : type a. a array_or_list -> (a -> unit) -> unit =
fun aol f ->
match aol with
| A_ a -> Array.iter f a
| L_ l -> List.iter f l
(* [sequence_ l f] returns a future that waits for every element of [l]
to return of fail, and call [f ()] to obtain the result (as a closure)
in case every element succeeded (otherwise a failure is
returned automatically) *)
let sequence_ : type a res. a t array_or_list -> (unit -> res) -> res t =
fun aol f ->
let n =
match aol with
| A_ a -> Array.length a
| L_ l -> List.length l
in
assert (n > 0);
let cell = create_cell () in
let n_err = CCLock.create 0 in
(* number of failed threads *)
let n_ok = CCLock.create 0 in
(* number of succeeding threads *)
iter_aol aol (fun fut ->
on_finish fut (function
| Failed e ->
let x = CCLock.incr_then_get n_err in
(* if first failure, then seal [cell]'s fate now *)
if x = 1 then set_fail_ cell e
| Done _ ->
let x = CCLock.incr_then_get n_ok in
(* if [n] successes, then [cell] succeeds. Otherwise, some
job has not finished or some job has failed. *)
if x = n then (
let res = f () in
set_done_ cell res
)
| Waiting -> assert false));
Run cell
(* map an array of futures to a future array *)
let sequence_a a =
match a with
| [||] -> return [||]
| [| x |] -> map (fun x -> [| x |]) x
| _ -> sequence_ (A_ a) (fun () -> Array.map get_nolock_ a)
let map_a f a = sequence_a (Array.map f a)
let sequence_l l =
match l with
| [] -> return []
| _ :: _ ->
let l = List.rev l in
sequence_ (L_ l) (fun () -> List.rev_map get_nolock_ l)
(* reverse twice *)
let map_l f l =
match l with
| [] -> return []
| _ ->
let l = List.rev_map f l in
sequence_ (L_ l) (fun () -> List.rev_map get_nolock_ l)
let choose_ : type a. a t array_or_list -> a t =
fun aol ->
let cell = create_cell () in
let is_done = CCLock.create false in
iter_aol aol (fun fut ->
on_finish fut (fun res ->
match res with
| Waiting -> assert false
| Done x ->
let was_done = CCLock.get_then_clear is_done in
if not was_done then set_done_ cell x
| Failed e ->
let was_done = CCLock.get_then_clear is_done in
if not was_done then set_fail_ cell e));
Run cell
let choose_a a = choose_ (A_ a)
let choose_l l = choose_ (L_ l)
let sleep time = make1 Thread.delay time
module Infix = struct
let ( >>= ) x f = flat_map f x
let ( >> ) a f = and_then a f
let ( >|= ) a f = map f a
let ( <*> ) = app
let ( let+ ) = ( >|= )
let ( let* ) = ( >>= )
let[@inline] ( and+ ) a1 a2 = monoid_product (fun x y -> x, y) a1 a2
let ( and* ) = ( and+ )
end
include Infix
end
end

View file

@ -1,162 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Thread Pool, and Futures}
Renamed and heavily updated from [CCFuture].
@since 0.16 *)
[@@@deprecated
"use moonpool or domainslib or saturn, libraries designed for multicore"]
type +'a state = Done of 'a | Waiting | Failed of exn
module type PARAM = sig
val max_size : int
(** Maximum number of threads in the pool. *)
end
exception Stopped
[@@@ocaml.warning "-67"]
(** {2 Create a new Pool} *)
module Make (P : PARAM) : sig
val run : (unit -> _) -> unit
(** [run f] schedules [f] for being executed in the thread pool. *)
val run1 : ('a -> _) -> 'a -> unit
(** [run1 f x] is similar to [run (fun () -> f x)]. *)
val run2 : ('a -> 'b -> _) -> 'a -> 'b -> unit
val run3 : ('a -> 'b -> 'c -> _) -> 'a -> 'b -> 'c -> unit
val set_exn_handler : (exn -> unit) -> unit
val active : unit -> bool
(** [active ()] is true as long as [stop()] has not been called yet. *)
val stop : unit -> unit
(** After calling [stop ()], most functions will raise Stopped.
This has the effect of preventing new tasks from being executed. *)
(** {4 Futures}
The futures are registration points for callbacks, storing a {!state},
that are executed in the pool using {!run}. *)
module Fut : sig
type 'a t
(** A future value of type ['a] *)
type 'a future = 'a t
(** {2 Constructors} *)
val return : 'a -> 'a t
(** Future that is already computed. *)
val fail : exn -> 'a t
(** Future that fails immediately. *)
val make : (unit -> 'a) -> 'a t
(** Create a future, representing a value that will be computed by
the function. If the function raises, the future will fail. *)
val make1 : ('a -> 'b) -> 'a -> 'b t
val make2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c t
(** {2 Basics} *)
val get : 'a t -> 'a
(** Blocking get: wait for the future to be evaluated, and get the value,
or the exception that failed the future is returned.
Raise e if the future failed with e. *)
val state : 'a t -> 'a state
(** State of the future. *)
val is_done : 'a t -> bool
(** Is the future evaluated (success/failure)? *)
(** {2 Combinators} *)
val on_success : 'a t -> ('a -> unit) -> unit
(** Attach a handler to be called upon success.
The handler should not call functions on the future.
Might be evaluated now if the future is already done. *)
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure.
The handler should not call any function on the future.
Might be evaluated now if the future is already done. *)
val on_finish : 'a t -> ('a state -> unit) -> unit
(** Attach a handler to be called when the future is evaluated.
The handler should not call functions on the future.
Might be evaluated now if the future is already done. *)
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures. *)
val and_then : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second. *)
val sequence_a : 'a t array -> 'a array t
(** Future that waits for all previous futures to terminate. If any future
in the array fails, [sequence_a l] fails too. *)
val map_a : ('a -> 'b t) -> 'a array -> 'b array t
(** [map_a f a] maps [f] on every element of [a], and will return
the array of every result if all calls succeed, or an error otherwise. *)
val sequence_l : 'a t list -> 'a list t
(** Future that waits for all previous futures to terminate. If any future
in the list fails, [sequence_l l] fails too. *)
val map_l : ('a -> 'b t) -> 'a list -> 'b list t
(** [map_l f l] maps [f] on every element of [l], and will return
the list of every result if all calls succeed, or an error otherwise. *)
val choose_a : 'a t array -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails. *)
val choose_l : 'a t list -> 'a t
(** Choose among those futures (the first to terminate). Behaves like
the first future that terminates, by failing if the future fails. *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Map the value inside the future. The function doesn't run in its
own task; if it can take time, use {!flat_map} or {!map_async}. *)
val map_async : ('a -> 'b) -> 'a t -> 'b t
(** Map the value inside the future, to be computed in a separated job. *)
val monoid_product : ('a -> 'b -> 'c) -> 'a t -> 'b t -> 'c t
(** Cartesian product of the content of these futures.
@since 2.8 *)
val app : ('a -> 'b) t -> 'a t -> 'b t
(** [app f x] applies the result of [f] to the result of [x]. *)
val app_async : ('a -> 'b) t -> 'a t -> 'b t
(** [app_async f x] applies the result of [f] to the result of [x], in
a separated job scheduled in the pool. *)
val sleep : float -> unit t
(** Future that returns with success in the given amount of seconds. Blocks
the thread! If you need to wait on many events, consider
using {!CCTimer}. *)
module Infix : sig
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
val ( >> ) : 'a t -> (unit -> 'b t) -> 'b t
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( <*> ) : ('a -> 'b) t -> 'a t -> 'b t
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t
val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
val ( and* ) : 'a t -> 'b t -> ('a * 'b) t
end
include module type of Infix
end
end

View file

@ -1,53 +0,0 @@
(** {1 Semaphores} *)
type t = { mutable n: int; mutex: Mutex.t; cond: Condition.t }
let create n =
if n <= 0 then invalid_arg "Semaphore.create";
{ n; mutex = Mutex.create (); cond = Condition.create () }
let get t = t.n
(* assume [t.mutex] locked, try to acquire [t] *)
let acquire_once_locked_ m t =
while t.n < m do
Condition.wait t.cond t.mutex
done;
assert (t.n >= m);
t.n <- t.n - m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let acquire m t =
Mutex.lock t.mutex;
acquire_once_locked_ m t
(* assume [t.mutex] locked, try to release [t] *)
let release_once_locked_ m t =
t.n <- t.n + m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let release m t =
Mutex.lock t.mutex;
release_once_locked_ m t;
()
let with_acquire ~n t ~f =
acquire n t;
try
let x = f () in
release n t;
x
with e ->
release n t;
raise e
let wait_until_at_least ~n t ~f =
Mutex.lock t.mutex;
while t.n < n do
Condition.wait t.cond t.mutex
done;
assert (t.n >= n);
Mutex.unlock t.mutex;
f ()

View file

@ -1,33 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Semaphores}
@since 0.13 *)
[@@@deprecated "use the stdlib's Semaphore module"]
type t
(** A semaphore *)
val create : int -> t
(** [create n] creates a semaphore with initial value [n].
@raise Invalid_argument if [n <= 0]. *)
val get : t -> int
(** Current value. *)
val acquire : int -> t -> unit
(** [acquire n s] blocks until [get s >= n], then atomically
sets [s := !s - n]. *)
val release : int -> t -> unit
(** [release n s] atomically sets [s := !s + n]. *)
val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a
(** [with_acquire ~n s ~f] first acquires [s] with [n] units,
calls [f ()], and then releases [s] with [n] units.
Safely release the semaphore even if [f ()] fails. *)
val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a
(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()]
and returns its result. Doesn't modify the semaphore. *)

View file

@ -1,52 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Threads} *)
type t = Thread.t
let spawn f = Thread.create f ()
let spawn1 f x = Thread.create f x
let spawn2 f x y = Thread.create (fun () -> f x y) ()
let detach f = ignore (Thread.create f ())
let finally_ f x ~h =
try
let res = f x in
ignore (h ());
res
with e ->
ignore (h ());
raise e
module Arr = struct
let spawn n f = Array.init n (fun i -> Thread.create f i)
let join a = Array.iter Thread.join a
end
module Barrier = struct
type t = { lock: Mutex.t; cond: Condition.t; mutable activated: bool }
let create () =
{ lock = Mutex.create (); cond = Condition.create (); activated = false }
let with_lock_ b f =
Mutex.lock b.lock;
finally_ f () ~h:(fun () -> Mutex.unlock b.lock)
let reset b = with_lock_ b (fun () -> b.activated <- false)
let wait b =
with_lock_ b (fun () ->
while not b.activated do
Condition.wait b.cond b.lock
done)
let activate b =
with_lock_ b (fun () ->
if not b.activated then (
b.activated <- true;
Condition.broadcast b.cond
))
let activated b = with_lock_ b (fun () -> b.activated)
end

View file

@ -1,61 +0,0 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Threads}
{b status: unstable}
@since 0.13 *)
[@@@deprecated
"use moonpool or domainslib or saturn, libraries designed for multicore"]
type t = Thread.t
val spawn : (unit -> _) -> t
(** [spawn f] creates a new thread that runs [f ()]. *)
val spawn1 : ('a -> _) -> 'a -> t
(** [spawn1 f x] is like [spawn (fun () -> f x)].
@since 0.16 *)
val spawn2 : ('a -> 'b -> _) -> 'a -> 'b -> t
(** [spawn2 f x y] is like [spawn (fun () -> f x y)].
@since 0.16 *)
val detach : (unit -> 'a) -> unit
(** [detach f] is the same as [ignore (spawn f)]. *)
(** {2 Array of threads} *)
module Arr : sig
val spawn : int -> (int -> 'a) -> t array
(** [Arr.spawn n f] creates an array [res] of length [n], such that
[res.(i) = spawn (fun () -> f i)]. *)
val join : t array -> unit
(** [Arr.join a] joins every thread in [a]. *)
end
(** {2 Single-Use Barrier} *)
module Barrier : sig
type t
(** Barrier, used to synchronize threads *)
val create : unit -> t
(** Create a barrier. *)
val reset : t -> unit
(** Reset to initial (non-triggered) state. *)
val wait : t -> unit
(** [wait b] waits for barrier [b] to be activated by [activate b].
All threads calling this wait until [activate b] is called.
If [b] is already activated, [wait b] does nothing. *)
val activate : t -> unit
(** [activate b] unblocks all threads that were waiting on [b]. *)
val activated : t -> bool
(** [activated b] returns [true] iff [activate b] was called, and [reset b]
was not called since. In other words, [activated b = true] means
[wait b] will not block. *)
end

View file

@ -1,162 +0,0 @@
type job = Job : float * (unit -> 'a) -> job
let ( <= ) (a : float) b = Stdlib.( <= ) a b
let ( >= ) (a : float) b = Stdlib.( >= ) a b
let ( < ) (a : float) b = Stdlib.( < ) a b
let ( > ) (a : float) b = Stdlib.( > ) a b
module TaskHeap = CCHeap.Make (struct
type t = job
let leq (Job (f1, _)) (Job (f2, _)) = f1 <= f2
end)
exception Stopped
type t = {
mutable stop: bool;
mutable tasks: TaskHeap.t;
mutable exn_handler: exn -> unit;
t_mutex: Mutex.t;
fifo_in: Unix.file_descr;
fifo_out: Unix.file_descr;
}
let set_exn_handler timer f = timer.exn_handler <- f
let standby_wait = 10.
(* when no task is scheduled, this is the amount of time that is waited
in a row for something to happen. This is also the maximal delay
between the call to {!stop} and the actual termination of the
thread. *)
let epsilon = 0.0001
(* accepted time diff for actions. *)
let with_lock_ t f =
Mutex.lock t.t_mutex;
try
let x = f t in
Mutex.unlock t.t_mutex;
x
with e ->
Mutex.unlock t.t_mutex;
raise e
type command = Quit | Run : (unit -> _) -> command | Wait of float
let pop_task_ t =
let tasks, _ = TaskHeap.take_exn t.tasks in
t.tasks <- tasks
let call_ timer f = try ignore (f ()) with e -> timer.exn_handler e
(* check next task *)
let next_task_ timer =
match TaskHeap.find_min timer.tasks with
| _ when timer.stop -> Quit
| None -> Wait standby_wait
| Some (Job (time, f)) ->
let now = Unix.gettimeofday () in
if now +. epsilon > time then (
(* now! *)
pop_task_ timer;
Run f
) else
Wait (time -. now)
(* The main thread function: wait for next event, run it, and loop *)
let serve timer =
let buf = Bytes.make 1 '_' in
(* acquire lock, call [process_task] and do as it commands *)
let rec next () =
match with_lock_ timer next_task_ with
| Quit -> ()
| Run f ->
call_ timer f;
(* call outside of any lock *)
next ()
| Wait delay -> wait delay
(* wait for [delay] seconds, or until something happens on [fifo_in] *)
and wait delay =
ignore (Unix.select [ timer.fifo_in ] [] [] delay : _ * _ * _);
(* remove char from fifo, so that next write can happen *)
(try ignore (Unix.read timer.fifo_in buf 0 1 : int) with _ -> ());
next ()
in
next ()
let nop_handler_ _ = ()
let create () =
let fifo_in, fifo_out = Unix.pipe () in
Unix.set_nonblock fifo_in;
let timer =
{
stop = false;
tasks = TaskHeap.empty;
exn_handler = nop_handler_;
t_mutex = Mutex.create ();
fifo_in;
fifo_out;
}
in
(* start a thread to process tasks *)
let _t = Thread.create serve timer in
timer
let underscore_ = Bytes.make 1 '_'
(* awake the thread *)
let awaken_ timer = ignore (Unix.single_write timer.fifo_out underscore_ 0 1)
(** [at s t ~f] will run [f ()] at the Unix echo [t] *)
let at timer time ~f =
if timer.stop then raise Stopped;
let now = Unix.gettimeofday () in
if now >= time then
call_ timer f
else
with_lock_ timer (fun timer ->
if timer.stop then raise Stopped;
(* time of the next scheduled event *)
let next_time =
match TaskHeap.find_min timer.tasks with
| None -> max_float
| Some (Job (d, _)) -> d
in
(* insert task *)
timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
(* see if the timer thread needs to be awaken earlier *)
if time < next_time then awaken_ timer)
let after timer delay ~f =
assert (delay >= 0.);
let now = Unix.gettimeofday () in
at timer (now +. delay) ~f
exception ExitEvery
let every ?delay timer d ~f =
let rec run () =
try
ignore (f ());
schedule ()
with ExitEvery -> ()
(* stop *)
and schedule () = after timer d ~f:run in
match delay with
| None -> run ()
| Some d -> after timer d ~f:run
let active timer = not timer.stop
(** Stop the given timer, cancelling pending tasks *)
let stop timer =
with_lock_ timer (fun timer ->
if not timer.stop then (
timer.stop <- true;
(* empty heap of tasks *)
timer.tasks <- TaskHeap.empty;
(* tell the thread to stop *)
awaken_ timer
))

View file

@ -1,43 +0,0 @@
(** Event timer
Used to be part of [CCFuture].
@since 0.16 *)
[@@@deprecated
"use moonpool or domainslib or saturn, libraries designed for multicore"]
type t
(** A scheduler for events. It runs in its own thread. *)
val create : unit -> t
(** A new timer. *)
val set_exn_handler : t -> (exn -> unit) -> unit
(** [set_exn_handler timer f] registers [f] so that any exception
raised by a task scheduled in [timer] is given to [f]. *)
exception Stopped
val after : t -> float -> f:(unit -> _) -> unit
(** Call the callback [f] after the given number of seconds.
@raise Stopped if the timer was stopped. *)
val at : t -> float -> f:(unit -> _) -> unit
(** Create a future that evaluates to [()] at the given Unix timestamp.
@raise Stopped if the timer was stopped. *)
exception ExitEvery
val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit
(** [every timer n ~f] calls [f ()] every [n] seconds.
[f()] can raise ExitEvery to stop the cycle.
@param delay if provided, the first call to [f ()] is delayed by
that many seconds.
@raise Stopped if the timer was stopped. *)
val stop : t -> unit
(** Stop the given timer, cancelling pending tasks. Idempotent.
From now on, calling most other operations on the timer will raise Stopped. *)
val active : t -> bool
(** Return [true] until [stop t] has been called. *)

View file

@ -1,12 +0,0 @@
(library
(name containers_thread)
(public_name containers-thread)
(synopsis "DEPRECATED library for threading")
(wrapped false)
(optional)
(flags :standard -warn-error -a+8 -w -32 -safe-string)
(preprocess
(action
(run %{project_root}/src/core/cpp/cpp.exe %{input-file})))
(libraries containers threads))