mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-10 13:14:05 -05:00
feat: add Suspend_ module, using effects, on OCaml >= 5.0
This commit is contained in:
parent
eab774813d
commit
52a04701ed
4 changed files with 83 additions and 2 deletions
9
src/dune
9
src/dune
|
|
@ -9,10 +9,15 @@
|
|||
(action
|
||||
(with-stdout-to %{targets}
|
||||
(run ./gen/gen.exe --ocaml %{ocaml_version} --atomic))))
|
||||
|
||||
|
||||
(rule
|
||||
(targets domain_.ml)
|
||||
(action
|
||||
(with-stdout-to %{targets}
|
||||
(run ./gen/gen.exe --ocaml %{ocaml_version} --domain))))
|
||||
|
||||
|
||||
(rule
|
||||
(targets suspend_.ml)
|
||||
(action
|
||||
(with-stdout-to %{targets}
|
||||
(run ./gen/gen.exe --ocaml %{ocaml_version} --suspend))))
|
||||
|
|
|
|||
|
|
@ -72,16 +72,56 @@ let spawn : _ -> t = Domain.spawn
|
|||
let relax = Domain.cpu_relax
|
||||
|}
|
||||
|
||||
let suspend_pre_5 =
|
||||
{|
|
||||
open Suspend_types_
|
||||
let suspend _ = failwith "Thread suspension is only available on OCaml >= 5.0"
|
||||
let with_suspend ~run:_ f : unit = f()
|
||||
|}
|
||||
|
||||
let suspend_post_5 =
|
||||
{|
|
||||
open Suspend_types_
|
||||
|
||||
type _ Effect.t +=
|
||||
| Suspend : suspension_handler -> unit Effect.t
|
||||
|
||||
let[@inline] suspend h = Effect.perform (Suspend h)
|
||||
|
||||
let with_suspend ~run (f: unit -> unit) : unit =
|
||||
let module E = Effect.Deep in
|
||||
|
||||
(* effect handler *)
|
||||
let effc
|
||||
: type e. e Effect.t -> ((e, unit) E.continuation -> unit) option
|
||||
= function
|
||||
| Suspend h ->
|
||||
Some (fun k ->
|
||||
let k' = function
|
||||
| Ok () -> E.continue k ()
|
||||
| Error (exn, bt) ->
|
||||
E.discontinue_with_backtrace k exn bt
|
||||
in
|
||||
h.handle run k'
|
||||
)
|
||||
| _ -> None
|
||||
in
|
||||
|
||||
E.try_with f () {E.effc}
|
||||
|}
|
||||
|
||||
let p_version s = Scanf.sscanf s "%d.%d" (fun x y -> x, y)
|
||||
|
||||
let () =
|
||||
let atomic = ref false in
|
||||
let domain = ref false in
|
||||
let suspend = ref false in
|
||||
let ocaml = ref Sys.ocaml_version in
|
||||
Arg.parse
|
||||
[
|
||||
"--atomic", Arg.Set atomic, " atomic";
|
||||
"--domain", Arg.Set domain, " domain";
|
||||
"--suspend", Arg.Set suspend, " suspend";
|
||||
"--ocaml", Arg.Set_string ocaml, " set ocaml version";
|
||||
]
|
||||
ignore "";
|
||||
|
|
@ -104,4 +144,12 @@ let () =
|
|||
domain_post_5
|
||||
in
|
||||
print_endline code
|
||||
) else if !suspend then (
|
||||
let code =
|
||||
if (major, minor) < (5, 0) then
|
||||
suspend_pre_5
|
||||
else
|
||||
suspend_post_5
|
||||
in
|
||||
print_endline code
|
||||
)
|
||||
|
|
|
|||
15
src/suspend_.mli
Normal file
15
src/suspend_.mli
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
(** (Private) suspending tasks using Effects.
|
||||
|
||||
This module is an implementation detail of Moonpool and should
|
||||
not be used outside of it. *)
|
||||
|
||||
open Suspend_types_
|
||||
|
||||
val suspend : suspension_handler -> unit
|
||||
(** [suspend h] calls [h] with the current continuation [k].
|
||||
The suspension handler, [h], can decide to register [k] somewhere,
|
||||
so it's called later. *)
|
||||
|
||||
val with_suspend : run:runner -> (unit -> unit) -> unit
|
||||
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
||||
will work. It passes [run] to suspension handlers. *)
|
||||
13
src/suspend_types_.ml
Normal file
13
src/suspend_types_.ml
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
(** (Private) types for {!Suspend_}.
|
||||
|
||||
This module is an implementation detail of Moonpool and should
|
||||
not be used outside of it. *)
|
||||
|
||||
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
||||
(** A suspended computation *)
|
||||
|
||||
type runner = { run: (unit -> unit) -> unit } [@@unboxed]
|
||||
(** A task runner (typically, {!Pool.t}) *)
|
||||
|
||||
type suspension_handler = { handle: runner -> suspension -> unit } [@@unboxed]
|
||||
(** The handler that knows what to do with the suspended computation *)
|
||||
Loading…
Add table
Reference in a new issue