remove moonpool.fib

it's complicated and hard to use in practice, because it's not obvious
if a piece of code is running under another fiber or not, so
`Fiber.spawn` might fail because it has no parent.

So in practice we've been using `Fiber.spawn_top`… which has no
interest over just using `Fut.spawn`.
This commit is contained in:
Simon Cruanes 2025-10-22 11:35:46 -04:00
parent 2aa2612963
commit f9ab951c36
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
12 changed files with 1 additions and 607 deletions

View file

@ -1,6 +0,0 @@
(library
(name moonpool_fib)
(public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos)
(flags :standard -open Moonpool_private -open Moonpool))

View file

@ -1,334 +0,0 @@
open Moonpool.Private.Types_
module A = Atomic
module FM = Handle.Map
module Int_map = Map.Make (Int)
module PF = Picos.Fiber
module FLS = Picos.Fiber.FLS
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
type cancel_callback = Exn_bt.t -> unit
let prom_of_fut : 'a Fut.t -> 'a Fut.promise =
Fut.Private_.unsafe_promise_of_fut
(* TODO: replace with picos structured at some point? *)
module Private_ = struct
type pfiber = PF.t
type 'a t = {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state A.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber; (** Associated picos fiber *)
}
and 'a state =
| Alive of {
children: children;
on_cancel: cancel_callback Int_map.t;
cancel_id: int;
}
| Terminating_or_done of 'a Exn_bt.result A.t
and children = any FM.t
and any = Any : _ t -> any [@@unboxed]
(** Key to access the current moonpool.fiber. *)
let k_current_fiber : any FLS.t = FLS.create ()
exception Not_set = FLS.Not_set
let[@inline] get_cur_from_exn (pfiber : pfiber) : any =
FLS.get_exn pfiber k_current_fiber
let[@inline] get_cur_exn () : any =
get_cur_from_exn @@ get_current_fiber_exn ()
let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None
let[@inline] is_closed (self : _ t) =
match A.get self.state with
| Alive _ -> false
| Terminating_or_done _ -> true
end
include Private_
let create_ ~pfiber ~runner ~res () : 'a t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner;
pfiber;
}
let create_done_ ~res () : _ t =
let id = Handle.generate_fresh () in
{
state =
A.make
@@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 };
id;
res;
runner = Runner.dummy;
pfiber = Moonpool.Private.Types_._dummy_fiber;
}
let[@inline] return x = create_done_ ~res:(Fut.return x) ()
let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) ()
let[@inline] res self = self.res
let[@inline] peek self = Fut.peek self.res
let[@inline] is_done self = Fut.is_done self.res
let[@inline] is_success self = Fut.is_success self.res
let[@inline] is_cancelled self = Fut.is_failed self.res
let[@inline] on_result (self : _ t) f = Fut.on_result self.res f
let[@inline] await self = Fut.await self.res
let[@inline] wait_block self = Fut.wait_block self.res
let[@inline] wait_block_exn self = Fut.wait_block_exn self.res
(** Resolve [promise] once [children] are all done *)
let resolve_once_children_are_done_ ~children ~promise
(res : 'a Exn_bt.result A.t) : unit =
let n_children = FM.cardinal children in
if n_children > 0 then (
(* wait for all children to be done *)
let n_waiting = A.make (FM.cardinal children) in
let on_child_finish (r : _ result) =
(* make sure the parent fails if any child fails *)
(match r with
| Ok _ -> ()
| Error ebt -> A.set res (Error ebt));
(* if we're the last to finish, resolve the parent fiber's [res] *)
if A.fetch_and_add n_waiting (-1) = 1 then (
let res = A.get res in
Fut.fulfill promise res
)
in
FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children
) else
Fut.fulfill promise @@ A.get res
let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit =
fun self ebt ->
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; cancel_id = _; on_cancel } as old ->
let new_st = Terminating_or_done (A.make @@ Error ebt) in
if A.compare_and_set self.state old new_st then (
(* here, unlike in {!resolve_fiber}, we immediately cancel children *)
cancel_children_ ~children ebt;
Int_map.iter (fun _ cb -> cb ebt) on_cancel;
resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt);
false
) else
true
| Terminating_or_done _ -> false
do
()
done
(** Cancel eagerly all children *)
and cancel_children_ ebt ~children : unit =
FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children
type cancel_handle = int
let add_on_cancel (self : _ t) cb : cancel_handle =
let h = ref 0 in
while
match A.get self.state with
| Alive { children; cancel_id; on_cancel } as old ->
let new_st =
Alive
{
children;
cancel_id = cancel_id + 1;
on_cancel = Int_map.add cancel_id cb on_cancel;
}
in
if A.compare_and_set self.state old new_st then (
h := cancel_id;
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt -> cb ebt
| Ok _ -> ());
false
do
()
done;
!h
let remove_on_cancel (self : _ t) h =
while
match A.get self.state with
| Alive ({ on_cancel; _ } as alive) as old ->
let new_st =
Alive { alive with on_cancel = Int_map.remove h on_cancel }
in
not (A.compare_and_set self.state old new_st)
| Terminating_or_done _ -> false
do
()
done
let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a =
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
(** Successfully resolve the fiber. This might still fail if some children
failed. *)
let resolve_ok_ (self : 'a t) (r : 'a) : unit =
let r = A.make @@ Ok r in
let promise = prom_of_fut self.res in
while
match A.get self.state with
| Alive { children; _ } as old ->
let new_st = Terminating_or_done r in
if A.compare_and_set self.state old new_st then (
resolve_once_children_are_done_ ~children ~promise r;
false
) else
true
| Terminating_or_done _ -> false
do
()
done
let remove_child_ (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.remove child.id children }
in
not (A.compare_and_set self.state old new_st)
| _ -> false
do
()
done
(** Add a child to [self].
@param protected if true, the child's failure will not affect [self]. *)
let add_child_ ~protect (self : _ t) (child : _ t) =
while
match A.get self.state with
| Alive ({ children; _ } as alive) as old ->
let new_st =
Alive { alive with children = FM.add child.id (Any child) children }
in
if A.compare_and_set self.state old new_st then (
(* make sure to remove [child] from [self.children] once it's done;
fail [self] is [child] failed and [protect=false] *)
Fut.on_result child.res (function
| Ok _ -> remove_child_ self child
| Error ebt ->
(* child failed, we must fail too *)
remove_child_ self child;
if not protect then resolve_as_failed_ self ebt);
false
) else
true
| Terminating_or_done r ->
(match A.get r with
| Error ebt ->
(* cancel child immediately *)
resolve_as_failed_ child ebt
| Ok _ -> ());
false
do
()
done
let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t =
let res, _ = Fut.make () in
let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in
(* copy local hmap from parent, if present *)
Option.iter
(fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber)
parent;
(match parent with
| Some p when is_closed p -> failwith "spawn: nursery is closed"
| _ -> ());
let fib = create_ ~pfiber ~runner ~res () in
let run () =
(* make sure the fiber is accessible from inside itself *)
FLS.set pfiber k_current_fiber (Any fib);
try
let res = f () in
resolve_ok_ fib res
with exn ->
let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make exn bt in
resolve_as_failed_ fib ebt
in
Runner.run_async ~fiber:pfiber runner run;
fib
let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f
let spawn ?on ?(protect = true) f : _ t =
(* spawn [f()] with a copy of our local storage *)
let (Any p) =
try get_cur_exn ()
with Not_set ->
failwith "Fiber.spawn: must be run from within another fiber."
in
let runner =
match on with
| Some r -> r
| None -> p.runner
in
let child = spawn_ ~parent:(Some p) ~runner f in
add_child_ ~protect p child;
child
let[@inline] spawn_ignore ?on ?protect f : unit =
ignore (spawn ?on ?protect f : _ t)
let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t)
let[@inline] self () : any =
match get_cur_exn () with
| exception Not_set -> failwith "Fiber.self: must be run from inside a fiber."
| f -> f
let with_on_self_cancel cb (k : unit -> 'a) : 'a =
let (Any self) = self () in
let h = add_on_cancel self cb in
Fun.protect k ~finally:(fun () -> remove_on_cancel self h)
let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber
let check_if_cancelled () =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.check_if_cancelled: must be run from inside a fiber."
| Any self -> check_if_cancelled_ self
let yield () : unit =
match get_cur_exn () with
| exception Not_set ->
failwith "Fiber.yield: must be run from inside a fiber."
| Any self ->
check_if_cancelled_ self;
PF.yield ();
check_if_cancelled_ self

View file

@ -1,150 +0,0 @@
(** Fibers.
A fiber is a lightweight computation that runs cooperatively alongside other
fibers. In the context of moonpool, fibers have additional properties:
- they run in a moonpool runner
- they form a simple supervision tree, enabling a limited form of structured
concurrency *)
type cancel_callback = Exn_bt.t -> unit
(** A callback used in case of cancellation *)
(**/**)
(** Do not rely on this, it is internal implementation details. *)
module Private_ : sig
type 'a state
type pfiber
type 'a t = private {
id: Handle.t; (** unique identifier for this fiber *)
state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *)
res: 'a Fut.t;
runner: Runner.t;
pfiber: pfiber;
}
(** Type definition, exposed so that {!any} can be unboxed. Please do not rely
on that. *)
type any = Any : _ t -> any [@@unboxed]
exception Not_set
val get_cur_exn : unit -> any
(** [get_cur_exn ()] either returns the current fiber, or
@raise Not_set if run outside a fiber. *)
val get_cur_opt : unit -> any option
end
(**/**)
type 'a t = 'a Private_.t
(** A fiber returning a value of type ['a]. *)
val res : 'a t -> 'a Fut.t
(** Future result of the fiber. *)
type 'a callback = 'a Exn_bt.result -> unit
(** Callbacks that are called when a fiber is done. *)
(** Type erased fiber *)
type any = Private_.any = Any : _ t -> any [@@unboxed]
val return : 'a -> 'a t
val fail : Exn_bt.t -> _ t
val self : unit -> any
(** [self ()] is the current fiber. Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val peek : 'a t -> 'a Fut.or_error option
(** Peek inside the future result *)
val is_done : _ t -> bool
(** Has the fiber completed? *)
val is_cancelled : _ t -> bool
(** Has the fiber completed with a failure? *)
val is_success : _ t -> bool
(** Has the fiber completed with a value? *)
val await : 'a t -> 'a
(** [await fib] is like [Fut.await (res fib)] *)
val wait_block_exn : 'a t -> 'a
(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val wait_block : 'a t -> 'a Fut.or_error
(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See
{!Fut.wait_block} for warnings about deadlocks. *)
val check_if_cancelled : unit -> unit
(** Check if the current fiber is cancelled, in which case this raises. Must be
run from inside a fiber.
@raise e if the current fiber is cancelled with exception [e]
@raise Failure if not run from a fiber. *)
val yield : unit -> unit
(** Yield control to the scheduler from the current fiber.
@raise Failure if not run from inside a fiber. *)
type cancel_handle
(** An opaque handle for a single cancel callback in a fiber *)
val add_on_cancel : _ t -> cancel_callback -> cancel_handle
(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib].
If [fib] is already cancelled, [cb] is called immediately. *)
val remove_on_cancel : _ t -> cancel_handle -> unit
(** [remove_on_cancel fib h] removes the cancel callback associated with handle
[h]. *)
val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_cancel fib cb (fun () -> <e>)] evaluates [e] in a scope in which,
if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without
the fiber being cancelled, this callback is removed. *)
val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a
(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the
cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed
from the list. *)
val on_result : 'a t -> 'a callback -> unit
(** Wait for fiber to be done and call the callback with the result. If the
fiber is done already then the callback is invoked immediately with its
result. *)
val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t
(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This
fiber is not the child of any other fiber: its lifetime is only determined
by the lifetime of [f()]. *)
val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t
(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber
[parent]. The sub-fiber [f_child] is attached to the current fiber and fails
if the current fiber [parent] fails.
@param on
if provided, start the fiber on the given runner. If not provided, use the
parent's runner.
@param protect
if true, when [f_child] fails, it does not affect [parent]. If false,
[f_child] failing also causes [parent] to fail (and therefore all other
children of [parent]). Default is [true].
Must be run from inside a fiber.
@raise Failure if not run from inside a fiber. *)
val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect
termination of the parent, ie. the parent will exit only after this new
fiber exits.
@param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result.
@since 0.7 *)

View file

@ -1 +0,0 @@
include Task_local_storage

View file

@ -1,17 +0,0 @@
(** Fiber-local storage.
This storage is associated to the current fiber, just like thread-local
storage is associated with the current thread.
See {!Moonpool.Task_local_storage} for more general information, as this is
based on it.
{b NOTE}: it's important to note that, while each fiber has its own storage,
spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of
the storage. Values inside [f1]'s storage will be physically shared with
[f2]. It is thus recommended to store only persistent values in the local
storage. *)
include module type of struct
include Task_local_storage
end

View file

@ -1,14 +0,0 @@
module A = Atomic
type t = int
let counter_ = A.make 0
let equal : t -> t -> bool = ( = )
let compare : t -> t -> int = Stdlib.compare
let[@inline] generate_fresh () = A.fetch_and_add counter_ 1
(* TODO: better hash *)
let[@inline] hash x = x land max_int
module Set = Set.Make (Int)
module Map = Map.Make (Int)

View file

@ -1,17 +0,0 @@
(** The unique name of a fiber.
Each fiber has a unique handle that can be used to refer to it in maps or
sets. *)
type t = private int
(** Unique, opaque identifier for a fiber. *)
val equal : t -> t -> bool
val compare : t -> t -> int
val hash : t -> int
val generate_fresh : unit -> t
(** Generate a fresh, unique identifier *)
module Set : Set.S with type elt = t
module Map : Map.S with type key = t

View file

@ -1,26 +0,0 @@
exception Oh_no of Exn_bt.t
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
()
in
let runner = Fifo_pool.Private_.runner_of_state worker_st in
try
let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in
Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner);
(* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with
| Some (Ok x) -> x
| Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -1,28 +0,0 @@
(** Main thread.
This is evolved from [Moonpool.Immediate_runner], but unlike it, this API
assumes you run it in a thread (possibly the main thread) which will block
until the initial computation is done.
This means it's reasonable to use [Main.main (fun () -> do_everything)] at
the beginning of the program. Other Moonpool pools can be created for
background tasks, etc. to do the heavy lifting, and the main thread (inside
this immediate runner) can coordinate tasks via [Fiber.await].
Aside from the fact that this blocks the caller thread, it is fairly similar
to {!Background_thread} in that there's a single worker to process
tasks/fibers.
This handles effects, including the ones in {!Fiber}.
@since 0.6 *)
val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including
{!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)

View file

@ -1,12 +0,0 @@
(** Fibers for moonpool.
See {!Fiber} for the most important explanations.
@since 0.6. *)
module Fiber = Fiber
module Fls = Fls
module Handle = Handle
module Main = Main
include Fiber
include Main

View file

@ -5,7 +5,6 @@
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -289,7 +289,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
M.run_async st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x