diff --git a/src/io/IO_unix.ml b/src/io/IO_unix.ml.tmp similarity index 100% rename from src/io/IO_unix.ml rename to src/io/IO_unix.ml.tmp diff --git a/src/io/IO_unix.mli b/src/io/IO_unix.mli.tmp similarity index 100% rename from src/io/IO_unix.mli rename to src/io/IO_unix.mli.tmp diff --git a/src/io/cancel_handle.ml b/src/io/cancel_handle.ml new file mode 100644 index 00000000..89d653ca --- /dev/null +++ b/src/io/cancel_handle.ml @@ -0,0 +1,8 @@ +(** A handle to cancel something *) +type t = + | Cancel1 : ('a -> unit) * 'a -> t + | Cancel2 : ('a -> 'b -> unit) * 'a * 'b -> t + +let cancel = function + | Cancel1 (f, x) -> f x + | Cancel2 (f, x, y) -> f x y diff --git a/src/io/common_.ml b/src/io/common_.ml index 6cfcb1a8..1bbb377b 100644 --- a/src/io/common_.ml +++ b/src/io/common_.ml @@ -1,6 +1,6 @@ +module A = Moonpool.Atomic module FLS = Moonpool_fib.Fls +module Fiber = Moonpool_fib.Fiber let ( let@ ) = ( @@ ) let spf = Printf.sprintf - -type cancel_handle = { cancel: unit -> unit } [@@unboxed] diff --git a/src/io/ev_loop.ml b/src/io/ev_loop.ml new file mode 100644 index 00000000..3c085a7d --- /dev/null +++ b/src/io/ev_loop.ml @@ -0,0 +1,358 @@ +open Common_ +module B_queue = Moonpool.Blocking_queue + +type io_mode = + | Read + | Write + +module IO_wait = struct + type t = { f: unit -> unit } [@@unboxed] + (** A single event, waiting on a unix FD *) + + let[@inline] make f : t = { f } +end + +module Per_fd = struct + type t = { + fd: Unix.file_descr; + mutable reads: IO_wait.t Handle.Map.t; + mutable writes: IO_wait.t Handle.Map.t; + } + + let[@inline] no_reads self = Handle.Map.is_empty self.reads + let[@inline] no_writes self = Handle.Map.is_empty self.writes + let[@inline] is_empty self = no_reads self && no_writes self + + let cancel_read (self : t) h : bool = + if Handle.Map.mem h self.reads then ( + self.reads <- Handle.Map.remove h self.reads; + true + ) else + false + + let cancel_write (self : t) h : bool = + if Handle.Map.mem h self.writes then ( + self.reads <- Handle.Map.remove h self.writes; + true + ) else + false +end + +module IO_tbl = struct + type t = { + mutable n_read: int; + mutable n_write: int; + tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t; + } + + let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 } + + let get_or_create (self : t) fd : Per_fd.t = + try Hashtbl.find self.tbl fd + with Not_found -> + let per_fd = + { Per_fd.fd; reads = Handle.Map.empty; writes = Handle.Map.empty } + in + Hashtbl.add self.tbl fd per_fd; + per_fd + + let update_subs poll (per_fd : Per_fd.t) = + let ev = + match Per_fd.no_reads per_fd, Per_fd.no_writes per_fd with + | true, true -> Poll.Event.none + | true, false -> Poll.Event.write + | false, true -> Poll.Event.read + | false, false -> Poll.Event.read_write + in + Poll.set poll per_fd.fd ev + + let add_io_wait (self : t) poll fd mode (h : Handle.t) (ev : IO_wait.t) = + let per_fd = get_or_create self fd in + (match mode with + | Read -> + self.n_read <- 1 + self.n_read; + per_fd.reads <- Handle.Map.add h ev per_fd.reads + | Write -> + self.n_write <- 1 + self.n_write; + per_fd.writes <- Handle.Map.add h ev per_fd.writes); + update_subs poll per_fd + + let cancel (self : t) (fd : Unix.file_descr) (h : Handle.t) : unit = + match Hashtbl.find_opt self.tbl fd, Handle.handle_type h with + | None, _ -> () + | Some per_fd, H_read -> + if Per_fd.cancel_read per_fd h then self.n_read <- self.n_read - 1 + | Some per_fd, H_write -> + if Per_fd.cancel_write per_fd h then self.n_write <- self.n_write - 1 + | Some _, H_timer -> assert false + + let[@inline] trigger_waiter (_h : Handle.t) (io : IO_wait.t) = io.f () + + let handle_ready_read (self : t) ~ignore_read fd (per_fd : Per_fd.t) = + if fd <> ignore_read then ( + Handle.Map.iter trigger_waiter per_fd.reads; + self.n_read <- self.n_read - Handle.Map.cardinal per_fd.reads; + per_fd.reads <- Handle.Map.empty + ) + + let handle_ready_write (self : t) (per_fd : Per_fd.t) = + Handle.Map.iter trigger_waiter per_fd.writes; + self.n_write <- self.n_write - Handle.Map.cardinal per_fd.writes; + per_fd.writes <- Handle.Map.empty + + let update_all (self : t) ~ignore_read (poll : Poll.t) : unit = + let handle_ready fd (ev : Poll.Event.t) = + match Hashtbl.find self.tbl fd with + | exception Not_found -> () + | per_fd -> + if ev.readable then handle_ready_read self ~ignore_read fd per_fd; + if ev.writable then handle_ready_write self per_fd; + update_subs poll per_fd; + if Per_fd.is_empty per_fd then Hashtbl.remove self.tbl fd + in + Poll.iter_ready poll ~f:handle_ready +end + +(** Messages from other threads *) +module Incoming_msg = struct + type t = + | On_ready of Unix.file_descr * io_mode * Handle.t * (unit -> unit) + | Run_after of float * Handle.t * (unit -> unit) + | Run_every of float * Handle.t * (Cancel_handle.t -> unit) + | Cancel_io of Unix.file_descr * Handle.t + | Cancel_timer of Handle.t +end + +module Main_loop = struct + (** Process timers that have expired, and return the duration until the next timer *) + let process_expired_timers_ (t : Timer.t) : float option = + let rec loop () = + match Timer.next t with + | Timer.Empty -> None + | Timer.Run f -> + f (); + loop () + | Timer.Wait f -> + if f > 0. then + Some f + else + None + in + loop () + + type state = { + timer: Timer.t; + io_tbl: IO_tbl.t; + incoming: Incoming_msg.t B_queue.t; + poll: Poll.t; + in_poll: bool A.t; + buf4: bytes; + _magic_pipe_read: Unix.file_descr; + _magic_pipe_write: Unix.file_descr; + } + + let create () : state = + let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in + Unix.set_nonblock _magic_pipe_read; + Unix.set_nonblock _magic_pipe_write; + let poll = Poll.create () in + { + timer = Timer.create (); + io_tbl = IO_tbl.create (); + incoming = B_queue.create (); + in_poll = A.make false; + poll; + buf4 = Bytes.create 4; + _magic_pipe_read; + _magic_pipe_write; + } + + let push (self : state) (msg : Incoming_msg.t) = + B_queue.push self.incoming msg; + if A.exchange self.in_poll false then ( + (* maybe wake up the loop *) + let b = Bytes.make 1 '\x01' in + ignore (Unix.write self._magic_pipe_write b 0 1 : int) + ) + + let cancel_timer_ (self : state) (h : Handle.t) = + push self (Incoming_msg.Cancel_timer h) + + (** Make sure the pipe is empty *) + let drain_pipe_ (self : state) = + while + match Unix.read self._magic_pipe_read self.buf4 0 4 with + | 0 -> false + | _n -> true + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + false + do + () + done + + let process_msg (self : state) (msg : Incoming_msg.t) = + match msg with + | On_ready (fd, mode, h, f) -> + IO_tbl.add_io_wait self.io_tbl self.poll fd mode h (IO_wait.make f) + | Run_after (delay, h, f) -> Timer.run_after_s self.timer delay h f + | Run_every (delay, h, f) -> + let cancel = Cancel_handle.Cancel2 (cancel_timer_, self, h) in + Timer.run_every_s self.timer delay h (fun () -> f cancel) + | Cancel_io (fd, h) -> IO_tbl.cancel self.io_tbl fd h + | Cancel_timer h -> Timer.cancel self.timer h + + let loop (self : state) : unit = + let local = Queue.create () in + while true do + (* process incoming messages *) + B_queue.transfer self.incoming local; + while not (Queue.is_empty local) do + let msg = Queue.pop local in + process_msg self msg + done; + + (* check IOs *) + IO_tbl.update_all self.io_tbl ~ignore_read:self._magic_pipe_read self.poll; + + (* update timers, get next timeout *) + let timer = process_expired_timers_ self.timer in + let timeout = + match timer with + | None -> Poll.Timeout.Never + | Some t_s -> + let t_ns = Int64.of_float @@ ceil @@ (t_s *. 1e9) in + Poll.Timeout.After t_ns + in + + (* wait for something to happen *) + A.set self.in_poll true; + ignore (Poll.wait self.poll timeout : [ `Ok | `Timeout ]); + A.set self.in_poll false; + + drain_pipe_ self; + + IO_tbl.update_all self.io_tbl ~ignore_read:self._magic_pipe_read self.poll + done +end + +type t = Main_loop.state + +let cur_ : t option A.t = A.make None + +open struct + (* used only for init *) + let m = Mutex.create () + + let[@inline never] create_and_set_ () : t = + Mutex.lock m; + match A.get cur_ with + | Some t -> + Mutex.unlock m; + t + | None -> + let st = Main_loop.create () in + (* start background thread *) + let _t : Thread.t = + Moonpool.start_thread_on_some_domain Main_loop.loop st + in + A.set cur_ (Some st); + Mutex.unlock m; + st +end + +let[@inline] get_or_create () : t = + match A.get cur_ with + | Some t -> t + | None -> create_and_set_ () + +let on_readable (self : t) fd f : Cancel_handle.t = + let h = Handle.fresh H_read in + Main_loop.push self (Incoming_msg.On_ready (fd, Read, h, f)); + Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_io (fd, h)) + +let on_writable (self : t) fd f : Cancel_handle.t = + let h = Handle.fresh H_write in + Main_loop.push self (Incoming_msg.On_ready (fd, Write, h, f)); + Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_io (fd, h)) + +let run_after_s (self : t) delay f : Cancel_handle.t = + let h = Handle.fresh H_timer in + Main_loop.push self (Incoming_msg.Run_after (delay, h, f)); + Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_timer h) + +let run_every_s (self : t) delay f : Cancel_handle.t = + let h = Handle.fresh H_timer in + Main_loop.push self (Incoming_msg.Run_every (delay, h, f)); + Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_timer h) + +(* +class unix_ev_loop = + let _timer = Timer.create () in + let _io_wait : IO_tbl.t = IO_tbl.create () in + let _in_blocking_section = ref false in + + let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in + let () = + Unix.set_nonblock _magic_pipe_read; + Unix.set_nonblock _magic_pipe_write + in + + let[@inline] has_pending_tasks () = + _io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer + in + + object + (* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *) + method one_step ~block () : unit = + let delay = process_expired_timers_ _timer in + + let delay = + if block then + Option.value delay ~default:10. + else + (* do not wait *) + 0. + in + + let reads, writes = IO_tbl.prepare_select _io_wait in + if has_pending_tasks () then ( + _in_blocking_section := true; + let reads, writes, _ = + Unix.select (_magic_pipe_read :: reads) writes [] delay + in + _in_blocking_section := false; + IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes + ); + () + + method on_readable + : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = + fun fd f : cancel_handle -> + let ev = IO_wait.make f in + IO_tbl.add_io_wait _io_wait fd Read ev; + ev.as_cancel_handle + + method on_writable + : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = + fun fd f : cancel_handle -> + let ev = IO_wait.make f in + IO_tbl.add_io_wait _io_wait fd Write ev; + ev.as_cancel_handle + + method on_timer + : float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle = + fun delay ~repeat f -> + if repeat then + Timer.run_every _timer delay f + else + Timer.run_after _timer delay f + + method interrupt_if_in_blocking_section = + if !_in_blocking_section then ( + let b = Bytes.create 1 in + ignore (Unix.write _magic_pipe_write b 0 1 : int) + ) + + method has_pending_tasks : bool = has_pending_tasks () + end + *) diff --git a/src/io/ev_loop.mli b/src/io/ev_loop.mli new file mode 100644 index 00000000..047bade6 --- /dev/null +++ b/src/io/ev_loop.mli @@ -0,0 +1,13 @@ +open Common_ + +type io_mode = + | Read + | Write + +type t + +val get_or_create : unit -> t +val on_readable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t +val on_writable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t +val run_after_s : t -> float -> (unit -> unit) -> Cancel_handle.t +val run_every_s : t -> float -> (Cancel_handle.t -> unit) -> Cancel_handle.t diff --git a/src/io/handle.ml b/src/io/handle.ml new file mode 100644 index 00000000..7b4f2f98 --- /dev/null +++ b/src/io/handle.ml @@ -0,0 +1,48 @@ +open Common_ + +type handle_type = + | H_read + | H_write + | H_timer + +type t = int + +let n_bits_shard = 3 +let n_shards_ = 8 +let shard_mask_ = (1 lsl n_bits_shard) - 1 +let () = assert (1 lsl n_bits_shard = n_shards_) + +(* array of counters, sharded on thread ID to reduce contention. + TODO: avoid false sharing *) +let generators_ : int A.t array = Array.init n_shards_ (fun _ -> A.make 0) + +let int_of_handle_type = function + | H_read -> 0 + | H_write -> 1 + | H_timer -> 2 + +let[@inline] handle_type (self : t) : handle_type = + match self land 0b11 with + | 0 -> H_read + | 1 -> H_write + | 2 -> H_timer + | _ -> assert false + +let fresh (ty : handle_type) : t = + let shard = (Thread.id @@ Thread.self ()) land shard_mask_ in + let n = A.fetch_and_add (Array.unsafe_get generators_ shard) 1 in + let n = + (n lsl (2 + n_bits_shard)) lor (shard lsl 2) lor int_of_handle_type ty + in + n + +module As_key = struct + type t = int + + let equal : t -> t -> bool = ( = ) + let hash = Hashtbl.hash + let compare : t -> t -> int = Stdlib.compare +end + +module Map = Map.Make (As_key) +module Tbl = Hashtbl.Make (As_key) diff --git a/src/io/handle.mli b/src/io/handle.mli new file mode 100644 index 00000000..ff580d05 --- /dev/null +++ b/src/io/handle.mli @@ -0,0 +1,21 @@ +(** Handles. + + Each subscription has a unique handle that is used + to cancel it and refer to it. *) + +type handle_type = + | H_read + | H_write + | H_timer + +type t = private int +(** A handle. Its internal structure is unspecified. *) + +val fresh : handle_type -> t +(** Get a fresh handle *) + +val handle_type : t -> handle_type +(** Recover the type of the handle *) + +module Map : Map.S with type key = t +module Tbl : Hashtbl.S with type key = t diff --git a/src/io/heap_.ml b/src/io/heap_.ml index 752585ec..12fa0d9e 100644 --- a/src/io/heap_.ml +++ b/src/io/heap_.ml @@ -26,24 +26,9 @@ module type S = sig val find_min : t -> elt option (** [find_min h] find the minimal element of the heap [h]. *) - val find_min_exn : t -> elt - (** [find_min_exn h] is like {!find_min} but can fail. - @raise Empty if the heap is empty. *) - - val take : t -> (t * elt) option - (** [take h] extracts and returns the minimum element, and the new heap (without - this element), or [None] if the heap [h] is empty. *) - val take_exn : t -> t * elt (** [take_exn h] is like {!take}, but can fail. @raise Empty if the heap is empty. *) - - val delete_one : (elt -> elt -> bool) -> elt -> t -> t - (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] - if it exist in the heap [h], and delete it. - If [h] do not contain [x] then it return [h]. *) - - val size : t -> int end module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct @@ -88,46 +73,11 @@ module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct let insert x h = merge (N (1, x, E, E)) h - let find_min_exn = function - | E -> raise Empty - | N (_, x, _, _) -> x - let find_min = function | E -> None | N (_, x, _, _) -> Some x - let take = function - | E -> None - | N (_, x, l, r) -> Some (merge l r, x) - let take_exn = function | E -> raise Empty | N (_, x, l, r) -> merge l r, x - - let delete_one eq x h = - let rec aux = function - | E -> false, E - | N (_, y, l, r) as h -> - if eq x y then - true, merge l r - else if E.leq y x then ( - let found_left, l1 = aux l in - let found, r1 = - if found_left then - true, r - else - aux r - in - if found then - true, _make_node y l1 r1 - else - false, h - ) else - false, h - in - snd (aux h) - - let rec size = function - | E -> 0 - | N (_, _, l, r) -> 1 + size l + size r end diff --git a/src/io/heap_.mli b/src/io/heap_.mli index 3d60f5c9..5393395f 100644 --- a/src/io/heap_.mli +++ b/src/io/heap_.mli @@ -30,24 +30,9 @@ module type S = sig val find_min : t -> elt option (** [find_min h] find the minimal element of the heap [h]. *) - val find_min_exn : t -> elt - (** [find_min_exn h] is like {!find_min} but can fail. - @raise Empty if the heap is empty. *) - - val take : t -> (t * elt) option - (** [take h] extracts and returns the minimum element, and the new heap (without - this element), or [None] if the heap [h] is empty. *) - val take_exn : t -> t * elt (** [take_exn h] is like {!take}, but can fail. @raise Empty if the heap is empty. *) - - val delete_one : (elt -> elt -> bool) -> elt -> t -> t - (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] - if it exist in the heap [h], and delete it. - If [h] do not contain [x] then it return [h]. *) - - val size : t -> int end module Make (E : PARTIAL_ORD) : S with type elt = E.t diff --git a/src/io/moonpool_io.ml b/src/io/moonpool_io.ml index ead33a7b..d6958c13 100644 --- a/src/io/moonpool_io.ml +++ b/src/io/moonpool_io.ml @@ -1,10 +1,5 @@ open Common_ -include Fuseau -module IO_unix = IO_unix module Timer = Timer -module Net = Net - -let main f = - let loop = new U_loop.unix_ev_loop in - let@ () = U_loop.with_cur loop in - Fuseau.main ~loop:(loop :> Event_loop.t) f +module Ev_loop = Ev_loop +(* TODO: module IO_unix = IO_unix *) +(* TODO: module Net = Net *) diff --git a/src/io/moonpool_io.mli b/src/io/moonpool_io.mli index c9bad41e..1fae812e 100644 --- a/src/io/moonpool_io.mli +++ b/src/io/moonpool_io.mli @@ -1,20 +1,4 @@ -(** Simple event loop based on {!Unix.select}. - - This library combines {!Fuseau}'s fibers with a simple - event loop for IOs based on {!Unix.select}. - It's useful for simple situations or portability. - For bigger system it's probably better to use another - event loop. *) - -include module type of struct - include Moonpool_fib -end - -module IO_unix = IO_unix -module Net = Net +(* module IO_unix = IO_unix *) +(* module Net = Net *) module Timer = Timer - -(* FIXME: lazy background thread for the poll loop? *) - -val main : (unit -> 'a) -> 'a -(** A version of {!Moonpool_fib.main} that uses a Unix-based event loop. *) +module Ev_loop = Ev_loop diff --git a/src/io/net.ml b/src/io/net.ml.tmp similarity index 90% rename from src/io/net.ml rename to src/io/net.ml.tmp index e1817905..8db14797 100644 --- a/src/io/net.ml +++ b/src/io/net.ml.tmp @@ -35,16 +35,17 @@ module Sockaddr = struct end module TCP_server = struct - type t = { fiber: unit Fiber.t } [@@unboxed] + type t = { fut: unit Fut.t } [@@unboxed] exception Stop - let stop_ fiber = - let ebt = Exn_bt.get Stop in - Fuseau.Fiber.Private_.cancel fiber ebt + let stop_ fut = + let ebt = Exn_bt.get_callstack 20 Stop in + let promise = Fut.Private_.unsafe_promise_of_fut fut in + Fut.fulfill_idempotent promise (Error ebt) - let stop self = stop_ self.fiber - let join self = Fuseau.await self.fiber + let stop self = stop_ self.fut + let join self = Fut.await self.fut let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in @@ -54,8 +55,8 @@ module TCP_server = struct Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.listen sock 32; - let fiber = Fuseau.Fiber.Private_.create () in - let self = { fiber } in + let fut, _ = Fut.make () in + let self = { fut } in let loop_client client_sock client_addr : unit = Unix.set_nonblock client_sock; @@ -69,11 +70,11 @@ module TCP_server = struct in let loop () = - while not (Fiber.is_done fiber) do + while not (Fut.is_done fut) do match Unix.accept sock with | client_sock, client_addr -> ignore - (Fuseau.spawn ~propagate_cancel_to_parent:false (fun () -> + (Fiber.spawn ~protect:true (fun () -> loop_client client_sock client_addr) : _ Fiber.t) | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> diff --git a/src/io/net.mli b/src/io/net.mli.tmp similarity index 100% rename from src/io/net.mli rename to src/io/net.mli.tmp diff --git a/src/io/timer.ml b/src/io/timer.ml index 4346bccc..c7ddddc2 100644 --- a/src/io/timer.ml +++ b/src/io/timer.ml @@ -1,18 +1,22 @@ -open Common_ module Time = Time_ type instant_s = float type duration_s = float +type tick_res = + | Wait of float + | Run of (unit -> unit) + | Empty + type kind = | Once | Every of duration_s type task = { + handle: Handle.t; mutable deadline: instant_s; mutable active: bool; - f: cancel_handle -> unit; - as_cancel_handle: cancel_handle; + mutable f: unit -> unit; kind: kind; } @@ -22,52 +26,41 @@ module Task_heap = Heap_.Make (struct let[@inline] leq t1 t2 = t1.deadline <= t2.deadline end) -type t = { mutable tasks: Task_heap.t } +type t = { + mutable tasks: Task_heap.t; + by_handle: task Handle.Tbl.t; +} (** accepted time diff for actions. *) let epsilon_s = 0.000_001 -type tick_res = - | Wait of float - | Run of (cancel_handle -> unit) * cancel_handle - | Empty - let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks) -let[@inline] num_tasks self : int = Task_heap.size self.tasks -let[@inline] pop_task_ self : unit = - let tasks, _t = Task_heap.take_exn self.tasks in +let pop_task_ self : unit = + let tasks, t = Task_heap.take_exn self.tasks in + Handle.Tbl.remove self.by_handle t.handle; self.tasks <- tasks -let run_after self delay f : cancel_handle = - let now = Time.time_s () in - let deadline = now +. delay in - let rec task = - { - deadline; - f; - kind = Once; - active = true; - as_cancel_handle = { cancel = (fun () -> task.active <- false) }; - } - in - self.tasks <- Task_heap.insert task self.tasks; - task.as_cancel_handle +let cancel (self : t) (h : Handle.t) = + match Handle.Tbl.find_opt self.by_handle h with + | None -> () + | Some t -> + t.active <- false; + t.f <- ignore (* free memory captured by [f] *) -let run_every self delay f : cancel_handle = +let run_after_s self delay handle f : unit = let now = Time.time_s () in let deadline = now +. delay in - let rec task = - { - deadline; - f; - kind = Every delay; - active = true; - as_cancel_handle = { cancel = (fun () -> task.active <- false) }; - } - in + let task = { handle; deadline; f; kind = Once; active = true } in self.tasks <- Task_heap.insert task self.tasks; - task.as_cancel_handle + Handle.Tbl.add self.by_handle handle task + +let run_every_s self delay handle f : unit = + let now = Time.time_s () in + let deadline = now +. delay in + let task = { handle; deadline; f; kind = Every delay; active = true } in + self.tasks <- Task_heap.insert task self.tasks; + Handle.Tbl.add self.by_handle handle task let rec next (self : t) : tick_res = match Task_heap.find_min self.tasks with @@ -85,12 +78,13 @@ let rec next (self : t) : tick_res = (match task.kind with | Once -> () | Every dur -> - (* schedule the next iteration *) - task.deadline <- now +. dur; + (* schedule the next iteration. If we're past the deadline, + use the deadline as the starting point for the next iteration. *) + task.deadline <- min now task.deadline +. dur; self.tasks <- Task_heap.insert task self.tasks); - Run (task.f, task.as_cancel_handle) + Run task.f ) else Wait remaining_time_s -let create () = { tasks = Task_heap.empty } +let create () = { tasks = Task_heap.empty; by_handle = Handle.Tbl.create 32 } diff --git a/src/io/timer.mli b/src/io/timer.mli index 42f6c0ed..e56e4499 100644 --- a/src/io/timer.mli +++ b/src/io/timer.mli @@ -1,17 +1,18 @@ -open Common_ +(** A simple timer implementation *) type t +(** A timer. Not thread-safe. *) val create : unit -> t (** A new timer. *) type tick_res = | Wait of float - | Run of (cancel_handle -> unit) * cancel_handle + | Run of (unit -> unit) | Empty val next : t -> tick_res -val run_after : t -> float -> (cancel_handle -> unit) -> cancel_handle -val run_every : t -> float -> (cancel_handle -> unit) -> cancel_handle +val run_after_s : t -> float -> Handle.t -> (unit -> unit) -> unit +val run_every_s : t -> float -> Handle.t -> (unit -> unit) -> unit +val cancel : t -> Handle.t -> unit val has_tasks : t -> bool -val num_tasks : t -> int diff --git a/src/io/u_loop.ml b/src/io/u_loop.ml deleted file mode 100644 index 04fcc1d8..00000000 --- a/src/io/u_loop.ml +++ /dev/null @@ -1,202 +0,0 @@ -open Common_ - -type io_mode = - | Read - | Write - -module IO_wait = struct - type t = { - mutable active: bool; - f: cancel_handle -> unit; - as_cancel_handle: cancel_handle; - } - (** A single event, waiting on a unix FD *) - - let make f : t = - let rec self = - { - active = true; - f; - as_cancel_handle = { cancel = (fun () -> self.active <- false) }; - } - in - self -end - -module Per_fd = struct - type t = { - fd: Unix.file_descr; - mutable reads: IO_wait.t list; - mutable writes: IO_wait.t list; - } - - let[@inline] is_empty self = self.reads = [] && self.writes = [] -end - -module IO_tbl = struct - type t = { - mutable n_read: int; - mutable n_write: int; - tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t; - } - - let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 } - - let get_or_create (self : t) fd : Per_fd.t = - try Hashtbl.find self.tbl fd - with Not_found -> - let per_fd = { Per_fd.fd; reads = []; writes = [] } in - Hashtbl.add self.tbl fd per_fd; - per_fd - - let add_io_wait (self : t) fd mode (ev : IO_wait.t) = - let per_fd = get_or_create self fd in - match mode with - | Read -> - self.n_read <- 1 + self.n_read; - per_fd.reads <- ev :: per_fd.reads - | Write -> - self.n_write <- 1 + self.n_write; - per_fd.writes <- ev :: per_fd.writes - - let prepare_select (self : t) = - let reads = ref [] in - let writes = ref [] in - Hashtbl.iter - (fun _ (per_fd : Per_fd.t) -> - if Per_fd.is_empty per_fd then - Hashtbl.remove self.tbl per_fd.fd - else ( - if per_fd.reads <> [] then reads := per_fd.fd :: !reads; - if per_fd.writes <> [] then writes := per_fd.fd :: !writes - )) - self.tbl; - !reads, !writes - - let trigger_waiter (io : IO_wait.t) = - if io.active then io.f io.as_cancel_handle - - let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list) - (writes : Unix.file_descr list) : unit = - List.iter - (fun fd -> - if fd <> ignore_read then ( - let per_fd = Hashtbl.find self.tbl fd in - List.iter trigger_waiter per_fd.reads; - self.n_read <- self.n_read - List.length per_fd.reads; - per_fd.reads <- [] - )) - reads; - - List.iter - (fun fd -> - let per_fd = Hashtbl.find self.tbl fd in - List.iter trigger_waiter per_fd.writes; - self.n_write <- self.n_write - List.length per_fd.writes; - per_fd.writes <- []) - writes; - () -end - -let run_timer_ (t : Timer.t) = - let rec loop () = - match Timer.next t with - | Timer.Empty -> None - | Timer.Run (f, ev_h) -> - f ev_h; - loop () - | Timer.Wait f -> - if f > 0. then - Some f - else - None - in - loop () - -class unix_ev_loop = - let _timer = Timer.create () in - let _io_wait : IO_tbl.t = IO_tbl.create () in - let _in_blocking_section = ref false in - - let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in - let () = - Unix.set_nonblock _magic_pipe_read; - Unix.set_nonblock _magic_pipe_write - in - - let[@inline] has_pending_tasks () = - _io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer - in - - object - (* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *) - method one_step ~block () : unit = - let delay = run_timer_ _timer in - - let delay = - if block then - Option.value delay ~default:10. - else - (* do not wait *) - 0. - in - - let reads, writes = IO_tbl.prepare_select _io_wait in - if has_pending_tasks () then ( - _in_blocking_section := true; - let reads, writes, _ = - Unix.select (_magic_pipe_read :: reads) writes [] delay - in - _in_blocking_section := false; - IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes - ); - () - - method on_readable - : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = - fun fd f : cancel_handle -> - let ev = IO_wait.make f in - IO_tbl.add_io_wait _io_wait fd Read ev; - ev.as_cancel_handle - - method on_writable - : Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle = - fun fd f : cancel_handle -> - let ev = IO_wait.make f in - IO_tbl.add_io_wait _io_wait fd Write ev; - ev.as_cancel_handle - - method on_timer - : float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle = - fun delay ~repeat f -> - if repeat then - Timer.run_every _timer delay f - else - Timer.run_after _timer delay f - - method interrupt_if_in_blocking_section = - if !_in_blocking_section then ( - let b = Bytes.create 1 in - ignore (Unix.write _magic_pipe_write b 0 1 : int) - ) - - method has_pending_tasks : bool = has_pending_tasks () - end - -open struct - let k_ev_loop : unix_ev_loop option ref TLS.key = - TLS.new_key (fun () -> ref None) -end - -(** Access the event loop from within it *) -let[@inline] cur () = - match !(TLS.get k_ev_loop) with - | None -> failwith "must be called from inside Fuseau_unix" - | Some ev -> ev - -let with_cur (ev : unix_ev_loop) f = - let r = TLS.get k_ev_loop in - let old = !r in - r := Some ev; - let finally () = r := old in - Fun.protect ~finally f