Compare commits

...

10 commits

Author SHA1 Message Date
Simon Cruanes
f287d03bd5 ugh, try multiple lwt versions on linux plz thx
Some checks failed
format / format (push) Has been cancelled
github pages / Deploy doc (push) Has been cancelled
build / build (push) Has been cancelled
build / build-compat (push) Has been cancelled
2026-04-07 23:03:38 -04:00
Simon Cruanes
c372afd2b5 bump to dune 3.15 to fix format 2026-04-07 22:59:34 -04:00
Simon Cruanes
1b59d8aaaf fix 2026-04-07 22:30:39 -04:00
Simon Cruanes
19e925b1fb ocamlforamt ignore 2026-04-07 22:28:57 -04:00
Simon Cruanes
a0f4c20f2b support for lwt.6 2026-04-07 22:09:22 -04:00
Simon Cruanes
e2fb26b7fa test ambient 2026-04-07 22:09:22 -04:00
Simon Cruanes
d681231d9d add support for ambient-context
optional library to provide ambient-context implem using
Moonpool.Task_local_storage
2026-04-07 21:42:52 -04:00
Simon Cruanes
471feb96d6 wip: CI 2026-04-07 21:29:11 -04:00
Simon Cruanes
0ff2645de4 CI 2026-04-06 20:14:43 -04:00
Simon Cruanes
0d0db75f26
ws pool: use ws queue in as_runner (#46)
fix a bug where the work stealing queue wasn't used in the `Runner.t` implementation.

close #45
2026-03-25 09:50:40 -04:00
19 changed files with 241 additions and 70 deletions

17
.github/workflows/format.yml vendored Normal file
View file

@ -0,0 +1,17 @@
name: format
on:
push:
branches:
- main
pull_request:
jobs:
format:
name: format
runs-on: 'ubuntu-latest'
container: ghcr.io/c-cube/c-cube-commmon/ci-doc-5.3:latest
steps:
- uses: actions/checkout@v6
- run: opam exec -- make format-check

View file

@ -9,20 +9,14 @@ jobs:
deploy:
name: Deploy doc
runs-on: ubuntu-latest
container: ghcr.io/c-cube/c-cube-commmon/ci-doc-5.3:latest
steps:
- uses: actions/checkout@main
- name: Use OCaml
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: '5.3'
dune-cache: true
allow-prerelease-opam: true
- uses: actions/checkout@v6
# temporary until it's in a release
- run: opam pin picos 0.6.0 -y -n
- run: opam install odig moonpool moonpool-lwt -t
- run: opam install moonpool moonpool-lwt -t
- run: opam exec -- odig odoc --cache-dir=_doc/ moonpool moonpool-lwt

View file

@ -1,4 +1,4 @@
name: Build and Test
name: build
on:
push:
@ -13,33 +13,28 @@ jobs:
strategy:
fail-fast: true
matrix:
os:
- ubuntu-latest
ocaml-compiler:
- '5.0'
- '5.3'
container:
- ghcr.io/c-cube/c-cube-commmon/ci-5.0:latest
- ghcr.io/c-cube/c-cube-commmon/ci-5.4:latest
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
container: ${{ matrix.container }}
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
allow-prerelease-opam: true
- uses: actions/checkout@v6
- run: opam pin picos 0.6.0 -y -n
- run: opam install -t moonpool moonpool-lwt --deps-only
- run: opam pin -y -n lwt 5.9.2
- run: opam install -t . --deps-only
- run: opam exec -- dune build @install
# install some depopts
- run: opam install thread-local-storage trace hmap
- run: opam pin -y -n lwt 6.1.1
- run: opam install thread-local-storage trace hmap ambient-context.0.2
- run: opam exec -- dune build --profile=release --force @install @runtest
compat:
name: build-compat # compat with other OSes
needs: run # only run this if cheap linux job passed
timeout-minutes: 15
strategy:
fail-fast: true
@ -64,26 +59,7 @@ jobs:
- run: opam install -t moonpool moonpool-lwt --deps-only
- run: opam exec -- dune build @install
# install some depopts
- run: opam install thread-local-storage trace domain-local-await
- run: opam exec -- dune build --profile=release --force @install @runtest
format:
name: format
strategy:
matrix:
ocaml-compiler:
- '5.3'
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
allow-prerelease-opam: true
- run: opam install ocamlformat.0.27.0
- run: opam exec -- make format-check

View file

@ -7,7 +7,7 @@ clean:
@dune clean
test:
@dune runtest $(DUNE_OPTS)
@dune runtest $(DUNE_OPTS) --no-buffer
test-autopromote:
@dune runtest $(DUNE_OPTS) --auto-promote

View file

@ -1,4 +1,4 @@
(lang dune 3.0)
(lang dune 3.15)
(using mdx 0.2)
@ -60,6 +60,7 @@
:with-test)))
(depopts
hmap
(ambient-context (>= 0.2))
(trace
(>= 0.6)))
(tags

View file

@ -8,7 +8,7 @@ license: "MIT"
homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"dune" {>= "3.0"}
"dune" {>= "3.15"}
"moonpool" {= version}
"ocaml" {>= "5.0"}
"qcheck-core" {with-test & >= "0.21"}

View file

@ -10,7 +10,7 @@ homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [
"ocaml" {>= "5.0"}
"dune" {>= "3.0"}
"dune" {>= "3.15"}
"either" {>= "1.0"}
"trace" {>= "0.11" & with-test}
"trace-tef" {>= "0.11" & with-test}
@ -24,6 +24,7 @@ depends: [
]
depopts: [
"hmap"
"ambient-context" {>= "0.2"}
"trace" {>= "0.6"}
]
build: [

5
src/ambient-context/dune Normal file
View file

@ -0,0 +1,5 @@
(library
(name moonpool_ambient_context)
(public_name moonpool.ambient-context)
(optional) ; ambient-context
(libraries moonpool hmap ambient-context))

View file

@ -0,0 +1,21 @@
open struct
module TLS = Moonpool.Task_local_storage
end
let storage : Ambient_context.Storage.t =
{
name = "moonpool";
get_context = TLS.get_local_hmap;
with_context =
(fun new_hmap f ->
let old = TLS.get_local_hmap () in
TLS.set_local_hmap new_hmap;
match f () with
| x ->
TLS.set_local_hmap old;
x
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
TLS.set_local_hmap old;
Printexc.raise_with_backtrace exn bt);
}

View file

@ -6,18 +6,7 @@ include Runner
let ( let@ ) = ( @@ )
module Id = struct
type t = unit ref
(** Unique identifier for a pool *)
let create () : t = Sys.opaque_identity (ref ())
let equal : t -> t -> bool = ( == )
end
type state = {
id_: Id.t;
(** Unique to this pool. Used to make sure tasks stay within the same
pool. *)
active: bool A.t; (** Becomes [false] when the pool is shutdown. *)
mutable workers: worker_state array; (** Fixed set of workers. *)
main_q: WL.task_full Queue.t;
@ -99,12 +88,15 @@ let schedule_in_main_queue (self : state) task : unit =
longer permitted *)
raise Shutdown
let schedule_from_w (self : worker_state) (task : WL.task_full) : unit =
let schedule_from_anywhere_ (st : state) (task : WL.task_full) : unit =
match get_current_worker_ () with
| Some w when Id.equal self.st.id_ w.st.id_ ->
| Some w when st == w.st ->
(* use worker from the same pool *)
schedule_on_current_worker w task
| _ -> schedule_in_main_queue self.st task
| _ -> schedule_in_main_queue st task
let schedule_from_w (w : worker_state) task : unit =
schedule_from_anywhere_ w.st task
exception Got_task of WL.task_full
@ -223,7 +215,8 @@ let as_runner_ (self : state) : t =
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ self ~wait)
~run_async:(fun ~fiber f ->
schedule_in_main_queue self @@ T_start { fiber; f })
let task = WL.T_start { fiber; f } in
schedule_from_anywhere_ self task)
~size:(fun () -> size_ self)
~num_tasks:(fun () -> num_tasks_ self)
()
@ -240,7 +233,6 @@ type ('a, 'b) create_args =
let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?num_threads ?name () : t =
let pool_id_ = Id.create () in
let num_domains = Domain_pool_.max_number_of_domains () in
let num_threads = Util_pool_.num_threads ?num_threads () in
@ -249,7 +241,6 @@ let create ?(on_init_thread = default_thread_init_exit_)
let pool =
{
id_ = pool_id_;
active = A.make true;
workers = [||];
main_q = Queue.create ();

View file

@ -0,0 +1 @@
types_

View file

@ -2,9 +2,23 @@
(name moonpool_lwt)
(public_name moonpool-lwt)
(enabled_if
(>= %{ocaml_version} 5.0))
(and
(>= %{ocaml_version} 5.0)
%{lib-available:lwt}))
(modules moonpool_lwt types_)
(libraries
(re_export moonpool)
picos
(re_export lwt)
lwt.unix))
(executable
(name gen_types_)
(modules gen_types_))
(rule
(enabled_if %{lib-available:lwt})
(deps types_.ml.5 types_.ml.6)
(target types_.ml)
(action
(run ./gen_types_.exe %{version:lwt})))

23
src/lwt/gen_types_.ml Normal file
View file

@ -0,0 +1,23 @@
let copy_file src dst =
let ic = open_in src in
let oc = open_out dst in
let buf = Bytes.create 1024 in
(try
while true do
let n = input ic buf 0 (Bytes.length buf) in
if n = 0 then raise End_of_file;
output oc buf 0 n
done
with End_of_file -> ());
close_in ic;
close_out oc
let () =
let version = Sys.argv.(1) in
let major =
try Scanf.sscanf version "%d.%s" (fun maj _ -> maj) with _ -> 0
in
if major >= 6 then
copy_file "types_.ml.6" "types_.ml"
else
copy_file "types_.ml.5" "types_.ml"

View file

@ -23,7 +23,7 @@ module Scheduler_state = struct
mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
mutable notification: int;
mutable notification: Types_.notification;
(** A lwt_unix notification to wake up the event loop *)
has_notified: bool Atomic.t;
}
@ -42,7 +42,7 @@ module Scheduler_state = struct
as_runner = Moonpool.Runner.dummy;
enter_hook = None;
leave_hook = None;
notification = 0;
notification = Types_.dummy_notification;
has_notified = Atomic.make false;
}

3
src/lwt/types_.ml.5 Normal file
View file

@ -0,0 +1,3 @@
type notification = int
let dummy_notification : notification = 0

3
src/lwt/types_.ml.6 Normal file
View file

@ -0,0 +1,3 @@
type notification = Lwt_unix.notification
let dummy_notification : notification = Lwt_unix.make_notification ignore

View file

@ -20,3 +20,17 @@
unix
trace-tef
trace))
(test
(name t_ambient_context)
(package moonpool)
(enabled_if
(and %{lib-available:ambient-context} %{lib-available:hmap}))
(libraries moonpool moonpool.ambient-context ambient-context))
(test
(name t_fib_await_mem)
(package moonpool)
(enabled_if
(= %{system} linux))
(libraries moonpool))

53
test/t_ambient_context.ml Normal file
View file

@ -0,0 +1,53 @@
open Moonpool
let () = Ambient_context.set_current_storage Moonpool_ambient_context.storage
let key_a : string Ambient_context.Context.key = Ambient_context.new_key ()
let key_b : int Ambient_context.Context.key = Ambient_context.new_key ()
let get_a () = Ambient_context.get key_a
let get_b () = Ambient_context.get key_b
let () =
let pool = Ws_pool.create ~num_threads:4 () in
(* basic: values set in a task are visible within that task *)
let fut =
Fut.spawn ~on:pool (fun () ->
assert (get_a () = None);
Ambient_context.with_key_bound_to key_a "hello" (fun () ->
assert (get_a () = Some "hello");
(* nested: inner binding shadows outer *)
Ambient_context.with_key_bound_to key_a "world" (fun () ->
assert (get_a () = Some "world"));
(* restored after inner scope *)
assert (get_a () = Some "hello"));
assert (get_a () = None))
in
Fut.wait_block_exn fut;
(* two keys are independent *)
let fut =
Fut.spawn ~on:pool (fun () ->
Ambient_context.with_key_bound_to key_a "foo" (fun () ->
Ambient_context.with_key_bound_to key_b 42 (fun () ->
assert (get_a () = Some "foo");
assert (get_b () = Some 42));
assert (get_b () = None);
assert (get_a () = Some "foo")))
in
Fut.wait_block_exn fut;
(* child tasks do not inherit the parent's ambient context *)
let fut =
Fut.spawn ~on:pool (fun () ->
Ambient_context.with_key_bound_to key_a "parent" (fun () ->
let child =
Fut.spawn ~on:pool (fun () ->
(* child starts with empty context *)
assert (get_a () = None))
in
Fut.wait_block_exn child))
in
Fut.wait_block_exn fut;
Ws_pool.shutdown pool;
Printf.printf "all assertions passed\n%!"

54
test/t_fib_await_mem.ml Normal file
View file

@ -0,0 +1,54 @@
(* regression test for #45 *)
open Moonpool
let ( let@ ) = ( @@ )
let rec fib_direct x =
if x <= 1 then
1
else
fib_direct (x - 1) + fib_direct (x - 2)
let cutoff = 8
let rec fib_await ~on x : int Fut.t =
if x <= cutoff then
Fut.spawn ~on (fun () -> fib_direct x)
else
Fut.spawn ~on (fun () ->
let n1 = fib_await ~on (x - 1) in
let n2 = fib_await ~on (x - 2) in
let n1 = Fut.await n1 in
let n2 = Fut.await n2 in
n1 + n2)
(** Read VmHWM (peak RSS in kB) from /proc/self/status. *)
let get_vmhwm_kb () : int option =
let path = "/proc/self/status" in
match In_channel.with_open_bin path In_channel.input_all with
| exception Sys_error _ -> None
| content ->
let lines = String.split_on_char '\n' content in
List.find_map
(fun line -> Scanf.sscanf_opt line "VmHWM: %d kB" Fun.id)
lines
let max_rss_bytes = 150_000_000
let () =
let@ pool = Ws_pool.with_ ~num_threads:4 () in
let result = fib_await ~on:pool 40 |> Fut.wait_block_exn in
assert (result = 165580141);
match get_vmhwm_kb () with
| None ->
Printf.printf "fib 40 = %d (skip RSS check: no /proc/self/status)\n%!"
result
| Some hwm_kb ->
let hwm_bytes = hwm_kb * 1024 in
Printf.printf "fib 40 = %d, peak RSS = %d bytes\n%!" result hwm_bytes;
if hwm_bytes > max_rss_bytes then (
Printf.eprintf "FAIL: peak RSS %d bytes exceeds limit %d bytes\n%!"
hwm_bytes max_rss_bytes;
exit 1
)