mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 03:05:28 -05:00
expose an optional Pool argument in most Future\'s combinators;
handlers of a future are scheduled in pool rather than sequentially (to avoid possible deadlocks among them)
This commit is contained in:
parent
08d33095f6
commit
e6eb9a79eb
2 changed files with 50 additions and 42 deletions
80
future.ml
80
future.ml
|
|
@ -25,25 +25,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
|
||||||
(** {1 Futures for concurrency} *)
|
(** {1 Futures for concurrency} *)
|
||||||
|
|
||||||
type 'a t = {
|
|
||||||
mutable content : 'a result;
|
|
||||||
mutable handlers : 'a handler list; (* handlers *)
|
|
||||||
mutex : Mutex.t;
|
|
||||||
condition : Condition.t;
|
|
||||||
} (** A future value of type 'a *)
|
|
||||||
and 'a result =
|
|
||||||
| NotKnown
|
|
||||||
| Success of 'a
|
|
||||||
| Failure of exn
|
|
||||||
(** Result of a computation *)
|
|
||||||
and 'a handler =
|
|
||||||
| OnSuccess of ('a -> unit)
|
|
||||||
| OnFailure of (exn -> unit)
|
|
||||||
| OnFinish of (unit -> unit)
|
|
||||||
|
|
||||||
exception SendTwice
|
|
||||||
(** Exception raised when a future is evaluated several time *)
|
|
||||||
|
|
||||||
(** {2 MVar: a zero-or-one element thread-safe box} *)
|
(** {2 MVar: a zero-or-one element thread-safe box} *)
|
||||||
|
|
||||||
module MVar = struct
|
module MVar = struct
|
||||||
|
|
@ -268,11 +249,34 @@ end
|
||||||
let default_pool = Pool.create ?timeout:None ~size:100
|
let default_pool = Pool.create ?timeout:None ~size:100
|
||||||
(** Default pool of threads, should be ok for most uses. *)
|
(** Default pool of threads, should be ok for most uses. *)
|
||||||
|
|
||||||
|
(** {2 Futures} *)
|
||||||
|
|
||||||
|
type 'a t = {
|
||||||
|
mutable content : 'a result;
|
||||||
|
mutable handlers : 'a handler list; (* handlers *)
|
||||||
|
pool : Pool.t;
|
||||||
|
mutex : Mutex.t;
|
||||||
|
condition : Condition.t;
|
||||||
|
} (** A future value of type 'a *)
|
||||||
|
and 'a result =
|
||||||
|
| NotKnown
|
||||||
|
| Success of 'a
|
||||||
|
| Failure of exn
|
||||||
|
(** Result of a computation *)
|
||||||
|
and 'a handler =
|
||||||
|
| OnSuccess of ('a -> unit)
|
||||||
|
| OnFailure of (exn -> unit)
|
||||||
|
| OnFinish of (unit -> unit)
|
||||||
|
|
||||||
|
exception SendTwice
|
||||||
|
(** Exception raised when a future is evaluated several time *)
|
||||||
|
|
||||||
(** {2 Basic Future functions} *)
|
(** {2 Basic Future functions} *)
|
||||||
|
|
||||||
let make () =
|
let make pool =
|
||||||
{ content = NotKnown;
|
{ content = NotKnown;
|
||||||
handlers = [];
|
handlers = [];
|
||||||
|
pool;
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
condition = Condition.create ();
|
condition = Condition.create ();
|
||||||
}
|
}
|
||||||
|
|
@ -305,8 +309,8 @@ let send future x =
|
||||||
Condition.broadcast future.condition;
|
Condition.broadcast future.condition;
|
||||||
List.iter
|
List.iter
|
||||||
(function
|
(function
|
||||||
| OnSuccess f -> f x
|
| OnSuccess f -> Pool.run future.pool (fun () -> f x)
|
||||||
| OnFinish f -> f ()
|
| OnFinish f -> Pool.run future.pool (fun () -> f ())
|
||||||
| OnFailure _ -> ())
|
| OnFailure _ -> ())
|
||||||
future.handlers;
|
future.handlers;
|
||||||
Mutex.unlock future.mutex
|
Mutex.unlock future.mutex
|
||||||
|
|
@ -348,7 +352,7 @@ let on_success future k =
|
||||||
(match future.content with
|
(match future.content with
|
||||||
| NotKnown ->
|
| NotKnown ->
|
||||||
future.handlers <- (OnSuccess k) :: future.handlers; (* wait *)
|
future.handlers <- (OnSuccess k) :: future.handlers; (* wait *)
|
||||||
| Success x -> k x
|
| Success x -> Pool.run future.pool (fun () -> k x)
|
||||||
| Failure _ -> ());
|
| Failure _ -> ());
|
||||||
Mutex.unlock future.mutex
|
Mutex.unlock future.mutex
|
||||||
|
|
||||||
|
|
@ -358,7 +362,7 @@ let on_failure future k =
|
||||||
| NotKnown ->
|
| NotKnown ->
|
||||||
future.handlers <- (OnFailure k) :: future.handlers; (* wait *)
|
future.handlers <- (OnFailure k) :: future.handlers; (* wait *)
|
||||||
| Success _ -> ()
|
| Success _ -> ()
|
||||||
| Failure e -> k e);
|
| Failure e -> Pool.run future.pool (fun () -> k e));
|
||||||
Mutex.unlock future.mutex
|
Mutex.unlock future.mutex
|
||||||
|
|
||||||
let on_finish future k =
|
let on_finish future k =
|
||||||
|
|
@ -366,11 +370,12 @@ let on_finish future k =
|
||||||
(match future.content with
|
(match future.content with
|
||||||
| NotKnown ->
|
| NotKnown ->
|
||||||
future.handlers <- (OnFinish k) :: future.handlers; (* wait *)
|
future.handlers <- (OnFinish k) :: future.handlers; (* wait *)
|
||||||
| Success _ | Failure _ -> k ());
|
| Success _ | Failure _ -> Pool.run future.pool (fun () -> k ()));
|
||||||
Mutex.unlock future.mutex
|
Mutex.unlock future.mutex
|
||||||
|
|
||||||
let flatMap f future =
|
let flatMap ?pool f future =
|
||||||
let future' = make () in
|
let pool = match pool with | Some p -> p | None -> future.pool in
|
||||||
|
let future' = make pool in
|
||||||
(* if [future] succeeds with [x], we spawn a new job to compute [f x] *)
|
(* if [future] succeeds with [x], we spawn a new job to compute [f x] *)
|
||||||
on_success future
|
on_success future
|
||||||
(fun x ->
|
(fun x ->
|
||||||
|
|
@ -384,14 +389,14 @@ let flatMap f future =
|
||||||
(fun e -> fail future' e);
|
(fun e -> fail future' e);
|
||||||
future'
|
future'
|
||||||
|
|
||||||
let andThen future f =
|
let andThen ?pool future f =
|
||||||
flatMap (fun _ -> f ()) future
|
flatMap ?pool (fun _ -> f ()) future
|
||||||
|
|
||||||
let sequence futures =
|
let sequence ?(pool=default_pool) futures =
|
||||||
let a = Array.of_list futures in
|
let a = Array.of_list futures in
|
||||||
let n = Array.length a in
|
let n = Array.length a in
|
||||||
let results = Array.make n NotKnown in
|
let results = Array.make n NotKnown in
|
||||||
let future' = make () in
|
let future' = make default_pool in
|
||||||
(* state: how many remain to finish *)
|
(* state: how many remain to finish *)
|
||||||
let count = MVar.full (Array.length a) in
|
let count = MVar.full (Array.length a) in
|
||||||
(* when all futures returned, collect results for future' *)
|
(* when all futures returned, collect results for future' *)
|
||||||
|
|
@ -425,8 +430,8 @@ let sequence futures =
|
||||||
done;
|
done;
|
||||||
future'
|
future'
|
||||||
|
|
||||||
let choose futures =
|
let choose ?(pool=default_pool) futures =
|
||||||
let future' = make () in
|
let future' = make default_pool in
|
||||||
let one_finished = MVar.full false in
|
let one_finished = MVar.full false in
|
||||||
(* handlers. The first handler to be called will update [one_finished]
|
(* handlers. The first handler to be called will update [one_finished]
|
||||||
to true, see that it was false (hence know it is the first)
|
to true, see that it was false (hence know it is the first)
|
||||||
|
|
@ -446,8 +451,8 @@ let choose futures =
|
||||||
futures;
|
futures;
|
||||||
future'
|
future'
|
||||||
|
|
||||||
let map f future =
|
let map ?(pool=default_pool) f future =
|
||||||
let future' = make () in
|
let future' = make pool in
|
||||||
on_success future (fun x -> let y = f x in send future' y);
|
on_success future (fun x -> let y = f x in send future' y);
|
||||||
on_failure future (fun e -> fail future' e);
|
on_failure future (fun e -> fail future' e);
|
||||||
future'
|
future'
|
||||||
|
|
@ -457,12 +462,13 @@ let map f future =
|
||||||
let return x =
|
let return x =
|
||||||
{ content = Success x;
|
{ content = Success x;
|
||||||
handlers = [];
|
handlers = [];
|
||||||
|
pool = default_pool;
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
condition = Condition.create ();
|
condition = Condition.create ();
|
||||||
}
|
}
|
||||||
|
|
||||||
let spawn ?(pool=default_pool) f =
|
let spawn ?(pool=default_pool) f =
|
||||||
let future = make () in
|
let future = make pool in
|
||||||
(* schedule computation *)
|
(* schedule computation *)
|
||||||
Pool.run pool
|
Pool.run pool
|
||||||
(fun () ->
|
(fun () ->
|
||||||
|
|
@ -508,6 +514,8 @@ let spawn_process ?(pool=default_pool) ?(stdin="") ~cmd =
|
||||||
| Unix.WSTOPPED i -> i in
|
| Unix.WSTOPPED i -> i in
|
||||||
(returncode, out', err'))
|
(returncode, out', err'))
|
||||||
|
|
||||||
|
(* TODO a global scheduler for timed events *)
|
||||||
|
|
||||||
let sleep ?(pool=default_pool) time =
|
let sleep ?(pool=default_pool) time =
|
||||||
spawn ~pool
|
spawn ~pool
|
||||||
(fun () -> Thread.delay time; ())
|
(fun () -> Thread.delay time; ())
|
||||||
|
|
|
||||||
12
future.mli
12
future.mli
|
|
@ -83,7 +83,7 @@ val default_pool : Pool.t
|
||||||
|
|
||||||
(** {2 Basic low-level Future functions} *)
|
(** {2 Basic low-level Future functions} *)
|
||||||
|
|
||||||
val make : unit -> 'a t
|
val make : Pool.t -> 'a t
|
||||||
(** Create a future, representing a value that is not known yet. *)
|
(** Create a future, representing a value that is not known yet. *)
|
||||||
|
|
||||||
val get : 'a t -> 'a
|
val get : 'a t -> 'a
|
||||||
|
|
@ -111,19 +111,19 @@ val on_failure : _ t -> (exn -> unit) -> unit
|
||||||
val on_finish : _ t -> (unit -> unit) -> unit
|
val on_finish : _ t -> (unit -> unit) -> unit
|
||||||
(** Attach a handler to be called when the future is evaluated *)
|
(** Attach a handler to be called when the future is evaluated *)
|
||||||
|
|
||||||
val flatMap : ('a -> 'b t) -> 'a t -> 'b t
|
val flatMap : ?pool:Pool.t -> ('a -> 'b t) -> 'a t -> 'b t
|
||||||
(** Monadic combination of futures *)
|
(** Monadic combination of futures *)
|
||||||
|
|
||||||
val andThen : 'a t -> (unit -> 'b t) -> 'b t
|
val andThen : ?pool:Pool.t -> 'a t -> (unit -> 'b t) -> 'b t
|
||||||
(** Wait for the first future to succeed, then launch the second *)
|
(** Wait for the first future to succeed, then launch the second *)
|
||||||
|
|
||||||
val sequence : 'a t list -> 'a list t
|
val sequence : ?pool:Pool.t -> 'a t list -> 'a list t
|
||||||
(** Future that waits for all previous sequences to terminate *)
|
(** Future that waits for all previous sequences to terminate *)
|
||||||
|
|
||||||
val choose : 'a t list -> 'a t
|
val choose : ?pool:Pool.t -> 'a t list -> 'a t
|
||||||
(** Choose among those futures (the first to terminate) *)
|
(** Choose among those futures (the first to terminate) *)
|
||||||
|
|
||||||
val map : ('a -> 'b) -> 'a t -> 'b t
|
val map : ?pool:Pool.t -> ('a -> 'b) -> 'a t -> 'b t
|
||||||
(** Maps the value inside the future *)
|
(** Maps the value inside the future *)
|
||||||
|
|
||||||
(** {2 Future constructors} *)
|
(** {2 Future constructors} *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue