split CCTimer out of CCFuture, a standalone 1-thread timer

This commit is contained in:
Simon Cruanes 2016-01-26 01:01:04 +01:00
parent 9097cb11ab
commit 6b03a28cba
2 changed files with 238 additions and 0 deletions

195
src/threads/CCTimer.ml Normal file
View file

@ -0,0 +1,195 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Event timer} *)
type job =
| Job : float * (unit -> 'a) -> job
module TaskHeap = CCHeap.Make(struct
type t = job
let leq (Job(f1,_)) (Job (f2,_)) = f1 <= f2
end)
exception Stopped
type t = {
mutable stop : bool;
mutable tasks : TaskHeap.t;
mutable exn_handler : (exn -> unit);
t_mutex : Mutex.t;
fifo_in : Unix.file_descr;
fifo_out : Unix.file_descr;
}
let set_exn_handler timer f = timer.exn_handler <- f
let standby_wait = 10.
(* when no task is scheduled, this is the amount of time that is waited
in a row for something to happen. This is also the maximal delay
between the call to {!stop} and the actual termination of the
thread. *)
let epsilon = 0.0001
(* accepted time diff for actions. *)
let with_lock_ t f =
Mutex.lock t.t_mutex;
try
let x = f t in
Mutex.unlock t.t_mutex;
x
with e ->
Mutex.unlock t.t_mutex;
raise e
type command =
| Quit
| Run : (unit -> _) -> command
| Wait of float
let pop_task_ t =
let tasks, _ = TaskHeap.take_exn t.tasks in
t.tasks <- tasks
let call_ timer f =
try ignore (f ())
with e -> timer.exn_handler e
(* check next task *)
let next_task_ timer = match TaskHeap.find_min timer.tasks with
| _ when timer.stop -> Quit
| None -> Wait standby_wait
| Some Job (time, f) ->
let now = Unix.gettimeofday () in
if now +. epsilon > time then (
(* now! *)
pop_task_ timer;
Run f
) else Wait (time -. now)
(* The main thread function: wait for next event, run it, and loop *)
let serve timer =
let buf = Bytes.make 1 '_' in
(* acquire lock, call [process_task] and do as it commands *)
let rec next () = match with_lock_ timer next_task_ with
| Quit -> ()
| Run f ->
call_ timer f; (* call outside of any lock *)
next ()
| Wait delay -> wait delay
(* wait for [delay] seconds, or until something happens on [fifo_in] *)
and wait delay =
let read = Thread.wait_timed_read timer.fifo_in delay in
(* remove char from fifo, so that next write can happen *)
if read then ignore (Unix.read timer.fifo_in buf 0 1);
next ()
in
next ()
let nop_handler_ _ = ()
let create () =
let fifo_in, fifo_out = Unix.pipe () in
let timer = {
stop = false;
tasks = TaskHeap.empty;
exn_handler = nop_handler_;
t_mutex = Mutex.create ();
fifo_in;
fifo_out;
} in
(* start a thread to process tasks *)
let _t = Thread.create serve timer in
timer
let underscore_ = Bytes.make 1 '_'
(* awake the thread *)
let awaken_ timer =
ignore (Unix.single_write timer.fifo_out underscore_ 0 1)
(** [at s t ~f] will run [f ()] at the Unix echo [t] *)
let at timer time ~f =
if timer.stop then raise Stopped;
let now = Unix.gettimeofday () in
if now >= time
then call_ timer f
else
with_lock_ timer
(fun timer ->
if timer.stop then raise Stopped;
(* time of the next scheduled event *)
let next_time = match TaskHeap.find_min timer.tasks with
| None -> max_float
| Some Job (d, _) -> d
in
(* insert task *)
timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
(* see if the timer thread needs to be awaken earlier *)
if time < next_time then awaken_ timer
)
let after timer delay ~f =
assert (delay >= 0.);
let now = Unix.gettimeofday () in
at timer (now +. delay) ~f
exception ExitEvery
let every ?delay timer d ~f =
let rec run () =
try
ignore (f ());
schedule()
with ExitEvery -> () (* stop *)
and schedule () = after timer d ~f:run in
match delay with
| None -> run()
| Some d -> after timer d ~f:run
(*$R
let start = Unix.gettimeofday() in
let timer = create() in
let res = CCLock.create 0 in
let stop = ref 0. in
every timer 0.1
~f:(fun () ->
if CCLock.incr_then_get res > 5 then (
stop := Unix.gettimeofday();
raise ExitEvery
));
Thread.delay 0.7;
OUnit.assert_equal ~printer:CCInt.to_string 6 (CCLock.get res);
OUnit.assert_bool "estimate delay" (abs_float (!stop -. start -. 0.5) < 0.1);
*)
let active timer = not timer.stop
(** Stop the given timer, cancelling pending tasks *)
let stop timer =
with_lock_ timer
(fun timer ->
if not timer.stop then (
timer.stop <- true;
(* empty heap of tasks *)
timer.tasks <- TaskHeap.empty;
(* tell the thread to stop *)
awaken_ timer;
)
)
(*$R
(* scenario: n := 1; n := n*4 ; n := n+2; res := n *)
let timer = create () in
let n = CCLock.create 1 in
let res = CCLock.create 0 in
after timer 0.6
~f:(fun () -> CCLock.update n (fun x -> x+2));
ignore (Thread.create
(fun _ -> Thread.delay 0.8; CCLock.set res (CCLock.get n)) ());
after timer 0.4
~f:(fun () -> CCLock.update n (fun x -> x * 4));
Thread.delay 1. ;
OUnit.assert_equal 6 (CCLock.get res);
*)

43
src/threads/CCTimer.mli Normal file
View file

@ -0,0 +1,43 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Event timer}
Used to be part of [CCFuture]
@since NEXT_RELEASE *)
type t
(** A scheduler for events. It runs in its own thread. *)
val create : unit -> t
(** A new timer. *)
val set_exn_handler : t -> (exn -> unit) -> unit
(** [set_exn_handler timer f] registers [f] so that any exception
raised by a task scheduled in [timer] is given to [f] *)
exception Stopped
val after : t -> float -> f:(unit -> _) -> unit
(** Call the callback [f] after the given number of seconds.
@raise Stopped if the timer was stopped *)
val at : t -> float -> f:(unit -> _) -> unit
(** Create a future that evaluates to [()] at the given Unix timestamp
@raise Stopped if the timer was stopped *)
exception ExitEvery
val every : ?delay:float -> t -> float -> f:(unit -> _) -> unit
(** [every timer n ~f] calls [f ()] every [n] seconds.
[f()] can raise ExitEvery to stop the cycle.
@param delay if provided, the first call to [f ()] is delayed by
that many seconds.
@raise Stopped if the timer was stopped *)
val stop : t -> unit
(** Stop the given timer, cancelling pending tasks. Idempotent.
From now on, calling most other operations on the timer will raise Stopped. *)
val active : t -> bool
(** Returns [true] until [stop t] has been called. *)