add moonpool.unix with a basic event loop

This commit is contained in:
Simon Cruanes 2024-02-06 21:30:03 -05:00
parent c3ef591231
commit 56c32ff5a0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
19 changed files with 1089 additions and 0 deletions

View file

@ -29,6 +29,7 @@
:with-test)))
(depopts
(trace (>= 0.6))
(mtime (>= 2.0))
thread-local-storage)
(tags
(thread pool domain futures fork-join)))

View file

@ -20,6 +20,7 @@ depends: [
]
depopts: [
"trace" {>= "0.6"}
"mtime" {>= "2.0"}
"thread-local-storage"
]
build: [

154
src/unix/IO_in.ml Normal file
View file

@ -0,0 +1,154 @@
open Common_
class type t =
object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the
stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)
end
let create ?(close = ignore) ~input () : t =
object
method close = close
method input = input
end
let empty : t =
object
method close () = ()
method input _ _ _ = 0
end
let of_bytes ?(off = 0) ?len (b : bytes) : t =
(* i: current position in [b] *)
let i = ref off in
let len =
match len with
| Some n ->
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
n
| None -> Bytes.length b - off
in
let end_ = off + len in
object
method input b_out i_out len_out =
let n = min (end_ - !i) len_out in
Bytes.blit b !i b_out i_out n;
i := !i + n;
n
method close () = i := end_
end
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
let rec really_input (self : #t) buf i len =
if len > 0 then (
let n = input self buf i len in
if n = 0 then raise End_of_file;
(really_input [@tailrec]) self buf (i + n) (len - n)
)
let really_input_string self n : string =
let buf = Bytes.create n in
really_input self buf 0 n;
Bytes.unsafe_to_string buf
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
: unit =
let continue = ref true in
while !continue do
let len = input ic buf 0 (Bytes.length buf) in
if len = 0 then
continue := false
else
IO_out.output oc buf 0 len
done
let concat (l0 : t list) : t =
let l = ref l0 in
let rec input b i len : int =
match !l with
| [] -> 0
| ic :: tl ->
let n = ic#input b i len in
if n > 0 then
n
else (
l := tl;
input b i len
)
in
let close () = List.iter close l0 in
create ~close ~input ()
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
let buf = ref buf in
let i = ref 0 in
let[@inline] full_ () = !i = Bytes.length !buf in
let grow_ () =
let old_size = Bytes.length !buf in
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
if old_size = new_size then
failwith "input_all: maximum input size exceeded";
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
buf := new_buf
in
let rec loop () =
if full_ () then grow_ ();
let available = Bytes.length !buf - !i in
let n = input self !buf !i available in
if n > 0 then (
i := !i + n;
(loop [@tailrec]) ()
)
in
loop ();
if full_ () then
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := IO_unix.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end

119
src/unix/IO_out.ml Normal file
View file

@ -0,0 +1,119 @@
open Common_
class type t =
object
method output_char : char -> unit
method output : bytes -> int -> int -> unit
method flush : unit -> unit
method close : unit -> unit
end
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
object
method flush () = flush ()
method close () = close ()
method output_char c = output_char c
method output bs i len = output bs i len
end
let dummy : t =
object
method flush () = ()
method close () = ()
method output_char _ = ()
method output _ _ _ = ()
end
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
: t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
IO_unix.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
let of_buffer (buf : Buffer.t) : t =
object
method close () = ()
method flush () = ()
method output_char c = Buffer.add_char buf c
method output bs i len = Buffer.add_subbytes buf bs i len
end
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : #t) c : unit = self#output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
let[@inline] output_string (self : #t) (str : string) : unit =
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
let output_line (self : #t) (str : string) : unit =
output_string self str;
output_char self '\n'
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self#flush ()
let output_int self i =
let s = string_of_int i in
output_string self s
let output_lines self seq = Seq.iter (output_line self) seq
let tee (l : t list) : t =
match l with
| [] -> dummy
| [ oc ] -> oc
| _ ->
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
let output_char c = List.iter (fun oc -> output_char oc c) l in
let close () = List.iter close l in
let flush () = List.iter flush l in
create ~flush ~close ~output ~output_char ()

52
src/unix/IO_unix.ml Normal file
View file

@ -0,0 +1,52 @@
open Common_
type file_descr = Unix.file_descr
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* wait for FD to be ready *)
let cancel = Cancel_handle.create () in
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Ev_loop.wait_readable fd cancel (fun cancel ->
resume ~ls sus @@ Ok ();
Cancel_handle.cancel cancel));
};
read fd buf i len
| n -> n
)
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
(* wait for FD to be ready *)
let cancel = Cancel_handle.create () in
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Ev_loop.wait_writable fd cancel (fun cancel ->
resume ~ls sus @@ Ok ();
Cancel_handle.cancel cancel));
};
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done

42
src/unix/cancel_handle.ml Normal file
View file

@ -0,0 +1,42 @@
(** Cancellation handle. *)
open Common_
type state =
| Cancelled
| Waiting of { waiters: (unit -> unit) list }
type t = { st: state A.t } [@@unboxed]
let create () : t = { st = A.make (Waiting { waiters = [] }) }
let create_with f : t = { st = A.make (Waiting { waiters = [ f ] }) }
let cancel (self : t) =
while
let old_st = A.get self.st in
match old_st with
| Cancelled -> false
| Waiting { waiters } ->
if A.compare_and_set self.st old_st Cancelled then (
List.iter (fun f -> f ()) waiters;
false
) else
true
do
()
done
let on_cancel (self : t) f : unit =
while
let old_st = A.get self.st in
match old_st with
| Cancelled ->
f ();
false
| Waiting { waiters = l } ->
not (A.compare_and_set self.st old_st (Waiting { waiters = f :: l }))
do
()
done
let dummy = { st = A.make Cancelled }

View file

@ -0,0 +1,12 @@
type t
(** A handle to cancel atomic actions (waiting on something), or
stopping a subscription to some event. *)
val create : unit -> t
val create_with : (unit -> unit) -> t
val on_cancel : t -> (unit -> unit) -> unit
val cancel : t -> unit
(** Perform the cancellation. This should be idempotent. *)
val dummy : t

9
src/unix/common_.ml Normal file
View file

@ -0,0 +1,9 @@
module M = Moonpool
module Exn_bt = M.Exn_bt
module A = Moonpool.Atomic
module Fiber = Moonpool_fib.Fiber
module Tracing_ = Moonpool.Private.Tracing_
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let _default_buf_size = 4 * 1024

12
src/unix/dune Normal file
View file

@ -0,0 +1,12 @@
(library
(name moonpool_unix)
(public_name moonpool.unix)
(optional)
(synopsis "Simple Unix-based event loop for moonpool")
(private_modules common_)
(libraries moonpool moonpool.fib unix
(select time.ml from
(mtime mtime.os.clock -> time.mtime.ml)
(-> time.unix.ml))
))

291
src/unix/ev_loop.ml Normal file
View file

@ -0,0 +1,291 @@
open Common_
(** Action scheduled from outside the loop *)
module Action = struct
type cb = Cancel_handle.t -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb * Cancel_handle.t
| Wait_writable of Unix.file_descr * cb * Cancel_handle.t
| Run_after_s of float * cb * Cancel_handle.t
| Run_every_s of float * cb * Cancel_handle.t
end
(** Thread-safe queue of actions *)
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old = [];
false
) else
true
do
()
done;
!is_first
end
type io_mode =
| Read
| Write
module IO_wait = struct
type t = {
mutable active: bool;
f: Cancel_handle.t -> unit;
as_cancel_handle: Cancel_handle.t;
}
(** A single event, waiting on a unix FD *)
let make cancel f : t =
let self = { active = true; f; as_cancel_handle = cancel } in
Cancel_handle.on_cancel cancel (fun () -> self.active <- false);
self
end
module Per_fd = struct
type t = {
fd: Unix.file_descr;
mutable reads: IO_wait.t list;
mutable writes: IO_wait.t list;
}
let[@inline] is_empty self = self.reads = [] && self.writes = []
end
(** Keep track of the subscriptions to channels *)
module IO_tbl = struct
type t = {
mutable n_read: int;
mutable n_write: int;
tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t;
}
let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 }
let get_or_create (self : t) fd : Per_fd.t =
try Hashtbl.find self.tbl fd
with Not_found ->
let per_fd = { Per_fd.fd; reads = []; writes = [] } in
Hashtbl.add self.tbl fd per_fd;
per_fd
let add_io_wait (self : t) fd mode (ev : IO_wait.t) =
let per_fd = get_or_create self fd in
match mode with
| Read ->
self.n_read <- 1 + self.n_read;
per_fd.reads <- ev :: per_fd.reads
| Write ->
self.n_write <- 1 + self.n_write;
per_fd.writes <- ev :: per_fd.writes
let prepare_select (self : t) =
let reads = ref [] in
let writes = ref [] in
Hashtbl.iter
(fun _ (per_fd : Per_fd.t) ->
if Per_fd.is_empty per_fd then
Hashtbl.remove self.tbl per_fd.fd
else (
if per_fd.reads <> [] then reads := per_fd.fd :: !reads;
if per_fd.writes <> [] then writes := per_fd.fd :: !writes
))
self.tbl;
!reads, !writes
let trigger_waiter (io : IO_wait.t) =
if io.active then io.f io.as_cancel_handle
let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list)
(writes : Unix.file_descr list) : unit =
List.iter
(fun fd ->
if fd <> ignore_read then (
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.reads;
self.n_read <- self.n_read - List.length per_fd.reads;
per_fd.reads <- []
))
reads;
List.iter
(fun fd ->
let per_fd = Hashtbl.find self.tbl fd in
List.iter trigger_waiter per_fd.writes;
self.n_write <- self.n_write - List.length per_fd.writes;
per_fd.writes <- [])
writes;
()
end
let run_timer_ (t : Timer.t) =
let rec loop () =
match Timer.next t with
| Timer.Empty -> None
| Timer.Run (f, ev_h) ->
f ev_h;
loop ()
| Timer.Wait f ->
if f > 0. then
Some f
else
None
in
loop ()
module Ev_loop = struct
type t = {
timer: Timer.t;
actions: Action_queue.t;
io_tbl: IO_tbl.t; (** Used for select *)
in_blocking_section: bool A.t;
(** Is the ev loop thread currently waiting? *)
pipe_read: Unix.file_descr; (** Main thread only *)
pipe_write: Unix.file_descr; (** Wakeup main thread *)
}
let create () : t =
let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in
Unix.set_nonblock pipe_read;
Unix.set_nonblock pipe_write;
{
timer = Timer.create ();
io_tbl = IO_tbl.create ();
in_blocking_section = A.make false;
actions = Action_queue.create ();
pipe_read;
pipe_write;
}
(** Perform the action from within the ev loop thread *)
let perform_action (self : t) (act : Action.t) : unit =
match act with
| Wait_readable (fd, cb, cancel) ->
IO_tbl.add_io_wait self.io_tbl fd Read (IO_wait.make cancel cb)
| Wait_writable (fd, cb, cancel) ->
IO_tbl.add_io_wait self.io_tbl fd Write (IO_wait.make cancel cb)
| Run_after_s (delay, cb, cancel) ->
Timer.run_after_s self.timer delay cancel cb
| Run_every_s (delay, cb, cancel) ->
Timer.run_every_s self.timer delay cancel cb
(** Gets the current set of notifications and perform them from inside the
ev loop thread *)
let perform_pending_actions (self : t) : unit =
let l = Action_queue.pop_all self.actions in
List.iter (perform_action self) l
(** Empty the pipe *)
let drain_pipe_ (self : t) : unit =
let b = Bytes.create 1 in
try
let continue = ref true in
while !continue do
let n = Unix.read self.pipe_read b 0 1 in
if n = 0 then continue := false
done
with Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> ()
let run_step_ (self : t) : unit =
perform_pending_actions self;
let delay = run_timer_ self.timer in
let delay = Option.value delay ~default:10. in
(* run [select] *)
let reads, writes = IO_tbl.prepare_select self.io_tbl in
A.set self.in_blocking_section true;
let reads, writes, _ =
Unix.select (self.pipe_read :: reads) writes [] delay
in
A.set self.in_blocking_section false;
drain_pipe_ self;
IO_tbl.handle_ready ~ignore_read:self.pipe_read self.io_tbl reads writes;
perform_pending_actions self;
()
end
(* ### global ev loop *)
let current_ : Ev_loop.t option A.t = A.make None
let rec set_as_current_ (ev : Ev_loop.t) : bool =
match A.get current_ with
| Some _ -> false
| None ->
if A.compare_and_set current_ None (Some ev) then
true
else
set_as_current_ ev
let with_loop ~runner f =
let@ _sp = Tracing_.with_span "Moonpool_unix.main" in
let ev_loop = Ev_loop.create () in
if not (set_as_current_ ev_loop) then
failwith "Moonpool_unix: the event loop is already active";
(* schedule [f] on the pool *)
let fib = Fiber.spawn_top ~name:"Moonpool_unix.main-fiber" ~on:runner f in
while not (Fiber.is_done fib) do
Ev_loop.run_step_ ev_loop
done;
A.set current_ None;
(* return result of [fib] *)
Moonpool.Fut.get_or_fail_exn @@ Fiber.res fib
(* ### external inputs *)
let[@inline] get_current_ () =
match A.get current_ with
| None -> failwith "Moonpool_unix: event loop is not active"
| Some ev -> ev
let interrupt_if_in_blocking_section_ (self : Ev_loop.t) =
if A.get self.in_blocking_section then (
let b = Bytes.create 1 in
ignore (Unix.write self.pipe_write b 0 1 : int)
)
let wait_readable fd cancel f : unit =
let ev_loop = get_current_ () in
let first =
Action_queue.push ev_loop.actions (Action.Wait_readable (fd, f, cancel))
in
if first then interrupt_if_in_blocking_section_ ev_loop
let wait_writable fd cancel f : unit =
let ev_loop = get_current_ () in
let first =
Action_queue.push ev_loop.actions (Action.Wait_writable (fd, f, cancel))
in
if first then interrupt_if_in_blocking_section_ ev_loop
let run_after_s delay cancel f : unit =
let ev_loop = get_current_ () in
let first =
Action_queue.push ev_loop.actions (Action.Run_after_s (delay, f, cancel))
in
if first then interrupt_if_in_blocking_section_ ev_loop
let run_every_s delay cancel f : unit =
let ev_loop = get_current_ () in
let first =
Action_queue.push ev_loop.actions (Action.Run_every_s (delay, f, cancel))
in
if first then interrupt_if_in_blocking_section_ ev_loop

15
src/unix/ev_loop.mli Normal file
View file

@ -0,0 +1,15 @@
(** Event loop *)
val wait_readable :
Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val wait_writable :
Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val run_after_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val run_every_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val with_loop : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** Run with the event loop processed in the current thread. There can
only be one such loop running at a time.
@raise Failure if another call to {!with_loop} is already in effect. *)

134
src/unix/heap.ml Normal file
View file

@ -0,0 +1,134 @@
module type PARTIAL_ORD = sig
type t
val leq : t -> t -> bool
(** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *)
end
module type S = sig
type elt
type t
val empty : t
(** [empty] returns the empty heap. *)
val is_empty : t -> bool
(** [is_empty h] returns [true] if the heap [h] is empty. *)
exception Empty
val merge : t -> t -> t
(** [merge h1 h2] merges the two heaps [h1] and [h2]. *)
val insert : elt -> t -> t
(** [insert x h] inserts an element [x] into the heap [h]. *)
val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h].
@since 2.0 *)
val size : t -> int
end
module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct
type elt = E.t
type t =
| E
| N of int * elt * t * t
let empty = E
let is_empty = function
| E -> true
| N _ -> false
exception Empty
(* Rank of the tree *)
let _rank = function
| E -> 0
| N (r, _, _, _) -> r
(* Make a balanced node labelled with [x], and subtrees [a] and [b].
We ensure that the right child's rank is to the rank of the
left child (leftist property). The rank of the resulting node
is the length of the rightmost path. *)
let _make_node x a b =
if _rank a >= _rank b then
N (_rank b + 1, x, a, b)
else
N (_rank a + 1, x, b, a)
let rec merge t1 t2 =
match t1, t2 with
| t, E -> t
| E, t -> t
| N (_, x, a1, b1), N (_, y, a2, b2) ->
if E.leq x y then
_make_node x a1 (merge b1 t2)
else
_make_node y a2 (merge t1 b2)
let insert x h = merge (N (1, x, E, E)) h
let find_min_exn = function
| E -> raise Empty
| N (_, x, _, _) -> x
let find_min = function
| E -> None
| N (_, x, _, _) -> Some x
let take = function
| E -> None
| N (_, x, l, r) -> Some (merge l r, x)
let take_exn = function
| E -> raise Empty
| N (_, x, l, r) -> merge l r, x
let delete_one eq x h =
let rec aux = function
| E -> false, E
| N (_, y, l, r) as h ->
if eq x y then
true, merge l r
else if E.leq y x then (
let found_left, l1 = aux l in
let found, r1 =
if found_left then
true, r
else
aux r
in
if found then
true, _make_node y l1 r1
else
false, h
) else
false, h
in
snd (aux h)
let rec size = function
| E -> 0
| N (_, _, l, r) -> 1 + size l + size r
end

54
src/unix/heap.mli Normal file
View file

@ -0,0 +1,54 @@
(** Leftist Heaps
Implementation following Okasaki's book. *)
module type PARTIAL_ORD = sig
type t
val leq : t -> t -> bool
(** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *)
end
module type S = sig
type elt
type t
val empty : t
(** [empty] returns the empty heap. *)
val is_empty : t -> bool
(** [is_empty h] returns [true] if the heap [h] is empty. *)
exception Empty
val merge : t -> t -> t
(** [merge h1 h2] merges the two heaps [h1] and [h2]. *)
val insert : elt -> t -> t
(** [insert x h] inserts an element [x] into the heap [h]. *)
val find_min : t -> elt option
(** [find_min h] find the minimal element of the heap [h]. *)
val find_min_exn : t -> elt
(** [find_min_exn h] is like {!find_min} but can fail.
@raise Empty if the heap is empty. *)
val take : t -> (t * elt) option
(** [take h] extracts and returns the minimum element, and the new heap (without
this element), or [None] if the heap [h] is empty. *)
val take_exn : t -> t * elt
(** [take_exn h] is like {!take}, but can fail.
@raise Empty if the heap is empty. *)
val delete_one : (elt -> elt -> bool) -> elt -> t -> t
(** [delete_one eq x h] uses [eq] to find one occurrence of a value [x]
if it exist in the heap [h], and delete it.
If [h] do not contain [x] then it return [h].
@since 2.0 *)
val size : t -> int
end
module Make (E : PARTIAL_ORD) : S with type elt = E.t

61
src/unix/moonpool_unix.ml Normal file
View file

@ -0,0 +1,61 @@
(*
(** Unix-compatible event loop *)
class type event_loop =
object
method one_step : block:bool -> unit -> unit
(** Run one step of the event loop.
@param block if [true], the call might block until the next timeout
or until the next IO event occurs. If [false], this does not
block and returns after having processed the available events. *)
method on_readable :
Unix.file_descr -> (Cancel_handle.t -> unit) -> Cancel_handle.t
(** [on_readable fd f] creates a new event [ev], and will run [f ev] when
[fd] becomes readable *)
method on_writable :
Unix.file_descr -> (Cancel_handle.t -> unit) -> Cancel_handle.t
method on_timer :
float -> repeat:bool -> (Cancel_handle.t -> unit) -> Cancel_handle.t
(** [on_timer delay ~repeat f] runs [f] after [delay].
@param repeat if true runs [f] every [delay] seconds *)
method fake_io : Unix.file_descr -> unit
(** Simulate activity on the FD *)
method interrupt_if_in_blocking_section : unit
(** If run from inside the event loop when it's waiting, wakes the event loop up *)
method has_pending_tasks : bool
end
(* TODO: for lwt backend:
let has_pending_tasks (self : #t) : bool =
self#readable_count > 0 || self#writable_count > 0 || self#timer_count > 0
method readable_count : int
(** Number of events waiting for FDs to be readable FDs *)
method writable_count : int
method timer_count : int
(** Number of events waiting on a timer *)
let readable_count (self : #t) = self#readable_count
let writable_count (self : #t) = self#writable_count
let timer_count (self : #t) = self#timer_count
*)
let[@inline] one_step (self : #t) ~block () = self#one_step ~block ()
let[@inline] on_readable (self : #t) fd f = self#on_readable fd f
let[@inline] on_writable (self : #t) fd f = self#on_writable fd f
let[@inline] on_timer (self : #t) delay ~repeat f =
self#on_timer delay ~repeat f
let[@inline] fake_io (self : #t) fd = self#fake_io fd
let[@inline] has_pending_tasks (self : #t) = self#has_pending_tasks
let[@inline] interrupt_if_in_blocking_section (self : #t) : unit =
self#interrupt_if_in_blocking_section
*)

2
src/unix/time.mli Normal file
View file

@ -0,0 +1,2 @@
val time_s : unit -> float
val time_ns : unit -> int64

10
src/unix/time.mtime.ml Normal file
View file

@ -0,0 +1,10 @@
let time_ns : unit -> int64 = Mtime_clock.now_ns
(** Monotonic time in seconds *)
let time_s () : float =
let ns = time_ns () in
let s = Int64.(div ns 1_000_000_000L) in
let ns' = Int64.(rem ns 1_000_000_000L) in
Int64.to_float s +. (Int64.to_float ns' /. 1e9)

2
src/unix/time.unix.ml Normal file
View file

@ -0,0 +1,2 @@
let time_s = Unix.gettimeofday
let[@inline] time_ns () = Int64.of_float (floor (time_s () *. 1e9))

94
src/unix/timer.ml Normal file
View file

@ -0,0 +1,94 @@
type instant_s = float
type duration_s = float
type kind =
| Once
| Every of duration_s
type task = {
mutable deadline: instant_s;
mutable active: bool;
f: Cancel_handle.t -> unit;
as_cancel_handle: Cancel_handle.t;
kind: kind;
}
module Task_heap = Heap.Make (struct
type t = task
let[@inline] leq t1 t2 = t1.deadline <= t2.deadline
end)
type t = {
mutable tasks: Task_heap.t;
mutable n_tasks: int;
}
(** accepted time diff for actions.*)
let epsilon_s = 0.000_001
type tick_res =
| Wait of float
| Run of (Cancel_handle.t -> unit) * Cancel_handle.t
| Empty
let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks)
let[@inline] num_tasks self : int = self.n_tasks
let[@inline] pop_task_ self : unit =
let tasks, _t = Task_heap.take_exn self.tasks in
self.n_tasks <- self.n_tasks - 1;
self.tasks <- tasks
let run_after_s self delay cancel f : unit =
let now = Time.time_s () in
let deadline = now +. delay in
let task =
{ deadline; f; kind = Once; active = true; as_cancel_handle = cancel }
in
self.tasks <- Task_heap.insert task self.tasks;
self.n_tasks <- 1 + self.n_tasks;
Cancel_handle.on_cancel cancel (fun () -> task.active <- false)
let run_every_s self delay cancel f : unit =
let now = Time.time_s () in
let deadline = now +. delay in
let task =
{
deadline;
f;
kind = Every delay;
active = true;
as_cancel_handle = cancel;
}
in
self.tasks <- Task_heap.insert task self.tasks;
self.n_tasks <- 1 + self.n_tasks;
Cancel_handle.on_cancel cancel (fun () -> task.active <- false)
let rec next (self : t) : tick_res =
match Task_heap.find_min self.tasks with
| None -> Empty
| Some task when not task.active ->
pop_task_ self;
next self
| Some task ->
let now = Time.time_s () in
let remaining_time_s = task.deadline -. now in
if remaining_time_s <= epsilon_s then (
pop_task_ self;
(match task.kind with
| Once -> ()
| Every dur ->
(* schedule the next iteration *)
task.deadline <- now +. dur;
self.n_tasks <- 1 + self.n_tasks;
self.tasks <- Task_heap.insert task self.tasks);
Run (task.f, task.as_cancel_handle)
) else
Wait remaining_time_s
let create () = { tasks = Task_heap.empty; n_tasks = 0 }

24
src/unix/timer.mli Normal file
View file

@ -0,0 +1,24 @@
type t
val create : unit -> t
(** A new timer. *)
type tick_res =
| Wait of float (** Wait for number of seconds *)
| Run of (Cancel_handle.t -> unit) * Cancel_handle.t
| Empty (** Next action to take *)
val next : t -> tick_res
(** Next action to take *)
val run_after_s :
t -> float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val run_every_s :
t -> float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val has_tasks : t -> bool
(** Does the timer contain anything? *)
val num_tasks : t -> int
(** Number of tasks in the timer *)