Merge pull request #39 from c-cube/simon/reduce-scope-2025-07-09

reduce scope of the library a bit
This commit is contained in:
Simon Cruanes 2025-09-26 15:02:01 -04:00 committed by GitHub
commit d79200f555
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 22 additions and 693 deletions

View file

@ -15,7 +15,7 @@ jobs:
- name: Use OCaml - name: Use OCaml
uses: ocaml/setup-ocaml@v3 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: '5.0' ocaml-compiler: '5.3'
dune-cache: true dune-cache: true
allow-prerelease-opam: true allow-prerelease-opam: true

View file

@ -16,8 +16,8 @@ jobs:
os: os:
- ubuntu-latest - ubuntu-latest
ocaml-compiler: ocaml-compiler:
- '4.14' - '5.0'
- '5.2' - '5.3'
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
@ -32,15 +32,10 @@ jobs:
- run: opam pin picos 0.6.0 -y -n - run: opam pin picos 0.6.0 -y -n
- run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only - run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only
if: matrix.ocaml-compiler == '5.2'
- run: opam install -t moonpool --deps-only
if: matrix.ocaml-compiler != '5.2'
- run: opam exec -- dune build @install - run: opam exec -- dune build @install
# install some depopts # install some depopts
- run: opam install thread-local-storage trace hmap - run: opam install thread-local-storage trace hmap
if: matrix.ocaml-compiler == '5.2'
- run: opam exec -- dune build --profile=release --force @install @runtest - run: opam exec -- dune build --profile=release --force @install @runtest
compat: compat:

View file

@ -1,6 +1,3 @@
(executables (executables
(names fib_rec pi primes) (names fib_rec pi primes)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib)) (libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib))

View file

@ -66,8 +66,6 @@ let run_par1 ~kind (num_steps : int) : float =
let pi = step *. Lock.get global_sum in let pi = step *. Lock.get global_sum in
pi pi
[@@@ifge 5.0]
let run_fork_join ~kind num_steps : float = let run_fork_join ~kind num_steps : float =
let@ pool = with_pool ~kind () in let@ pool = with_pool ~kind () in
@ -92,13 +90,6 @@ let run_fork_join ~kind num_steps : float =
let pi = step *. Lock.get global_sum in let pi = step *. Lock.get global_sum in
pi pi
[@@@else_]
let run_fork_join _ =
failwith "fork join not available on this version of OCaml"
[@@@endif]
type mode = type mode =
| Sequential | Sequential
| Par1 | Par1

View file

@ -16,7 +16,7 @@
(name moonpool) (name moonpool)
(synopsis "Pools of threads supported by a pool of domains") (synopsis "Pools of threads supported by a pool of domains")
(depends (depends
(ocaml (>= 4.14)) (ocaml (>= 5.0))
dune dune
(either (>= 1.0)) (either (>= 1.0))
(trace :with-test) (trace :with-test)

View file

@ -9,7 +9,7 @@ tags: ["thread" "pool" "domain" "futures" "fork-join"]
homepage: "https://github.com/c-cube/moonpool" homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues" bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [ depends: [
"ocaml" {>= "4.14"} "ocaml" {>= "5.0"}
"dune" {>= "3.0"} "dune" {>= "3.0"}
"either" {>= "1.0"} "either" {>= "1.0"}
"trace" {with-test} "trace" {with-test}

View file

@ -1,182 +0,0 @@
type 'a t = {
max_size: int;
q: 'a Queue.t;
mutex: Mutex.t;
cond_push: Condition.t;
cond_pop: Condition.t;
mutable closed: bool;
}
exception Closed
let create ~max_size () : _ t =
if max_size < 1 then invalid_arg "Bounded_queue.create";
{
max_size;
mutex = Mutex.create ();
cond_push = Condition.create ();
cond_pop = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
(* awake waiters so they fail *)
Condition.broadcast self.cond_push;
Condition.broadcast self.cond_pop
);
Mutex.unlock self.mutex
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
let push (self : _ t) x : unit =
let continue = ref true in
Mutex.lock self.mutex;
while !continue do
if self.closed then (
(* push always fails on a closed queue *)
Mutex.unlock self.mutex;
raise Closed
) else if is_full_ self then
Condition.wait self.cond_push self.mutex
else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
(* exit loop *)
continue := false;
Mutex.unlock self.mutex
)
done
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
(* pop fails on a closed queue if it's also empty,
otherwise it still returns the remaining elements *)
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex;
(loop [@tailcall]) ()
) else (
let was_full = is_full_ self in
let x = Queue.pop self.q in
(* wakeup pushers that were blocked *)
if was_full then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
x
)
in
loop ()
let try_pop ~force_lock (self : _ t) : _ option =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let was_full_before_pop = is_full_ self in
match Queue.pop self.q with
| x ->
(* wakeup pushers that are blocked *)
if was_full_before_pop then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
None
) else
None
let try_push ~force_lock (self : _ t) x : bool =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
if is_full_ self then (
Mutex.unlock self.mutex;
false
) else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
Mutex.unlock self.mutex;
true
)
) else
false
let[@inline] max_size self = self.max_size
let size (self : _ t) : int =
Mutex.lock self.mutex;
let n = Queue.length self.q in
Mutex.unlock self.mutex;
n
let transfer (self : 'a t) q2 : unit =
Mutex.lock self.mutex;
let continue = ref true in
while !continue do
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex
) else (
let was_full = is_full_ self in
Queue.transfer self.q q2;
if was_full then Condition.broadcast self.cond_push;
continue := false;
Mutex.unlock self.mutex
)
done
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
let to_iter self k =
try
while true do
let x = pop self in
k x
done
with Closed -> ()
let to_gen self : _ gen =
fun () ->
match pop self with
| exception Closed -> None
| x -> Some x
let rec to_seq self : _ Seq.t =
fun () ->
match pop self with
| exception Closed -> Seq.Nil
| x -> Seq.Cons (x, to_seq self)

View file

@ -1,82 +0,0 @@
(** A blocking queue of finite size.
This queue, while still using locks underneath (like the regular blocking
queue) should be enough for usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is desirable:
if the queue is used to communicate between producer(s) and consumer(s), the
consumer(s) can limit the rate at which producer(s) send new work down their
way. Whenever the queue is full, means that producer(s) will have to wait
before pushing new work.
@since 0.4 *)
type 'a t
(** A bounded queue. *)
val create : max_size:int -> unit -> 'a t
val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q], and after all
the elements still in [q] currently are [pop]'d, {!pop} will also raise
{!Closed}. *)
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
block until there is room for [x].
@raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
acquire [q] or if [q] is full.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [false] even if there's room in
the queue.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
some element becomes available.
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
no element is available or if it failed to acquire [q].
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention.
@raise Closed if [q] is empty and closed. *)
val size : _ t -> int
(** Number of elements currently in [q] *)
val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available in [bq] into
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *)
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might not
terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *)
val to_seq : 'a t -> 'a Seq.t
(** [to_gen q] returns a (transient) sequence from the queue. *)

View file

@ -70,8 +70,6 @@ let close (self : _ t) : unit =
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
Queue.iter Trigger.signal q Queue.iter Trigger.signal q
[@@@ifge 5.0]
let rec push (self : _ t) x : unit = let rec push (self : _ t) x : unit =
Mutex.lock self.mutex; Mutex.lock self.mutex;
@ -120,5 +118,3 @@ let rec pop (self : 'a t) : 'a =
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
Trigger.await_exn tr; Trigger.await_exn tr;
pop self pop self
[@@@endif]

View file

@ -28,8 +28,6 @@ val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. This is idempotent. (** Close the channel. Further push and pop calls will fail. This is idempotent.
*) *)
[@@@ifge 5.0]
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task if the channel (** Push the value into the channel, suspending the current task if the channel
is currently full. is currently full.
@ -48,5 +46,3 @@ val pop_block_exn : 'a t -> 'a
The precautions around blocking from inside a thread pool The precautions around blocking from inside a thread pool
are the same as explained in {!Fut.wait_block}. *) are the same as explained in {!Fut.wait_block}. *)
*) *)
[@@@endif]

View file

@ -12,7 +12,4 @@
moonpool.dpool moonpool.dpool
(re_export picos)) (re_export picos))
(flags :standard -open Moonpool_private) (flags :standard -open Moonpool_private)
(private_modules util_pool_) (private_modules util_pool_))
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))

View file

@ -1,4 +1,4 @@
module A = Atomic_ module A = Atomic
module C = Picos.Computation module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
@ -424,8 +424,6 @@ let wait_block self =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Error (Exn_bt.make exn bt) Error (Exn_bt.make exn bt)
[@@@ifge 5.0]
let await (self : 'a t) : 'a = let await (self : 'a t) : 'a =
(* fast path: peek *) (* fast path: peek *)
match C.peek_exn self with match C.peek_exn self with
@ -439,8 +437,6 @@ let await (self : 'a t) : 'a =
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
get_or_fail_exn self get_or_fail_exn self
[@@@endif]
module Infix = struct module Infix = struct
let[@inline] ( >|= ) x f = map ~f x let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x let[@inline] ( >>= ) x f = bind ~f x

View file

@ -236,8 +236,6 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
{b NOTE} This is only available on OCaml 5. *) {b NOTE} This is only available on OCaml 5. *)
[@@@ifge 5.0]
val await : 'a t -> 'a val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then (** [await fut] suspends the current tasks until [fut] is fulfilled, then
resumes the task on this same runner (but possibly on a different resumes the task on this same runner (but possibly on a different
@ -248,8 +246,6 @@ val await : 'a t -> 'a
This must only be run from inside the runner itself. The runner must support This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *) {!Suspend_}. {b NOTE}: only on OCaml 5.x *)
[@@@endif]
(** {2 Blocking} *) (** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error val wait_block : 'a t -> 'a or_error

View file

@ -12,18 +12,12 @@ let get_current_runner = Runner.get_current_runner
let recommended_thread_count () = Domain_.recommended_number () let recommended_thread_count () = Domain_.recommended_number ()
let spawn = Fut.spawn let spawn = Fut.spawn
let spawn_on_current_runner = Fut.spawn_on_current_runner let spawn_on_current_runner = Fut.spawn_on_current_runner
[@@@ifge 5.0]
let await = Fut.await let await = Fut.await
let yield = Picos.Fiber.yield let yield = Picos.Fiber.yield
[@@@endif] module Atomic = Atomic
module Atomic = Atomic_
module Blocking_queue = Bb_queue module Blocking_queue = Bb_queue
module Background_thread = Background_thread module Background_thread = Background_thread
module Bounded_queue = Bounded_queue
module Chan = Chan module Chan = Chan
module Exn_bt = Exn_bt module Exn_bt = Exn_bt
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool

View file

@ -72,8 +72,6 @@ val get_current_runner : unit -> Runner.t option
(** See {!Runner.get_current_runner} (** See {!Runner.get_current_runner}
@since 0.7 *) @since 0.7 *)
[@@@ifge 5.0]
val await : 'a Fut.t -> 'a val await : 'a Fut.t -> 'a
(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on (** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on
OCaml >= 5.0. OCaml >= 5.0.
@ -84,8 +82,6 @@ val yield : unit -> unit
>= 5.0. >= 5.0.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
[@@@endif]
module Lock = Lock module Lock = Lock
module Fut = Fut module Fut = Fut
module Chan = Chan module Chan = Chan
@ -203,9 +199,7 @@ module Blocking_queue : sig
@since 0.4 *) @since 0.4 *)
end end
module Bounded_queue = Bounded_queue module Atomic = Atomic
module Atomic = Atomic_
(** Atomic values. (** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]

View file

@ -33,8 +33,6 @@ type 'st ops = {
(** A dummy task. *) (** A dummy task. *)
let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber } let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber }
[@@@ifge 5.0]
let[@inline] discontinue k exn = let[@inline] discontinue k exn =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Effect.Deep.discontinue_with_backtrace k exn bt Effect.Deep.discontinue_with_backtrace k exn bt
@ -100,12 +98,6 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
fun f -> Effect.Deep.match_with f () handler fun f -> Effect.Deep.match_with f () handler
[@@@else_]
let with_handler ~ops:_ self f = f ()
[@@@endif]
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then ( if block_signals then (
try try

View file

@ -1,5 +1,5 @@
open Types_ open Types_
module A = Atomic_ module A = Atomic
module WSQ = Ws_deque_ module WSQ = Ws_deque_
module WL = Worker_loop_ module WL = Worker_loop_
include Runner include Runner

View file

@ -1,124 +0,0 @@
type op =
| Le
| Ge
type line =
| If of op * int * int
| Elseif of op * int * int
| Else
| Endif
| Raw of string
| Eof
let prefix ~pre s =
let len = String.length pre in
if len > String.length s then
false
else (
let rec check i =
if i = len then
true
else if String.unsafe_get s i <> String.unsafe_get pre i then
false
else
check (i + 1)
in
check 0
)
let eval ~major ~minor op i j =
match op with
| Le -> (major, minor) <= (i, j)
| Ge -> (major, minor) >= (i, j)
let preproc_lines ~file ~major ~minor (ic : in_channel) : unit =
let pos = ref 0 in
let fail msg =
failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg)
in
let pp_pos () = Printf.printf "#%d %S\n" !pos file in
let parse_line () : line =
match input_line ic with
| exception End_of_file -> Eof
| line ->
let line' = String.trim line in
incr pos;
if line' <> "" && line'.[0] = '[' then
if prefix line' ~pre:"[@@@ifle" then
Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y))
else if prefix line' ~pre:"[@@@ifge" then
Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y))
else if prefix line' ~pre:"[@@@elifle" then
Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y))
else if prefix line' ~pre:"[@@@elifge" then
Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y))
else if line' = "[@@@else_]" then
Else
else if line' = "[@@@endif]" then
Endif
else
Raw line
else
Raw line
in
(* entry point *)
let rec top () =
match parse_line () with
| Eof -> ()
| If (op, i, j) ->
if eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok:true ()
| Raw s ->
print_endline s;
top ()
| Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif"
(* current block is the valid one *)
and cat_block () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw s ->
print_endline s;
cat_block ()
| Endif ->
pp_pos ();
top ()
| Elseif _ | Else -> skip_block ~elseok:false ()
(* skip current block.
@param elseok if true, we should evaluate "elseif" *)
and skip_block ~elseok () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw _ -> skip_block ~elseok ()
| Endif ->
pp_pos ();
top ()
| Elseif (op, i, j) ->
if elseok && eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
| Else ->
if elseok then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
in
top ()
let () =
let file = Sys.argv.(1) in
let version = Sys.ocaml_version in
let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in
let ic = open_in file in
preproc_lines ~file ~major ~minor ic;
()

View file

@ -1,6 +0,0 @@
; our little preprocessor (ported from containers)
(executable
(name cpp)
(modes
(best exe)))

View file

@ -2,8 +2,5 @@
(name moonpool_dpool) (name moonpool_dpool)
(public_name moonpool.dpool) (public_name moonpool.dpool)
(synopsis "Moonpool's domain pool (used to start worker threads)") (synopsis "Moonpool's domain pool (used to start worker threads)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(flags :standard -open Moonpool_private) (flags :standard -open Moonpool_private)
(libraries moonpool.private)) (libraries moonpool.private))

View file

@ -71,7 +71,7 @@ type event =
new threads for pools. *) new threads for pools. *)
type worker_state = { type worker_state = {
q: event Bb_queue.t; q: event Bb_queue.t;
th_count: int Atomic_.t; (** Number of threads on this *) th_count: int Atomic.t; (** Number of threads on this *)
} }
(** Array of (optional) workers. (** Array of (optional) workers.
@ -101,14 +101,14 @@ let work_ idx (st : worker_state) : unit =
match Bb_queue.pop st.q with match Bb_queue.pop st.q with
| Run f -> (try f () with _ -> ()) | Run f -> (try f () with _ -> ())
| Decr -> | Decr ->
if Atomic_.fetch_and_add st.th_count (-1) = 1 then ( if Atomic.fetch_and_add st.th_count (-1) = 1 then (
continue := false; continue := false;
(* wait a bit, we might be needed again in a short amount of time *) (* wait a bit, we might be needed again in a short amount of time *)
try try
for _n_attempt = 1 to 50 do for _n_attempt = 1 to 50 do
Thread.delay 0.001; Thread.delay 0.001;
if Atomic_.get st.th_count > 0 then ( if Atomic.get st.th_count > 0 then (
(* needed again! *) (* needed again! *)
continue := true; continue := true;
raise Exit raise Exit
@ -129,7 +129,7 @@ let work_ idx (st : worker_state) : unit =
| Some _st', dom -> | Some _st', dom ->
assert (st == _st'); assert (st == _st');
if Atomic_.get st.th_count > 0 then if Atomic.get st.th_count > 0 then
(* still alive! *) (* still alive! *)
(Some st, dom), true (Some st, dom), true
else else
@ -145,7 +145,7 @@ let work_ idx (st : worker_state) : unit =
(* special case for main domain: we start a worker immediately *) (* special case for main domain: we start a worker immediately *)
let () = let () =
assert (Domain_.is_main_domain ()); assert (Domain_.is_main_domain ());
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
(* thread that stays alive *) (* thread that stays alive *)
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t); ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
domains_.(0) <- Lock.create (Some w, None) domains_.(0) <- Lock.create (Some w, None)
@ -157,12 +157,12 @@ let run_on (i : int) (f : unit -> unit) : unit =
let w = let w =
Lock.update_map domains_.(i) (function Lock.update_map domains_.(i) (function
| (Some w, _) as st -> | (Some w, _) as st ->
Atomic_.incr w.th_count; Atomic.incr w.th_count;
st, w st, w
| None, dying_dom -> | None, dying_dom ->
(* join previous dying domain, to free its resources, if any *) (* join previous dying domain, to free its resources, if any *)
Option.iter Domain_.join dying_dom; Option.iter Domain_.join dying_dom;
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
let worker : domain = Domain_.spawn (fun () -> work_ i w) in let worker : domain = Domain_.spawn (fun () -> work_ i w) in
(Some w, Some worker), w) (Some w, Some worker), w)
in in

View file

@ -3,10 +3,4 @@
(public_name moonpool.fib) (public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool") (synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos) (libraries moonpool picos)
(enabled_if (flags :standard -open Moonpool_private -open Moonpool))
(>= %{ocaml_version} 5.0))
(flags :standard -open Moonpool_private -open Moonpool)
(optional)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))

View file

@ -4,6 +4,4 @@
(synopsis "Fork-join parallelism for moonpool") (synopsis "Fork-join parallelism for moonpool")
(flags :standard -open Moonpool) (flags :standard -open Moonpool)
(optional) (optional)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.private picos)) (libraries moonpool moonpool.private picos))

View file

@ -1,3 +1,5 @@
[@@@deprecated "just use lwt or eio or something else"]
module Fd = Picos_io_fd module Fd = Picos_io_fd
module Unix = Picos_io.Unix module Unix = Picos_io.Unix
module Select = Picos_io_select module Select = Picos_io_select

View file

@ -1,46 +0,0 @@
[@@@ifge 4.12]
include Atomic
[@@@else_]
type 'a t = { mutable x: 'a }
let[@inline] make x = { x }
let[@inline] get { x } = x
let[@inline] set r x = r.x <- x
let[@inline never] exchange r x =
(* atomic *)
let y = r.x in
r.x <- x;
(* atomic *)
y
let[@inline never] compare_and_set r seen v =
(* atomic *)
if r.x == seen then (
r.x <- v;
(* atomic *)
true
) else
false
let[@inline never] fetch_and_add r x =
(* atomic *)
let v = r.x in
r.x <- x + r.x;
(* atomic *)
v
let[@inline never] incr r =
(* atomic *)
r.x <- 1 + r.x
(* atomic *)
let[@inline never] decr r =
(* atomic *)
r.x <- r.x - 1
(* atomic *)
[@@@endif]

View file

@ -1,4 +1,3 @@
[@@@ifge 5.0]
[@@@ocaml.alert "-unstable"] [@@@ocaml.alert "-unstable"]
let recommended_number () = Domain.recommended_domain_count () let recommended_number () = Domain.recommended_domain_count ()
@ -10,18 +9,3 @@ let spawn : _ -> t = Domain.spawn
let relax = Domain.cpu_relax let relax = Domain.cpu_relax
let join = Domain.join let join = Domain.join
let is_main_domain = Domain.is_main_domain let is_main_domain = Domain.is_main_domain
[@@@ocaml.alert "+unstable"]
[@@@else_]
let recommended_number () = 1
type t = Thread.t
let get_id (self : t) : int = Thread.id self
let spawn f : t = Thread.create f ()
let relax () = Thread.yield ()
let join = Thread.join
let is_main_domain () = true
[@@@endif]

View file

@ -2,9 +2,6 @@
(name moonpool_private) (name moonpool_private)
(public_name moonpool.private) (public_name moonpool.private)
(synopsis "Private internal utils for Moonpool (do not rely on)") (synopsis "Private internal utils for Moonpool (do not rely on)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries (libraries
threads threads
either either

View file

@ -1,4 +1,4 @@
module A = Atomic_ module A = Atomic
(* terminology: (* terminology:

View file

@ -10,8 +10,7 @@
t_resource t_resource
t_unfair t_unfair
t_ws_deque t_ws_deque
t_ws_wait t_ws_wait)
t_bounded_queue)
(package moonpool) (package moonpool)
(libraries (libraries
moonpool moonpool

View file

@ -9,9 +9,6 @@
t_sort t_sort
t_fork_join t_fork_join
t_fork_join_heavy) t_fork_join_heavy)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(enabled_if (enabled_if
(and (and
(= %{system} "linux") (= %{system} "linux")

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -56,5 +54,3 @@ let main () =
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
main () main ()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
module FJ = Moonpool_forkjoin module FJ = Moonpool_forkjoin
@ -52,5 +50,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *) (* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
open Moonpool open Moonpool
@ -44,5 +42,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *) (* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let spf = Printf.sprintf let spf = Printf.sprintf
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -328,5 +326,3 @@ let () =
t_for_nested ~min:1 ~chunk_size:100 (); t_for_nested ~min:1 ~chunk_size:100 ();
t_for_nested ~min:4 ~chunk_size:100 (); t_for_nested ~min:4 ~chunk_size:100 ();
] ]
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
module Q = QCheck module Q = QCheck
let spf = Printf.sprintf let spf = Printf.sprintf
@ -52,5 +50,3 @@ let () =
run ~min:4 (); run ~min:4 ();
run ~min:1 (); run ~min:1 ();
Printf.printf "done\n%!" Printf.printf "done\n%!"
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open! Moonpool open! Moonpool
let pool = Ws_pool.create ~num_threads:4 () let pool = Ws_pool.create ~num_threads:4 ()
@ -53,5 +51,3 @@ let () =
in in
let fut = Fut.both f1 f2 in let fut = Fut.both f1 f2 in
assert (Fut.wait_block fut = Ok (2, 20)) assert (Fut.wait_block fut = Ok (2, 20))
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -44,5 +42,3 @@ let () =
run ~pool ()); run ~pool ());
() ()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
module FJ = Moonpool_forkjoin module FJ = Moonpool_forkjoin
@ -69,5 +67,3 @@ let () =
(* Printf.printf "arr: [%s]\n%!" *) (* Printf.printf "arr: [%s]\n%!" *)
(* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *) (* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *)
assert (sorted arr) assert (sorted arr)
[@@@endif]

View file

@ -1,111 +0,0 @@
module BQ = Moonpool.Bounded_queue
module Bb_queue = Moonpool.Blocking_queue
module A = Moonpool.Atomic
let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
assert (BQ.pop bq = 2);
assert (BQ.try_pop ~force_lock:true bq = None);
spawn (fun () -> BQ.push bq 3);
assert (BQ.pop bq = 3)
let () =
(* cannot create with size 0 *)
assert (
try
ignore (BQ.create ~max_size:0 ());
false
with _ -> true)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
BQ.close bq;
assert (BQ.pop bq = 2);
assert (
try
ignore (BQ.pop bq);
false
with BQ.Closed -> true);
assert (
try
ignore (BQ.push bq 42);
false
with BQ.Closed -> true)
let () =
let bq = BQ.create ~max_size:2 () in
let side_q = Bb_queue.create () in
BQ.push bq 1;
BQ.push bq 2;
spawn (fun () ->
for i = 3 to 10 do
BQ.push bq i;
Bb_queue.push side_q (`Pushed i)
done);
(* make space for new element *)
assert (BQ.pop bq = 1);
assert (Bb_queue.pop side_q = `Pushed 3);
assert (BQ.pop bq = 2);
assert (BQ.pop bq = 3);
for j = 4 to 10 do
assert (BQ.pop bq = j);
assert (Bb_queue.pop side_q = `Pushed j)
done;
assert (BQ.size bq = 0);
()
let () =
let bq = BQ.create ~max_size:5 () in
let bq1 = BQ.create ~max_size:10 () in
let bq2 = BQ.create ~max_size:10 () in
let bq_res = BQ.create ~max_size:2 () in
(* diamond:
bq -------> bq1
| |
| |
v v
bq2 -----> bq_res *)
spawn (fun () ->
BQ.to_iter bq (BQ.push bq1);
BQ.close bq1);
spawn (fun () ->
BQ.to_iter bq (BQ.push bq2);
BQ.close bq2);
spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res));
spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res));
let n = 100_000 in
(* push into [bq] *)
let sum = A.make 0 in
spawn (fun () ->
for i = 1 to n do
ignore (A.fetch_and_add sum i : int);
BQ.push bq i
done;
BQ.close bq);
let sum' = ref 0 in
for _j = 1 to n do
let x = BQ.pop bq_res in
sum' := x + !sum'
done;
assert (BQ.size bq_res = 0);
assert (A.get sum = !sum')