From e6eb9a79ebf4eab2c13424c1fa7a385995dadaa4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 21 Mar 2013 10:15:35 +0100 Subject: [PATCH] expose an optional Pool argument in most Future\'s combinators; handlers of a future are scheduled in pool rather than sequentially (to avoid possible deadlocks among them) --- future.ml | 80 ++++++++++++++++++++++++++++++------------------------ future.mli | 12 ++++---- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/future.ml b/future.ml index 32a54441..3a2a2759 100644 --- a/future.ml +++ b/future.ml @@ -25,25 +25,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Futures for concurrency} *) -type 'a t = { - mutable content : 'a result; - mutable handlers : 'a handler list; (* handlers *) - mutex : Mutex.t; - condition : Condition.t; -} (** A future value of type 'a *) -and 'a result = - | NotKnown - | Success of 'a - | Failure of exn - (** Result of a computation *) -and 'a handler = - | OnSuccess of ('a -> unit) - | OnFailure of (exn -> unit) - | OnFinish of (unit -> unit) - -exception SendTwice - (** Exception raised when a future is evaluated several time *) - (** {2 MVar: a zero-or-one element thread-safe box} *) module MVar = struct @@ -268,11 +249,34 @@ end let default_pool = Pool.create ?timeout:None ~size:100 (** Default pool of threads, should be ok for most uses. *) +(** {2 Futures} *) + +type 'a t = { + mutable content : 'a result; + mutable handlers : 'a handler list; (* handlers *) + pool : Pool.t; + mutex : Mutex.t; + condition : Condition.t; +} (** A future value of type 'a *) +and 'a result = + | NotKnown + | Success of 'a + | Failure of exn + (** Result of a computation *) +and 'a handler = + | OnSuccess of ('a -> unit) + | OnFailure of (exn -> unit) + | OnFinish of (unit -> unit) + +exception SendTwice + (** Exception raised when a future is evaluated several time *) + (** {2 Basic Future functions} *) -let make () = +let make pool = { content = NotKnown; handlers = []; + pool; mutex = Mutex.create (); condition = Condition.create (); } @@ -305,8 +309,8 @@ let send future x = Condition.broadcast future.condition; List.iter (function - | OnSuccess f -> f x - | OnFinish f -> f () + | OnSuccess f -> Pool.run future.pool (fun () -> f x) + | OnFinish f -> Pool.run future.pool (fun () -> f ()) | OnFailure _ -> ()) future.handlers; Mutex.unlock future.mutex @@ -348,7 +352,7 @@ let on_success future k = (match future.content with | NotKnown -> future.handlers <- (OnSuccess k) :: future.handlers; (* wait *) - | Success x -> k x + | Success x -> Pool.run future.pool (fun () -> k x) | Failure _ -> ()); Mutex.unlock future.mutex @@ -358,7 +362,7 @@ let on_failure future k = | NotKnown -> future.handlers <- (OnFailure k) :: future.handlers; (* wait *) | Success _ -> () - | Failure e -> k e); + | Failure e -> Pool.run future.pool (fun () -> k e)); Mutex.unlock future.mutex let on_finish future k = @@ -366,11 +370,12 @@ let on_finish future k = (match future.content with | NotKnown -> future.handlers <- (OnFinish k) :: future.handlers; (* wait *) - | Success _ | Failure _ -> k ()); + | Success _ | Failure _ -> Pool.run future.pool (fun () -> k ())); Mutex.unlock future.mutex -let flatMap f future = - let future' = make () in +let flatMap ?pool f future = + let pool = match pool with | Some p -> p | None -> future.pool in + let future' = make pool in (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) on_success future (fun x -> @@ -384,14 +389,14 @@ let flatMap f future = (fun e -> fail future' e); future' -let andThen future f = - flatMap (fun _ -> f ()) future +let andThen ?pool future f = + flatMap ?pool (fun _ -> f ()) future -let sequence futures = +let sequence ?(pool=default_pool) futures = let a = Array.of_list futures in let n = Array.length a in let results = Array.make n NotKnown in - let future' = make () in + let future' = make default_pool in (* state: how many remain to finish *) let count = MVar.full (Array.length a) in (* when all futures returned, collect results for future' *) @@ -425,8 +430,8 @@ let sequence futures = done; future' -let choose futures = - let future' = make () in +let choose ?(pool=default_pool) futures = + let future' = make default_pool in let one_finished = MVar.full false in (* handlers. The first handler to be called will update [one_finished] to true, see that it was false (hence know it is the first) @@ -446,8 +451,8 @@ let choose futures = futures; future' -let map f future = - let future' = make () in +let map ?(pool=default_pool) f future = + let future' = make pool in on_success future (fun x -> let y = f x in send future' y); on_failure future (fun e -> fail future' e); future' @@ -457,12 +462,13 @@ let map f future = let return x = { content = Success x; handlers = []; + pool = default_pool; mutex = Mutex.create (); condition = Condition.create (); } let spawn ?(pool=default_pool) f = - let future = make () in + let future = make pool in (* schedule computation *) Pool.run pool (fun () -> @@ -508,6 +514,8 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd = | Unix.WSTOPPED i -> i in (returncode, out', err')) +(* TODO a global scheduler for timed events *) + let sleep ?(pool=default_pool) time = spawn ~pool (fun () -> Thread.delay time; ()) diff --git a/future.mli b/future.mli index a1ce32cd..01422020 100644 --- a/future.mli +++ b/future.mli @@ -83,7 +83,7 @@ val default_pool : Pool.t (** {2 Basic low-level Future functions} *) -val make : unit -> 'a t +val make : Pool.t -> 'a t (** Create a future, representing a value that is not known yet. *) val get : 'a t -> 'a @@ -111,19 +111,19 @@ val on_failure : _ t -> (exn -> unit) -> unit val on_finish : _ t -> (unit -> unit) -> unit (** Attach a handler to be called when the future is evaluated *) -val flatMap : ('a -> 'b t) -> 'a t -> 'b t +val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t (** Monadic combination of futures *) -val andThen : 'a t -> (unit -> 'b t) -> 'b t +val andThen : ?pool:Pool.t -> 'a t -> (unit -> 'b t) -> 'b t (** Wait for the first future to succeed, then launch the second *) -val sequence : 'a t list -> 'a list t +val sequence : ?pool:Pool.t -> 'a t list -> 'a list t (** Future that waits for all previous sequences to terminate *) -val choose : 'a t list -> 'a t +val choose : ?pool:Pool.t -> 'a t list -> 'a t (** Choose among those futures (the first to terminate) *) -val map : ('a -> 'b) -> 'a t -> 'b t +val map : ?pool:Pool.t -> ('a -> 'b) -> 'a t -> 'b t (** Maps the value inside the future *) (** {2 Future constructors} *)