diff --git a/future.ml b/future.ml index 2657a822..1fab5436 100644 --- a/future.ml +++ b/future.ml @@ -520,6 +520,202 @@ let sleep ?(pool=default_pool) time = spawn ~pool (fun () -> Thread.delay time; ()) +(** {2 Event timer} *) + +(** {3 Mutable heap (taken from heap.ml to avoid dependencies)} *) +module Heap = struct + type 'a t = { + mutable tree : 'a tree; + cmp : 'a -> 'a -> int; + } (** A splay tree heap with the given comparison function *) + and 'a tree = + | Empty + | Node of ('a tree * 'a * 'a tree) + (** A splay tree containing values of type 'a *) + + let empty ~cmp = { + tree = Empty; + cmp; + } + + let is_empty h = + match h.tree with + | Empty -> true + | Node _ -> false + + let clear h = + h.tree <- Empty + + (** Partition the tree into (elements <= pivot, elements > pivot) *) + let rec partition ~cmp pivot tree = + match tree with + | Empty -> Empty, Empty + | Node (a, x, b) -> + if cmp x pivot <= 0 + then begin + match b with + | Empty -> (tree, Empty) + | Node (b1, y, b2) -> + if cmp y pivot <= 0 + then + let small, big = partition ~cmp pivot b2 in + Node (Node (a, x, b1), y, small), big + else + let small, big = partition ~cmp pivot b1 in + Node (a, x, small), Node (big, y, b2) + end else begin + match a with + | Empty -> (Empty, tree) + | Node (a1, y, a2) -> + if cmp y pivot <= 0 + then + let small, big = partition ~cmp pivot a2 in + Node (a1, y, small), Node (big, x, b) + else + let small, big = partition ~cmp pivot a1 in + small, Node (big, y, Node (a2, x, b)) + end + + (** Insert the element in the tree *) + let insert h x = + let small, big = partition ~cmp:h.cmp x h.tree in + let tree' = Node (small, x, big) in + h.tree <- tree' + + (** Access minimum value *) + let min h = + let rec min tree = + match tree with + | Empty -> raise Not_found + | Node (Empty, x, _) -> x + | Node (l, _, _) -> min l + in min h.tree + + (** Get minimum value and remove it from the tree *) + let pop h = + let rec delete_min tree = match tree with + | Empty -> raise Not_found + | Node (Empty, x, b) -> x, b + | Node (Node (Empty, x, b), y, c) -> + x, Node (b, y, c) (* rebalance *) + | Node (Node (a, x, b), y, c) -> + let m, a' = delete_min a in + m, Node (a', x, Node (b, y, c)) + in + let m, tree' = delete_min h.tree in + h.tree <- tree'; + m +end + +module Timer = struct + type t = { + mutable stop : bool; + mutable thread : Thread.t option; (* thread dedicated to the timer *) + pool : Pool.t; + tasks : (float * (unit -> unit)) Heap.t; + mutex : Mutex.t; + fifo_in : Unix.file_descr; + fifo_out : Unix.file_descr; + } (** A timer for events *) + + let cmp_tasks (f1,_) (f2,_) = + if f1 < f2 then -1 + else if f1 > f2 then 1 + else 0 + + let standby_wait = 300. (* when no task is scheduled *) + let epsilon = 0.0001 (* accepted time diff for actions *) + + (** Wait for next event, run it, and loop *) + let serve timer = + let buf = String.make 1 '_' in + (* process next task *) + let rec next () = + Mutex.lock timer.mutex; + (* what is the next task? *) + let next_task = + try Some (Heap.min timer.tasks) + with Not_found -> None in + match next_task with + | _ when timer.stop -> Mutex.unlock timer.mutex (* stop *) + | None -> + Mutex.unlock timer.mutex; + wait standby_wait (* wait for a task *) + | Some (time, task) -> + let now = Unix.gettimeofday () in + if now +. epsilon > time + then begin (* run task in the pool *) + Pool.run timer.pool task; + ignore (Heap.pop timer.tasks); + Mutex.unlock timer.mutex; + (* process next task, if any *) + next () + end else (* too early, wait *) + (Mutex.unlock timer.mutex; + wait (time -. now)) + (* wait for [delay] seconds, or until something happens on fifo_in *) + and wait delay = + let read = Thread.wait_timed_read timer.fifo_in delay in + (if read then ignore (Unix.read timer.fifo_in buf 0 1)); (* remove char *) + next () + in + next () + + (** A timer that runs in the given thread pool *) + let create ?(pool=default_pool) () = + let fifo_in, fifo_out = Unix.pipe () in + let timer = { + stop = false; + pool; + thread = None; + tasks = Heap.empty ~cmp:cmp_tasks; + mutex = Mutex.create (); + fifo_in; + fifo_out; + } in + (* start a thread to process tasks *) + let t = Thread.create serve timer in + timer.thread <- Some t; + timer + + (** [timerule_at s t act] will run [act] at the Unix echo [t] *) + let schedule_at timer time task = + Mutex.lock timer.mutex; + (* time of the next scheduled event *) + let next_time = + try let time, _ = Heap.min timer.tasks in time + with Not_found -> max_float + in + (* insert task *) + Heap.insert timer.tasks (time, task); + (* see if the timer thread needs to be awaken earlier *) + (if time < next_time + then ignore (Unix.single_write timer.fifo_out "_" 0 1)); + Mutex.unlock timer.mutex; + () + + (** [schedule_in s d act] will run [act] in [d] seconds *) + let schedule_in timer delay task = + assert (delay >= 0.); + schedule_at timer (Unix.gettimeofday () +. delay) task + + (** Stop the given timer, cancelling pending tasks *) + let stop timer = + Mutex.lock timer.mutex; + (if timer.stop then (Mutex.unlock timer.mutex; assert false)); + timer.stop <- true; + (* empty heap of tasks *) + Heap.clear timer.tasks; + (* kill the thread *) + (match timer.thread with + | None -> () + | Some t -> + Thread.kill t; + timer.thread <- None); + Mutex.unlock timer.mutex +end + + module Infix = struct let (>>=) x f = flatMap f x let (>>) a f = andThen a f diff --git a/future.mli b/future.mli index 48988002..af02be97 100644 --- a/future.mli +++ b/future.mli @@ -142,6 +142,24 @@ val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string -> val sleep : ?pool:Pool.t -> float -> unit t (** Future that returns with success in the given amount of seconds *) +(** {2 Event timer} *) + +module Timer : sig + type t + (** A scheduler for events *) + + val create : ?pool:Pool.t -> unit -> t + (** A timer that runs tasks in the given thread pool *) + + val schedule_at : t -> float -> (unit -> unit) -> unit + (** [schedule_at s t act] will run [act] at the Unix echo [t] *) + + val schedule_in : t -> float -> (unit -> unit) -> unit + (** [schedule_in s d act] will run [act] in [d] seconds *) + + val stop : t -> unit + (** Stop the given timer, cancelling pending tasks *) +end module Infix : sig