ocaml-containers/future.ml
2014-12-21 11:33:42 +01:00

635 lines
18 KiB
OCaml

(*
Copyright (c) 2013, Simon Cruanes
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. Redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Futures for concurrency} *)
exception SendTwice
(** {2 MVar: a zero-or-one element thread-safe box} *)
module MVar = struct
type 'a t = {
mutable content : 'a option;
mutex : Mutex.t;
on_take : Condition.t; (* signal that a value was removed (empty) *)
on_put : Condition.t; (* signal that a value was added (full) *)
}
(* Create an empty box *)
let empty () = {
content = None;
mutex = Mutex.create ();
on_take = Condition.create ();
on_put = Condition.create ();
}
(* Create a full box *)
let full x = {
content = Some x;
mutex = Mutex.create ();
on_take = Condition.create ();
on_put = Condition.create ();
}
(* Is the box currently empty? *)
let is_empty box = match box.content with
| Some _ -> true
| None -> false
(* assuming we have a lock on given box, wait it gets a value and return it *)
let rec wait_put box =
match box.content with
| None ->
Condition.wait box.on_put box.mutex;
wait_put box (* try again *)
| Some x -> x
(* same, but waits for the box to become empty *)
let rec wait_take box =
match box.content with
| None -> () (* empty! *)
| Some _ ->
Condition.wait box.on_take box.mutex;
wait_take box (* try again *)
(* Take value out of the box. Wait if necessary *)
let take box =
Mutex.lock box.mutex;
let x = wait_put box in
box.content <- None;
Condition.broadcast box.on_take;
Mutex.unlock box.mutex;
x
(* Put a value in the box. Waits if the box is already full *)
let put box x =
Mutex.lock box.mutex;
wait_take box;
box.content <- Some x;
Condition.broadcast box.on_put;
Mutex.unlock box.mutex
(* Use given function to atomically update content, and return
the previous value and the new one *)
let update box f =
Mutex.lock box.mutex;
let x = wait_put box in
try
let x', res = f x in
box.content <- Some x';
Condition.broadcast box.on_put; (* signal write *)
Mutex.unlock box.mutex;
res
with e ->
Mutex.unlock box.mutex;
raise e
(* Look at the value, without removing it *)
let peek box =
Mutex.lock box.mutex;
let x = wait_put box in
Mutex.unlock box.mutex;
x
end
module type S = sig
type 'a t
(** A future value of type 'a *)
val run : (unit -> unit) -> unit
(** Use the underlying thread pool to run this job *)
val finish : unit -> unit
(** Kill threads in the pool. The pool won't be usable any more. *)
(** {2 Basic low-level Future functions} *)
type 'a state =
| NotKnown
| Success of 'a
| Failure of exn
val state : 'a t -> 'a state
(** Current state of the future *)
val is_done : 'a t -> bool
(** Is the future evaluated (success/failure)? *)
(** {2 Combinators} *)
val on_success : 'a t -> ('a -> unit) -> unit
(** Attach a handler to be called upon success *)
val on_failure : _ t -> (exn -> unit) -> unit
(** Attach a handler to be called upon failure *)
val on_finish : _ t -> (unit -> unit) -> unit
(** Attach a handler to be called when the future is evaluated *)
val flatMap : ('a -> 'b t) -> 'a t -> 'b t
(** Monadic combination of futures *)
val andThen : 'a t -> (unit -> 'b t) -> 'b t
(** Wait for the first future to succeed, then launch the second *)
val sequence : 'a t list -> 'a list t
(** Future that waits for all previous sequences to terminate *)
val choose : 'a t list -> 'a t
(** Choose among those futures (the first to terminate) *)
val map : ('a -> 'b) -> 'a t -> 'b t
(** Maps the value inside the future *)
(** {2 Future constructors} *)
val return : 'a -> 'a t
(** Future that is already computed *)
val spawn : (unit -> 'a) -> 'a t
(** Spawn a thread that wraps the given computation *)
val spawn_process : ?stdin:string -> cmd:string ->
(int * string * string) t
(** Spawn a sub-process with the given command [cmd] (and possibly input);
returns a future containing (returncode, stdout, stderr) *)
val sleep : float -> unit t
(** Future that returns with success in the given amount of seconds *)
(** {2 Event timer} *)
module Timer : sig
val at : float -> (unit -> unit) -> unit
(** [schedule_at ~at act] will run [act] at the Unix echo [at] *)
val after : float -> (unit -> unit) -> unit
(** [schedule_after ~after act] will run [act] in [after] seconds *)
end
module Infix : sig
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
end
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>>) : 'a t -> (unit -> 'b t) -> 'b t
end
module type CONFIG = sig
val min_size : int
val max_size : int
end
module DefaultConfig = struct
let min_size = 0
let max_size = 15
end
(** {2 Mutable heap}
inlined here for avoiding dependencies *)
module Heap = struct
(** Implementation from http://en.wikipedia.org/wiki/Skew_heap *)
type 'a t = {
mutable tree : 'a tree;
cmp : 'a -> 'a -> int;
} (** A pairing tree heap with the given comparison function *)
and 'a tree =
| Empty
| Node of 'a * 'a tree * 'a tree
let empty ~cmp = {
tree = Empty;
cmp;
}
let is_empty h =
match h.tree with
| Empty -> true
| Node _ -> false
let rec union ~cmp t1 t2 = match t1, t2 with
| Empty, _ -> t2
| _, Empty -> t1
| Node (x1, l1, r1), Node (x2, l2, r2) ->
if cmp x1 x2 <= 0
then Node (x1, union ~cmp t2 r1, l1)
else Node (x2, union ~cmp t1 r2, l2)
let insert h x =
h.tree <- union ~cmp:h.cmp (Node (x, Empty, Empty)) h.tree
let min h = match h.tree with
| Empty -> raise Not_found
| Node (x, _, _) -> x
let pop h = match h.tree with
| Empty -> raise Not_found
| Node (x, l, r) ->
h.tree <- union ~cmp:h.cmp l r;
x
end
module Make(C : CONFIG) = struct
type command =
| Perform of (unit -> unit)
| Quit
(** Command sent to a thread *)
type waiting_thread = float * command MVar.t
let stop = ref false
let mutex = Mutex.create ()
let jobs = Queue.create ()
let new_task = Condition.create () (* signal when new task *)
let cur_size = ref 0
(* Function that the threads run *)
let rec serve () =
Mutex.lock mutex;
next_task ()
(* process next task *)
and next_task () =
if !stop then Condition.broadcast new_task (* and stop *)
else match poll () with
| Some job ->
Mutex.unlock mutex;
begin try job()
with _ -> ()
end;
serve ()
| None ->
if !cur_size > C.min_size
then () (* stop, too many threads *)
else next_task()
(* poll for next task *)
and poll () =
if Queue.is_empty jobs
then begin
Condition.wait new_task mutex;
if !stop || Queue.is_empty jobs
then None
else begin
let job = Queue.pop jobs in
Condition.signal new_task;
Some job
end
end else
Some (Queue.pop jobs)
(* TODO: start thread iff new task and not max_size reached *)
(* Add a thread to the pool, starting with the first job *)
let add_thread job =
let box = MVar.full job in
ignore (Thread.create serve box)
(* Run the job in the pool *)
let run job =
assert (not (!stop));
Mutex.lock mutex;
begin match !threads with
| [] when !cur_size = C.max_size ->
(* max capacity reached, push task in queue *)
Queue.push job jobs
| [] ->
assert (!cur_size < C.max_size);
(* spawn a thread for the given task *)
add_thread (Perform job);
cur_size := !cur_size + 1;
| (_,box)::l' ->
(* use the first thread *)
MVar.put box (Perform job);
threads := l';
end;
Mutex.unlock mutex
(** Kill threads in the pool *)
let finish () =
Mutex.lock mutex;
stop := true;
(* kill waiting threads *)
List.iter (fun (_,box) -> MVar.put box Quit) !threads;
threads := [];
Mutex.unlock mutex
(** {3 Futures} *)
type 'a state =
| NotKnown
| Success of 'a
| Failure of exn
type 'a handler =
| OnSuccess of ('a -> unit)
| OnFailure of (exn -> unit)
| OnFinish of (unit -> unit)
type 'a t = {
mutable state : 'a state;
mutable handlers : 'a handler list; (* handlers *)
} (** A future value of type 'a *)
let make_empty pool = {
state = NotKnown;
handlers = [];
}
let state f = f.state
let send future x =
match future.state with
| NotKnown -> (* set content and signal *)
future.state <- Success x;
List.iter
(function
| OnSuccess f -> run (fun () -> f x)
| OnFinish f -> run (fun () -> f ())
| OnFailure _ -> ())
future.handlers;
| _ ->
raise SendTwice (* already set! *)
let fail future e =
match future.state with
| NotKnown -> (* set content and signal *)
future.state <- Failure e;
List.iter
(function
| OnSuccess _ -> ()
| OnFinish f -> f ()
| OnFailure f -> f e)
future.handlers
| _ ->
raise SendTwice (* already set! *)
let is_done future =
match future.state with
| NotKnown -> false
| _ -> true
(** {2 Combinators *)
let on_success future k =
match future.state with
| NotKnown ->
future.handlers <- (OnSuccess k) :: future.handlers; (* wait *)
| Success x -> run (fun () -> k x)
| Failure _ -> ()
let on_failure future k =
match future.state with
| NotKnown ->
future.handlers <- (OnFailure k) :: future.handlers; (* wait *)
| Success _ -> ()
| Failure e -> run (fun () -> k e)
let on_finish future k =
match future.state with
| NotKnown ->
future.handlers <- (OnFinish k) :: future.handlers; (* wait *)
| Success _ | Failure _ -> run (fun () -> k ())
let flatMap f future =
let future' = make_empty () in
(* if [future] succeeds with [x], we spawn a new job to compute [f x] *)
on_success future
(fun x ->
try
let future'' = f x in
on_success future'' (fun x -> send future' x);
on_failure future'' (fun e -> fail future' e);
with e ->
fail future' e);
on_failure future
(fun e -> fail future' e);
future'
let andThen future f =
flatMap (fun _ -> f ()) future
let sequence futures =
let a = Array.of_list futures in
let n = Array.length a in
let results = Array.make n NotKnown in
let future' = make_empty () in
(* state: how many remain to finish *)
let count = MVar.full (Array.length a) in
(* when all futures returned, collect results for future' *)
let check_at_end () =
let l = Array.to_list results in
try
let l = List.map
(function
| Success x -> x
| Failure e -> raise e
| NotKnown -> assert false)
l in
send future' l
with e ->
fail future' e
in
(* function called whenever a future succeeds *)
let one_succeeded i x =
results.(i) <- Success x;
let _, n = MVar.update count (fun x -> x-1) in
if n = 0 then check_at_end ()
and one_failed i e =
results.(i) <- Failure e;
let _, n = MVar.update count (fun x -> x-1) in
if n = 0 then check_at_end ()
in
(* wait for all to succeed or fail *)
for i = 0 to Array.length a - 1 do
on_success a.(i) (one_succeeded i);
on_failure a.(i) (one_failed i);
done;
future'
let choose futures =
let future' = make_empty () in
let one_finished = MVar.full false in
(* handlers. The first handler to be called will update [one_finished]
to true, see that it was false (hence know it is the first)
and propagate its result to [future'] *)
let one_succeeded x =
let one_finished, _ = MVar.update one_finished (fun _ -> true) in
if not one_finished then send future' x
and one_failed e =
let one_finished, _ = MVar.update one_finished (fun _ -> true) in
if not one_finished then fail future' e
in
(* add handlers to all futures *)
List.iter
(fun future ->
on_success future one_succeeded;
on_failure future one_failed; )
futures;
future'
let map f future =
let future' = make_empty () in
on_success future (fun x -> let y = f x in send future' y);
on_failure future (fun e -> fail future' e);
future'
(** {2 Future constructors} *)
let return x = {
state = Success x;
handlers = [];
}
let spawn f =
let future = make_empty () in
(* schedule computation *)
run (fun () ->
try
let x = f () in
send future x
with e ->
fail future e);
future
(** slurp the entire content of the file_descr into a string *)
let slurp i_chan =
let buf_size = 128 in
let content = Buffer.create 120
and buf = String.make 128 'a' in
let rec next () =
let num = input i_chan buf 0 buf_size in
if num = 0
then Buffer.contents content (* EOF *)
else (Buffer.add_substring content buf 0 num; next ())
in next ()
(** Spawn a sub-process with the given command [cmd] (and possibly input);
returns a future containing (returncode, stdout, stderr) *)
let spawn_process ?(stdin="") ~cmd =
spawn (fun () ->
(* spawn subprocess *)
let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in
output_string inp stdin;
(* send stdin to command *)
flush inp;
close_out inp;
(* read output of process *)
let out' = slurp out in
let err' = slurp err in
(* wait for termination *)
let status = Unix.close_process_full (out,inp,err) in
(* get return code *)
let returncode = match status with
| Unix.WEXITED i -> i
| Unix.WSIGNALED i -> i
| Unix.WSTOPPED i -> i in
(returncode, out', err'))
let sleep time =
spawn (fun () -> Thread.delay time; ())
(** {2 Event timer} *)
module Timer = struct
let cmp_tasks (f1,_) (f2,_) =
compare f1 f2
let stop = ref false
let tasks : (float * (unit -> unit)) Heap.t = Heap.empty ~cmp:cmp_tasks
let fifo_in, fifo_out = Unix.pipe ()
let thread = ref None
let standby_wait = 30. (* when no task is scheduled *)
let epsilon = 0.0001 (* accepted time diff for actions *)
(** Wait for next event, run it, and loop *)
let serve () =
let buf = String.make 1 '_' in
(* process next task *)
let rec next () =
Mutex.lock mutex;
(* what is the next task? *)
let next_task =
try Some (Heap.min tasks)
with Not_found -> None in
match next_task with
| _ when !stop -> Mutex.unlock mutex (* stop *)
| None ->
Mutex.unlock mutex;
wait standby_wait (* wait for a task *)
| Some (time, task) ->
let now = Unix.gettimeofday () in
if now +. epsilon > time
then begin (* run task in the pool *)
run task;
ignore (Heap.pop tasks);
Mutex.unlock mutex;
(* process next task, if any *)
next ()
end else (* too early, wait *)
(Mutex.unlock mutex;
wait (time -. now))
(* wait for [delay] seconds, or until something happens on fifo_in *)
and wait delay =
let read = Thread.wait_timed_read fifo_in delay in
if read
then ignore (Unix.read fifo_in buf 0 1); (* remove char *)
next ()
in
next ()
let () =
let t = Thread.create serve () in
thread := Some t;
()
let at time task =
Mutex.lock mutex;
(* time of the next scheduled event *)
let next_time =
try let time, _ = Heap.min tasks in time
with Not_found -> max_float
in
(* insert task *)
Heap.insert tasks (time, task);
(* see if the timer thread needs to be awaken earlier *)
if time < next_time
then ignore (Unix.single_write fifo_out "_" 0 1);
Mutex.unlock mutex;
()
let after after task =
assert (after>= 0.);
at (Unix.gettimeofday () +. after) task
end
module Infix = struct
let (>>=) x f = flatMap f x
let (>>) a f = andThen a f
end
include Infix
end
module Std = Make(DefaultConfig)