wip: IO based on poll

This commit is contained in:
Simon Cruanes 2024-03-06 20:29:20 -05:00
parent 867444d975
commit 82cfe5413e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
18 changed files with 933 additions and 0 deletions

View file

@ -29,6 +29,9 @@
:with-test)))
(depopts
(trace (>= 0.6))
(mtime (>= 2.0))
(iostream (>= 0.2))
(poll (>= 0.3))
thread-local-storage)
(tags
(thread pool domain futures fork-join)))

View file

@ -20,6 +20,9 @@ depends: [
]
depopts: [
"trace" {>= "0.6"}
"mtime" {>= "2.0"}
"iostream" {>= "0.2"}
"poll" {>= "0.3"}
"thread-local-storage"
]
build: [

142
src/io/IO_unix.ml Normal file
View file

@ -0,0 +1,142 @@
open struct
let _default_buf_size = 16 * 1024
end
type file_descr = Unix.file_descr
let await_readable fd =
let loop = U_loop.cur () in
(* wait for FD to be ready *)
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
ignore
(loop#on_readable fd (fun ev ->
wakeup ();
Cancel_handle.cancel ev)
: Cancel_handle.t))
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_readable fd;
read fd buf i len
| n -> n
)
let await_writable fd =
let loop = U_loop.cur () in
(* wait for FD to be ready *)
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
ignore
(loop#on_writable fd (fun ev ->
wakeup ();
Cancel_handle.cancel ev)
: Cancel_handle.t))
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_writable fd;
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
module Out = struct
include Iostream.Out
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
fd : t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
end
module In = struct
include Iostream.In
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end
end

21
src/io/IO_unix.mli Normal file
View file

@ -0,0 +1,21 @@
(** Low level Unix IOs *)
type file_descr = Unix.file_descr
val read : file_descr -> bytes -> int -> int -> int
val write_once : file_descr -> bytes -> int -> int -> int
val write : file_descr -> bytes -> int -> int -> unit
val await_readable : file_descr -> unit
val await_writable : file_descr -> unit
module In : sig
include module type of Iostream.In
val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t
end
module Out : sig
include module type of Iostream.Out
val of_unix_fd : ?close_noerr:bool -> ?buf:bytes -> file_descr -> t
end

6
src/io/common_.ml Normal file
View file

@ -0,0 +1,6 @@
module FLS = Moonpool_fib.Fls
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
type cancel_handle = { cancel: unit -> unit } [@@unboxed]

14
src/io/dune Normal file
View file

@ -0,0 +1,14 @@
(library
(name moonpool_io)
(public_name moonpool.io)
(synopsis "Event loop for Moonpool based on poll")
(optional) ; dep on poll
(flags :standard -open Moonpool)
(private_modules heap_ time_)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.fib poll iostream
(select time_.ml from
(mtime mtime.clock.os -> time_.mtime.ml)
(-> time_.unix.ml))))

133
src/io/heap_.ml Normal file
View file

@ -0,0 +1,133 @@
module type PARTIAL_ORD = sig
type t
val leq : t -> t -> bool
(** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *)
end
module type S = sig
type elt
type t
val empty : t
(** [empty] returns the empty heap. *)
val is_empty : t -> bool
(** [is_empty h] returns [true] if the heap [h] is empty. *)
exception Empty
val merge : t -> t -> t
(** [merge h1 h2] merges the two heaps [h1] and [h2]. *)
val insert : elt -> t -> t
(** [insert x h] inserts an element [x] into the heap [h]. *)
val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h]. *)
val size : t -> int
end
module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct
type elt = E.t
type t =
| E
| N of int * elt * t * t
let empty = E
let is_empty = function
| E -> true
| N _ -> false
exception Empty
(* Rank of the tree *)
let _rank = function
| E -> 0
| N (r, _, _, _) -> r
(* Make a balanced node labelled with [x], and subtrees [a] and [b].
We ensure that the right child's rank is to the rank of the
left child (leftist property). The rank of the resulting node
is the length of the rightmost path. *)
let _make_node x a b =
if _rank a >= _rank b then
N (_rank b + 1, x, a, b)
else
N (_rank a + 1, x, b, a)
let rec merge t1 t2 =
match t1, t2 with
| t, E -> t
| E, t -> t
| N (_, x, a1, b1), N (_, y, a2, b2) ->
if E.leq x y then
_make_node x a1 (merge b1 t2)
else
_make_node y a2 (merge t1 b2)
let insert x h = merge (N (1, x, E, E)) h
let find_min_exn = function
| E -> raise Empty
| N (_, x, _, _) -> x
let find_min = function
| E -> None
| N (_, x, _, _) -> Some x
let take = function
| E -> None
| N (_, x, l, r) -> Some (merge l r, x)
let take_exn = function
| E -> raise Empty
| N (_, x, l, r) -> merge l r, x
let delete_one eq x h =
let rec aux = function
| E -> false, E
| N (_, y, l, r) as h ->
if eq x y then
true, merge l r
else if E.leq y x then (
let found_left, l1 = aux l in
let found, r1 =
if found_left then
true, r
else
aux r
in
if found then
true, _make_node y l1 r1
else
false, h
) else
false, h
in
snd (aux h)
let rec size = function
| E -> 0
| N (_, _, l, r) -> 1 + size l + size r
end

53
src/io/heap_.mli Normal file
View file

@ -0,0 +1,53 @@
(** Leftist Heaps
Implementation following Okasaki's book. *)
module type PARTIAL_ORD = sig
type t
val leq : t -> t -> bool
(** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *)
end
module type S = sig
type elt
type t
val empty : t
(** [empty] returns the empty heap. *)
val is_empty : t -> bool
(** [is_empty h] returns [true] if the heap [h] is empty. *)
exception Empty
val merge : t -> t -> t
(** [merge h1 h2] merges the two heaps [h1] and [h2]. *)
val insert : elt -> t -> t
(** [insert x h] inserts an element [x] into the heap [h]. *)
val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h]. *)
val size : t -> int
end
module Make (E : PARTIAL_ORD) : S with type elt = E.t

10
src/io/moonpool_io.ml Normal file
View file

@ -0,0 +1,10 @@
open Common_
include Fuseau
module IO_unix = IO_unix
module Timer = Timer
module Net = Net
let main f =
let loop = new U_loop.unix_ev_loop in
let@ () = U_loop.with_cur loop in
Fuseau.main ~loop:(loop :> Event_loop.t) f

20
src/io/moonpool_io.mli Normal file
View file

@ -0,0 +1,20 @@
(** Simple event loop based on {!Unix.select}.
This library combines {!Fuseau}'s fibers with a simple
event loop for IOs based on {!Unix.select}.
It's useful for simple situations or portability.
For bigger system it's probably better to use another
event loop. *)
include module type of struct
include Moonpool_fib
end
module IO_unix = IO_unix
module Net = Net
module Timer = Timer
(* FIXME: lazy background thread for the poll loop? *)
val main : (unit -> 'a) -> 'a
(** A version of {!Moonpool_fib.main} that uses a Unix-based event loop. *)

150
src/io/net.ml Normal file
View file

@ -0,0 +1,150 @@
open Common_
module Inet_addr = struct
type t = Unix.inet_addr
let any = Unix.inet_addr_any
let loopback = Unix.inet_addr_loopback
let show = Unix.string_of_inet_addr
let of_string s = try Some (Unix.inet_addr_of_string s) with _ -> None
let of_string_exn s =
try Unix.inet_addr_of_string s with _ -> invalid_arg "Inet_addr.of_string"
end
module Sockaddr = struct
type t = Unix.sockaddr
let show = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let unix s : t = Unix.ADDR_UNIX s
let inet addr port : t = Unix.ADDR_INET (addr, port)
let inet_parse addr port =
try Some (inet (Unix.inet_addr_of_string addr) port) with _ -> None
let inet_parse_exn addr port =
try inet (Unix.inet_addr_of_string addr) port
with _ -> invalid_arg "Sockadd.inet_parse"
let inet_local port = inet Unix.inet_addr_loopback port
let inet_any port = inet Unix.inet_addr_any port
end
module TCP_server = struct
type t = { fiber: unit Fiber.t } [@@unboxed]
exception Stop
let stop_ fiber =
let ebt = Exn_bt.get Stop in
Fuseau.Fiber.Private_.cancel fiber ebt
let stop self = stop_ self.fiber
let join self = Fuseau.await self.fiber
let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.bind sock addr;
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.listen sock 32;
let fiber = Fuseau.Fiber.Private_.create () in
let self = { fiber } in
let loop_client client_sock client_addr : unit =
Unix.set_nonblock client_sock;
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
let@ () =
Fun.protect ~finally:(fun () ->
try Unix.close client_sock with _ -> ())
in
handle_client client_addr client_sock
in
let loop () =
while not (Fiber.is_done fiber) do
match Unix.accept sock with
| client_sock, client_addr ->
ignore
(Fuseau.spawn ~propagate_cancel_to_parent:false (fun () ->
loop_client client_sock client_addr)
: _ Fiber.t)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* suspend *)
let loop = U_loop.cur () in
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
(* FIXME: possible race condition: the socket became readable
in the mid-time and we won't get notified. We need to call
[accept] after subscribing to [on_readable]. *)
ignore
(loop#on_readable sock (fun _ev ->
wakeup ();
Cancel_handle.cancel _ev)
: Cancel_handle.t))
done
in
let loop_fiber =
let sched = Fuseau.get_scheduler () in
Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop
in
let finally () =
stop_ loop_fiber;
Unix.close sock
in
let@ () = Fun.protect ~finally in
f self
let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
with_serve' addr
(fun client_addr client_sock ->
let ic = IO_unix.In.of_unix_fd client_sock in
let oc = IO_unix.Out.of_unix_fd client_sock in
handle_client client_addr ic oc)
f
end
module TCP_client = struct
let with_connect' addr (f : Unix.file_descr -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
let loop = U_loop.cur () in
ignore
(loop#on_writable sock (fun _ev ->
wakeup ();
Cancel_handle.cancel _ev)
: Cancel_handle.t));
true
do
()
done;
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f sock
let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a =
with_connect' addr (fun sock ->
let ic = IO_unix.In.of_unix_fd sock in
let oc = IO_unix.Out.of_unix_fd sock in
f ic oc)
end

49
src/io/net.mli Normal file
View file

@ -0,0 +1,49 @@
(** Networking *)
module Inet_addr : sig
type t = Unix.inet_addr
val loopback : t
val any : t
val show : t -> string
val of_string : string -> t option
val of_string_exn : string -> t
(** @raise Invalid_argument *)
end
(** Socket addresses *)
module Sockaddr : sig
type t = Unix.sockaddr
val show : t -> string
val unix : string -> t
val inet : Inet_addr.t -> int -> t
val inet_parse : string -> int -> t option
val inet_parse_exn : string -> int -> t
val inet_local : int -> t
val inet_any : int -> t
end
module TCP_server : sig
type t
val stop : t -> unit
val join : t -> unit
val with_serve' :
Sockaddr.t -> (Sockaddr.t -> Unix.file_descr -> unit) -> (t -> 'a) -> 'a
val with_serve :
Sockaddr.t ->
(Sockaddr.t -> Iostream.In.t -> Iostream.Out.t -> unit) ->
(t -> 'a) ->
'a
end
module TCP_client : sig
val with_connect' : Unix.sockaddr -> (Unix.file_descr -> 'a) -> 'a
val with_connect :
Unix.sockaddr -> (Iostream.In.t -> Iostream.Out.t -> 'a) -> 'a
end

4
src/io/time_.mli Normal file
View file

@ -0,0 +1,4 @@
(** Basic abstraction over a clock *)
val time_s : unit -> float
val time_ns : unit -> int64

8
src/io/time_.mtime.ml Normal file
View file

@ -0,0 +1,8 @@
let time_ns : unit -> int64 = Mtime_clock.now_ns
(** Monotonic time in seconds *)
let time_s () : float =
let ns = time_ns () in
let s = Int64.(div ns 1_000_000_000L) in
let ns' = Int64.(rem ns 1_000_000_000L) in
Int64.to_float s +. (Int64.to_float ns' /. 1e9)

2
src/io/time_.unix.ml Normal file
View file

@ -0,0 +1,2 @@
let time_s = Unix.gettimeofday
let[@inline] time_ns () = Int64.of_float (floor (time_s () *. 1e9))

96
src/io/timer.ml Normal file
View file

@ -0,0 +1,96 @@
open Common_
module Time = Time_
type instant_s = float
type duration_s = float
type kind =
| Once
| Every of duration_s
type task = {
mutable deadline: instant_s;
mutable active: bool;
f: cancel_handle -> unit;
as_cancel_handle: cancel_handle;
kind: kind;
}
module Task_heap = Heap_.Make (struct
type t = task
let[@inline] leq t1 t2 = t1.deadline <= t2.deadline
end)
type t = { mutable tasks: Task_heap.t }
(** accepted time diff for actions. *)
let epsilon_s = 0.000_001
type tick_res =
| Wait of float
| Run of (cancel_handle -> unit) * cancel_handle
| Empty
let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks)
let[@inline] num_tasks self : int = Task_heap.size self.tasks
let[@inline] pop_task_ self : unit =
let tasks, _t = Task_heap.take_exn self.tasks in
self.tasks <- tasks
let run_after self delay f : cancel_handle =
let now = Time.time_s () in
let deadline = now +. delay in
let rec task =
{
deadline;
f;
kind = Once;
active = true;
as_cancel_handle = { cancel = (fun () -> task.active <- false) };
}
in
self.tasks <- Task_heap.insert task self.tasks;
task.as_cancel_handle
let run_every self delay f : cancel_handle =
let now = Time.time_s () in
let deadline = now +. delay in
let rec task =
{
deadline;
f;
kind = Every delay;
active = true;
as_cancel_handle = { cancel = (fun () -> task.active <- false) };
}
in
self.tasks <- Task_heap.insert task self.tasks;
task.as_cancel_handle
let rec next (self : t) : tick_res =
match Task_heap.find_min self.tasks with
| None -> Empty
| Some task when not task.active ->
pop_task_ self;
next self
| Some task ->
let now = Time.time_s () in
let remaining_time_s = task.deadline -. now in
if remaining_time_s <= epsilon_s then (
pop_task_ self;
(match task.kind with
| Once -> ()
| Every dur ->
(* schedule the next iteration *)
task.deadline <- now +. dur;
self.tasks <- Task_heap.insert task self.tasks);
Run (task.f, task.as_cancel_handle)
) else
Wait remaining_time_s
let create () = { tasks = Task_heap.empty }

17
src/io/timer.mli Normal file
View file

@ -0,0 +1,17 @@
open Common_
type t
val create : unit -> t
(** A new timer. *)
type tick_res =
| Wait of float
| Run of (cancel_handle -> unit) * cancel_handle
| Empty
val next : t -> tick_res
val run_after : t -> float -> (cancel_handle -> unit) -> cancel_handle
val run_every : t -> float -> (cancel_handle -> unit) -> cancel_handle
val has_tasks : t -> bool
val num_tasks : t -> int

202
src/io/u_loop.ml Normal file
View file

@ -0,0 +1,202 @@
open Common_
type io_mode =
| Read
| Write
module IO_wait = struct
type t = {
mutable active: bool;
f: cancel_handle -> unit;
as_cancel_handle: cancel_handle;
}
(** A single event, waiting on a unix FD *)
let make f : t =
let rec self =
{
active = true;
f;
as_cancel_handle = { cancel = (fun () -> self.active <- false) };
}
in
self
end
module Per_fd = struct
type t = {
fd: Unix.file_descr;
mutable reads: IO_wait.t list;
mutable writes: IO_wait.t list;
}
let[@inline] is_empty self = self.reads = [] && self.writes = []
end
module IO_tbl = struct
type t = {
mutable n_read: int;
mutable n_write: int;
tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t;
}
let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 }
let get_or_create (self : t) fd : Per_fd.t =
try Hashtbl.find self.tbl fd
with Not_found ->
let per_fd = { Per_fd.fd; reads = []; writes = [] } in
Hashtbl.add self.tbl fd per_fd;
per_fd
let add_io_wait (self : t) fd mode (ev : IO_wait.t) =
let per_fd = get_or_create self fd in
match mode with
| Read ->
self.n_read <- 1 + self.n_read;
per_fd.reads <- ev :: per_fd.reads
| Write ->
self.n_write <- 1 + self.n_write;
per_fd.writes <- ev :: per_fd.writes
let prepare_select (self : t) =
let reads = ref [] in
let writes = ref [] in
Hashtbl.iter
(fun _ (per_fd : Per_fd.t) ->
if Per_fd.is_empty per_fd then
Hashtbl.remove self.tbl per_fd.fd
else (
if per_fd.reads <> [] then reads := per_fd.fd :: !reads;
if per_fd.writes <> [] then writes := per_fd.fd :: !writes
))
self.tbl;
!reads, !writes
let trigger_waiter (io : IO_wait.t) =
if io.active then io.f io.as_cancel_handle
let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list)
(writes : Unix.file_descr list) : unit =
List.iter
(fun fd ->
if fd <> ignore_read then (
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.reads;
self.n_read <- self.n_read - List.length per_fd.reads;
per_fd.reads <- []
))
reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.writes;
self.n_write <- self.n_write - List.length per_fd.writes;
per_fd.writes <- [])
writes;
()
end
let run_timer_ (t : Timer.t) =
let rec loop () =
match Timer.next t with
| Timer.Empty -> None
| Timer.Run (f, ev_h) ->
f ev_h;
loop ()
| Timer.Wait f ->
if f > 0. then
Some f
else
None
in
loop ()
class unix_ev_loop =
let _timer = Timer.create () in
let _io_wait : IO_tbl.t = IO_tbl.create () in
let _in_blocking_section = ref false in
let _magic_pipe_read, _magic_pipe_write = Unix.pipe ~cloexec:true () in
let () =
Unix.set_nonblock _magic_pipe_read;
Unix.set_nonblock _magic_pipe_write
in
let[@inline] has_pending_tasks () =
_io_wait.n_read > 0 || _io_wait.n_write > 0 || Timer.has_tasks _timer
in
object
(* val read_ : (cancel_handle -> unit) Int_tbl.t = Int_tbl.create 32 *)
method one_step ~block () : unit =
let delay = run_timer_ _timer in
let delay =
if block then
Option.value delay ~default:10.
else
(* do not wait *)
0.
in
let reads, writes = IO_tbl.prepare_select _io_wait in
if has_pending_tasks () then (
_in_blocking_section := true;
let reads, writes, _ =
Unix.select (_magic_pipe_read :: reads) writes [] delay
in
_in_blocking_section := false;
IO_tbl.handle_ready ~ignore_read:_magic_pipe_read _io_wait reads writes
);
()
method on_readable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Read ev;
ev.as_cancel_handle
method on_writable
: Unix.file_descr -> (cancel_handle -> unit) -> cancel_handle =
fun fd f : cancel_handle ->
let ev = IO_wait.make f in
IO_tbl.add_io_wait _io_wait fd Write ev;
ev.as_cancel_handle
method on_timer
: float -> repeat:bool -> (cancel_handle -> unit) -> cancel_handle =
fun delay ~repeat f ->
if repeat then
Timer.run_every _timer delay f
else
Timer.run_after _timer delay f
method interrupt_if_in_blocking_section =
if !_in_blocking_section then (
let b = Bytes.create 1 in
ignore (Unix.write _magic_pipe_write b 0 1 : int)
)
method has_pending_tasks : bool = has_pending_tasks ()
end
open struct
let k_ev_loop : unix_ev_loop option ref TLS.key =
TLS.new_key (fun () -> ref None)
end
(** Access the event loop from within it *)
let[@inline] cur () =
match !(TLS.get k_ev_loop) with
| None -> failwith "must be called from inside Fuseau_unix"
| Some ev -> ev
let with_cur (ev : unix_ev_loop) f =
let r = TLS.get k_ev_loop in
let old = !r in
r := Some ev;
let finally () = r := old in
Fun.protect ~finally f