initial commit

This commit is contained in:
Simon Cruanes 2023-05-30 23:03:05 -04:00
commit 1619f8b773
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
12 changed files with 535 additions and 0 deletions

38
.github/workflows/main.yml vendored Normal file
View file

@ -0,0 +1,38 @@
name: Build and Test
on:
push:
branches:
- master
pull_request:
jobs:
run:
name: build
strategy:
fail-fast: true
matrix:
os:
- macos-latest
- ubuntu-latest
#- windows-latest
ocaml-compiler:
- '4.03.x'
- '4.14.x'
- '5.0.x'
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
- run: opam install -t moonpool --deps-only
- run: opam exec -- dune build '@install'
- run: opam exec -- dune runtest

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
_build
_opam

15
.ocamlformat Normal file
View file

@ -0,0 +1,15 @@
version = 0.24.1
profile=conventional
margin=80
if-then-else=k-r
parens-ite=true
parens-tuple=multi-line-only
sequence-style=terminator
type-decl=sparse
break-cases=toplevel
cases-exp-indent=2
field-space=tight-decl
leading-nested-match-parens=true
module-item-spacing=compact
quiet=true
ocaml-version=4.08.0

11
Makefile Normal file
View file

@ -0,0 +1,11 @@
DUNE_OPTS?=
all:
dune build @all $(DUNE_OPTS)
clean:
@dune clean
WATCH?=@all
watch:
dune build $(DUNE_OPTS) -w $(WATCH)

5
dune Normal file
View file

@ -0,0 +1,5 @@
(env
(_ (flags :standard -strict-sequence -warn-error -a+8 -w +a-40-70)))
(mdx)

33
dune-project Normal file
View file

@ -0,0 +1,33 @@
(lang dune 3.0)
(using mdx 0.2)
(name moonpool)
(generate_opam_files true)
(source
(github c-cube/moonpool))
(authors "Simon Cruanes")
(maintainers "Simon Cruanes")
(license MIT)
;(documentation https://url/to/documentation)
(package
(name moonpool)
(synopsis "Pools of threads supported by a pool of domains")
(depends
ocaml
dune
(mdx
(and
(>= 1.9.0)
:with-test)))
(tags
(thread pool domain)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

30
moonpool.opam Normal file
View file

@ -0,0 +1,30 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["thread" "pool" "domain"]
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"ocaml"
"dune" {>= "3.0"}
"mdx" {>= "1.9.0" & with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/moonpool.git"

18
src/dune Normal file
View file

@ -0,0 +1,18 @@
(library
(public_name moonpool)
(name moonpool)
(private_modules atomic_ domain_)
(libraries threads))
(rule
(targets atomic_.ml)
(action
(with-stdout-to %{targets}
(run ./gen/gen.exe --ocaml %{ocaml_version} --atomic))))
(rule
(targets domain_.ml)
(action
(with-stdout-to %{targets}
(run ./gen/gen.exe --ocaml %{ocaml_version} --domain))))

3
src/gen/dune Normal file
View file

@ -0,0 +1,3 @@
(executable
(name gen))

104
src/gen/gen.ml Normal file
View file

@ -0,0 +1,104 @@
let atomic_pre_412 =
{|
type 'a t = { mutable x: 'a }
let[@inline] make x = { x }
let[@inline] get { x } = x
let[@inline] set r x = r.x <- x
let[@inline never] exchange r x =
(* atomic *)
let y = r.x in
r.x <- x;
(* atomic *)
y
let[@inline never] compare_and_set r seen v =
(* atomic *)
if r.x == seen then (
r.x <- v;
(* atomic *)
true
) else
false
let[@inline never] fetch_and_add r x =
(* atomic *)
let v = r.x in
r.x <- x + r.x;
(* atomic *)
v
let[@inline never] incr r =
(* atomic *)
r.x <- 1 + r.x
(* atomic *)
let[@inline never] decr r =
(* atomic *)
r.x <- r.x - 1
(* atomic *)
|}
let atomic_post_412 = {|
include Atomic
|}
let domain_pre_5 =
{|
let recommended_number () = 1
type t = Thread.t
let get_id (self:t) : int = Thread.id self
let spawn_on f : t =
Thread.create f ()
|}
let domain_post_5 =
{|
let recommended_number () = Domain.recommended_domain_count ()
type t = unit Domain.t
let get_id (self:t) : int = (Domain.get_id self :> int)
let spawn f : t =
Domain.spawn f ()
|}
let p_version s = Scanf.sscanf s "%d.%d" (fun x y -> x, y)
let () =
let atomic = ref false in
let domain = ref false in
let ocaml = ref Sys.ocaml_version in
Arg.parse
[
"--atomic", Arg.Set atomic, " atomic";
"--domain", Arg.Set domain, " domain";
"--ocaml", Arg.Set_string ocaml, " set ocaml version";
]
ignore "";
let major, minor = p_version !ocaml in
if !atomic then (
let code =
if (major, minor) < (4, 12) then
atomic_pre_412
else
atomic_post_412
in
print_endline code
) else if !domain then (
let code =
if (major, minor) < (5, 0) then
domain_pre_5
else
domain_post_5
in
print_endline code
)

212
src/moonpool.ml Normal file
View file

@ -0,0 +1,212 @@
module A = Atomic_
type 'a or_error = ('a, exn * Printexc.raw_backtrace) result
(** Simple blocking queue *)
module S_queue : sig
type 'a t
val create : unit -> _ t
val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a
end = struct
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
}
let create () : _ t =
{ mutex = Mutex.create (); cond = Condition.create (); q = Queue.create () }
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
end
(** Static pool of domains *)
module D_pool_ = struct
type domain = Domain_.t
let work_ _i q : unit =
while true do
let f = S_queue.pop q in
try f () with _ -> ()
done
(* A domain level worker. It should not do too much except for starting
new threads for pools. *)
type worker = { q: (unit -> unit) S_queue.t } [@@unboxed]
let domains_ : worker array lazy_t =
lazy
(let n = Domain_.recommended_number () in
Array.init n (fun i ->
let q = S_queue.create () in
let _domain : domain = Domain_.spawn_on (fun () -> work_ i q) in
{ q }))
(** Number of domains in the pool *)
let[@inline] n_domains () : int = Array.length (Lazy.force domains_)
let run_on (i : int) (f : unit -> unit) : unit =
let (lazy arr) = domains_ in
assert (i < Array.length arr);
S_queue.push arr.(i).q f
end
module Pool = struct
(* TODO: use a better queue for the tasks *)
type t = {
active: bool A.t;
threads: Thread.t array;
q: (unit -> unit) S_queue.t;
}
let[@inline] run self f : unit = S_queue.push self.q f
let worker_thread_ ~on_exn (active : bool A.t) (q : _ S_queue.t) : unit =
while A.get active do
let task = S_queue.pop q in
try task ()
with e ->
let bt = Printexc.get_raw_backtrace () in
on_exn e bt
done
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?(min = 1) ?(per_domain = 0) () : t =
(* number of threads to run *)
let min = max 1 min in
let n_domains = D_pool_.n_domains () in
assert (n_domains >= 1);
let n = max min (n_domains * per_domain) in
(* make sure we don't bias towards the first domain(s) in {!D_pool_} *)
let offset = Random.int n_domains in
let active = A.make true in
let q = S_queue.create () in
let receive_threads = S_queue.create () in
(* start the thread with index [i] *)
let start_thread_with_idx i =
let dom_idx = (offset + i) mod n_domains in
let create () =
let thread =
Thread.create
(fun () ->
let t_id = Thread.id @@ Thread.self () in
on_init_thread ~dom_id:dom_idx ~t_id ();
worker_thread_ ~on_exn active q;
on_exit_thread ~dom_id:dom_idx ~t_id ())
()
in
(* send the thread from the domain back to us *)
S_queue.push receive_threads (i, thread)
in
D_pool_.run_on dom_idx create
in
(* start all threads, placing them on the domains
according to their index and [offset] in a round-robin fashion. *)
let threads =
let dummy = Thread.self () in
Array.init n (fun i ->
start_thread_with_idx i;
dummy)
in
(* receive the newly created threads back from domains *)
for _j = 1 to n do
let i, th = S_queue.pop receive_threads in
threads.(i) <- th
done;
{ active; threads; q }
let shutdown (self : t) : unit =
let was_active = A.exchange self.active false in
(* make sure to wakeup all the sleeping threads by scheduling one task each.
This way, a thread that is asleep, waiting for tasks,
will wakeup to process this trivial task, check [self.active], and terminate. *)
if was_active then Array.iter (fun _ -> run self ignore) self.threads;
Array.iter Thread.join self.threads
end
module Fut = struct
type 'a waiter = 'a or_error -> unit
type 'a state =
| Done of 'a or_error
| Waiting of 'a waiter list
type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t
let make () =
let fut = { st = A.make (Waiting []) } in
fut, fut
let of_result x : _ t = { st = A.make (Done x) }
let[@inline] return x : _ t = of_result (Ok x)
let[@inline] fail e bt : _ t = of_result (Error (e, bt))
let on_result (self : _ t) (f : _ waiter) : unit =
while
let st = A.get self.st in
match st with
| Done x ->
f x;
false
| Waiting l ->
let must_retry =
not (A.compare_and_set self.st st (Waiting (f :: l)))
in
must_retry
do
()
done
exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit =
while
let st = A.get self.st in
match st with
| Done _ -> raise Already_fulfilled
| Waiting l ->
let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then (
(* success, now call all the waiters *)
List.iter (fun f -> try f r with _ -> ()) l;
false
) else
true
do
()
done
end

64
src/moonpool.mli Normal file
View file

@ -0,0 +1,64 @@
(** Moonpool
A pool within a bigger pool (ie the ocean). Here, we're talking about
pools of [Thread.t] which live within a fixed pool of [Domain.t].
*)
type 'a or_error = ('a, exn * Printexc.raw_backtrace) result
(** Thread pool *)
module Pool : sig
type t
val create :
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?min:int ->
?per_domain:int ->
unit ->
t
(** [create ()] makes a new thread pool.
@param on_init_thread called at the beginning of each new thread
in the pool.
*)
val shutdown : t -> unit
(** Shutdown the pool and wait for it to terminate. Idempotent. *)
val run : t -> (unit -> unit) -> unit
(** [run pool f] schedules [f] for later execution on the pool
in one of the threads. *)
end
(** Futures *)
module Fut : sig
type 'a t
(** A future with a result of type ['a] *)
type 'a promise
(** A promise, which can be fulfilled exactly once to set
the corresponding future *)
val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future
when [fut] is set ;
or calls [f] immediately if [fut] is already set. *)
exception Already_fulfilled
val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
@raise Already_fulfilled if the promise is already fulfilled. *)
val return : 'a -> 'a t
(** Already settled future, with a result *)
val fail : exn -> Printexc.raw_backtrace -> _ t
(** Already settled future, with a failure *)
val of_result : 'a or_error -> 'a t
end