diff --git a/futures.ml b/futures.ml index d061f088..8fffdbcb 100644 --- a/futures.ml +++ b/futures.ml @@ -27,6 +27,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. type 'a t = { mutable content : 'a result; + mutable handlers : 'a handler list; (* handlers *) mutex : Mutex.t; condition : Condition.t; } (** A future value of type 'a *) @@ -35,6 +36,10 @@ and 'a result = | Success of 'a | Failure of exn (** Result of a computation *) +and 'a handler = + | OnSuccess of ('a -> unit) + | OnFailure of (exn -> unit) + | OnFinish of (unit -> unit) exception SendTwice (** Exception raised when a future is evaluated several time *) @@ -44,6 +49,7 @@ module Pool = struct type t = { mutable threads : Thread.t list; size : int; + max_load : int; jobs : (unit -> unit) Queue.t; mutex : Mutex.t; condition : Condition.t; @@ -51,16 +57,22 @@ module Pool = struct (* TODO option to allow the pool to grow on demand? *) + let load pool = + Mutex.lock pool.mutex; + let n = Queue.length pool.jobs in + Mutex.unlock pool.mutex; + n + (* Internal function, which is run by the threads of the pool *) - let serve pool = + let serve pool limit = (* loop, to get the next job *) - let rec poll () = + let rec poll limit = Mutex.lock pool.mutex; Condition.wait pool.condition pool.mutex; if Queue.is_empty pool.jobs then begin (* caramba! try again *) Mutex.unlock pool.mutex; - poll () end + poll limit end else begin let job = Queue.pop pool.jobs in Mutex.unlock pool.mutex; @@ -69,30 +81,48 @@ module Pool = struct job () with _ -> ()); - poll () (* recurse *) + match limit with + | None -> poll limit (* I am immortal! *) + | Some 0 -> () (* stop, reached limit *) + | Some n -> poll (Some (n-1)) (* continue serving *) end in - poll () + poll limit + + (** Add a thread to the pool, that will serve at most [limit] jobs *) + let add_thread ?limit pool = + let t = Thread.create (serve pool) limit in + (* transient threads are not stored *) + if limit = None + then pool.threads <- t :: pool.threads (** Create a pool with the given number of threads. *) - let create ~size = + let create ?(max_load=max_int) ~size = let pool = { threads = []; size; + max_load; jobs = Queue.create (); mutex = Mutex.create (); condition = Condition.create (); } in - (* start threads *) + (* start persistent threads *) for i = 0 to size - 1 do - pool.threads <- (Thread.create serve pool) :: pool.threads; + add_thread pool done; pool + let transient_thread_lifetime = 10 + (** Schedule a function to run in the pool *) let schedule pool f = Mutex.lock pool.mutex; Queue.push f pool.jobs; + (* grow set of threads, if needed *) + (if Queue.length pool.jobs > pool.max_load + then begin + add_thread ~limit:transient_thread_lifetime pool + end); Condition.signal pool.condition; (* wake up one thread *) Mutex.unlock pool.mutex; () @@ -102,13 +132,62 @@ module Pool = struct List.iter (fun t -> Thread.kill t) pool.threads end -let default_pool = Pool.create 3 - (** Default pool of threads *) +let default_pool = Pool.create ~max_load:500 ~size:3 + (** Default pool of threads (growable) *) + +(** {2 MVar: a zero-or-one element thread-safe box} *) + +module MVar = struct + type 'a t = { + mutable content : 'a option; + mutex : Mutex.t; + condition : Condition.t; + } + + (** Create an empty box *) + let empty () = { + content = None; + mutex = Mutex.create (); + condition = Condition.create (); + } + + (** Create a full box *) + let full x = { + content = Some x; + mutex = Mutex.create (); + condition = Condition.create (); + } + + (** Is the box currently empty? *) + let is_empty box = + Mutex.lock box.mutex; + let ans = box.content <> None in + Mutex.unlock box.mutex; + ans + + (** Take value out of the box. Wait if necessary *) + let take box = + failwith "not implemented" + + (** Put a value in the box. Waits if the box is already empty *) + let put box x = + failwith "not impleemnted" + + (** Use given function to atomically update content, and return + the previous value and the new one *) + let update box f = + failwith "not implemented" + + (** Look at the value, without removing it *) + let peek box = + failwith "not implemented" +end (** {2 Basic Future functions} *) let make () = { content = NotKnown; + handlers = []; mutex = Mutex.create (); condition = Condition.create (); } @@ -139,6 +218,12 @@ let send future x = | NotKnown -> (* set content and signal *) future.content <- Success x; Condition.broadcast future.condition; + List.iter + (function + | OnSuccess f -> f x + | OnFinish f -> f () + | OnFailure _ -> ()) + future.handlers; Mutex.unlock future.mutex | _ -> Mutex.unlock future.mutex; @@ -150,6 +235,12 @@ let fail future e = | NotKnown -> (* set content and signal *) future.content <- Failure e; Condition.broadcast future.condition; + List.iter + (function + | OnSuccess _ -> () + | OnFinish f -> f () + | OnFailure f -> f e) + future.handlers; Mutex.unlock future.mutex | _ -> Mutex.unlock future.mutex; @@ -167,25 +258,53 @@ let is_done future = (** {2 Combinators *) -let flatMap ?(pool=default_pool) f future = +let on_success future k = + Mutex.lock future.mutex; + future.handlers <- (OnSuccess k) :: future.handlers; + Mutex.unlock future.mutex + +let on_failure future k = + Mutex.lock future.mutex; + future.handlers <- (OnFailure k) :: future.handlers; + Mutex.unlock future.mutex + +let on_finish future k = + Mutex.lock future.mutex; + future.handlers <- (OnFinish k) :: future.handlers; + Mutex.unlock future.mutex + +let flatMap f future = let future' = make () in - (* schedule the task that waits for [future] to return [x], then - computes [f x] and send the result to [future'] *) - Pool.schedule pool - (fun () -> + (* if [future] succeeds with [x], we spawn a new job to compute [f x] *) + on_success future + (fun x -> try - let x = get future in let future'' = f x in - let y = get future'' in - send future' y - with e -> (* failure occurred *) + on_success future'' (fun x -> send future' x); + on_failure future'' (fun e -> fail future' e); + with e -> fail future' e); + on_failure future + (fun e -> fail future' e); + future' + +let sequence futures = + failwith "not implemented" + +let choose futures = + failwith "not implemented" + +let map f future = + let future' = make () in + on_success future (fun x -> let y = f x in send future' y); + on_failure future (fun e -> fail future' e); future' (** {2 Future constructors} *) let return x = { content = Success x; + handlers = []; mutex = Mutex.create (); condition = Condition.create (); } diff --git a/futures.mli b/futures.mli index 58e6b2a7..78fc4ffa 100644 --- a/futures.mli +++ b/futures.mli @@ -36,8 +36,12 @@ module Pool : sig type t (** A pool of threads *) - val create : size:int -> t - (** Create a pool with the given number of threads. *) + val create : ?max_load:int -> size:int -> t + (** Create a pool with the given number of threads. If the load goes + above the given threshold (default max_int), a new thread is spawned. *) + + val load : t -> int + (** Current number of waiting jobs *) val schedule : t -> (unit -> unit) -> unit (** Schedule a function to run in the pool *) @@ -47,7 +51,35 @@ module Pool : sig end val default_pool : Pool.t - (** Pool of threads that is used by default *) + (** Pool of threads that is used by default. Growable if needed. *) + +(** {2 MVar: a zero-or-one element thread-safe box} *) + +module MVar : sig + type 'a t + + val empty : unit -> 'a t + (** Create an empty box *) + + val full : 'a -> 'a t + (** Create a full box *) + + val is_empty : _ t -> bool + (** Is the box currently empty? *) + + val take : 'a t -> 'a + (** Take value out of the box. Wait if necessary *) + + val put : 'a t -> 'a -> unit + (** Put a value in the box. Waits if the box is already empty *) + + val update : 'a t -> ('a -> 'a) -> 'a * 'a + (** Use given function to atomically update content, and return + the previous value and the new one *) + + val peek : 'a t -> 'a + (** Look at the value, without removing it *) +end (** {2 Basic low-level Future functions} *) @@ -70,9 +102,27 @@ val is_done : 'a t -> bool (** {2 Combinators *) -val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t +val on_success : 'a t -> ('a -> unit) -> unit + (** Attach a handler to be called upon success *) + +val on_failure : _ t -> (exn -> unit) -> unit + (** Attach a handler to be called upon failure *) + +val on_finish : _ t -> (unit -> unit) -> unit + (** Attach a handler to be called when the future is evaluated *) + +val flatMap : ('a -> 'b t) -> 'a t -> 'b t (** Monadic combination of futures *) +val sequence : 'a t list -> 'a list t + (** Future that waits for all previous sequences to terminate *) + +val choose : 'a t list -> 'a t + (** Choose among those futures (the first to terminate) *) + +val map : ('a -> 'b) -> 'a t -> 'b t + (** Maps the value inside the future *) + (** {2 Future constructors} *) val return : 'a -> 'a t