From 5ea9a3f5878daa9a15a58266bddb065f37b6792a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 15:28:25 -0400 Subject: [PATCH 01/11] remove bounded_queue --- src/core/bounded_queue.ml | 182 ------------------------------------- src/core/bounded_queue.mli | 82 ----------------- src/core/moonpool.ml | 1 - src/core/moonpool.mli | 2 - test/dune | 3 +- test/t_bounded_queue.ml | 111 ---------------------- 6 files changed, 1 insertion(+), 380 deletions(-) delete mode 100644 src/core/bounded_queue.ml delete mode 100644 src/core/bounded_queue.mli delete mode 100644 test/t_bounded_queue.ml diff --git a/src/core/bounded_queue.ml b/src/core/bounded_queue.ml deleted file mode 100644 index 550c119c..00000000 --- a/src/core/bounded_queue.ml +++ /dev/null @@ -1,182 +0,0 @@ -type 'a t = { - max_size: int; - q: 'a Queue.t; - mutex: Mutex.t; - cond_push: Condition.t; - cond_pop: Condition.t; - mutable closed: bool; -} - -exception Closed - -let create ~max_size () : _ t = - if max_size < 1 then invalid_arg "Bounded_queue.create"; - { - max_size; - mutex = Mutex.create (); - cond_push = Condition.create (); - cond_pop = Condition.create (); - q = Queue.create (); - closed = false; - } - -let close (self : _ t) = - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - (* awake waiters so they fail *) - Condition.broadcast self.cond_push; - Condition.broadcast self.cond_pop - ); - Mutex.unlock self.mutex - -(** Check if the queue is full. Precondition: [self.mutex] is acquired. *) -let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size - -let push (self : _ t) x : unit = - let continue = ref true in - Mutex.lock self.mutex; - while !continue do - if self.closed then ( - (* push always fails on a closed queue *) - Mutex.unlock self.mutex; - raise Closed - ) else if is_full_ self then - Condition.wait self.cond_push self.mutex - else ( - let was_empty = Queue.is_empty self.q in - Queue.push x self.q; - if was_empty then Condition.broadcast self.cond_pop; - - (* exit loop *) - continue := false; - Mutex.unlock self.mutex - ) - done - -let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - let rec loop () = - if Queue.is_empty self.q then ( - if self.closed then ( - (* pop fails on a closed queue if it's also empty, - otherwise it still returns the remaining elements *) - Mutex.unlock self.mutex; - raise Closed - ); - - Condition.wait self.cond_pop self.mutex; - (loop [@tailcall]) () - ) else ( - let was_full = is_full_ self in - let x = Queue.pop self.q in - (* wakeup pushers that were blocked *) - if was_full then Condition.broadcast self.cond_push; - Mutex.unlock self.mutex; - x - ) - in - loop () - -let try_pop ~force_lock (self : _ t) : _ option = - let has_lock = - if force_lock then ( - Mutex.lock self.mutex; - true - ) else - Mutex.try_lock self.mutex - in - if has_lock then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - let was_full_before_pop = is_full_ self in - match Queue.pop self.q with - | x -> - (* wakeup pushers that are blocked *) - if was_full_before_pop then Condition.broadcast self.cond_push; - Mutex.unlock self.mutex; - Some x - | exception Queue.Empty -> - Mutex.unlock self.mutex; - None - ) else - None - -let try_push ~force_lock (self : _ t) x : bool = - let has_lock = - if force_lock then ( - Mutex.lock self.mutex; - true - ) else - Mutex.try_lock self.mutex - in - if has_lock then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - - if is_full_ self then ( - Mutex.unlock self.mutex; - false - ) else ( - let was_empty = Queue.is_empty self.q in - Queue.push x self.q; - if was_empty then Condition.broadcast self.cond_pop; - Mutex.unlock self.mutex; - true - ) - ) else - false - -let[@inline] max_size self = self.max_size - -let size (self : _ t) : int = - Mutex.lock self.mutex; - let n = Queue.length self.q in - Mutex.unlock self.mutex; - n - -let transfer (self : 'a t) q2 : unit = - Mutex.lock self.mutex; - let continue = ref true in - while !continue do - if Queue.is_empty self.q then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond_pop self.mutex - ) else ( - let was_full = is_full_ self in - Queue.transfer self.q q2; - if was_full then Condition.broadcast self.cond_push; - continue := false; - Mutex.unlock self.mutex - ) - done - -type 'a gen = unit -> 'a option -type 'a iter = ('a -> unit) -> unit - -let to_iter self k = - try - while true do - let x = pop self in - k x - done - with Closed -> () - -let to_gen self : _ gen = - fun () -> - match pop self with - | exception Closed -> None - | x -> Some x - -let rec to_seq self : _ Seq.t = - fun () -> - match pop self with - | exception Closed -> Seq.Nil - | x -> Seq.Cons (x, to_seq self) diff --git a/src/core/bounded_queue.mli b/src/core/bounded_queue.mli deleted file mode 100644 index 165f7681..00000000 --- a/src/core/bounded_queue.mli +++ /dev/null @@ -1,82 +0,0 @@ -(** A blocking queue of finite size. - - This queue, while still using locks underneath (like the regular blocking - queue) should be enough for usage under reasonable contention. - - The bounded size is helpful whenever some form of backpressure is desirable: - if the queue is used to communicate between producer(s) and consumer(s), the - consumer(s) can limit the rate at which producer(s) send new work down their - way. Whenever the queue is full, means that producer(s) will have to wait - before pushing new work. - - @since 0.4 *) - -type 'a t -(** A bounded queue. *) - -val create : max_size:int -> unit -> 'a t - -val close : _ t -> unit -(** [close q] closes [q]. No new elements can be pushed into [q], and after all - the elements still in [q] currently are [pop]'d, {!pop} will also raise - {!Closed}. *) - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will - block until there is room for [x]. - @raise Closed if [q] is closed. *) - -val try_push : force_lock:bool -> 'a t -> 'a -> bool -(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot - acquire [q] or if [q] is full. - - @param force_lock - if true, use {!Mutex.lock} (which can block under contention); if false, - use {!Mutex.try_lock}, which might return [false] even if there's room in - the queue. - - @raise Closed if [q] is closed. *) - -val pop : 'a t -> 'a -(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until - some element becomes available. - @raise Closed if [q] is empty and closed. *) - -val try_pop : force_lock:bool -> 'a t -> 'a option -(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if - no element is available or if it failed to acquire [q]. - - @param force_lock - if true, use {!Mutex.lock} (which can block under contention); if false, - use {!Mutex.try_lock}, which might return [None] even in presence of an - element if there's contention. - - @raise Closed if [q] is empty and closed. *) - -val size : _ t -> int -(** Number of elements currently in [q] *) - -val max_size : _ t -> int -(** Maximum size of the queue. See {!create}. *) - -val transfer : 'a t -> 'a Queue.t -> unit -(** [transfer bq q2] transfers all elements currently available in [bq] into - local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty. - - See {!Bb_queue.transfer} for more details. - @raise Closed if [bq] is empty and closed. *) - -type 'a gen = unit -> 'a option -type 'a iter = ('a -> unit) -> unit - -val to_iter : 'a t -> 'a iter -(** [to_iter q] returns an iterator over all items in the queue. This might not - terminate if [q] is never closed. *) - -val to_gen : 'a t -> 'a gen -(** [to_gen q] returns a generator from the queue. *) - -val to_seq : 'a t -> 'a Seq.t -(** [to_gen q] returns a (transient) sequence from the queue. *) diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index 2d009065..d9f72b51 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -23,7 +23,6 @@ let yield = Picos.Fiber.yield module Atomic = Atomic_ module Blocking_queue = Bb_queue module Background_thread = Background_thread -module Bounded_queue = Bounded_queue module Chan = Chan module Exn_bt = Exn_bt module Fifo_pool = Fifo_pool diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index aa4548a5..3a5f5eef 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -203,8 +203,6 @@ module Blocking_queue : sig @since 0.4 *) end -module Bounded_queue = Bounded_queue - module Atomic = Atomic_ (** Atomic values. diff --git a/test/dune b/test/dune index af881591..38b6a9c8 100644 --- a/test/dune +++ b/test/dune @@ -10,8 +10,7 @@ t_resource t_unfair t_ws_deque - t_ws_wait - t_bounded_queue) + t_ws_wait) (package moonpool) (libraries moonpool diff --git a/test/t_bounded_queue.ml b/test/t_bounded_queue.ml deleted file mode 100644 index 25302896..00000000 --- a/test/t_bounded_queue.ml +++ /dev/null @@ -1,111 +0,0 @@ -module BQ = Moonpool.Bounded_queue -module Bb_queue = Moonpool.Blocking_queue -module A = Moonpool.Atomic - -let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t) - -let () = - let bq = BQ.create ~max_size:3 () in - BQ.push bq 1; - BQ.push bq 2; - assert (BQ.size bq = 2); - assert (BQ.pop bq = 1); - assert (BQ.pop bq = 2); - - assert (BQ.try_pop ~force_lock:true bq = None); - spawn (fun () -> BQ.push bq 3); - assert (BQ.pop bq = 3) - -let () = - (* cannot create with size 0 *) - assert ( - try - ignore (BQ.create ~max_size:0 ()); - false - with _ -> true) - -let () = - let bq = BQ.create ~max_size:3 () in - BQ.push bq 1; - BQ.push bq 2; - assert (BQ.size bq = 2); - assert (BQ.pop bq = 1); - - BQ.close bq; - assert (BQ.pop bq = 2); - assert ( - try - ignore (BQ.pop bq); - false - with BQ.Closed -> true); - assert ( - try - ignore (BQ.push bq 42); - false - with BQ.Closed -> true) - -let () = - let bq = BQ.create ~max_size:2 () in - let side_q = Bb_queue.create () in - BQ.push bq 1; - BQ.push bq 2; - - spawn (fun () -> - for i = 3 to 10 do - BQ.push bq i; - Bb_queue.push side_q (`Pushed i) - done); - - (* make space for new element *) - assert (BQ.pop bq = 1); - assert (Bb_queue.pop side_q = `Pushed 3); - assert (BQ.pop bq = 2); - assert (BQ.pop bq = 3); - for j = 4 to 10 do - assert (BQ.pop bq = j); - assert (Bb_queue.pop side_q = `Pushed j) - done; - assert (BQ.size bq = 0); - () - -let () = - let bq = BQ.create ~max_size:5 () in - - let bq1 = BQ.create ~max_size:10 () in - let bq2 = BQ.create ~max_size:10 () in - - let bq_res = BQ.create ~max_size:2 () in - - (* diamond: - bq -------> bq1 - | | - | | - v v - bq2 -----> bq_res *) - spawn (fun () -> - BQ.to_iter bq (BQ.push bq1); - BQ.close bq1); - spawn (fun () -> - BQ.to_iter bq (BQ.push bq2); - BQ.close bq2); - spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res)); - spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res)); - - let n = 100_000 in - - (* push into [bq] *) - let sum = A.make 0 in - spawn (fun () -> - for i = 1 to n do - ignore (A.fetch_and_add sum i : int); - BQ.push bq i - done; - BQ.close bq); - - let sum' = ref 0 in - for _j = 1 to n do - let x = BQ.pop bq_res in - sum' := x + !sum' - done; - assert (BQ.size bq_res = 0); - assert (A.get sum = !sum') From 83acc18d3d21f7bf4f24f4f9c3ceec21b7080306 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 15:28:33 -0400 Subject: [PATCH 02/11] deprecate fibers --- src/fib/moonpool_fib.ml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/fib/moonpool_fib.ml b/src/fib/moonpool_fib.ml index ec89c075..0a3b7360 100644 --- a/src/fib/moonpool_fib.ml +++ b/src/fib/moonpool_fib.ml @@ -5,8 +5,15 @@ @since 0.6. *) module Fiber = Fiber +[@@deprecated "use picos structured concurrency or something else"] + module Fls = Fls module Handle = Handle + module Main = Main +[@@deprecated "use picos structured concurrency or something else"] + +[@@@ocaml.alert "-deprecated"] + include Fiber include Main From 2dcc8583848ae8c6aafb2aa6e8eb65eaed7e05e4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 15:39:26 -0400 Subject: [PATCH 03/11] remove Atomic stubs, we're already depending on >4.12 --- src/private/atomic_.ml | 46 ------------------------------------------ 1 file changed, 46 deletions(-) delete mode 100644 src/private/atomic_.ml diff --git a/src/private/atomic_.ml b/src/private/atomic_.ml deleted file mode 100644 index 6983f1fb..00000000 --- a/src/private/atomic_.ml +++ /dev/null @@ -1,46 +0,0 @@ -[@@@ifge 4.12] - -include Atomic - -[@@@else_] - -type 'a t = { mutable x: 'a } - -let[@inline] make x = { x } -let[@inline] get { x } = x -let[@inline] set r x = r.x <- x - -let[@inline never] exchange r x = - (* atomic *) - let y = r.x in - r.x <- x; - (* atomic *) - y - -let[@inline never] compare_and_set r seen v = - (* atomic *) - if r.x == seen then ( - r.x <- v; - (* atomic *) - true - ) else - false - -let[@inline never] fetch_and_add r x = - (* atomic *) - let v = r.x in - r.x <- x + r.x; - (* atomic *) - v - -let[@inline never] incr r = - (* atomic *) - r.x <- 1 + r.x -(* atomic *) - -let[@inline never] decr r = - (* atomic *) - r.x <- r.x - 1 -(* atomic *) - -[@@@endif] From f8d5c564de18161a5445fdd3ce276b9a5d4e4f1b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 15:41:37 -0400 Subject: [PATCH 04/11] remove version-dependent preprocessor --- .github/workflows/gh-pages.yml | 2 +- .github/workflows/main.yml | 9 +-- dune-project | 2 +- src/core/chan.ml | 4 -- src/core/chan.mli | 4 -- src/core/dune | 5 +- src/core/fut.ml | 6 +- src/core/fut.mli | 4 -- src/core/moonpool.ml | 7 +- src/core/moonpool.mli | 6 +- src/core/worker_loop_.ml | 8 --- src/core/ws_pool.ml | 2 +- src/cpp/cpp.ml | 124 --------------------------------- src/cpp/dune | 6 -- src/dpool/dune | 3 - src/dpool/moonpool_dpool.ml | 14 ++-- src/private/domain_.ml | 16 ----- src/private/dune | 3 - src/private/ws_deque_.ml | 2 +- 19 files changed, 17 insertions(+), 210 deletions(-) delete mode 100644 src/cpp/cpp.ml delete mode 100644 src/cpp/dune diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index 82423cac..58a8bfe1 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -15,7 +15,7 @@ jobs: - name: Use OCaml uses: ocaml/setup-ocaml@v3 with: - ocaml-compiler: '5.0' + ocaml-compiler: '5.3' dune-cache: true allow-prerelease-opam: true diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 894b13ba..6b97c607 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,8 +16,8 @@ jobs: os: - ubuntu-latest ocaml-compiler: - - '4.14' - - '5.2' + - '5.0' + - '5.3' runs-on: ${{ matrix.os }} steps: @@ -32,15 +32,10 @@ jobs: - run: opam pin picos 0.6.0 -y -n - run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only - if: matrix.ocaml-compiler == '5.2' - - run: opam install -t moonpool --deps-only - if: matrix.ocaml-compiler != '5.2' - run: opam exec -- dune build @install # install some depopts - run: opam install thread-local-storage trace hmap - if: matrix.ocaml-compiler == '5.2' - - run: opam exec -- dune build --profile=release --force @install @runtest compat: diff --git a/dune-project b/dune-project index 7f1de178..ceae9ab2 100644 --- a/dune-project +++ b/dune-project @@ -16,7 +16,7 @@ (name moonpool) (synopsis "Pools of threads supported by a pool of domains") (depends - (ocaml (>= 4.14)) + (ocaml (>= 5.0)) dune (either (>= 1.0)) (trace :with-test) diff --git a/src/core/chan.ml b/src/core/chan.ml index be4ac3b9..57c4b4b5 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -70,8 +70,6 @@ let close (self : _ t) : unit = Mutex.unlock self.mutex; Queue.iter Trigger.signal q -[@@@ifge 5.0] - let rec push (self : _ t) x : unit = Mutex.lock self.mutex; @@ -120,5 +118,3 @@ let rec pop (self : 'a t) : 'a = Mutex.unlock self.mutex; Trigger.await_exn tr; pop self - -[@@@endif] diff --git a/src/core/chan.mli b/src/core/chan.mli index a4898069..8d0a10b5 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -28,8 +28,6 @@ val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. This is idempotent. *) -[@@@ifge 5.0] - val push : 'a t -> 'a -> unit (** Push the value into the channel, suspending the current task if the channel is currently full. @@ -48,5 +46,3 @@ val pop_block_exn : 'a t -> 'a The precautions around blocking from inside a thread pool are the same as explained in {!Fut.wait_block}. *) *) - -[@@@endif] diff --git a/src/core/dune b/src/core/dune index 015e9ce6..1be87695 100644 --- a/src/core/dune +++ b/src/core/dune @@ -12,7 +12,4 @@ moonpool.dpool (re_export picos)) (flags :standard -open Moonpool_private) - (private_modules util_pool_) - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file})))) + (private_modules util_pool_)) diff --git a/src/core/fut.ml b/src/core/fut.ml index 415e68ab..715ba560 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -1,4 +1,4 @@ -module A = Atomic_ +module A = Atomic module C = Picos.Computation type 'a or_error = ('a, Exn_bt.t) result @@ -424,8 +424,6 @@ let wait_block self = let bt = Printexc.get_raw_backtrace () in Error (Exn_bt.make exn bt) -[@@@ifge 5.0] - let await (self : 'a t) : 'a = (* fast path: peek *) match C.peek_exn self with @@ -439,8 +437,6 @@ let await (self : 'a t) : 'a = (* un-suspended: we should have a result! *) get_or_fail_exn self -[@@@endif] - module Infix = struct let[@inline] ( >|= ) x f = map ~f x let[@inline] ( >>= ) x f = bind ~f x diff --git a/src/core/fut.mli b/src/core/fut.mli index 45c800df..073e393f 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -236,8 +236,6 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t {b NOTE} This is only available on OCaml 5. *) -[@@@ifge 5.0] - val await : 'a t -> 'a (** [await fut] suspends the current tasks until [fut] is fulfilled, then resumes the task on this same runner (but possibly on a different @@ -248,8 +246,6 @@ val await : 'a t -> 'a This must only be run from inside the runner itself. The runner must support {!Suspend_}. {b NOTE}: only on OCaml 5.x *) -[@@@endif] - (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index d9f72b51..6b9aebe0 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -12,15 +12,10 @@ let get_current_runner = Runner.get_current_runner let recommended_thread_count () = Domain_.recommended_number () let spawn = Fut.spawn let spawn_on_current_runner = Fut.spawn_on_current_runner - -[@@@ifge 5.0] - let await = Fut.await let yield = Picos.Fiber.yield -[@@@endif] - -module Atomic = Atomic_ +module Atomic = Atomic module Blocking_queue = Bb_queue module Background_thread = Background_thread module Chan = Chan diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 3a5f5eef..0e80fa2f 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -72,8 +72,6 @@ val get_current_runner : unit -> Runner.t option (** See {!Runner.get_current_runner} @since 0.7 *) -[@@@ifge 5.0] - val await : 'a Fut.t -> 'a (** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on OCaml >= 5.0. @@ -84,8 +82,6 @@ val yield : unit -> unit >= 5.0. @since NEXT_RELEASE *) -[@@@endif] - module Lock = Lock module Fut = Fut module Chan = Chan @@ -203,7 +199,7 @@ module Blocking_queue : sig @since 0.4 *) end -module Atomic = Atomic_ +module Atomic = Atomic (** Atomic values. This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] diff --git a/src/core/worker_loop_.ml b/src/core/worker_loop_.ml index 51cd75cd..bd2cd5ca 100644 --- a/src/core/worker_loop_.ml +++ b/src/core/worker_loop_.ml @@ -33,8 +33,6 @@ type 'st ops = { (** A dummy task. *) let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber } -[@@@ifge 5.0] - let[@inline] discontinue k exn = let bt = Printexc.get_raw_backtrace () in Effect.Deep.discontinue_with_backtrace k exn bt @@ -100,12 +98,6 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) : let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in fun f -> Effect.Deep.match_with f () handler -[@@@else_] - -let with_handler ~ops:_ self f = f () - -[@@@endif] - let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = if block_signals then ( try diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 137e76cd..1b95cd16 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -1,5 +1,5 @@ open Types_ -module A = Atomic_ +module A = Atomic module WSQ = Ws_deque_ module WL = Worker_loop_ include Runner diff --git a/src/cpp/cpp.ml b/src/cpp/cpp.ml deleted file mode 100644 index b4df7a1a..00000000 --- a/src/cpp/cpp.ml +++ /dev/null @@ -1,124 +0,0 @@ -type op = - | Le - | Ge - -type line = - | If of op * int * int - | Elseif of op * int * int - | Else - | Endif - | Raw of string - | Eof - -let prefix ~pre s = - let len = String.length pre in - if len > String.length s then - false - else ( - let rec check i = - if i = len then - true - else if String.unsafe_get s i <> String.unsafe_get pre i then - false - else - check (i + 1) - in - check 0 - ) - -let eval ~major ~minor op i j = - match op with - | Le -> (major, minor) <= (i, j) - | Ge -> (major, minor) >= (i, j) - -let preproc_lines ~file ~major ~minor (ic : in_channel) : unit = - let pos = ref 0 in - let fail msg = - failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg) - in - let pp_pos () = Printf.printf "#%d %S\n" !pos file in - - let parse_line () : line = - match input_line ic with - | exception End_of_file -> Eof - | line -> - let line' = String.trim line in - incr pos; - if line' <> "" && line'.[0] = '[' then - if prefix line' ~pre:"[@@@ifle" then - Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y)) - else if prefix line' ~pre:"[@@@ifge" then - Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y)) - else if prefix line' ~pre:"[@@@elifle" then - Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y)) - else if prefix line' ~pre:"[@@@elifge" then - Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y)) - else if line' = "[@@@else_]" then - Else - else if line' = "[@@@endif]" then - Endif - else - Raw line - else - Raw line - in - - (* entry point *) - let rec top () = - match parse_line () with - | Eof -> () - | If (op, i, j) -> - if eval ~major ~minor op i j then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok:true () - | Raw s -> - print_endline s; - top () - | Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif" - (* current block is the valid one *) - and cat_block () = - match parse_line () with - | Eof -> fail "unexpected EOF" - | If _ -> fail "nested if not supported" - | Raw s -> - print_endline s; - cat_block () - | Endif -> - pp_pos (); - top () - | Elseif _ | Else -> skip_block ~elseok:false () - (* skip current block. - @param elseok if true, we should evaluate "elseif" *) - and skip_block ~elseok () = - match parse_line () with - | Eof -> fail "unexpected EOF" - | If _ -> fail "nested if not supported" - | Raw _ -> skip_block ~elseok () - | Endif -> - pp_pos (); - top () - | Elseif (op, i, j) -> - if elseok && eval ~major ~minor op i j then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok () - | Else -> - if elseok then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok () - in - top () - -let () = - let file = Sys.argv.(1) in - let version = Sys.ocaml_version in - let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in - let ic = open_in file in - preproc_lines ~file ~major ~minor ic; - - () diff --git a/src/cpp/dune b/src/cpp/dune deleted file mode 100644 index c4c75e8b..00000000 --- a/src/cpp/dune +++ /dev/null @@ -1,6 +0,0 @@ -; our little preprocessor (ported from containers) - -(executable - (name cpp) - (modes - (best exe))) diff --git a/src/dpool/dune b/src/dpool/dune index 23c91d38..810a6097 100644 --- a/src/dpool/dune +++ b/src/dpool/dune @@ -2,8 +2,5 @@ (name moonpool_dpool) (public_name moonpool.dpool) (synopsis "Moonpool's domain pool (used to start worker threads)") - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (flags :standard -open Moonpool_private) (libraries moonpool.private)) diff --git a/src/dpool/moonpool_dpool.ml b/src/dpool/moonpool_dpool.ml index 5f177362..42917c1a 100644 --- a/src/dpool/moonpool_dpool.ml +++ b/src/dpool/moonpool_dpool.ml @@ -71,7 +71,7 @@ type event = new threads for pools. *) type worker_state = { q: event Bb_queue.t; - th_count: int Atomic_.t; (** Number of threads on this *) + th_count: int Atomic.t; (** Number of threads on this *) } (** Array of (optional) workers. @@ -101,14 +101,14 @@ let work_ idx (st : worker_state) : unit = match Bb_queue.pop st.q with | Run f -> (try f () with _ -> ()) | Decr -> - if Atomic_.fetch_and_add st.th_count (-1) = 1 then ( + if Atomic.fetch_and_add st.th_count (-1) = 1 then ( continue := false; (* wait a bit, we might be needed again in a short amount of time *) try for _n_attempt = 1 to 50 do Thread.delay 0.001; - if Atomic_.get st.th_count > 0 then ( + if Atomic.get st.th_count > 0 then ( (* needed again! *) continue := true; raise Exit @@ -129,7 +129,7 @@ let work_ idx (st : worker_state) : unit = | Some _st', dom -> assert (st == _st'); - if Atomic_.get st.th_count > 0 then + if Atomic.get st.th_count > 0 then (* still alive! *) (Some st, dom), true else @@ -145,7 +145,7 @@ let work_ idx (st : worker_state) : unit = (* special case for main domain: we start a worker immediately *) let () = assert (Domain_.is_main_domain ()); - let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in (* thread that stays alive *) ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t); domains_.(0) <- Lock.create (Some w, None) @@ -157,12 +157,12 @@ let run_on (i : int) (f : unit -> unit) : unit = let w = Lock.update_map domains_.(i) (function | (Some w, _) as st -> - Atomic_.incr w.th_count; + Atomic.incr w.th_count; st, w | None, dying_dom -> (* join previous dying domain, to free its resources, if any *) Option.iter Domain_.join dying_dom; - let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in let worker : domain = Domain_.spawn (fun () -> work_ i w) in (Some w, Some worker), w) in diff --git a/src/private/domain_.ml b/src/private/domain_.ml index 3050282f..f3605f39 100644 --- a/src/private/domain_.ml +++ b/src/private/domain_.ml @@ -1,4 +1,3 @@ -[@@@ifge 5.0] [@@@ocaml.alert "-unstable"] let recommended_number () = Domain.recommended_domain_count () @@ -10,18 +9,3 @@ let spawn : _ -> t = Domain.spawn let relax = Domain.cpu_relax let join = Domain.join let is_main_domain = Domain.is_main_domain - -[@@@ocaml.alert "+unstable"] -[@@@else_] - -let recommended_number () = 1 - -type t = Thread.t - -let get_id (self : t) : int = Thread.id self -let spawn f : t = Thread.create f () -let relax () = Thread.yield () -let join = Thread.join -let is_main_domain () = true - -[@@@endif] diff --git a/src/private/dune b/src/private/dune index 4555122c..75c918b8 100644 --- a/src/private/dune +++ b/src/private/dune @@ -2,9 +2,6 @@ (name moonpool_private) (public_name moonpool.private) (synopsis "Private internal utils for Moonpool (do not rely on)") - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (libraries threads either diff --git a/src/private/ws_deque_.ml b/src/private/ws_deque_.ml index 368cc8b0..d54a3992 100644 --- a/src/private/ws_deque_.ml +++ b/src/private/ws_deque_.ml @@ -1,4 +1,4 @@ -module A = Atomic_ +module A = Atomic (* terminology: From f6ad345f3135dbd1c95c170b7ccce824ae0e55b3 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:24:49 -0400 Subject: [PATCH 05/11] fib: remove preprocessor --- src/fib/dune | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/fib/dune b/src/fib/dune index 4c6594f8..412c420b 100644 --- a/src/fib/dune +++ b/src/fib/dune @@ -3,10 +3,4 @@ (public_name moonpool.fib) (synopsis "Fibers and structured concurrency for Moonpool") (libraries moonpool picos) - (enabled_if - (>= %{ocaml_version} 5.0)) - (flags :standard -open Moonpool_private -open Moonpool) - (optional) - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file})))) + (flags :standard -open Moonpool_private -open Moonpool)) From 50a44a76e1f13a85a07e4a5f9f728f20d841bbef Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:25:03 -0400 Subject: [PATCH 06/11] forkjoin not longer optional --- src/forkjoin/dune | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/forkjoin/dune b/src/forkjoin/dune index 84849c9b..cc6629f2 100644 --- a/src/forkjoin/dune +++ b/src/forkjoin/dune @@ -4,6 +4,4 @@ (synopsis "Fork-join parallelism for moonpool") (flags :standard -open Moonpool) (optional) - (enabled_if - (>= %{ocaml_version} 5.0)) (libraries moonpool moonpool.private picos)) From 41561c3bff5afdf5d9d15e9dd1b127502a6574ab Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:25:10 -0400 Subject: [PATCH 07/11] deprecated moonpool_io --- src/io/moonpool_io.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/io/moonpool_io.ml b/src/io/moonpool_io.ml index a08d5f6f..c4a81c90 100644 --- a/src/io/moonpool_io.ml +++ b/src/io/moonpool_io.ml @@ -1,3 +1,5 @@ +[@@@deprecated "just use lwt or eio or something else"] + module Fd = Picos_io_fd module Unix = Picos_io.Unix module Select = Picos_io_select From 0ab99517d50639e62dc9bcc9dd2ee3f04f3e2753 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:41:05 -0400 Subject: [PATCH 08/11] benchs: no preprocessor anymore --- benchs/dune | 3 --- benchs/pi.ml | 9 --------- 2 files changed, 12 deletions(-) diff --git a/benchs/dune b/benchs/dune index 6d2ec5ff..909b0a26 100644 --- a/benchs/dune +++ b/benchs/dune @@ -1,6 +1,3 @@ (executables (names fib_rec pi primes) - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib)) diff --git a/benchs/pi.ml b/benchs/pi.ml index e9d333f5..2f63c45b 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -66,8 +66,6 @@ let run_par1 ~kind (num_steps : int) : float = let pi = step *. Lock.get global_sum in pi -[@@@ifge 5.0] - let run_fork_join ~kind num_steps : float = let@ pool = with_pool ~kind () in @@ -92,13 +90,6 @@ let run_fork_join ~kind num_steps : float = let pi = step *. Lock.get global_sum in pi -[@@@else_] - -let run_fork_join _ = - failwith "fork join not available on this version of OCaml" - -[@@@endif] - type mode = | Sequential | Par1 From b9bbcf82f7c3293089654ae4e079608a2e0c052e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:43:51 -0400 Subject: [PATCH 09/11] test do not need preprocessor anymore --- test/effect-based/dune | 3 --- test/effect-based/t_fib1.ml | 4 ---- test/effect-based/t_fib_fork_join.ml | 4 ---- test/effect-based/t_fib_fork_join_all.ml | 4 ---- test/effect-based/t_fork_join.ml | 4 ---- test/effect-based/t_fork_join_heavy.ml | 4 ---- test/effect-based/t_futs1.ml | 4 ---- test/effect-based/t_many.ml | 4 ---- test/effect-based/t_sort.ml | 4 ---- 9 files changed, 35 deletions(-) diff --git a/test/effect-based/dune b/test/effect-based/dune index faa9254d..393ca576 100644 --- a/test/effect-based/dune +++ b/test/effect-based/dune @@ -9,9 +9,6 @@ t_sort t_fork_join t_fork_join_heavy) - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (enabled_if (and (= %{system} "linux") diff --git a/test/effect-based/t_fib1.ml b/test/effect-based/t_fib1.ml index a2f62e82..e856a08a 100644 --- a/test/effect-based/t_fib1.ml +++ b/test/effect-based/t_fib1.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - open Moonpool let ( let@ ) = ( @@ ) @@ -56,5 +54,3 @@ let main () = let () = let@ () = Trace_tef.with_setup () in main () - -[@@@endif] diff --git a/test/effect-based/t_fib_fork_join.ml b/test/effect-based/t_fib_fork_join.ml index 25e7d49d..b514e91e 100644 --- a/test/effect-based/t_fib_fork_join.ml +++ b/test/effect-based/t_fib_fork_join.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - open Moonpool module FJ = Moonpool_forkjoin @@ -52,5 +50,3 @@ let () = (* now make sure we can do this with multiple pools in parallel *) let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in Array.iter Thread.join jobs - -[@@@endif] diff --git a/test/effect-based/t_fib_fork_join_all.ml b/test/effect-based/t_fib_fork_join_all.ml index f80670ca..ac58d32e 100644 --- a/test/effect-based/t_fib_fork_join_all.ml +++ b/test/effect-based/t_fib_fork_join_all.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - let ( let@ ) = ( @@ ) open Moonpool @@ -44,5 +42,3 @@ let () = (* now make sure we can do this with multiple pools in parallel *) let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in Array.iter Thread.join jobs - -[@@@endif] diff --git a/test/effect-based/t_fork_join.ml b/test/effect-based/t_fork_join.ml index 83c291ab..5000c23c 100644 --- a/test/effect-based/t_fork_join.ml +++ b/test/effect-based/t_fork_join.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - let spf = Printf.sprintf let ( let@ ) = ( @@ ) @@ -328,5 +326,3 @@ let () = t_for_nested ~min:1 ~chunk_size:100 (); t_for_nested ~min:4 ~chunk_size:100 (); ] - -[@@@endif] diff --git a/test/effect-based/t_fork_join_heavy.ml b/test/effect-based/t_fork_join_heavy.ml index 0b35db1f..69228f5f 100644 --- a/test/effect-based/t_fork_join_heavy.ml +++ b/test/effect-based/t_fork_join_heavy.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - module Q = QCheck let spf = Printf.sprintf @@ -52,5 +50,3 @@ let () = run ~min:4 (); run ~min:1 (); Printf.printf "done\n%!" - -[@@@endif] diff --git a/test/effect-based/t_futs1.ml b/test/effect-based/t_futs1.ml index 4df18226..ef5cba54 100644 --- a/test/effect-based/t_futs1.ml +++ b/test/effect-based/t_futs1.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - open! Moonpool let pool = Ws_pool.create ~num_threads:4 () @@ -53,5 +51,3 @@ let () = in let fut = Fut.both f1 f2 in assert (Fut.wait_block fut = Ok (2, 20)) - -[@@@endif] diff --git a/test/effect-based/t_many.ml b/test/effect-based/t_many.ml index b4a2c8da..fc808c5a 100644 --- a/test/effect-based/t_many.ml +++ b/test/effect-based/t_many.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - open Moonpool let ( let@ ) = ( @@ ) @@ -44,5 +42,3 @@ let () = run ~pool ()); () - -[@@@endif] diff --git a/test/effect-based/t_sort.ml b/test/effect-based/t_sort.ml index f0da71b8..cc152a00 100644 --- a/test/effect-based/t_sort.ml +++ b/test/effect-based/t_sort.ml @@ -1,5 +1,3 @@ -[@@@ifge 5.0] - open Moonpool module FJ = Moonpool_forkjoin @@ -69,5 +67,3 @@ let () = (* Printf.printf "arr: [%s]\n%!" *) (* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *) assert (sorted arr) - -[@@@endif] From 2c1def188ac5bcaef8507ac0078acad68bc09aae Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 16:44:12 -0400 Subject: [PATCH 10/11] breaking: require OCaml 5 --- moonpool.opam | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moonpool.opam b/moonpool.opam index feb9208e..46fb6b69 100644 --- a/moonpool.opam +++ b/moonpool.opam @@ -9,7 +9,7 @@ tags: ["thread" "pool" "domain" "futures" "fork-join"] homepage: "https://github.com/c-cube/moonpool" bug-reports: "https://github.com/c-cube/moonpool/issues" depends: [ - "ocaml" {>= "4.14"} + "ocaml" {>= "5.0"} "dune" {>= "3.0"} "either" {>= "1.0"} "trace" {with-test} From 1a64e7345e7837fda79b7fa4eecc1cd0143b7f9f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 26 Sep 2025 14:54:50 -0400 Subject: [PATCH 11/11] Revert "deprecate fibers" This reverts commit 83acc18d3d21f7bf4f24f4f9c3ceec21b7080306. --- src/fib/moonpool_fib.ml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/fib/moonpool_fib.ml b/src/fib/moonpool_fib.ml index 0a3b7360..ec89c075 100644 --- a/src/fib/moonpool_fib.ml +++ b/src/fib/moonpool_fib.ml @@ -5,15 +5,8 @@ @since 0.6. *) module Fiber = Fiber -[@@deprecated "use picos structured concurrency or something else"] - module Fls = Fls module Handle = Handle - module Main = Main -[@@deprecated "use picos structured concurrency or something else"] - -[@@@ocaml.alert "-deprecated"] - include Fiber include Main