mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-17 08:06:43 -05:00
breaking: fut: change behavior of ?on
combinators that take `?on` will now check if they can use the current runner if `?on:None` is passed. If no runner is passed and they're run from outside a runner, they will just run in the relevant callback or on the current thread.
This commit is contained in:
parent
05bf3080dc
commit
74d9e5a19c
1 changed files with 38 additions and 31 deletions
69
src/fut.ml
69
src/fut.ml
|
|
@ -113,8 +113,13 @@ let reify_error (f : 'a t) : 'a or_error t =
|
||||||
on_result f (fun r -> fulfill promise (Ok r));
|
on_result f (fun r -> fulfill promise (Ok r));
|
||||||
fut
|
fut
|
||||||
|
|
||||||
|
let get_runner_ ?on () : Runner.t option =
|
||||||
|
match on with
|
||||||
|
| Some _ as r -> r
|
||||||
|
| None -> Runner.get_current_runner ()
|
||||||
|
|
||||||
let map ?on ~f fut : _ t =
|
let map ?on ~f fut : _ t =
|
||||||
let map_res r =
|
let map_immediate_ r : _ result =
|
||||||
match r with
|
match r with
|
||||||
| Ok x ->
|
| Ok x ->
|
||||||
(try Ok (f x)
|
(try Ok (f x)
|
||||||
|
|
@ -123,19 +128,22 @@ let map ?on ~f fut : _ t =
|
||||||
Error (e, bt))
|
Error (e, bt))
|
||||||
| Error e_bt -> Error e_bt
|
| Error e_bt -> Error e_bt
|
||||||
in
|
in
|
||||||
match peek fut with
|
|
||||||
| Some r -> of_result (map_res r)
|
|
||||||
| None ->
|
|
||||||
let fut2, promise = make () in
|
|
||||||
on_result fut (fun r ->
|
|
||||||
let map_and_fulfill () =
|
|
||||||
let res = map_res r in
|
|
||||||
fulfill promise res
|
|
||||||
in
|
|
||||||
|
|
||||||
match on with
|
match peek fut, get_runner_ ?on () with
|
||||||
| None -> map_and_fulfill ()
|
| Some res, None -> of_result @@ map_immediate_ res
|
||||||
| Some on -> Runner.run_async on map_and_fulfill);
|
| Some res, Some runner ->
|
||||||
|
let fut2, promise = make () in
|
||||||
|
Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res);
|
||||||
|
fut2
|
||||||
|
| None, None ->
|
||||||
|
let fut2, promise = make () in
|
||||||
|
on_result fut (fun res -> fulfill promise @@ map_immediate_ res);
|
||||||
|
fut2
|
||||||
|
| None, Some runner ->
|
||||||
|
let fut2, promise = make () in
|
||||||
|
on_result fut (fun res ->
|
||||||
|
Runner.run_async runner (fun () ->
|
||||||
|
fulfill promise @@ map_immediate_ res));
|
||||||
fut2
|
fut2
|
||||||
|
|
||||||
let join (fut : 'a t t) : 'a t =
|
let join (fut : 'a t t) : 'a t =
|
||||||
|
|
@ -160,32 +168,31 @@ let bind ?on ~f fut : _ t =
|
||||||
| Error (e, bt) -> fail e bt
|
| Error (e, bt) -> fail e bt
|
||||||
in
|
in
|
||||||
|
|
||||||
let bind_and_fulfill r promise () =
|
let bind_and_fulfill (r : _ result) promise () : unit =
|
||||||
let f_res_fut = apply_f_to_res r in
|
let f_res_fut = apply_f_to_res r in
|
||||||
(* forward result *)
|
(* forward result *)
|
||||||
on_result f_res_fut (fun r -> fulfill promise r)
|
on_result f_res_fut (fun r -> fulfill promise r)
|
||||||
in
|
in
|
||||||
|
|
||||||
match peek fut with
|
match peek fut, get_runner_ ?on () with
|
||||||
| Some r ->
|
| Some res, Some runner ->
|
||||||
(match on with
|
let fut2, promise = make () in
|
||||||
| None -> apply_f_to_res r
|
Runner.run_async runner (bind_and_fulfill res promise);
|
||||||
| Some on ->
|
fut2
|
||||||
let fut2, promise = make () in
|
| Some res, None -> apply_f_to_res res
|
||||||
Runner.run_async on (bind_and_fulfill r promise);
|
| None, Some runner ->
|
||||||
fut2)
|
|
||||||
| None ->
|
|
||||||
let fut2, promise = make () in
|
let fut2, promise = make () in
|
||||||
on_result fut (fun r ->
|
on_result fut (fun r ->
|
||||||
match on with
|
Runner.run_async runner (bind_and_fulfill r promise));
|
||||||
| None -> bind_and_fulfill r promise ()
|
fut2
|
||||||
| Some on -> Runner.run_async on (bind_and_fulfill r promise));
|
| None, None ->
|
||||||
|
let fut2, promise = make () in
|
||||||
|
on_result fut (fun res -> bind_and_fulfill res promise ());
|
||||||
fut2
|
fut2
|
||||||
|
|
||||||
let bind_reify_error ?on ~f fut : _ t = bind ?on ~f (reify_error fut)
|
let[@inline] bind_reify_error ?on ~f fut : _ t = bind ?on ~f (reify_error fut)
|
||||||
|
|
||||||
let update_ (st : 'a A.t) f : 'a =
|
let update_atomic_ (st : 'a A.t) f : 'a =
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
let x = A.get st in
|
let x = A.get st in
|
||||||
let y = f x in
|
let y = f x in
|
||||||
|
|
@ -210,7 +217,7 @@ let both a b : _ t =
|
||||||
| Error err -> fulfill_idempotent promise (Error err)
|
| Error err -> fulfill_idempotent promise (Error err)
|
||||||
| Ok x ->
|
| Ok x ->
|
||||||
(match
|
(match
|
||||||
update_ st (function
|
update_atomic_ st (function
|
||||||
| `Neither -> `Left x
|
| `Neither -> `Left x
|
||||||
| `Right y -> `Both (x, y)
|
| `Right y -> `Both (x, y)
|
||||||
| _ -> assert false)
|
| _ -> assert false)
|
||||||
|
|
@ -221,7 +228,7 @@ let both a b : _ t =
|
||||||
| Error err -> fulfill_idempotent promise (Error err)
|
| Error err -> fulfill_idempotent promise (Error err)
|
||||||
| Ok y ->
|
| Ok y ->
|
||||||
(match
|
(match
|
||||||
update_ st (function
|
update_atomic_ st (function
|
||||||
| `Left x -> `Both (x, y)
|
| `Left x -> `Both (x, y)
|
||||||
| `Neither -> `Right y
|
| `Neither -> `Right y
|
||||||
| _ -> assert false)
|
| _ -> assert false)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue