mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-07 11:45:39 -05:00
wip: Unix-based IO, now using iostream
This commit is contained in:
parent
839a1e4717
commit
dce54289ed
7 changed files with 36 additions and 26 deletions
|
|
@ -81,12 +81,6 @@ module IO : sig
|
||||||
(** Suspend the fiber for [n] seconds. *)
|
(** Suspend the fiber for [n] seconds. *)
|
||||||
end
|
end
|
||||||
|
|
||||||
module IO_in = IO_in
|
|
||||||
(** Input channel *)
|
|
||||||
|
|
||||||
module IO_out = IO_out
|
|
||||||
(** Output channel *)
|
|
||||||
|
|
||||||
module TCP_server : sig
|
module TCP_server : sig
|
||||||
type t = Lwt_io.server
|
type t = Lwt_io.server
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ open Common_
|
||||||
|
|
||||||
type file_descr = Unix.file_descr
|
type file_descr = Unix.file_descr
|
||||||
|
|
||||||
|
(** Non blocking read *)
|
||||||
let rec read fd buf i len : int =
|
let rec read fd buf i len : int =
|
||||||
if len = 0 then
|
if len = 0 then
|
||||||
0
|
0
|
||||||
|
|
@ -25,6 +26,7 @@ let rec read fd buf i len : int =
|
||||||
| n -> n
|
| n -> n
|
||||||
)
|
)
|
||||||
|
|
||||||
|
(** Non blocking write *)
|
||||||
let rec write_once fd buf i len : int =
|
let rec write_once fd buf i len : int =
|
||||||
if len = 0 then
|
if len = 0 then
|
||||||
0
|
0
|
||||||
|
|
|
||||||
|
|
@ -2,14 +2,18 @@
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
|
|
||||||
|
type waiter = Waiter : ('a -> unit) * 'a -> waiter
|
||||||
|
|
||||||
type state =
|
type state =
|
||||||
| Cancelled
|
| Cancelled
|
||||||
| Waiting of { waiters: (unit -> unit) list }
|
| Waiting of { waiters: waiter list }
|
||||||
|
|
||||||
type t = { st: state A.t } [@@unboxed]
|
type t = { st: state A.t } [@@unboxed]
|
||||||
|
|
||||||
let create () : t = { st = A.make (Waiting { waiters = [] }) }
|
let create () : t = { st = A.make (Waiting { waiters = [] }) }
|
||||||
let create_with f : t = { st = A.make (Waiting { waiters = [ f ] }) }
|
|
||||||
|
let create_with f : t =
|
||||||
|
{ st = A.make (Waiting { waiters = [ Waiter (f, ()) ] }) }
|
||||||
|
|
||||||
let cancel (self : t) =
|
let cancel (self : t) =
|
||||||
while
|
while
|
||||||
|
|
@ -18,7 +22,7 @@ let cancel (self : t) =
|
||||||
| Cancelled -> false
|
| Cancelled -> false
|
||||||
| Waiting { waiters } ->
|
| Waiting { waiters } ->
|
||||||
if A.compare_and_set self.st old_st Cancelled then (
|
if A.compare_and_set self.st old_st Cancelled then (
|
||||||
List.iter (fun f -> f ()) waiters;
|
List.iter (fun (Waiter (f, x)) -> f x) waiters;
|
||||||
false
|
false
|
||||||
) else
|
) else
|
||||||
true
|
true
|
||||||
|
|
@ -26,17 +30,19 @@ let cancel (self : t) =
|
||||||
()
|
()
|
||||||
done
|
done
|
||||||
|
|
||||||
let on_cancel (self : t) f : unit =
|
let on_cancel1 (self : t) f x : unit =
|
||||||
|
let waiter = Waiter (f, x) in
|
||||||
while
|
while
|
||||||
let old_st = A.get self.st in
|
let old_st = A.get self.st in
|
||||||
match old_st with
|
match old_st with
|
||||||
| Cancelled ->
|
| Cancelled ->
|
||||||
f ();
|
f x;
|
||||||
false
|
false
|
||||||
| Waiting { waiters = l } ->
|
| Waiting { waiters = l } ->
|
||||||
not (A.compare_and_set self.st old_st (Waiting { waiters = f :: l }))
|
not (A.compare_and_set self.st old_st (Waiting { waiters = waiter :: l }))
|
||||||
do
|
do
|
||||||
()
|
()
|
||||||
done
|
done
|
||||||
|
|
||||||
|
let[@inline] on_cancel self f = on_cancel1 self f ()
|
||||||
let dummy = { st = A.make Cancelled }
|
let dummy = { st = A.make Cancelled }
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
(** Cancellation tokens *)
|
||||||
|
|
||||||
type t
|
type t
|
||||||
(** A handle to cancel atomic actions (waiting on something), or
|
(** A handle to cancel atomic actions (waiting on something), or
|
||||||
stopping a subscription to some event. *)
|
stopping a subscription to some event. *)
|
||||||
|
|
@ -5,6 +7,7 @@ type t
|
||||||
val create : unit -> t
|
val create : unit -> t
|
||||||
val create_with : (unit -> unit) -> t
|
val create_with : (unit -> unit) -> t
|
||||||
val on_cancel : t -> (unit -> unit) -> unit
|
val on_cancel : t -> (unit -> unit) -> unit
|
||||||
|
val on_cancel1 : t -> ('a -> unit) -> 'a -> unit
|
||||||
|
|
||||||
val cancel : t -> unit
|
val cancel : t -> unit
|
||||||
(** Perform the cancellation. This should be idempotent. *)
|
(** Perform the cancellation. This should be idempotent. *)
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,16 @@
|
||||||
|
|
||||||
(library
|
(library
|
||||||
(name moonpool_unix)
|
(name moonpool_unix)
|
||||||
(public_name moonpool.unix)
|
(public_name moonpool.unix)
|
||||||
(optional)
|
(optional) ; iostream
|
||||||
(synopsis "Simple Unix-based event loop for moonpool")
|
(synopsis "Simple Unix-based event loop for moonpool")
|
||||||
(private_modules common_ heap_)
|
(private_modules common_ heap_)
|
||||||
(libraries moonpool moonpool.fib unix
|
(libraries
|
||||||
(select time.ml from
|
moonpool
|
||||||
(mtime mtime.os.clock -> time.mtime.ml)
|
moonpool.fib
|
||||||
(-> time.unix.ml))
|
unix
|
||||||
))
|
iostream
|
||||||
|
(select
|
||||||
|
time.ml
|
||||||
|
from
|
||||||
|
(mtime mtime.os.clock -> time.mtime.ml)
|
||||||
|
(-> time.unix.ml))))
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,7 @@
|
||||||
|
|
||||||
|
|
||||||
let time_ns : unit -> int64 = Mtime_clock.now_ns
|
let time_ns : unit -> int64 = Mtime_clock.now_ns
|
||||||
|
|
||||||
(** Monotonic time in seconds *)
|
(** Monotonic time in seconds *)
|
||||||
let time_s () : float =
|
let[@inline] time_s () : float =
|
||||||
let ns = time_ns () in
|
let ns = time_ns () in
|
||||||
let s = Int64.(div ns 1_000_000_000L) in
|
let s = Int64.(div ns 1_000_000_000L) in
|
||||||
let ns' = Int64.(rem ns 1_000_000_000L) in
|
let ns' = Int64.(rem ns 1_000_000_000L) in
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
|
(** Simple timer *)
|
||||||
|
|
||||||
type t
|
type t
|
||||||
|
(** A timer, running tasks after a duration or repeatedly with some frequency *)
|
||||||
|
|
||||||
val create : unit -> t
|
val create : unit -> t
|
||||||
(** A new timer. *)
|
(** A new timer. *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue