diff --git a/src/threads/CCBlockingQueue.ml b/src/threads/CCBlockingQueue.ml index d767b4ab..9a5a0f9f 100644 --- a/src/threads/CCBlockingQueue.ml +++ b/src/threads/CCBlockingQueue.ml @@ -41,26 +41,26 @@ let with_lock_ q f = let push q x = with_lock_ q (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - assert (q.size < q.capacity); - Queue.push x q.q; - (* if there are blocked receivers, awake one of them *) - incr_size_ q; - Condition.broadcast q.cond) + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + assert (q.size < q.capacity); + Queue.push x q.q; + (* if there are blocked receivers, awake one of them *) + incr_size_ q; + Condition.broadcast q.cond) let take q = with_lock_ q (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let x = Queue.take q.q in - (* if there are blocked senders, awake one of them *) - decr_size_ q; - Condition.broadcast q.cond; - x) + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let x = Queue.take q.q in + (* if there are blocked senders, awake one of them *) + decr_size_ q; + Condition.broadcast q.cond; + x) (*$R let q = create 1 in @@ -83,22 +83,22 @@ let push_list q l = | [] -> l | _::_ when q.size = q.capacity -> l (* no room remaining *) | x :: tl -> - Queue.push x q.q; - incr_size_ q; - push_ q tl + Queue.push x q.q; + incr_size_ q; + push_ q tl in (* push chunks of [l] in [q] until [l] is empty *) let rec aux q l = match l with - | [] -> () - | _::_ -> + | [] -> () + | _::_ -> let l = with_lock_ q - (fun () -> - while q.size = q.capacity do - Condition.wait q.cond q.lock - done; - let l = push_ q l in - Condition.broadcast q.cond; - l) + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + let l = push_ q l in + Condition.broadcast q.cond; + l) in aux q l in aux q l @@ -118,14 +118,14 @@ let take_list q n = if n=0 then List.rev acc else let acc, n = with_lock_ q - (fun () -> - while q.size = 0 do - Condition.wait q.cond q.lock - done; - let acc, n = pop_ acc q n in - Condition.broadcast q.cond; - acc, n - ) + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let acc, n = pop_ acc q n in + Condition.broadcast q.cond; + acc, n + ) in aux acc q n in @@ -163,28 +163,28 @@ let take_list q n = let try_take q = with_lock_ q (fun () -> - if q.size = 0 then None - else ( - decr_size_ q; - Some (Queue.take q.q) - )) + if q.size = 0 then None + else ( + decr_size_ q; + Some (Queue.take q.q) + )) let try_push q x = with_lock_ q (fun () -> - if q.size = q.capacity then false - else ( - incr_size_ q; - Queue.push x q.q; - Condition.signal q.cond; - true - )) + if q.size = q.capacity then false + else ( + incr_size_ q; + Queue.push x q.q; + Condition.signal q.cond; + true + )) let peek q = with_lock_ q (fun () -> - try Some (Queue.peek q.q) - with Queue.Empty -> None) + try Some (Queue.peek q.q) + with Queue.Empty -> None) let size q = with_lock_ q (fun () -> q.size) diff --git a/src/threads/CCLock.ml b/src/threads/CCLock.ml index c1f0bf14..61daccfe 100644 --- a/src/threads/CCLock.ml +++ b/src/threads/CCLock.ml @@ -91,18 +91,18 @@ let update l f = (*$T let l = create 5 in update l (fun x->x+1); get l = 6 - *) +*) let update_map l f = with_lock l (fun x -> - let x', y = f x in - l.content <- x'; - y) + let x', y = f x in + l.content <- x'; + y) (*$T let l = create 5 in update_map l (fun x->x+1, string_of_int x) = "5" && get l = 6 - *) +*) let get l = Mutex.lock l.mutex; @@ -135,7 +135,7 @@ let decr l = update l Pervasives.pred (*$T let l = create 0 in incr l ; get l = 1 let l = create 0 in decr l ; get l = ~-1 - *) +*) let incr_then_get l = Mutex.lock l.mutex; diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index 401863ca..fa5b3192 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -101,15 +101,15 @@ module Make(P : PARAM) = struct and run_cmd = function | Die -> () | Wait -> - with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) + with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) | Process (Job1 (f, x)) -> - begin try ignore (f x) with e -> pool.exn_handler e end; serve pool + begin try ignore (f x) with e -> pool.exn_handler e end; serve pool | Process (Job2 (f, x, y)) -> - begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool + begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool | Process (Job3 (f, x, y, z)) -> - begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool + begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool | Process (Job4 (f, x, y, z, w)) -> - begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool + begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool (* create a new worker thread *) let launch_worker_ pool = ignore (Thread.create serve pool) @@ -126,24 +126,24 @@ module Make(P : PARAM) = struct if the queue is empty *) with_lock_ pool (fun pool -> - if pool.stop then raise Stopped; - if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 - then ( - (* create the thread now, on [job], as it will not break order of - jobs. We do not want to wait for the busy threads to do our task - if we are allowed to spawn a new thread. *) - incr_size_ pool; - ignore (Thread.create run_cmd (Process job)) - ) else ( - (* cannot start thread, push and wait for some worker to pick it up *) - Queue.push job pool.jobs; - Condition.signal pool.cond; (* wake up *) - (* might want to process in the background, if all threads are busy *) - if pool.cur_idle = 0 && can_start_thread_ pool then ( - incr_size_ pool; - launch_worker_ pool; - ) - )) + if pool.stop then raise Stopped; + if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 + then ( + (* create the thread now, on [job], as it will not break order of + jobs. We do not want to wait for the busy threads to do our task + if we are allowed to spawn a new thread. *) + incr_size_ pool; + ignore (Thread.create run_cmd (Process job)) + ) else ( + (* cannot start thread, push and wait for some worker to pick it up *) + Queue.push job pool.jobs; + Condition.signal pool.cond; (* wake up *) + (* might want to process in the background, if all threads are busy *) + if pool.cur_idle = 0 && can_start_thread_ pool then ( + incr_size_ pool; + launch_worker_ pool; + ) + )) (* run the function on the argument in the given pool *) let run1 f x = run_job (Job1 (f, x)) @@ -162,8 +162,8 @@ module Make(P : PARAM) = struct let stop () = with_lock_ pool (fun p -> - p.stop <- true; - Queue.clear p.jobs) + p.stop <- true; + Queue.clear p.jobs) (* stop threads if pool is GC'd *) let () = Gc.finalise (fun _ -> stop ()) pool @@ -216,24 +216,24 @@ module Make(P : PARAM) = struct let set_done_ cell x = with_lock_ cell (fun cell -> match cell.state with - | Waiting -> (* set state and signal *) - cell.state <- Done x; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) + | Waiting -> (* set state and signal *) + cell.state <- Done x; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) let set_fail_ cell e = with_lock_ cell (fun cell -> match cell.state with - | Waiting -> - cell.state <- Failed e; - Condition.broadcast cell.condition; - List.iter - (fun f -> try f cell.state with e -> pool.exn_handler e) - cell.handlers - | _ -> assert false) + | Waiting -> + cell.state <- Failed e; + Condition.broadcast cell.condition; + List.iter + (fun f -> try f cell.state with e -> pool.exn_handler e) + cell.handlers + | _ -> assert false) (* calls [f x], and put result or exception in [cell] *) let run_and_set1 cell f x = @@ -282,14 +282,14 @@ module Make(P : PARAM) = struct | Return x -> x | FailNow e -> raise e | Run cell -> - let rec get_ cell = match cell.state with - | Waiting -> - Condition.wait cell.condition cell.f_mutex; (* wait *) - get_ cell - | Done x -> x - | Failed e -> raise e - in - with_lock_ cell get_ + let rec get_ cell = match cell.state with + | Waiting -> + Condition.wait cell.condition cell.f_mutex; (* wait *) + get_ cell + | Done x -> x + | Failed e -> raise e + in + with_lock_ cell get_ (* access the result without locking *) let get_nolock_ = function @@ -302,21 +302,21 @@ module Make(P : PARAM) = struct | Return x -> Done x | FailNow e -> Failed e | Run cell -> - with_lock_ cell (fun cell -> cell.state) + with_lock_ cell (fun cell -> cell.state) let is_done = function | Return _ | FailNow _ -> true | Run cell -> - with_lock_ cell (fun c -> c.state <> Waiting) + with_lock_ cell (fun c -> c.state <> Waiting) (** {2 Combinators *) let add_handler_ cell f = with_lock_ cell (fun cell -> match cell.state with - | Waiting -> cell.handlers <- f :: cell.handlers - | Done _ | Failed _ -> f cell.state) + | Waiting -> cell.handlers <- f :: cell.handlers + | Done _ | Failed _ -> f cell.state) let on_finish fut k = match fut with | Return x -> k (Done x) @@ -339,18 +339,18 @@ module Make(P : PARAM) = struct add_handler_ cell (function | Done x -> - if async - then run3 run_and_set1 cell' f x - else run_and_set1 cell' f x + if async + then run3 run_and_set1 cell' f x + else run_and_set1 cell' f x | Failed e -> set_fail_ cell' e | Waiting -> assert false); Run cell' let map_ ~async f fut = match fut with | Return x -> - if async - then make1 f x - else Return (f x) + if async + then make1 f x + else Return (f x) | FailNow e -> FailNow e | Run cell -> map_cell_ ~async f cell ~into:(create_cell()) @@ -367,23 +367,23 @@ module Make(P : PARAM) = struct let app_ ~async f x = match f, x with | Return f, Return x -> - if async - then make1 f x - else Return (f x) + if async + then make1 f x + else Return (f x) | FailNow e, _ | _, FailNow e -> FailNow e | Return f, Run x -> - map_cell_ ~async (fun x -> f x) x ~into:(create_cell()) + map_cell_ ~async (fun x -> f x) x ~into:(create_cell()) | Run f, Return x -> - map_cell_ ~async (fun f -> f x) f ~into:(create_cell()) + map_cell_ ~async (fun f -> f x) f ~into:(create_cell()) | Run f, Run x -> - let cell' = create_cell () in - add_handler_ f - (function - | Done f -> ignore (map_cell_ ~async f x ~into:cell') - | Failed e -> set_fail_ cell' e - | Waiting -> assert false); - Run cell' + let cell' = create_cell () in + add_handler_ f + (function + | Done f -> ignore (map_cell_ ~async f x ~into:cell') + | Failed e -> set_fail_ cell' e + | Waiting -> assert false); + Run cell' let app f x = app_ ~async:false f x @@ -393,21 +393,21 @@ module Make(P : PARAM) = struct | Return x -> f x | FailNow e -> FailNow e | Run cell -> - let cell' = create_cell() in - add_handler_ cell - (function - | Done x -> - let fut' = f x in - on_finish fut' - (function - | Done y -> set_done_ cell' y - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ) - | Failed e -> set_fail_ cell' e - | Waiting -> assert false - ); - Run cell' + let cell' = create_cell() in + add_handler_ cell + (function + | Done x -> + let fut' = f x in + on_finish fut' + (function + | Done y -> set_done_ cell' y + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ) + | Failed e -> set_fail_ cell' e + | Waiting -> assert false + ); + Run cell' let and_then fut f = flat_map (fun _ -> f ()) fut @@ -416,57 +416,57 @@ module Make(P : PARAM) = struct | L_ : 'a list -> 'a array_or_list let iter_aol - : type a. a array_or_list -> (a -> unit) -> unit - = fun aol f -> match aol with - | A_ a -> Array.iter f a - | L_ l -> List.iter f l + : type a. a array_or_list -> (a -> unit) -> unit + = fun aol f -> match aol with + | A_ a -> Array.iter f a + | L_ l -> List.iter f l (* [sequence_ l f] returns a future that waits for every element of [l] to return of fail, and call [f ()] to obtain the result (as a closure) in case every element succeeded (otherwise a failure is returned automatically) *) let sequence_ - : type a res. a t array_or_list -> (unit -> res) -> res t - = fun aol f -> - let n = match aol with - | A_ a -> Array.length a - | L_ l -> List.length l - in - assert (n>0); - let cell = create_cell() in - let n_err = CCLock.create 0 in (* number of failed threads *) - let n_ok = CCLock.create 0 in (* number of succeeding threads *) - iter_aol aol - (fun fut -> - on_finish fut - (function - | Failed e -> - let x = CCLock.incr_then_get n_err in - (* if first failure, then seal [cell]'s fate now *) - if x=1 then set_fail_ cell e - | Done _ -> - let x = CCLock.incr_then_get n_ok in - (* if [n] successes, then [cell] succeeds. Otherwise, some - job has not finished or some job has failed. *) - if x = n then ( - let res = f () in - set_done_ cell res - ) - | Waiting -> assert false)); - Run cell + : type a res. a t array_or_list -> (unit -> res) -> res t + = fun aol f -> + let n = match aol with + | A_ a -> Array.length a + | L_ l -> List.length l + in + assert (n>0); + let cell = create_cell() in + let n_err = CCLock.create 0 in (* number of failed threads *) + let n_ok = CCLock.create 0 in (* number of succeeding threads *) + iter_aol aol + (fun fut -> + on_finish fut + (function + | Failed e -> + let x = CCLock.incr_then_get n_err in + (* if first failure, then seal [cell]'s fate now *) + if x=1 then set_fail_ cell e + | Done _ -> + let x = CCLock.incr_then_get n_ok in + (* if [n] successes, then [cell] succeeds. Otherwise, some + job has not finished or some job has failed. *) + if x = n then ( + let res = f () in + set_done_ cell res + ) + | Waiting -> assert false)); + Run cell (* map an array of futures to a future array *) let sequence_a a = match a with - | [||] -> return [||] - | _ -> + | [||] -> return [||] + | _ -> sequence_ (A_ a) (fun () -> Array.map get_nolock_ a) let map_a f a = sequence_a (Array.map f a) let sequence_l l = match l with - | [] -> return [] - | _ :: _ -> + | [] -> return [] + | _ :: _ -> sequence_ (L_ l) (fun () -> List.map get_nolock_ l) (* reverse twice *) @@ -499,22 +499,22 @@ module Make(P : PARAM) = struct *) let choose_ - : type a. a t array_or_list -> a t - = fun aol -> - let cell = create_cell() in - let is_done = CCLock.create false in - iter_aol aol - (fun fut -> - on_finish fut - (fun res -> match res with - | Waiting -> assert false - | Done x -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_done_ cell x - | Failed e -> - let was_done = CCLock.get_then_clear is_done in - if not was_done then set_fail_ cell e)); - Run cell + : type a. a t array_or_list -> a t + = fun aol -> + let cell = create_cell() in + let is_done = CCLock.create false in + iter_aol aol + (fun fut -> + on_finish fut + (fun res -> match res with + | Waiting -> assert false + | Done x -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_done_ cell x + | Failed e -> + let was_done = CCLock.get_then_clear is_done in + if not was_done then set_fail_ cell e)); + Run cell let choose_a a = choose_ (A_ a) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml index eb274097..51d0e795 100644 --- a/src/threads/CCThread.ml +++ b/src/threads/CCThread.ml @@ -58,16 +58,16 @@ module Barrier = struct let wait b = with_lock_ b (fun () -> - while not b.activated do - Condition.wait b.cond b.lock - done) + while not b.activated do + Condition.wait b.cond b.lock + done) let activate b = with_lock_ b (fun () -> - if not b.activated then ( - b.activated <- true; - Condition.broadcast b.cond)) + if not b.activated then ( + b.activated <- true; + Condition.broadcast b.cond)) let activated b = with_lock_ b (fun () -> b.activated) end diff --git a/src/threads/CCTimer.ml b/src/threads/CCTimer.ml index 3fd93934..f2c37cb8 100644 --- a/src/threads/CCTimer.ml +++ b/src/threads/CCTimer.ml @@ -7,9 +7,9 @@ type job = | Job : float * (unit -> 'a) -> job module TaskHeap = CCHeap.Make(struct - type t = job - let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2 -end) + type t = job + let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2 + end) exception Stopped @@ -61,12 +61,12 @@ let next_task_ timer = match TaskHeap.find_min timer.tasks with | _ when timer.stop -> Quit | None -> Wait standby_wait | Some Job (time, f) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time then ( - (* now! *) - pop_task_ timer; - Run f - ) else Wait (time -. now) + let now = Unix.gettimeofday () in + if now +. epsilon > time then ( + (* now! *) + pop_task_ timer; + Run f + ) else Wait (time -. now) (* The main thread function: wait for next event, run it, and loop *) let serve timer = @@ -75,8 +75,8 @@ let serve timer = let rec next () = match with_lock_ timer next_task_ with | Quit -> () | Run f -> - call_ timer f; (* call outside of any lock *) - next () + call_ timer f; (* call outside of any lock *) + next () | Wait delay -> wait delay (* wait for [delay] seconds, or until something happens on [fifo_in] *) and wait delay = @@ -118,16 +118,16 @@ let at timer time ~f = else with_lock_ timer (fun timer -> - if timer.stop then raise Stopped; - (* time of the next scheduled event *) - let next_time = match TaskHeap.find_min timer.tasks with - | None -> max_float - | Some Job (d, _) -> d - in - (* insert task *) - timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; - (* see if the timer thread needs to be awaken earlier *) - if time < next_time then awaken_ timer + if timer.stop then raise Stopped; + (* time of the next scheduled event *) + let next_time = match TaskHeap.find_min timer.tasks with + | None -> max_float + | Some Job (d, _) -> d + in + (* insert task *) + timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks; + (* see if the timer thread needs to be awaken earlier *) + if time < next_time then awaken_ timer ) let after timer delay ~f = @@ -145,8 +145,8 @@ let every ?delay timer d ~f = with ExitEvery -> () (* stop *) and schedule () = after timer d ~f:run in match delay with - | None -> run() - | Some d -> after timer d ~f:run + | None -> run() + | Some d -> after timer d ~f:run (*$R let start = Unix.gettimeofday() in @@ -170,13 +170,13 @@ let active timer = not timer.stop let stop timer = with_lock_ timer (fun timer -> - if not timer.stop then ( - timer.stop <- true; - (* empty heap of tasks *) - timer.tasks <- TaskHeap.empty; - (* tell the thread to stop *) - awaken_ timer; - ) + if not timer.stop then ( + timer.stop <- true; + (* empty heap of tasks *) + timer.tasks <- TaskHeap.empty; + (* tell the thread to stop *) + awaken_ timer; + ) ) (*$R