From dce54289edf72afbb5573d4fe99fcf6970739a22 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 21 Jun 2024 15:46:42 -0400 Subject: [PATCH] wip: `Unix`-based IO, now using iostream --- src/lwt/moonpool_lwt.mli | 6 ------ src/unix/IO_unix.ml | 2 ++ src/unix/cancel_handle.ml | 18 ++++++++++++------ src/unix/cancel_handle.mli | 3 +++ src/unix/dune | 26 +++++++++++++++----------- src/unix/time.mtime.ml | 4 +--- src/unix/timer.mli | 3 +++ 7 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 0d171504..944bc752 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -81,12 +81,6 @@ module IO : sig (** Suspend the fiber for [n] seconds. *) end -module IO_in = IO_in -(** Input channel *) - -module IO_out = IO_out -(** Output channel *) - module TCP_server : sig type t = Lwt_io.server diff --git a/src/unix/IO_unix.ml b/src/unix/IO_unix.ml index e2f40598..0ec47d04 100644 --- a/src/unix/IO_unix.ml +++ b/src/unix/IO_unix.ml @@ -2,6 +2,7 @@ open Common_ type file_descr = Unix.file_descr +(** Non blocking read *) let rec read fd buf i len : int = if len = 0 then 0 @@ -25,6 +26,7 @@ let rec read fd buf i len : int = | n -> n ) +(** Non blocking write *) let rec write_once fd buf i len : int = if len = 0 then 0 diff --git a/src/unix/cancel_handle.ml b/src/unix/cancel_handle.ml index 6911ca4c..bbde1873 100644 --- a/src/unix/cancel_handle.ml +++ b/src/unix/cancel_handle.ml @@ -2,14 +2,18 @@ open Common_ +type waiter = Waiter : ('a -> unit) * 'a -> waiter + type state = | Cancelled - | Waiting of { waiters: (unit -> unit) list } + | Waiting of { waiters: waiter list } type t = { st: state A.t } [@@unboxed] let create () : t = { st = A.make (Waiting { waiters = [] }) } -let create_with f : t = { st = A.make (Waiting { waiters = [ f ] }) } + +let create_with f : t = + { st = A.make (Waiting { waiters = [ Waiter (f, ()) ] }) } let cancel (self : t) = while @@ -18,7 +22,7 @@ let cancel (self : t) = | Cancelled -> false | Waiting { waiters } -> if A.compare_and_set self.st old_st Cancelled then ( - List.iter (fun f -> f ()) waiters; + List.iter (fun (Waiter (f, x)) -> f x) waiters; false ) else true @@ -26,17 +30,19 @@ let cancel (self : t) = () done -let on_cancel (self : t) f : unit = +let on_cancel1 (self : t) f x : unit = + let waiter = Waiter (f, x) in while let old_st = A.get self.st in match old_st with | Cancelled -> - f (); + f x; false | Waiting { waiters = l } -> - not (A.compare_and_set self.st old_st (Waiting { waiters = f :: l })) + not (A.compare_and_set self.st old_st (Waiting { waiters = waiter :: l })) do () done +let[@inline] on_cancel self f = on_cancel1 self f () let dummy = { st = A.make Cancelled } diff --git a/src/unix/cancel_handle.mli b/src/unix/cancel_handle.mli index 7dbed0a4..22f335d8 100644 --- a/src/unix/cancel_handle.mli +++ b/src/unix/cancel_handle.mli @@ -1,3 +1,5 @@ +(** Cancellation tokens *) + type t (** A handle to cancel atomic actions (waiting on something), or stopping a subscription to some event. *) @@ -5,6 +7,7 @@ type t val create : unit -> t val create_with : (unit -> unit) -> t val on_cancel : t -> (unit -> unit) -> unit +val on_cancel1 : t -> ('a -> unit) -> 'a -> unit val cancel : t -> unit (** Perform the cancellation. This should be idempotent. *) diff --git a/src/unix/dune b/src/unix/dune index f47487e3..a2f9f672 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -1,12 +1,16 @@ - (library - (name moonpool_unix) - (public_name moonpool.unix) - (optional) - (synopsis "Simple Unix-based event loop for moonpool") - (private_modules common_ heap_) - (libraries moonpool moonpool.fib unix - (select time.ml from - (mtime mtime.os.clock -> time.mtime.ml) - (-> time.unix.ml)) - )) + (name moonpool_unix) + (public_name moonpool.unix) + (optional) ; iostream + (synopsis "Simple Unix-based event loop for moonpool") + (private_modules common_ heap_) + (libraries + moonpool + moonpool.fib + unix + iostream + (select + time.ml + from + (mtime mtime.os.clock -> time.mtime.ml) + (-> time.unix.ml)))) diff --git a/src/unix/time.mtime.ml b/src/unix/time.mtime.ml index 745b5412..abcf9639 100644 --- a/src/unix/time.mtime.ml +++ b/src/unix/time.mtime.ml @@ -1,9 +1,7 @@ - - let time_ns : unit -> int64 = Mtime_clock.now_ns (** Monotonic time in seconds *) -let time_s () : float = +let[@inline] time_s () : float = let ns = time_ns () in let s = Int64.(div ns 1_000_000_000L) in let ns' = Int64.(rem ns 1_000_000_000L) in diff --git a/src/unix/timer.mli b/src/unix/timer.mli index 55cd60de..ed2b8f83 100644 --- a/src/unix/timer.mli +++ b/src/unix/timer.mli @@ -1,4 +1,7 @@ +(** Simple timer *) + type t +(** A timer, running tasks after a duration or repeatedly with some frequency *) val create : unit -> t (** A new timer. *)