diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index ebfd3319..7007aa17 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -9,22 +9,26 @@ type cancel_callback = Exn_bt.t -> unit let prom_of_fut : 'a Fut.t -> 'a Fut.promise = Fut.Private_.unsafe_promise_of_fut -type 'a t = { - id: Handle.t; (** unique identifier for this fiber *) - state: 'a state A.t; (** Current state in the lifetime of the fiber *) - res: 'a Fut.t; - runner: Runner.t; -} +module Private_ = struct + type 'a t = { + id: Handle.t; (** unique identifier for this fiber *) + state: 'a state A.t; (** Current state in the lifetime of the fiber *) + res: 'a Fut.t; + runner: Runner.t; + } -and 'a state = - | Alive of { - children: children; - on_cancel: cancel_callback list; - } - | Terminating_or_done of 'a Exn_bt.result A.t + and 'a state = + | Alive of { + children: children; + on_cancel: cancel_callback list; + } + | Terminating_or_done of 'a Exn_bt.result A.t -and children = any FM.t -and any = Any : _ t -> any [@@unboxed] + and children = any FM.t + and any = Any : _ t -> any [@@unboxed] +end + +include Private_ let[@inline] res self = self.res let[@inline] peek self = Fut.peek self.res @@ -178,6 +182,11 @@ let spawn_ ~on (f : _ -> 'a) : 'a t = let[@inline] spawn_top ~on f : _ t = spawn_ ~on f +let[@inline] self () : any = + match Task_local_storage.get k_current_fiber with + | None -> failwith "Fiber.self: must be run from inside a fiber." + | Some f -> f + let spawn_link ~protect f : _ t = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.spawn_link: must be run from inside a fiber." @@ -217,6 +226,11 @@ let with_cancel_callback (self : _ t) cb (k : unit -> 'a) : 'a = add_cancel_cb_ self cb; Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self) +let with_self_cancel_callback cb (k : unit -> 'a) : 'a = + let (Any self) = self () in + add_cancel_cb_ self cb; + Fun.protect k ~finally:(fun () -> remove_top_cancel_cb_ self) + let[@inline] await self = Fut.await self.res module Suspend_ = Moonpool.Private.Suspend_ diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index 5b01948a..694d3a92 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -9,7 +9,24 @@ of structured concurrency *) -type 'a t +(**/**) + +module Private_ : sig + type 'a state + + type 'a t = private { + id: Handle.t; (** unique identifier for this fiber *) + state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *) + res: 'a Fut.t; + runner: Runner.t; + } + (** Type definition, exposed so that {!any} can be unboxed. + Please do not rely on that. *) +end + +(**/**) + +type 'a t = 'a Private_.t (** A fiber returning a value of type ['a]. *) val res : 'a t -> 'a Fut.t @@ -20,6 +37,14 @@ type 'a callback = 'a Exn_bt.result -> unit type cancel_callback = Exn_bt.t -> unit +(** Type erased fiber *) +type any = Any : _ t -> any [@@unboxed] + +val self : unit -> any +(** [self ()] is the current fiber. + Must be run from inside a fiber. + @raise Failure if not run from inside a fiber. *) + val peek : 'a t -> 'a Fut.or_error option (** Peek inside the future result *) @@ -50,6 +75,10 @@ val with_cancel_callback : _ t -> cancel_callback -> (unit -> 'a) -> 'a [cb()] is called. If [e] returns without the fiber being cancelled, this callback is removed. *) +val with_self_cancel_callback : cancel_callback -> (unit -> 'a) -> 'a +(** [with_self_cancel_callback cb f] calls [f()] in a scope where + [cb] is added to the cancel callbacks of the current fiber *) + val on_result : 'a t -> 'a callback -> unit (** Wait for fiber to be done and call the callback with the result. If the fiber is done already then the