diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index 82423cac..58a8bfe1 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -15,7 +15,7 @@ jobs: - name: Use OCaml uses: ocaml/setup-ocaml@v3 with: - ocaml-compiler: '5.0' + ocaml-compiler: '5.3' dune-cache: true allow-prerelease-opam: true diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 894b13ba..6b97c607 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,8 +16,8 @@ jobs: os: - ubuntu-latest ocaml-compiler: - - '4.14' - - '5.2' + - '5.0' + - '5.3' runs-on: ${{ matrix.os }} steps: @@ -32,15 +32,10 @@ jobs: - run: opam pin picos 0.6.0 -y -n - run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only - if: matrix.ocaml-compiler == '5.2' - - run: opam install -t moonpool --deps-only - if: matrix.ocaml-compiler != '5.2' - run: opam exec -- dune build @install # install some depopts - run: opam install thread-local-storage trace hmap - if: matrix.ocaml-compiler == '5.2' - - run: opam exec -- dune build --profile=release --force @install @runtest compat: diff --git a/dune-project b/dune-project index 7f1de178..ceae9ab2 100644 --- a/dune-project +++ b/dune-project @@ -16,7 +16,7 @@ (name moonpool) (synopsis "Pools of threads supported by a pool of domains") (depends - (ocaml (>= 4.14)) + (ocaml (>= 5.0)) dune (either (>= 1.0)) (trace :with-test) diff --git a/src/core/chan.ml b/src/core/chan.ml index be4ac3b9..57c4b4b5 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -70,8 +70,6 @@ let close (self : _ t) : unit = Mutex.unlock self.mutex; Queue.iter Trigger.signal q -[@@@ifge 5.0] - let rec push (self : _ t) x : unit = Mutex.lock self.mutex; @@ -120,5 +118,3 @@ let rec pop (self : 'a t) : 'a = Mutex.unlock self.mutex; Trigger.await_exn tr; pop self - -[@@@endif] diff --git a/src/core/chan.mli b/src/core/chan.mli index a4898069..8d0a10b5 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -28,8 +28,6 @@ val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. This is idempotent. *) -[@@@ifge 5.0] - val push : 'a t -> 'a -> unit (** Push the value into the channel, suspending the current task if the channel is currently full. @@ -48,5 +46,3 @@ val pop_block_exn : 'a t -> 'a The precautions around blocking from inside a thread pool are the same as explained in {!Fut.wait_block}. *) *) - -[@@@endif] diff --git a/src/core/dune b/src/core/dune index 015e9ce6..1be87695 100644 --- a/src/core/dune +++ b/src/core/dune @@ -12,7 +12,4 @@ moonpool.dpool (re_export picos)) (flags :standard -open Moonpool_private) - (private_modules util_pool_) - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file})))) + (private_modules util_pool_)) diff --git a/src/core/fut.ml b/src/core/fut.ml index 415e68ab..715ba560 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -1,4 +1,4 @@ -module A = Atomic_ +module A = Atomic module C = Picos.Computation type 'a or_error = ('a, Exn_bt.t) result @@ -424,8 +424,6 @@ let wait_block self = let bt = Printexc.get_raw_backtrace () in Error (Exn_bt.make exn bt) -[@@@ifge 5.0] - let await (self : 'a t) : 'a = (* fast path: peek *) match C.peek_exn self with @@ -439,8 +437,6 @@ let await (self : 'a t) : 'a = (* un-suspended: we should have a result! *) get_or_fail_exn self -[@@@endif] - module Infix = struct let[@inline] ( >|= ) x f = map ~f x let[@inline] ( >>= ) x f = bind ~f x diff --git a/src/core/fut.mli b/src/core/fut.mli index 45c800df..073e393f 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -236,8 +236,6 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t {b NOTE} This is only available on OCaml 5. *) -[@@@ifge 5.0] - val await : 'a t -> 'a (** [await fut] suspends the current tasks until [fut] is fulfilled, then resumes the task on this same runner (but possibly on a different @@ -248,8 +246,6 @@ val await : 'a t -> 'a This must only be run from inside the runner itself. The runner must support {!Suspend_}. {b NOTE}: only on OCaml 5.x *) -[@@@endif] - (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index d9f72b51..6b9aebe0 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -12,15 +12,10 @@ let get_current_runner = Runner.get_current_runner let recommended_thread_count () = Domain_.recommended_number () let spawn = Fut.spawn let spawn_on_current_runner = Fut.spawn_on_current_runner - -[@@@ifge 5.0] - let await = Fut.await let yield = Picos.Fiber.yield -[@@@endif] - -module Atomic = Atomic_ +module Atomic = Atomic module Blocking_queue = Bb_queue module Background_thread = Background_thread module Chan = Chan diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 3a5f5eef..0e80fa2f 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -72,8 +72,6 @@ val get_current_runner : unit -> Runner.t option (** See {!Runner.get_current_runner} @since 0.7 *) -[@@@ifge 5.0] - val await : 'a Fut.t -> 'a (** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on OCaml >= 5.0. @@ -84,8 +82,6 @@ val yield : unit -> unit >= 5.0. @since NEXT_RELEASE *) -[@@@endif] - module Lock = Lock module Fut = Fut module Chan = Chan @@ -203,7 +199,7 @@ module Blocking_queue : sig @since 0.4 *) end -module Atomic = Atomic_ +module Atomic = Atomic (** Atomic values. This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] diff --git a/src/core/worker_loop_.ml b/src/core/worker_loop_.ml index 51cd75cd..bd2cd5ca 100644 --- a/src/core/worker_loop_.ml +++ b/src/core/worker_loop_.ml @@ -33,8 +33,6 @@ type 'st ops = { (** A dummy task. *) let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber } -[@@@ifge 5.0] - let[@inline] discontinue k exn = let bt = Printexc.get_raw_backtrace () in Effect.Deep.discontinue_with_backtrace k exn bt @@ -100,12 +98,6 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) : let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in fun f -> Effect.Deep.match_with f () handler -[@@@else_] - -let with_handler ~ops:_ self f = f () - -[@@@endif] - let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = if block_signals then ( try diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 137e76cd..1b95cd16 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -1,5 +1,5 @@ open Types_ -module A = Atomic_ +module A = Atomic module WSQ = Ws_deque_ module WL = Worker_loop_ include Runner diff --git a/src/cpp/cpp.ml b/src/cpp/cpp.ml deleted file mode 100644 index b4df7a1a..00000000 --- a/src/cpp/cpp.ml +++ /dev/null @@ -1,124 +0,0 @@ -type op = - | Le - | Ge - -type line = - | If of op * int * int - | Elseif of op * int * int - | Else - | Endif - | Raw of string - | Eof - -let prefix ~pre s = - let len = String.length pre in - if len > String.length s then - false - else ( - let rec check i = - if i = len then - true - else if String.unsafe_get s i <> String.unsafe_get pre i then - false - else - check (i + 1) - in - check 0 - ) - -let eval ~major ~minor op i j = - match op with - | Le -> (major, minor) <= (i, j) - | Ge -> (major, minor) >= (i, j) - -let preproc_lines ~file ~major ~minor (ic : in_channel) : unit = - let pos = ref 0 in - let fail msg = - failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg) - in - let pp_pos () = Printf.printf "#%d %S\n" !pos file in - - let parse_line () : line = - match input_line ic with - | exception End_of_file -> Eof - | line -> - let line' = String.trim line in - incr pos; - if line' <> "" && line'.[0] = '[' then - if prefix line' ~pre:"[@@@ifle" then - Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y)) - else if prefix line' ~pre:"[@@@ifge" then - Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y)) - else if prefix line' ~pre:"[@@@elifle" then - Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y)) - else if prefix line' ~pre:"[@@@elifge" then - Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y)) - else if line' = "[@@@else_]" then - Else - else if line' = "[@@@endif]" then - Endif - else - Raw line - else - Raw line - in - - (* entry point *) - let rec top () = - match parse_line () with - | Eof -> () - | If (op, i, j) -> - if eval ~major ~minor op i j then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok:true () - | Raw s -> - print_endline s; - top () - | Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif" - (* current block is the valid one *) - and cat_block () = - match parse_line () with - | Eof -> fail "unexpected EOF" - | If _ -> fail "nested if not supported" - | Raw s -> - print_endline s; - cat_block () - | Endif -> - pp_pos (); - top () - | Elseif _ | Else -> skip_block ~elseok:false () - (* skip current block. - @param elseok if true, we should evaluate "elseif" *) - and skip_block ~elseok () = - match parse_line () with - | Eof -> fail "unexpected EOF" - | If _ -> fail "nested if not supported" - | Raw _ -> skip_block ~elseok () - | Endif -> - pp_pos (); - top () - | Elseif (op, i, j) -> - if elseok && eval ~major ~minor op i j then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok () - | Else -> - if elseok then ( - pp_pos (); - cat_block () - ) else - skip_block ~elseok () - in - top () - -let () = - let file = Sys.argv.(1) in - let version = Sys.ocaml_version in - let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in - let ic = open_in file in - preproc_lines ~file ~major ~minor ic; - - () diff --git a/src/cpp/dune b/src/cpp/dune deleted file mode 100644 index c4c75e8b..00000000 --- a/src/cpp/dune +++ /dev/null @@ -1,6 +0,0 @@ -; our little preprocessor (ported from containers) - -(executable - (name cpp) - (modes - (best exe))) diff --git a/src/dpool/dune b/src/dpool/dune index 23c91d38..810a6097 100644 --- a/src/dpool/dune +++ b/src/dpool/dune @@ -2,8 +2,5 @@ (name moonpool_dpool) (public_name moonpool.dpool) (synopsis "Moonpool's domain pool (used to start worker threads)") - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (flags :standard -open Moonpool_private) (libraries moonpool.private)) diff --git a/src/dpool/moonpool_dpool.ml b/src/dpool/moonpool_dpool.ml index 5f177362..42917c1a 100644 --- a/src/dpool/moonpool_dpool.ml +++ b/src/dpool/moonpool_dpool.ml @@ -71,7 +71,7 @@ type event = new threads for pools. *) type worker_state = { q: event Bb_queue.t; - th_count: int Atomic_.t; (** Number of threads on this *) + th_count: int Atomic.t; (** Number of threads on this *) } (** Array of (optional) workers. @@ -101,14 +101,14 @@ let work_ idx (st : worker_state) : unit = match Bb_queue.pop st.q with | Run f -> (try f () with _ -> ()) | Decr -> - if Atomic_.fetch_and_add st.th_count (-1) = 1 then ( + if Atomic.fetch_and_add st.th_count (-1) = 1 then ( continue := false; (* wait a bit, we might be needed again in a short amount of time *) try for _n_attempt = 1 to 50 do Thread.delay 0.001; - if Atomic_.get st.th_count > 0 then ( + if Atomic.get st.th_count > 0 then ( (* needed again! *) continue := true; raise Exit @@ -129,7 +129,7 @@ let work_ idx (st : worker_state) : unit = | Some _st', dom -> assert (st == _st'); - if Atomic_.get st.th_count > 0 then + if Atomic.get st.th_count > 0 then (* still alive! *) (Some st, dom), true else @@ -145,7 +145,7 @@ let work_ idx (st : worker_state) : unit = (* special case for main domain: we start a worker immediately *) let () = assert (Domain_.is_main_domain ()); - let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in (* thread that stays alive *) ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t); domains_.(0) <- Lock.create (Some w, None) @@ -157,12 +157,12 @@ let run_on (i : int) (f : unit -> unit) : unit = let w = Lock.update_map domains_.(i) (function | (Some w, _) as st -> - Atomic_.incr w.th_count; + Atomic.incr w.th_count; st, w | None, dying_dom -> (* join previous dying domain, to free its resources, if any *) Option.iter Domain_.join dying_dom; - let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in + let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in let worker : domain = Domain_.spawn (fun () -> work_ i w) in (Some w, Some worker), w) in diff --git a/src/private/domain_.ml b/src/private/domain_.ml index 3050282f..f3605f39 100644 --- a/src/private/domain_.ml +++ b/src/private/domain_.ml @@ -1,4 +1,3 @@ -[@@@ifge 5.0] [@@@ocaml.alert "-unstable"] let recommended_number () = Domain.recommended_domain_count () @@ -10,18 +9,3 @@ let spawn : _ -> t = Domain.spawn let relax = Domain.cpu_relax let join = Domain.join let is_main_domain = Domain.is_main_domain - -[@@@ocaml.alert "+unstable"] -[@@@else_] - -let recommended_number () = 1 - -type t = Thread.t - -let get_id (self : t) : int = Thread.id self -let spawn f : t = Thread.create f () -let relax () = Thread.yield () -let join = Thread.join -let is_main_domain () = true - -[@@@endif] diff --git a/src/private/dune b/src/private/dune index 4555122c..75c918b8 100644 --- a/src/private/dune +++ b/src/private/dune @@ -2,9 +2,6 @@ (name moonpool_private) (public_name moonpool.private) (synopsis "Private internal utils for Moonpool (do not rely on)") - (preprocess - (action - (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (libraries threads either diff --git a/src/private/ws_deque_.ml b/src/private/ws_deque_.ml index 368cc8b0..d54a3992 100644 --- a/src/private/ws_deque_.ml +++ b/src/private/ws_deque_.ml @@ -1,4 +1,4 @@ -module A = Atomic_ +module A = Atomic (* terminology: