From 4c3c53ee169affe609c967cf8bef22cadd548fb7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 May 2025 11:22:32 -0400 Subject: [PATCH] feat(picos): add shutdown and max_fds --- src/picos/nanoev_picos.ml | 19 +++++++++++++++++++ src/picos/nanoev_picos.mli | 22 +++++++++++++++++----- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index 9920345..0835252 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -55,10 +55,24 @@ module Global_ = struct nanoev = ev; th = Thread.create (bg_thread_ ~active ~evloop:ev) (); } + + let shutdown_bg_thread () = + let@ () = with_lock lock in + match Atomic.exchange st None with + | None -> () + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th end let has_bg_thread = Global_.has_bg_thread let setup_bg_thread = Global_.setup_bg_thread +let shutdown_bg_thread = Global_.shutdown_bg_thread + +let with_setup_bg_thread ev f = + setup_bg_thread ev; + Fun.protect ~finally:shutdown_bg_thread f let[@inline] get_loop_exn_ () : Nanoev.t = match Atomic.get Global_.st with @@ -136,6 +150,11 @@ let write fd buf i len : int = let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) +let[@inline] max_fds () = + match Atomic.get Global_.st with + | None -> 1024 + | Some st -> Nanoev.max_fds st.nanoev + let sleep t = if t > 0. then ( let ev = get_loop_exn_ () in diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 7dfbe1b..a35bb4b 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,11 +1,18 @@ (** Basic interface with picos *) -val setup_bg_thread : Nanoev.t -> unit -(** Install this event loop in a background thread *) +module Background_thread : sig + val setup_bg_thread : Nanoev.t -> unit + (** Install this event loop in a background thread *) -val has_bg_thread : unit -> bool -(** [has_bg_thread ()] is [true] iff a background thread is running a nanoev - loop *) + val shutdown_bg_thread : unit -> unit + (** Shutdown background thread, assuming {! has_bg_thread} returns [true] *) + + val with_setup_bg_thread : Nanoev.t -> (unit -> 'a) -> 'a + + val has_bg_thread : unit -> bool + (** [has_bg_thread ()] is [true] iff a background thread is running a nanoev + loop *) +end (** {2 Non blocking IO primitives} *) @@ -33,4 +40,9 @@ val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr @raise Nanoev.Closed if the FD is closed. @raise Unix.Unix_error for other errors *) +val max_fds : unit -> int +(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} +*) + val sleep : float -> unit +(** Suspend current fiber for [n] seconds *)