diff --git a/src/fib/dune b/src/fib/dune deleted file mode 100644 index 412c420b..00000000 --- a/src/fib/dune +++ /dev/null @@ -1,6 +0,0 @@ -(library - (name moonpool_fib) - (public_name moonpool.fib) - (synopsis "Fibers and structured concurrency for Moonpool") - (libraries moonpool picos) - (flags :standard -open Moonpool_private -open Moonpool)) diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml deleted file mode 100644 index 255f1ab9..00000000 --- a/src/fib/fiber.ml +++ /dev/null @@ -1,334 +0,0 @@ -open Moonpool.Private.Types_ -module A = Atomic -module FM = Handle.Map -module Int_map = Map.Make (Int) -module PF = Picos.Fiber -module FLS = Picos.Fiber.FLS - -type 'a callback = 'a Exn_bt.result -> unit -(** Callbacks that are called when a fiber is done. *) - -type cancel_callback = Exn_bt.t -> unit - -let prom_of_fut : 'a Fut.t -> 'a Fut.promise = - Fut.Private_.unsafe_promise_of_fut - -(* TODO: replace with picos structured at some point? *) -module Private_ = struct - type pfiber = PF.t - - type 'a t = { - id: Handle.t; (** unique identifier for this fiber *) - state: 'a state A.t; (** Current state in the lifetime of the fiber *) - res: 'a Fut.t; - runner: Runner.t; - pfiber: pfiber; (** Associated picos fiber *) - } - - and 'a state = - | Alive of { - children: children; - on_cancel: cancel_callback Int_map.t; - cancel_id: int; - } - | Terminating_or_done of 'a Exn_bt.result A.t - - and children = any FM.t - and any = Any : _ t -> any [@@unboxed] - - (** Key to access the current moonpool.fiber. *) - let k_current_fiber : any FLS.t = FLS.create () - - exception Not_set = FLS.Not_set - - let[@inline] get_cur_from_exn (pfiber : pfiber) : any = - FLS.get_exn pfiber k_current_fiber - - let[@inline] get_cur_exn () : any = - get_cur_from_exn @@ get_current_fiber_exn () - - let[@inline] get_cur_opt () = try Some (get_cur_exn ()) with _ -> None - - let[@inline] is_closed (self : _ t) = - match A.get self.state with - | Alive _ -> false - | Terminating_or_done _ -> true -end - -include Private_ - -let create_ ~pfiber ~runner ~res () : 'a t = - let id = Handle.generate_fresh () in - { - state = - A.make - @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; - id; - res; - runner; - pfiber; - } - -let create_done_ ~res () : _ t = - let id = Handle.generate_fresh () in - { - state = - A.make - @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; - id; - res; - runner = Runner.dummy; - pfiber = Moonpool.Private.Types_._dummy_fiber; - } - -let[@inline] return x = create_done_ ~res:(Fut.return x) () -let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) () -let[@inline] res self = self.res -let[@inline] peek self = Fut.peek self.res -let[@inline] is_done self = Fut.is_done self.res -let[@inline] is_success self = Fut.is_success self.res -let[@inline] is_cancelled self = Fut.is_failed self.res -let[@inline] on_result (self : _ t) f = Fut.on_result self.res f -let[@inline] await self = Fut.await self.res -let[@inline] wait_block self = Fut.wait_block self.res -let[@inline] wait_block_exn self = Fut.wait_block_exn self.res - -(** Resolve [promise] once [children] are all done *) -let resolve_once_children_are_done_ ~children ~promise - (res : 'a Exn_bt.result A.t) : unit = - let n_children = FM.cardinal children in - if n_children > 0 then ( - (* wait for all children to be done *) - let n_waiting = A.make (FM.cardinal children) in - let on_child_finish (r : _ result) = - (* make sure the parent fails if any child fails *) - (match r with - | Ok _ -> () - | Error ebt -> A.set res (Error ebt)); - - (* if we're the last to finish, resolve the parent fiber's [res] *) - if A.fetch_and_add n_waiting (-1) = 1 then ( - let res = A.get res in - Fut.fulfill promise res - ) - in - FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children - ) else - Fut.fulfill promise @@ A.get res - -let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit = - fun self ebt -> - let promise = prom_of_fut self.res in - while - match A.get self.state with - | Alive { children; cancel_id = _; on_cancel } as old -> - let new_st = Terminating_or_done (A.make @@ Error ebt) in - if A.compare_and_set self.state old new_st then ( - (* here, unlike in {!resolve_fiber}, we immediately cancel children *) - cancel_children_ ~children ebt; - Int_map.iter (fun _ cb -> cb ebt) on_cancel; - resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt); - false - ) else - true - | Terminating_or_done _ -> false - do - () - done - -(** Cancel eagerly all children *) -and cancel_children_ ebt ~children : unit = - FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children - -type cancel_handle = int - -let add_on_cancel (self : _ t) cb : cancel_handle = - let h = ref 0 in - while - match A.get self.state with - | Alive { children; cancel_id; on_cancel } as old -> - let new_st = - Alive - { - children; - cancel_id = cancel_id + 1; - on_cancel = Int_map.add cancel_id cb on_cancel; - } - in - if A.compare_and_set self.state old new_st then ( - h := cancel_id; - false - ) else - true - | Terminating_or_done r -> - (match A.get r with - | Error ebt -> cb ebt - | Ok _ -> ()); - false - do - () - done; - !h - -let remove_on_cancel (self : _ t) h = - while - match A.get self.state with - | Alive ({ on_cancel; _ } as alive) as old -> - let new_st = - Alive { alive with on_cancel = Int_map.remove h on_cancel } - in - not (A.compare_and_set self.state old new_st) - | Terminating_or_done _ -> false - do - () - done - -let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a = - let h = add_on_cancel self cb in - Fun.protect k ~finally:(fun () -> remove_on_cancel self h) - -(** Successfully resolve the fiber. This might still fail if some children - failed. *) -let resolve_ok_ (self : 'a t) (r : 'a) : unit = - let r = A.make @@ Ok r in - let promise = prom_of_fut self.res in - while - match A.get self.state with - | Alive { children; _ } as old -> - let new_st = Terminating_or_done r in - if A.compare_and_set self.state old new_st then ( - resolve_once_children_are_done_ ~children ~promise r; - false - ) else - true - | Terminating_or_done _ -> false - do - () - done - -let remove_child_ (self : _ t) (child : _ t) = - while - match A.get self.state with - | Alive ({ children; _ } as alive) as old -> - let new_st = - Alive { alive with children = FM.remove child.id children } - in - not (A.compare_and_set self.state old new_st) - | _ -> false - do - () - done - -(** Add a child to [self]. - @param protected if true, the child's failure will not affect [self]. *) -let add_child_ ~protect (self : _ t) (child : _ t) = - while - match A.get self.state with - | Alive ({ children; _ } as alive) as old -> - let new_st = - Alive { alive with children = FM.add child.id (Any child) children } - in - - if A.compare_and_set self.state old new_st then ( - (* make sure to remove [child] from [self.children] once it's done; - fail [self] is [child] failed and [protect=false] *) - Fut.on_result child.res (function - | Ok _ -> remove_child_ self child - | Error ebt -> - (* child failed, we must fail too *) - remove_child_ self child; - if not protect then resolve_as_failed_ self ebt); - false - ) else - true - | Terminating_or_done r -> - (match A.get r with - | Error ebt -> - (* cancel child immediately *) - resolve_as_failed_ child ebt - | Ok _ -> ()); - false - do - () - done - -let spawn_ ~parent ~runner (f : unit -> 'a) : 'a t = - let res, _ = Fut.make () in - let pfiber = PF.create ~forbid:false (Fut.Private_.as_computation res) in - - (* copy local hmap from parent, if present *) - Option.iter - (fun (p : _ t) -> Fls.Private_hmap_ls_.copy_fls p.pfiber pfiber) - parent; - - (match parent with - | Some p when is_closed p -> failwith "spawn: nursery is closed" - | _ -> ()); - let fib = create_ ~pfiber ~runner ~res () in - - let run () = - (* make sure the fiber is accessible from inside itself *) - FLS.set pfiber k_current_fiber (Any fib); - try - let res = f () in - resolve_ok_ fib res - with exn -> - let bt = Printexc.get_raw_backtrace () in - let ebt = Exn_bt.make exn bt in - resolve_as_failed_ fib ebt - in - - Runner.run_async ~fiber:pfiber runner run; - - fib - -let spawn_top ~on f : _ t = spawn_ ~runner:on ~parent:None f - -let spawn ?on ?(protect = true) f : _ t = - (* spawn [f()] with a copy of our local storage *) - let (Any p) = - try get_cur_exn () - with Not_set -> - failwith "Fiber.spawn: must be run from within another fiber." - in - - let runner = - match on with - | Some r -> r - | None -> p.runner - in - let child = spawn_ ~parent:(Some p) ~runner f in - add_child_ ~protect p child; - child - -let[@inline] spawn_ignore ?on ?protect f : unit = - ignore (spawn ?on ?protect f : _ t) - -let[@inline] spawn_top_ignore ~on f : unit = ignore (spawn_top ~on f : _ t) - -let[@inline] self () : any = - match get_cur_exn () with - | exception Not_set -> failwith "Fiber.self: must be run from inside a fiber." - | f -> f - -let with_on_self_cancel cb (k : unit -> 'a) : 'a = - let (Any self) = self () in - let h = add_on_cancel self cb in - Fun.protect k ~finally:(fun () -> remove_on_cancel self h) - -let[@inline] check_if_cancelled_ (self : _ t) = PF.check self.pfiber - -let check_if_cancelled () = - match get_cur_exn () with - | exception Not_set -> - failwith "Fiber.check_if_cancelled: must be run from inside a fiber." - | Any self -> check_if_cancelled_ self - -let yield () : unit = - match get_cur_exn () with - | exception Not_set -> - failwith "Fiber.yield: must be run from inside a fiber." - | Any self -> - check_if_cancelled_ self; - PF.yield (); - check_if_cancelled_ self diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli deleted file mode 100644 index a6458015..00000000 --- a/src/fib/fiber.mli +++ /dev/null @@ -1,150 +0,0 @@ -(** Fibers. - - A fiber is a lightweight computation that runs cooperatively alongside other - fibers. In the context of moonpool, fibers have additional properties: - - - they run in a moonpool runner - - they form a simple supervision tree, enabling a limited form of structured - concurrency *) - -type cancel_callback = Exn_bt.t -> unit -(** A callback used in case of cancellation *) - -(**/**) - -(** Do not rely on this, it is internal implementation details. *) -module Private_ : sig - type 'a state - type pfiber - - type 'a t = private { - id: Handle.t; (** unique identifier for this fiber *) - state: 'a state Atomic.t; (** Current state in the lifetime of the fiber *) - res: 'a Fut.t; - runner: Runner.t; - pfiber: pfiber; - } - (** Type definition, exposed so that {!any} can be unboxed. Please do not rely - on that. *) - - type any = Any : _ t -> any [@@unboxed] - - exception Not_set - - val get_cur_exn : unit -> any - (** [get_cur_exn ()] either returns the current fiber, or - @raise Not_set if run outside a fiber. *) - - val get_cur_opt : unit -> any option -end - -(**/**) - -type 'a t = 'a Private_.t -(** A fiber returning a value of type ['a]. *) - -val res : 'a t -> 'a Fut.t -(** Future result of the fiber. *) - -type 'a callback = 'a Exn_bt.result -> unit -(** Callbacks that are called when a fiber is done. *) - -(** Type erased fiber *) -type any = Private_.any = Any : _ t -> any [@@unboxed] - -val return : 'a -> 'a t -val fail : Exn_bt.t -> _ t - -val self : unit -> any -(** [self ()] is the current fiber. Must be run from inside a fiber. - @raise Failure if not run from inside a fiber. *) - -val peek : 'a t -> 'a Fut.or_error option -(** Peek inside the future result *) - -val is_done : _ t -> bool -(** Has the fiber completed? *) - -val is_cancelled : _ t -> bool -(** Has the fiber completed with a failure? *) - -val is_success : _ t -> bool -(** Has the fiber completed with a value? *) - -val await : 'a t -> 'a -(** [await fib] is like [Fut.await (res fib)] *) - -val wait_block_exn : 'a t -> 'a -(** [wait_block_exn fib] is [Fut.wait_block_exn (res fib)]. {b NOTE}: See - {!Fut.wait_block} for warnings about deadlocks. *) - -val wait_block : 'a t -> 'a Fut.or_error -(** [wait_block fib] is [Fut.wait_block (res fib)]. {b NOTE}: See - {!Fut.wait_block} for warnings about deadlocks. *) - -val check_if_cancelled : unit -> unit -(** Check if the current fiber is cancelled, in which case this raises. Must be - run from inside a fiber. - @raise e if the current fiber is cancelled with exception [e] - @raise Failure if not run from a fiber. *) - -val yield : unit -> unit -(** Yield control to the scheduler from the current fiber. - @raise Failure if not run from inside a fiber. *) - -type cancel_handle -(** An opaque handle for a single cancel callback in a fiber *) - -val add_on_cancel : _ t -> cancel_callback -> cancel_handle -(** [add_on_cancel fib cb] adds [cb] to the list of cancel callbacks for [fib]. - If [fib] is already cancelled, [cb] is called immediately. *) - -val remove_on_cancel : _ t -> cancel_handle -> unit -(** [remove_on_cancel fib h] removes the cancel callback associated with handle - [h]. *) - -val with_on_cancel : _ t -> cancel_callback -> (unit -> 'a) -> 'a -(** [with_on_cancel fib cb (fun () -> )] evaluates [e] in a scope in which, - if the fiber [fib] is cancelled, [cb()] is called. If [e] returns without - the fiber being cancelled, this callback is removed. *) - -val with_on_self_cancel : cancel_callback -> (unit -> 'a) -> 'a -(** [with_on_self_cancel cb f] calls [f()] in a scope where [cb] is added to the - cancel callbacks of the current fiber; and [f()] terminates, [cb] is removed - from the list. *) - -val on_result : 'a t -> 'a callback -> unit -(** Wait for fiber to be done and call the callback with the result. If the - fiber is done already then the callback is invoked immediately with its - result. *) - -val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t -(** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This - fiber is not the child of any other fiber: its lifetime is only determined - by the lifetime of [f()]. *) - -val spawn : ?on:Runner.t -> ?protect:bool -> (unit -> 'a) -> 'a t -(** [spawn ~protect f] spawns a sub-fiber [f_child] from a running fiber - [parent]. The sub-fiber [f_child] is attached to the current fiber and fails - if the current fiber [parent] fails. - - @param on - if provided, start the fiber on the given runner. If not provided, use the - parent's runner. - @param protect - if true, when [f_child] fails, it does not affect [parent]. If false, - [f_child] failing also causes [parent] to fail (and therefore all other - children of [parent]). Default is [true]. - - Must be run from inside a fiber. - @raise Failure if not run from inside a fiber. *) - -val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit -(** [spawn_ignore f] is [ignore (spawn f)]. The fiber will still affect - termination of the parent, ie. the parent will exit only after this new - fiber exits. - @param on the optional runner to use, added since 0.7 *) - -val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit -(** Like {!spawn_top} but ignores the result. - @since 0.7 *) diff --git a/src/fib/fls.ml b/src/fib/fls.ml deleted file mode 100644 index ed2162c4..00000000 --- a/src/fib/fls.ml +++ /dev/null @@ -1 +0,0 @@ -include Task_local_storage diff --git a/src/fib/fls.mli b/src/fib/fls.mli deleted file mode 100644 index 15ae4f2f..00000000 --- a/src/fib/fls.mli +++ /dev/null @@ -1,17 +0,0 @@ -(** Fiber-local storage. - - This storage is associated to the current fiber, just like thread-local - storage is associated with the current thread. - - See {!Moonpool.Task_local_storage} for more general information, as this is - based on it. - - {b NOTE}: it's important to note that, while each fiber has its own storage, - spawning a sub-fiber [f2] from a fiber [f1] will only do a shallow copy of - the storage. Values inside [f1]'s storage will be physically shared with - [f2]. It is thus recommended to store only persistent values in the local - storage. *) - -include module type of struct - include Task_local_storage -end diff --git a/src/fib/handle.ml b/src/fib/handle.ml deleted file mode 100644 index f73ed58d..00000000 --- a/src/fib/handle.ml +++ /dev/null @@ -1,14 +0,0 @@ -module A = Atomic - -type t = int - -let counter_ = A.make 0 -let equal : t -> t -> bool = ( = ) -let compare : t -> t -> int = Stdlib.compare -let[@inline] generate_fresh () = A.fetch_and_add counter_ 1 - -(* TODO: better hash *) -let[@inline] hash x = x land max_int - -module Set = Set.Make (Int) -module Map = Map.Make (Int) diff --git a/src/fib/handle.mli b/src/fib/handle.mli deleted file mode 100644 index 6e1c13a9..00000000 --- a/src/fib/handle.mli +++ /dev/null @@ -1,17 +0,0 @@ -(** The unique name of a fiber. - - Each fiber has a unique handle that can be used to refer to it in maps or - sets. *) - -type t = private int -(** Unique, opaque identifier for a fiber. *) - -val equal : t -> t -> bool -val compare : t -> t -> int -val hash : t -> int - -val generate_fresh : unit -> t -(** Generate a fresh, unique identifier *) - -module Set : Set.S with type elt = t -module Map : Map.S with type key = t diff --git a/src/fib/main.ml b/src/fib/main.ml deleted file mode 100644 index 00174a21..00000000 --- a/src/fib/main.ml +++ /dev/null @@ -1,26 +0,0 @@ -exception Oh_no of Exn_bt.t - -let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a = - let worker_st = - Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) - ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) - () - in - let runner = Fifo_pool.Private_.runner_of_state worker_st in - try - let fiber = Fiber.spawn_top ~on:runner (fun () -> f runner) in - Fiber.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner); - - (* run the main thread *) - Moonpool.Private.Worker_loop_.worker_loop worker_st - ~block_signals (* do not disturb existing thread *) - ~ops:Fifo_pool.Private_.worker_ops; - - match Fiber.peek fiber with - | Some (Ok x) -> x - | Some (Error ebt) -> Exn_bt.raise ebt - | None -> assert false - with Oh_no ebt -> Exn_bt.raise ebt - -let main f = - main' () f ~block_signals:false (* do not disturb existing thread *) diff --git a/src/fib/main.mli b/src/fib/main.mli deleted file mode 100644 index 85cad3d9..00000000 --- a/src/fib/main.mli +++ /dev/null @@ -1,28 +0,0 @@ -(** Main thread. - - This is evolved from [Moonpool.Immediate_runner], but unlike it, this API - assumes you run it in a thread (possibly the main thread) which will block - until the initial computation is done. - - This means it's reasonable to use [Main.main (fun () -> do_everything)] at - the beginning of the program. Other Moonpool pools can be created for - background tasks, etc. to do the heavy lifting, and the main thread (inside - this immediate runner) can coordinate tasks via [Fiber.await]. - - Aside from the fact that this blocks the caller thread, it is fairly similar - to {!Background_thread} in that there's a single worker to process - tasks/fibers. - - This handles effects, including the ones in {!Fiber}. - - @since 0.6 *) - -val main : (Moonpool.Runner.t -> 'a) -> 'a -(** [main f] runs [f()] in a scope that handles effects, including - {!Fiber.await}. - - This scope can run background tasks as well, in a cooperative fashion. *) - -val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a -(** Same as {!main} but with room for optional arguments. - @since 0.7 *) diff --git a/src/fib/moonpool_fib.ml b/src/fib/moonpool_fib.ml deleted file mode 100644 index ec89c075..00000000 --- a/src/fib/moonpool_fib.ml +++ /dev/null @@ -1,12 +0,0 @@ -(** Fibers for moonpool. - - See {!Fiber} for the most important explanations. - - @since 0.6. *) - -module Fiber = Fiber -module Fls = Fls -module Handle = Handle -module Main = Main -include Fiber -include Main diff --git a/src/lwt/dune b/src/lwt/dune index 93c86e61..c6bb5ab3 100644 --- a/src/lwt/dune +++ b/src/lwt/dune @@ -5,7 +5,6 @@ (>= %{ocaml_version} 5.0)) (libraries (re_export moonpool) - moonpool.fib picos (re_export lwt) lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 71b1f941..07b98042 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -289,7 +289,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st let spawn_lwt f : _ Lwt.t = let st = Main_state.get_st () in let lwt_fut, lwt_prom = Lwt.wait () in - Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () -> + M.run_async st.as_runner (fun () -> try let x = f () in Lwt.wakeup lwt_prom x