mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-07 19:55:39 -05:00
feat: actually support domain-local-await if installed
This commit is contained in:
parent
a266a42628
commit
2acf4b28eb
1 changed files with 28 additions and 1 deletions
29
src/pool.ml
29
src/pool.ml
|
|
@ -87,6 +87,27 @@ let num_tasks (self : t) : int =
|
||||||
Array.iter (fun q -> n := !n + Bb_queue.size q) self.qs;
|
Array.iter (fun q -> n := !n + Bb_queue.size q) self.qs;
|
||||||
!n
|
!n
|
||||||
|
|
||||||
|
(* DLA interop *)
|
||||||
|
let prepare_for_await () : Dla_.t =
|
||||||
|
(* current state *)
|
||||||
|
let st :
|
||||||
|
((with_handler:bool -> task -> unit) * Suspend_types_.suspension) option
|
||||||
|
A.t =
|
||||||
|
A.make None
|
||||||
|
in
|
||||||
|
|
||||||
|
let release () : unit =
|
||||||
|
match A.exchange st None with
|
||||||
|
| None -> ()
|
||||||
|
| Some (run, k) -> run ~with_handler:true (fun () -> k (Ok ()))
|
||||||
|
and await () : unit =
|
||||||
|
Suspend_.suspend
|
||||||
|
{ Suspend_types_.handle = (fun ~run k -> A.set st (Some (run, k))) }
|
||||||
|
in
|
||||||
|
|
||||||
|
let t = { Dla_.release; await } in
|
||||||
|
t
|
||||||
|
|
||||||
exception Got_task of task
|
exception Got_task of task
|
||||||
|
|
||||||
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
||||||
|
|
@ -96,7 +117,7 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
|
||||||
let num_qs = Array.length qs in
|
let num_qs = Array.length qs in
|
||||||
let (AT_pair (before_task, after_task)) = around_task in
|
let (AT_pair (before_task, after_task)) = around_task in
|
||||||
|
|
||||||
try
|
let main_loop () =
|
||||||
while A.get active do
|
while A.get active do
|
||||||
(* last resort: block on my queue *)
|
(* last resort: block on my queue *)
|
||||||
let pop_blocking () =
|
let pop_blocking () =
|
||||||
|
|
@ -117,12 +138,18 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
|
||||||
in
|
in
|
||||||
|
|
||||||
let _ctx = before_task pool in
|
let _ctx = before_task pool in
|
||||||
|
(* run the task now, catching errors *)
|
||||||
(try task ()
|
(try task ()
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
on_exn e bt);
|
on_exn e bt);
|
||||||
after_task pool _ctx
|
after_task pool _ctx
|
||||||
done
|
done
|
||||||
|
in
|
||||||
|
|
||||||
|
try
|
||||||
|
(* handle domain-local await *)
|
||||||
|
Dla_.using ~prepare_for_await ~while_running:main_loop
|
||||||
with Bb_queue.Closed -> ()
|
with Bb_queue.Closed -> ()
|
||||||
|
|
||||||
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
|
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue