feat: block signals in workers if asked to

This commit is contained in:
Simon Cruanes 2025-03-13 10:07:20 -04:00
parent 389f237993
commit a20208ec37
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 31 additions and 4 deletions

View file

@ -165,7 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let st = { idx = i; dom_idx; st = pool } in let st = { idx = i; dom_idx; st = pool } in
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (i, thread) Bb_queue.push receive_threads (i, thread)
in in

View file

@ -102,7 +102,22 @@ let with_handler ~ops:_ self f = f ()
[@@@endif] [@@@endif]
let worker_loop (type st) ~(ops : st ops) (self : st) : unit = let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then
List.iter
(fun signal -> Sys.set_signal signal Sys.Signal_ignore)
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
Sys.sigvtalrm;
Sys.sigstop;
];
let cur_fiber : fiber ref = ref _dummy_fiber in let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner; TLS.set Runner.For_runner_implementors.k_cur_runner runner;

View file

@ -310,7 +310,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* function called in domain with index [i], to (* function called in domain with index [i], to
create the thread and push it into [receive_threads] *) create the thread and push it into [receive_threads] *)
let create_thread_in_domain () = let create_thread_in_domain () =
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
(* send the thread from the domain back to us *) (* send the thread from the domain back to us *)
Bb_queue.push receive_threads (idx, thread) Bb_queue.push receive_threads (idx, thread)
in in

View file

@ -1,6 +1,6 @@
exception Oh_no of Exn_bt.t exception Oh_no of Exn_bt.t
let main (f : Runner.t -> 'a) : 'a = let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let worker_st = let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ()) Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt))) ~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -13,6 +13,7 @@ let main (f : Runner.t -> 'a) : 'a =
(* run the main thread *) (* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops; ~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with match Fiber.peek fiber with
@ -20,3 +21,6 @@ let main (f : Runner.t -> 'a) : 'a =
| Some (Error ebt) -> Exn_bt.raise ebt | Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false | None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -23,3 +23,7 @@ val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}. (** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *) This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since NEXT_RELEASE *)