diff --git a/src/core/fut.ml b/src/core/fut.ml index 2cba1259..cc8ee076 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -33,9 +33,9 @@ let[@inline] peek self : _ option = C.peek self.st let[@inline] raise_if_failed self : unit = C.check self.st let[@inline] is_success self = - match C.peek self.st with - | Some (Ok _) -> true - | _ -> false + match C.peek_exn self.st with + | _ -> true + | exception _ -> false let[@inline] is_failed self = C.is_canceled self.st @@ -47,24 +47,21 @@ let[@inline] get_or_fail self = | None -> raise Not_ready let[@inline] get_or_fail_exn self = - match C.peek self.st with - | Some (Ok x) -> x - | Some (Error ebt) -> Exn_bt.raise ebt - | None -> raise Not_ready + match C.peek_exn self.st with + | x -> x + | exception C.Running -> raise Not_ready -let[@inline] peek_ok_assert_ (self : 'a t) : 'a = - if C.is_running self.st then assert false; - (* cannot block *) - C.await self.st +let[@inline] peek_or_assert_ (self : 'a t) : 'a = + match C.peek_exn self.st with + | x -> x + | exception C.Running -> assert false let on_result_cb_ _tr f self : unit = - let res = - try Ok (peek_ok_assert_ self) - with exn -> - let ebt = Exn_bt.get exn in - Error ebt - in - f res + match peek_or_assert_ self with + | x -> f (Ok x) + | exception exn -> + let ebt = Exn_bt.get exn in + f (Error ebt) let on_result (self : _ t) (f : _ waiter) : unit = let trigger = @@ -303,7 +300,7 @@ let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont let n = A.fetch_and_add missing (-1) in if n = 1 then ( (* last future, we know they all succeeded, so resolve [fut] *) - let res = aggregate_results peek_ok_assert_ cont in + let res = aggregate_results peek_or_assert_ cont in fulfill promise (Ok res) ) | Error e_bt -> @@ -369,62 +366,65 @@ let for_list ~on l f : unit t = (* ### blocking ### *) -(* TODO: use a trigger directly? *) -let wait_block (self : 'a t) : 'a or_error = - match A.get self.st with - | Done x -> x (* fast path *) - | Waiting _ -> +let push_queue_ _tr q () = Bb_queue.push q () + +let wait_block_exn (self : 'a t) : 'a = + match C.peek_exn self.st with + | x -> x (* fast path *) + | exception C.Running -> let real_block () = (* use queue only once *) let q = Bb_queue.create () in - on_result self (fun r -> Bb_queue.push q r); - Bb_queue.pop q + + let trigger = Trigger.create () in + let attached = + (Trigger.on_signal trigger q () push_queue_ [@alert "-handler"]) + in + assert attached; + + (* blockingly wait for trigger if computation didn't complete in the mean time *) + if C.try_attach self.st trigger then Bb_queue.pop q; + + (* trigger was signaled! computation must be done*) + peek_or_assert_ self in + (* TODO: use backoff? *) (* a bit of spinning before we block *) let rec loop i = if i = 0 then real_block () else ( - match A.get self.st with - | Done x -> x - | Waiting _ -> + match C.peek_exn self.st with + | x -> x + | exception C.Running -> Domain_.relax (); (loop [@tailcall]) (i - 1) ) in loop 50 -let wait_block_exn self = - match wait_block self with - | Ok x -> x - | Error { exn; bt } -> Printexc.raise_with_backtrace exn bt +let wait_block self = + match wait_block_exn self with + | x -> Ok x + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + Error { Exn_bt.exn; bt } [@@@ifge 5.0] -let await (fut : 'a t) : 'a = - match peek fut with - | Some res -> - (* fast path: peek *) - (match res with - | Ok x -> x - | Error { exn; bt } -> Printexc.raise_with_backtrace exn bt) - | None -> +let await (self : 'a t) : 'a = + (* fast path: peek *) + match C.peek_exn self.st with + | res -> res + | exception C.Running -> + let trigger = Trigger.create () in (* suspend until the future is resolved *) - Suspend_.suspend - { - Suspend_.handle = - (fun ~run:_ ~resume k -> - on_result fut (function - | Ok _ -> - (* schedule continuation with the same name *) - resume k (Ok ()) - | Error ebt -> - (* fail continuation immediately *) - resume k (Error ebt))); - }; + if C.try_attach self.st trigger then + Option.iter Exn_bt.raise @@ Trigger.await trigger; + (* un-suspended: we should have a result! *) - get_or_fail_exn fut + get_or_fail_exn self [@@@endif]