From 82cfe5413ebe3228e23c48bf2bbddcf77251c2de Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 6 Mar 2024 20:29:20 -0500 Subject: [PATCH] wip: IO based on poll --- dune-project | 3 + moonpool.opam | 3 + src/io/IO_unix.ml | 142 +++++++++++++++++++++++++++++ src/io/IO_unix.mli | 21 +++++ src/io/common_.ml | 6 ++ src/io/dune | 14 +++ src/io/heap_.ml | 133 +++++++++++++++++++++++++++ src/io/heap_.mli | 53 +++++++++++ src/io/moonpool_io.ml | 10 ++ src/io/moonpool_io.mli | 20 ++++ src/io/net.ml | 150 ++++++++++++++++++++++++++++++ src/io/net.mli | 49 ++++++++++ src/io/time_.mli | 4 + src/io/time_.mtime.ml | 8 ++ src/io/time_.unix.ml | 2 + src/io/timer.ml | 96 ++++++++++++++++++++ src/io/timer.mli | 17 ++++ src/io/u_loop.ml | 202 +++++++++++++++++++++++++++++++++++++++++ 18 files changed, 933 insertions(+) create mode 100644 src/io/IO_unix.ml create mode 100644 src/io/IO_unix.mli create mode 100644 src/io/common_.ml create mode 100644 src/io/dune create mode 100644 src/io/heap_.ml create mode 100644 src/io/heap_.mli create mode 100644 src/io/moonpool_io.ml create mode 100644 src/io/moonpool_io.mli create mode 100644 src/io/net.ml create mode 100644 src/io/net.mli create mode 100644 src/io/time_.mli create mode 100644 src/io/time_.mtime.ml create mode 100644 src/io/time_.unix.ml create mode 100644 src/io/timer.ml create mode 100644 src/io/timer.mli create mode 100644 src/io/u_loop.ml diff --git a/dune-project b/dune-project index 050eb9d8..f42cdb49 100644 --- a/dune-project +++ b/dune-project @@ -29,6 +29,9 @@ :with-test))) (depopts (trace (>= 0.6)) + (mtime (>= 2.0)) + (iostream (>= 0.2)) + (poll (>= 0.3)) thread-local-storage) (tags (thread pool domain futures fork-join))) diff --git a/moonpool.opam b/moonpool.opam index c8afba80..33abc241 100644 --- a/moonpool.opam +++ b/moonpool.opam @@ -20,6 +20,9 @@ depends: [ ] depopts: [ "trace" {>= "0.6"} + "mtime" {>= "2.0"} + "iostream" {>= "0.2"} + "poll" {>= "0.3"} "thread-local-storage" ] build: [ diff --git a/src/io/IO_unix.ml b/src/io/IO_unix.ml new file mode 100644 index 00000000..0457ab31 --- /dev/null +++ b/src/io/IO_unix.ml @@ -0,0 +1,142 @@ +open struct + let _default_buf_size = 16 * 1024 +end + +type file_descr = Unix.file_descr + +let await_readable fd = + let loop = U_loop.cur () in + (* wait for FD to be ready *) + Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> + ignore + (loop#on_readable fd (fun ev -> + wakeup (); + Cancel_handle.cancel ev) + : Cancel_handle.t)) + +let rec read fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + await_readable fd; + read fd buf i len + | n -> n + ) + +let await_writable fd = + let loop = U_loop.cur () in + (* wait for FD to be ready *) + Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> + ignore + (loop#on_writable fd (fun ev -> + wakeup (); + Cancel_handle.cancel ev) + : Cancel_handle.t)) + +let rec write_once fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + await_writable fd; + write_once fd buf i len + | n -> n + ) + +let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done + +module Out = struct + include Iostream.Out + + let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + fd : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end +end + +module In = struct + include Iostream.In + + let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end +end diff --git a/src/io/IO_unix.mli b/src/io/IO_unix.mli new file mode 100644 index 00000000..96010e2f --- /dev/null +++ b/src/io/IO_unix.mli @@ -0,0 +1,21 @@ +(** Low level Unix IOs *) + +type file_descr = Unix.file_descr + +val read : file_descr -> bytes -> int -> int -> int +val write_once : file_descr -> bytes -> int -> int -> int +val write : file_descr -> bytes -> int -> int -> unit +val await_readable : file_descr -> unit +val await_writable : file_descr -> unit + +module In : sig + include module type of Iostream.In + + val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t +end + +module Out : sig + include module type of Iostream.Out + + val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t +end diff --git a/src/io/common_.ml b/src/io/common_.ml new file mode 100644 index 00000000..6cfcb1a8 --- /dev/null +++ b/src/io/common_.ml @@ -0,0 +1,6 @@ +module FLS = Moonpool_fib.Fls + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +type cancel_handle = { cancel: unit -> unit } [@@unboxed] diff --git a/src/io/dune b/src/io/dune new file mode 100644 index 00000000..6d165d9e --- /dev/null +++ b/src/io/dune @@ -0,0 +1,14 @@ + +(library + (name moonpool_io) + (public_name moonpool.io) + (synopsis "Event loop for Moonpool based on poll") + (optional) ; dep on poll + (flags :standard -open Moonpool) + (private_modules heap_ time_) + (enabled_if + (>= %{ocaml_version} 5.0)) + (libraries moonpool moonpool.fib poll iostream + (select time_.ml from + (mtime mtime.clock.os -> time_.mtime.ml) + (-> time_.unix.ml)))) diff --git a/src/io/heap_.ml b/src/io/heap_.ml new file mode 100644 index 00000000..752585ec --- /dev/null +++ b/src/io/heap_.ml @@ -0,0 +1,133 @@ +module type PARTIAL_ORD = sig + type t + + val leq : t -> t -> bool + (** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *) +end + +module type S = sig + type elt + type t + + val empty : t + (** [empty] returns the empty heap. *) + + val is_empty : t -> bool + (** [is_empty h] returns [true] if the heap [h] is empty. *) + + exception Empty + + val merge : t -> t -> t + (** [merge h1 h2] merges the two heaps [h1] and [h2]. *) + + val insert : elt -> t -> t + (** [insert x h] inserts an element [x] into the heap [h]. *) + + val find_min : t -> elt option + (** [find_min h] find the minimal element of the heap [h]. *) + + val find_min_exn : t -> elt + (** [find_min_exn h] is like {!find_min} but can fail. + @raise Empty if the heap is empty. *) + + val take : t -> (t * elt) option + (** [take h] extracts and returns the minimum element, and the new heap (without + this element), or [None] if the heap [h] is empty. *) + + val take_exn : t -> t * elt + (** [take_exn h] is like {!take}, but can fail. + @raise Empty if the heap is empty. *) + + val delete_one : (elt -> elt -> bool) -> elt -> t -> t + (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] + if it exist in the heap [h], and delete it. + If [h] do not contain [x] then it return [h]. *) + + val size : t -> int +end + +module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct + type elt = E.t + + type t = + | E + | N of int * elt * t * t + + let empty = E + + let is_empty = function + | E -> true + | N _ -> false + + exception Empty + + (* Rank of the tree *) + let _rank = function + | E -> 0 + | N (r, _, _, _) -> r + + (* Make a balanced node labelled with [x], and subtrees [a] and [b]. + We ensure that the right child's rank is ≤ to the rank of the + left child (leftist property). The rank of the resulting node + is the length of the rightmost path. *) + let _make_node x a b = + if _rank a >= _rank b then + N (_rank b + 1, x, a, b) + else + N (_rank a + 1, x, b, a) + + let rec merge t1 t2 = + match t1, t2 with + | t, E -> t + | E, t -> t + | N (_, x, a1, b1), N (_, y, a2, b2) -> + if E.leq x y then + _make_node x a1 (merge b1 t2) + else + _make_node y a2 (merge t1 b2) + + let insert x h = merge (N (1, x, E, E)) h + + let find_min_exn = function + | E -> raise Empty + | N (_, x, _, _) -> x + + let find_min = function + | E -> None + | N (_, x, _, _) -> Some x + + let take = function + | E -> None + | N (_, x, l, r) -> Some (merge l r, x) + + let take_exn = function + | E -> raise Empty + | N (_, x, l, r) -> merge l r, x + + let delete_one eq x h = + let rec aux = function + | E -> false, E + | N (_, y, l, r) as h -> + if eq x y then + true, merge l r + else if E.leq y x then ( + let found_left, l1 = aux l in + let found, r1 = + if found_left then + true, r + else + aux r + in + if found then + true, _make_node y l1 r1 + else + false, h + ) else + false, h + in + snd (aux h) + + let rec size = function + | E -> 0 + | N (_, _, l, r) -> 1 + size l + size r +end diff --git a/src/io/heap_.mli b/src/io/heap_.mli new file mode 100644 index 00000000..3d60f5c9 --- /dev/null +++ b/src/io/heap_.mli @@ -0,0 +1,53 @@ +(** Leftist Heaps + + Implementation following Okasaki's book. *) + +module type PARTIAL_ORD = sig + type t + + val leq : t -> t -> bool + (** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *) +end + +module type S = sig + type elt + type t + + val empty : t + (** [empty] returns the empty heap. *) + + val is_empty : t -> bool + (** [is_empty h] returns [true] if the heap [h] is empty. *) + + exception Empty + + val merge : t -> t -> t + (** [merge h1 h2] merges the two heaps [h1] and [h2]. *) + + val insert : elt -> t -> t + (** [insert x h] inserts an element [x] into the heap [h]. *) + + val find_min : t -> elt option + (** [find_min h] find the minimal element of the heap [h]. *) + + val find_min_exn : t -> elt + (** [find_min_exn h] is like {!find_min} but can fail. + @raise Empty if the heap is empty. *) + + val take : t -> (t * elt) option + (** [take h] extracts and returns the minimum element, and the new heap (without + this element), or [None] if the heap [h] is empty. *) + + val take_exn : t -> t * elt + (** [take_exn h] is like {!take}, but can fail. + @raise Empty if the heap is empty. *) + + val delete_one : (elt -> elt -> bool) -> elt -> t -> t + (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] + if it exist in the heap [h], and delete it. + If [h] do not contain [x] then it return [h]. *) + + val size : t -> int +end + +module Make (E : PARTIAL_ORD) : S with type elt = E.t diff --git a/src/io/moonpool_io.ml b/src/io/moonpool_io.ml new file mode 100644 index 00000000..ead33a7b --- /dev/null +++ b/src/io/moonpool_io.ml @@ -0,0 +1,10 @@ +open Common_ +include Fuseau +module IO_unix = IO_unix +module Timer = Timer +module Net = Net + +let main f = + let loop = new U_loop.unix_ev_loop in + let@ () = U_loop.with_cur loop in + Fuseau.main ~loop:(loop :> Event_loop.t) f diff --git a/src/io/moonpool_io.mli b/src/io/moonpool_io.mli new file mode 100644 index 00000000..c9bad41e --- /dev/null +++ b/src/io/moonpool_io.mli @@ -0,0 +1,20 @@ +(** Simple event loop based on {!Unix.select}. + + This library combines {!Fuseau}'s fibers with a simple + event loop for IOs based on {!Unix.select}. + It's useful for simple situations or portability. + For bigger system it's probably better to use another + event loop. *) + +include module type of struct + include Moonpool_fib +end + +module IO_unix = IO_unix +module Net = Net +module Timer = Timer + +(* FIXME: lazy background thread for the poll loop? *) + +val main : (unit -> 'a) -> 'a +(** A version of {!Moonpool_fib.main} that uses a Unix-based event loop. *) diff --git a/src/io/net.ml b/src/io/net.ml new file mode 100644 index 00000000..e1817905 --- /dev/null +++ b/src/io/net.ml @@ -0,0 +1,150 @@ +open Common_ + +module Inet_addr = struct + type t = Unix.inet_addr + + let any = Unix.inet_addr_any + let loopback = Unix.inet_addr_loopback + let show = Unix.string_of_inet_addr + let of_string s = try Some (Unix.inet_addr_of_string s) with _ -> None + + let of_string_exn s = + try Unix.inet_addr_of_string s with _ -> invalid_arg "Inet_addr.of_string" +end + +module Sockaddr = struct + type t = Unix.sockaddr + + let show = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + + let unix s : t = Unix.ADDR_UNIX s + let inet addr port : t = Unix.ADDR_INET (addr, port) + + let inet_parse addr port = + try Some (inet (Unix.inet_addr_of_string addr) port) with _ -> None + + let inet_parse_exn addr port = + try inet (Unix.inet_addr_of_string addr) port + with _ -> invalid_arg "Sockadd.inet_parse" + + let inet_local port = inet Unix.inet_addr_loopback port + let inet_any port = inet Unix.inet_addr_any port +end + +module TCP_server = struct + type t = { fiber: unit Fiber.t } [@@unboxed] + + exception Stop + + let stop_ fiber = + let ebt = Exn_bt.get Stop in + Fuseau.Fiber.Private_.cancel fiber ebt + + let stop self = stop_ self.fiber + let join self = Fuseau.await self.fiber + + let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + + Unix.bind sock addr; + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.SO_REUSEADDR true; + Unix.listen sock 32; + + let fiber = Fuseau.Fiber.Private_.create () in + let self = { fiber } in + + let loop_client client_sock client_addr : unit = + Unix.set_nonblock client_sock; + Unix.setsockopt client_sock Unix.TCP_NODELAY true; + + let@ () = + Fun.protect ~finally:(fun () -> + try Unix.close client_sock with _ -> ()) + in + handle_client client_addr client_sock + in + + let loop () = + while not (Fiber.is_done fiber) do + match Unix.accept sock with + | client_sock, client_addr -> + ignore + (Fuseau.spawn ~propagate_cancel_to_parent:false (fun () -> + loop_client client_sock client_addr) + : _ Fiber.t) + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* suspend *) + let loop = U_loop.cur () in + Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> + (* FIXME: possible race condition: the socket became readable + in the mid-time and we won't get notified. We need to call + [accept] after subscribing to [on_readable]. *) + ignore + (loop#on_readable sock (fun _ev -> + wakeup (); + Cancel_handle.cancel _ev) + : Cancel_handle.t)) + done + in + + let loop_fiber = + let sched = Fuseau.get_scheduler () in + Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop + in + let finally () = + stop_ loop_fiber; + Unix.close sock + in + let@ () = Fun.protect ~finally in + f self + + let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = + with_serve' addr + (fun client_addr client_sock -> + let ic = IO_unix.In.of_unix_fd client_sock in + let oc = IO_unix.Out.of_unix_fd client_sock in + handle_client client_addr ic oc) + f +end + +module TCP_client = struct + let with_connect' addr (f : Unix.file_descr -> 'a) : 'a = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.TCP_NODELAY true; + + (* connect asynchronously *) + while + try + Unix.connect sock addr; + false + with + | Unix.Unix_error + ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) + -> + Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup -> + let loop = U_loop.cur () in + ignore + (loop#on_writable sock (fun _ev -> + wakeup (); + Cancel_handle.cancel _ev) + : Cancel_handle.t)); + true + do + () + done; + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f sock + + let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a = + with_connect' addr (fun sock -> + let ic = IO_unix.In.of_unix_fd sock in + let oc = IO_unix.Out.of_unix_fd sock in + f ic oc) +end diff --git a/src/io/net.mli b/src/io/net.mli new file mode 100644 index 00000000..f6289844 --- /dev/null +++ b/src/io/net.mli @@ -0,0 +1,49 @@ +(** Networking *) + +module Inet_addr : sig + type t = Unix.inet_addr + + val loopback : t + val any : t + val show : t -> string + val of_string : string -> t option + + val of_string_exn : string -> t + (** @raise Invalid_argument *) +end + +(** Socket addresses *) +module Sockaddr : sig + type t = Unix.sockaddr + + val show : t -> string + val unix : string -> t + val inet : Inet_addr.t -> int -> t + val inet_parse : string -> int -> t option + val inet_parse_exn : string -> int -> t + val inet_local : int -> t + val inet_any : int -> t +end + +module TCP_server : sig + type t + + val stop : t -> unit + val join : t -> unit + + val with_serve' : + Sockaddr.t -> (Sockaddr.t -> Unix.file_descr -> unit) -> (t -> 'a) -> 'a + + val with_serve : + Sockaddr.t -> + (Sockaddr.t -> Iostream.In.t -> Iostream.Out.t -> unit) -> + (t -> 'a) -> + 'a +end + +module TCP_client : sig + val with_connect' : Unix.sockaddr -> (Unix.file_descr -> 'a) -> 'a + + val with_connect : + Unix.sockaddr -> (Iostream.In.t -> Iostream.Out.t -> 'a) -> 'a +end diff --git a/src/io/time_.mli b/src/io/time_.mli new file mode 100644 index 00000000..38e6f838 --- /dev/null +++ b/src/io/time_.mli @@ -0,0 +1,4 @@ +(** Basic abstraction over a clock *) + +val time_s : unit -> float +val time_ns : unit -> int64 diff --git a/src/io/time_.mtime.ml b/src/io/time_.mtime.ml new file mode 100644 index 00000000..d50c086c --- /dev/null +++ b/src/io/time_.mtime.ml @@ -0,0 +1,8 @@ +let time_ns : unit -> int64 = Mtime_clock.now_ns + +(** Monotonic time in seconds *) +let time_s () : float = + let ns = time_ns () in + let s = Int64.(div ns 1_000_000_000L) in + let ns' = Int64.(rem ns 1_000_000_000L) in + Int64.to_float s +. (Int64.to_float ns' /. 1e9) diff --git a/src/io/time_.unix.ml b/src/io/time_.unix.ml new file mode 100644 index 00000000..cdb9f337 --- /dev/null +++ b/src/io/time_.unix.ml @@ -0,0 +1,2 @@ +let time_s = Unix.gettimeofday +let[@inline] time_ns () = Int64.of_float (floor (time_s () *. 1e9)) diff --git a/src/io/timer.ml b/src/io/timer.ml new file mode 100644 index 00000000..4346bccc --- /dev/null +++ b/src/io/timer.ml @@ -0,0 +1,96 @@ +open Common_ +module Time = Time_ + +type instant_s = float +type duration_s = float + +type kind = + | Once + | Every of duration_s + +type task = { + mutable deadline: instant_s; + mutable active: bool; + f: cancel_handle -> unit; + as_cancel_handle: cancel_handle; + kind: kind; +} + +module Task_heap = Heap_.Make (struct + type t = task + + let[@inline] leq t1 t2 = t1.deadline <= t2.deadline +end) + +type t = { mutable tasks: Task_heap.t } + +(** accepted time diff for actions. *) +let epsilon_s = 0.000_001 + +type tick_res = + | Wait of float + | Run of (cancel_handle -> unit) * cancel_handle + | Empty + +let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks) +let[@inline] num_tasks self : int = Task_heap.size self.tasks + +let[@inline] pop_task_ self : unit = + let tasks, _t = Task_heap.take_exn self.tasks in + self.tasks <- tasks + +let run_after self delay f : cancel_handle = + let now = Time.time_s () in + let deadline = now +. delay in + let rec task = + { + deadline; + f; + kind = Once; + active = true; + as_cancel_handle = { cancel = (fun () -> task.active <- false) }; + } + in + self.tasks <- Task_heap.insert task self.tasks; + task.as_cancel_handle + +let run_every self delay f : cancel_handle = + let now = Time.time_s () in + let deadline = now +. delay in + let rec task = + { + deadline; + f; + kind = Every delay; + active = true; + as_cancel_handle = { cancel = (fun () -> task.active <- false) }; + } + in + self.tasks <- Task_heap.insert task self.tasks; + task.as_cancel_handle + +let rec next (self : t) : tick_res = + match Task_heap.find_min self.tasks with + | None -> Empty + | Some task when not task.active -> + pop_task_ self; + next self + | Some task -> + let now = Time.time_s () in + + let remaining_time_s = task.deadline -. now in + if remaining_time_s <= epsilon_s then ( + pop_task_ self; + + (match task.kind with + | Once -> () + | Every dur -> + (* schedule the next iteration *) + task.deadline <- now +. dur; + self.tasks <- Task_heap.insert task self.tasks); + + Run (task.f, task.as_cancel_handle) + ) else + Wait remaining_time_s + +let create () = { tasks = Task_heap.empty } diff --git a/src/io/timer.mli b/src/io/timer.mli new file mode 100644 index 00000000..42f6c0ed --- /dev/null +++ b/src/io/timer.mli @@ -0,0 +1,17 @@ +open Common_ + +type t + +val create : unit -> t +(** A new timer. *) + +type tick_res = + | Wait of float + | Run of (cancel_handle -> unit) * cancel_handle + | Empty + +val next : t -> tick_res +val run_after : t -> float -> (cancel_handle -> unit) -> cancel_handle +val run_every : t -> float -> (cancel_handle -> unit) -> cancel_handle +val has_tasks : t -> bool +val num_tasks : t -> int diff --git a/src/io/u_loop.ml b/src/io/u_loop.ml new file mode 100644 index 00000000..04fcc1d8 --- /dev/null +++ b/src/io/u_loop.ml @@ -0,0 +1,202 @@ +open Common_ + +type io_mode = + | Read + | Write + +module IO_wait = struct + type t = { + mutable active: bool; + f: cancel_handle -> unit; + as_cancel_handle: cancel_handle; + } + (** A single event, waiting on a unix FD *) + + let make f : t = + let rec self = + { + active = true; + f; + as_cancel_handle = { cancel = (fun () -> self.active <- false) }; + } + in + self +end + +module Per_fd = struct + type t = { + fd: Unix.file_descr; + mutable reads: IO_wait.t list; + mutable writes: IO_wait.t list; + } + + let[@inline] is_empty self = self.reads = [] && self.writes = [] +end + +module IO_tbl = struct + type t = { + mutable n_read: int; + mutable n_write: int; + tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t; + } + + let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 } + + let get_or_create (self : t) fd : Per_fd.t = + try Hashtbl.find self.tbl fd + with Not_found -> + let per_fd = { Per_fd.fd; reads = []; writes = [] } in + Hashtbl.add self.tbl fd per_fd; + per_fd + + let add_io_wait (self : t) fd mode (ev : IO_wait.t) = + let per_fd = get_or_create self fd in + match mode with + | Read -> + self.n_read <- 1 + self.n_read; + per_fd.reads <- ev :: per_fd.reads + | Write -> + self.n_write <- 1 + self.n_write; + per_fd.writes <- ev :: per_fd.writes + + let prepare_select (self : t) = + let reads = ref [] in + let writes = ref [] in + Hashtbl.iter + (fun _ (per_fd : Per_fd.t) -> + if Per_fd.is_empty per_fd then + Hashtbl.remove self.tbl per_fd.fd + else ( + if per_fd.reads <> [] then reads := per_fd.fd :: !reads; + if per_fd.writes <> [] then writes := per_fd.fd :: !writes + )) + self.tbl; + !reads, !writes + + let trigger_waiter (io : IO_wait.t) = + if io.active then io.f io.as_cancel_handle + + let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list) + (writes : Unix.file_descr list) : unit = + List.iter + (fun fd -> + if fd <> ignore_read then ( + let per_fd = Hashtbl.find self.tbl fd in + List.iter trigger_waiter per_fd.reads; + self.n_read <- self.n_read - List.length per_fd.reads; + per_fd.reads <- [] + )) + reads; + + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.tbl fd in + List.iter trigger_waiter per_fd.writes; + self.n_write <- self.n_write - List.length per_fd.writes; + per_fd.writes <- []) + writes; + () +end + +let run_timer_ (t : Timer.t) = + let rec loop () = + match Timer.next t with + | Timer.Empty -> None + | Timer.Run (f, ev_h) -> + f ev_h; + loop () + | Timer.Wait f -> + if f > 0. then + Some f + else + None + in + loop () + +class unix_ev_loop = + let _timer = Timer.create () in + let _io_wait : IO_tbl.t = IO_tbl.create () in + let _in_blocking_section = ref false in + + let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in + let () = + Unix.set_nonblock _magic_pipe_read; + Unix.set_nonblock _magic_pipe_write + in + + let[@inline] has_pending_tasks () = + _io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer + in + + object + (* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *) + method one_step ~block () : unit = + let delay = run_timer_ _timer in + + let delay = + if block then + Option.value delay ~default:10. + else + (* do not wait *) + 0. + in + + let reads, writes = IO_tbl.prepare_select _io_wait in + if has_pending_tasks () then ( + _in_blocking_section := true; + let reads, writes, _ = + Unix.select (_magic_pipe_read :: reads) writes [] delay + in + _in_blocking_section := false; + IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes + ); + () + + method on_readable + : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = + fun fd f : cancel_handle -> + let ev = IO_wait.make f in + IO_tbl.add_io_wait _io_wait fd Read ev; + ev.as_cancel_handle + + method on_writable + : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = + fun fd f : cancel_handle -> + let ev = IO_wait.make f in + IO_tbl.add_io_wait _io_wait fd Write ev; + ev.as_cancel_handle + + method on_timer + : float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle = + fun delay ~repeat f -> + if repeat then + Timer.run_every _timer delay f + else + Timer.run_after _timer delay f + + method interrupt_if_in_blocking_section = + if !_in_blocking_section then ( + let b = Bytes.create 1 in + ignore (Unix.write _magic_pipe_write b 0 1 : int) + ) + + method has_pending_tasks : bool = has_pending_tasks () + end + +open struct + let k_ev_loop : unix_ev_loop option ref TLS.key = + TLS.new_key (fun () -> ref None) +end + +(** Access the event loop from within it *) +let[@inline] cur () = + match !(TLS.get k_ev_loop) with + | None -> failwith "must be called from inside Fuseau_unix" + | Some ev -> ev + +let with_cur (ev : unix_ev_loop) f = + let r = TLS.get k_ev_loop in + let old = !r in + r := Some ev; + let finally () = r := old in + Fun.protect ~finally f