From 00c845ec8d4764557de28d8606c0b21edfb5841a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 5 May 2025 12:58:42 -0400 Subject: [PATCH] feat(picos): check for cancellation; expose more in Base.Raw --- src/picos/base.ml | 31 +++++++++++++++++++++++-------- src/picos/base.mli | 8 ++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/picos/base.ml b/src/picos/base.ml index d04e242..be5a043 100644 --- a/src/picos/base.ml +++ b/src/picos/base.ml @@ -1,12 +1,18 @@ open Common_ let get_loop_exn_ : unit -> Nanoev.t = Global_ev.get_nanoev_exn +let[@inline] check_fiber_ () : unit = Picos.Fiber.(check @@ current ()) let[@inline] unwrap_ = function | None -> () | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt +let[@inline] on_readable_ fd x y f : unit = + let ev = get_loop_exn_ () in + Nanoev.on_readable ev fd x y f + let[@unroll 1] rec retry_read_ fd f = + check_fiber_ (); match f () with | res -> res | exception @@ -18,15 +24,19 @@ let[@unroll 1] rec retry_read_ fd f = (* Trace_.message "read must wait"; *) let trigger = Picos.Trigger.create () in let closed_r = ref false in - let ev = get_loop_exn_ () in - Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + on_readable_ fd trigger closed_r (fun ~closed trigger closed_r -> closed_r := closed; Picos.Trigger.signal trigger); Picos.Trigger.await trigger |> unwrap_; if !closed_r then raise Closed; retry_read_ fd f +let[@inline] on_writable_ fd x y f : unit = + let ev = get_loop_exn_ () in + Nanoev.on_writable ev fd x y f + let[@unroll 1] rec retry_write_ fd f = + check_fiber_ (); match f () with | res -> res | exception @@ -36,10 +46,9 @@ let[@unroll 1] rec retry_write_ fd f = _, _ ) -> (* Trace_.message "write must wait"; *) - let ev = get_loop_exn_ () in let trigger = Picos.Trigger.create () in let closed_r = ref false in - Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + on_writable_ fd trigger closed_r (fun ~closed trigger closed_r -> closed_r := closed; Picos.Trigger.signal trigger); Picos.Trigger.await trigger |> unwrap_; @@ -84,16 +93,22 @@ let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) let[@inline] max_fds () = Option.fold ~none:1024 ~some:Nanoev.max_fds @@ Global_ev.get_nanoev () +let run_after_s_ t x y f : unit = + let ev = get_loop_exn_ () in + Nanoev.run_after_s ev t x y f + let sleep t = if t > 0. then ( - let ev = get_loop_exn_ () in let trigger = Picos.Trigger.create () in - Nanoev.run_after_s ev t trigger () (fun trigger () -> - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_ + run_after_s_ t trigger () (fun trigger () -> Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + check_fiber_ () ) module Raw = struct + let run_after_s = run_after_s_ + let on_readable = on_readable_ + let on_writable = on_writable_ let retry_read = retry_read_ let retry_write = retry_write_ end diff --git a/src/picos/base.mli b/src/picos/base.mli index 7b949e9..3b2676a 100644 --- a/src/picos/base.mli +++ b/src/picos/base.mli @@ -32,6 +32,14 @@ val sleep : float -> unit (** Suspend current fiber for [n] seconds *) module Raw : sig + val run_after_s : float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit + + val on_writable : + Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + + val on_readable : + Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit + val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a end