reindent in containers.thread

This commit is contained in:
Simon Cruanes 2017-01-25 00:24:14 +01:00
parent 59208fd9c6
commit 9a46b4527c
5 changed files with 228 additions and 228 deletions

View file

@ -41,26 +41,26 @@ let with_lock_ q f =
let push q x = let push q x =
with_lock_ q with_lock_ q
(fun () -> (fun () ->
while q.size = q.capacity do while q.size = q.capacity do
Condition.wait q.cond q.lock Condition.wait q.cond q.lock
done; done;
assert (q.size < q.capacity); assert (q.size < q.capacity);
Queue.push x q.q; Queue.push x q.q;
(* if there are blocked receivers, awake one of them *) (* if there are blocked receivers, awake one of them *)
incr_size_ q; incr_size_ q;
Condition.broadcast q.cond) Condition.broadcast q.cond)
let take q = let take q =
with_lock_ q with_lock_ q
(fun () -> (fun () ->
while q.size = 0 do while q.size = 0 do
Condition.wait q.cond q.lock Condition.wait q.cond q.lock
done; done;
let x = Queue.take q.q in let x = Queue.take q.q in
(* if there are blocked senders, awake one of them *) (* if there are blocked senders, awake one of them *)
decr_size_ q; decr_size_ q;
Condition.broadcast q.cond; Condition.broadcast q.cond;
x) x)
(*$R (*$R
let q = create 1 in let q = create 1 in
@ -83,22 +83,22 @@ let push_list q l =
| [] -> l | [] -> l
| _::_ when q.size = q.capacity -> l (* no room remaining *) | _::_ when q.size = q.capacity -> l (* no room remaining *)
| x :: tl -> | x :: tl ->
Queue.push x q.q; Queue.push x q.q;
incr_size_ q; incr_size_ q;
push_ q tl push_ q tl
in in
(* push chunks of [l] in [q] until [l] is empty *) (* push chunks of [l] in [q] until [l] is empty *)
let rec aux q l = match l with let rec aux q l = match l with
| [] -> () | [] -> ()
| _::_ -> | _::_ ->
let l = with_lock_ q let l = with_lock_ q
(fun () -> (fun () ->
while q.size = q.capacity do while q.size = q.capacity do
Condition.wait q.cond q.lock Condition.wait q.cond q.lock
done; done;
let l = push_ q l in let l = push_ q l in
Condition.broadcast q.cond; Condition.broadcast q.cond;
l) l)
in in
aux q l aux q l
in aux q l in aux q l
@ -118,14 +118,14 @@ let take_list q n =
if n=0 then List.rev acc if n=0 then List.rev acc
else else
let acc, n = with_lock_ q let acc, n = with_lock_ q
(fun () -> (fun () ->
while q.size = 0 do while q.size = 0 do
Condition.wait q.cond q.lock Condition.wait q.cond q.lock
done; done;
let acc, n = pop_ acc q n in let acc, n = pop_ acc q n in
Condition.broadcast q.cond; Condition.broadcast q.cond;
acc, n acc, n
) )
in in
aux acc q n aux acc q n
in in
@ -163,28 +163,28 @@ let take_list q n =
let try_take q = let try_take q =
with_lock_ q with_lock_ q
(fun () -> (fun () ->
if q.size = 0 then None if q.size = 0 then None
else ( else (
decr_size_ q; decr_size_ q;
Some (Queue.take q.q) Some (Queue.take q.q)
)) ))
let try_push q x = let try_push q x =
with_lock_ q with_lock_ q
(fun () -> (fun () ->
if q.size = q.capacity then false if q.size = q.capacity then false
else ( else (
incr_size_ q; incr_size_ q;
Queue.push x q.q; Queue.push x q.q;
Condition.signal q.cond; Condition.signal q.cond;
true true
)) ))
let peek q = let peek q =
with_lock_ q with_lock_ q
(fun () -> (fun () ->
try Some (Queue.peek q.q) try Some (Queue.peek q.q)
with Queue.Empty -> None) with Queue.Empty -> None)
let size q = with_lock_ q (fun () -> q.size) let size q = with_lock_ q (fun () -> q.size)

View file

@ -91,18 +91,18 @@ let update l f =
(*$T (*$T
let l = create 5 in update l (fun x->x+1); get l = 6 let l = create 5 in update l (fun x->x+1); get l = 6
*) *)
let update_map l f = let update_map l f =
with_lock l with_lock l
(fun x -> (fun x ->
let x', y = f x in let x', y = f x in
l.content <- x'; l.content <- x';
y) y)
(*$T (*$T
let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6 let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6
*) *)
let get l = let get l =
Mutex.lock l.mutex; Mutex.lock l.mutex;
@ -135,7 +135,7 @@ let decr l = update l Pervasives.pred
(*$T (*$T
let l = create 0 in incr l ; get l = 1 let l = create 0 in incr l ; get l = 1
let l = create 0 in decr l ; get l = ~-1 let l = create 0 in decr l ; get l = ~-1
*) *)
let incr_then_get l = let incr_then_get l =
Mutex.lock l.mutex; Mutex.lock l.mutex;

View file

@ -101,15 +101,15 @@ module Make(P : PARAM) = struct
and run_cmd = function and run_cmd = function
| Die -> () | Die -> ()
| Wait -> | Wait ->
with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) with_lock_ pool (fun p -> Condition.wait p.cond p.mutex)
| Process (Job1 (f, x)) -> | Process (Job1 (f, x)) ->
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
| Process (Job2 (f, x, y)) -> | Process (Job2 (f, x, y)) ->
begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool
| Process (Job3 (f, x, y, z)) -> | Process (Job3 (f, x, y, z)) ->
begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool
| Process (Job4 (f, x, y, z, w)) -> | Process (Job4 (f, x, y, z, w)) ->
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
(* create a new worker thread *) (* create a new worker thread *)
let launch_worker_ pool = ignore (Thread.create serve pool) let launch_worker_ pool = ignore (Thread.create serve pool)
@ -126,24 +126,24 @@ module Make(P : PARAM) = struct
if the queue is empty *) if the queue is empty *)
with_lock_ pool with_lock_ pool
(fun pool -> (fun pool ->
if pool.stop then raise Stopped; if pool.stop then raise Stopped;
if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0
then ( then (
(* create the thread now, on [job], as it will not break order of (* create the thread now, on [job], as it will not break order of
jobs. We do not want to wait for the busy threads to do our task jobs. We do not want to wait for the busy threads to do our task
if we are allowed to spawn a new thread. *) if we are allowed to spawn a new thread. *)
incr_size_ pool; incr_size_ pool;
ignore (Thread.create run_cmd (Process job)) ignore (Thread.create run_cmd (Process job))
) else ( ) else (
(* cannot start thread, push and wait for some worker to pick it up *) (* cannot start thread, push and wait for some worker to pick it up *)
Queue.push job pool.jobs; Queue.push job pool.jobs;
Condition.signal pool.cond; (* wake up *) Condition.signal pool.cond; (* wake up *)
(* might want to process in the background, if all threads are busy *) (* might want to process in the background, if all threads are busy *)
if pool.cur_idle = 0 && can_start_thread_ pool then ( if pool.cur_idle = 0 && can_start_thread_ pool then (
incr_size_ pool; incr_size_ pool;
launch_worker_ pool; launch_worker_ pool;
) )
)) ))
(* run the function on the argument in the given pool *) (* run the function on the argument in the given pool *)
let run1 f x = run_job (Job1 (f, x)) let run1 f x = run_job (Job1 (f, x))
@ -162,8 +162,8 @@ module Make(P : PARAM) = struct
let stop () = let stop () =
with_lock_ pool with_lock_ pool
(fun p -> (fun p ->
p.stop <- true; p.stop <- true;
Queue.clear p.jobs) Queue.clear p.jobs)
(* stop threads if pool is GC'd *) (* stop threads if pool is GC'd *)
let () = Gc.finalise (fun _ -> stop ()) pool let () = Gc.finalise (fun _ -> stop ()) pool
@ -216,24 +216,24 @@ module Make(P : PARAM) = struct
let set_done_ cell x = let set_done_ cell x =
with_lock_ cell with_lock_ cell
(fun cell -> match cell.state with (fun cell -> match cell.state with
| Waiting -> (* set state and signal *) | Waiting -> (* set state and signal *)
cell.state <- Done x; cell.state <- Done x;
Condition.broadcast cell.condition; Condition.broadcast cell.condition;
List.iter List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e) (fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers cell.handlers
| _ -> assert false) | _ -> assert false)
let set_fail_ cell e = let set_fail_ cell e =
with_lock_ cell with_lock_ cell
(fun cell -> match cell.state with (fun cell -> match cell.state with
| Waiting -> | Waiting ->
cell.state <- Failed e; cell.state <- Failed e;
Condition.broadcast cell.condition; Condition.broadcast cell.condition;
List.iter List.iter
(fun f -> try f cell.state with e -> pool.exn_handler e) (fun f -> try f cell.state with e -> pool.exn_handler e)
cell.handlers cell.handlers
| _ -> assert false) | _ -> assert false)
(* calls [f x], and put result or exception in [cell] *) (* calls [f x], and put result or exception in [cell] *)
let run_and_set1 cell f x = let run_and_set1 cell f x =
@ -282,14 +282,14 @@ module Make(P : PARAM) = struct
| Return x -> x | Return x -> x
| FailNow e -> raise e | FailNow e -> raise e
| Run cell -> | Run cell ->
let rec get_ cell = match cell.state with let rec get_ cell = match cell.state with
| Waiting -> | Waiting ->
Condition.wait cell.condition cell.f_mutex; (* wait *) Condition.wait cell.condition cell.f_mutex; (* wait *)
get_ cell get_ cell
| Done x -> x | Done x -> x
| Failed e -> raise e | Failed e -> raise e
in in
with_lock_ cell get_ with_lock_ cell get_
(* access the result without locking *) (* access the result without locking *)
let get_nolock_ = function let get_nolock_ = function
@ -302,21 +302,21 @@ module Make(P : PARAM) = struct
| Return x -> Done x | Return x -> Done x
| FailNow e -> Failed e | FailNow e -> Failed e
| Run cell -> | Run cell ->
with_lock_ cell (fun cell -> cell.state) with_lock_ cell (fun cell -> cell.state)
let is_done = function let is_done = function
| Return _ | Return _
| FailNow _ -> true | FailNow _ -> true
| Run cell -> | Run cell ->
with_lock_ cell (fun c -> c.state <> Waiting) with_lock_ cell (fun c -> c.state <> Waiting)
(** {2 Combinators *) (** {2 Combinators *)
let add_handler_ cell f = let add_handler_ cell f =
with_lock_ cell with_lock_ cell
(fun cell -> match cell.state with (fun cell -> match cell.state with
| Waiting -> cell.handlers <- f :: cell.handlers | Waiting -> cell.handlers <- f :: cell.handlers
| Done _ | Failed _ -> f cell.state) | Done _ | Failed _ -> f cell.state)
let on_finish fut k = match fut with let on_finish fut k = match fut with
| Return x -> k (Done x) | Return x -> k (Done x)
@ -339,18 +339,18 @@ module Make(P : PARAM) = struct
add_handler_ cell add_handler_ cell
(function (function
| Done x -> | Done x ->
if async if async
then run3 run_and_set1 cell' f x then run3 run_and_set1 cell' f x
else run_and_set1 cell' f x else run_and_set1 cell' f x
| Failed e -> set_fail_ cell' e | Failed e -> set_fail_ cell' e
| Waiting -> assert false); | Waiting -> assert false);
Run cell' Run cell'
let map_ ~async f fut = match fut with let map_ ~async f fut = match fut with
| Return x -> | Return x ->
if async if async
then make1 f x then make1 f x
else Return (f x) else Return (f x)
| FailNow e -> FailNow e | FailNow e -> FailNow e
| Run cell -> map_cell_ ~async f cell ~into:(create_cell()) | Run cell -> map_cell_ ~async f cell ~into:(create_cell())
@ -367,23 +367,23 @@ module Make(P : PARAM) = struct
let app_ ~async f x = match f, x with let app_ ~async f x = match f, x with
| Return f, Return x -> | Return f, Return x ->
if async if async
then make1 f x then make1 f x
else Return (f x) else Return (f x)
| FailNow e, _ | FailNow e, _
| _, FailNow e -> FailNow e | _, FailNow e -> FailNow e
| Return f, Run x -> | Return f, Run x ->
map_cell_ ~async (fun x -> f x) x ~into:(create_cell()) map_cell_ ~async (fun x -> f x) x ~into:(create_cell())
| Run f, Return x -> | Run f, Return x ->
map_cell_ ~async (fun f -> f x) f ~into:(create_cell()) map_cell_ ~async (fun f -> f x) f ~into:(create_cell())
| Run f, Run x -> | Run f, Run x ->
let cell' = create_cell () in let cell' = create_cell () in
add_handler_ f add_handler_ f
(function (function
| Done f -> ignore (map_cell_ ~async f x ~into:cell') | Done f -> ignore (map_cell_ ~async f x ~into:cell')
| Failed e -> set_fail_ cell' e | Failed e -> set_fail_ cell' e
| Waiting -> assert false); | Waiting -> assert false);
Run cell' Run cell'
let app f x = app_ ~async:false f x let app f x = app_ ~async:false f x
@ -393,21 +393,21 @@ module Make(P : PARAM) = struct
| Return x -> f x | Return x -> f x
| FailNow e -> FailNow e | FailNow e -> FailNow e
| Run cell -> | Run cell ->
let cell' = create_cell() in let cell' = create_cell() in
add_handler_ cell add_handler_ cell
(function (function
| Done x -> | Done x ->
let fut' = f x in let fut' = f x in
on_finish fut' on_finish fut'
(function (function
| Done y -> set_done_ cell' y | Done y -> set_done_ cell' y
| Failed e -> set_fail_ cell' e | Failed e -> set_fail_ cell' e
| Waiting -> assert false | Waiting -> assert false
) )
| Failed e -> set_fail_ cell' e | Failed e -> set_fail_ cell' e
| Waiting -> assert false | Waiting -> assert false
); );
Run cell' Run cell'
let and_then fut f = flat_map (fun _ -> f ()) fut let and_then fut f = flat_map (fun _ -> f ()) fut
@ -416,57 +416,57 @@ module Make(P : PARAM) = struct
| L_ : 'a list -> 'a array_or_list | L_ : 'a list -> 'a array_or_list
let iter_aol let iter_aol
: type a. a array_or_list -> (a -> unit) -> unit : type a. a array_or_list -> (a -> unit) -> unit
= fun aol f -> match aol with = fun aol f -> match aol with
| A_ a -> Array.iter f a | A_ a -> Array.iter f a
| L_ l -> List.iter f l | L_ l -> List.iter f l
(* [sequence_ l f] returns a future that waits for every element of [l] (* [sequence_ l f] returns a future that waits for every element of [l]
to return of fail, and call [f ()] to obtain the result (as a closure) to return of fail, and call [f ()] to obtain the result (as a closure)
in case every element succeeded (otherwise a failure is in case every element succeeded (otherwise a failure is
returned automatically) *) returned automatically) *)
let sequence_ let sequence_
: type a res. a t array_or_list -> (unit -> res) -> res t : type a res. a t array_or_list -> (unit -> res) -> res t
= fun aol f -> = fun aol f ->
let n = match aol with let n = match aol with
| A_ a -> Array.length a | A_ a -> Array.length a
| L_ l -> List.length l | L_ l -> List.length l
in in
assert (n>0); assert (n>0);
let cell = create_cell() in let cell = create_cell() in
let n_err = CCLock.create 0 in (* number of failed threads *) let n_err = CCLock.create 0 in (* number of failed threads *)
let n_ok = CCLock.create 0 in (* number of succeeding threads *) let n_ok = CCLock.create 0 in (* number of succeeding threads *)
iter_aol aol iter_aol aol
(fun fut -> (fun fut ->
on_finish fut on_finish fut
(function (function
| Failed e -> | Failed e ->
let x = CCLock.incr_then_get n_err in let x = CCLock.incr_then_get n_err in
(* if first failure, then seal [cell]'s fate now *) (* if first failure, then seal [cell]'s fate now *)
if x=1 then set_fail_ cell e if x=1 then set_fail_ cell e
| Done _ -> | Done _ ->
let x = CCLock.incr_then_get n_ok in let x = CCLock.incr_then_get n_ok in
(* if [n] successes, then [cell] succeeds. Otherwise, some (* if [n] successes, then [cell] succeeds. Otherwise, some
job has not finished or some job has failed. *) job has not finished or some job has failed. *)
if x = n then ( if x = n then (
let res = f () in let res = f () in
set_done_ cell res set_done_ cell res
) )
| Waiting -> assert false)); | Waiting -> assert false));
Run cell Run cell
(* map an array of futures to a future array *) (* map an array of futures to a future array *)
let sequence_a a = match a with let sequence_a a = match a with
| [||] -> return [||] | [||] -> return [||]
| _ -> | _ ->
sequence_ (A_ a) sequence_ (A_ a)
(fun () -> Array.map get_nolock_ a) (fun () -> Array.map get_nolock_ a)
let map_a f a = sequence_a (Array.map f a) let map_a f a = sequence_a (Array.map f a)
let sequence_l l = match l with let sequence_l l = match l with
| [] -> return [] | [] -> return []
| _ :: _ -> | _ :: _ ->
sequence_ (L_ l) (fun () -> List.map get_nolock_ l) sequence_ (L_ l) (fun () -> List.map get_nolock_ l)
(* reverse twice *) (* reverse twice *)
@ -499,22 +499,22 @@ module Make(P : PARAM) = struct
*) *)
let choose_ let choose_
: type a. a t array_or_list -> a t : type a. a t array_or_list -> a t
= fun aol -> = fun aol ->
let cell = create_cell() in let cell = create_cell() in
let is_done = CCLock.create false in let is_done = CCLock.create false in
iter_aol aol iter_aol aol
(fun fut -> (fun fut ->
on_finish fut on_finish fut
(fun res -> match res with (fun res -> match res with
| Waiting -> assert false | Waiting -> assert false
| Done x -> | Done x ->
let was_done = CCLock.get_then_clear is_done in let was_done = CCLock.get_then_clear is_done in
if not was_done then set_done_ cell x if not was_done then set_done_ cell x
| Failed e -> | Failed e ->
let was_done = CCLock.get_then_clear is_done in let was_done = CCLock.get_then_clear is_done in
if not was_done then set_fail_ cell e)); if not was_done then set_fail_ cell e));
Run cell Run cell
let choose_a a = choose_ (A_ a) let choose_a a = choose_ (A_ a)

View file

@ -58,16 +58,16 @@ module Barrier = struct
let wait b = let wait b =
with_lock_ b with_lock_ b
(fun () -> (fun () ->
while not b.activated do while not b.activated do
Condition.wait b.cond b.lock Condition.wait b.cond b.lock
done) done)
let activate b = let activate b =
with_lock_ b with_lock_ b
(fun () -> (fun () ->
if not b.activated then ( if not b.activated then (
b.activated <- true; b.activated <- true;
Condition.broadcast b.cond)) Condition.broadcast b.cond))
let activated b = with_lock_ b (fun () -> b.activated) let activated b = with_lock_ b (fun () -> b.activated)
end end

View file

@ -7,9 +7,9 @@ type job =
| Job : float * (unit -> 'a) -> job | Job : float * (unit -> 'a) -> job
module TaskHeap = CCHeap.Make(struct module TaskHeap = CCHeap.Make(struct
type t = job type t = job
let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2 let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2
end) end)
exception Stopped exception Stopped
@ -61,12 +61,12 @@ let next_task_ timer = match TaskHeap.find_min timer.tasks with
| _ when timer.stop -> Quit | _ when timer.stop -> Quit
| None -> Wait standby_wait | None -> Wait standby_wait
| Some Job (time, f) -> | Some Job (time, f) ->
let now = Unix.gettimeofday () in let now = Unix.gettimeofday () in
if now +. epsilon > time then ( if now +. epsilon > time then (
(* now! *) (* now! *)
pop_task_ timer; pop_task_ timer;
Run f Run f
) else Wait (time -. now) ) else Wait (time -. now)
(* The main thread function: wait for next event, run it, and loop *) (* The main thread function: wait for next event, run it, and loop *)
let serve timer = let serve timer =
@ -75,8 +75,8 @@ let serve timer =
let rec next () = match with_lock_ timer next_task_ with let rec next () = match with_lock_ timer next_task_ with
| Quit -> () | Quit -> ()
| Run f -> | Run f ->
call_ timer f; (* call outside of any lock *) call_ timer f; (* call outside of any lock *)
next () next ()
| Wait delay -> wait delay | Wait delay -> wait delay
(* wait for [delay] seconds, or until something happens on [fifo_in] *) (* wait for [delay] seconds, or until something happens on [fifo_in] *)
and wait delay = and wait delay =
@ -118,16 +118,16 @@ let at timer time ~f =
else else
with_lock_ timer with_lock_ timer
(fun timer -> (fun timer ->
if timer.stop then raise Stopped; if timer.stop then raise Stopped;
(* time of the next scheduled event *) (* time of the next scheduled event *)
let next_time = match TaskHeap.find_min timer.tasks with let next_time = match TaskHeap.find_min timer.tasks with
| None -> max_float | None -> max_float
| Some Job (d, _) -> d | Some Job (d, _) -> d
in in
(* insert task *) (* insert task *)
timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
(* see if the timer thread needs to be awaken earlier *) (* see if the timer thread needs to be awaken earlier *)
if time < next_time then awaken_ timer if time < next_time then awaken_ timer
) )
let after timer delay ~f = let after timer delay ~f =
@ -145,8 +145,8 @@ let every ?delay timer d ~f =
with ExitEvery -> () (* stop *) with ExitEvery -> () (* stop *)
and schedule () = after timer d ~f:run in and schedule () = after timer d ~f:run in
match delay with match delay with
| None -> run() | None -> run()
| Some d -> after timer d ~f:run | Some d -> after timer d ~f:run
(*$R (*$R
let start = Unix.gettimeofday() in let start = Unix.gettimeofday() in
@ -170,13 +170,13 @@ let active timer = not timer.stop
let stop timer = let stop timer =
with_lock_ timer with_lock_ timer
(fun timer -> (fun timer ->
if not timer.stop then ( if not timer.stop then (
timer.stop <- true; timer.stop <- true;
(* empty heap of tasks *) (* empty heap of tasks *)
timer.tasks <- TaskHeap.empty; timer.tasks <- TaskHeap.empty;
(* tell the thread to stop *) (* tell the thread to stop *)
awaken_ timer; awaken_ timer;
) )
) )
(*$R (*$R