From b07d460b3f1cfce24c646ad532e2286c502d91fa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 9 Jul 2023 16:13:44 -0400 Subject: [PATCH] port `cpp.ml` from containers, replace previous codegen with it now OCaml 5-only features are truly available only on OCaml 5, instead of just relying on the user reading the docstring. --- src/atomic_.ml | 46 ++++++++++++++ src/cpp/cpp.ml | 124 +++++++++++++++++++++++++++++++++++++ src/cpp/dune | 5 ++ src/domain_.ml | 23 +++++++ src/dune | 21 +------ src/fork_join.ml | 10 ++- src/fork_join.mli | 6 ++ src/fut.ml | 6 +- src/fut.mli | 6 +- src/gen/dune | 3 - src/gen/gen.ml | 154 ---------------------------------------------- src/pool.ml | 2 +- src/suspend_.ml | 36 +++++++++++ src/suspend_.mli | 15 ++++- 14 files changed, 275 insertions(+), 182 deletions(-) create mode 100644 src/atomic_.ml create mode 100644 src/cpp/cpp.ml create mode 100644 src/cpp/dune create mode 100644 src/domain_.ml delete mode 100644 src/gen/dune delete mode 100644 src/gen/gen.ml create mode 100644 src/suspend_.ml diff --git a/src/atomic_.ml b/src/atomic_.ml new file mode 100644 index 00000000..6983f1fb --- /dev/null +++ b/src/atomic_.ml @@ -0,0 +1,46 @@ +[@@@ifge 4.12] + +include Atomic + +[@@@else_] + +type 'a t = { mutable x: 'a } + +let[@inline] make x = { x } +let[@inline] get { x } = x +let[@inline] set r x = r.x <- x + +let[@inline never] exchange r x = + (* atomic *) + let y = r.x in + r.x <- x; + (* atomic *) + y + +let[@inline never] compare_and_set r seen v = + (* atomic *) + if r.x == seen then ( + r.x <- v; + (* atomic *) + true + ) else + false + +let[@inline never] fetch_and_add r x = + (* atomic *) + let v = r.x in + r.x <- x + r.x; + (* atomic *) + v + +let[@inline never] incr r = + (* atomic *) + r.x <- 1 + r.x +(* atomic *) + +let[@inline never] decr r = + (* atomic *) + r.x <- r.x - 1 +(* atomic *) + +[@@@endif] diff --git a/src/cpp/cpp.ml b/src/cpp/cpp.ml new file mode 100644 index 00000000..b4df7a1a --- /dev/null +++ b/src/cpp/cpp.ml @@ -0,0 +1,124 @@ +type op = + | Le + | Ge + +type line = + | If of op * int * int + | Elseif of op * int * int + | Else + | Endif + | Raw of string + | Eof + +let prefix ~pre s = + let len = String.length pre in + if len > String.length s then + false + else ( + let rec check i = + if i = len then + true + else if String.unsafe_get s i <> String.unsafe_get pre i then + false + else + check (i + 1) + in + check 0 + ) + +let eval ~major ~minor op i j = + match op with + | Le -> (major, minor) <= (i, j) + | Ge -> (major, minor) >= (i, j) + +let preproc_lines ~file ~major ~minor (ic : in_channel) : unit = + let pos = ref 0 in + let fail msg = + failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg) + in + let pp_pos () = Printf.printf "#%d %S\n" !pos file in + + let parse_line () : line = + match input_line ic with + | exception End_of_file -> Eof + | line -> + let line' = String.trim line in + incr pos; + if line' <> "" && line'.[0] = '[' then + if prefix line' ~pre:"[@@@ifle" then + Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y)) + else if prefix line' ~pre:"[@@@ifge" then + Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y)) + else if prefix line' ~pre:"[@@@elifle" then + Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y)) + else if prefix line' ~pre:"[@@@elifge" then + Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y)) + else if line' = "[@@@else_]" then + Else + else if line' = "[@@@endif]" then + Endif + else + Raw line + else + Raw line + in + + (* entry point *) + let rec top () = + match parse_line () with + | Eof -> () + | If (op, i, j) -> + if eval ~major ~minor op i j then ( + pp_pos (); + cat_block () + ) else + skip_block ~elseok:true () + | Raw s -> + print_endline s; + top () + | Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif" + (* current block is the valid one *) + and cat_block () = + match parse_line () with + | Eof -> fail "unexpected EOF" + | If _ -> fail "nested if not supported" + | Raw s -> + print_endline s; + cat_block () + | Endif -> + pp_pos (); + top () + | Elseif _ | Else -> skip_block ~elseok:false () + (* skip current block. + @param elseok if true, we should evaluate "elseif" *) + and skip_block ~elseok () = + match parse_line () with + | Eof -> fail "unexpected EOF" + | If _ -> fail "nested if not supported" + | Raw _ -> skip_block ~elseok () + | Endif -> + pp_pos (); + top () + | Elseif (op, i, j) -> + if elseok && eval ~major ~minor op i j then ( + pp_pos (); + cat_block () + ) else + skip_block ~elseok () + | Else -> + if elseok then ( + pp_pos (); + cat_block () + ) else + skip_block ~elseok () + in + top () + +let () = + let file = Sys.argv.(1) in + let version = Sys.ocaml_version in + let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in + let ic = open_in file in + preproc_lines ~file ~major ~minor ic; + + () diff --git a/src/cpp/dune b/src/cpp/dune new file mode 100644 index 00000000..6ec12a60 --- /dev/null +++ b/src/cpp/dune @@ -0,0 +1,5 @@ +; our little preprocessor (ported from containers) + +(executable + (name cpp) + (modes (best exe))) diff --git a/src/domain_.ml b/src/domain_.ml new file mode 100644 index 00000000..1878ab6d --- /dev/null +++ b/src/domain_.ml @@ -0,0 +1,23 @@ +[@@@ifge 5.0] +[@@@ocaml.alert "-unstable"] + +let recommended_number () = Domain.recommended_domain_count () + +type t = unit Domain.t + +let get_id (self : t) : int = (Domain.get_id self :> int) +let spawn : _ -> t = Domain.spawn +let relax = Domain.cpu_relax + +[@@@ocaml.alert "+unstable"] +[@@@else_] + +let recommended_number () = 1 + +type t = Thread.t + +let get_id (self : t) : int = Thread.id self +let spawn f : t = Thread.create f () +let relax () = Thread.yield () + +[@@@endif] diff --git a/src/dune b/src/dune index 5cf7f22a..d65920e8 100644 --- a/src/dune +++ b/src/dune @@ -2,25 +2,10 @@ (public_name moonpool) (name moonpool) (private_modules d_pool_) + (preprocess + (action + (run %{project_root}/src/cpp/cpp.exe %{input-file}))) (libraries threads either (select dla_.ml from (domain-local-await -> dla_.real.ml) ( -> dla_.dummy.ml)))) - -(rule - (targets atomic_.ml) - (action - (with-stdout-to %{targets} - (run ./gen/gen.exe --ocaml %{ocaml_version} --atomic)))) - -(rule - (targets domain_.ml) - (action - (with-stdout-to %{targets} - (run ./gen/gen.exe --ocaml %{ocaml_version} --domain)))) - -(rule - (targets suspend_.ml) - (action - (with-stdout-to %{targets} - (run ./gen/gen.exe --ocaml %{ocaml_version} --suspend)))) diff --git a/src/fork_join.ml b/src/fork_join.ml index e524f000..66ee8dd8 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -1,3 +1,5 @@ +[@@@ifge 5.0] + module A = Atomic_ module State_ = struct @@ -79,7 +81,7 @@ let both f g : _ * _ = Suspend_.suspend { - Suspend_types_.handle = + Suspend_.handle = (fun ~run suspension -> (* nothing else is started, no race condition possible *) (A.get st).suspension <- Some suspension; @@ -95,7 +97,7 @@ let all_list fs : _ list = let has_failed = A.make false in let missing = A.make len in - let start_tasks ~run (suspension : Suspend_types_.suspension) = + let start_tasks ~run (suspension : Suspend_.suspension) = let task_for i f = try let x = f () in @@ -117,7 +119,7 @@ let all_list fs : _ list = Suspend_.suspend { - Suspend_types_.handle = + Suspend_.handle = (fun ~run suspension -> (* nothing else is started, no race condition possible *) start_tasks ~run suspension); @@ -130,3 +132,5 @@ let all_list fs : _ list = | Some x -> x) let all_init n f = all_list @@ List.init n (fun i () -> f i) + +[@@@endif] diff --git a/src/fork_join.mli b/src/fork_join.mli index 2d0c6f71..4f9dffa5 100644 --- a/src/fork_join.mli +++ b/src/fork_join.mli @@ -1,7 +1,11 @@ (** Fork-join primitives. + {b NOTE} These are only available on OCaml 5.0 and above. + @since 0.3 *) +[@@@ifge 5.0] + val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b (** [both f g] runs [f()] and [g()], potentially in parallel, and returns their result when both are done. @@ -30,3 +34,5 @@ val all_init : int -> (int -> 'a) -> 'a list all the results. @since 0.3 {b NOTE} this is only available on OCaml 5. *) + +[@@@endif] diff --git a/src/fut.ml b/src/fut.ml index e9401cb2..e998ae32 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -354,6 +354,8 @@ let wait_block_exn self = | Ok x -> x | Error (e, bt) -> Printexc.raise_with_backtrace e bt +[@@@ifge 5.0] + let await (fut : 'a t) : 'a = match peek fut with | Some res -> @@ -365,7 +367,7 @@ let await (fut : 'a t) : 'a = (* suspend until the future is resolved *) Suspend_.suspend { - Suspend_types_.handle = + Suspend_.handle = (fun ~run k -> on_result fut (function | Ok _ -> @@ -378,6 +380,8 @@ let await (fut : 'a t) : 'a = (* un-suspended: we should have a result! *) get_or_fail_exn fut +[@@@endif] + module type INFIX = sig val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t diff --git a/src/fut.mli b/src/fut.mli index b14893f4..cb188d5c 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -147,7 +147,9 @@ val for_list : on:Pool.t -> 'a list -> ('a -> unit) -> unit t (** {2 Await} -This is only available on OCaml 5. *) + {b NOTE} This is only available on OCaml 5. *) + +[@@@ifge 5.0] val await : 'a t -> 'a (** [await fut] suspends the current tasks until [fut] is fulfilled, then @@ -156,6 +158,8 @@ val await : 'a t -> 'a @since 0.3 {b NOTE}: only on OCaml 5 *) +[@@@endif] + (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error diff --git a/src/gen/dune b/src/gen/dune deleted file mode 100644 index cd463d88..00000000 --- a/src/gen/dune +++ /dev/null @@ -1,3 +0,0 @@ - -(executable - (name gen)) diff --git a/src/gen/gen.ml b/src/gen/gen.ml deleted file mode 100644 index a5638f90..00000000 --- a/src/gen/gen.ml +++ /dev/null @@ -1,154 +0,0 @@ -let atomic_pre_412 = - {| -type 'a t = { mutable x: 'a } - -let[@inline] make x = { x } -let[@inline] get { x } = x -let[@inline] set r x = r.x <- x - -let[@inline never] exchange r x = - (* atomic *) - let y = r.x in - r.x <- x; - (* atomic *) - y - -let[@inline never] compare_and_set r seen v = - (* atomic *) - if r.x == seen then ( - r.x <- v; - (* atomic *) - true - ) else - false - -let[@inline never] fetch_and_add r x = - (* atomic *) - let v = r.x in - r.x <- x + r.x; - (* atomic *) - v - -let[@inline never] incr r = - (* atomic *) - r.x <- 1 + r.x -(* atomic *) - -let[@inline never] decr r = - (* atomic *) - r.x <- r.x - 1 -(* atomic *) - -|} - -let atomic_post_412 = {| -include Atomic -|} - -let domain_pre_5 = - {| -let recommended_number () = 1 - -type t = Thread.t - -let get_id (self:t) : int = Thread.id self - -let spawn f : t = - Thread.create f () - -let relax () = Thread.yield () -|} - -let domain_post_5 = - {| -let recommended_number () = Domain.recommended_domain_count () - -type t = unit Domain.t - -let get_id (self:t) : int = (Domain.get_id self :> int) - -let spawn : _ -> t = Domain.spawn - -let relax = Domain.cpu_relax -|} - -let suspend_pre_5 = - {| -let suspend _ = failwith "Thread suspension is only available on OCaml >= 5.0" -let with_suspend ~run:_ f : unit = f() -|} - -let suspend_post_5 = - {| -open Suspend_types_ - -type _ Effect.t += - | Suspend : suspension_handler -> unit Effect.t - -let[@inline] suspend h = Effect.perform (Suspend h) - -let with_suspend ~(run:with_handler:bool -> task -> unit) (f: unit -> unit) : unit = - let module E = Effect.Deep in - - (* effect handler *) - let effc - : type e. e Effect.t -> ((e, _) E.continuation -> _) option - = function - | Suspend h -> - Some (fun k -> - let k': suspension = function - | Ok () -> E.continue k () - | Error (exn, bt) -> - E.discontinue_with_backtrace k exn bt - in - h.handle ~run k' - ) - | _ -> None - in - - E.try_with f () {E.effc} -|} - -let p_version s = Scanf.sscanf s "%d.%d" (fun x y -> x, y) - -let () = - let atomic = ref false in - let domain = ref false in - let suspend = ref false in - let ocaml = ref Sys.ocaml_version in - Arg.parse - [ - "--atomic", Arg.Set atomic, " atomic"; - "--domain", Arg.Set domain, " domain"; - "--suspend", Arg.Set suspend, " suspend"; - "--ocaml", Arg.Set_string ocaml, " set ocaml version"; - ] - ignore ""; - - let major, minor = p_version !ocaml in - - if !atomic then ( - let code = - if (major, minor) < (4, 12) then - atomic_pre_412 - else - atomic_post_412 - in - print_endline code - ) else if !domain then ( - let code = - if (major, minor) < (5, 0) then - domain_pre_5 - else - domain_post_5 - in - print_endline code - ) else if !suspend then ( - let code = - if (major, minor) < (5, 0) then - suspend_pre_5 - else - suspend_post_5 - in - print_endline code - ) diff --git a/src/pool.ml b/src/pool.ml index 52aa050b..9cef70ab 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -102,7 +102,7 @@ let prepare_for_await () : Dla_.t = | Some (run, k) -> run ~with_handler:true (fun () -> k (Ok ())) and await () : unit = Suspend_.suspend - { Suspend_types_.handle = (fun ~run k -> A.set st (Some (run, k))) } + { Suspend_.handle = (fun ~run k -> A.set st (Some (run, k))) } in let t = { Dla_.release; await } in diff --git a/src/suspend_.ml b/src/suspend_.ml new file mode 100644 index 00000000..172560fc --- /dev/null +++ b/src/suspend_.ml @@ -0,0 +1,36 @@ +type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit +type task = unit -> unit + +type suspension_handler = { + handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit; +} +[@@unboxed] + +[@@@ifge 5.0] +[@@@ocaml.alert "-unstable"] + +type _ Effect.t += Suspend : suspension_handler -> unit Effect.t + +let[@inline] suspend h = Effect.perform (Suspend h) + +let with_suspend ~(run : with_handler:bool -> task -> unit) (f : unit -> unit) : + unit = + let module E = Effect.Deep in + (* effect handler *) + let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option = + function + | Suspend h -> + Some + (fun k -> + let k' : suspension = function + | Ok () -> E.continue k () + | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt + in + h.handle ~run k') + | _ -> None + in + + E.try_with f () { E.effc } + +[@@@ocaml.alert "+unstable"] +[@@@endif] diff --git a/src/suspend_.mli b/src/suspend_.mli index 9dbd3c6a..a720fdb9 100644 --- a/src/suspend_.mli +++ b/src/suspend_.mli @@ -3,7 +3,18 @@ This module is an implementation detail of Moonpool and should not be used outside of it. *) -open Suspend_types_ +type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit +(** A suspended computation *) + +type task = unit -> unit + +type suspension_handler = { + handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit; +} +[@@unboxed] +(** The handler that knows what to do with the suspended computation *) + +[@@@ifge 5.0] val suspend : suspension_handler -> unit (** [suspend h] jumps back to the nearest {!with_suspend} @@ -17,3 +28,5 @@ val with_suspend : will work. If [f()] suspends with suspension handler [h], this calls [h ~run k] where [k] is the suspension. *) + +[@@@endif]