diff --git a/dune-project b/dune-project index 55cb93f1..aac71b8a 100644 --- a/dune-project +++ b/dune-project @@ -29,6 +29,7 @@ :with-test))) (depopts (trace (>= 0.6)) + (mtime (>= 2.0)) thread-local-storage) (tags (thread pool domain futures fork-join))) diff --git a/moonpool.opam b/moonpool.opam index c8afba80..eaf29738 100644 --- a/moonpool.opam +++ b/moonpool.opam @@ -20,6 +20,7 @@ depends: [ ] depopts: [ "trace" {>= "0.6"} + "mtime" {>= "2.0"} "thread-local-storage" ] build: [ diff --git a/src/unix/IO_in.ml b/src/unix/IO_in.ml new file mode 100644 index 00000000..09c607de --- /dev/null +++ b/src/unix/IO_in.ml @@ -0,0 +1,154 @@ +open Common_ + +class type t = + object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the + stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) + end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := IO_unix.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/unix/IO_out.ml b/src/unix/IO_out.ml new file mode 100644 index 00000000..ba772345 --- /dev/null +++ b/src/unix/IO_out.ml @@ -0,0 +1,119 @@ +open Common_ + +class type t = + object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit + end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + IO_unix.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/unix/IO_unix.ml b/src/unix/IO_unix.ml new file mode 100644 index 00000000..989f381b --- /dev/null +++ b/src/unix/IO_unix.ml @@ -0,0 +1,52 @@ +open Common_ + +type file_descr = Unix.file_descr + +let rec read fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + let cancel = Cancel_handle.create () in + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Ev_loop.wait_readable fd cancel (fun cancel -> + resume ~ls sus @@ Ok (); + Cancel_handle.cancel cancel)); + }; + read fd buf i len + | n -> n + ) + +let rec write_once fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + let cancel = Cancel_handle.create () in + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Ev_loop.wait_writable fd cancel (fun cancel -> + resume ~ls sus @@ Ok (); + Cancel_handle.cancel cancel)); + }; + write_once fd buf i len + | n -> n + ) + +let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done diff --git a/src/unix/cancel_handle.ml b/src/unix/cancel_handle.ml new file mode 100644 index 00000000..6911ca4c --- /dev/null +++ b/src/unix/cancel_handle.ml @@ -0,0 +1,42 @@ +(** Cancellation handle. *) + +open Common_ + +type state = + | Cancelled + | Waiting of { waiters: (unit -> unit) list } + +type t = { st: state A.t } [@@unboxed] + +let create () : t = { st = A.make (Waiting { waiters = [] }) } +let create_with f : t = { st = A.make (Waiting { waiters = [ f ] }) } + +let cancel (self : t) = + while + let old_st = A.get self.st in + match old_st with + | Cancelled -> false + | Waiting { waiters } -> + if A.compare_and_set self.st old_st Cancelled then ( + List.iter (fun f -> f ()) waiters; + false + ) else + true + do + () + done + +let on_cancel (self : t) f : unit = + while + let old_st = A.get self.st in + match old_st with + | Cancelled -> + f (); + false + | Waiting { waiters = l } -> + not (A.compare_and_set self.st old_st (Waiting { waiters = f :: l })) + do + () + done + +let dummy = { st = A.make Cancelled } diff --git a/src/unix/cancel_handle.mli b/src/unix/cancel_handle.mli new file mode 100644 index 00000000..7dbed0a4 --- /dev/null +++ b/src/unix/cancel_handle.mli @@ -0,0 +1,12 @@ +type t +(** A handle to cancel atomic actions (waiting on something), or + stopping a subscription to some event. *) + +val create : unit -> t +val create_with : (unit -> unit) -> t +val on_cancel : t -> (unit -> unit) -> unit + +val cancel : t -> unit +(** Perform the cancellation. This should be idempotent. *) + +val dummy : t diff --git a/src/unix/common_.ml b/src/unix/common_.ml new file mode 100644 index 00000000..8bb8b338 --- /dev/null +++ b/src/unix/common_.ml @@ -0,0 +1,9 @@ +module M = Moonpool +module Exn_bt = M.Exn_bt +module A = Moonpool.Atomic +module Fiber = Moonpool_fib.Fiber +module Tracing_ = Moonpool.Private.Tracing_ + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf +let _default_buf_size = 4 * 1024 diff --git a/src/unix/dune b/src/unix/dune new file mode 100644 index 00000000..c3eaa7cd --- /dev/null +++ b/src/unix/dune @@ -0,0 +1,12 @@ + +(library + (name moonpool_unix) + (public_name moonpool.unix) + (optional) + (synopsis "Simple Unix-based event loop for moonpool") + (private_modules common_) + (libraries moonpool moonpool.fib unix + (select time.ml from + (mtime mtime.os.clock -> time.mtime.ml) + (-> time.unix.ml)) + )) diff --git a/src/unix/ev_loop.ml b/src/unix/ev_loop.ml new file mode 100644 index 00000000..4b4932f4 --- /dev/null +++ b/src/unix/ev_loop.ml @@ -0,0 +1,291 @@ +open Common_ + +(** Action scheduled from outside the loop *) +module Action = struct + type cb = Cancel_handle.t -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb * Cancel_handle.t + | Wait_writable of Unix.file_descr * cb * Cancel_handle.t + | Run_after_s of float * cb * Cancel_handle.t + | Run_every_s of float * cb * Cancel_handle.t +end + +(** Thread-safe queue of actions *) +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old = []; + false + ) else + true + do + () + done; + !is_first +end + +type io_mode = + | Read + | Write + +module IO_wait = struct + type t = { + mutable active: bool; + f: Cancel_handle.t -> unit; + as_cancel_handle: Cancel_handle.t; + } + (** A single event, waiting on a unix FD *) + + let make cancel f : t = + let self = { active = true; f; as_cancel_handle = cancel } in + Cancel_handle.on_cancel cancel (fun () -> self.active <- false); + self +end + +module Per_fd = struct + type t = { + fd: Unix.file_descr; + mutable reads: IO_wait.t list; + mutable writes: IO_wait.t list; + } + + let[@inline] is_empty self = self.reads = [] && self.writes = [] +end + +(** Keep track of the subscriptions to channels *) +module IO_tbl = struct + type t = { + mutable n_read: int; + mutable n_write: int; + tbl: (Unix.file_descr, Per_fd.t) Hashtbl.t; + } + + let create () : t = { tbl = Hashtbl.create 32; n_read = 0; n_write = 0 } + + let get_or_create (self : t) fd : Per_fd.t = + try Hashtbl.find self.tbl fd + with Not_found -> + let per_fd = { Per_fd.fd; reads = []; writes = [] } in + Hashtbl.add self.tbl fd per_fd; + per_fd + + let add_io_wait (self : t) fd mode (ev : IO_wait.t) = + let per_fd = get_or_create self fd in + match mode with + | Read -> + self.n_read <- 1 + self.n_read; + per_fd.reads <- ev :: per_fd.reads + | Write -> + self.n_write <- 1 + self.n_write; + per_fd.writes <- ev :: per_fd.writes + + let prepare_select (self : t) = + let reads = ref [] in + let writes = ref [] in + Hashtbl.iter + (fun _ (per_fd : Per_fd.t) -> + if Per_fd.is_empty per_fd then + Hashtbl.remove self.tbl per_fd.fd + else ( + if per_fd.reads <> [] then reads := per_fd.fd :: !reads; + if per_fd.writes <> [] then writes := per_fd.fd :: !writes + )) + self.tbl; + !reads, !writes + + let trigger_waiter (io : IO_wait.t) = + if io.active then io.f io.as_cancel_handle + + let handle_ready ~ignore_read (self : t) (reads : Unix.file_descr list) + (writes : Unix.file_descr list) : unit = + List.iter + (fun fd -> + if fd <> ignore_read then ( + let per_fd = Hashtbl.find self.tbl fd in + List.iter trigger_waiter per_fd.reads; + self.n_read <- self.n_read - List.length per_fd.reads; + per_fd.reads <- [] + )) + reads; + + List.iter + (fun fd -> + let per_fd = Hashtbl.find self.tbl fd in + List.iter trigger_waiter per_fd.writes; + self.n_write <- self.n_write - List.length per_fd.writes; + per_fd.writes <- []) + writes; + () +end + +let run_timer_ (t : Timer.t) = + let rec loop () = + match Timer.next t with + | Timer.Empty -> None + | Timer.Run (f, ev_h) -> + f ev_h; + loop () + | Timer.Wait f -> + if f > 0. then + Some f + else + None + in + loop () + +module Ev_loop = struct + type t = { + timer: Timer.t; + actions: Action_queue.t; + io_tbl: IO_tbl.t; (** Used for select *) + in_blocking_section: bool A.t; + (** Is the ev loop thread currently waiting? *) + pipe_read: Unix.file_descr; (** Main thread only *) + pipe_write: Unix.file_descr; (** Wakeup main thread *) + } + + let create () : t = + let pipe_read, pipe_write = Unix.pipe ~cloexec:true () in + Unix.set_nonblock pipe_read; + Unix.set_nonblock pipe_write; + { + timer = Timer.create (); + io_tbl = IO_tbl.create (); + in_blocking_section = A.make false; + actions = Action_queue.create (); + pipe_read; + pipe_write; + } + + (** Perform the action from within the ev loop thread *) + let perform_action (self : t) (act : Action.t) : unit = + match act with + | Wait_readable (fd, cb, cancel) -> + IO_tbl.add_io_wait self.io_tbl fd Read (IO_wait.make cancel cb) + | Wait_writable (fd, cb, cancel) -> + IO_tbl.add_io_wait self.io_tbl fd Write (IO_wait.make cancel cb) + | Run_after_s (delay, cb, cancel) -> + Timer.run_after_s self.timer delay cancel cb + | Run_every_s (delay, cb, cancel) -> + Timer.run_every_s self.timer delay cancel cb + + (** Gets the current set of notifications and perform them from inside the + ev loop thread *) + let perform_pending_actions (self : t) : unit = + let l = Action_queue.pop_all self.actions in + List.iter (perform_action self) l + + (** Empty the pipe *) + let drain_pipe_ (self : t) : unit = + let b = Bytes.create 1 in + try + let continue = ref true in + while !continue do + let n = Unix.read self.pipe_read b 0 1 in + if n = 0 then continue := false + done + with Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> () + + let run_step_ (self : t) : unit = + perform_pending_actions self; + + let delay = run_timer_ self.timer in + let delay = Option.value delay ~default:10. in + + (* run [select] *) + let reads, writes = IO_tbl.prepare_select self.io_tbl in + A.set self.in_blocking_section true; + let reads, writes, _ = + Unix.select (self.pipe_read :: reads) writes [] delay + in + A.set self.in_blocking_section false; + + drain_pipe_ self; + IO_tbl.handle_ready ~ignore_read:self.pipe_read self.io_tbl reads writes; + + perform_pending_actions self; + () +end + +(* ### global ev loop *) + +let current_ : Ev_loop.t option A.t = A.make None + +let rec set_as_current_ (ev : Ev_loop.t) : bool = + match A.get current_ with + | Some _ -> false + | None -> + if A.compare_and_set current_ None (Some ev) then + true + else + set_as_current_ ev + +let with_loop ~runner f = + let@ _sp = Tracing_.with_span "Moonpool_unix.main" in + + let ev_loop = Ev_loop.create () in + if not (set_as_current_ ev_loop) then + failwith "Moonpool_unix: the event loop is already active"; + + (* schedule [f] on the pool *) + let fib = Fiber.spawn_top ~name:"Moonpool_unix.main-fiber" ~on:runner f in + + while not (Fiber.is_done fib) do + Ev_loop.run_step_ ev_loop + done; + A.set current_ None; + + (* return result of [fib] *) + Moonpool.Fut.get_or_fail_exn @@ Fiber.res fib + +(* ### external inputs *) + +let[@inline] get_current_ () = + match A.get current_ with + | None -> failwith "Moonpool_unix: event loop is not active" + | Some ev -> ev + +let interrupt_if_in_blocking_section_ (self : Ev_loop.t) = + if A.get self.in_blocking_section then ( + let b = Bytes.create 1 in + ignore (Unix.write self.pipe_write b 0 1 : int) + ) + +let wait_readable fd cancel f : unit = + let ev_loop = get_current_ () in + let first = + Action_queue.push ev_loop.actions (Action.Wait_readable (fd, f, cancel)) + in + if first then interrupt_if_in_blocking_section_ ev_loop + +let wait_writable fd cancel f : unit = + let ev_loop = get_current_ () in + let first = + Action_queue.push ev_loop.actions (Action.Wait_writable (fd, f, cancel)) + in + if first then interrupt_if_in_blocking_section_ ev_loop + +let run_after_s delay cancel f : unit = + let ev_loop = get_current_ () in + let first = + Action_queue.push ev_loop.actions (Action.Run_after_s (delay, f, cancel)) + in + if first then interrupt_if_in_blocking_section_ ev_loop + +let run_every_s delay cancel f : unit = + let ev_loop = get_current_ () in + let first = + Action_queue.push ev_loop.actions (Action.Run_every_s (delay, f, cancel)) + in + if first then interrupt_if_in_blocking_section_ ev_loop diff --git a/src/unix/ev_loop.mli b/src/unix/ev_loop.mli new file mode 100644 index 00000000..23fee7c4 --- /dev/null +++ b/src/unix/ev_loop.mli @@ -0,0 +1,15 @@ +(** Event loop *) + +val wait_readable : + Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit + +val wait_writable : + Unix.file_descr -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit + +val run_after_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit +val run_every_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit + +val with_loop : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a +(** Run with the event loop processed in the current thread. There can + only be one such loop running at a time. + @raise Failure if another call to {!with_loop} is already in effect. *) diff --git a/src/unix/heap.ml b/src/unix/heap.ml new file mode 100644 index 00000000..83f55c64 --- /dev/null +++ b/src/unix/heap.ml @@ -0,0 +1,134 @@ +module type PARTIAL_ORD = sig + type t + + val leq : t -> t -> bool + (** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *) +end + +module type S = sig + type elt + type t + + val empty : t + (** [empty] returns the empty heap. *) + + val is_empty : t -> bool + (** [is_empty h] returns [true] if the heap [h] is empty. *) + + exception Empty + + val merge : t -> t -> t + (** [merge h1 h2] merges the two heaps [h1] and [h2]. *) + + val insert : elt -> t -> t + (** [insert x h] inserts an element [x] into the heap [h]. *) + + val find_min : t -> elt option + (** [find_min h] find the minimal element of the heap [h]. *) + + val find_min_exn : t -> elt + (** [find_min_exn h] is like {!find_min} but can fail. + @raise Empty if the heap is empty. *) + + val take : t -> (t * elt) option + (** [take h] extracts and returns the minimum element, and the new heap (without + this element), or [None] if the heap [h] is empty. *) + + val take_exn : t -> t * elt + (** [take_exn h] is like {!take}, but can fail. + @raise Empty if the heap is empty. *) + + val delete_one : (elt -> elt -> bool) -> elt -> t -> t + (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] + if it exist in the heap [h], and delete it. + If [h] do not contain [x] then it return [h]. + @since 2.0 *) + + val size : t -> int +end + +module Make (E : PARTIAL_ORD) : S with type elt = E.t = struct + type elt = E.t + + type t = + | E + | N of int * elt * t * t + + let empty = E + + let is_empty = function + | E -> true + | N _ -> false + + exception Empty + + (* Rank of the tree *) + let _rank = function + | E -> 0 + | N (r, _, _, _) -> r + + (* Make a balanced node labelled with [x], and subtrees [a] and [b]. + We ensure that the right child's rank is ≤ to the rank of the + left child (leftist property). The rank of the resulting node + is the length of the rightmost path. *) + let _make_node x a b = + if _rank a >= _rank b then + N (_rank b + 1, x, a, b) + else + N (_rank a + 1, x, b, a) + + let rec merge t1 t2 = + match t1, t2 with + | t, E -> t + | E, t -> t + | N (_, x, a1, b1), N (_, y, a2, b2) -> + if E.leq x y then + _make_node x a1 (merge b1 t2) + else + _make_node y a2 (merge t1 b2) + + let insert x h = merge (N (1, x, E, E)) h + + let find_min_exn = function + | E -> raise Empty + | N (_, x, _, _) -> x + + let find_min = function + | E -> None + | N (_, x, _, _) -> Some x + + let take = function + | E -> None + | N (_, x, l, r) -> Some (merge l r, x) + + let take_exn = function + | E -> raise Empty + | N (_, x, l, r) -> merge l r, x + + let delete_one eq x h = + let rec aux = function + | E -> false, E + | N (_, y, l, r) as h -> + if eq x y then + true, merge l r + else if E.leq y x then ( + let found_left, l1 = aux l in + let found, r1 = + if found_left then + true, r + else + aux r + in + if found then + true, _make_node y l1 r1 + else + false, h + ) else + false, h + in + snd (aux h) + + let rec size = function + | E -> 0 + | N (_, _, l, r) -> 1 + size l + size r +end diff --git a/src/unix/heap.mli b/src/unix/heap.mli new file mode 100644 index 00000000..4e635f73 --- /dev/null +++ b/src/unix/heap.mli @@ -0,0 +1,54 @@ +(** Leftist Heaps + + Implementation following Okasaki's book. *) + +module type PARTIAL_ORD = sig + type t + + val leq : t -> t -> bool + (** [leq x y] shall return [true] iff [x] is lower or equal to [y]. *) +end + +module type S = sig + type elt + type t + + val empty : t + (** [empty] returns the empty heap. *) + + val is_empty : t -> bool + (** [is_empty h] returns [true] if the heap [h] is empty. *) + + exception Empty + + val merge : t -> t -> t + (** [merge h1 h2] merges the two heaps [h1] and [h2]. *) + + val insert : elt -> t -> t + (** [insert x h] inserts an element [x] into the heap [h]. *) + + val find_min : t -> elt option + (** [find_min h] find the minimal element of the heap [h]. *) + + val find_min_exn : t -> elt + (** [find_min_exn h] is like {!find_min} but can fail. + @raise Empty if the heap is empty. *) + + val take : t -> (t * elt) option + (** [take h] extracts and returns the minimum element, and the new heap (without + this element), or [None] if the heap [h] is empty. *) + + val take_exn : t -> t * elt + (** [take_exn h] is like {!take}, but can fail. + @raise Empty if the heap is empty. *) + + val delete_one : (elt -> elt -> bool) -> elt -> t -> t + (** [delete_one eq x h] uses [eq] to find one occurrence of a value [x] + if it exist in the heap [h], and delete it. + If [h] do not contain [x] then it return [h]. + @since 2.0 *) + + val size : t -> int +end + +module Make (E : PARTIAL_ORD) : S with type elt = E.t diff --git a/src/unix/moonpool_unix.ml b/src/unix/moonpool_unix.ml new file mode 100644 index 00000000..6fcd2127 --- /dev/null +++ b/src/unix/moonpool_unix.ml @@ -0,0 +1,61 @@ +(* +(** Unix-compatible event loop *) +class type event_loop = + object + method one_step : block:bool -> unit -> unit + (** Run one step of the event loop. + @param block if [true], the call might block until the next timeout + or until the next IO event occurs. If [false], this does not + block and returns after having processed the available events. *) + + method on_readable : + Unix.file_descr -> (Cancel_handle.t -> unit) -> Cancel_handle.t + (** [on_readable fd f] creates a new event [ev], and will run [f ev] when + [fd] becomes readable *) + + method on_writable : + Unix.file_descr -> (Cancel_handle.t -> unit) -> Cancel_handle.t + + method on_timer : + float -> repeat:bool -> (Cancel_handle.t -> unit) -> Cancel_handle.t + (** [on_timer delay ~repeat f] runs [f] after [delay]. + @param repeat if true runs [f] every [delay] seconds *) + + method fake_io : Unix.file_descr -> unit + (** Simulate activity on the FD *) + + method interrupt_if_in_blocking_section : unit + (** If run from inside the event loop when it's waiting, wakes the event loop up *) + + method has_pending_tasks : bool + end + +(* TODO: for lwt backend: + let has_pending_tasks (self : #t) : bool = + self#readable_count > 0 || self#writable_count > 0 || self#timer_count > 0 + + method readable_count : int + (** Number of events waiting for FDs to be readable FDs *) + + method writable_count : int + + method timer_count : int + (** Number of events waiting on a timer *) + let readable_count (self : #t) = self#readable_count + let writable_count (self : #t) = self#writable_count + let timer_count (self : #t) = self#timer_count +*) + +let[@inline] one_step (self : #t) ~block () = self#one_step ~block () +let[@inline] on_readable (self : #t) fd f = self#on_readable fd f +let[@inline] on_writable (self : #t) fd f = self#on_writable fd f + +let[@inline] on_timer (self : #t) delay ~repeat f = + self#on_timer delay ~repeat f + +let[@inline] fake_io (self : #t) fd = self#fake_io fd +let[@inline] has_pending_tasks (self : #t) = self#has_pending_tasks + +let[@inline] interrupt_if_in_blocking_section (self : #t) : unit = + self#interrupt_if_in_blocking_section + *) diff --git a/src/unix/time.mli b/src/unix/time.mli new file mode 100644 index 00000000..f0037eed --- /dev/null +++ b/src/unix/time.mli @@ -0,0 +1,2 @@ +val time_s : unit -> float +val time_ns : unit -> int64 diff --git a/src/unix/time.mtime.ml b/src/unix/time.mtime.ml new file mode 100644 index 00000000..745b5412 --- /dev/null +++ b/src/unix/time.mtime.ml @@ -0,0 +1,10 @@ + + +let time_ns : unit -> int64 = Mtime_clock.now_ns + +(** Monotonic time in seconds *) +let time_s () : float = + let ns = time_ns () in + let s = Int64.(div ns 1_000_000_000L) in + let ns' = Int64.(rem ns 1_000_000_000L) in + Int64.to_float s +. (Int64.to_float ns' /. 1e9) diff --git a/src/unix/time.unix.ml b/src/unix/time.unix.ml new file mode 100644 index 00000000..cdb9f337 --- /dev/null +++ b/src/unix/time.unix.ml @@ -0,0 +1,2 @@ +let time_s = Unix.gettimeofday +let[@inline] time_ns () = Int64.of_float (floor (time_s () *. 1e9)) diff --git a/src/unix/timer.ml b/src/unix/timer.ml new file mode 100644 index 00000000..c677e0a4 --- /dev/null +++ b/src/unix/timer.ml @@ -0,0 +1,94 @@ +type instant_s = float +type duration_s = float + +type kind = + | Once + | Every of duration_s + +type task = { + mutable deadline: instant_s; + mutable active: bool; + f: Cancel_handle.t -> unit; + as_cancel_handle: Cancel_handle.t; + kind: kind; +} + +module Task_heap = Heap.Make (struct + type t = task + + let[@inline] leq t1 t2 = t1.deadline <= t2.deadline +end) + +type t = { + mutable tasks: Task_heap.t; + mutable n_tasks: int; +} + +(** accepted time diff for actions.*) +let epsilon_s = 0.000_001 + +type tick_res = + | Wait of float + | Run of (Cancel_handle.t -> unit) * Cancel_handle.t + | Empty + +let[@inline] has_tasks self = not (Task_heap.is_empty self.tasks) +let[@inline] num_tasks self : int = self.n_tasks + +let[@inline] pop_task_ self : unit = + let tasks, _t = Task_heap.take_exn self.tasks in + self.n_tasks <- self.n_tasks - 1; + self.tasks <- tasks + +let run_after_s self delay cancel f : unit = + let now = Time.time_s () in + let deadline = now +. delay in + let task = + { deadline; f; kind = Once; active = true; as_cancel_handle = cancel } + in + self.tasks <- Task_heap.insert task self.tasks; + self.n_tasks <- 1 + self.n_tasks; + Cancel_handle.on_cancel cancel (fun () -> task.active <- false) + +let run_every_s self delay cancel f : unit = + let now = Time.time_s () in + let deadline = now +. delay in + let task = + { + deadline; + f; + kind = Every delay; + active = true; + as_cancel_handle = cancel; + } + in + self.tasks <- Task_heap.insert task self.tasks; + self.n_tasks <- 1 + self.n_tasks; + Cancel_handle.on_cancel cancel (fun () -> task.active <- false) + +let rec next (self : t) : tick_res = + match Task_heap.find_min self.tasks with + | None -> Empty + | Some task when not task.active -> + pop_task_ self; + next self + | Some task -> + let now = Time.time_s () in + + let remaining_time_s = task.deadline -. now in + if remaining_time_s <= epsilon_s then ( + pop_task_ self; + + (match task.kind with + | Once -> () + | Every dur -> + (* schedule the next iteration *) + task.deadline <- now +. dur; + self.n_tasks <- 1 + self.n_tasks; + self.tasks <- Task_heap.insert task self.tasks); + + Run (task.f, task.as_cancel_handle) + ) else + Wait remaining_time_s + +let create () = { tasks = Task_heap.empty; n_tasks = 0 } diff --git a/src/unix/timer.mli b/src/unix/timer.mli new file mode 100644 index 00000000..e57dd75c --- /dev/null +++ b/src/unix/timer.mli @@ -0,0 +1,24 @@ +type t + +val create : unit -> t +(** A new timer. *) + +type tick_res = + | Wait of float (** Wait for number of seconds *) + | Run of (Cancel_handle.t -> unit) * Cancel_handle.t + | Empty (** Next action to take *) + +val next : t -> tick_res +(** Next action to take *) + +val run_after_s : + t -> float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit + +val run_every_s : + t -> float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit + +val has_tasks : t -> bool +(** Does the timer contain anything? *) + +val num_tasks : t -> int +(** Number of tasks in the timer *)