mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
feat: add Pool.fork_join
This commit is contained in:
parent
2bbb4185a4
commit
009855ce0d
2 changed files with 102 additions and 1 deletions
87
src/pool.ml
87
src/pool.ml
|
|
@ -227,3 +227,90 @@ let shutdown_ ~wait (self : t) : unit =
|
|||
|
||||
let shutdown_without_waiting (self : t) : unit = shutdown_ self ~wait:false
|
||||
let shutdown (self : t) : unit = shutdown_ self ~wait:true
|
||||
|
||||
module Fork_join_ = struct
|
||||
type 'a single_res =
|
||||
| St_none
|
||||
| St_some of 'a
|
||||
| St_fail of exn * Printexc.raw_backtrace
|
||||
|
||||
type ('a, 'b) t = {
|
||||
mutable suspension:
|
||||
((unit, exn * Printexc.raw_backtrace) result -> unit) option;
|
||||
(** suspended caller *)
|
||||
left: 'a single_res;
|
||||
right: 'b single_res;
|
||||
}
|
||||
|
||||
let get_exn (self : _ t A.t) =
|
||||
match A.get self with
|
||||
| { left = St_fail (e, bt); _ } | { right = St_fail (e, bt); _ } ->
|
||||
Printexc.raise_with_backtrace e bt
|
||||
| { left = St_some x; right = St_some y; _ } -> x, y
|
||||
| _ -> assert false
|
||||
|
||||
let check_if_state_complete_ (self : _ t) : unit =
|
||||
match self.left, self.right, self.suspension with
|
||||
| St_some _, St_some _, Some f -> f (Ok ())
|
||||
| St_fail (e, bt), _, Some f | _, St_fail (e, bt), Some f ->
|
||||
f (Error (e, bt))
|
||||
| _ -> ()
|
||||
|
||||
let set_left_ (self : _ t A.t) (x : _ single_res) =
|
||||
while
|
||||
let old_st = A.get self in
|
||||
let new_st = { old_st with left = x } in
|
||||
if A.compare_and_set self old_st new_st then (
|
||||
check_if_state_complete_ new_st;
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
|
||||
let set_right_ (self : _ t A.t) (y : _ single_res) =
|
||||
while
|
||||
let old_st = A.get self in
|
||||
let new_st = { old_st with right = y } in
|
||||
if A.compare_and_set self old_st new_st then (
|
||||
check_if_state_complete_ new_st;
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
end
|
||||
|
||||
let fork_join f g : _ * _ =
|
||||
let open Fork_join_ in
|
||||
let st = A.make { suspension = None; left = St_none; right = St_none } in
|
||||
|
||||
let start_tasks ~run () : unit =
|
||||
run (fun () ->
|
||||
try
|
||||
let res = f () in
|
||||
set_left_ st (St_some res)
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
set_left_ st (St_fail (e, bt)));
|
||||
|
||||
run (fun () ->
|
||||
try
|
||||
let res = g () in
|
||||
set_right_ st (St_some res)
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
set_right_ st (St_fail (e, bt)))
|
||||
in
|
||||
|
||||
Suspend_.suspend
|
||||
{
|
||||
Suspend_types_.handle =
|
||||
(fun ~run suspension ->
|
||||
(* nothing else is started, no race condition possible *)
|
||||
(A.get st).suspension <- Some suspension;
|
||||
start_tasks ~run ());
|
||||
};
|
||||
get_exn st
|
||||
|
|
|
|||
16
src/pool.mli
16
src/pool.mli
|
|
@ -75,7 +75,7 @@ val run_async : t -> (unit -> unit) -> unit
|
|||
(** [run_async pool f] schedules [f] for later execution on the pool
|
||||
in one of the threads. [f()] will run on one of the pool's
|
||||
worker threads.
|
||||
@raise Shutdown if the pool was shut down before [run] was called.
|
||||
@raise Shutdown if the pool was shut down before [run_async] was called.
|
||||
@since 0.3 *)
|
||||
|
||||
val run : t -> (unit -> unit) -> unit
|
||||
|
|
@ -91,3 +91,17 @@ val run_wait_block : t -> (unit -> 'a) -> 'a
|
|||
|
||||
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}).
|
||||
@since 0.3 *)
|
||||
|
||||
(** {2 Fork-join computations} *)
|
||||
|
||||
val fork_join : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
|
||||
(** [fork_join f g] runs [f()] and [g()], potentially in parallel,
|
||||
and returns their result when both are done.
|
||||
If any of [f()] and [g()] fails, then the whole computation fails.
|
||||
|
||||
This must be run from within the pool, inside {!run}
|
||||
(or inside a {!Fut.spawn} computation).
|
||||
This is because it relies on an effect handler to be installed.
|
||||
|
||||
@since 0.3
|
||||
{b NOTE} this is only available on OCaml 5. *)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue