From ad2c45aae40f8cdafaeb9f48e77f84518f260c34 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 21 Jun 2024 17:08:12 -0400 Subject: [PATCH] wip: moonpool-unix with idempotent-close fd --- dune-project | 13 + moonpool-unix.opam | 33 +++ src/unix/IO_unix.ml | 60 ----- src/unix/async_io.ml | 549 ++++++++++++++++++++++++++++++++++++++ src/unix/async_io.mli | 42 +++ src/unix/dune | 3 +- src/unix/ev_loop.ml | 11 + src/unix/ev_loop.mli | 3 + src/unix/fd.ml | 19 ++ src/unix/moonpool_unix.ml | 5 +- 10 files changed, 673 insertions(+), 65 deletions(-) create mode 100644 moonpool-unix.opam delete mode 100644 src/unix/IO_unix.ml create mode 100644 src/unix/async_io.ml create mode 100644 src/unix/async_io.mli create mode 100644 src/unix/fd.ml diff --git a/dune-project b/dune-project index 234d03ad..55756c97 100644 --- a/dune-project +++ b/dune-project @@ -33,6 +33,19 @@ (tags (thread pool domain futures fork-join))) +(package + (name moonpool-unix) + (synopsis "Non blocking IO for Moonpool based on Unix") + (depends + (ocaml (>= 5.0)) + dune + (iostream (>= 0.2.2)) + (trace :with-test)) + (depopts + mtime) + (tags + (moonpool io unix async))) + (package (name moonpool-lwt) (synopsis "Event loop for moonpool based on Lwt-engine (experimental)") diff --git a/moonpool-unix.opam b/moonpool-unix.opam new file mode 100644 index 00000000..62cdaf7e --- /dev/null +++ b/moonpool-unix.opam @@ -0,0 +1,33 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.6" +synopsis: "Non blocking IO for Moonpool based on Unix" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["moonpool" "io" "unix" "async"] +homepage: "https://github.com/c-cube/moonpool" +bug-reports: "https://github.com/c-cube/moonpool/issues" +depends: [ + "ocaml" {>= "5.0"} + "dune" {>= "3.0"} + "iostream" {>= "0.2.2"} + "trace" {with-test} + "odoc" {with-doc} +] +depopts: ["mtime"] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/moonpool.git" diff --git a/src/unix/IO_unix.ml b/src/unix/IO_unix.ml deleted file mode 100644 index 0ec47d04..00000000 --- a/src/unix/IO_unix.ml +++ /dev/null @@ -1,60 +0,0 @@ -open Common_ - -type file_descr = Unix.file_descr - -(** Non blocking read *) -let rec read fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.read fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - (* wait for FD to be ready *) - let cancel = Cancel_handle.create () in - let@ () = - Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel) - in - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~run:_ ~resume sus -> - Ev_loop.wait_readable fd cancel (fun cancel -> - resume sus @@ Ok (); - Cancel_handle.cancel cancel)); - }; - read fd buf i len - | n -> n - ) - -(** Non blocking write *) -let rec write_once fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.write fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - (* wait for FD to be ready *) - let cancel = Cancel_handle.create () in - let@ () = - Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel) - in - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~run:_ ~resume sus -> - Ev_loop.wait_writable fd cancel (fun cancel -> - resume sus @@ Ok (); - Cancel_handle.cancel cancel)); - }; - write_once fd buf i len - | n -> n - ) - -let write fd buf i len : unit = - let i = ref i in - let len = ref len in - while !len > 0 do - let n = write_once fd buf !i !len in - i := !i + n; - len := !len - n - done diff --git a/src/unix/async_io.ml b/src/unix/async_io.ml new file mode 100644 index 00000000..31cbdad7 --- /dev/null +++ b/src/unix/async_io.ml @@ -0,0 +1,549 @@ +open Common_ +module Slice = Iostream.Slice + +let rec read (fd : Fd.t) buf i len : int = + if len = 0 || Fd.closed fd then + 0 + else ( + match Unix.read fd.fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + let cancel = Cancel_handle.create () in + let@ () = + Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel) + in + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Ev_loop.wait_readable fd.fd cancel (fun cancel -> + resume sus @@ Ok (); + Cancel_handle.cancel cancel)); + }; + read fd buf i len + | n -> n + ) + +let rec write_once (fd : Fd.t) buf i len : int = + if len = 0 || Fd.closed fd then + 0 + else ( + match Unix.write fd.fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + let cancel = Cancel_handle.create () in + let@ () = + Fiber.with_on_self_cancel (fun _ -> Cancel_handle.cancel cancel) + in + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Ev_loop.wait_writable fd.fd cancel (fun cancel -> + resume sus @@ Ok (); + Cancel_handle.cancel cancel)); + }; + write_once fd buf i len + | n -> n + ) + +let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done + +module Reader = struct + include Iostream.In + + let of_fd ?(close_noerr = false) (fd : Fd.t) : t = + object + method input buf i len = read fd buf i len + + method close () = + if close_noerr then + Fd.close_noerr fd + else + Fd.close fd + end + + let of_slice (slice : Slice.t) : t = + (of_bytes ~off:slice.off ~len:slice.len slice.bytes :> t) +end + +module Buf_reader = struct + include Iostream.In_buf + + let of_fd ?(close_noerr = false) ~(buf : bytes) (fd : Fd.t) : t = + let eof = ref false in + object + inherit Iostream.In_buf.t_from_refill ~bytes:buf () + + method private refill (slice : Slice.t) = + if not !eof then ( + slice.off <- 0; + slice.len <- read fd slice.bytes 0 (Bytes.length slice.bytes); + if slice.len = 0 then eof := true + ) + + method close () = + if close_noerr then + Fd.close_noerr fd + else + Fd.close fd + end +end + +(* +(** Output channel (byte sink) *) +module Output = struct + include Iostream.Out_buf + + class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) + (fd : Unix.file_descr) : + t = + object + inherit t_from_output ~bytes:buf.bytes () + + method private output_underlying bs i len0 = + let i = ref i in + let len = ref len0 in + while !len > 0 do + match Unix.write fd bs !i !len with + | 0 -> failwith "write failed" + | n -> + i := !i + n; + len := !len - n + | exception + Unix.Unix_error + ( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN + | Unix.ECONNRESET | Unix.EPIPE ), + _, + _ ) -> + failwith "write failed" + | exception + Unix.Unix_error + ((Unix.EWOULDBLOCK | Unix.EAGAIN | Unix.EINTR), _, _) -> + ignore (Unix.select [] [ fd ] [] 1.) + done + + method private close_underlying () = + if not !closed then ( + closed := true; + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + ) + end + + let output_buf (self : t) (buf : Buf.t) : unit = + let b = Buf.bytes_slice buf in + output self b 0 (Buf.size buf) + + (** [chunk_encoding oc] makes a new channel that outputs its content into [oc] + in chunk encoding form. + @param close_rec if true, closing the result will also close [oc] + @param buf a buffer used to accumulate data into chunks. + Chunks are emitted when [buf]'s size gets over a certain threshold, + or when [flush] is called. + *) + let chunk_encoding ?(buf = Buf.create ()) ~close_rec (oc : #t) : t = + (* write content of [buf] as a chunk if it's big enough. + If [force=true] then write content of [buf] if it's simply non empty. *) + let write_buf ~force () = + let n = Buf.size buf in + if (force && n > 0) || n >= 4_096 then ( + output_string oc (Printf.sprintf "%x\r\n" n); + output oc (Buf.bytes_slice buf) 0 n; + output_string oc "\r\n"; + Buf.clear buf + ) + in + + object + method flush () = + write_buf ~force:true (); + flush oc + + method close () = + write_buf ~force:true (); + (* write an empty chunk to close the stream *) + output_string oc "0\r\n"; + (* write another crlf after the stream (see #56) *) + output_string oc "\r\n"; + flush oc; + if close_rec then close oc + + method output b i n = + Buf.add_bytes buf b i n; + write_buf ~force:false () + + method output_char c = + Buf.add_char buf c; + write_buf ~force:false () + end +end + +(** Input channel (byte source) *) +module Input = struct + include Iostream.In_buf + + let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) + (fd : Unix.file_descr) : t = + let eof = ref false in + object + inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes () + + method private refill (slice : Slice.t) = + if not !eof then ( + slice.off <- 0; + let continue = ref true in + while !continue do + match Unix.read fd slice.bytes 0 (Bytes.length slice.bytes) with + | n -> + slice.len <- n; + continue := false + | exception + Unix.Unix_error + ( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN + | Unix.ECONNRESET | Unix.EPIPE ), + _, + _ ) -> + eof := true; + continue := false + | exception + Unix.Unix_error + ((Unix.EWOULDBLOCK | Unix.EAGAIN | Unix.EINTR), _, _) -> + ignore (Unix.select [ fd ] [] [] 1.) + done; + (* Printf.eprintf "read returned %d B\n%!" !n; *) + if slice.len = 0 then eof := true + ) + + method close () = + if not !closed then ( + closed := true; + eof := true; + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + ) + end + + let of_slice (slice : Slice.t) : t = + object + inherit Iostream.In_buf.t_from_refill ~bytes:slice.bytes () + + method private refill (slice : Slice.t) = + slice.off <- 0; + slice.len <- 0 + + method close () = () + end + + (** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) + let[@inline] input (self : t) buf i len = self#input buf i len + + (** Close the channel. *) + let[@inline] close self : unit = self#close () + + (** Read exactly [len] bytes. + @raise End_of_file if the input did not contain enough data. *) + let really_input (self : t) buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = input self buf !i !len in + if n = 0 then raise End_of_file; + i := !i + n; + len := !len - n + done + + let iter_slice (f : Slice.t -> unit) (self : #t) : unit = + let continue = ref true in + while !continue do + let slice = self#fill_buf () in + if slice.len = 0 then ( + continue := false; + close self + ) else ( + f slice; + Slice.consume slice slice.len + ) + done + + let iter f self = + iter_slice (fun (slice : Slice.t) -> f slice.bytes slice.off slice.len) self + + let to_chan oc (self : #t) = + iter_slice + (fun (slice : Slice.t) -> + Stdlib.output oc slice.bytes slice.off slice.len) + self + + let to_chan' (oc : #Iostream.Out.t) (self : #t) : unit = + iter_slice + (fun (slice : Slice.t) -> + Iostream.Out.output oc slice.bytes slice.off slice.len) + self + + let read_all_using ~buf (self : #t) : string = + Buf.clear buf; + let continue = ref true in + while !continue do + let slice = fill_buf self in + if slice.len = 0 then + continue := false + else ( + assert (slice.len > 0); + Buf.add_bytes buf slice.bytes slice.off slice.len; + Slice.consume slice slice.len + ) + done; + Buf.contents_and_clear buf + + (** Read [n] bytes from the input into [bytes]. *) + let read_exactly_ ~too_short (self : #t) (bytes : bytes) (n : int) : unit = + assert (Bytes.length bytes >= n); + let offset = ref 0 in + while !offset < n do + let slice = self#fill_buf () in + let n_read = min slice.len (n - !offset) in + Bytes.blit slice.bytes slice.off bytes !offset n_read; + offset := !offset + n_read; + Slice.consume slice n_read; + if n_read = 0 then too_short () + done + + (** read a line into the buffer, after clearing it. *) + let read_line_into (self : t) ~buf : unit = + Buf.clear buf; + let continue = ref true in + while !continue do + let slice = self#fill_buf () in + if slice.len = 0 then ( + continue := false; + if Buf.size buf = 0 then raise End_of_file + ); + let j = ref slice.off in + let limit = slice.off + slice.len in + while !j < limit && Bytes.get slice.bytes !j <> '\n' do + incr j + done; + if !j < limit then ( + assert (Bytes.get slice.bytes !j = '\n'); + (* line without '\n' *) + Buf.add_bytes buf slice.bytes slice.off (!j - slice.off); + (* consume line + '\n' *) + Slice.consume slice (!j - slice.off + 1); + continue := false + ) else ( + Buf.add_bytes buf slice.bytes slice.off slice.len; + Slice.consume slice slice.len + ) + done + + let read_line_using ~buf (self : #t) : string = + read_line_into self ~buf; + Buf.contents_and_clear buf + + let read_line_using_opt ~buf (self : #t) : string option = + match read_line_into self ~buf with + | () -> Some (Buf.contents_and_clear buf) + | exception End_of_file -> None + + (* helper for making a new input stream that either contains at most [size] + bytes, or contains exactly [size] bytes. *) + let reading_exactly_ ~skip_on_close ~close_rec ~size ~bytes (arg : t) : t = + let remaining_size = ref size in + + object + inherit t_from_refill ~bytes () + + method close () = + if !remaining_size > 0 && skip_on_close then skip arg !remaining_size; + if close_rec then close arg + + method private refill (slice : Slice.t) = + slice.off <- 0; + slice.len <- 0; + if !remaining_size > 0 then ( + let sub = fill_buf arg in + let n = + min !remaining_size (min sub.len (Bytes.length slice.bytes)) + in + Bytes.blit sub.bytes sub.off slice.bytes 0 n; + Slice.consume sub n; + remaining_size := !remaining_size - n; + slice.len <- n + ) + end + + (** new stream with maximum size [max_size]. + @param close_rec if true, closing this will also close the input stream *) + let limit_size_to ~close_rec ~max_size ~bytes (arg : t) : t = + reading_exactly_ ~size:max_size ~skip_on_close:false ~bytes ~close_rec arg + + (** New stream that consumes exactly [size] bytes from the input. + If fewer bytes are read before [close] is called, we read and discard + the remaining quota of bytes before [close] returns. + @param close_rec if true, closing this will also close the input stream *) + let reading_exactly ~close_rec ~size ~bytes (arg : t) : t = + reading_exactly_ ~size ~close_rec ~skip_on_close:true ~bytes arg + + let read_chunked ~(bytes : bytes) ~fail (ic : #t) : t = + let first = ref true in + + (* small buffer to read the chunk sizes *) + let line_buf = Buf.create ~size:32 () in + let read_next_chunk_len () : int = + if !first then + first := false + else ( + let line = read_line_using ~buf:line_buf ic in + if String.trim line <> "" then + raise (fail "expected crlf between chunks") + ); + let line = read_line_using ~buf:line_buf ic in + (* parse chunk length, ignore extensions *) + let chunk_size = + if String.trim line = "" then + 0 + else ( + try + let off = ref 0 in + let n = Parse_.pos_hex line off in + n + with _ -> + raise (fail (spf "cannot read chunk size from line %S" line)) + ) + in + chunk_size + in + let eof = ref false in + let chunk_size = ref 0 in + + object + inherit t_from_refill ~bytes () + + method private refill (slice : Slice.t) : unit = + if !chunk_size = 0 && not !eof then ( + chunk_size := read_next_chunk_len (); + if !chunk_size = 0 then ( + (* stream is finished, consume trailing \r\n *) + eof := true; + let line = read_line_using ~buf:line_buf ic in + if String.trim line <> "" then + raise + (fail (spf "expected \\r\\n to follow last chunk, got %S" line)) + ) + ); + slice.off <- 0; + slice.len <- 0; + if !chunk_size > 0 then ( + (* read the whole chunk, or [Bytes.length bytes] of it *) + let to_read = min !chunk_size (Bytes.length slice.bytes) in + read_exactly_ + ~too_short:(fun () -> raise (fail "chunk is too short")) + ic slice.bytes to_read; + slice.len <- to_read; + chunk_size := !chunk_size - to_read + ) + + method close () = eof := true (* do not close underlying stream *) + end + + (** Output a stream using chunked encoding *) + let output_chunked' ?buf (oc : #Iostream.Out_buf.t) (self : #t) : unit = + let oc' = Output.chunk_encoding ?buf oc ~close_rec:false in + match to_chan' oc' self with + | () -> Output.close oc' + | exception e -> + let bt = Printexc.get_raw_backtrace () in + Output.close oc'; + Printexc.raise_with_backtrace e bt + + (** print a stream as a series of chunks *) + let output_chunked ?buf (oc : out_channel) (self : #t) : unit = + output_chunked' ?buf (Output.of_out_channel oc) self +end + +(** A writer abstraction. *) +module Writer = struct + type t = { write: Output.t -> unit } [@@unboxed] + (** Writer. + + A writer is a push-based stream of bytes. + Give it an output channel and it will write the bytes in it. + + This is useful for responses: an http endpoint can return a writer + as its response's body; the writer is given access to the connection + to the client and can write into it as if it were a regular + [out_channel], including controlling calls to [flush]. + Tiny_httpd will convert these writes into valid HTTP chunks. + @since 0.14 + *) + + let[@inline] make ~write () : t = { write } + + (** Write into the channel. *) + let[@inline] write (oc : #Output.t) (self : t) : unit = + self.write (oc :> Output.t) + + (** Empty writer, will output 0 bytes. *) + let empty : t = { write = ignore } + + (** A writer that just emits the bytes from the given string. *) + let[@inline] of_string (str : string) : t = + let write oc = Iostream.Out.output_string oc str in + { write } + + let[@inline] of_input (ic : #Input.t) : t = + { write = (fun oc -> Input.to_chan' oc ic) } +end + +(** A TCP server abstraction. *) +module TCP_server = struct + type conn_handler = { + handle: client_addr:Unix.sockaddr -> Input.t -> Output.t -> unit; + (** Handle client connection *) + } + + type t = { + endpoint: unit -> string * int; + (** Endpoint we listen on. This can only be called from within [serve]. *) + active_connections: unit -> int; + (** Number of connections currently active *) + running: unit -> bool; (** Is the server currently running? *) + stop: unit -> unit; + (** Ask the server to stop. This might not take effect immediately, + and is idempotent. After this [server.running()] must return [false]. *) + } + (** A running TCP server. + + This contains some functions that provide information about the running + server, including whether it's active (as opposed to stopped), a function + to stop it, and statistics about the number of connections. *) + + type builder = { + serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit; + (** Blocking call to listen for incoming connections and handle them. + Uses the connection handler [handle] to handle individual client + connections in individual threads/fibers/tasks. + @param after_init is called once with the server after the server + has started. *) + } + (** A TCP server builder implementation. + + Calling [builder.serve ~after_init ~handle ()] starts a new TCP server on + an unspecified endpoint + (most likely coming from the function returning this builder) + and returns the running server. *) +end +*) diff --git a/src/unix/async_io.mli b/src/unix/async_io.mli new file mode 100644 index 00000000..2fe04d52 --- /dev/null +++ b/src/unix/async_io.mli @@ -0,0 +1,42 @@ +module Slice = Iostream.Slice + +val read : Fd.t -> bytes -> int -> int -> int +(** Non blocking read *) + +val write_once : Fd.t -> bytes -> int -> int -> int +(** Non blocking write *) + +val write : Fd.t -> bytes -> int -> int -> unit + +module Reader : sig + include module type of struct + include Iostream.In + end + + val of_fd : ?close_noerr:bool -> Fd.t -> t + val of_slice : Slice.t -> t +end + +module Buf_reader : sig + include module type of struct + include Iostream.In_buf + end + + val of_fd : ?close_noerr:bool -> buf:bytes -> Fd.t -> t +end + +module Writer : sig + include module type of struct + include Iostream.Out + end + + val of_fd : ?close_noerr:bool -> Fd.t -> t +end + +module Buf_writer : sig + include module type of struct + include Iostream.Out_buf + end + + val of_fd : ?close_noerr:bool -> buf:bytes -> Fd.t -> t +end diff --git a/src/unix/dune b/src/unix/dune index a2f9f672..bd609139 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -1,7 +1,6 @@ (library (name moonpool_unix) - (public_name moonpool.unix) - (optional) ; iostream + (public_name moonpool-unix) (synopsis "Simple Unix-based event loop for moonpool") (private_modules common_ heap_) (libraries diff --git a/src/unix/ev_loop.ml b/src/unix/ev_loop.ml index f437970f..66623a40 100644 --- a/src/unix/ev_loop.ml +++ b/src/unix/ev_loop.ml @@ -249,6 +249,17 @@ let with_loop ~runner f = (* return result of [fib] *) Moonpool.Fut.get_or_fail_exn @@ Fiber.res fib +let start_background_loop () = + let run () = + let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in + let ev_loop = Ev_loop.create () in + if set_as_current_ ev_loop then + while true do + Ev_loop.run_step_ ev_loop + done + in + ignore (Thread.create run () : Thread.t) + (* ### external inputs *) let[@inline] get_current_ () = diff --git a/src/unix/ev_loop.mli b/src/unix/ev_loop.mli index 23fee7c4..48bab911 100644 --- a/src/unix/ev_loop.mli +++ b/src/unix/ev_loop.mli @@ -13,3 +13,6 @@ val with_loop : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a (** Run with the event loop processed in the current thread. There can only be one such loop running at a time. @raise Failure if another call to {!with_loop} is already in effect. *) + +val start_background_loop : unit -> unit +(** Start the event loop in a new background thread. Idempotent. *) diff --git a/src/unix/fd.ml b/src/unix/fd.ml new file mode 100644 index 00000000..196c1436 --- /dev/null +++ b/src/unix/fd.ml @@ -0,0 +1,19 @@ +open Common_ + +type t = { + fd: Unix.file_descr; + closed: bool A.t; +} +(** A file descriptor with idempotent closing *) + +let create (fd : Unix.file_descr) : t = { fd; closed = A.make false } +let[@inline] closed self = A.get self.closed + +(** Close the file descriptor. Idempotent. *) +let[@inline] close (self : t) : unit = + if not (A.exchange self.closed true) then Unix.close self.fd + +let[@inline] close_noerr (self : t) : unit = + if not (A.exchange self.closed true) then ( + try Unix.close self.fd with _ -> () + ) diff --git a/src/unix/moonpool_unix.ml b/src/unix/moonpool_unix.ml index 5262890c..ca87a3f3 100644 --- a/src/unix/moonpool_unix.ml +++ b/src/unix/moonpool_unix.ml @@ -10,10 +10,9 @@ module Fut = Moonpool.Fut (** {2 Event loop modules} *) +module Fd = Fd module Cancel_handle = Cancel_handle -module IO_in = IO_in -module IO_out = IO_out -include IO_unix +include Async_io let run_after_s = Ev_loop.run_after_s let run_every_s = Ev_loop.run_every_s