added a timer in Future

This commit is contained in:
Simon Cruanes 2013-03-24 19:21:43 +01:00
parent d74808efe0
commit 8be147c50b
2 changed files with 214 additions and 0 deletions

196
future.ml
View file

@ -520,6 +520,202 @@ let sleep ?(pool=default_pool) time =
spawn ~pool spawn ~pool
(fun () -> Thread.delay time; ()) (fun () -> Thread.delay time; ())
(** {2 Event timer} *)
(** {3 Mutable heap (taken from heap.ml to avoid dependencies)} *)
module Heap = struct
type 'a t = {
mutable tree : 'a tree;
cmp : 'a -> 'a -> int;
} (** A splay tree heap with the given comparison function *)
and 'a tree =
| Empty
| Node of ('a tree * 'a * 'a tree)
(** A splay tree containing values of type 'a *)
let empty ~cmp = {
tree = Empty;
cmp;
}
let is_empty h =
match h.tree with
| Empty -> true
| Node _ -> false
let clear h =
h.tree <- Empty
(** Partition the tree into (elements <= pivot, elements > pivot) *)
let rec partition ~cmp pivot tree =
match tree with
| Empty -> Empty, Empty
| Node (a, x, b) ->
if cmp x pivot <= 0
then begin
match b with
| Empty -> (tree, Empty)
| Node (b1, y, b2) ->
if cmp y pivot <= 0
then
let small, big = partition ~cmp pivot b2 in
Node (Node (a, x, b1), y, small), big
else
let small, big = partition ~cmp pivot b1 in
Node (a, x, small), Node (big, y, b2)
end else begin
match a with
| Empty -> (Empty, tree)
| Node (a1, y, a2) ->
if cmp y pivot <= 0
then
let small, big = partition ~cmp pivot a2 in
Node (a1, y, small), Node (big, x, b)
else
let small, big = partition ~cmp pivot a1 in
small, Node (big, y, Node (a2, x, b))
end
(** Insert the element in the tree *)
let insert h x =
let small, big = partition ~cmp:h.cmp x h.tree in
let tree' = Node (small, x, big) in
h.tree <- tree'
(** Access minimum value *)
let min h =
let rec min tree =
match tree with
| Empty -> raise Not_found
| Node (Empty, x, _) -> x
| Node (l, _, _) -> min l
in min h.tree
(** Get minimum value and remove it from the tree *)
let pop h =
let rec delete_min tree = match tree with
| Empty -> raise Not_found
| Node (Empty, x, b) -> x, b
| Node (Node (Empty, x, b), y, c) ->
x, Node (b, y, c) (* rebalance *)
| Node (Node (a, x, b), y, c) ->
let m, a' = delete_min a in
m, Node (a', x, Node (b, y, c))
in
let m, tree' = delete_min h.tree in
h.tree <- tree';
m
end
module Timer = struct
type t = {
mutable stop : bool;
mutable thread : Thread.t option; (* thread dedicated to the timer *)
pool : Pool.t;
tasks : (float * (unit -> unit)) Heap.t;
mutex : Mutex.t;
fifo_in : Unix.file_descr;
fifo_out : Unix.file_descr;
} (** A timer for events *)
let cmp_tasks (f1,_) (f2,_) =
if f1 < f2 then -1
else if f1 > f2 then 1
else 0
let standby_wait = 300. (* when no task is scheduled *)
let epsilon = 0.0001 (* accepted time diff for actions *)
(** Wait for next event, run it, and loop *)
let serve timer =
let buf = String.make 1 '_' in
(* process next task *)
let rec next () =
Mutex.lock timer.mutex;
(* what is the next task? *)
let next_task =
try Some (Heap.min timer.tasks)
with Not_found -> None in
match next_task with
| _ when timer.stop -> Mutex.unlock timer.mutex (* stop *)
| None ->
Mutex.unlock timer.mutex;
wait standby_wait (* wait for a task *)
| Some (time, task) ->
let now = Unix.gettimeofday () in
if now +. epsilon > time
then begin (* run task in the pool *)
Pool.run timer.pool task;
ignore (Heap.pop timer.tasks);
Mutex.unlock timer.mutex;
(* process next task, if any *)
next ()
end else (* too early, wait *)
(Mutex.unlock timer.mutex;
wait (time -. now))
(* wait for [delay] seconds, or until something happens on fifo_in *)
and wait delay =
let read = Thread.wait_timed_read timer.fifo_in delay in
(if read then ignore (Unix.read timer.fifo_in buf 0 1)); (* remove char *)
next ()
in
next ()
(** A timer that runs in the given thread pool *)
let create ?(pool=default_pool) () =
let fifo_in, fifo_out = Unix.pipe () in
let timer = {
stop = false;
pool;
thread = None;
tasks = Heap.empty ~cmp:cmp_tasks;
mutex = Mutex.create ();
fifo_in;
fifo_out;
} in
(* start a thread to process tasks *)
let t = Thread.create serve timer in
timer.thread <- Some t;
timer
(** [timerule_at s t act] will run [act] at the Unix echo [t] *)
let schedule_at timer time task =
Mutex.lock timer.mutex;
(* time of the next scheduled event *)
let next_time =
try let time, _ = Heap.min timer.tasks in time
with Not_found -> max_float
in
(* insert task *)
Heap.insert timer.tasks (time, task);
(* see if the timer thread needs to be awaken earlier *)
(if time < next_time
then ignore (Unix.single_write timer.fifo_out "_" 0 1));
Mutex.unlock timer.mutex;
()
(** [schedule_in s d act] will run [act] in [d] seconds *)
let schedule_in timer delay task =
assert (delay >= 0.);
schedule_at timer (Unix.gettimeofday () +. delay) task
(** Stop the given timer, cancelling pending tasks *)
let stop timer =
Mutex.lock timer.mutex;
(if timer.stop then (Mutex.unlock timer.mutex; assert false));
timer.stop <- true;
(* empty heap of tasks *)
Heap.clear timer.tasks;
(* kill the thread *)
(match timer.thread with
| None -> ()
| Some t ->
Thread.kill t;
timer.thread <- None);
Mutex.unlock timer.mutex
end
module Infix = struct module Infix = struct
let (>>=) x f = flatMap f x let (>>=) x f = flatMap f x
let (>>) a f = andThen a f let (>>) a f = andThen a f

View file

@ -142,6 +142,24 @@ val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string ->
val sleep : ?pool:Pool.t -> float -> unit t val sleep : ?pool:Pool.t -> float -> unit t
(** Future that returns with success in the given amount of seconds *) (** Future that returns with success in the given amount of seconds *)
(** {2 Event timer} *)
module Timer : sig
type t
(** A scheduler for events *)
val create : ?pool:Pool.t -> unit -> t
(** A timer that runs tasks in the given thread pool *)
val schedule_at : t -> float -> (unit -> unit) -> unit
(** [schedule_at s t act] will run [act] at the Unix echo [t] *)
val schedule_in : t -> float -> (unit -> unit) -> unit
(** [schedule_in s d act] will run [act] in [d] seconds *)
val stop : t -> unit
(** Stop the given timer, cancelling pending tasks *)
end
module Infix : sig module Infix : sig