commit 1619f8b773a9d2f1b634880ed851b3b85e25d9ca Author: Simon Cruanes Date: Tue May 30 23:03:05 2023 -0400 initial commit diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..b2fab248 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,38 @@ +name: Build and Test + +on: + push: + branches: + - master + pull_request: + +jobs: + run: + name: build + strategy: + fail-fast: true + matrix: + os: + - macos-latest + - ubuntu-latest + #- windows-latest + ocaml-compiler: + - '4.03.x' + - '4.14.x' + - '5.0.x' + + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@main + - name: Use OCaml ${{ matrix.ocaml-compiler }} + uses: ocaml/setup-ocaml@v2 + with: + ocaml-compiler: ${{ matrix.ocaml-compiler }} + dune-cache: true + + - run: opam install -t moonpool --deps-only + + - run: opam exec -- dune build '@install' + + - run: opam exec -- dune runtest + diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..76301f0c --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +_build +_opam diff --git a/.ocamlformat b/.ocamlformat new file mode 100644 index 00000000..2124d7dd --- /dev/null +++ b/.ocamlformat @@ -0,0 +1,15 @@ +version = 0.24.1 +profile=conventional +margin=80 +if-then-else=k-r +parens-ite=true +parens-tuple=multi-line-only +sequence-style=terminator +type-decl=sparse +break-cases=toplevel +cases-exp-indent=2 +field-space=tight-decl +leading-nested-match-parens=true +module-item-spacing=compact +quiet=true +ocaml-version=4.08.0 diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..1a091758 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ + +DUNE_OPTS?= +all: + dune build @all $(DUNE_OPTS) + +clean: + @dune clean + +WATCH?=@all +watch: + dune build $(DUNE_OPTS) -w $(WATCH) diff --git a/dune b/dune new file mode 100644 index 00000000..4227a1ef --- /dev/null +++ b/dune @@ -0,0 +1,5 @@ + +(env + (_ (flags :standard -strict-sequence -warn-error -a+8 -w +a-40-70))) + +(mdx) diff --git a/dune-project b/dune-project new file mode 100644 index 00000000..c8cf0cd4 --- /dev/null +++ b/dune-project @@ -0,0 +1,33 @@ +(lang dune 3.0) + +(using mdx 0.2) + +(name moonpool) + +(generate_opam_files true) + +(source + (github c-cube/moonpool)) + +(authors "Simon Cruanes") + +(maintainers "Simon Cruanes") + +(license MIT) + +;(documentation https://url/to/documentation) + +(package + (name moonpool) + (synopsis "Pools of threads supported by a pool of domains") + (depends + ocaml + dune + (mdx + (and + (>= 1.9.0) + :with-test))) + (tags + (thread pool domain))) + +; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project diff --git a/moonpool.opam b/moonpool.opam new file mode 100644 index 00000000..64dc845e --- /dev/null +++ b/moonpool.opam @@ -0,0 +1,30 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Pools of threads supported by a pool of domains" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["thread" "pool" "domain"] +homepage: "https://github.com/c-cube/moonpool" +bug-reports: "https://github.com/c-cube/moonpool/issues" +depends: [ + "ocaml" + "dune" {>= "3.0"} + "mdx" {>= "1.9.0" & with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/moonpool.git" diff --git a/src/dune b/src/dune new file mode 100644 index 00000000..765785b1 --- /dev/null +++ b/src/dune @@ -0,0 +1,18 @@ +(library + (public_name moonpool) + (name moonpool) + (private_modules atomic_ domain_) + (libraries threads)) + +(rule + (targets atomic_.ml) + (action + (with-stdout-to %{targets} + (run ./gen/gen.exe --ocaml %{ocaml_version} --atomic)))) + +(rule + (targets domain_.ml) + (action + (with-stdout-to %{targets} + (run ./gen/gen.exe --ocaml %{ocaml_version} --domain)))) + diff --git a/src/gen/dune b/src/gen/dune new file mode 100644 index 00000000..cd463d88 --- /dev/null +++ b/src/gen/dune @@ -0,0 +1,3 @@ + +(executable + (name gen)) diff --git a/src/gen/gen.ml b/src/gen/gen.ml new file mode 100644 index 00000000..f2212b6d --- /dev/null +++ b/src/gen/gen.ml @@ -0,0 +1,104 @@ +let atomic_pre_412 = + {| +type 'a t = { mutable x: 'a } + +let[@inline] make x = { x } +let[@inline] get { x } = x +let[@inline] set r x = r.x <- x + +let[@inline never] exchange r x = + (* atomic *) + let y = r.x in + r.x <- x; + (* atomic *) + y + +let[@inline never] compare_and_set r seen v = + (* atomic *) + if r.x == seen then ( + r.x <- v; + (* atomic *) + true + ) else + false + +let[@inline never] fetch_and_add r x = + (* atomic *) + let v = r.x in + r.x <- x + r.x; + (* atomic *) + v + +let[@inline never] incr r = + (* atomic *) + r.x <- 1 + r.x +(* atomic *) + +let[@inline never] decr r = + (* atomic *) + r.x <- r.x - 1 +(* atomic *) + +|} + +let atomic_post_412 = {| +include Atomic +|} + +let domain_pre_5 = + {| +let recommended_number () = 1 + +type t = Thread.t + +let get_id (self:t) : int = Thread.id self + +let spawn_on f : t = + Thread.create f () +|} + +let domain_post_5 = + {| +let recommended_number () = Domain.recommended_domain_count () + +type t = unit Domain.t + +let get_id (self:t) : int = (Domain.get_id self :> int) + +let spawn f : t = + Domain.spawn f () +|} + +let p_version s = Scanf.sscanf s "%d.%d" (fun x y -> x, y) + +let () = + let atomic = ref false in + let domain = ref false in + let ocaml = ref Sys.ocaml_version in + Arg.parse + [ + "--atomic", Arg.Set atomic, " atomic"; + "--domain", Arg.Set domain, " domain"; + "--ocaml", Arg.Set_string ocaml, " set ocaml version"; + ] + ignore ""; + + let major, minor = p_version !ocaml in + + if !atomic then ( + let code = + if (major, minor) < (4, 12) then + atomic_pre_412 + else + atomic_post_412 + in + print_endline code + ) else if !domain then ( + let code = + if (major, minor) < (5, 0) then + domain_pre_5 + else + domain_post_5 + in + print_endline code + ) diff --git a/src/moonpool.ml b/src/moonpool.ml new file mode 100644 index 00000000..8a2ef573 --- /dev/null +++ b/src/moonpool.ml @@ -0,0 +1,212 @@ +module A = Atomic_ + +type 'a or_error = ('a, exn * Printexc.raw_backtrace) result + +(** Simple blocking queue *) +module S_queue : sig + type 'a t + + val create : unit -> _ t + val push : 'a t -> 'a -> unit + val pop : 'a t -> 'a +end = struct + type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + } + + let create () : _ t = + { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () } + + let push (self : _ t) x : unit = + Mutex.lock self.mutex; + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + + let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () +end + +(** Static pool of domains *) +module D_pool_ = struct + type domain = Domain_.t + + let work_ _i q : unit = + while true do + let f = S_queue.pop q in + try f () with _ -> () + done + + (* A domain level worker. It should not do too much except for starting + new threads for pools. *) + type worker = { q: (unit -> unit) S_queue.t } [@@unboxed] + + let domains_ : worker array lazy_t = + lazy + (let n = Domain_.recommended_number () in + Array.init n (fun i -> + let q = S_queue.create () in + let _domain : domain = Domain_.spawn_on (fun () -> work_ i q) in + { q })) + + (** Number of domains in the pool *) + let[@inline] n_domains () : int = Array.length (Lazy.force domains_) + + let run_on (i : int) (f : unit -> unit) : unit = + let (lazy arr) = domains_ in + assert (i < Array.length arr); + S_queue.push arr.(i).q f +end + +module Pool = struct + (* TODO: use a better queue for the tasks *) + + type t = { + active: bool A.t; + threads: Thread.t array; + q: (unit -> unit) S_queue.t; + } + + let[@inline] run self f : unit = S_queue.push self.q f + + let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit = + while A.get active do + let task = S_queue.pop q in + try task () + with e -> + let bt = Printexc.get_raw_backtrace () in + on_exn e bt + done + + let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () + + let create ?(on_init_thread = default_thread_init_exit_) + ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) + ?(min = 1) ?(per_domain = 0) () : t = + (* number of threads to run *) + let min = max 1 min in + let n_domains = D_pool_.n_domains () in + assert (n_domains >= 1); + let n = max min (n_domains * per_domain) in + + (* make sure we don't bias towards the first domain(s) in {!D_pool_} *) + let offset = Random.int n_domains in + + let active = A.make true in + let q = S_queue.create () in + + let receive_threads = S_queue.create () in + + (* start the thread with index [i] *) + let start_thread_with_idx i = + let dom_idx = (offset + i) mod n_domains in + + let create () = + let thread = + Thread.create + (fun () -> + let t_id = Thread.id @@ Thread.self () in + on_init_thread ~dom_id:dom_idx ~t_id (); + worker_thread_ ~on_exn active q; + on_exit_thread ~dom_id:dom_idx ~t_id ()) + () + in + (* send the thread from the domain back to us *) + S_queue.push receive_threads (i, thread) + in + + D_pool_.run_on dom_idx create + in + + (* start all threads, placing them on the domains + according to their index and [offset] in a round-robin fashion. *) + let threads = + let dummy = Thread.self () in + Array.init n (fun i -> + start_thread_with_idx i; + dummy) + in + + (* receive the newly created threads back from domains *) + for _j = 1 to n do + let i, th = S_queue.pop receive_threads in + threads.(i) <- th + done; + + { active; threads; q } + + let shutdown (self : t) : unit = + let was_active = A.exchange self.active false in + (* make sure to wakeup all the sleeping threads by scheduling one task each. + This way, a thread that is asleep, waiting for tasks, + will wakeup to process this trivial task, check [self.active], and terminate. *) + if was_active then Array.iter (fun _ -> run self ignore) self.threads; + Array.iter Thread.join self.threads +end + +module Fut = struct + type 'a waiter = 'a or_error -> unit + + type 'a state = + | Done of 'a or_error + | Waiting of 'a waiter list + + type 'a t = { st: 'a state A.t } [@@unboxed] + type 'a promise = 'a t + + let make () = + let fut = { st = A.make (Waiting []) } in + fut, fut + + let of_result x : _ t = { st = A.make (Done x) } + let[@inline] return x : _ t = of_result (Ok x) + let[@inline] fail e bt : _ t = of_result (Error (e, bt)) + + let on_result (self : _ t) (f : _ waiter) : unit = + while + let st = A.get self.st in + match st with + | Done x -> + f x; + false + | Waiting l -> + let must_retry = + not (A.compare_and_set self.st st (Waiting (f :: l))) + in + must_retry + do + () + done + + exception Already_fulfilled + + let fulfill (self : _ t) (r : _ result) : unit = + while + let st = A.get self.st in + match st with + | Done _ -> raise Already_fulfilled + | Waiting l -> + let did_swap = A.compare_and_set self.st st (Done r) in + if did_swap then ( + (* success, now call all the waiters *) + List.iter (fun f -> try f r with _ -> ()) l; + false + ) else + true + do + () + done +end diff --git a/src/moonpool.mli b/src/moonpool.mli new file mode 100644 index 00000000..273eef63 --- /dev/null +++ b/src/moonpool.mli @@ -0,0 +1,64 @@ +(** Moonpool + + A pool within a bigger pool (ie the ocean). Here, we're talking about + pools of [Thread.t] which live within a fixed pool of [Domain.t]. +*) + +type 'a or_error = ('a, exn * Printexc.raw_backtrace) result + +(** Thread pool *) +module Pool : sig + type t + + val create : + ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> + ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> + ?min:int -> + ?per_domain:int -> + unit -> + t + (** [create ()] makes a new thread pool. + @param on_init_thread called at the beginning of each new thread + in the pool. + *) + + val shutdown : t -> unit + (** Shutdown the pool and wait for it to terminate. Idempotent. *) + + val run : t -> (unit -> unit) -> unit + (** [run pool f] schedules [f] for later execution on the pool + in one of the threads. *) +end + +(** Futures *) +module Fut : sig + type 'a t + (** A future with a result of type ['a] *) + + type 'a promise + (** A promise, which can be fulfilled exactly once to set + the corresponding future *) + + val make : unit -> 'a t * 'a promise + (** Make a new future with the associated promise *) + + val on_result : 'a t -> ('a or_error -> unit) -> unit + (** [on_result fut f] registers [f] to be called in the future + when [fut] is set ; + or calls [f] immediately if [fut] is already set. *) + + exception Already_fulfilled + + val fulfill : 'a promise -> 'a or_error -> unit + (** Fullfill the promise, setting the future at the same time. + @raise Already_fulfilled if the promise is already fulfilled. *) + + val return : 'a -> 'a t + (** Already settled future, with a result *) + + val fail : exn -> Printexc.raw_backtrace -> _ t + (** Already settled future, with a failure *) + + val of_result : 'a or_error -> 'a t +end