mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
restore DLA
This commit is contained in:
parent
c8e99fd7ee
commit
f7449416e4
7 changed files with 58 additions and 2 deletions
|
|
@ -76,7 +76,11 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
|||
done
|
||||
in
|
||||
|
||||
try main_loop () with Bb_queue.Closed -> ()
|
||||
try
|
||||
(* handle domain-local await *)
|
||||
Dla_.using ~prepare_for_await:Suspend_.prepare_for_await
|
||||
~while_running:main_loop
|
||||
with Bb_queue.Closed -> ()
|
||||
|
||||
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
open Types_
|
||||
module A = Atomic_
|
||||
|
||||
type suspension = unit Exn_bt.result -> unit
|
||||
type task = unit -> unit
|
||||
|
|
@ -55,9 +56,27 @@ let with_suspend ~on_suspend ~(run : name:string -> task -> unit)
|
|||
|
||||
E.try_with f () { E.effc }
|
||||
|
||||
(* DLA interop *)
|
||||
let prepare_for_await () : Dla_.t =
|
||||
(* current state *)
|
||||
let st : (_ * _ * suspension) option A.t = A.make None in
|
||||
|
||||
let release () : unit =
|
||||
match A.exchange st None with
|
||||
| None -> ()
|
||||
| Some (ls, resume, k) -> resume ~ls k @@ Ok ()
|
||||
and await () : unit =
|
||||
suspend
|
||||
{ handle = (fun ~ls ~run:_ ~resume k -> A.set st (Some (ls, resume, k))) }
|
||||
in
|
||||
|
||||
let t = { Dla_.release; await } in
|
||||
t
|
||||
|
||||
[@@@ocaml.alert "+unstable"]
|
||||
[@@@else_]
|
||||
|
||||
let[@inline] with_suspend ~on_suspend:_ ~run:_ ~resume:_ f = f ()
|
||||
let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore }
|
||||
|
||||
[@@@endif]
|
||||
|
|
|
|||
|
|
@ -65,6 +65,9 @@ val suspend : suspension_handler -> unit
|
|||
|
||||
[@@@endif]
|
||||
|
||||
val prepare_for_await : unit -> Dla_.t
|
||||
(** Our stub for DLA. Unstable. *)
|
||||
|
||||
val with_suspend :
|
||||
on_suspend:(unit -> task_ls) ->
|
||||
run:(name:string -> task -> unit) ->
|
||||
|
|
|
|||
|
|
@ -242,7 +242,9 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
|
|||
tasks *)
|
||||
Mutex.unlock self.mutex
|
||||
in
|
||||
main ()
|
||||
|
||||
(* handle domain-local await *)
|
||||
Dla_.using ~prepare_for_await:Suspend_.prepare_for_await ~while_running:main
|
||||
|
||||
let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = ()
|
||||
|
||||
|
|
|
|||
13
src/private/dla_.dummy.ml
Normal file
13
src/private/dla_.dummy.ml
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
(** Interface to Domain-local-await.
|
||||
|
||||
This is used to handle the presence or absence of DLA. *)
|
||||
|
||||
type t = {
|
||||
release: unit -> unit;
|
||||
await: unit -> unit;
|
||||
}
|
||||
|
||||
let using : prepare_for_await:(unit -> t) -> while_running:(unit -> 'a) -> 'a =
|
||||
fun ~prepare_for_await:_ ~while_running -> while_running ()
|
||||
|
||||
let setup_domain () = ()
|
||||
10
src/private/dla_.real.ml
Normal file
10
src/private/dla_.real.ml
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
type t = Domain_local_await.t = {
|
||||
release: unit -> unit;
|
||||
await: unit -> unit;
|
||||
}
|
||||
|
||||
let using : prepare_for_await:(unit -> t) -> while_running:(unit -> 'a) -> 'a =
|
||||
Domain_local_await.using
|
||||
|
||||
let setup_domain () = Domain_local_await.per_thread (module Thread)
|
||||
|
||||
|
|
@ -13,6 +13,11 @@
|
|||
from
|
||||
(thread-local-storage -> thread_local_storage_.stub.ml)
|
||||
(-> thread_local_storage_.real.ml))
|
||||
(select
|
||||
dla_.ml
|
||||
from
|
||||
(domain-local-await -> dla_.real.ml)
|
||||
(-> dla_.dummy.ml))
|
||||
(select
|
||||
tracing_.ml
|
||||
from
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue