lwt: handle fibers in moonpool_lwt

This commit is contained in:
Simon Cruanes 2025-09-05 10:23:06 -04:00
parent 00078d8b43
commit 9e814ecb48
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 6 additions and 4 deletions

View file

@ -5,6 +5,7 @@
(>= %{ocaml_version} 5.0))
(libraries
(re_export moonpool)
moonpool.fib
picos
(re_export lwt)
lwt.unix))

View file

@ -49,9 +49,9 @@ module Scheduler_state = struct
let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads;
Mutex.unlock self.mutex;
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification;
Mutex.unlock self.mutex
Lwt_unix.send_notification self.notification
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
@ -73,9 +73,10 @@ module Scheduler_state = struct
failwith "moonpool-lwt: cleanup from the wrong thread";
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Lwt_unix.stop_notification st.notification;
Atomic.set cur_st None
| _ -> ()
| None -> failwith "moonpool-lwt: cleanup failed (no current active state)"
end
module Ops = struct
@ -290,7 +291,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
M.Runner.run_async st.as_runner (fun () ->
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x