feat(picos): check for cancellation; expose more in Base.Raw
Some checks are pending
github pages / Deploy doc (push) Waiting to run
Build and Test / build (push) Waiting to run
Build and Test / format (push) Waiting to run

This commit is contained in:
Simon Cruanes 2025-05-05 12:58:42 -04:00
parent b6583d69a8
commit 00c845ec8d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 31 additions and 8 deletions

View file

@ -1,12 +1,18 @@
open Common_ open Common_
let get_loop_exn_ : unit -> Nanoev.t = Global_ev.get_nanoev_exn let get_loop_exn_ : unit -> Nanoev.t = Global_ev.get_nanoev_exn
let[@inline] check_fiber_ () : unit = Picos.Fiber.(check @@ current ())
let[@inline] unwrap_ = function let[@inline] unwrap_ = function
| None -> () | None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
let[@inline] on_readable_ fd x y f : unit =
let ev = get_loop_exn_ () in
Nanoev.on_readable ev fd x y f
let[@unroll 1] rec retry_read_ fd f = let[@unroll 1] rec retry_read_ fd f =
check_fiber_ ();
match f () with match f () with
| res -> res | res -> res
| exception | exception
@ -18,15 +24,19 @@ let[@unroll 1] rec retry_read_ fd f =
(* Trace_.message "read must wait"; *) (* Trace_.message "read must wait"; *)
let trigger = Picos.Trigger.create () in let trigger = Picos.Trigger.create () in
let closed_r = ref false in let closed_r = ref false in
let ev = get_loop_exn_ () in on_readable_ fd trigger closed_r (fun ~closed trigger closed_r ->
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed; closed_r := closed;
Picos.Trigger.signal trigger); Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_; Picos.Trigger.await trigger |> unwrap_;
if !closed_r then raise Closed; if !closed_r then raise Closed;
retry_read_ fd f retry_read_ fd f
let[@inline] on_writable_ fd x y f : unit =
let ev = get_loop_exn_ () in
Nanoev.on_writable ev fd x y f
let[@unroll 1] rec retry_write_ fd f = let[@unroll 1] rec retry_write_ fd f =
check_fiber_ ();
match f () with match f () with
| res -> res | res -> res
| exception | exception
@ -36,10 +46,9 @@ let[@unroll 1] rec retry_write_ fd f =
_, _,
_ ) -> _ ) ->
(* Trace_.message "write must wait"; *) (* Trace_.message "write must wait"; *)
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in let trigger = Picos.Trigger.create () in
let closed_r = ref false in let closed_r = ref false in
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> on_writable_ fd trigger closed_r (fun ~closed trigger closed_r ->
closed_r := closed; closed_r := closed;
Picos.Trigger.signal trigger); Picos.Trigger.signal trigger);
Picos.Trigger.await trigger |> unwrap_; Picos.Trigger.await trigger |> unwrap_;
@ -84,16 +93,22 @@ let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
let[@inline] max_fds () = let[@inline] max_fds () =
Option.fold ~none:1024 ~some:Nanoev.max_fds @@ Global_ev.get_nanoev () Option.fold ~none:1024 ~some:Nanoev.max_fds @@ Global_ev.get_nanoev ()
let run_after_s_ t x y f : unit =
let ev = get_loop_exn_ () in
Nanoev.run_after_s ev t x y f
let sleep t = let sleep t =
if t > 0. then ( if t > 0. then (
let ev = get_loop_exn_ () in
let trigger = Picos.Trigger.create () in let trigger = Picos.Trigger.create () in
Nanoev.run_after_s ev t trigger () (fun trigger () -> run_after_s_ t trigger () (fun trigger () -> Picos.Trigger.signal trigger);
Picos.Trigger.signal trigger); Picos.Trigger.await trigger |> unwrap_;
Picos.Trigger.await trigger |> unwrap_ check_fiber_ ()
) )
module Raw = struct module Raw = struct
let run_after_s = run_after_s_
let on_readable = on_readable_
let on_writable = on_writable_
let retry_read = retry_read_ let retry_read = retry_read_
let retry_write = retry_write_ let retry_write = retry_write_
end end

View file

@ -32,6 +32,14 @@ val sleep : float -> unit
(** Suspend current fiber for [n] seconds *) (** Suspend current fiber for [n] seconds *)
module Raw : sig module Raw : sig
val run_after_s : float -> 'a -> 'b -> ('a -> 'b -> unit) -> unit
val on_writable :
Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val on_readable :
Unix.file_descr -> 'a -> 'b -> (closed:bool -> 'a -> 'b -> unit) -> unit
val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a
val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a
end end