diff --git a/futures.ml b/futures.ml index 6c1d10ac..26f294c7 100644 --- a/futures.ml +++ b/futures.ml @@ -48,6 +48,7 @@ exception SendTwice module Pool = struct type t = { mutable threads : Thread.t list; + mutable stop : bool; size : int; max_load : int; jobs : (unit -> unit) Queue.t; @@ -72,7 +73,7 @@ module Pool = struct if Queue.is_empty pool.jobs then begin (* caramba! try again *) Mutex.unlock pool.mutex; - poll limit end + if not pool.stop then poll limit end else begin let job = Queue.pop pool.jobs in Mutex.unlock pool.mutex; @@ -82,9 +83,9 @@ module Pool = struct with _ -> ()); match limit with - | None -> poll limit (* I am immortal! *) + | None -> if not pool.stop then poll limit (* I am immortal! *) | Some 0 -> () (* stop, reached limit *) - | Some n -> poll (Some (n-1)) (* continue serving *) + | Some n -> if not pool.stop then poll (Some (n-1)) (* continue serving *) end in poll limit @@ -100,6 +101,7 @@ module Pool = struct let create ?(max_load=max_int) ~size = let pool = { threads = []; + stop = false; size; max_load; jobs = Queue.create (); @@ -129,6 +131,11 @@ module Pool = struct (** Kill threads in the pool *) let finish pool = + Mutex.lock pool.mutex; + pool.stop <- true; + Condition.broadcast pool.condition; + Mutex.unlock pool.mutex; + (* kill immortal threads *) List.iter (fun t -> Thread.kill t) pool.threads end @@ -141,21 +148,24 @@ module MVar = struct type 'a t = { mutable content : 'a option; mutex : Mutex.t; - condition : Condition.t; + on_take : Condition.t; (* signal that a value was removed (empty) *) + on_put : Condition.t; (* signal that a value was added (full) *) } (** Create an empty box *) let empty () = { content = None; mutex = Mutex.create (); - condition = Condition.create (); + on_take = Condition.create (); + on_put = Condition.create (); } (** Create a full box *) let full x = { content = Some x; mutex = Mutex.create (); - condition = Condition.create (); + on_take = Condition.create (); + on_put = Condition.create (); } (** Is the box currently empty? *) @@ -165,22 +175,60 @@ module MVar = struct Mutex.unlock box.mutex; ans + (* assuming we have a lock on given box, wait it gets a value and return it *) + let rec wait_put box = + match box.content with + | None -> + Condition.wait box.on_put box.mutex; + wait_put box (* try again *) + | Some x -> x + + (* same, but waits for the box to become empty *) + let rec wait_take box = + match box.content with + | None -> () (* empty! *) + | Some _ -> + Condition.wait box.on_take box.mutex; + wait_take box (* try again *) + (** Take value out of the box. Wait if necessary *) let take box = - failwith "not implemented" + Mutex.lock box.mutex; + let x = wait_put box in + box.content <- None; + Condition.broadcast box.on_take; + Mutex.unlock box.mutex; + x - (** Put a value in the box. Waits if the box is already empty *) + (** Put a value in the box. Waits if the box is already full *) let put box x = - failwith "not impleemnted" + Mutex.lock box.mutex; + wait_take box; + box.content <- Some x; + Condition.broadcast box.on_put; + Mutex.unlock box.mutex (** Use given function to atomically update content, and return the previous value and the new one *) let update box f = - failwith "not implemented" + Mutex.lock box.mutex; + let x = wait_put box in + try + let y = f x in + box.content <- Some y; + Condition.broadcast box.on_put; (* signal write *) + Mutex.unlock box.mutex; + x, y + with e -> + Mutex.unlock box.mutex; + raise e (** Look at the value, without removing it *) let peek box = - failwith "not implemented" + Mutex.lock box.mutex; + let x = wait_put box in + Mutex.unlock box.mutex; + x end (** {2 Basic Future functions} *) @@ -260,17 +308,28 @@ let is_done future = let on_success future k = Mutex.lock future.mutex; - future.handlers <- (OnSuccess k) :: future.handlers; + (match future.content with + | NotKnown -> + future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) + | Success x -> k x + | Failure _ -> ()); Mutex.unlock future.mutex let on_failure future k = Mutex.lock future.mutex; - future.handlers <- (OnFailure k) :: future.handlers; + (match future.content with + | NotKnown -> + future.handlers <- (OnFailure k) :: future.handlers; (* wait *) + | Success _ -> () + | Failure e -> k e); Mutex.unlock future.mutex let on_finish future k = Mutex.lock future.mutex; - future.handlers <- (OnFinish k) :: future.handlers; + (match future.content with + | NotKnown -> + future.handlers <- (OnFinish k) :: future.handlers; (* wait *) + | Success _ | Failure _ -> k ()); Mutex.unlock future.mutex let flatMap f future = @@ -288,8 +347,46 @@ let flatMap f future = (fun e -> fail future' e); future' +let andThen future f = + flatMap (fun _ -> f ()) future + let sequence futures = - failwith "not implemented" + let a = Array.of_list futures in + let n = Array.length a in + let results = Array.make n NotKnown in + let future' = make () in + (* state: how many remain to finish *) + let count = MVar.full (Array.length a) in + (* when all futures returned, collect results for future' *) + let check_at_end () = + let l = Array.to_list results in + try + let l = List.map + (function + | Success x -> x + | Failure e -> raise e + | NotKnown -> assert false) + l in + send future' l + with e -> + fail future' e + in + (* function called whenever a future succeeds *) + let one_succeeded i x = + results.(i) <- Success x; + let _, n = MVar.update count (fun x -> x-1) in + if n = 0 then check_at_end () + and one_failed i e = + results.(i) <- Failure e; + let _, n = MVar.update count (fun x -> x-1) in + if n = 0 then check_at_end () + in + (* wait for all to succeed or fail *) + for i = 0 to Array.length a - 1 do + on_success a.(i) (one_succeeded i); + on_failure a.(i) (one_failed i); + done; + future' let choose futures = failwith "not implemented" @@ -356,6 +453,11 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = | Unix.WSTOPPED i -> i in (returncode, out', err')) +let sleep ?(pool=default_pool) time = + spawn ~pool + (fun () -> Unix.sleep time; ()) + module Infix = struct let (>>=) x f = flatMap f x + let (>>) a f = andThen a f end diff --git a/futures.mli b/futures.mli index 78fc4ffa..520dfaa2 100644 --- a/futures.mli +++ b/futures.mli @@ -114,6 +114,9 @@ val on_finish : _ t -> (unit -> unit) -> unit val flatMap : ('a -> 'b t) -> 'a t -> 'b t (** Monadic combination of futures *) +val andThen : 'a t -> (unit -> 'b t) -> 'b t + (** Wait for the first future to succeed, then launch the second *) + val sequence : 'a t list -> 'a list t (** Future that waits for all previous sequences to terminate *) @@ -136,6 +139,10 @@ val spawn_process : ?pool:Pool.t -> ?stdin:string -> cmd:string -> (** Spawn a sub-process with the given command [cmd] (and possibly input); returns a future containing (returncode, stdout, stderr) *) +val sleep : ?pool:Pool.t -> int -> unit t + (** Future that returns with success in the given amount of seconds *) + module Infix : sig val (>>=) : 'a t -> ('a -> 'b t) -> 'b t + val (>>) : 'a t -> (unit -> 'b t) -> 'b t end