diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index e3b0bc70..6f7b3700 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -76,7 +76,11 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = done in - try main_loop () with Bb_queue.Closed -> () + try + (* handle domain-local await *) + Dla_.using ~prepare_for_await:Suspend_.prepare_for_await + ~while_running:main_loop + with Bb_queue.Closed -> () let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () diff --git a/src/core/suspend_.ml b/src/core/suspend_.ml index 7e864156..fb02bc3a 100644 --- a/src/core/suspend_.ml +++ b/src/core/suspend_.ml @@ -1,4 +1,5 @@ open Types_ +module A = Atomic_ type suspension = unit Exn_bt.result -> unit type task = unit -> unit @@ -55,9 +56,27 @@ let with_suspend ~on_suspend ~(run : name:string -> task -> unit) E.try_with f () { E.effc } +(* DLA interop *) +let prepare_for_await () : Dla_.t = + (* current state *) + let st : (_ * _ * suspension) option A.t = A.make None in + + let release () : unit = + match A.exchange st None with + | None -> () + | Some (ls, resume, k) -> resume ~ls k @@ Ok () + and await () : unit = + suspend + { handle = (fun ~ls ~run:_ ~resume k -> A.set st (Some (ls, resume, k))) } + in + + let t = { Dla_.release; await } in + t + [@@@ocaml.alert "+unstable"] [@@@else_] let[@inline] with_suspend ~on_suspend:_ ~run:_ ~resume:_ f = f () +let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore } [@@@endif] diff --git a/src/core/suspend_.mli b/src/core/suspend_.mli index 6e989803..bd922f41 100644 --- a/src/core/suspend_.mli +++ b/src/core/suspend_.mli @@ -65,6 +65,9 @@ val suspend : suspension_handler -> unit [@@@endif] +val prepare_for_await : unit -> Dla_.t +(** Our stub for DLA. Unstable. *) + val with_suspend : on_suspend:(unit -> task_ls) -> run:(name:string -> task -> unit) -> diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 4eff8e2e..6da8e31a 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -242,7 +242,9 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = tasks *) Mutex.unlock self.mutex in - main () + + (* handle domain-local await *) + Dla_.using ~prepare_for_await:Suspend_.prepare_for_await ~while_running:main let default_thread_init_exit_ ~dom_id:_ ~t_id:_ () = () diff --git a/src/private/dla_.dummy.ml b/src/private/dla_.dummy.ml new file mode 100644 index 00000000..3991ff1a --- /dev/null +++ b/src/private/dla_.dummy.ml @@ -0,0 +1,13 @@ +(** Interface to Domain-local-await. + + This is used to handle the presence or absence of DLA. *) + +type t = { + release: unit -> unit; + await: unit -> unit; +} + +let using : prepare_for_await:(unit -> t) -> while_running:(unit -> 'a) -> 'a = + fun ~prepare_for_await:_ ~while_running -> while_running () + +let setup_domain () = () diff --git a/src/private/dla_.real.ml b/src/private/dla_.real.ml new file mode 100644 index 00000000..16901ba2 --- /dev/null +++ b/src/private/dla_.real.ml @@ -0,0 +1,10 @@ +type t = Domain_local_await.t = { + release: unit -> unit; + await: unit -> unit; +} + +let using : prepare_for_await:(unit -> t) -> while_running:(unit -> 'a) -> 'a = + Domain_local_await.using + +let setup_domain () = Domain_local_await.per_thread (module Thread) + diff --git a/src/private/dune b/src/private/dune index 653a129f..2d52b3ef 100644 --- a/src/private/dune +++ b/src/private/dune @@ -13,6 +13,11 @@ from (thread-local-storage -> thread_local_storage_.stub.ml) (-> thread_local_storage_.real.ml)) + (select + dla_.ml + from + (domain-local-await -> dla_.real.ml) + (-> dla_.dummy.ml)) (select tracing_.ml from