Compare commits

...

12 commits

Author SHA1 Message Date
Simon Cruanes
e17d39637f
Merge 01cdb66f1f into ed0eda226c 2025-04-24 06:46:20 +02:00
Simon Cruanes
ed0eda226c
prepare for 0.8 2025-04-17 16:35:19 -04:00
Simon Cruanes
2b00a0cea1
feat(exn_bt): in show/pp, do print the backtrace when present 2025-04-15 10:10:02 -04:00
Simon Cruanes
3a5eaaa44d
api(fut): public alias 'a Fut.t = 'a Picos.Computation.t 2025-03-19 17:40:17 -04:00
Simon Cruanes
f0ea8c294d
single system call for signal blocking 2025-03-13 15:42:04 -04:00
Simon Cruanes
dd88008a0a
fix: do not die if we fail to block a signal 2025-03-13 10:45:21 -04:00
Simon Cruanes
c51a0a6bd4
don't try to block sigstop 2025-03-13 10:45:01 -04:00
Simon Cruanes
deb96302e1
mli for worker loop 2025-03-13 10:07:39 -04:00
Simon Cruanes
a20208ec37
feat: block signals in workers if asked to 2025-03-13 10:07:20 -04:00
Simon Cruanes
389f237993
CI
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-02-21 14:02:05 -05:00
Simon Cruanes
01cdb66f1f
avoid recursion in dpool 2024-10-09 00:26:30 -04:00
Simon Cruanes
8cb09c01c4
fix domain pool: block signals in background threads
close #35
2024-10-08 15:28:04 -04:00
19 changed files with 159 additions and 63 deletions

View file

@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@main - uses: actions/checkout@main
- name: Use OCaml - name: Use OCaml
uses: ocaml/setup-ocaml@v2 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: '5.0' ocaml-compiler: '5.0'
dune-cache: true dune-cache: true

View file

@ -23,7 +23,7 @@ jobs:
steps: steps:
- uses: actions/checkout@main - uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }} - name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: ${{ matrix.ocaml-compiler }} ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true dune-cache: true
@ -58,7 +58,7 @@ jobs:
steps: steps:
- uses: actions/checkout@main - uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }} - name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: ${{ matrix.ocaml-compiler }} ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true dune-cache: true
@ -83,7 +83,7 @@ jobs:
steps: steps:
- uses: actions/checkout@main - uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }} - name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: ${{ matrix.ocaml-compiler }} ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true dune-cache: true

View file

@ -1,4 +1,17 @@
# 0.8
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
- feat(exn_bt): in show/pp, do print the backtrace when present
- feat: block signals in workers if asked to
- relax bound on picos to 0.5-0.6
- feat fib: `spawn_ignore` now has `?on` optional param
- change Moonpool.Chan so it's bounded (stil experimental)
- fix task local storage: type was too specific
- fix fiber: use a single fut/computation in fibers
# 0.7 # 0.7
- add `Moonpool_fiber.spawn_top_ignore` - add `Moonpool_fiber.spawn_top_ignore`

View file

@ -2,7 +2,7 @@
(using mdx 0.2) (using mdx 0.2)
(name moonpool) (name moonpool)
(version 0.7) (version 0.8)
(generate_opam_files true) (generate_opam_files true)
(source (source
(github c-cube/moonpool)) (github c-cube/moonpool))

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Async IO for moonpool, relying on picos (experimental)" synopsis: "Async IO for moonpool, relying on picos (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)" synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead # This file is generated by dune, edit dune-project instead
opam-version: "2.0" opam-version: "2.0"
version: "0.7" version: "0.8"
synopsis: "Pools of threads supported by a pool of domains" synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"] maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"] authors: ["Simon Cruanes"]

View file

@ -3,7 +3,7 @@
The channels have bounded size. Push/pop return futures or can use effects The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version. to provide an [await]-friendly version.
The channels became bounded since @NEXT_RELEASE . The channels became bounded since @0.7 .
*) *)
type 'a t type 'a t
@ -35,13 +35,13 @@ val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task (** Push the value into the channel, suspending the current task
if the channel is currently full. if the channel is currently full.
@raise Closed if the channel is closed @raise Closed if the channel is closed
@since NEXT_RELEASE *) @since 0.7 *)
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the (** Pop an element. This might suspend the current task if the
channel is currently empty. channel is currently empty.
@raise Closed if the channel is empty and closed. @raise Closed if the channel is empty and closed.
@since NEXT_RELEASE *) @since 0.7 *)
(* (*
val pop_block_exn : 'a t -> 'a val pop_block_exn : 'a t -> 'a

View file

@ -3,7 +3,15 @@ type t = exn * Printexc.raw_backtrace
let[@inline] make exn bt : t = exn, bt let[@inline] make exn bt : t = exn, bt
let[@inline] exn (e, _) = e let[@inline] exn (e, _) = e
let[@inline] bt (_, bt) = bt let[@inline] bt (_, bt) = bt
let show self = Printexc.to_string (exn self)
let show self =
let bt = Printexc.raw_backtrace_to_string (bt self) in
let exn = Printexc.to_string (exn self) in
if bt = "" then
exn
else
Printf.sprintf "%s\n%s" exn bt
let pp out self = Format.pp_print_string out (show self) let pp out self = Format.pp_print_string out (show self)
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt

View file

@ -165,7 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let st = { idx = i; dom_idx; st = pool } in let st = { idx = i; dom_idx; st = pool } in
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (i, thread) Bb_queue.push receive_threads (i, thread)
in in

View file

@ -3,23 +3,23 @@ module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a waiter = 'a or_error -> unit type 'a waiter = 'a or_error -> unit
type 'a t = { st: 'a C.t } [@@unboxed] type 'a t = 'a C.t
type 'a promise = 'a t type 'a promise = 'a t
let[@inline] make_promise () : _ t = let[@inline] make_promise () : _ t =
let fut = { st = C.create ~mode:`LIFO () } in let fut = C.create ~mode:`LIFO () in
fut fut
let make () = let make () =
let fut = make_promise () in let fut = make_promise () in
fut, fut fut, fut
let[@inline] return x : _ t = { st = C.returned x } let[@inline] return x : _ t = C.returned x
let[@inline] fail exn bt : _ t = let[@inline] fail exn bt : _ t =
let st = C.create () in let fut = C.create () in
C.cancel st exn bt; C.cancel fut exn bt;
{ st } fut
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt) let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
@ -27,32 +27,32 @@ let[@inline] of_result = function
| Ok x -> return x | Ok x -> return x
| Error ebt -> fail_exn_bt ebt | Error ebt -> fail_exn_bt ebt
let[@inline] is_resolved self : bool = not (C.is_running self.st) let[@inline] is_resolved self : bool = not (C.is_running self)
let is_done = is_resolved let is_done = is_resolved
let[@inline] peek self : _ option = C.peek self.st let peek : 'a t -> _ option = C.peek
let[@inline] raise_if_failed self : unit = C.check self.st let raise_if_failed : _ t -> unit = C.check
let[@inline] is_success self = let[@inline] is_success self =
match C.peek_exn self.st with match C.peek_exn self with
| _ -> true | _ -> true
| exception _ -> false | exception _ -> false
let[@inline] is_failed self = C.is_canceled self.st let is_failed : _ t -> bool = C.is_canceled
exception Not_ready exception Not_ready
let[@inline] get_or_fail self = let[@inline] get_or_fail self =
match C.peek self.st with match C.peek self with
| Some x -> x | Some x -> x
| None -> raise Not_ready | None -> raise Not_ready
let[@inline] get_or_fail_exn self = let[@inline] get_or_fail_exn self =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> raise Not_ready | exception C.Running -> raise Not_ready
let[@inline] peek_or_assert_ (self : 'a t) : 'a = let[@inline] peek_or_assert_ (self : 'a t) : 'a =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> assert false | exception C.Running -> assert false
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
let trigger = let trigger =
(Trigger.from_action f self on_result_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_cb_ [@alert "-handler"])
in in
if not (C.try_attach self.st trigger) then on_result_cb_ () f self if not (C.try_attach self trigger) then on_result_cb_ () f self
let on_result_ignore_cb_ _tr f (self : _ t) = let on_result_ignore_cb_ _tr f (self : _ t) =
f (Picos.Computation.canceled self.st) f (Picos.Computation.canceled self)
let on_result_ignore (self : _ t) f : unit = let on_result_ignore (self : _ t) f : unit =
if Picos.Computation.is_running self.st then ( if Picos.Computation.is_running self then (
let trigger = let trigger =
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"]) (Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
in in
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
) else ) else
on_result_ignore_cb_ () f self on_result_ignore_cb_ () f self
let[@inline] fulfill_idempotent self r = let[@inline] fulfill_idempotent self r =
match r with match r with
| Ok x -> C.return self.st x | Ok x -> C.return self x
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let ok = let ok =
match r with match r with
| Ok x -> C.try_return self.st x | Ok x -> C.try_return self x
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt) | Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in in
if not ok then raise Already_fulfilled if not ok then raise Already_fulfilled
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
let task () = let task () =
try try
let res = f () in let res = f () in
C.return fut.st res C.return fut res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
C.cancel fut.st exn bt C.cancel fut exn bt
in in
Runner.run_async on task; Runner.run_async on task;
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
let push_queue_ _tr q () = Bb_queue.push q () let push_queue_ _tr q () = Bb_queue.push q ()
let wait_block_exn (self : 'a t) : 'a = let wait_block_exn (self : 'a t) : 'a =
match C.peek_exn self.st with match C.peek_exn self with
| x -> x (* fast path *) | x -> x (* fast path *)
| exception C.Running -> | exception C.Running ->
let real_block () = let real_block () =
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
assert attached; assert attached;
(* blockingly wait for trigger if computation didn't complete in the mean time *) (* blockingly wait for trigger if computation didn't complete in the mean time *)
if C.try_attach self.st trigger then Bb_queue.pop q; if C.try_attach self trigger then Bb_queue.pop q;
(* trigger was signaled! computation must be done*) (* trigger was signaled! computation must be done*)
peek_or_assert_ self peek_or_assert_ self
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
if i = 0 then if i = 0 then
real_block () real_block ()
else ( else (
match C.peek_exn self.st with match C.peek_exn self with
| x -> x | x -> x
| exception C.Running -> | exception C.Running ->
Domain_.relax (); Domain_.relax ();
@ -426,12 +426,12 @@ let wait_block self =
let await (self : 'a t) : 'a = let await (self : 'a t) : 'a =
(* fast path: peek *) (* fast path: peek *)
match C.peek_exn self.st with match C.peek_exn self with
| res -> res | res -> res
| exception C.Running -> | exception C.Running ->
let trigger = Trigger.create () in let trigger = Trigger.create () in
(* suspend until the future is resolved *) (* suspend until the future is resolved *)
if C.try_attach self.st trigger then if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger; Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
module Private_ = struct module Private_ = struct
let[@inline] unsafe_promise_of_fut x = x let[@inline] unsafe_promise_of_fut x = x
let[@inline] as_computation self = self.st let[@inline] as_computation self = self
end end

View file

@ -19,13 +19,13 @@
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a t type 'a t = 'a Picos.Computation.t
(** A future with a result of type ['a]. *) (** A future with a result of type ['a]. *)
type 'a promise = private 'a t type 'a promise = private 'a t
(** A promise, which can be fulfilled exactly once to set (** A promise, which can be fulfilled exactly once to set
the corresponding future. the corresponding future.
This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *) This is a private alias of ['a t] since 0.7, previously it was opaque. *)
val make : unit -> 'a t * 'a promise val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *) (** Make a new future with the associated promise. *)
@ -38,7 +38,7 @@ val make_promise : unit -> 'a promise
{[let prom = Fut.make_promise();; {[let prom = Fut.make_promise();;
let fut = (prom : _ Fut.promise :> _ Fut.t) ;; let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
]} ]}
@since NEXT_RELEASE *) @since 0.7 *)
val on_result : 'a t -> ('a or_error -> unit) -> unit val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future (** [on_result fut f] registers [f] to be called in the future

View file

@ -102,7 +102,24 @@ let with_handler ~ops:_ self f = f ()
[@@@endif] [@@@endif]
let worker_loop (type st) ~(ops : st ops) (self : st) : unit = let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let cur_fiber : fiber ref = ref _dummy_fiber in let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner; TLS.set Runner.For_runner_implementors.k_cur_runner runner;

37
src/core/worker_loop_.mli Normal file
View file

@ -0,0 +1,37 @@
(** Internal module that is used for workers.
A thread pool should use this [worker_loop] to run tasks,
handle effects, etc. *)
open Types_
type task_full =
| T_start of {
fiber: fiber;
f: unit -> unit;
}
| T_resume : {
fiber: fiber;
k: unit -> unit;
}
-> task_full
val _dummy_task : task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;
cleanup: 'st -> unit;
}
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -310,7 +310,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* function called in domain with index [i], to (* function called in domain with index [i], to
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (idx, thread) Bb_queue.push receive_threads (idx, thread)
in in

View file

@ -15,19 +15,23 @@ module Bb_queue = struct
if was_empty then Condition.broadcast self.cond; if was_empty then Condition.broadcast self.cond;
Mutex.unlock self.mutex Mutex.unlock self.mutex
let pop (self : 'a t) : 'a = let pop (type a) (self : a t) : a =
Mutex.lock self.mutex; let module M = struct
let rec loop () = exception Found of a
if Queue.is_empty self.q then ( end in
Condition.wait self.cond self.mutex; try
(loop [@tailcall]) () Mutex.lock self.mutex;
) else ( while true do
let x = Queue.pop self.q in if Queue.is_empty self.q then
Mutex.unlock self.mutex; Condition.wait self.cond self.mutex
x else (
) let x = Queue.pop self.q in
in Mutex.unlock self.mutex;
loop () raise (M.Found x)
)
done;
assert false
with M.Found x -> x
end end
module Lock = struct module Lock = struct
@ -96,6 +100,11 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
in a tight loop), and if nothing happens it tries to stop to free resources. in a tight loop), and if nothing happens it tries to stop to free resources.
*) *)
let work_ idx (st : worker_state) : unit = let work_ idx (st : worker_state) : unit =
Thread.sigmask SIG_BLOCK
[
Sys.sigpipe; Sys.sigbus; Sys.sigterm; Sys.sigint; Sys.sigusr1; Sys.sigusr2;
]
|> ignore;
let main_loop () = let main_loop () =
let continue = ref true in let continue = ref true in
while !continue do while !continue do

View file

@ -147,7 +147,7 @@ val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)]. (** [spawn_ignore f] is [ignore (spawn f)].
The fiber will still affect termination of the parent, ie. the The fiber will still affect termination of the parent, ie. the
parent will exit only after this new fiber exits. parent will exit only after this new fiber exits.
@param on the optional runner to use, added since NEXT_RELEASE *) @param on the optional runner to use, added since 0.7 *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result. (** Like {!spawn_top} but ignores the result.

View file

@ -1,6 +1,6 @@
exception Oh_no of Exn_bt.t exception Oh_no of Exn_bt.t
let main (f : Runner.t -> 'a) : 'a = let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let worker_st = let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -13,6 +13,7 @@ let main (f : Runner.t -> 'a) : 'a =
(* run the main thread *) (* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops; ~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with match Fiber.peek fiber with
@ -20,3 +21,6 @@ let main (f : Runner.t -> 'a) : 'a =
| Some (Error ebt) -> Exn_bt.raise ebt | Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false | None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -23,3 +23,7 @@ val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}. (** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *) This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)