mirror of
https://github.com/c-cube/moonpool.git
synced 2026-05-05 08:54:24 -04:00
Compare commits
10 commits
18701bfde4
...
f287d03bd5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f287d03bd5 | ||
|
|
c372afd2b5 | ||
|
|
1b59d8aaaf | ||
|
|
19e925b1fb | ||
|
|
a0f4c20f2b | ||
|
|
e2fb26b7fa | ||
|
|
d681231d9d | ||
|
|
471feb96d6 | ||
|
|
0ff2645de4 | ||
|
|
0d0db75f26 |
19 changed files with 241 additions and 70 deletions
17
.github/workflows/format.yml
vendored
Normal file
17
.github/workflows/format.yml
vendored
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
name: format
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
format:
|
||||
name: format
|
||||
runs-on: 'ubuntu-latest'
|
||||
container: ghcr.io/c-cube/c-cube-commmon/ci-doc-5.3:latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- run: opam exec -- make format-check
|
||||
|
||||
12
.github/workflows/gh-pages.yml
vendored
12
.github/workflows/gh-pages.yml
vendored
|
|
@ -9,20 +9,14 @@ jobs:
|
|||
deploy:
|
||||
name: Deploy doc
|
||||
runs-on: ubuntu-latest
|
||||
container: ghcr.io/c-cube/c-cube-commmon/ci-doc-5.3:latest
|
||||
steps:
|
||||
- uses: actions/checkout@main
|
||||
|
||||
- name: Use OCaml
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: '5.3'
|
||||
dune-cache: true
|
||||
allow-prerelease-opam: true
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
# temporary until it's in a release
|
||||
- run: opam pin picos 0.6.0 -y -n
|
||||
|
||||
- run: opam install odig moonpool moonpool-lwt -t
|
||||
- run: opam install moonpool moonpool-lwt -t
|
||||
|
||||
- run: opam exec -- odig odoc --cache-dir=_doc/ moonpool moonpool-lwt
|
||||
|
||||
|
|
|
|||
50
.github/workflows/main.yml
vendored
50
.github/workflows/main.yml
vendored
|
|
@ -1,4 +1,4 @@
|
|||
name: Build and Test
|
||||
name: build
|
||||
|
||||
on:
|
||||
push:
|
||||
|
|
@ -13,33 +13,28 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-latest
|
||||
ocaml-compiler:
|
||||
- '5.0'
|
||||
- '5.3'
|
||||
container:
|
||||
- ghcr.io/c-cube/c-cube-commmon/ci-5.0:latest
|
||||
- ghcr.io/c-cube/c-cube-commmon/ci-5.4:latest
|
||||
|
||||
runs-on: ${{ matrix.os }}
|
||||
runs-on: ubuntu-latest
|
||||
container: ${{ matrix.container }}
|
||||
steps:
|
||||
- uses: actions/checkout@main
|
||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||
dune-cache: true
|
||||
allow-prerelease-opam: true
|
||||
|
||||
- uses: actions/checkout@v6
|
||||
- run: opam pin picos 0.6.0 -y -n
|
||||
|
||||
- run: opam install -t moonpool moonpool-lwt --deps-only
|
||||
- run: opam pin -y -n lwt 5.9.2
|
||||
- run: opam install -t . --deps-only
|
||||
- run: opam exec -- dune build @install
|
||||
|
||||
# install some depopts
|
||||
- run: opam install thread-local-storage trace hmap
|
||||
- run: opam pin -y -n lwt 6.1.1
|
||||
- run: opam install thread-local-storage trace hmap ambient-context.0.2
|
||||
- run: opam exec -- dune build --profile=release --force @install @runtest
|
||||
|
||||
compat:
|
||||
name: build-compat # compat with other OSes
|
||||
needs: run # only run this if cheap linux job passed
|
||||
timeout-minutes: 15
|
||||
strategy:
|
||||
fail-fast: true
|
||||
|
|
@ -64,26 +59,7 @@ jobs:
|
|||
|
||||
- run: opam install -t moonpool moonpool-lwt --deps-only
|
||||
- run: opam exec -- dune build @install
|
||||
|
||||
# install some depopts
|
||||
- run: opam install thread-local-storage trace domain-local-await
|
||||
- run: opam exec -- dune build --profile=release --force @install @runtest
|
||||
|
||||
format:
|
||||
name: format
|
||||
strategy:
|
||||
matrix:
|
||||
ocaml-compiler:
|
||||
- '5.3'
|
||||
runs-on: 'ubuntu-latest'
|
||||
steps:
|
||||
- uses: actions/checkout@main
|
||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||
dune-cache: true
|
||||
allow-prerelease-opam: true
|
||||
|
||||
- run: opam install ocamlformat.0.27.0
|
||||
- run: opam exec -- make format-check
|
||||
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -7,7 +7,7 @@ clean:
|
|||
@dune clean
|
||||
|
||||
test:
|
||||
@dune runtest $(DUNE_OPTS)
|
||||
@dune runtest $(DUNE_OPTS) --no-buffer
|
||||
|
||||
test-autopromote:
|
||||
@dune runtest $(DUNE_OPTS) --auto-promote
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
(lang dune 3.0)
|
||||
(lang dune 3.15)
|
||||
|
||||
(using mdx 0.2)
|
||||
|
||||
|
|
@ -60,6 +60,7 @@
|
|||
:with-test)))
|
||||
(depopts
|
||||
hmap
|
||||
(ambient-context (>= 0.2))
|
||||
(trace
|
||||
(>= 0.6)))
|
||||
(tags
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ license: "MIT"
|
|||
homepage: "https://github.com/c-cube/moonpool"
|
||||
bug-reports: "https://github.com/c-cube/moonpool/issues"
|
||||
depends: [
|
||||
"dune" {>= "3.0"}
|
||||
"dune" {>= "3.15"}
|
||||
"moonpool" {= version}
|
||||
"ocaml" {>= "5.0"}
|
||||
"qcheck-core" {with-test & >= "0.21"}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ homepage: "https://github.com/c-cube/moonpool"
|
|||
bug-reports: "https://github.com/c-cube/moonpool/issues"
|
||||
depends: [
|
||||
"ocaml" {>= "5.0"}
|
||||
"dune" {>= "3.0"}
|
||||
"dune" {>= "3.15"}
|
||||
"either" {>= "1.0"}
|
||||
"trace" {>= "0.11" & with-test}
|
||||
"trace-tef" {>= "0.11" & with-test}
|
||||
|
|
@ -24,6 +24,7 @@ depends: [
|
|||
]
|
||||
depopts: [
|
||||
"hmap"
|
||||
"ambient-context" {>= "0.2"}
|
||||
"trace" {>= "0.6"}
|
||||
]
|
||||
build: [
|
||||
|
|
|
|||
5
src/ambient-context/dune
Normal file
5
src/ambient-context/dune
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
(library
|
||||
(name moonpool_ambient_context)
|
||||
(public_name moonpool.ambient-context)
|
||||
(optional) ; ambient-context
|
||||
(libraries moonpool hmap ambient-context))
|
||||
21
src/ambient-context/moonpool_ambient_context.ml
Normal file
21
src/ambient-context/moonpool_ambient_context.ml
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
open struct
|
||||
module TLS = Moonpool.Task_local_storage
|
||||
end
|
||||
|
||||
let storage : Ambient_context.Storage.t =
|
||||
{
|
||||
name = "moonpool";
|
||||
get_context = TLS.get_local_hmap;
|
||||
with_context =
|
||||
(fun new_hmap f ->
|
||||
let old = TLS.get_local_hmap () in
|
||||
TLS.set_local_hmap new_hmap;
|
||||
match f () with
|
||||
| x ->
|
||||
TLS.set_local_hmap old;
|
||||
x
|
||||
| exception exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
TLS.set_local_hmap old;
|
||||
Printexc.raise_with_backtrace exn bt);
|
||||
}
|
||||
|
|
@ -6,18 +6,7 @@ include Runner
|
|||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
module Id = struct
|
||||
type t = unit ref
|
||||
(** Unique identifier for a pool *)
|
||||
|
||||
let create () : t = Sys.opaque_identity (ref ())
|
||||
let equal : t -> t -> bool = ( == )
|
||||
end
|
||||
|
||||
type state = {
|
||||
id_: Id.t;
|
||||
(** Unique to this pool. Used to make sure tasks stay within the same
|
||||
pool. *)
|
||||
active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
|
||||
mutable workers: worker_state array; (** Fixed set of workers. *)
|
||||
main_q: WL.task_full Queue.t;
|
||||
|
|
@ -99,12 +88,15 @@ let schedule_in_main_queue (self : state) task : unit =
|
|||
longer permitted *)
|
||||
raise Shutdown
|
||||
|
||||
let schedule_from_w (self : worker_state) (task : WL.task_full) : unit =
|
||||
let schedule_from_anywhere_ (st : state) (task : WL.task_full) : unit =
|
||||
match get_current_worker_ () with
|
||||
| Some w when Id.equal self.st.id_ w.st.id_ ->
|
||||
| Some w when st == w.st ->
|
||||
(* use worker from the same pool *)
|
||||
schedule_on_current_worker w task
|
||||
| _ -> schedule_in_main_queue self.st task
|
||||
| _ -> schedule_in_main_queue st task
|
||||
|
||||
let schedule_from_w (w : worker_state) task : unit =
|
||||
schedule_from_anywhere_ w.st task
|
||||
|
||||
exception Got_task of WL.task_full
|
||||
|
||||
|
|
@ -223,7 +215,8 @@ let as_runner_ (self : state) : t =
|
|||
Runner.For_runner_implementors.create
|
||||
~shutdown:(fun ~wait () -> shutdown_ self ~wait)
|
||||
~run_async:(fun ~fiber f ->
|
||||
schedule_in_main_queue self @@ T_start { fiber; f })
|
||||
let task = WL.T_start { fiber; f } in
|
||||
schedule_from_anywhere_ self task)
|
||||
~size:(fun () -> size_ self)
|
||||
~num_tasks:(fun () -> num_tasks_ self)
|
||||
()
|
||||
|
|
@ -240,7 +233,6 @@ type ('a, 'b) create_args =
|
|||
let create ?(on_init_thread = default_thread_init_exit_)
|
||||
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
|
||||
?num_threads ?name () : t =
|
||||
let pool_id_ = Id.create () in
|
||||
let num_domains = Domain_pool_.max_number_of_domains () in
|
||||
let num_threads = Util_pool_.num_threads ?num_threads () in
|
||||
|
||||
|
|
@ -249,7 +241,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
|||
|
||||
let pool =
|
||||
{
|
||||
id_ = pool_id_;
|
||||
active = A.make true;
|
||||
workers = [||];
|
||||
main_q = Queue.create ();
|
||||
|
|
|
|||
1
src/lwt/.ocamlformat-ignore
Normal file
1
src/lwt/.ocamlformat-ignore
Normal file
|
|
@ -0,0 +1 @@
|
|||
types_
|
||||
16
src/lwt/dune
16
src/lwt/dune
|
|
@ -2,9 +2,23 @@
|
|||
(name moonpool_lwt)
|
||||
(public_name moonpool-lwt)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0))
|
||||
(and
|
||||
(>= %{ocaml_version} 5.0)
|
||||
%{lib-available:lwt}))
|
||||
(modules moonpool_lwt types_)
|
||||
(libraries
|
||||
(re_export moonpool)
|
||||
picos
|
||||
(re_export lwt)
|
||||
lwt.unix))
|
||||
|
||||
(executable
|
||||
(name gen_types_)
|
||||
(modules gen_types_))
|
||||
|
||||
(rule
|
||||
(enabled_if %{lib-available:lwt})
|
||||
(deps types_.ml.5 types_.ml.6)
|
||||
(target types_.ml)
|
||||
(action
|
||||
(run ./gen_types_.exe %{version:lwt})))
|
||||
|
|
|
|||
23
src/lwt/gen_types_.ml
Normal file
23
src/lwt/gen_types_.ml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
let copy_file src dst =
|
||||
let ic = open_in src in
|
||||
let oc = open_out dst in
|
||||
let buf = Bytes.create 1024 in
|
||||
(try
|
||||
while true do
|
||||
let n = input ic buf 0 (Bytes.length buf) in
|
||||
if n = 0 then raise End_of_file;
|
||||
output oc buf 0 n
|
||||
done
|
||||
with End_of_file -> ());
|
||||
close_in ic;
|
||||
close_out oc
|
||||
|
||||
let () =
|
||||
let version = Sys.argv.(1) in
|
||||
let major =
|
||||
try Scanf.sscanf version "%d.%s" (fun maj _ -> maj) with _ -> 0
|
||||
in
|
||||
if major >= 6 then
|
||||
copy_file "types_.ml.6" "types_.ml"
|
||||
else
|
||||
copy_file "types_.ml.5" "types_.ml"
|
||||
|
|
@ -23,7 +23,7 @@ module Scheduler_state = struct
|
|||
mutable as_runner: Moonpool.Runner.t;
|
||||
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
|
||||
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
|
||||
mutable notification: int;
|
||||
mutable notification: Types_.notification;
|
||||
(** A lwt_unix notification to wake up the event loop *)
|
||||
has_notified: bool Atomic.t;
|
||||
}
|
||||
|
|
@ -42,7 +42,7 @@ module Scheduler_state = struct
|
|||
as_runner = Moonpool.Runner.dummy;
|
||||
enter_hook = None;
|
||||
leave_hook = None;
|
||||
notification = 0;
|
||||
notification = Types_.dummy_notification;
|
||||
has_notified = Atomic.make false;
|
||||
}
|
||||
|
||||
|
|
|
|||
3
src/lwt/types_.ml.5
Normal file
3
src/lwt/types_.ml.5
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
type notification = int
|
||||
let dummy_notification : notification = 0
|
||||
3
src/lwt/types_.ml.6
Normal file
3
src/lwt/types_.ml.6
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
type notification = Lwt_unix.notification
|
||||
let dummy_notification : notification = Lwt_unix.make_notification ignore
|
||||
14
test/dune
14
test/dune
|
|
@ -20,3 +20,17 @@
|
|||
unix
|
||||
trace-tef
|
||||
trace))
|
||||
|
||||
(test
|
||||
(name t_ambient_context)
|
||||
(package moonpool)
|
||||
(enabled_if
|
||||
(and %{lib-available:ambient-context} %{lib-available:hmap}))
|
||||
(libraries moonpool moonpool.ambient-context ambient-context))
|
||||
|
||||
(test
|
||||
(name t_fib_await_mem)
|
||||
(package moonpool)
|
||||
(enabled_if
|
||||
(= %{system} linux))
|
||||
(libraries moonpool))
|
||||
|
|
|
|||
53
test/t_ambient_context.ml
Normal file
53
test/t_ambient_context.ml
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
open Moonpool
|
||||
|
||||
let () = Ambient_context.set_current_storage Moonpool_ambient_context.storage
|
||||
let key_a : string Ambient_context.Context.key = Ambient_context.new_key ()
|
||||
let key_b : int Ambient_context.Context.key = Ambient_context.new_key ()
|
||||
let get_a () = Ambient_context.get key_a
|
||||
let get_b () = Ambient_context.get key_b
|
||||
|
||||
let () =
|
||||
let pool = Ws_pool.create ~num_threads:4 () in
|
||||
|
||||
(* basic: values set in a task are visible within that task *)
|
||||
let fut =
|
||||
Fut.spawn ~on:pool (fun () ->
|
||||
assert (get_a () = None);
|
||||
Ambient_context.with_key_bound_to key_a "hello" (fun () ->
|
||||
assert (get_a () = Some "hello");
|
||||
(* nested: inner binding shadows outer *)
|
||||
Ambient_context.with_key_bound_to key_a "world" (fun () ->
|
||||
assert (get_a () = Some "world"));
|
||||
(* restored after inner scope *)
|
||||
assert (get_a () = Some "hello"));
|
||||
assert (get_a () = None))
|
||||
in
|
||||
Fut.wait_block_exn fut;
|
||||
|
||||
(* two keys are independent *)
|
||||
let fut =
|
||||
Fut.spawn ~on:pool (fun () ->
|
||||
Ambient_context.with_key_bound_to key_a "foo" (fun () ->
|
||||
Ambient_context.with_key_bound_to key_b 42 (fun () ->
|
||||
assert (get_a () = Some "foo");
|
||||
assert (get_b () = Some 42));
|
||||
assert (get_b () = None);
|
||||
assert (get_a () = Some "foo")))
|
||||
in
|
||||
Fut.wait_block_exn fut;
|
||||
|
||||
(* child tasks do not inherit the parent's ambient context *)
|
||||
let fut =
|
||||
Fut.spawn ~on:pool (fun () ->
|
||||
Ambient_context.with_key_bound_to key_a "parent" (fun () ->
|
||||
let child =
|
||||
Fut.spawn ~on:pool (fun () ->
|
||||
(* child starts with empty context *)
|
||||
assert (get_a () = None))
|
||||
in
|
||||
Fut.wait_block_exn child))
|
||||
in
|
||||
Fut.wait_block_exn fut;
|
||||
|
||||
Ws_pool.shutdown pool;
|
||||
Printf.printf "all assertions passed\n%!"
|
||||
54
test/t_fib_await_mem.ml
Normal file
54
test/t_fib_await_mem.ml
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
(* regression test for #45 *)
|
||||
|
||||
open Moonpool
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let rec fib_direct x =
|
||||
if x <= 1 then
|
||||
1
|
||||
else
|
||||
fib_direct (x - 1) + fib_direct (x - 2)
|
||||
|
||||
let cutoff = 8
|
||||
|
||||
let rec fib_await ~on x : int Fut.t =
|
||||
if x <= cutoff then
|
||||
Fut.spawn ~on (fun () -> fib_direct x)
|
||||
else
|
||||
Fut.spawn ~on (fun () ->
|
||||
let n1 = fib_await ~on (x - 1) in
|
||||
let n2 = fib_await ~on (x - 2) in
|
||||
let n1 = Fut.await n1 in
|
||||
let n2 = Fut.await n2 in
|
||||
n1 + n2)
|
||||
|
||||
(** Read VmHWM (peak RSS in kB) from /proc/self/status. *)
|
||||
let get_vmhwm_kb () : int option =
|
||||
let path = "/proc/self/status" in
|
||||
match In_channel.with_open_bin path In_channel.input_all with
|
||||
| exception Sys_error _ -> None
|
||||
| content ->
|
||||
let lines = String.split_on_char '\n' content in
|
||||
List.find_map
|
||||
(fun line -> Scanf.sscanf_opt line "VmHWM: %d kB" Fun.id)
|
||||
lines
|
||||
|
||||
let max_rss_bytes = 150_000_000
|
||||
|
||||
let () =
|
||||
let@ pool = Ws_pool.with_ ~num_threads:4 () in
|
||||
let result = fib_await ~on:pool 40 |> Fut.wait_block_exn in
|
||||
assert (result = 165580141);
|
||||
match get_vmhwm_kb () with
|
||||
| None ->
|
||||
Printf.printf "fib 40 = %d (skip RSS check: no /proc/self/status)\n%!"
|
||||
result
|
||||
| Some hwm_kb ->
|
||||
let hwm_bytes = hwm_kb * 1024 in
|
||||
Printf.printf "fib 40 = %d, peak RSS = %d bytes\n%!" result hwm_bytes;
|
||||
if hwm_bytes > max_rss_bytes then (
|
||||
Printf.eprintf "FAIL: peak RSS %d bytes exceeds limit %d bytes\n%!"
|
||||
hwm_bytes max_rss_bytes;
|
||||
exit 1
|
||||
)
|
||||
Loading…
Add table
Reference in a new issue