diff --git a/src/lwt/dune b/src/lwt/dune index c6bb5ab3..93c86e61 100644 --- a/src/lwt/dune +++ b/src/lwt/dune @@ -5,6 +5,7 @@ (>= %{ocaml_version} 5.0)) (libraries (re_export moonpool) + moonpool.fib picos (re_export lwt) lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 9fa62582..1e1086c0 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -49,9 +49,9 @@ module Scheduler_state = struct let[@inline never] add_action_from_another_thread_ (self : st) f : unit = Mutex.lock self.mutex; Queue.push f self.actions_from_other_threads; + Mutex.unlock self.mutex; if not (Atomic.exchange self.has_notified true) then - Lwt_unix.send_notification self.notification; - Mutex.unlock self.mutex + Lwt_unix.send_notification self.notification let[@inline] on_lwt_thread_ (self : st) : bool = Thread.id (Thread.self ()) = self.thread @@ -73,9 +73,10 @@ module Scheduler_state = struct failwith "moonpool-lwt: cleanup from the wrong thread"; Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook; Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook; + Lwt_unix.stop_notification st.notification; Atomic.set cur_st None - | _ -> () + | None -> failwith "moonpool-lwt: cleanup failed (no current active state)" end module Ops = struct @@ -290,7 +291,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st let spawn_lwt f : _ Lwt.t = let st = Main_state.get_st () in let lwt_fut, lwt_prom = Lwt.wait () in - M.Runner.run_async st.as_runner (fun () -> + Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () -> try let x = f () in Lwt.wakeup lwt_prom x