wip: background thread for IO loop in moonpool.io using poll

This commit is contained in:
Simon Cruanes 2024-03-06 23:44:02 -05:00
parent d4476f7f31
commit 26779e6e12
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
17 changed files with 508 additions and 352 deletions

8
src/io/cancel_handle.ml Normal file
View file

@ -0,0 +1,8 @@
(** A handle to cancel something *)
type t =
| Cancel1 : ('a -> unit) * 'a -> t
| Cancel2 : ('a -> 'b -> unit) * 'a * 'b -> t
let cancel = function
| Cancel1 (f, x) -> f x
| Cancel2 (f, x, y) -> f x y

View file

@ -1,6 +1,6 @@
module A = Moonpool.Atomic
module FLS = Moonpool_fib.Fls module FLS = Moonpool_fib.Fls
module Fiber = Moonpool_fib.Fiber
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let spf = Printf.sprintf let spf = Printf.sprintf
type cancel_handle = { cancel: unit -> unit } [@@unboxed]

358
src/io/ev_loop.ml Normal file
View file

@ -0,0 +1,358 @@
open Common_
module B_queue = Moonpool.Blocking_queue
type io_mode =
| Read
| Write
module IO_wait = struct
type t = { f: unit -> unit } [@@unboxed]
(** A single event, waiting on a unix FD *)
let[@inline] make f : t = { f }
end
module Per_fd = struct
type t = {
fd: Unix.file_descr;
mutable reads: IO_wait.t Handle.Map.t;
mutable writes: IO_wait.t Handle.Map.t;
}
let[@inline] no_reads self = Handle.Map.is_empty self.reads
let[@inline] no_writes self = Handle.Map.is_empty self.writes
let[@inline] is_empty self = no_reads self && no_writes self
let cancel_read (self : t) h : bool =
if Handle.Map.mem h self.reads then (
self.reads <- Handle.Map.remove h self.reads;
true
) else
false
let cancel_write (self : t) h : bool =
if Handle.Map.mem h self.writes then (
self.reads <- Handle.Map.remove h self.writes;
true
) else
false
end
module IO_tbl = struct
type t = {
mutable n_read: int;
mutable n_write: int;
tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t;
}
let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 }
let get_or_create (self : t) fd : Per_fd.t =
try Hashtbl.find self.tbl fd
with Not_found ->
let per_fd =
{ Per_fd.fd; reads = Handle.Map.empty; writes = Handle.Map.empty }
in
Hashtbl.add self.tbl fd per_fd;
per_fd
let update_subs poll (per_fd : Per_fd.t) =
let ev =
match Per_fd.no_reads per_fd, Per_fd.no_writes per_fd with
| true, true -> Poll.Event.none
| true, false -> Poll.Event.write
| false, true -> Poll.Event.read
| false, false -> Poll.Event.read_write
in
Poll.set poll per_fd.fd ev
let add_io_wait (self : t) poll fd mode (h : Handle.t) (ev : IO_wait.t) =
let per_fd = get_or_create self fd in
(match mode with
| Read ->
self.n_read <- 1 + self.n_read;
per_fd.reads <- Handle.Map.add h ev per_fd.reads
| Write ->
self.n_write <- 1 + self.n_write;
per_fd.writes <- Handle.Map.add h ev per_fd.writes);
update_subs poll per_fd
let cancel (self : t) (fd : Unix.file_descr) (h : Handle.t) : unit =
match Hashtbl.find_opt self.tbl fd, Handle.handle_type h with
| None, _ -> ()
| Some per_fd, H_read ->
if Per_fd.cancel_read per_fd h then self.n_read <- self.n_read - 1
| Some per_fd, H_write ->
if Per_fd.cancel_write per_fd h then self.n_write <- self.n_write - 1
| Some _, H_timer -> assert false
let[@inline] trigger_waiter (_h : Handle.t) (io : IO_wait.t) = io.f ()
let handle_ready_read (self : t) ~ignore_read fd (per_fd : Per_fd.t) =
if fd <> ignore_read then (
Handle.Map.iter trigger_waiter per_fd.reads;
self.n_read <- self.n_read - Handle.Map.cardinal per_fd.reads;
per_fd.reads <- Handle.Map.empty
)
let handle_ready_write (self : t) (per_fd : Per_fd.t) =
Handle.Map.iter trigger_waiter per_fd.writes;
self.n_write <- self.n_write - Handle.Map.cardinal per_fd.writes;
per_fd.writes <- Handle.Map.empty
let update_all (self : t) ~ignore_read (poll : Poll.t) : unit =
let handle_ready fd (ev : Poll.Event.t) =
match Hashtbl.find self.tbl fd with
| exception Not_found -> ()
| per_fd ->
if ev.readable then handle_ready_read self ~ignore_read fd per_fd;
if ev.writable then handle_ready_write self per_fd;
update_subs poll per_fd;
if Per_fd.is_empty per_fd then Hashtbl.remove self.tbl fd
in
Poll.iter_ready poll ~f:handle_ready
end
(** Messages from other threads *)
module Incoming_msg = struct
type t =
| On_ready of Unix.file_descr * io_mode * Handle.t * (unit -> unit)
| Run_after of float * Handle.t * (unit -> unit)
| Run_every of float * Handle.t * (Cancel_handle.t -> unit)
| Cancel_io of Unix.file_descr * Handle.t
| Cancel_timer of Handle.t
end
module Main_loop = struct
(** Process timers that have expired, and return the duration until the next timer *)
let process_expired_timers_ (t : Timer.t) : float option =
let rec loop () =
match Timer.next t with
| Timer.Empty -> None
| Timer.Run f ->
f ();
loop ()
| Timer.Wait f ->
if f > 0. then
Some f
else
None
in
loop ()
type state = {
timer: Timer.t;
io_tbl: IO_tbl.t;
incoming: Incoming_msg.t B_queue.t;
poll: Poll.t;
in_poll: bool A.t;
buf4: bytes;
_magic_pipe_read: Unix.file_descr;
_magic_pipe_write: Unix.file_descr;
}
let create () : state =
let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in
Unix.set_nonblock _magic_pipe_read;
Unix.set_nonblock _magic_pipe_write;
let poll = Poll.create () in
{
timer = Timer.create ();
io_tbl = IO_tbl.create ();
incoming = B_queue.create ();
in_poll = A.make false;
poll;
buf4 = Bytes.create 4;
_magic_pipe_read;
_magic_pipe_write;
}
let push (self : state) (msg : Incoming_msg.t) =
B_queue.push self.incoming msg;
if A.exchange self.in_poll false then (
(* maybe wake up the loop *)
let b = Bytes.make 1 '\x01' in
ignore (Unix.write self._magic_pipe_write b 0 1 : int)
)
let cancel_timer_ (self : state) (h : Handle.t) =
push self (Incoming_msg.Cancel_timer h)
(** Make sure the pipe is empty *)
let drain_pipe_ (self : state) =
while
match Unix.read self._magic_pipe_read self.buf4 0 4 with
| 0 -> false
| _n -> true
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
false
do
()
done
let process_msg (self : state) (msg : Incoming_msg.t) =
match msg with
| On_ready (fd, mode, h, f) ->
IO_tbl.add_io_wait self.io_tbl self.poll fd mode h (IO_wait.make f)
| Run_after (delay, h, f) -> Timer.run_after_s self.timer delay h f
| Run_every (delay, h, f) ->
let cancel = Cancel_handle.Cancel2 (cancel_timer_, self, h) in
Timer.run_every_s self.timer delay h (fun () -> f cancel)
| Cancel_io (fd, h) -> IO_tbl.cancel self.io_tbl fd h
| Cancel_timer h -> Timer.cancel self.timer h
let loop (self : state) : unit =
let local = Queue.create () in
while true do
(* process incoming messages *)
B_queue.transfer self.incoming local;
while not (Queue.is_empty local) do
let msg = Queue.pop local in
process_msg self msg
done;
(* check IOs *)
IO_tbl.update_all self.io_tbl ~ignore_read:self._magic_pipe_read self.poll;
(* update timers, get next timeout *)
let timer = process_expired_timers_ self.timer in
let timeout =
match timer with
| None -> Poll.Timeout.Never
| Some t_s ->
let t_ns = Int64.of_float @@ ceil @@ (t_s *. 1e9) in
Poll.Timeout.After t_ns
in
(* wait for something to happen *)
A.set self.in_poll true;
ignore (Poll.wait self.poll timeout : [ `Ok | `Timeout ]);
A.set self.in_poll false;
drain_pipe_ self;
IO_tbl.update_all self.io_tbl ~ignore_read:self._magic_pipe_read self.poll
done
end
type t = Main_loop.state
let cur_ : t option A.t = A.make None
open struct
(* used only for init *)
let m = Mutex.create ()
let[@inline never] create_and_set_ () : t =
Mutex.lock m;
match A.get cur_ with
| Some t ->
Mutex.unlock m;
t
| None ->
let st = Main_loop.create () in
(* start background thread *)
let _t : Thread.t =
Moonpool.start_thread_on_some_domain Main_loop.loop st
in
A.set cur_ (Some st);
Mutex.unlock m;
st
end
let[@inline] get_or_create () : t =
match A.get cur_ with
| Some t -> t
| None -> create_and_set_ ()
let on_readable (self : t) fd f : Cancel_handle.t =
let h = Handle.fresh H_read in
Main_loop.push self (Incoming_msg.On_ready (fd, Read, h, f));
Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_io (fd, h))
let on_writable (self : t) fd f : Cancel_handle.t =
let h = Handle.fresh H_write in
Main_loop.push self (Incoming_msg.On_ready (fd, Write, h, f));
Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_io (fd, h))
let run_after_s (self : t) delay f : Cancel_handle.t =
let h = Handle.fresh H_timer in
Main_loop.push self (Incoming_msg.Run_after (delay, h, f));
Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_timer h)
let run_every_s (self : t) delay f : Cancel_handle.t =
let h = Handle.fresh H_timer in
Main_loop.push self (Incoming_msg.Run_every (delay, h, f));
Cancel_handle.Cancel2 (Main_loop.push, self, Incoming_msg.Cancel_timer h)
(*
class unix_ev_loop =
let _timer = Timer.create () in
let _io_wait : IO_tbl.t = IO_tbl.create () in
let _in_blocking_section = ref false in
let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in
let () =
Unix.set_nonblock _magic_pipe_read;
Unix.set_nonblock _magic_pipe_write
in
let[@inline] has_pending_tasks () =
_io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer
in
object
(* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *)
method one_step ~block () : unit =
let delay = process_expired_timers_ _timer in
let delay =
if block then
Option.value delay ~default:10.
else
(* do not wait *)
0.
in
let reads, writes = IO_tbl.prepare_select _io_wait in
if has_pending_tasks () then (
_in_blocking_section := true;
let reads, writes, _ =
Unix.select (_magic_pipe_read :: reads) writes [] delay
in
_in_blocking_section := false;
IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes
);
()
method on_readable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Read ev;
ev.as_cancel_handle
method on_writable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Write ev;
ev.as_cancel_handle
method on_timer
: float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle =
fun delay ~repeat f ->
if repeat then
Timer.run_every _timer delay f
else
Timer.run_after _timer delay f
method interrupt_if_in_blocking_section =
if !_in_blocking_section then (
let b = Bytes.create 1 in
ignore (Unix.write _magic_pipe_write b 0 1 : int)
)
method has_pending_tasks : bool = has_pending_tasks ()
end
*)

13
src/io/ev_loop.mli Normal file
View file

@ -0,0 +1,13 @@
open Common_
type io_mode =
| Read
| Write
type t
val get_or_create : unit -> t
val on_readable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t
val on_writable : t -> Unix.file_descr -> (unit -> unit) -> Cancel_handle.t
val run_after_s : t -> float -> (unit -> unit) -> Cancel_handle.t
val run_every_s : t -> float -> (Cancel_handle.t -> unit) -> Cancel_handle.t

48
src/io/handle.ml Normal file
View file

@ -0,0 +1,48 @@
open Common_
type handle_type =
| H_read
| H_write
| H_timer
type t = int
let n_bits_shard = 3
let n_shards_ = 8
let shard_mask_ = (1 lsl n_bits_shard) - 1
let () = assert (1 lsl n_bits_shard = n_shards_)
(* array of counters, sharded on thread ID to reduce contention.
TODO: avoid false sharing *)
let generators_ : int A.t array = Array.init n_shards_ (fun _ -> A.make 0)
let int_of_handle_type = function
| H_read -> 0
| H_write -> 1
| H_timer -> 2
let[@inline] handle_type (self : t) : handle_type =
match self land 0b11 with
| 0 -> H_read
| 1 -> H_write
| 2 -> H_timer
| _ -> assert false
let fresh (ty : handle_type) : t =
let shard = (Thread.id @@ Thread.self ()) land shard_mask_ in
let n = A.fetch_and_add (Array.unsafe_get generators_ shard) 1 in
let n =
(n lsl (2 + n_bits_shard)) lor (shard lsl 2) lor int_of_handle_type ty
in
n
module As_key = struct
type t = int
let equal : t -> t -> bool = ( = )
let hash = Hashtbl.hash
let compare : t -> t -> int = Stdlib.compare
end
module Map = Map.Make (As_key)
module Tbl = Hashtbl.Make (As_key)

21
src/io/handle.mli Normal file
View file

@ -0,0 +1,21 @@
(** Handles.
Each subscription has a unique handle that is used
to cancel it and refer to it. *)
type handle_type =
| H_read
| H_write
| H_timer
type t = private int
(** A handle. Its internal structure is unspecified. *)
val fresh : handle_type -> t
(** Get a fresh handle *)
val handle_type : t -> handle_type
(** Recover the type of the handle *)
module Map : Map.S with type key = t
module Tbl : Hashtbl.S with type key = t

View file

@ -26,24 +26,9 @@ module type S = sig
val find_min : t -> elt option val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *) (** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail. (** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *) @raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h]. *)
val size : t -> int
end end
module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct
@ -88,46 +73,11 @@ module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct
let insert x h = merge (N (1, x, E, E)) h let insert x h = merge (N (1, x, E, E)) h
let find_min_exn = function
| E -> raise Empty
| N (_, x, _, _) -> x
let find_min = function let find_min = function
| E -> None | E -> None
| N (_, x, _, _) -> Some x | N (_, x, _, _) -> Some x
let take = function
| E -> None
| N (_, x, l, r) -> Some (merge l r, x)
let take_exn = function let take_exn = function
| E -> raise Empty | E -> raise Empty
| N (_, x, l, r) -> merge l r, x | N (_, x, l, r) -> merge l r, x
let delete_one eq x h =
let rec aux = function
| E -> false, E
| N (_, y, l, r) as h ->
if eq x y then
true, merge l r
else if E.leq y x then (
let found_left, l1 = aux l in
let found, r1 =
if found_left then
true, r
else
aux r
in
if found then
true, _make_node y l1 r1
else
false, h
) else
false, h
in
snd (aux h)
let rec size = function
| E -> 0
| N (_, _, l, r) -> 1 + size l + size r
end end

View file

@ -30,24 +30,9 @@ module type S = sig
val find_min : t -> elt option val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *) (** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail. (** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *) @raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h]. *)
val size : t -> int
end end
module Make (E : PARTIAL_ORD) : S with type elt = E.t module Make (E : PARTIAL_ORD) : S with type elt = E.t

View file

@ -1,10 +1,5 @@
open Common_ open Common_
include Fuseau
module IO_unix = IO_unix
module Timer = Timer module Timer = Timer
module Net = Net module Ev_loop = Ev_loop
(* TODO: module IO_unix = IO_unix *)
let main f = (* TODO: module Net = Net *)
let loop = new U_loop.unix_ev_loop in
let@ () = U_loop.with_cur loop in
Fuseau.main ~loop:(loop :> Event_loop.t) f

View file

@ -1,20 +1,4 @@
(** Simple event loop based on {!Unix.select}. (* module IO_unix = IO_unix *)
(* module Net = Net *)
This library combines {!Fuseau}'s fibers with a simple
event loop for IOs based on {!Unix.select}.
It's useful for simple situations or portability.
For bigger system it's probably better to use another
event loop. *)
include module type of struct
include Moonpool_fib
end
module IO_unix = IO_unix
module Net = Net
module Timer = Timer module Timer = Timer
module Ev_loop = Ev_loop
(* FIXME: lazy background thread for the poll loop? *)
val main : (unit -> 'a) -> 'a
(** A version of {!Moonpool_fib.main} that uses a Unix-based event loop. *)

View file

@ -35,16 +35,17 @@ module Sockaddr = struct
end end
module TCP_server = struct module TCP_server = struct
type t = { fiber: unit Fiber.t } [@@unboxed] type t = { fut: unit Fut.t } [@@unboxed]
exception Stop exception Stop
let stop_ fiber = let stop_ fut =
let ebt = Exn_bt.get Stop in let ebt = Exn_bt.get_callstack 20 Stop in
Fuseau.Fiber.Private_.cancel fiber ebt let promise = Fut.Private_.unsafe_promise_of_fut fut in
Fut.fulfill_idempotent promise (Error ebt)
let stop self = stop_ self.fiber let stop self = stop_ self.fut
let join self = Fuseau.await self.fiber let join self = Fut.await self.fut
let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a = let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
@ -54,8 +55,8 @@ module TCP_server = struct
Unix.setsockopt sock Unix.SO_REUSEADDR true; Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.listen sock 32; Unix.listen sock 32;
let fiber = Fuseau.Fiber.Private_.create () in let fut, _ = Fut.make () in
let self = { fiber } in let self = { fut } in
let loop_client client_sock client_addr : unit = let loop_client client_sock client_addr : unit =
Unix.set_nonblock client_sock; Unix.set_nonblock client_sock;
@ -69,11 +70,11 @@ module TCP_server = struct
in in
let loop () = let loop () =
while not (Fiber.is_done fiber) do while not (Fut.is_done fut) do
match Unix.accept sock with match Unix.accept sock with
| client_sock, client_addr -> | client_sock, client_addr ->
ignore ignore
(Fuseau.spawn ~propagate_cancel_to_parent:false (fun () -> (Fiber.spawn ~protect:true (fun () ->
loop_client client_sock client_addr) loop_client client_sock client_addr)
: _ Fiber.t) : _ Fiber.t)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->

View file

@ -1,18 +1,22 @@
open Common_
module Time = Time_ module Time = Time_
type instant_s = float type instant_s = float
type duration_s = float type duration_s = float
type tick_res =
| Wait of float
| Run of (unit -> unit)
| Empty
type kind = type kind =
| Once | Once
| Every of duration_s | Every of duration_s
type task = { type task = {
handle: Handle.t;
mutable deadline: instant_s; mutable deadline: instant_s;
mutable active: bool; mutable active: bool;
f: cancel_handle -> unit; mutable f: unit -> unit;
as_cancel_handle: cancel_handle;
kind: kind; kind: kind;
} }
@ -22,52 +26,41 @@ module Task_heap = Heap_.Make (struct
let[@inline] leq t1 t2 = t1.deadline <= t2.deadline let[@inline] leq t1 t2 = t1.deadline <= t2.deadline
end) end)
type t = { mutable tasks: Task_heap.t } type t = {
mutable tasks: Task_heap.t;
by_handle: task Handle.Tbl.t;
}
(** accepted time diff for actions. *) (** accepted time diff for actions. *)
let epsilon_s = 0.000_001 let epsilon_s = 0.000_001
type tick_res =
| Wait of float
| Run of (cancel_handle -> unit) * cancel_handle
| Empty
let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks) let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks)
let[@inline] num_tasks self : int = Task_heap.size self.tasks
let[@inline] pop_task_ self : unit = let pop_task_ self : unit =
let tasks, _t = Task_heap.take_exn self.tasks in let tasks, t = Task_heap.take_exn self.tasks in
Handle.Tbl.remove self.by_handle t.handle;
self.tasks <- tasks self.tasks <- tasks
let run_after self delay f : cancel_handle = let cancel (self : t) (h : Handle.t) =
let now = Time.time_s () in match Handle.Tbl.find_opt self.by_handle h with
let deadline = now +. delay in | None -> ()
let rec task = | Some t ->
{ t.active <- false;
deadline; t.f <- ignore (* free memory captured by [f] *)
f;
kind = Once;
active = true;
as_cancel_handle = { cancel = (fun () -> task.active <- false) };
}
in
self.tasks <- Task_heap.insert task self.tasks;
task.as_cancel_handle
let run_every self delay f : cancel_handle = let run_after_s self delay handle f : unit =
let now = Time.time_s () in let now = Time.time_s () in
let deadline = now +. delay in let deadline = now +. delay in
let rec task = let task = { handle; deadline; f; kind = Once; active = true } in
{
deadline;
f;
kind = Every delay;
active = true;
as_cancel_handle = { cancel = (fun () -> task.active <- false) };
}
in
self.tasks <- Task_heap.insert task self.tasks; self.tasks <- Task_heap.insert task self.tasks;
task.as_cancel_handle Handle.Tbl.add self.by_handle handle task
let run_every_s self delay handle f : unit =
let now = Time.time_s () in
let deadline = now +. delay in
let task = { handle; deadline; f; kind = Every delay; active = true } in
self.tasks <- Task_heap.insert task self.tasks;
Handle.Tbl.add self.by_handle handle task
let rec next (self : t) : tick_res = let rec next (self : t) : tick_res =
match Task_heap.find_min self.tasks with match Task_heap.find_min self.tasks with
@ -85,12 +78,13 @@ let rec next (self : t) : tick_res =
(match task.kind with (match task.kind with
| Once -> () | Once -> ()
| Every dur -> | Every dur ->
(* schedule the next iteration *) (* schedule the next iteration. If we're past the deadline,
task.deadline <- now +. dur; use the deadline as the starting point for the next iteration. *)
task.deadline <- min now task.deadline +. dur;
self.tasks <- Task_heap.insert task self.tasks); self.tasks <- Task_heap.insert task self.tasks);
Run (task.f, task.as_cancel_handle) Run task.f
) else ) else
Wait remaining_time_s Wait remaining_time_s
let create () = { tasks = Task_heap.empty } let create () = { tasks = Task_heap.empty; by_handle = Handle.Tbl.create 32 }

View file

@ -1,17 +1,18 @@
open Common_ (** A simple timer implementation *)
type t type t
(** A timer. Not thread-safe. *)
val create : unit -> t val create : unit -> t
(** A new timer. *) (** A new timer. *)
type tick_res = type tick_res =
| Wait of float | Wait of float
| Run of (cancel_handle -> unit) * cancel_handle | Run of (unit -> unit)
| Empty | Empty
val next : t -> tick_res val next : t -> tick_res
val run_after : t -> float -> (cancel_handle -> unit) -> cancel_handle val run_after_s : t -> float -> Handle.t -> (unit -> unit) -> unit
val run_every : t -> float -> (cancel_handle -> unit) -> cancel_handle val run_every_s : t -> float -> Handle.t -> (unit -> unit) -> unit
val cancel : t -> Handle.t -> unit
val has_tasks : t -> bool val has_tasks : t -> bool
val num_tasks : t -> int

View file

@ -1,202 +0,0 @@
open Common_
type io_mode =
| Read
| Write
module IO_wait = struct
type t = {
mutable active: bool;
f: cancel_handle -> unit;
as_cancel_handle: cancel_handle;
}
(** A single event, waiting on a unix FD *)
let make f : t =
let rec self =
{
active = true;
f;
as_cancel_handle = { cancel = (fun () -> self.active <- false) };
}
in
self
end
module Per_fd = struct
type t = {
fd: Unix.file_descr;
mutable reads: IO_wait.t list;
mutable writes: IO_wait.t list;
}
let[@inline] is_empty self = self.reads = [] && self.writes = []
end
module IO_tbl = struct
type t = {
mutable n_read: int;
mutable n_write: int;
tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t;
}
let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 }
let get_or_create (self : t) fd : Per_fd.t =
try Hashtbl.find self.tbl fd
with Not_found ->
let per_fd = { Per_fd.fd; reads = []; writes = [] } in
Hashtbl.add self.tbl fd per_fd;
per_fd
let add_io_wait (self : t) fd mode (ev : IO_wait.t) =
let per_fd = get_or_create self fd in
match mode with
| Read ->
self.n_read <- 1 + self.n_read;
per_fd.reads <- ev :: per_fd.reads
| Write ->
self.n_write <- 1 + self.n_write;
per_fd.writes <- ev :: per_fd.writes
let prepare_select (self : t) =
let reads = ref [] in
let writes = ref [] in
Hashtbl.iter
(fun _ (per_fd : Per_fd.t) ->
if Per_fd.is_empty per_fd then
Hashtbl.remove self.tbl per_fd.fd
else (
if per_fd.reads <> [] then reads := per_fd.fd :: !reads;
if per_fd.writes <> [] then writes := per_fd.fd :: !writes
))
self.tbl;
!reads, !writes
let trigger_waiter (io : IO_wait.t) =
if io.active then io.f io.as_cancel_handle
let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list)
(writes : Unix.file_descr list) : unit =
List.iter
(fun fd ->
if fd <> ignore_read then (
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.reads;
self.n_read <- self.n_read - List.length per_fd.reads;
per_fd.reads <- []
))
reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.writes;
self.n_write <- self.n_write - List.length per_fd.writes;
per_fd.writes <- [])
writes;
()
end
let run_timer_ (t : Timer.t) =
let rec loop () =
match Timer.next t with
| Timer.Empty -> None
| Timer.Run (f, ev_h) ->
f ev_h;
loop ()
| Timer.Wait f ->
if f > 0. then
Some f
else
None
in
loop ()
class unix_ev_loop =
let _timer = Timer.create () in
let _io_wait : IO_tbl.t = IO_tbl.create () in
let _in_blocking_section = ref false in
let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in
let () =
Unix.set_nonblock _magic_pipe_read;
Unix.set_nonblock _magic_pipe_write
in
let[@inline] has_pending_tasks () =
_io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer
in
object
(* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *)
method one_step ~block () : unit =
let delay = run_timer_ _timer in
let delay =
if block then
Option.value delay ~default:10.
else
(* do not wait *)
0.
in
let reads, writes = IO_tbl.prepare_select _io_wait in
if has_pending_tasks () then (
_in_blocking_section := true;
let reads, writes, _ =
Unix.select (_magic_pipe_read :: reads) writes [] delay
in
_in_blocking_section := false;
IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes
);
()
method on_readable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Read ev;
ev.as_cancel_handle
method on_writable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Write ev;
ev.as_cancel_handle
method on_timer
: float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle =
fun delay ~repeat f ->
if repeat then
Timer.run_every _timer delay f
else
Timer.run_after _timer delay f
method interrupt_if_in_blocking_section =
if !_in_blocking_section then (
let b = Bytes.create 1 in
ignore (Unix.write _magic_pipe_write b 0 1 : int)
)
method has_pending_tasks : bool = has_pending_tasks ()
end
open struct
let k_ev_loop : unix_ev_loop option ref TLS.key =
TLS.new_key (fun () -> ref None)
end
(** Access the event loop from within it *)
let[@inline] cur () =
match !(TLS.get k_ev_loop) with
| None -> failwith "must be called from inside Fuseau_unix"
| Some ev -> ev
let with_cur (ev : unix_ev_loop) f =
let r = TLS.get k_ev_loop in
let old = !r in
r := Some ev;
let finally () = r := old in
Fun.protect ~finally f