Compare commits

...

5 commits

Author SHA1 Message Date
Simon Cruanes
51a2e0af12
Merge 01cdb66f1f into 867cbd2318 2025-06-23 22:34:21 +02:00
Simon Cruanes
867cbd2318
fix core: better repropagating of errors
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-06-20 16:49:27 -04:00
Simon Cruanes
eba239487c
add Fut.{cancel,try_cancel}
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-05-21 11:48:12 -04:00
Simon Cruanes
01cdb66f1f
avoid recursion in dpool 2024-10-09 00:26:30 -04:00
Simon Cruanes
8cb09c01c4
fix domain pool: block signals in background threads
close #35
2024-10-08 15:28:04 -04:00
4 changed files with 40 additions and 15 deletions

View file

@ -15,6 +15,8 @@ let make () =
fut, fut
let[@inline] return x : _ t = C.returned x
let[@inline] cancel x ebt = C.cancel x (fst ebt) (snd ebt)
let[@inline] try_cancel x ebt = C.try_cancel x (fst ebt) (snd ebt)
let[@inline] fail exn bt : _ t =
let fut = C.create () in

View file

@ -51,6 +51,15 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
exception Already_fulfilled
val try_cancel : _ promise -> Exn_bt.t -> bool
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
returns [false] if the promise is already resolved.
@since NEXT_RELEASE *)
val cancel : _ promise -> Exn_bt.t -> unit
(** Silent version of {!try_cancel}, ignoring the result.
@since NEXT_RELEASE *)
val fulfill : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
@raise Already_fulfilled if the promise is already fulfilled. *)

View file

@ -39,6 +39,10 @@ let[@inline] discontinue k exn =
let bt = Printexc.get_raw_backtrace () in
Effect.Deep.discontinue_with_backtrace k exn bt
let[@inline] raise_with_bt exn =
let bt = Printexc.get_raw_backtrace () in
Printexc.raise_with_backtrace exn bt
let with_handler (type st arg) ~(ops : st ops) (self : st) :
(unit -> unit) -> unit =
let current =
@ -93,7 +97,7 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
discontinue k exn)
| _ -> None
in
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise; effc } in
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
fun f -> Effect.Deep.match_with f () handler
[@@@else_]
@ -145,7 +149,8 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
(* this is already in an effect handler *)
k ()
with e ->
let ebt = Exn_bt.get e in
let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make e bt in
ops.on_exn self ebt);
after_task runner _ctx;

View file

@ -15,19 +15,23 @@ module Bb_queue = struct
if was_empty then Condition.broadcast self.cond;
Mutex.unlock self.mutex
let pop (self : 'a t) : 'a =
let pop (type a) (self : a t) : a =
let module M = struct
exception Found of a
end in
try
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
while true do
if Queue.is_empty self.q then
Condition.wait self.cond self.mutex
else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
raise (M.Found x)
)
in
loop ()
done;
assert false
with M.Found x -> x
end
module Lock = struct
@ -95,6 +99,11 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
a [Pool.with_] or [Pool.create() Pool.shutdown()] in a tight loop), and
if nothing happens it tries to stop to free resources. *)
let work_ idx (st : worker_state) : unit =
Thread.sigmask SIG_BLOCK
[
Sys.sigpipe; Sys.sigbus; Sys.sigterm; Sys.sigint; Sys.sigusr1; Sys.sigusr2;
]
|> ignore;
let main_loop () =
let continue = ref true in
while !continue do