From 1972f0f55dc346cf3cfc2a3505d7877827a01b7c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 7 Feb 2014 23:47:00 +0100 Subject: [PATCH] started refactoring Future --- future.ml | 932 ++++++++++++++++++++----------------------- future.mli | 151 ++++--- tests/test_future.ml | 45 ++- 3 files changed, 530 insertions(+), 598 deletions(-) diff --git a/future.ml b/future.ml index b1564738..51372ea4 100644 --- a/future.ml +++ b/future.ml @@ -25,6 +25,8 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Futures for concurrency} *) +exception SendTwice + (** {2 MVar: a zero-or-one element thread-safe box} *) module MVar = struct @@ -114,44 +116,139 @@ module MVar = struct x end -(** {2 Thread pool} *) -module Pool = struct - type t = { - mutable stop : bool; (* indicate that threads should stop *) - mutex : Mutex.t; - jobs : job Queue.t; (* waiting jobs *) - mutable threads : waiting_thread list; (* waiting threads *) - mutable cur_size : int; - max_size : int; - timeout : float; (* idle time after which to discard threads *) - } (** Dynamic, growable thread pool *) - and job = unit -> unit - and command = - | Perform of job +module type S = sig + type 'a t + (** A future value of type 'a *) + + val run : t -> (unit -> unit) -> unit + (** Run the function in the pool *) + + val finish : t -> unit + (** Kill threads in the pool *) + + (** {2 Basic low-level Future functions} *) + + type 'a state = + | NotKnown + | Success of 'a + | Failure of exn + + val state : 'a t -> 'a state + (** Current state of the future *) + + val get : 'a t -> 'a + (** Blocking get: wait for the future to be evaluated, and get the value, + or the exception that failed the future is returned *) + + val is_done : 'a t -> bool + (** Is the future evaluated (success/failure)? *) + + (** {2 Combinators} *) + + val on_success : 'a t -> ('a -> unit) -> unit + (** Attach a handler to be called upon success *) + + val on_failure : _ t -> (exn -> unit) -> unit + (** Attach a handler to be called upon failure *) + + val on_finish : _ t -> (unit -> unit) -> unit + (** Attach a handler to be called when the future is evaluated *) + + val flatMap : ('a -> 'b t) -> 'a t -> 'b t + (** Monadic combination of futures *) + + val andThen : 'a t -> (unit -> 'b t) -> 'b t + (** Wait for the first future to succeed, then launch the second *) + + val sequence : 'a t list -> 'a list t + (** Future that waits for all previous sequences to terminate *) + + val choose : 'a t list -> 'a t + (** Choose among those futures (the first to terminate) *) + + val map : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future *) + + (** {2 Future constructors} *) + + val return : 'a -> 'a t + (** Future that is already computed *) + + val spawn : (unit -> 'a) -> 'a t + (** Spawn a thread that wraps the given computation *) + + val spawn_process : ?stdin:string -> cmd:string -> + (int * string * string) t + (** Spawn a sub-process with the given command [cmd] (and possibly input); + returns a future containing (returncode, stdout, stderr) *) + + val sleep : float -> unit t + (** Future that returns with success in the given amount of seconds *) + + (** {2 Event timer} *) + + module Timer : sig + val schedule_at : at:float -> (unit -> unit) -> unit + (** [schedule_at ~at act] will run [act] at the Unix echo [at] *) + + val schedule_after : after:t -> float -> (unit -> unit) -> unit + (** [schedule_after ~after act] will run [act] in [after] seconds *) + end + + module Infix : sig + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + end + + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t +end + +module type CONFIG = sig + val timeout : float + + val max_size : int +end + +module DefaultConfig = struct + let timeout = 10. + let max_size = 15 + let size = 0 +end + +module Make(C : CONFIG) = struct + type command = + | Perform of (unit -> unit) | Quit (** Command sent to a thread *) - and waiting_thread = float * command MVar.t - (** Cleanup waiting threads. precond: pool is locked *) - let cleanup_waiting pool = - let l = pool.threads in + type waiting_thread = float * command MVar.t + + let stop = ref false + let mutex = Mutex.create () + let jobs = Queue.create () + let threads : waiting_thread list ref = ref [] + let cur_size = ref 0 + + (* Cleanup waiting threads. precond: pool is locked *) + let cleanup_waiting () = + let l = !threads in let now = Unix.gettimeofday () in (* filter threads that have been waiting for too long *) let l' = List.filter (fun (time, box) -> - if time +. pool.timeout < now + if time +. C.timeout < now then (MVar.put box Quit; false) else true) l in - pool.threads <- l' + threads := l' - (** Function that the threads run. They also take a MVar to - get commands *) - let serve pool box = + (* Function that the threads run. They also take a MVar to get commands *) + let serve box = (* wait for a job to come *) let rec wait_job () = match MVar.take box with - | Quit -> (Mutex.lock pool.mutex; quit ()) (* exit *) + | Quit -> (Mutex.lock mutex; quit ()) (* exit *) | Perform job -> run_job job (* run the given job *) @@ -160,342 +257,265 @@ module Pool = struct next () (* loop *) (* process next task *) and next () = - Mutex.lock pool.mutex; - if pool.stop then quit () (* stop the pool *) - else if Queue.is_empty pool.jobs + Mutex.lock mutex; + if !stop then quit () (* stop the pool *) + else if Queue.is_empty jobs then begin let now = Unix.gettimeofday () in (* cleanup waiting threads *) - cleanup_waiting pool; - if pool.cur_size > 1 && List.length pool.threads + 1 = pool.cur_size + cleanup_waiting (); + if !cur_size > 1 && List.length !threads + 1 = !cur_size then (* all other threads are waiting, we may need to kill them later *) - (Mutex.unlock pool.mutex; delay ()) + (Mutex.unlock mutex; delay ()) else begin (* add oneself to the list of waiting threads *) - pool.threads <- (now, box) :: pool.threads; - Mutex.unlock pool.mutex; + threads := (now, box) :: !threads; + Mutex.unlock mutex; wait_job () end end else - let job = Queue.pop pool.jobs in - Mutex.unlock pool.mutex; + let job = Queue.pop jobs in + Mutex.unlock mutex; run_job job (* delay [pool.timeout], so that in case no job is submitted we still kill old cached threads *) and delay () = - Thread.delay pool.timeout; + Thread.delay C.timeout; next () (* stop the thread (assume we have pool.mutex) *) and quit () = - pool.cur_size <- pool.cur_size - 1; - Mutex.unlock pool.mutex + cur_size := !cur_size - 1; + Mutex.unlock mutex in wait_job () - let size pool = - Mutex.lock pool.mutex; - let n = pool.cur_size in - Mutex.unlock pool.mutex; - n + let size pool = !cur_size (** Add a thread to the pool, starting with the first job *) - let add_thread pool job = + let add_thread job = let box = MVar.full job in - ignore (Thread.create (serve pool) box) - - (** Create a pool with at most the given number of threads. [timeout] - is the time after which idle threads are killed. *) - let create ?(timeout=30.) ~size = - let pool = { - stop = false; - cur_size = 0; - max_size=size; - timeout; - threads = []; - jobs = Queue.create (); - mutex = Mutex.create (); - } in - pool + ignore (Thread.create serve box) (** Run the job in the given pool *) - let run pool job = - assert (not (pool.stop)); - Mutex.lock pool.mutex; - begin match pool.threads with - | [] when pool.cur_size = pool.max_size -> + let run job = + assert (not (!stop)); + Mutex.lock mutex; + begin match !threads with + | [] when !cur_size = C.max_size -> (* max capacity reached, push task in queue *) - Queue.push job pool.jobs + Queue.push job jobs | [] -> (* spawn a thread for the given task *) - add_thread pool (Perform job); - pool.cur_size <- pool.cur_size + 1; + add_thread (Perform job); + cur_size := !cur_size + 1; | (_,box)::l' -> (* use the first thread *) MVar.put box (Perform job); - pool.threads <- l'; + threads := l'; end; - Mutex.unlock pool.mutex + Mutex.unlock mutex (** Kill threads in the pool *) - let finish pool = - Mutex.lock pool.mutex; - pool.stop <- true; + let finish () = + Mutex.lock mutex; + stop := true; (* kill waiting threads *) - List.iter (fun (_,box) -> MVar.put box Quit) pool.threads; - pool.threads <- []; - Mutex.unlock pool.mutex -end + List.iter (fun (_,box) -> MVar.put box Quit) !threads; + threads := []; + Mutex.unlock mutex -let default_pool = Pool.create ?timeout:None ~size:100 - (** Default pool of threads, should be ok for most uses. *) + (** {3 Futures} *) -(** {2 Futures} *) + type 'a state = + | NotKnown + | Success of 'a + | Failure of exn -type 'a t = { - mutable content : 'a result; - mutable handlers : 'a handler list; (* handlers *) - pool : Pool.t; - mutex : Mutex.t; - condition : Condition.t; -} (** A future value of type 'a *) -and 'a result = - | NotKnown - | Success of 'a - | Failure of exn - (** Result of a computation *) -and 'a handler = - | OnSuccess of ('a -> unit) - | OnFailure of (exn -> unit) - | OnFinish of (unit -> unit) + type 'a handler = + | OnSuccess of ('a -> unit) + | OnFailure of (exn -> unit) + | OnFinish of (unit -> unit) -exception SendTwice - (** Exception raised when a future is evaluated several time *) + type 'a t = { + mutable state : 'a state; + mutable handlers : 'a handler list; (* handlers *) + } (** A future value of type 'a *) -(** {2 Basic Future functions} *) - -let make pool = - { content = NotKnown; + let make_empty pool = { + state = NotKnown; handlers = []; - pool; - mutex = Mutex.create (); - condition = Condition.create (); } -let get future = - (* check whether it's finished: precond: mutex is locked *) - let rec check () = - match future.content with - | NotKnown -> - poll () (* wait *) - | Success x -> - Mutex.unlock future.mutex; - x (* return success *) - | Failure e -> - Mutex.unlock future.mutex; - raise e (* raise exception *) - (* poll, to wait for the result to arrive. Precond: mutex is acquired. *) - and poll () = - Condition.wait future.condition future.mutex; - check () (* we have been signaled, check! *) - in - Mutex.lock future.mutex; - check () - -let send future x = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> (* set content and signal *) - future.content <- Success x; - Condition.broadcast future.condition; - List.iter - (function - | OnSuccess f -> Pool.run future.pool (fun () -> f x) - | OnFinish f -> Pool.run future.pool (fun () -> f ()) - | OnFailure _ -> ()) - future.handlers; - Mutex.unlock future.mutex - | _ -> - Mutex.unlock future.mutex; - raise SendTwice (* already set! *) - -let fail future e = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> (* set content and signal *) - future.content <- Failure e; - Condition.broadcast future.condition; - List.iter - (function - | OnSuccess _ -> () - | OnFinish f -> f () - | OnFailure f -> f e) - future.handlers; - Mutex.unlock future.mutex - | _ -> - Mutex.unlock future.mutex; - raise SendTwice (* already set! *) - -let is_done future = - Mutex.lock future.mutex; - match future.content with - | NotKnown -> - Mutex.unlock future.mutex; - false - | _ -> - Mutex.unlock future.mutex; - true - -(** {2 Combinators *) - -let on_success future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) - | Success x -> Pool.run future.pool (fun () -> k x) - | Failure _ -> ()); - Mutex.unlock future.mutex - -let on_failure future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnFailure k) :: future.handlers; (* wait *) - | Success _ -> () - | Failure e -> Pool.run future.pool (fun () -> k e)); - Mutex.unlock future.mutex - -let on_finish future k = - Mutex.lock future.mutex; - (match future.content with - | NotKnown -> - future.handlers <- (OnFinish k) :: future.handlers; (* wait *) - | Success _ | Failure _ -> Pool.run future.pool (fun () -> k ())); - Mutex.unlock future.mutex - -let flatMap ?pool f future = - let pool = match pool with | Some p -> p | None -> future.pool in - let future' = make pool in - (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) - on_success future - (fun x -> - try - let future'' = f x in - on_success future'' (fun x -> send future' x); - on_failure future'' (fun e -> fail future' e); - with e -> - fail future' e); - on_failure future - (fun e -> fail future' e); - future' - -let andThen ?pool future f = - flatMap ?pool (fun _ -> f ()) future - -let sequence ?(pool=default_pool) futures = - let a = Array.of_list futures in - let n = Array.length a in - let results = Array.make n NotKnown in - let future' = make default_pool in - (* state: how many remain to finish *) - let count = MVar.full (Array.length a) in - (* when all futures returned, collect results for future' *) - let check_at_end () = - let l = Array.to_list results in - try - let l = List.map + let send future x = + match future.state with + | NotKnown -> (* set content and signal *) + future.state <- Success x; + List.iter (function - | Success x -> x - | Failure e -> raise e - | NotKnown -> assert false) - l in - send future' l - with e -> - fail future' e - in - (* function called whenever a future succeeds *) - let one_succeeded i x = - results.(i) <- Success x; - let _, n = MVar.update count (fun x -> x-1) in - if n = 0 then check_at_end () - and one_failed i e = - results.(i) <- Failure e; - let _, n = MVar.update count (fun x -> x-1) in - if n = 0 then check_at_end () - in - (* wait for all to succeed or fail *) - for i = 0 to Array.length a - 1 do - on_success a.(i) (one_succeeded i); - on_failure a.(i) (one_failed i); - done; - future' + | OnSuccess f -> run (fun () -> f x) + | OnFinish f -> run (fun () -> f ()) + | OnFailure _ -> ()) + future.handlers; + | _ -> + raise SendTwice (* already set! *) -let choose ?(pool=default_pool) futures = - let future' = make default_pool in - let one_finished = MVar.full false in - (* handlers. The first handler to be called will update [one_finished] - to true, see that it was false (hence know it is the first) - and propagate its result to [future'] *) - let one_succeeded x = - let one_finished, _ = MVar.update one_finished (fun _ -> true) in - if not one_finished then send future' x - and one_failed e = - let one_finished, _ = MVar.update one_finished (fun _ -> true) in - if not one_finished then fail future' e - in - (* add handlers to all futures *) - List.iter - (fun future -> - on_success future one_succeeded; - on_failure future one_failed; ) - futures; - future' + let fail future e = + match future.state with + | NotKnown -> (* set content and signal *) + future.state <- Failure e; + List.iter + (function + | OnSuccess _ -> () + | OnFinish f -> f () + | OnFailure f -> f e) + future.handlers + | _ -> + raise SendTwice (* already set! *) -let map ?(pool=default_pool) f future = - let future' = make pool in - on_success future (fun x -> let y = f x in send future' y); - on_failure future (fun e -> fail future' e); - future' + let is_done future = + match future.state with + | NotKnown -> false + | _ -> true -(** {2 Future constructors} *) + (** {2 Combinators *) -let return x = - { content = Success x; + let on_success future k = + match future.state with + | NotKnown -> + future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) + | Success x -> run (fun () -> k x) + | Failure _ -> () + + let on_failure future k = + match future.state with + | NotKnown -> + future.handlers <- (OnFailure k) :: future.handlers; (* wait *) + | Success _ -> () + | Failure e -> run (fun () -> k e) + + let on_finish future k = + match future.state with + | NotKnown -> + future.handlers <- (OnFinish k) :: future.handlers; (* wait *) + | Success _ | Failure _ -> run (fun () -> k ()) + + let flatMap f future = + let future' = make_empty () in + (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) + on_success future + (fun x -> + try + let future'' = f x in + on_success future'' (fun x -> send future' x); + on_failure future'' (fun e -> fail future' e); + with e -> + fail future' e); + on_failure future + (fun e -> fail future' e); + future' + + let andThen future f = + flatMap (fun _ -> f ()) future + + let sequence futures = + let a = Array.of_list futures in + let n = Array.length a in + let results = Array.make n NotKnown in + let future' = make_empty () in + (* state: how many remain to finish *) + let count = MVar.full (Array.length a) in + (* when all futures returned, collect results for future' *) + let check_at_end () = + let l = Array.to_list results in + try + let l = List.map + (function + | Success x -> x + | Failure e -> raise e + | NotKnown -> assert false) + l in + send future' l + with e -> + fail future' e + in + (* function called whenever a future succeeds *) + let one_succeeded i x = + results.(i) <- Success x; + let _, n = MVar.update count (fun x -> x-1) in + if n = 0 then check_at_end () + and one_failed i e = + results.(i) <- Failure e; + let _, n = MVar.update count (fun x -> x-1) in + if n = 0 then check_at_end () + in + (* wait for all to succeed or fail *) + for i = 0 to Array.length a - 1 do + on_success a.(i) (one_succeeded i); + on_failure a.(i) (one_failed i); + done; + future' + + let choose futures = + let future' = make_empty () in + let one_finished = MVar.full false in + (* handlers. The first handler to be called will update [one_finished] + to true, see that it was false (hence know it is the first) + and propagate its result to [future'] *) + let one_succeeded x = + let one_finished, _ = MVar.update one_finished (fun _ -> true) in + if not one_finished then send future' x + and one_failed e = + let one_finished, _ = MVar.update one_finished (fun _ -> true) in + if not one_finished then fail future' e + in + (* add handlers to all futures *) + List.iter + (fun future -> + on_success future one_succeeded; + on_failure future one_failed; ) + futures; + future' + + let map f future = + let future' = make_empty () in + on_success future (fun x -> let y = f x in send future' y); + on_failure future (fun e -> fail future' e); + future' + + (** {2 Future constructors} *) + + let return x = { + state = Success x; handlers = []; - pool = default_pool; - mutex = Mutex.create (); - condition = Condition.create (); } -let spawn ?(pool=default_pool) f = - let future = make pool in - (* schedule computation *) - Pool.run pool - (fun () -> - try - let x = f () in - send future x - with e -> - fail future e); - future + let spawn f = + let future = make_empty () in + (* schedule computation *) + run (fun () -> + try + let x = f () in + send future x + with e -> + fail future e); + future -(** slurp the entire content of the file_descr into a string *) -let slurp i_chan = - let buf_size = 128 in - let content = Buffer.create 120 - and buf = String.make 128 'a' in - let rec next () = - let num = input i_chan buf 0 buf_size in - if num = 0 - then Buffer.contents content (* EOF *) - else (Buffer.add_substring content buf 0 num; next ()) - in next () + (** slurp the entire content of the file_descr into a string *) + let slurp i_chan = + let buf_size = 128 in + let content = Buffer.create 120 + and buf = String.make 128 'a' in + let rec next () = + let num = input i_chan buf 0 buf_size in + if num = 0 + then Buffer.contents content (* EOF *) + else (Buffer.add_substring content buf 0 num; next ()) + in next () -(** Spawn a sub-process with the given command [cmd] (and possibly input); - returns a future containing (returncode, stdout, stderr) *) -let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = - spawn ~pool - (fun () -> + (** Spawn a sub-process with the given command [cmd] (and possibly input); + returns a future containing (returncode, stdout, stderr) *) + let spawn_process ?(stdin="") ~cmd = + spawn (fun () -> (* spawn subprocess *) let out, inp, err = Unix.open_process_full cmd (Unix.environment ()) in output_string inp stdin; @@ -514,208 +534,128 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = | Unix.WSTOPPED i -> i in (returncode, out', err')) -(* TODO a global scheduler for timed events *) + let sleep time = + spawn (fun () -> Thread.delay time; ()) -let sleep ?(pool=default_pool) time = - spawn ~pool - (fun () -> Thread.delay time; ()) + (** {3 Mutable heap} + inlined here for avoiding dependencies *) + module Heap = struct + (** Implementation from http://en.wikipedia.org/wiki/Skew_heap *) -(** {2 Event timer} *) + type 'a t = { + mutable tree : 'a tree; + cmp : 'a -> 'a -> int; + } (** A pairing tree heap with the given comparison function *) + and 'a tree = + | Empty + | Node of 'a * 'a tree * 'a tree -(** {3 Mutable heap (taken from heap.ml to avoid dependencies)} *) -module Heap = struct - type 'a t = { - mutable tree : 'a tree; - cmp : 'a -> 'a -> int; - } (** A splay tree heap with the given comparison function *) - and 'a tree = - | Empty - | Node of ('a tree * 'a * 'a tree) - (** A splay tree containing values of type 'a *) + let empty ~cmp = { + tree = Empty; + cmp; + } - let empty ~cmp = { - tree = Empty; - cmp; - } + let is_empty h = + match h.tree with + | Empty -> true + | Node _ -> false - let is_empty h = - match h.tree with - | Empty -> true - | Node _ -> false + let rec union ~cmp t1 t2 = match t1, t2 with + | Empty, _ -> t2 + | _, Empty -> t1 + | Node (x1, l1, r1), Node (x2, l2, r2) -> + if cmp x1 x2 <= 0 + then Node (x1, union ~cmp t2 r1, l1) + else Node (x2, union ~cmp t1 r2, l2) - let clear h = - h.tree <- Empty + let insert h x = + h.tree <- union ~cmp:h.cmp (Node (x, Empty, Empty)) h.tree - (** Partition the tree into (elements <= pivot, elements > pivot) *) - let rec partition ~cmp pivot tree = - match tree with - | Empty -> Empty, Empty - | Node (a, x, b) -> - if cmp x pivot <= 0 - then begin - match b with - | Empty -> (tree, Empty) - | Node (b1, y, b2) -> - if cmp y pivot <= 0 - then - let small, big = partition ~cmp pivot b2 in - Node (Node (a, x, b1), y, small), big - else - let small, big = partition ~cmp pivot b1 in - Node (a, x, small), Node (big, y, b2) - end else begin - match a with - | Empty -> (Empty, tree) - | Node (a1, y, a2) -> - if cmp y pivot <= 0 - then - let small, big = partition ~cmp pivot a2 in - Node (a1, y, small), Node (big, x, b) - else - let small, big = partition ~cmp pivot a1 in - small, Node (big, y, Node (a2, x, b)) - end - - (** Insert the element in the tree *) - let insert h x = - let small, big = partition ~cmp:h.cmp x h.tree in - let tree' = Node (small, x, big) in - h.tree <- tree' - - (** Access minimum value *) - let min h = - let rec min tree = - match tree with + let pop h = match h.tree with | Empty -> raise Not_found - | Node (Empty, x, _) -> x - | Node (l, _, _) -> min l - in min h.tree + | Node (x, l, r) -> + h.tree <- union ~cmp:h.cmp l r; + x + end - (** Get minimum value and remove it from the tree *) - let pop h = - let rec delete_min tree = match tree with - | Empty -> raise Not_found - | Node (Empty, x, b) -> x, b - | Node (Node (Empty, x, b), y, c) -> - x, Node (b, y, c) (* rebalance *) - | Node (Node (a, x, b), y, c) -> - let m, a' = delete_min a in - m, Node (a', x, Node (b, y, c)) - in - let m, tree' = delete_min h.tree in - h.tree <- tree'; - m -end + (** {2 Event timer} *) + module Timer = struct + let cmp_tasks (f1,_) (f2,_) = + compare f1 f2 -module Timer = struct - type t = { - mutable stop : bool; - mutable thread : Thread.t option; (* thread dedicated to the timer *) - pool : Pool.t; - tasks : (float * (unit -> unit)) Heap.t; - mutex : Mutex.t; - fifo_in : Unix.file_descr; - fifo_out : Unix.file_descr; - } (** A timer for events *) + let stop = ref false + let tasks : (float * (unit -> unit)) Heap.t = Heap.empty cmp:cmp_tasks + let fifo_in, fifo_out = Unix.pipe () + let thread = ref None + let standby_wait = 30. (* when no task is scheduled *) + let epsilon = 0.0001 (* accepted time diff for actions *) - let cmp_tasks (f1,_) (f2,_) = - compare f1 f2 - - let standby_wait = 30. (* when no task is scheduled *) - let epsilon = 0.0001 (* accepted time diff for actions *) - - (** Wait for next event, run it, and loop *) - let serve timer = - let buf = String.make 1 '_' in - (* process next task *) - let rec next () = - Mutex.lock timer.mutex; - (* what is the next task? *) - let next_task = - try Some (Heap.min timer.tasks) - with Not_found -> None in - match next_task with - | _ when timer.stop -> Mutex.unlock timer.mutex (* stop *) - | None -> - Mutex.unlock timer.mutex; - wait standby_wait (* wait for a task *) - | Some (time, task) -> - let now = Unix.gettimeofday () in - if now +. epsilon > time - then begin (* run task in the pool *) - Pool.run timer.pool task; - ignore (Heap.pop timer.tasks); - Mutex.unlock timer.mutex; - (* process next task, if any *) - next () - end else (* too early, wait *) - (Mutex.unlock timer.mutex; - wait (time -. now)) - (* wait for [delay] seconds, or until something happens on fifo_in *) - and wait delay = - let read = Thread.wait_timed_read timer.fifo_in delay in - (if read then ignore (Unix.read timer.fifo_in buf 0 1)); (* remove char *) + (** Wait for next event, run it, and loop *) + let serve () = + let buf = String.make 1 '_' in + (* process next task *) + let rec next () = + Mutex.lock mutex; + (* what is the next task? *) + let next_task = + try Some (Heap.min tasks) + with Not_found -> None in + match next_task with + | _ when timer.stop -> Mutex.unlock mutex (* stop *) + | None -> + Mutex.unlock mutex; + wait standby_wait (* wait for a task *) + | Some (time, task) -> + let now = Unix.gettimeofday () in + if now +. epsilon > time + then begin (* run task in the pool *) + run task; + ignore (Heap.pop tasks); + Mutex.unlock mutex; + (* process next task, if any *) + next () + end else (* too early, wait *) + (Mutex.unlock mutex; + wait (time -. now)) + (* wait for [delay] seconds, or until something happens on fifo_in *) + and wait delay = + let read = Thread.wait_timed_read fifo_in delay in + if read + then ignore (Unix.read fifo_in buf 0 1); (* remove char *) + next () + in next () - in - next () - (** A timer that runs in the given thread pool *) - let create ?(pool=default_pool) () = - let fifo_in, fifo_out = Unix.pipe () in - let timer = { - stop = false; - pool; - thread = None; - tasks = Heap.empty ~cmp:cmp_tasks; - mutex = Mutex.create (); - fifo_in; - fifo_out; - } in - (* start a thread to process tasks *) - let t = Thread.create serve timer in - timer.thread <- Some t; - timer + let () = + let t = Thread.create server timer in + thread := Some t; + () - (** [timerule_at s t act] will run [act] at the Unix echo [t] *) - let schedule_at timer time task = - Mutex.lock timer.mutex; - (* time of the next scheduled event *) - let next_time = - try let time, _ = Heap.min timer.tasks in time - with Not_found -> max_float - in - (* insert task *) - Heap.insert timer.tasks (time, task); - (* see if the timer thread needs to be awaken earlier *) - (if time < next_time - then ignore (Unix.single_write timer.fifo_out "_" 0 1)); - Mutex.unlock timer.mutex; - () + let schedule_at ~at task = + Mutex.lock mutex; + (* time of the next scheduled event *) + let next_time = + try let time, _ = Heap.min tasks in time + with Not_found -> max_float + in + (* insert task *) + Heap.insert tasks (time, task); + (* see if the timer thread needs to be awaken earlier *) + (if time < next_time + then ignore (Unix.single_write fifo_out "_" 0 1)); + Mutex.unlock mutex; + () - (** [schedule_in s d act] will run [act] in [d] seconds *) - let schedule_in timer delay task = - assert (delay >= 0.); - schedule_at timer (Unix.gettimeofday () +. delay) task + let schedule_after ~after task = + assert (delay >= 0.); + schedule_at ~at:(Unix.gettimeofday () +. delay) task + end + + module Infix = struct + let (>>=) x f = flatMap f x + let (>>) a f = andThen a f + end + + include Infix - (** Stop the given timer, cancelling pending tasks *) - let stop timer = - Mutex.lock timer.mutex; - (if timer.stop then (Mutex.unlock timer.mutex; assert false)); - timer.stop <- true; - (* empty heap of tasks *) - Heap.clear timer.tasks; - (* kill the thread *) - (match timer.thread with - | None -> () - | Some t -> - Thread.kill t; - timer.thread <- None); - Mutex.unlock timer.mutex end - -module Infix = struct - let (>>=) x f = flatMap f x - let (>>) a f = andThen a f -end - -include Infix diff --git a/future.mli b/future.mli index e39be681..b3691183 100644 --- a/future.mli +++ b/future.mli @@ -25,9 +25,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Futures for concurrency} *) -type 'a t - (** A future value of type 'a *) - exception SendTwice (** Exception raised when a future is evaluated several time *) @@ -59,113 +56,103 @@ module MVar : sig (** Look at the value, without removing it *) end -(** {2 Thread pool} *) -module Pool : sig - type t - (** A pool of threads *) +(** {2 Signature} *) - val create : ?timeout:float -> size:int -> t - (** Create a pool with at most the given number of threads. [timeout] - is the time after which idle threads are killed. *) +module type S = sig + type 'a t + (** A future value of type 'a *) - val size : t -> int - (** Current size of the pool *) - - val run : t -> (unit -> unit) -> unit + val run : (unit -> unit) -> unit (** Run the function in the pool *) - val finish : t -> unit - (** Kill threads in the pool *) -end + val finish : unit -> unit + (** Kill threads in the pool. The pool won't be usable any more. *) -val default_pool : Pool.t - (** Pool of threads that is used by default. Growable if needed. *) + (** {2 Basic low-level Future functions} *) -(** {2 Basic low-level Future functions} *) + type 'a state = + | NotKnown + | Success of 'a + | Failure of exn -val make : Pool.t -> 'a t - (** Create a future, representing a value that is not known yet. *) + val state : 'a t -> 'a state + (** Current state of the future *) -val get : 'a t -> 'a - (** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned *) + val is_done : 'a t -> bool + (** Is the future evaluated (success/failure)? *) -val send : 'a t -> 'a -> unit - (** Send a result to the future. Will raise SendTwice if [send] has - already been called on this future before *) + (** {2 Combinators} *) -val fail : 'a t -> exn -> unit - (** Fail the future by raising an exception inside it *) + val on_success : 'a t -> ('a -> unit) -> unit + (** Attach a handler to be called upon success *) -val is_done : 'a t -> bool - (** Is the future evaluated (success/failure)? *) + val on_failure : _ t -> (exn -> unit) -> unit + (** Attach a handler to be called upon failure *) -(** {2 Combinators} *) + val on_finish : _ t -> (unit -> unit) -> unit + (** Attach a handler to be called when the future is evaluated *) -val on_success : 'a t -> ('a -> unit) -> unit - (** Attach a handler to be called upon success *) + val flatMap : ('a -> 'b t) -> 'a t -> 'b t + (** Monadic combination of futures *) -val on_failure : _ t -> (exn -> unit) -> unit - (** Attach a handler to be called upon failure *) + val andThen : 'a t -> (unit -> 'b t) -> 'b t + (** Wait for the first future to succeed, then launch the second *) -val on_finish : _ t -> (unit -> unit) -> unit - (** Attach a handler to be called when the future is evaluated *) + val sequence : 'a t list -> 'a list t + (** Future that waits for all previous sequences to terminate *) -val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t - (** Monadic combination of futures *) + val choose : 'a t list -> 'a t + (** Choose among those futures (the first to terminate) *) -val andThen : ?pool:Pool.t -> 'a t -> (unit -> 'b t) -> 'b t - (** Wait for the first future to succeed, then launch the second *) + val map : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future *) -val sequence : ?pool:Pool.t -> 'a t list -> 'a list t - (** Future that waits for all previous sequences to terminate *) + (** {2 Future constructors} *) -val choose : ?pool:Pool.t -> 'a t list -> 'a t - (** Choose among those futures (the first to terminate) *) + val return : 'a -> 'a t + (** Future that is already computed *) -val map : ?pool:Pool.t -> ('a -> 'b) -> 'a t -> 'b t - (** Maps the value inside the future *) + val spawn : (unit -> 'a) -> 'a t + (** Spawn a thread that wraps the given computation *) -(** {2 Future constructors} *) + val spawn_process : ?stdin:string -> cmd:string -> + (int * string * string) t + (** Spawn a sub-process with the given command [cmd] (and possibly input); + returns a future containing (returncode, stdout, stderr) *) -val return : 'a -> 'a t - (** Future that is already computed *) + val sleep : float -> unit t + (** Future that returns with success in the given amount of seconds *) -val spawn : ?pool:Pool.t -> (unit -> 'a) -> 'a t - (** Spawn a thread that wraps the given computation *) + (** {2 Event timer} *) -val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string -> - (int * string * string) t - (** Spawn a sub-process with the given command [cmd] (and possibly input); - returns a future containing (returncode, stdout, stderr) *) + module Timer : sig + val schedule_at : at:float -> (unit -> unit) -> unit + (** [schedule_at ~at act] will run [act] at the Unix echo [at] *) -val sleep : ?pool:Pool.t -> float -> unit t - (** Future that returns with success in the given amount of seconds *) + val schedule_after : after:float -> (unit -> unit) -> unit + (** [schedule_after ~after act] will run [act] in [after] seconds *) + end -(** {2 Event timer} *) + module Infix : sig + val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t + end -module Timer : sig - type t - (** A scheduler for events *) - - val create : ?pool:Pool.t -> unit -> t - (** A timer that runs tasks in the given thread pool *) - - val schedule_at : t -> float -> (unit -> unit) -> unit - (** [schedule_at s t act] will run [act] at the Unix echo [t] *) - - val schedule_in : t -> float -> (unit -> unit) -> unit - (** [schedule_in s d act] will run [act] in [d] seconds *) - - val stop : t -> unit - (** Stop the given timer, cancelling pending tasks *) -end - - -module Infix : sig val (>>=) : 'a t -> ('a -> 'b t) -> 'b t val (>>) : 'a t -> (unit -> 'b t) -> 'b t end -val (>>=) : 'a t -> ('a -> 'b t) -> 'b t -val (>>) : 'a t -> (unit -> 'b t) -> 'b t +(** {2 Functor} *) + +module type CONFIG = sig + val timeout : float + + val max_size : int +end + +module DefaultConfig : CONFIG + +module Make(C : CONFIG) : S + +(** Standard (default) pool *) +module Std : S diff --git a/tests/test_future.ml b/tests/test_future.ml index ce7420b4..df4b0bc3 100644 --- a/tests/test_future.ml +++ b/tests/test_future.ml @@ -1,45 +1,50 @@ -(** Test Future *) +(** Test F *) open OUnit +module F = Future.Std +module MVar = Future.MVar + let test_mvar () = - let box = Future.MVar.empty () in - let f = Future.spawn (fun () -> Future.MVar.take box + 1) in + let box = MVar.empty () in + let f = F.spawn (fun () -> MVar.take box + 1) in Thread.delay 0.1; - OUnit.assert_bool "still waiting" (not (Future.is_done f)); - Future.MVar.put box 1; - OUnit.assert_equal 2 (Future.get f); + OUnit.assert_bool "still waiting" (not (F.is_done f)); + MVar.put box 1; + Thread.delay 1.; + OUnit.assert_equal (F.Success 2) (F.state f); () let test_parallel () = let open Gen.Infix in let l = 1 -- 300 - |> Gen.map (fun _ -> Future.spawn (fun () -> Thread.delay 0.1; 1)) + |> Gen.map (fun _ -> F.spawn (fun () -> Thread.delay 0.1; 1)) |> Gen.to_list in - let l' = List.map Future.get l in - OUnit.assert_equal 300 (List.fold_left (+) 0 l'); + let l' = F.map (List.fold_left (+) 0) (F.sequence l) in + Thread.delay 0.5; + OUnit.assert_equal (F.Success 300) (F.state l'); () let test_time () = let start = Unix.gettimeofday () in - let f1 = Future.spawn (fun () -> Thread.delay 0.5) in - let f2 = Future.spawn (fun () -> Thread.delay 0.5) in - Future.get f1; - Future.get f2; + let f1 = F.spawn (fun () -> Thread.delay 0.5) in + let f2 = F.spawn (fun () -> Thread.delay 0.5) in + F.get f1; + F.get f2; let stop = Unix.gettimeofday () in OUnit.assert_bool "parallelism" (stop -. start < 0.75); () let test_timer () = - let timer = Future.Timer.create () in - let mvar = Future.MVar.full 1 in - Future.Timer.schedule_in timer 0.5 - (fun () -> ignore (Future.MVar.update mvar (fun x -> x + 2))); - Future.Timer.schedule_in timer 0.2 - (fun () -> ignore (Future.MVar.update mvar (fun x -> x * 4))); + let timer = F.Timer.create () in + let mvar = MVar.full 1 in + F.Timer.schedule_in timer 0.5 + (fun () -> ignore (MVar.update mvar (fun x -> x + 2))); + F.Timer.schedule_in timer 0.2 + (fun () -> ignore (MVar.update mvar (fun x -> x * 4))); Thread.delay 0.7; - OUnit.assert_equal 6 (Future.MVar.peek mvar); + OUnit.assert_equal 6 (MVar.peek mvar); () let suite =