feat fiber: add helpers, expose any

This commit is contained in:
Simon Cruanes 2024-02-21 00:53:07 -05:00
parent cb8668f3dc
commit 8a7cfb6fb0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 58 additions and 15 deletions

View file

@ -9,22 +9,26 @@ type cancel_callback = Exn_bt.t -> unit
let prom_of_fut : 'a Fut.t -> 'a Fut.promise = let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
Fut.Private_.unsafe_promise_of_fut Fut.Private_.unsafe_promise_of_fut
type 'a t = { module Private_ = struct
id: Handle.t; (** unique identifier for this fiber *) type 'a t = {
state: 'a state A.t; (** Current state in the lifetime of the fiber *) id: Handle.t; (** unique identifier for this fiber *)
res: 'a Fut.t; state: 'a state A.t; (** Current state in the lifetime of the fiber *)
runner: Runner.t; res: 'a Fut.t;
} runner: Runner.t;
}
and 'a state = and 'a state =
| Alive of { | Alive of {
children: children; children: children;
on_cancel: cancel_callback list; on_cancel: cancel_callback list;
} }
| Terminating_or_done of 'a Exn_bt.result A.t | Terminating_or_done of 'a Exn_bt.result A.t
and children = any FM.t and children = any FM.t
and any = Any : _ t -> any [@@unboxed] and any = Any : _ t -> any [@@unboxed]
end
include Private_
let[@inline] res self = self.res let[@inline] res self = self.res
let[@inline] peek self = Fut.peek self.res let[@inline] peek self = Fut.peek self.res
@ -178,6 +182,11 @@ let spawn_ ~on (f : _ -> 'a) : 'a t =
let[@inline] spawn_top ~on f : _ t = spawn_ ~on f let[@inline] spawn_top ~on f : _ t = spawn_ ~on f
let[@inline] self () : any =
match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.self: must be run from inside a fiber."
| Some f -> f
let spawn_link ~protect f : _ t = let spawn_link ~protect f : _ t =
match Task_local_storage.get k_current_fiber with match Task_local_storage.get k_current_fiber with
| None -> failwith "Fiber.spawn_link: must be run from inside a fiber." | None -> failwith "Fiber.spawn_link: must be run from inside a fiber."
@ -217,6 +226,11 @@ let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a =
add_cancel_cb_ self cb; add_cancel_cb_ self cb;
Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self) Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self)
let with_self_cancel_callback cb (k : unit -> 'a) : 'a =
let (Any self) = self () in
add_cancel_cb_ self cb;
Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self)
let[@inline] await self = Fut.await self.res let[@inline] await self = Fut.await self.res
module Suspend_ = Moonpool.Private.Suspend_ module Suspend_ = Moonpool.Private.Suspend_

View file

@ -9,7 +9,24 @@
of structured concurrency of structured concurrency
*) *)
type 'a t (**/**)
module Private_ : sig
type 'a state
type 'a t = private {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
}
(** Type definition, exposed so that {!any} can be unboxed.
Please do not rely on that. *)
end
(**/**)
type 'a t = 'a Private_.t
(** A fiber returning a value of type ['a]. *) (** A fiber returning a value of type ['a]. *)
val res : 'a t -> 'a Fut.t val res : 'a t -> 'a Fut.t
@ -20,6 +37,14 @@ type 'a callback = 'a Exn_bt.result -> unit
type cancel_callback = Exn_bt.t -> unit type cancel_callback = Exn_bt.t -> unit
(** Type erased fiber *)
type any = Any : _ t -> any [@@unboxed]
val self : unit -> any
(** [self ()] is the current fiber.
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *) (** Peek inside the future result *)
@ -50,6 +75,10 @@ val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a
[cb()] is called. If [e] returns without the fiber being cancelled, [cb()] is called. If [e] returns without the fiber being cancelled,
this callback is removed. *) this callback is removed. *)
val with_self_cancel_callback : cancel_callback -> (unit -> 'a) -> 'a
(** [with_self_cancel_callback cb f] calls [f()] in a scope where
[cb] is added to the cancel callbacks of the current fiber *)
val on_result : 'a t -> 'a callback -> unit val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback (** Wait for fiber to be done and call the callback
with the result. If the fiber is done already then the with the result. If the fiber is done already then the