(* Copyright (c) 2013, Simon Cruanes All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *) (** {1 Futures for concurrency} *) exception SendTwice (** {2 MVar: a zero-or-one element thread-safe box} *) module MVar = struct type 'a t = { mutable content : 'a option; mutex : Mutex.t; on_take : Condition.t; (* signal that a value was removed (empty) *) on_put : Condition.t; (* signal that a value was added (full) *) } (** Create an empty box *) let empty () = { content = None; mutex = Mutex.create (); on_take = Condition.create (); on_put = Condition.create (); } (** Create a full box *) let full x = { content = Some x; mutex = Mutex.create (); on_take = Condition.create (); on_put = Condition.create (); } (** Is the box currently empty? *) let is_empty box = Mutex.lock box.mutex; let ans = box.content <> None in Mutex.unlock box.mutex; ans (* assuming we have a lock on given box, wait it gets a value and return it *) let rec wait_put box = match box.content with | None -> Condition.wait box.on_put box.mutex; wait_put box (* try again *) | Some x -> x (* same, but waits for the box to become empty *) let rec wait_take box = match box.content with | None -> () (* empty! *) | Some _ -> Condition.wait box.on_take box.mutex; wait_take box (* try again *) (** Take value out of the box. Wait if necessary *) let take box = Mutex.lock box.mutex; let x = wait_put box in box.content <- None; Condition.broadcast box.on_take; Mutex.unlock box.mutex; x (** Put a value in the box. Waits if the box is already full *) let put box x = Mutex.lock box.mutex; wait_take box; box.content <- Some x; Condition.broadcast box.on_put; Mutex.unlock box.mutex (** Use given function to atomically update content, and return the previous value and the new one *) let update box f = Mutex.lock box.mutex; let x = wait_put box in try let y = f x in box.content <- Some y; Condition.broadcast box.on_put; (* signal write *) Mutex.unlock box.mutex; x, y with e -> Mutex.unlock box.mutex; raise e (** Look at the value, without removing it *) let peek box = Mutex.lock box.mutex; let x = wait_put box in Mutex.unlock box.mutex; x end module type S = sig type 'a t (** A future value of type 'a *) val run : t -> (unit -> unit) -> unit (** Run the function in the pool *) val finish : t -> unit (** Kill threads in the pool *) (** {2 Basic low-level Future functions} *) type 'a state = | NotKnown | Success of 'a | Failure of exn val state : 'a t -> 'a state (** Current state of the future *) val get : 'a t -> 'a (** Blocking get: wait for the future to be evaluated, and get the value, or the exception that failed the future is returned *) val is_done : 'a t -> bool (** Is the future evaluated (success/failure)? *) (** {2 Combinators} *) val on_success : 'a t -> ('a -> unit) -> unit (** Attach a handler to be called upon success *) val on_failure : _ t -> (exn -> unit) -> unit (** Attach a handler to be called upon failure *) val on_finish : _ t -> (unit -> unit) -> unit (** Attach a handler to be called when the future is evaluated *) val flatMap : ('a -> 'b t) -> 'a t -> 'b t (** Monadic combination of futures *) val andThen : 'a t -> (unit -> 'b t) -> 'b t (** Wait for the first future to succeed, then launch the second *) val sequence : 'a t list -> 'a list t (** Future that waits for all previous sequences to terminate *) val choose : 'a t list -> 'a t (** Choose among those futures (the first to terminate) *) val map : ('a -> 'b) -> 'a t -> 'b t (** Maps the value inside the future *) (** {2 Future constructors} *) val return : 'a -> 'a t (** Future that is already computed *) val spawn : (unit -> 'a) -> 'a t (** Spawn a thread that wraps the given computation *) val spawn_process : ?stdin:string -> cmd:string -> (int * string * string) t (** Spawn a sub-process with the given command [cmd] (and possibly input); returns a future containing (returncode, stdout, stderr) *) val sleep : float -> unit t (** Future that returns with success in the given amount of seconds *) (** {2 Event timer} *) module Timer : sig val schedule_at : at:float -> (unit -> unit) -> unit (** [schedule_at ~at act] will run [act] at the Unix echo [at] *) val schedule_after : after:t -> float -> (unit -> unit) -> unit (** [schedule_after ~after act] will run [act] in [after] seconds *) end module Infix : sig val (>>=) : 'a t -> ('a -> 'b t) -> 'b t val (>>) : 'a t -> (unit -> 'b t) -> 'b t end val (>>=) : 'a t -> ('a -> 'b t) -> 'b t val (>>) : 'a t -> (unit -> 'b t) -> 'b t end module type CONFIG = sig val timeout : float val max_size : int end module DefaultConfig = struct let timeout = 10. let max_size = 15 let size = 0 end module Make(C : CONFIG) = struct type command = | Perform of (unit -> unit) | Quit (** Command sent to a thread *) type waiting_thread = float * command MVar.t let stop = ref false let mutex = Mutex.create () let jobs = Queue.create () let threads : waiting_thread list ref = ref [] let cur_size = ref 0 (* Cleanup waiting threads. precond: pool is locked *) let cleanup_waiting () = let l = !threads in let now = Unix.gettimeofday () in (* filter threads that have been waiting for too long *) let l' = List.filter (fun (time, box) -> if time +. C.timeout < now then (MVar.put box Quit; false) else true) l in threads := l' (* Function that the threads run. They also take a MVar to get commands *) let serve box = (* wait for a job to come *) let rec wait_job () = match MVar.take box with | Quit -> (Mutex.lock mutex; quit ()) (* exit *) | Perform job -> run_job job (* run the given job *) and run_job job = (try job () with _ -> ()); next () (* loop *) (* process next task *) and next () = Mutex.lock mutex; if !stop then quit () (* stop the pool *) else if Queue.is_empty jobs then begin let now = Unix.gettimeofday () in (* cleanup waiting threads *) cleanup_waiting (); if !cur_size > 1 && List.length !threads + 1 = !cur_size then (* all other threads are waiting, we may need to kill them later *) (Mutex.unlock mutex; delay ()) else begin (* add oneself to the list of waiting threads *) threads := (now, box) :: !threads; Mutex.unlock mutex; wait_job () end end else let job = Queue.pop jobs in Mutex.unlock mutex; run_job job (* delay [pool.timeout], so that in case no job is submitted we still kill old cached threads *) and delay () = Thread.delay C.timeout; next () (* stop the thread (assume we have pool.mutex) *) and quit () = cur_size := !cur_size - 1; Mutex.unlock mutex in wait_job () let size pool = !cur_size (** Add a thread to the pool, starting with the first job *) let add_thread job = let box = MVar.full job in ignore (Thread.create serve box) (** Run the job in the given pool *) let run job = assert (not (!stop)); Mutex.lock mutex; begin match !threads with | [] when !cur_size = C.max_size -> (* max capacity reached, push task in queue *) Queue.push job jobs | [] -> (* spawn a thread for the given task *) add_thread (Perform job); cur_size := !cur_size + 1; | (_,box)::l' -> (* use the first thread *) MVar.put box (Perform job); threads := l'; end; Mutex.unlock mutex (** Kill threads in the pool *) let finish () = Mutex.lock mutex; stop := true; (* kill waiting threads *) List.iter (fun (_,box) -> MVar.put box Quit) !threads; threads := []; Mutex.unlock mutex (** {3 Futures} *) type 'a state = | NotKnown | Success of 'a | Failure of exn type 'a handler = | OnSuccess of ('a -> unit) | OnFailure of (exn -> unit) | OnFinish of (unit -> unit) type 'a t = { mutable state : 'a state; mutable handlers : 'a handler list; (* handlers *) } (** A future value of type 'a *) let make_empty pool = { state = NotKnown; handlers = []; } let send future x = match future.state with | NotKnown -> (* set content and signal *) future.state <- Success x; List.iter (function | OnSuccess f -> run (fun () -> f x) | OnFinish f -> run (fun () -> f ()) | OnFailure _ -> ()) future.handlers; | _ -> raise SendTwice (* already set! *) let fail future e = match future.state with | NotKnown -> (* set content and signal *) future.state <- Failure e; List.iter (function | OnSuccess _ -> () | OnFinish f -> f () | OnFailure f -> f e) future.handlers | _ -> raise SendTwice (* already set! *) let is_done future = match future.state with | NotKnown -> false | _ -> true (** {2 Combinators *) let on_success future k = match future.state with | NotKnown -> future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) | Success x -> run (fun () -> k x) | Failure _ -> () let on_failure future k = match future.state with | NotKnown -> future.handlers <- (OnFailure k) :: future.handlers; (* wait *) | Success _ -> () | Failure e -> run (fun () -> k e) let on_finish future k = match future.state with | NotKnown -> future.handlers <- (OnFinish k) :: future.handlers; (* wait *) | Success _ | Failure _ -> run (fun () -> k ()) let flatMap f future = let future' = make_empty () in (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) on_success future (fun x -> try let future'' = f x in on_success future'' (fun x -> send future' x); on_failure future'' (fun e -> fail future' e); with e -> fail future' e); on_failure future (fun e -> fail future' e); future' let andThen future f = flatMap (fun _ -> f ()) future let sequence futures = let a = Array.of_list futures in let n = Array.length a in let results = Array.make n NotKnown in let future' = make_empty () in (* state: how many remain to finish *) let count = MVar.full (Array.length a) in (* when all futures returned, collect results for future' *) let check_at_end () = let l = Array.to_list results in try let l = List.map (function | Success x -> x | Failure e -> raise e | NotKnown -> assert false) l in send future' l with e -> fail future' e in (* function called whenever a future succeeds *) let one_succeeded i x = results.(i) <- Success x; let _, n = MVar.update count (fun x -> x-1) in if n = 0 then check_at_end () and one_failed i e = results.(i) <- Failure e; let _, n = MVar.update count (fun x -> x-1) in if n = 0 then check_at_end () in (* wait for all to succeed or fail *) for i = 0 to Array.length a - 1 do on_success a.(i) (one_succeeded i); on_failure a.(i) (one_failed i); done; future' let choose futures = let future' = make_empty () in let one_finished = MVar.full false in (* handlers. The first handler to be called will update [one_finished] to true, see that it was false (hence know it is the first) and propagate its result to [future'] *) let one_succeeded x = let one_finished, _ = MVar.update one_finished (fun _ -> true) in if not one_finished then send future' x and one_failed e = let one_finished, _ = MVar.update one_finished (fun _ -> true) in if not one_finished then fail future' e in (* add handlers to all futures *) List.iter (fun future -> on_success future one_succeeded; on_failure future one_failed; ) futures; future' let map f future = let future' = make_empty () in on_success future (fun x -> let y = f x in send future' y); on_failure future (fun e -> fail future' e); future' (** {2 Future constructors} *) let return x = { state = Success x; handlers = []; } let spawn f = let future = make_empty () in (* schedule computation *) run (fun () -> try let x = f () in send future x with e -> fail future e); future (** slurp the entire content of the file_descr into a string *) let slurp i_chan = let buf_size = 128 in let content = Buffer.create 120 and buf = String.make 128 'a' in let rec next () = let num = input i_chan buf 0 buf_size in if num = 0 then Buffer.contents content (* EOF *) else (Buffer.add_substring content buf 0 num; next ()) in next () (** Spawn a sub-process with the given command [cmd] (and possibly input); returns a future containing (returncode, stdout, stderr) *) let spawn_process ?(stdin="") ~cmd = spawn (fun () -> (* spawn subprocess *) let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in output_string inp stdin; (* send stdin to command *) flush inp; close_out inp; (* read output of process *) let out' = slurp out in let err' = slurp err in (* wait for termination *) let status = Unix.close_process_full (out,inp,err) in (* get return code *) let returncode = match status with | Unix.WEXITED i -> i | Unix.WSIGNALED i -> i | Unix.WSTOPPED i -> i in (returncode, out', err')) let sleep time = spawn (fun () -> Thread.delay time; ()) (** {3 Mutable heap} inlined here for avoiding dependencies *) module Heap = struct (** Implementation from http://en.wikipedia.org/wiki/Skew_heap *) type 'a t = { mutable tree : 'a tree; cmp : 'a -> 'a -> int; } (** A pairing tree heap with the given comparison function *) and 'a tree = | Empty | Node of 'a * 'a tree * 'a tree let empty ~cmp = { tree = Empty; cmp; } let is_empty h = match h.tree with | Empty -> true | Node _ -> false let rec union ~cmp t1 t2 = match t1, t2 with | Empty, _ -> t2 | _, Empty -> t1 | Node (x1, l1, r1), Node (x2, l2, r2) -> if cmp x1 x2 <= 0 then Node (x1, union ~cmp t2 r1, l1) else Node (x2, union ~cmp t1 r2, l2) let insert h x = h.tree <- union ~cmp:h.cmp (Node (x, Empty, Empty)) h.tree let pop h = match h.tree with | Empty -> raise Not_found | Node (x, l, r) -> h.tree <- union ~cmp:h.cmp l r; x end (** {2 Event timer} *) module Timer = struct let cmp_tasks (f1,_) (f2,_) = compare f1 f2 let stop = ref false let tasks : (float * (unit -> unit)) Heap.t = Heap.empty cmp:cmp_tasks let fifo_in, fifo_out = Unix.pipe () let thread = ref None let standby_wait = 30. (* when no task is scheduled *) let epsilon = 0.0001 (* accepted time diff for actions *) (** Wait for next event, run it, and loop *) let serve () = let buf = String.make 1 '_' in (* process next task *) let rec next () = Mutex.lock mutex; (* what is the next task? *) let next_task = try Some (Heap.min tasks) with Not_found -> None in match next_task with | _ when timer.stop -> Mutex.unlock mutex (* stop *) | None -> Mutex.unlock mutex; wait standby_wait (* wait for a task *) | Some (time, task) -> let now = Unix.gettimeofday () in if now +. epsilon > time then begin (* run task in the pool *) run task; ignore (Heap.pop tasks); Mutex.unlock mutex; (* process next task, if any *) next () end else (* too early, wait *) (Mutex.unlock mutex; wait (time -. now)) (* wait for [delay] seconds, or until something happens on fifo_in *) and wait delay = let read = Thread.wait_timed_read fifo_in delay in if read then ignore (Unix.read fifo_in buf 0 1); (* remove char *) next () in next () let () = let t = Thread.create server timer in thread := Some t; () let schedule_at ~at task = Mutex.lock mutex; (* time of the next scheduled event *) let next_time = try let time, _ = Heap.min tasks in time with Not_found -> max_float in (* insert task *) Heap.insert tasks (time, task); (* see if the timer thread needs to be awaken earlier *) (if time < next_time then ignore (Unix.single_write fifo_out "_" 0 1)); Mutex.unlock mutex; () let schedule_after ~after task = assert (delay >= 0.); schedule_at ~at:(Unix.gettimeofday () +. delay) task end module Infix = struct let (>>=) x f = flatMap f x let (>>) a f = andThen a f end include Infix end