diff --git a/src/core/fut.ml b/src/core/fut.ml index 343a847d..0c72752d 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -436,6 +436,8 @@ let await (self : 'a t) : 'a = (* un-suspended: we should have a result! *) get_or_fail_exn self +let yield = Picos.Fiber.yield + module Infix = struct let[@inline] ( >|= ) x f = map ~f x let[@inline] ( >>= ) x f = bind ~f x diff --git a/src/core/fut.mli b/src/core/fut.mli index daa9b8d8..de100ac8 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -265,6 +265,10 @@ val await : 'a t -> 'a This must only be run from inside the runner itself. The runner must support {!Suspend_}. *) +val yield : unit -> unit +(** Like {!Moonpool.yield}. + @since NEXT_RELEASE *) + (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error diff --git a/src/core/main.ml b/src/core/main.ml new file mode 100644 index 00000000..9325fd3a --- /dev/null +++ b/src/core/main.ml @@ -0,0 +1,26 @@ +exception Oh_no of Exn_bt.t + +let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a = + let worker_st = + Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) + ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) + () + in + let runner = Fifo_pool.Private_.runner_of_state worker_st in + try + let fiber = Fut.spawn ~on:runner (fun () -> f runner) in + Fut.on_result fiber (fun _ -> Runner.shutdown_without_waiting runner); + + (* run the main thread *) + Worker_loop_.worker_loop worker_st + ~block_signals (* do not disturb existing thread *) + ~ops:Fifo_pool.Private_.worker_ops; + + match Fut.peek fiber with + | Some (Ok x) -> x + | Some (Error ebt) -> Exn_bt.raise ebt + | None -> assert false + with Oh_no ebt -> Exn_bt.raise ebt + +let main f = + main' () f ~block_signals:false (* do not disturb existing thread *) diff --git a/src/core/main.mli b/src/core/main.mli new file mode 100644 index 00000000..1d88406a --- /dev/null +++ b/src/core/main.mli @@ -0,0 +1,30 @@ +(** Main thread. + + This is evolved from [Moonpool.Immediate_runner], but unlike it, this API + assumes you run it in a thread (possibly the main thread) which will block + until the initial computation is done. + + This means it's reasonable to use [Main.main (fun () -> do_everything)] at + the beginning of the program. Other Moonpool pools can be created for + background tasks, etc. to do the heavy lifting, and the main thread (inside + this immediate runner) can coordinate tasks via [Fiber.await]. + + Aside from the fact that this blocks the caller thread, it is fairly similar + to {!Background_thread} in that there's a single worker to process + tasks/fibers. + + This handles the concurency effects used in moonpool, including [await] and + [yield]. + + This module was migrated from the late [Moonpool_fib]. + + @since NEXT_RELEASE *) + +val main : (Runner.t -> 'a) -> 'a +(** [main f] runs [f()] in a scope that handles effects, including + {!Fiber.await}. + + This scope can run background tasks as well, in a cooperative fashion. *) + +val main' : ?block_signals:bool -> unit -> (Runner.t -> 'a) -> 'a +(** Same as {!main} but with room for optional arguments. *) diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index 6b9aebe0..d41335e5 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -23,6 +23,7 @@ module Exn_bt = Exn_bt module Fifo_pool = Fifo_pool module Fut = Fut module Lock = Lock +module Main = Main module Immediate_runner = struct end module Runner = Runner module Task_local_storage = Task_local_storage @@ -30,6 +31,9 @@ module Thread_local_storage = Thread_local_storage module Trigger = Trigger module Ws_pool = Ws_pool +(* re-export main *) +include Main + module Private = struct module Ws_deque_ = Ws_deque_ module Worker_loop_ = Worker_loop_ diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index b848a077..aca417f3 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -13,6 +13,7 @@ module Fifo_pool = Fifo_pool module Background_thread = Background_thread module Runner = Runner module Trigger = Trigger +module Main = Main module Immediate_runner : sig end [@@deprecated "use Moonpool_fib.Main"] @@ -205,6 +206,10 @@ module Atomic = Atomic This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] module on OCaml 5. *) +include module type of struct + include Main +end + (**/**) (** Private internals, with no stability guarantees *)