mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-18 08:36:43 -05:00
Compare commits
5 commits
944a579a8f
...
51a2e0af12
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51a2e0af12 | ||
|
|
867cbd2318 | ||
|
|
eba239487c | ||
|
|
01cdb66f1f | ||
|
|
8cb09c01c4 |
4 changed files with 40 additions and 15 deletions
|
|
@ -15,6 +15,8 @@ let make () =
|
||||||
fut, fut
|
fut, fut
|
||||||
|
|
||||||
let[@inline] return x : _ t = C.returned x
|
let[@inline] return x : _ t = C.returned x
|
||||||
|
let[@inline] cancel x ebt = C.cancel x (fst ebt) (snd ebt)
|
||||||
|
let[@inline] try_cancel x ebt = C.try_cancel x (fst ebt) (snd ebt)
|
||||||
|
|
||||||
let[@inline] fail exn bt : _ t =
|
let[@inline] fail exn bt : _ t =
|
||||||
let fut = C.create () in
|
let fut = C.create () in
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,15 @@ val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||||
|
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
|
val try_cancel : _ promise -> Exn_bt.t -> bool
|
||||||
|
(** [try_cancel promise ebt] tries to cancel the promise, returning [true]. It
|
||||||
|
returns [false] if the promise is already resolved.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val cancel : _ promise -> Exn_bt.t -> unit
|
||||||
|
(** Silent version of {!try_cancel}, ignoring the result.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
val fulfill : 'a promise -> 'a or_error -> unit
|
val fulfill : 'a promise -> 'a or_error -> unit
|
||||||
(** Fullfill the promise, setting the future at the same time.
|
(** Fullfill the promise, setting the future at the same time.
|
||||||
@raise Already_fulfilled if the promise is already fulfilled. *)
|
@raise Already_fulfilled if the promise is already fulfilled. *)
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,10 @@ let[@inline] discontinue k exn =
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
Effect.Deep.discontinue_with_backtrace k exn bt
|
Effect.Deep.discontinue_with_backtrace k exn bt
|
||||||
|
|
||||||
|
let[@inline] raise_with_bt exn =
|
||||||
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
|
Printexc.raise_with_backtrace exn bt
|
||||||
|
|
||||||
let with_handler (type st arg) ~(ops : st ops) (self : st) :
|
let with_handler (type st arg) ~(ops : st ops) (self : st) :
|
||||||
(unit -> unit) -> unit =
|
(unit -> unit) -> unit =
|
||||||
let current =
|
let current =
|
||||||
|
|
@ -93,7 +97,7 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
|
||||||
discontinue k exn)
|
discontinue k exn)
|
||||||
| _ -> None
|
| _ -> None
|
||||||
in
|
in
|
||||||
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise; effc } in
|
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
|
||||||
fun f -> Effect.Deep.match_with f () handler
|
fun f -> Effect.Deep.match_with f () handler
|
||||||
|
|
||||||
[@@@else_]
|
[@@@else_]
|
||||||
|
|
@ -145,7 +149,8 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
|
||||||
(* this is already in an effect handler *)
|
(* this is already in an effect handler *)
|
||||||
k ()
|
k ()
|
||||||
with e ->
|
with e ->
|
||||||
let ebt = Exn_bt.get e in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
|
let ebt = Exn_bt.make e bt in
|
||||||
ops.on_exn self ebt);
|
ops.on_exn self ebt);
|
||||||
|
|
||||||
after_task runner _ctx;
|
after_task runner _ctx;
|
||||||
|
|
|
||||||
|
|
@ -15,19 +15,23 @@ module Bb_queue = struct
|
||||||
if was_empty then Condition.broadcast self.cond;
|
if was_empty then Condition.broadcast self.cond;
|
||||||
Mutex.unlock self.mutex
|
Mutex.unlock self.mutex
|
||||||
|
|
||||||
let pop (self : 'a t) : 'a =
|
let pop (type a) (self : a t) : a =
|
||||||
Mutex.lock self.mutex;
|
let module M = struct
|
||||||
let rec loop () =
|
exception Found of a
|
||||||
if Queue.is_empty self.q then (
|
end in
|
||||||
Condition.wait self.cond self.mutex;
|
try
|
||||||
(loop [@tailcall]) ()
|
Mutex.lock self.mutex;
|
||||||
) else (
|
while true do
|
||||||
let x = Queue.pop self.q in
|
if Queue.is_empty self.q then
|
||||||
Mutex.unlock self.mutex;
|
Condition.wait self.cond self.mutex
|
||||||
x
|
else (
|
||||||
)
|
let x = Queue.pop self.q in
|
||||||
in
|
Mutex.unlock self.mutex;
|
||||||
loop ()
|
raise (M.Found x)
|
||||||
|
)
|
||||||
|
done;
|
||||||
|
assert false
|
||||||
|
with M.Found x -> x
|
||||||
end
|
end
|
||||||
|
|
||||||
module Lock = struct
|
module Lock = struct
|
||||||
|
|
@ -95,6 +99,11 @@ let domains_ : (worker_state option * Domain_.t option) Lock.t array =
|
||||||
a [Pool.with_] or [Pool.create() … Pool.shutdown()] in a tight loop), and
|
a [Pool.with_] or [Pool.create() … Pool.shutdown()] in a tight loop), and
|
||||||
if nothing happens it tries to stop to free resources. *)
|
if nothing happens it tries to stop to free resources. *)
|
||||||
let work_ idx (st : worker_state) : unit =
|
let work_ idx (st : worker_state) : unit =
|
||||||
|
Thread.sigmask SIG_BLOCK
|
||||||
|
[
|
||||||
|
Sys.sigpipe; Sys.sigbus; Sys.sigterm; Sys.sigint; Sys.sigusr1; Sys.sigusr2;
|
||||||
|
]
|
||||||
|
|> ignore;
|
||||||
let main_loop () =
|
let main_loop () =
|
||||||
let continue = ref true in
|
let continue = ref true in
|
||||||
while !continue do
|
while !continue do
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue