diff --git a/src/d_pool_.ml b/src/d_pool_.ml new file mode 100644 index 00000000..e2368ceb --- /dev/null +++ b/src/d_pool_.ml @@ -0,0 +1,33 @@ +type domain = Domain_.t + +let work_ _i q : unit = + while true do + let f = S_queue.pop q in + try f () with _ -> () + done + +(* A domain level worker. It should not do too much except for starting + new threads for pools. *) +type worker = { q: (unit -> unit) S_queue.t } [@@unboxed] + +let domains_ : worker array lazy_t = + lazy + (let n = Domain_.recommended_number () in + Array.init n (fun i -> + let q = S_queue.create () in + let _domain : domain = Domain_.spawn (fun () -> work_ i q) in + { q })) + +let[@inline] n_domains () : int = Array.length (Lazy.force domains_) + +let run_on (i : int) (f : unit -> unit) : unit = + let (lazy arr) = domains_ in + assert (i < Array.length arr); + S_queue.push arr.(i).q f + +let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = + let q = S_queue.create () in + run_on i (fun () -> + let x = f () in + S_queue.push q x); + S_queue.pop q diff --git a/src/d_pool_.mli b/src/d_pool_.mli new file mode 100644 index 00000000..1dd05d5f --- /dev/null +++ b/src/d_pool_.mli @@ -0,0 +1,20 @@ +(** Static pool of domains. + + These domains are shared between {b all} the pools in moonpool. + The rationale is that we should not have more domains than cores, so + it's easier to pre-allocate exactly that many domains, and run more flexible + thread pools on top. +*) + +type domain = Domain_.t + +val n_domains : unit -> int +(** Number of domains in the pool *) + +val run_on : int -> (unit -> unit) -> unit +(** [run_on i f] runs [f()] on the domain with index [i]. + Precondition: [0 <= i < n_domains()] *) + +val run_on_and_wait : int -> (unit -> 'a) -> 'a +(** [run_on_and_wait i f] runs [f()] on the domain with index [i], + and blocks until the result of [f()] is returned back. *) diff --git a/src/dune b/src/dune index 35d06f81..2dcceea2 100644 --- a/src/dune +++ b/src/dune @@ -1,7 +1,7 @@ (library (public_name moonpool) (name moonpool) - (private_modules atomic_ domain_) + (private_modules atomic_ domain_ d_pool_) (libraries threads either)) (rule diff --git a/src/fut.ml b/src/fut.ml new file mode 100644 index 00000000..9e093290 --- /dev/null +++ b/src/fut.ml @@ -0,0 +1,341 @@ +module A = Atomic_ + +type 'a or_error = ('a, exn * Printexc.raw_backtrace) result +type 'a waiter = 'a or_error -> unit + +type 'a state = + | Done of 'a or_error + | Waiting of 'a waiter list + +type 'a t = { st: 'a state A.t } [@@unboxed] +type 'a promise = 'a t + +let make () = + let fut = { st = A.make (Waiting []) } in + fut, fut + +let of_result x : _ t = { st = A.make (Done x) } +let[@inline] return x : _ t = of_result (Ok x) +let[@inline] fail e bt : _ t = of_result (Error (e, bt)) + +let[@inline] is_resolved self : bool = + match A.get self.st with + | Done _ -> true + | Waiting _ -> false + +let[@inline] peek self : _ option = + match A.get self.st with + | Done x -> Some x + | Waiting _ -> None + +let on_result (self : _ t) (f : _ waiter) : unit = + while + let st = A.get self.st in + match st with + | Done x -> + f x; + false + | Waiting l -> + let must_retry = not (A.compare_and_set self.st st (Waiting (f :: l))) in + must_retry + do + () + done + +exception Already_fulfilled + +let fulfill (self : _ t) (r : _ result) : unit = + while + let st = A.get self.st in + match st with + | Done _ -> raise Already_fulfilled + | Waiting l -> + let did_swap = A.compare_and_set self.st st (Done r) in + if did_swap then ( + (* success, now call all the waiters *) + List.iter (fun f -> try f r with _ -> ()) l; + false + ) else + true + do + () + done + +let[@inline] fulfill_idempotent self r = + try fulfill self r with Already_fulfilled -> () + +(* ### combinators ### *) + +let spawn ~on f : _ t = + let fut, promise = make () in + + let task () = + let res = + try Ok (f ()) + with e -> + let bt = Printexc.get_raw_backtrace () in + Error (e, bt) + in + fulfill promise res + in + + Pool.run on task; + fut + +let map ?on ~f fut : _ t = + let map_res r = + match r with + | Ok x -> + (try Ok (f x) + with e -> + let bt = Printexc.get_raw_backtrace () in + Error (e, bt)) + | Error e_bt -> Error e_bt + in + + match peek fut with + | Some r -> of_result (map_res r) + | None -> + let fut2, promise = make () in + on_result fut (fun r -> + let map_and_fulfill () = + let res = map_res r in + fulfill promise res + in + + match on with + | None -> map_and_fulfill () + | Some on -> Pool.run on map_and_fulfill); + + fut2 + +let bind ?on ~f fut : _ t = + let apply_f_to_res r : _ t = + match r with + | Ok x -> + (try f x + with e -> + let bt = Printexc.get_raw_backtrace () in + fail e bt) + | Error (e, bt) -> fail e bt + in + + match peek fut with + | Some r -> apply_f_to_res r + | None -> + let fut2, promise = make () in + on_result fut (fun r -> + let bind_and_fulfill () = + let f_res_fut = apply_f_to_res r in + (* forward result *) + on_result f_res_fut (fun r -> fulfill promise r) + in + + match on with + | None -> bind_and_fulfill () + | Some on -> Pool.run on bind_and_fulfill); + + fut2 + +let rec update_ (st : 'a A.t) f : 'a = + let x = A.get st in + let y = f x in + if A.compare_and_set st x y then + y + else + update_ st f + +let both a b : _ t = + match peek a, peek b with + | Some (Ok x), Some (Ok y) -> return (x, y) + | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt + | _ -> + let fut, promise = make () in + + let st = A.make `Neither in + on_result a (function + | Error err -> fulfill_idempotent promise (Error err) + | Ok x -> + (match + update_ st (function + | `Neither -> `Left x + | `Right y -> `Both (x, y) + | _ -> assert false) + with + | `Both (x, y) -> fulfill promise (Ok (x, y)) + | _ -> ())); + on_result b (function + | Error err -> fulfill_idempotent promise (Error err) + | Ok y -> + (match + update_ st (function + | `Left x -> `Both (x, y) + | `Neither -> `Right y + | _ -> assert false) + with + | `Both (x, y) -> fulfill promise (Ok (x, y)) + | _ -> ())); + fut + +let choose a b : _ t = + match peek a, peek b with + | Some (Ok x), _ -> return (Either.Left x) + | _, Some (Ok y) -> return (Either.Right y) + | Some (Error (e, bt)), Some (Error _) -> fail e bt + | _ -> + let fut, promise = make () in + + let one_failure = A.make false in + on_result a (function + | Error err -> + if A.exchange one_failure true then + (* the other one failed already *) + fulfill_idempotent promise (Error err) + | Ok x -> fulfill_idempotent promise (Ok (Either.Left x))); + on_result b (function + | Error err -> + if A.exchange one_failure true then + (* the other one failed already *) + fulfill_idempotent promise (Error err) + | Ok y -> fulfill_idempotent promise (Ok (Either.Right y))); + fut + +let choose_same a b : _ t = + match peek a, peek b with + | Some (Ok x), _ -> return x + | _, Some (Ok y) -> return y + | Some (Error (e, bt)), Some (Error _) -> fail e bt + | _ -> + let fut, promise = make () in + + let one_failure = A.make false in + on_result a (function + | Error err -> + if A.exchange one_failure true then + fulfill_idempotent promise (Error err) + | Ok x -> fulfill_idempotent promise (Ok x)); + on_result b (function + | Error err -> + if A.exchange one_failure true then + fulfill_idempotent promise (Error err) + | Ok y -> fulfill_idempotent promise (Ok y)); + fut + +let peek_ok_assert_ (self : 'a t) : 'a = + match A.get self.st with + | Done (Ok x) -> x + | _ -> assert false + +let join_container_ ~iter ~map ~len cont : _ t = + let fut, promise = make () in + let missing = A.make (len cont) in + + (* callback called when a future in [a] is resolved *) + let on_res = function + | Ok _ -> + let n = A.fetch_and_add missing (-1) in + if n = 1 then ( + (* last future, we know they all succeeded, so resolve [fut] *) + let res = map peek_ok_assert_ cont in + fulfill promise (Ok res) + ) + | Error e_bt -> + (* immediately cancel all other [on_res] *) + let n = A.exchange missing 0 in + if n > 0 then + (* we're the only one to set to 0, so we can fulfill [fut] + with an error. *) + fulfill promise (Error e_bt) + in + + iter (fun fut -> on_result fut on_res) cont; + fut + +let join_array (a : _ t array) : _ array t = + match Array.length a with + | 0 -> return [||] + | 1 -> map ?on:None a.(1) ~f:(fun x -> [| x |]) + | _ -> join_container_ ~len:Array.length ~map:Array.map ~iter:Array.iter a + +let join_list (l : _ t list) : _ list t = + match l with + | [] -> return [] + | [ x ] -> map ?on:None x ~f:(fun x -> [ x ]) + | _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l + +let wait_array (a : _ t array) : unit t = + join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ()) + +let wait_list (a : _ t list) : unit t = + join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ()) + +let for_ ~on n f : unit t = + let futs = Array.init n (fun i -> spawn ~on (fun () -> f i)) in + join_container_ + ~len:(fun () -> n) + ~iter:(fun f () -> Array.iter f futs) + ~map:(fun _f () -> ()) + () + +(* ### blocking ### *) + +let wait_block (self : 'a t) : 'a or_error = + match A.get self.st with + | Done x -> x (* fast path *) + | Waiting _ -> + let real_block () = + (* use queue only once *) + let q = S_queue.create () in + on_result self (fun r -> S_queue.push q r); + S_queue.pop q + in + + (* a bit of spinlock *) + let rec loop i = + if i = 0 then + real_block () + else ( + match A.get self.st with + | Done x -> x + | Waiting _ -> + Domain_.relax (); + (loop [@tailcall]) (i - 1) + ) + in + loop 50 + +let wait_block_exn self = + match wait_block self with + | Ok x -> x + | Error (e, bt) -> Printexc.raise_with_backtrace e bt + +module type INFIX = sig + val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t + val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t + val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t + val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t + val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t + val ( and* ) : 'a t -> 'b t -> ('a * 'b) t +end + +module Infix_ (X : sig + val pool : Pool.t option +end) : INFIX = struct + let[@inline] ( >|= ) x f = map ?on:X.pool ~f x + let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x + let ( let+ ) = ( >|= ) + let ( let* ) = ( >>= ) + let ( and+ ) = both + let ( and* ) = both +end + +include Infix_ (struct + let pool = None +end) + +module Infix (X : sig + val pool : Pool.t +end) = +Infix_ (struct + let pool = Some X.pool +end) diff --git a/src/fut.mli b/src/fut.mli new file mode 100644 index 00000000..b91847db --- /dev/null +++ b/src/fut.mli @@ -0,0 +1,133 @@ +(** Futures *) + +type 'a or_error = ('a, exn * Printexc.raw_backtrace) result + +type 'a t +(** A future with a result of type ['a] *) + +type 'a promise +(** A promise, which can be fulfilled exactly once to set + the corresponding future *) + +val make : unit -> 'a t * 'a promise +(** Make a new future with the associated promise *) + +val on_result : 'a t -> ('a or_error -> unit) -> unit +(** [on_result fut f] registers [f] to be called in the future + when [fut] is set ; + or calls [f] immediately if [fut] is already set. *) + +exception Already_fulfilled + +val fulfill : 'a promise -> 'a or_error -> unit +(** Fullfill the promise, setting the future at the same time. + @raise Already_fulfilled if the promise is already fulfilled. *) + +val fulfill_idempotent : 'a promise -> 'a or_error -> unit +(** Fullfill the promise, setting the future at the same time. + Does nothing if the promise is already fulfilled. *) + +val return : 'a -> 'a t +(** Already settled future, with a result *) + +val fail : exn -> Printexc.raw_backtrace -> _ t +(** Already settled future, with a failure *) + +val of_result : 'a or_error -> 'a t + +val is_resolved : _ t -> bool +(** [is_resolved fut] is [true] iff [fut] is resolved. *) + +val peek : 'a t -> 'a or_error option +(** [peek fut] returns [Some r] if [fut] is currently resolved with [r], + and [None] if [fut] is not resolved yet. *) + +(** {2 Combinators} *) + +val spawn : on:Pool.t -> (unit -> 'a) -> 'a t +(** [spaw ~on f] runs [f()] on the given pool, and return a future that will + hold its result. *) + +val map : ?on:Pool.t -> f:('a -> 'b) -> 'a t -> 'b t +(** [map ?on ~f fut] returns a new future [fut2] that resolves + with [f x] if [fut] resolved with [x]; + and fails with [e] if [fut] fails with [e] or [f x] raises [e]. + @param on if provided, [f] runs on the given pool *) + +val bind : ?on:Pool.t -> f:('a -> 'b t) -> 'a t -> 'b t +(** [map ?on ~f fut] returns a new future [fut2] that resolves + like the future [f x] if [fut] resolved with [x]; + and fails with [e] if [fut] fails with [e] or [f x] raises [e]. + @param on if provided, [f] runs on the given pool *) + +val both : 'a t -> 'b t -> ('a * 'b) t +(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and + [b] succeeds with [y], or fails if any of them fails. *) + +val choose : 'a t -> 'b t -> ('a, 'b) Either.t t +(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or + [b] succeeds with [y], or fails if both of them fails. + If they both succeed, it is not specified which result is used. *) + +val choose_same : 'a t -> 'a t -> 'a t +(** [choose_same a b] succeeds with the value of one of [a] or [b] if + they succeed, or fails if both fail. + If they both succeed, it is not specified which result is used. *) + +val join_array : 'a t array -> 'a array t +(** Wait for all the futures in the array. Fails if any future fails. *) + +val join_list : 'a t list -> 'a list t +(** Wait for all the futures in the list. Fails if any future fails. *) + +val wait_array : _ t array -> unit t +(** [wait_array arr] waits for all futures in [arr] to resolve. It discards + the individual results of futures in [arr]. It fails if any future fails. *) + +val wait_list : _ t list -> unit t +(** [wait_list l] waits for all futures in [l] to resolve. It discards + the individual results of futures in [l]. It fails if any future fails. *) + +val for_ : on:Pool.t -> int -> (int -> unit) -> unit t +(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the pool, and returns + a future that resolves when all the tasks have resolved, or fails + as soon as one task has failed. *) + +(** {2 Blocking} *) + +val wait_block : 'a t -> 'a or_error +(** [wait_block fut] blocks the current thread until [fut] is resolved, + and returns its value. + + A word of warning: this will monopolize the calling thread until the future + resolves. This can also easily cause deadlocks, if enough threads in a pool + call [wait_block] on futures running on the same pool or a pool depending on it. + + A good rule to avoid deadlocks is to run this from outside of any pool, + or to have an acyclic order between pools where [wait_block] + is only called from a pool on futures evaluated in a pool that comes lower + in the hierarchy. + If this rule is broken, it is possible for all threads in a pool to wait + for futures that can only make progress on these same threads, + hence the deadlock. + *) + +val wait_block_exn : 'a t -> 'a +(** Same as {!wait_block} but re-raises the exception if the future failed. *) + +module type INFIX = sig + val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t + val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t + val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t + val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t + val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t + val ( and* ) : 'a t -> 'b t -> ('a * 'b) t +end + +include INFIX +(** Operators that run on the same thread *) + +(** Make infix combinators *) +module Infix (_ : sig + val pool : Pool.t +end) : INFIX diff --git a/src/moonpool.ml b/src/moonpool.ml index 12dc5fad..9c3a7bf3 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -1,544 +1,8 @@ -module A = Atomic_ - type 'a or_error = ('a, exn * Printexc.raw_backtrace) result -(** Simple blocking queue *) -module S_queue : sig - type 'a t - - val create : unit -> _ t - val push : 'a t -> 'a -> unit - val pop : 'a t -> 'a -end = struct - type 'a t = { - mutex: Mutex.t; - cond: Condition.t; - q: 'a Queue.t; - } - - let create () : _ t = - { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () } - - let push (self : _ t) x : unit = - Mutex.lock self.mutex; - Queue.push x self.q; - Condition.signal self.cond; - Mutex.unlock self.mutex - - let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - let rec loop () = - if Queue.is_empty self.q then ( - Condition.wait self.cond self.mutex; - (loop [@tailcall]) () - ) else ( - let x = Queue.pop self.q in - Mutex.unlock self.mutex; - x - ) - in - loop () -end - -(** Static pool of domains *) -module D_pool_ = struct - type domain = Domain_.t - - let work_ _i q : unit = - while true do - let f = S_queue.pop q in - try f () with _ -> () - done - - (* A domain level worker. It should not do too much except for starting - new threads for pools. *) - type worker = { q: (unit -> unit) S_queue.t } [@@unboxed] - - let domains_ : worker array lazy_t = - lazy - (let n = Domain_.recommended_number () in - Array.init n (fun i -> - let q = S_queue.create () in - let _domain : domain = Domain_.spawn (fun () -> work_ i q) in - { q })) - - (** Number of domains in the pool *) - let[@inline] n_domains () : int = Array.length (Lazy.force domains_) - - let run_on (i : int) (f : unit -> unit) : unit = - let (lazy arr) = domains_ in - assert (i < Array.length arr); - S_queue.push arr.(i).q f - - let run_on_and_wait (i : int) (f : unit -> 'a) : 'a = - let q = S_queue.create () in - run_on i (fun () -> - let x = f () in - S_queue.push q x); - S_queue.pop q -end - let start_thread_on_some_domain f x = let did = Random.int (D_pool_.n_domains ()) in D_pool_.run_on_and_wait did (fun () -> Thread.create f x) -module Pool = struct - (* TODO: use a better queue for the tasks *) - - type t = { - active: bool A.t; - threads: Thread.t array; - q: (unit -> unit) S_queue.t; - } - - type thread_loop_wrapper = - thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit - - let global_thread_wrappers_ : thread_loop_wrapper list A.t = A.make [] - - let add_global_thread_loop_wrapper f : unit = - while - let l = A.get global_thread_wrappers_ in - not (A.compare_and_set global_thread_wrappers_ l (f :: l)) - do - () - done - - let[@inline] run self f : unit = S_queue.push self.q f - - let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = - while A.get active do - let task = S_queue.pop q in - try task () - with e -> - let bt = Printexc.get_raw_backtrace () in - on_exn e bt - done - - let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () - - let create ?(on_init_thread = default_thread_init_exit_) - ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) - ?(on_exn = fun _ _ -> ()) ?(min = 1) ?(per_domain = 0) () : t = - (* number of threads to run *) - let min = max 1 min in - let n_domains = D_pool_.n_domains () in - assert (n_domains >= 1); - let n = max min (n_domains * per_domain) in - - (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) - let offset = Random.int n_domains in - - let active = A.make true in - let q = S_queue.create () in - - let pool = - let dummy = Thread.self () in - { active; threads = Array.make n dummy; q } - in - - (* temporary queue used to obtain thread handles from domains - on which the thread are started. *) - let receive_threads = S_queue.create () in - - (* start the thread with index [i] *) - let start_thread_with_idx i = - let dom_idx = (offset + i) mod n_domains in - - (* function run in the thread itself *) - let main_thread_fun () = - let thread = Thread.self () in - let t_id = Thread.id thread in - on_init_thread ~dom_id:dom_idx ~t_id (); - - let all_wrappers = - List.rev_append thread_wrappers (A.get global_thread_wrappers_) - in - - let run () = worker_thread_ ~on_exn active q in - (* the actual worker loop is [worker_thread_], with all - wrappers for this pool and for all pools (global_thread_wrappers_) *) - let run' = - List.fold_left (fun run f -> f ~thread ~pool run) run all_wrappers - in - - (* now run the main loop *) - run' (); - on_exit_thread ~dom_id:dom_idx ~t_id () - in - - (* function called in domain with index [i], to - create the thread and push it into [receive_threads] *) - let create_thread_in_domain () = - let thread = Thread.create main_thread_fun () in - (* send the thread from the domain back to us *) - S_queue.push receive_threads (i, thread) - in - - D_pool_.run_on dom_idx create_thread_in_domain - in - - (* start all threads, placing them on the domains - according to their index and [offset] in a round-robin fashion. *) - for i = 0 to n - 1 do - start_thread_with_idx i - done; - - (* receive the newly created threads back from domains *) - for _j = 1 to n do - let i, th = S_queue.pop receive_threads in - pool.threads.(i) <- th - done; - pool - - let shutdown (self : t) : unit = - let was_active = A.exchange self.active false in - (* make sure to wakeup all the sleeping threads by scheduling one task each. - This way, a thread that is asleep, waiting for tasks, - will wakeup to process this trivial task, check [self.active], and terminate. *) - if was_active then Array.iter (fun _ -> run self ignore) self.threads; - Array.iter Thread.join self.threads -end - -module Fut = struct - type 'a waiter = 'a or_error -> unit - - type 'a state = - | Done of 'a or_error - | Waiting of 'a waiter list - - type 'a t = { st: 'a state A.t } [@@unboxed] - type 'a promise = 'a t - - let make () = - let fut = { st = A.make (Waiting []) } in - fut, fut - - let of_result x : _ t = { st = A.make (Done x) } - let[@inline] return x : _ t = of_result (Ok x) - let[@inline] fail e bt : _ t = of_result (Error (e, bt)) - - let[@inline] is_resolved self : bool = - match A.get self.st with - | Done _ -> true - | Waiting _ -> false - - let[@inline] peek self : _ option = - match A.get self.st with - | Done x -> Some x - | Waiting _ -> None - - let on_result (self : _ t) (f : _ waiter) : unit = - while - let st = A.get self.st in - match st with - | Done x -> - f x; - false - | Waiting l -> - let must_retry = - not (A.compare_and_set self.st st (Waiting (f :: l))) - in - must_retry - do - () - done - - exception Already_fulfilled - - let fulfill (self : _ t) (r : _ result) : unit = - while - let st = A.get self.st in - match st with - | Done _ -> raise Already_fulfilled - | Waiting l -> - let did_swap = A.compare_and_set self.st st (Done r) in - if did_swap then ( - (* success, now call all the waiters *) - List.iter (fun f -> try f r with _ -> ()) l; - false - ) else - true - do - () - done - - let[@inline] fulfill_idempotent self r = - try fulfill self r with Already_fulfilled -> () - - (* ### combinators ### *) - - let spawn ~on f : _ t = - let fut, promise = make () in - - let task () = - let res = - try Ok (f ()) - with e -> - let bt = Printexc.get_raw_backtrace () in - Error (e, bt) - in - fulfill promise res - in - - Pool.run on task; - fut - - let map ?on ~f fut : _ t = - let map_res r = - match r with - | Ok x -> - (try Ok (f x) - with e -> - let bt = Printexc.get_raw_backtrace () in - Error (e, bt)) - | Error e_bt -> Error e_bt - in - - match peek fut with - | Some r -> of_result (map_res r) - | None -> - let fut2, promise = make () in - on_result fut (fun r -> - let map_and_fulfill () = - let res = map_res r in - fulfill promise res - in - - match on with - | None -> map_and_fulfill () - | Some on -> Pool.run on map_and_fulfill); - - fut2 - - let bind ?on ~f fut : _ t = - let apply_f_to_res r : _ t = - match r with - | Ok x -> - (try f x - with e -> - let bt = Printexc.get_raw_backtrace () in - fail e bt) - | Error (e, bt) -> fail e bt - in - - match peek fut with - | Some r -> apply_f_to_res r - | None -> - let fut2, promise = make () in - on_result fut (fun r -> - let bind_and_fulfill () = - let f_res_fut = apply_f_to_res r in - (* forward result *) - on_result f_res_fut (fun r -> fulfill promise r) - in - - match on with - | None -> bind_and_fulfill () - | Some on -> Pool.run on bind_and_fulfill); - - fut2 - - let rec update_ (st : 'a A.t) f : 'a = - let x = A.get st in - let y = f x in - if A.compare_and_set st x y then - y - else - update_ st f - - let both a b : _ t = - match peek a, peek b with - | Some (Ok x), Some (Ok y) -> return (x, y) - | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt - | _ -> - let fut, promise = make () in - - let st = A.make `Neither in - on_result a (function - | Error err -> fulfill_idempotent promise (Error err) - | Ok x -> - (match - update_ st (function - | `Neither -> `Left x - | `Right y -> `Both (x, y) - | _ -> assert false) - with - | `Both (x, y) -> fulfill promise (Ok (x, y)) - | _ -> ())); - on_result b (function - | Error err -> fulfill_idempotent promise (Error err) - | Ok y -> - (match - update_ st (function - | `Left x -> `Both (x, y) - | `Neither -> `Right y - | _ -> assert false) - with - | `Both (x, y) -> fulfill promise (Ok (x, y)) - | _ -> ())); - fut - - let choose a b : _ t = - match peek a, peek b with - | Some (Ok x), _ -> return (Either.Left x) - | _, Some (Ok y) -> return (Either.Right y) - | Some (Error (e, bt)), Some (Error _) -> fail e bt - | _ -> - let fut, promise = make () in - - let one_failure = A.make false in - on_result a (function - | Error err -> - if A.exchange one_failure true then - (* the other one failed already *) - fulfill_idempotent promise (Error err) - | Ok x -> fulfill_idempotent promise (Ok (Either.Left x))); - on_result b (function - | Error err -> - if A.exchange one_failure true then - (* the other one failed already *) - fulfill_idempotent promise (Error err) - | Ok y -> fulfill_idempotent promise (Ok (Either.Right y))); - fut - - let choose_same a b : _ t = - match peek a, peek b with - | Some (Ok x), _ -> return x - | _, Some (Ok y) -> return y - | Some (Error (e, bt)), Some (Error _) -> fail e bt - | _ -> - let fut, promise = make () in - - let one_failure = A.make false in - on_result a (function - | Error err -> - if A.exchange one_failure true then - fulfill_idempotent promise (Error err) - | Ok x -> fulfill_idempotent promise (Ok x)); - on_result b (function - | Error err -> - if A.exchange one_failure true then - fulfill_idempotent promise (Error err) - | Ok y -> fulfill_idempotent promise (Ok y)); - fut - - let peek_ok_assert_ (self : 'a t) : 'a = - match A.get self.st with - | Done (Ok x) -> x - | _ -> assert false - - let join_container_ ~iter ~map ~len cont : _ t = - let fut, promise = make () in - let missing = A.make (len cont) in - - (* callback called when a future in [a] is resolved *) - let on_res = function - | Ok _ -> - let n = A.fetch_and_add missing (-1) in - if n = 1 then ( - (* last future, we know they all succeeded, so resolve [fut] *) - let res = map peek_ok_assert_ cont in - fulfill promise (Ok res) - ) - | Error e_bt -> - (* immediately cancel all other [on_res] *) - let n = A.exchange missing 0 in - if n > 0 then - (* we're the only one to set to 0, so we can fulfill [fut] - with an error. *) - fulfill promise (Error e_bt) - in - - iter (fun fut -> on_result fut on_res) cont; - fut - - let join_array (a : _ t array) : _ array t = - match Array.length a with - | 0 -> return [||] - | 1 -> map ?on:None a.(1) ~f:(fun x -> [| x |]) - | _ -> join_container_ ~len:Array.length ~map:Array.map ~iter:Array.iter a - - let join_list (l : _ t list) : _ list t = - match l with - | [] -> return [] - | [ x ] -> map ?on:None x ~f:(fun x -> [ x ]) - | _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l - - let wait_array (a : _ t array) : unit t = - join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ()) - - let wait_list (a : _ t list) : unit t = - join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ()) - - let for_ ~on n f : unit t = - let futs = Array.init n (fun i -> spawn ~on (fun () -> f i)) in - join_container_ - ~len:(fun () -> n) - ~iter:(fun f () -> Array.iter f futs) - ~map:(fun _f () -> ()) - () - - (* ### blocking ### *) - - let wait_block (self : 'a t) : 'a or_error = - match A.get self.st with - | Done x -> x (* fast path *) - | Waiting _ -> - let real_block () = - (* use queue only once *) - let q = S_queue.create () in - on_result self (fun r -> S_queue.push q r); - S_queue.pop q - in - - (* a bit of spinlock *) - let rec loop i = - if i = 0 then - real_block () - else ( - match A.get self.st with - | Done x -> x - | Waiting _ -> - Domain_.relax (); - (loop [@tailcall]) (i - 1) - ) - in - loop 50 - - let wait_block_exn self = - match wait_block self with - | Ok x -> x - | Error (e, bt) -> Printexc.raise_with_backtrace e bt - - module type INFIX = sig - val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t - val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t - val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t - val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t - val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t - val ( and* ) : 'a t -> 'b t -> ('a * 'b) t - end - - module Infix_ (X : sig - val pool : Pool.t option - end) : INFIX = struct - let[@inline] ( >|= ) x f = map ?on:X.pool ~f x - let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x - let ( let+ ) = ( >|= ) - let ( let* ) = ( >>= ) - let ( and+ ) = both - let ( and* ) = both - end - - include Infix_ (struct - let pool = None - end) - - module Infix (X : sig - val pool : Pool.t - end) = - Infix_ (struct - let pool = Some X.pool - end) -end +module Pool = Pool +module Fut = Fut diff --git a/src/moonpool.mli b/src/moonpool.mli index 9d53593f..625d15f7 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -6,182 +6,11 @@ type 'a or_error = ('a, exn * Printexc.raw_backtrace) result -(** Thread pool *) -module Pool : sig - type t - - type thread_loop_wrapper = - thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit - (** a thread wrapper [f] takes the current thread, the current pool, - and the worker function [loop : unit -> unit] which is - the worker's main loop, and returns a new loop function. - By default it just returns the same loop function but it can be used - to install tracing, effect handlers, etc. *) - - val add_global_thread_loop_wrapper : thread_loop_wrapper -> unit - (** [add_global_thread_loop_wrapper f] installs [f] to be installed in every new pool worker - thread, for all existing pools, and all new pools created with [create]. - These wrappers accumulate: they all apply, but their order is not specified. *) - - val create : - ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> - ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> - ?thread_wrappers:thread_loop_wrapper list -> - ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> - ?min:int -> - ?per_domain:int -> - unit -> - t - (** [create ()] makes a new thread pool. - @param on_init_thread called at the beginning of each new thread - in the pool. - @param on_exit_thread called at the end of each thread in the pool - @param thread_wrappers a list of {!thread_loop_wrapper} functions - to use for this pool's workers. - *) - - val shutdown : t -> unit - (** Shutdown the pool and wait for it to terminate. Idempotent. *) - - val run : t -> (unit -> unit) -> unit - (** [run pool f] schedules [f] for later execution on the pool - in one of the threads. *) -end +module Pool = Pool val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t (** Similar to {!Thread.create}, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain to run all the various threads needed in an application (timers, event loops, etc.) *) -(** Futures *) -module Fut : sig - type 'a t - (** A future with a result of type ['a] *) - - type 'a promise - (** A promise, which can be fulfilled exactly once to set - the corresponding future *) - - val make : unit -> 'a t * 'a promise - (** Make a new future with the associated promise *) - - val on_result : 'a t -> ('a or_error -> unit) -> unit - (** [on_result fut f] registers [f] to be called in the future - when [fut] is set ; - or calls [f] immediately if [fut] is already set. *) - - exception Already_fulfilled - - val fulfill : 'a promise -> 'a or_error -> unit - (** Fullfill the promise, setting the future at the same time. - @raise Already_fulfilled if the promise is already fulfilled. *) - - val fulfill_idempotent : 'a promise -> 'a or_error -> unit - (** Fullfill the promise, setting the future at the same time. - Does nothing if the promise is already fulfilled. *) - - val return : 'a -> 'a t - (** Already settled future, with a result *) - - val fail : exn -> Printexc.raw_backtrace -> _ t - (** Already settled future, with a failure *) - - val of_result : 'a or_error -> 'a t - - val is_resolved : _ t -> bool - (** [is_resolved fut] is [true] iff [fut] is resolved. *) - - val peek : 'a t -> 'a or_error option - (** [peek fut] returns [Some r] if [fut] is currently resolved with [r], - and [None] if [fut] is not resolved yet. *) - - (** {2 Combinators} *) - - val spawn : on:Pool.t -> (unit -> 'a) -> 'a t - (** [spaw ~on f] runs [f()] on the given pool, and return a future that will - hold its result. *) - - val map : ?on:Pool.t -> f:('a -> 'b) -> 'a t -> 'b t - (** [map ?on ~f fut] returns a new future [fut2] that resolves - with [f x] if [fut] resolved with [x]; - and fails with [e] if [fut] fails with [e] or [f x] raises [e]. - @param on if provided, [f] runs on the given pool *) - - val bind : ?on:Pool.t -> f:('a -> 'b t) -> 'a t -> 'b t - (** [map ?on ~f fut] returns a new future [fut2] that resolves - like the future [f x] if [fut] resolved with [x]; - and fails with [e] if [fut] fails with [e] or [f x] raises [e]. - @param on if provided, [f] runs on the given pool *) - - val both : 'a t -> 'b t -> ('a * 'b) t - (** [both a b] succeeds with [x, y] if [a] succeeds with [x] and - [b] succeeds with [y], or fails if any of them fails. *) - - val choose : 'a t -> 'b t -> ('a, 'b) Either.t t - (** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or - [b] succeeds with [y], or fails if both of them fails. - If they both succeed, it is not specified which result is used. *) - - val choose_same : 'a t -> 'a t -> 'a t - (** [choose_same a b] succeeds with the value of one of [a] or [b] if - they succeed, or fails if both fail. - If they both succeed, it is not specified which result is used. *) - - val join_array : 'a t array -> 'a array t - (** Wait for all the futures in the array. Fails if any future fails. *) - - val join_list : 'a t list -> 'a list t - (** Wait for all the futures in the list. Fails if any future fails. *) - - val wait_array : _ t array -> unit t - (** [wait_array arr] waits for all futures in [arr] to resolve. It discards - the individual results of futures in [arr]. It fails if any future fails. *) - - val wait_list : _ t list -> unit t - (** [wait_list l] waits for all futures in [l] to resolve. It discards - the individual results of futures in [l]. It fails if any future fails. *) - - val for_ : on:Pool.t -> int -> (int -> unit) -> unit t - (** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the pool, and returns - a future that resolves when all the tasks have resolved, or fails - as soon as one task has failed. *) - - (** {2 Blocking} *) - - val wait_block : 'a t -> 'a or_error - (** [wait_block fut] blocks the current thread until [fut] is resolved, - and returns its value. - - A word of warning: this will monopolize the calling thread until the future - resolves. This can also easily cause deadlocks, if enough threads in a pool - call [wait_block] on futures running on the same pool or a pool depending on it. - - A good rule to avoid deadlocks is to run this from outside of any pool, - or to have an acyclic order between pools where [wait_block] - is only called from a pool on futures evaluated in a pool that comes lower - in the hierarchy. - If this rule is broken, it is possible for all threads in a pool to wait - for futures that can only make progress on these same threads, - hence the deadlock. - *) - - val wait_block_exn : 'a t -> 'a - (** Same as {!wait_block} but re-raises the exception if the future failed. *) - - module type INFIX = sig - val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t - val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t - val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t - val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t - val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t - val ( and* ) : 'a t -> 'b t -> ('a * 'b) t - end - - include INFIX - (** Operators that run on the same thread *) - - (** Make infix combinators *) - module Infix (_ : sig - val pool : Pool.t - end) : INFIX -end +module Fut = Fut diff --git a/src/pool.ml b/src/pool.ml new file mode 100644 index 00000000..b1d744ba --- /dev/null +++ b/src/pool.ml @@ -0,0 +1,117 @@ +(* TODO: use a better queue for the tasks *) + +module A = Atomic_ + +type t = { + active: bool A.t; + threads: Thread.t array; + q: (unit -> unit) S_queue.t; +} + +type thread_loop_wrapper = + thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit + +let global_thread_wrappers_ : thread_loop_wrapper list A.t = A.make [] + +let add_global_thread_loop_wrapper f : unit = + while + let l = A.get global_thread_wrappers_ in + not (A.compare_and_set global_thread_wrappers_ l (f :: l)) + do + () + done + +let[@inline] run self f : unit = S_queue.push self.q f + +let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = + while A.get active do + let task = S_queue.pop q in + try task () + with e -> + let bt = Printexc.get_raw_backtrace () in + on_exn e bt + done + +let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () + +let create ?(on_init_thread = default_thread_init_exit_) + ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) + ?(on_exn = fun _ _ -> ()) ?(min = 1) ?(per_domain = 0) () : t = + (* number of threads to run *) + let min = max 1 min in + let n_domains = D_pool_.n_domains () in + assert (n_domains >= 1); + let n = max min (n_domains * per_domain) in + + (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) + let offset = Random.int n_domains in + + let active = A.make true in + let q = S_queue.create () in + + let pool = + let dummy = Thread.self () in + { active; threads = Array.make n dummy; q } + in + + (* temporary queue used to obtain thread handles from domains + on which the thread are started. *) + let receive_threads = S_queue.create () in + + (* start the thread with index [i] *) + let start_thread_with_idx i = + let dom_idx = (offset + i) mod n_domains in + + (* function run in the thread itself *) + let main_thread_fun () = + let thread = Thread.self () in + let t_id = Thread.id thread in + on_init_thread ~dom_id:dom_idx ~t_id (); + + let all_wrappers = + List.rev_append thread_wrappers (A.get global_thread_wrappers_) + in + + let run () = worker_thread_ ~on_exn active q in + (* the actual worker loop is [worker_thread_], with all + wrappers for this pool and for all pools (global_thread_wrappers_) *) + let run' = + List.fold_left (fun run f -> f ~thread ~pool run) run all_wrappers + in + + (* now run the main loop *) + run' (); + on_exit_thread ~dom_id:dom_idx ~t_id () + in + + (* function called in domain with index [i], to + create the thread and push it into [receive_threads] *) + let create_thread_in_domain () = + let thread = Thread.create main_thread_fun () in + (* send the thread from the domain back to us *) + S_queue.push receive_threads (i, thread) + in + + D_pool_.run_on dom_idx create_thread_in_domain + in + + (* start all threads, placing them on the domains + according to their index and [offset] in a round-robin fashion. *) + for i = 0 to n - 1 do + start_thread_with_idx i + done; + + (* receive the newly created threads back from domains *) + for _j = 1 to n do + let i, th = S_queue.pop receive_threads in + pool.threads.(i) <- th + done; + pool + +let shutdown (self : t) : unit = + let was_active = A.exchange self.active false in + (* make sure to wakeup all the sleeping threads by scheduling one task each. + This way, a thread that is asleep, waiting for tasks, + will wakeup to process this trivial task, check [self.active], and terminate. *) + if was_active then Array.iter (fun _ -> run self ignore) self.threads; + Array.iter Thread.join self.threads diff --git a/src/pool.mli b/src/pool.mli new file mode 100644 index 00000000..aa9b361c --- /dev/null +++ b/src/pool.mli @@ -0,0 +1,40 @@ +(** Thread pool *) + +type t + +type thread_loop_wrapper = + thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit +(** a thread wrapper [f] takes the current thread, the current pool, + and the worker function [loop : unit -> unit] which is + the worker's main loop, and returns a new loop function. + By default it just returns the same loop function but it can be used + to install tracing, effect handlers, etc. *) + +val add_global_thread_loop_wrapper : thread_loop_wrapper -> unit +(** [add_global_thread_loop_wrapper f] installs [f] to be installed in every new pool worker + thread, for all existing pools, and all new pools created with [create]. + These wrappers accumulate: they all apply, but their order is not specified. *) + +val create : + ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?thread_wrappers:thread_loop_wrapper list -> + ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> + ?min:int -> + ?per_domain:int -> + unit -> + t +(** [create ()] makes a new thread pool. + @param on_init_thread called at the beginning of each new thread + in the pool. + @param on_exit_thread called at the end of each thread in the pool + @param thread_wrappers a list of {!thread_loop_wrapper} functions + to use for this pool's workers. + *) + +val shutdown : t -> unit +(** Shutdown the pool and wait for it to terminate. Idempotent. *) + +val run : t -> (unit -> unit) -> unit +(** [run pool f] schedules [f] for later execution on the pool + in one of the threads. *) diff --git a/src/s_queue.ml b/src/s_queue.ml new file mode 100644 index 00000000..8a3e8280 --- /dev/null +++ b/src/s_queue.ml @@ -0,0 +1,28 @@ +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; +} + +let create () : _ t = + { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () } + +let push (self : _ t) x : unit = + Mutex.lock self.mutex; + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () diff --git a/src/s_queue.mli b/src/s_queue.mli new file mode 100644 index 00000000..6adad6da --- /dev/null +++ b/src/s_queue.mli @@ -0,0 +1,7 @@ +(** Simple blocking queue *) + +type 'a t + +val create : unit -> _ t +val push : 'a t -> 'a -> unit +val pop : 'a t -> 'a