diff --git a/future.ml b/future.ml index 51372ea4..11363c51 100644 --- a/future.ml +++ b/future.ml @@ -37,7 +37,7 @@ module MVar = struct on_put : Condition.t; (* signal that a value was added (full) *) } - (** Create an empty box *) + (* Create an empty box *) let empty () = { content = None; mutex = Mutex.create (); @@ -45,7 +45,7 @@ module MVar = struct on_put = Condition.create (); } - (** Create a full box *) + (* Create a full box *) let full x = { content = Some x; mutex = Mutex.create (); @@ -53,12 +53,10 @@ module MVar = struct on_put = Condition.create (); } - (** Is the box currently empty? *) - let is_empty box = - Mutex.lock box.mutex; - let ans = box.content <> None in - Mutex.unlock box.mutex; - ans + (* Is the box currently empty? *) + let is_empty box = match box.content with + | Some _ -> true + | None -> false (* assuming we have a lock on given box, wait it gets a value and return it *) let rec wait_put box = @@ -76,7 +74,7 @@ module MVar = struct Condition.wait box.on_take box.mutex; wait_take box (* try again *) - (** Take value out of the box. Wait if necessary *) + (* Take value out of the box. Wait if necessary *) let take box = Mutex.lock box.mutex; let x = wait_put box in @@ -85,7 +83,7 @@ module MVar = struct Mutex.unlock box.mutex; x - (** Put a value in the box. Waits if the box is already full *) + (* Put a value in the box. Waits if the box is already full *) let put box x = Mutex.lock box.mutex; wait_take box; @@ -93,22 +91,22 @@ module MVar = struct Condition.broadcast box.on_put; Mutex.unlock box.mutex - (** Use given function to atomically update content, and return + (* Use given function to atomically update content, and return the previous value and the new one *) let update box f = Mutex.lock box.mutex; let x = wait_put box in try - let y = f x in - box.content <- Some y; + let x', res = f x in + box.content <- Some x'; Condition.broadcast box.on_put; (* signal write *) Mutex.unlock box.mutex; - x, y + res with e -> Mutex.unlock box.mutex; raise e - (** Look at the value, without removing it *) + (* Look at the value, without removing it *) let peek box = Mutex.lock box.mutex; let x = wait_put box in @@ -120,11 +118,11 @@ module type S = sig type 'a t (** A future value of type 'a *) - val run : t -> (unit -> unit) -> unit - (** Run the function in the pool *) + val run : (unit -> unit) -> unit + (** Use the underlying thread pool to run this job *) - val finish : t -> unit - (** Kill threads in the pool *) + val finish : unit -> unit + (** Kill threads in the pool. The pool won't be usable any more. *) (** {2 Basic low-level Future functions} *) @@ -136,10 +134,6 @@ module type S = sig val state : 'a t -> 'a state (** Current state of the future *) - val get : 'a t -> 'a - (** Blocking get: wait for the future to be evaluated, and get the value, - or the exception that failed the future is returned *) - val is_done : 'a t -> bool (** Is the future evaluated (success/failure)? *) @@ -188,10 +182,10 @@ module type S = sig (** {2 Event timer} *) module Timer : sig - val schedule_at : at:float -> (unit -> unit) -> unit + val at : float -> (unit -> unit) -> unit (** [schedule_at ~at act] will run [act] at the Unix echo [at] *) - val schedule_after : after:t -> float -> (unit -> unit) -> unit + val after : float -> (unit -> unit) -> unit (** [schedule_after ~after act] will run [act] in [after] seconds *) end @@ -205,15 +199,58 @@ module type S = sig end module type CONFIG = sig - val timeout : float - + val min_size : int val max_size : int end module DefaultConfig = struct - let timeout = 10. + let min_size = 0 let max_size = 15 - let size = 0 +end + +(** {2 Mutable heap} +inlined here for avoiding dependencies *) +module Heap = struct + (** Implementation from http://en.wikipedia.org/wiki/Skew_heap *) + + type 'a t = { + mutable tree : 'a tree; + cmp : 'a -> 'a -> int; + } (** A pairing tree heap with the given comparison function *) + and 'a tree = + | Empty + | Node of 'a * 'a tree * 'a tree + + let empty ~cmp = { + tree = Empty; + cmp; + } + + let is_empty h = + match h.tree with + | Empty -> true + | Node _ -> false + + let rec union ~cmp t1 t2 = match t1, t2 with + | Empty, _ -> t2 + | _, Empty -> t1 + | Node (x1, l1, r1), Node (x2, l2, r2) -> + if cmp x1 x2 <= 0 + then Node (x1, union ~cmp t2 r1, l1) + else Node (x2, union ~cmp t1 r2, l2) + + let insert h x = + h.tree <- union ~cmp:h.cmp (Node (x, Empty, Empty)) h.tree + + let min h = match h.tree with + | Empty -> raise Not_found + | Node (x, _, _) -> x + + let pop h = match h.tree with + | Empty -> raise Not_found + | Node (x, l, r) -> + h.tree <- union ~cmp:h.cmp l r; + x end module Make(C : CONFIG) = struct @@ -227,76 +264,50 @@ module Make(C : CONFIG) = struct let stop = ref false let mutex = Mutex.create () let jobs = Queue.create () - let threads : waiting_thread list ref = ref [] + let new_task = Condition.create () (* signal when new task *) let cur_size = ref 0 - (* Cleanup waiting threads. precond: pool is locked *) - let cleanup_waiting () = - let l = !threads in - let now = Unix.gettimeofday () in - (* filter threads that have been waiting for too long *) - let l' = List.filter - (fun (time, box) -> - if time +. C.timeout < now - then (MVar.put box Quit; false) - else true) - l in - threads := l' + (* Function that the threads run *) + let rec serve () = + Mutex.lock mutex; + next_task () + (* process next task *) + and next_task () = + if !stop then Condition.broadcast new_task (* and stop *) + else match poll () with + | Some job -> + Mutex.unlock mutex; + begin try job() + with _ -> () + end; + serve () + | None -> + if !cur_size > C.min_size + then () (* stop, too many threads *) + else next_task() + (* poll for next task *) + and poll () = + if Queue.is_empty jobs + then begin + Condition.wait new_task mutex; + if !stop || Queue.is_empty jobs + then None + else begin + let job = Queue.pop jobs in + Condition.signal new_task; + Some job + end + end else + Some (Queue.pop jobs) - (* Function that the threads run. They also take a MVar to get commands *) - let serve box = - (* wait for a job to come *) - let rec wait_job () = - match MVar.take box with - | Quit -> (Mutex.lock mutex; quit ()) (* exit *) - | Perform job -> - run_job job - (* run the given job *) - and run_job job = - (try job () with _ -> ()); - next () (* loop *) - (* process next task *) - and next () = - Mutex.lock mutex; - if !stop then quit () (* stop the pool *) - else if Queue.is_empty jobs - then begin - let now = Unix.gettimeofday () in - (* cleanup waiting threads *) - cleanup_waiting (); - if !cur_size > 1 && List.length !threads + 1 = !cur_size - then - (* all other threads are waiting, we may need to kill them later *) - (Mutex.unlock mutex; delay ()) - else begin - (* add oneself to the list of waiting threads *) - threads := (now, box) :: !threads; - Mutex.unlock mutex; - wait_job () - end - end else - let job = Queue.pop jobs in - Mutex.unlock mutex; - run_job job - (* delay [pool.timeout], so that in case no job is submitted we - still kill old cached threads *) - and delay () = - Thread.delay C.timeout; - next () - (* stop the thread (assume we have pool.mutex) *) - and quit () = - cur_size := !cur_size - 1; - Mutex.unlock mutex - in wait_job () + (* TODO: start thread iff new task and not max_size reached *) - let size pool = !cur_size - - (** Add a thread to the pool, starting with the first job *) + (* Add a thread to the pool, starting with the first job *) let add_thread job = let box = MVar.full job in ignore (Thread.create serve box) - (** Run the job in the given pool *) + (* Run the job in the pool *) let run job = assert (not (!stop)); Mutex.lock mutex; @@ -305,6 +316,7 @@ module Make(C : CONFIG) = struct (* max capacity reached, push task in queue *) Queue.push job jobs | [] -> + assert (!cur_size < C.max_size); (* spawn a thread for the given task *) add_thread (Perform job); cur_size := !cur_size + 1; @@ -346,6 +358,8 @@ module Make(C : CONFIG) = struct handlers = []; } + let state f = f.state + let send future x = match future.state with | NotKnown -> (* set content and signal *) @@ -537,54 +551,13 @@ module Make(C : CONFIG) = struct let sleep time = spawn (fun () -> Thread.delay time; ()) - (** {3 Mutable heap} - inlined here for avoiding dependencies *) - module Heap = struct - (** Implementation from http://en.wikipedia.org/wiki/Skew_heap *) - - type 'a t = { - mutable tree : 'a tree; - cmp : 'a -> 'a -> int; - } (** A pairing tree heap with the given comparison function *) - and 'a tree = - | Empty - | Node of 'a * 'a tree * 'a tree - - let empty ~cmp = { - tree = Empty; - cmp; - } - - let is_empty h = - match h.tree with - | Empty -> true - | Node _ -> false - - let rec union ~cmp t1 t2 = match t1, t2 with - | Empty, _ -> t2 - | _, Empty -> t1 - | Node (x1, l1, r1), Node (x2, l2, r2) -> - if cmp x1 x2 <= 0 - then Node (x1, union ~cmp t2 r1, l1) - else Node (x2, union ~cmp t1 r2, l2) - - let insert h x = - h.tree <- union ~cmp:h.cmp (Node (x, Empty, Empty)) h.tree - - let pop h = match h.tree with - | Empty -> raise Not_found - | Node (x, l, r) -> - h.tree <- union ~cmp:h.cmp l r; - x - end - (** {2 Event timer} *) module Timer = struct let cmp_tasks (f1,_) (f2,_) = compare f1 f2 let stop = ref false - let tasks : (float * (unit -> unit)) Heap.t = Heap.empty cmp:cmp_tasks + let tasks : (float * (unit -> unit)) Heap.t = Heap.empty ~cmp:cmp_tasks let fifo_in, fifo_out = Unix.pipe () let thread = ref None let standby_wait = 30. (* when no task is scheduled *) @@ -601,7 +574,7 @@ module Make(C : CONFIG) = struct try Some (Heap.min tasks) with Not_found -> None in match next_task with - | _ when timer.stop -> Mutex.unlock mutex (* stop *) + | _ when !stop -> Mutex.unlock mutex (* stop *) | None -> Mutex.unlock mutex; wait standby_wait (* wait for a task *) @@ -627,11 +600,11 @@ module Make(C : CONFIG) = struct next () let () = - let t = Thread.create server timer in + let t = Thread.create serve () in thread := Some t; () - let schedule_at ~at task = + let at time task = Mutex.lock mutex; (* time of the next scheduled event *) let next_time = @@ -641,14 +614,14 @@ module Make(C : CONFIG) = struct (* insert task *) Heap.insert tasks (time, task); (* see if the timer thread needs to be awaken earlier *) - (if time < next_time - then ignore (Unix.single_write fifo_out "_" 0 1)); + if time < next_time + then ignore (Unix.single_write fifo_out "_" 0 1); Mutex.unlock mutex; () - let schedule_after ~after task = - assert (delay >= 0.); - schedule_at ~at:(Unix.gettimeofday () +. delay) task + let after after task = + assert (after>= 0.); + at (Unix.gettimeofday () +. after) task end module Infix = struct @@ -657,5 +630,6 @@ module Make(C : CONFIG) = struct end include Infix - end + +module Std = Make(DefaultConfig) diff --git a/future.mli b/future.mli index b3691183..d788706f 100644 --- a/future.mli +++ b/future.mli @@ -48,9 +48,8 @@ module MVar : sig val put : 'a t -> 'a -> unit (** Put a value in the box. Waits if the box is already empty *) - val update : 'a t -> ('a -> 'a) -> 'a * 'a - (** Use given function to atomically update content, and return - the previous value and the new one *) + val update : 'a t -> ('a -> 'a * 'b) -> 'b + (** Use given function to atomically update content, and return a value *) val peek : 'a t -> 'a (** Look at the value, without removing it *) @@ -63,7 +62,7 @@ module type S = sig (** A future value of type 'a *) val run : (unit -> unit) -> unit - (** Run the function in the pool *) + (** Use the underlying thread pool to run this job *) val finish : unit -> unit (** Kill threads in the pool. The pool won't be usable any more. *) @@ -126,10 +125,10 @@ module type S = sig (** {2 Event timer} *) module Timer : sig - val schedule_at : at:float -> (unit -> unit) -> unit + val at : float -> (unit -> unit) -> unit (** [schedule_at ~at act] will run [act] at the Unix echo [at] *) - val schedule_after : after:float -> (unit -> unit) -> unit + val after : float -> (unit -> unit) -> unit (** [schedule_after ~after act] will run [act] in [after] seconds *) end @@ -145,9 +144,8 @@ end (** {2 Functor} *) module type CONFIG = sig - val timeout : float - - val max_size : int + val min_size : int (** Minimum (initial) number of threads *) + val max_size : int (** Maximum number of active threads *) end module DefaultConfig : CONFIG diff --git a/tests/test_future.ml b/tests/test_future.ml index df4b0bc3..c4555115 100644 --- a/tests/test_future.ml +++ b/tests/test_future.ml @@ -27,21 +27,18 @@ let test_parallel () = () let test_time () = - let start = Unix.gettimeofday () in let f1 = F.spawn (fun () -> Thread.delay 0.5) in let f2 = F.spawn (fun () -> Thread.delay 0.5) in - F.get f1; - F.get f2; - let stop = Unix.gettimeofday () in - OUnit.assert_bool "parallelism" (stop -. start < 0.75); - () + Thread.delay 0.75; + match F.state f1, F.state f2 with + | F.Success _, F.Success _ -> () + | _ -> OUnit.assert_failure "parallelism" let test_timer () = - let timer = F.Timer.create () in let mvar = MVar.full 1 in - F.Timer.schedule_in timer 0.5 + F.Timer.after 0.5 (fun () -> ignore (MVar.update mvar (fun x -> x + 2))); - F.Timer.schedule_in timer 0.2 + F.Timer.after 0.2 (fun () -> ignore (MVar.update mvar (fun x -> x * 4))); Thread.delay 0.7; OUnit.assert_equal 6 (MVar.peek mvar);