mirror of
https://github.com/c-cube/nanoev.git
synced 2025-12-06 03:05:32 -05:00
feat(nanoev.picos): dep on iostream, add IO channels+Net
This commit is contained in:
parent
90311ad4fa
commit
653fddb850
17 changed files with 605 additions and 207 deletions
18
dune-project
18
dune-project
|
|
@ -20,9 +20,14 @@
|
||||||
(synopsis "Tiny event loop abstraction")
|
(synopsis "Tiny event loop abstraction")
|
||||||
(depends ocaml dune base-unix)
|
(depends ocaml dune base-unix)
|
||||||
(depopts
|
(depopts
|
||||||
(trace (>= 0.7))
|
(trace
|
||||||
|
(>= 0.7))
|
||||||
|
(iostream
|
||||||
|
(>= 0.3))
|
||||||
(picos
|
(picos
|
||||||
(and (>= 0.5) (< 0.7))))
|
(and
|
||||||
|
(>= 0.5)
|
||||||
|
(< 0.7))))
|
||||||
(tags
|
(tags
|
||||||
(unix select async)))
|
(unix select async)))
|
||||||
|
|
||||||
|
|
@ -40,9 +45,12 @@
|
||||||
ocaml
|
ocaml
|
||||||
dune
|
dune
|
||||||
nanoev
|
nanoev
|
||||||
(picos (>= 0.6))
|
(picos
|
||||||
|
(>= 0.6))
|
||||||
picos_std
|
picos_std
|
||||||
(tiny_httpd (>= 0.17)))
|
(tiny_httpd
|
||||||
(tags (nanoev http)))
|
(>= 0.17)))
|
||||||
|
(tags
|
||||||
|
(nanoev http)))
|
||||||
|
|
||||||
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html
|
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ depends: [
|
||||||
]
|
]
|
||||||
depopts: [
|
depopts: [
|
||||||
"trace" {>= "0.7"}
|
"trace" {>= "0.7"}
|
||||||
|
"iostream" {>= "0.3"}
|
||||||
"picos" {>= "0.5" & < "0.7"}
|
"picos" {>= "0.5" & < "0.7"}
|
||||||
]
|
]
|
||||||
build: [
|
build: [
|
||||||
|
|
|
||||||
152
src/picos/IO_in.ml
Normal file
152
src/picos/IO_in.ml
Normal file
|
|
@ -0,0 +1,152 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
class type t = object
|
||||||
|
method input : bytes -> int -> int -> int
|
||||||
|
(** Read into the slice. Returns [0] only if the stream is closed. *)
|
||||||
|
|
||||||
|
method close : unit -> unit
|
||||||
|
(** Close the input. Must be idempotent. *)
|
||||||
|
end
|
||||||
|
|
||||||
|
let create ?(close = ignore) ~input () : t =
|
||||||
|
object
|
||||||
|
method close = close
|
||||||
|
method input = input
|
||||||
|
end
|
||||||
|
|
||||||
|
let empty : t =
|
||||||
|
object
|
||||||
|
method close () = ()
|
||||||
|
method input _ _ _ = 0
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_bytes ?(off = 0) ?len (b : bytes) : t =
|
||||||
|
(* i: current position in [b] *)
|
||||||
|
let i = ref off in
|
||||||
|
|
||||||
|
let len =
|
||||||
|
match len with
|
||||||
|
| Some n ->
|
||||||
|
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
|
||||||
|
n
|
||||||
|
| None -> Bytes.length b - off
|
||||||
|
in
|
||||||
|
let end_ = off + len in
|
||||||
|
|
||||||
|
object
|
||||||
|
method input b_out i_out len_out =
|
||||||
|
let n = min (end_ - !i) len_out in
|
||||||
|
Bytes.blit b !i b_out i_out n;
|
||||||
|
i := !i + n;
|
||||||
|
n
|
||||||
|
|
||||||
|
method close () = i := end_
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
|
||||||
|
|
||||||
|
(** Read into the given slice.
|
||||||
|
@return the number of bytes read, [0] means end of input. *)
|
||||||
|
let[@inline] input (self : #t) buf i len = self#input buf i len
|
||||||
|
|
||||||
|
(** Close the channel. *)
|
||||||
|
let[@inline] close self : unit = self#close ()
|
||||||
|
|
||||||
|
let rec really_input (self : #t) buf i len =
|
||||||
|
if len > 0 then (
|
||||||
|
let n = input self buf i len in
|
||||||
|
if n = 0 then raise End_of_file;
|
||||||
|
(really_input [@tailrec]) self buf (i + n) (len - n)
|
||||||
|
)
|
||||||
|
|
||||||
|
let really_input_string self n : string =
|
||||||
|
let buf = Bytes.create n in
|
||||||
|
really_input self buf 0 n;
|
||||||
|
Bytes.unsafe_to_string buf
|
||||||
|
|
||||||
|
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
|
||||||
|
: unit =
|
||||||
|
let continue = ref true in
|
||||||
|
while !continue do
|
||||||
|
let len = input ic buf 0 (Bytes.length buf) in
|
||||||
|
if len = 0 then
|
||||||
|
continue := false
|
||||||
|
else
|
||||||
|
IO_out.output oc buf 0 len
|
||||||
|
done
|
||||||
|
|
||||||
|
let concat (l0 : t list) : t =
|
||||||
|
let l = ref l0 in
|
||||||
|
let rec input b i len : int =
|
||||||
|
match !l with
|
||||||
|
| [] -> 0
|
||||||
|
| ic :: tl ->
|
||||||
|
let n = ic#input b i len in
|
||||||
|
if n > 0 then
|
||||||
|
n
|
||||||
|
else (
|
||||||
|
l := tl;
|
||||||
|
input b i len
|
||||||
|
)
|
||||||
|
in
|
||||||
|
let close () = List.iter close l0 in
|
||||||
|
create ~close ~input ()
|
||||||
|
|
||||||
|
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
|
||||||
|
let buf = ref buf in
|
||||||
|
let i = ref 0 in
|
||||||
|
|
||||||
|
let[@inline] full_ () = !i = Bytes.length !buf in
|
||||||
|
|
||||||
|
let grow_ () =
|
||||||
|
let old_size = Bytes.length !buf in
|
||||||
|
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
|
||||||
|
if old_size = new_size then
|
||||||
|
failwith "input_all: maximum input size exceeded";
|
||||||
|
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
|
||||||
|
buf := new_buf
|
||||||
|
in
|
||||||
|
|
||||||
|
let rec loop () =
|
||||||
|
if full_ () then grow_ ();
|
||||||
|
let available = Bytes.length !buf - !i in
|
||||||
|
let n = input self !buf !i available in
|
||||||
|
if n > 0 then (
|
||||||
|
i := !i + n;
|
||||||
|
(loop [@tailrec]) ()
|
||||||
|
)
|
||||||
|
in
|
||||||
|
loop ();
|
||||||
|
|
||||||
|
if full_ () then
|
||||||
|
Bytes.unsafe_to_string !buf
|
||||||
|
else
|
||||||
|
Bytes.sub_string !buf 0 !i
|
||||||
|
|
||||||
|
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
|
||||||
|
(fd : Unix.file_descr) : t =
|
||||||
|
let buf_len = ref 0 in
|
||||||
|
let buf_off = ref 0 in
|
||||||
|
|
||||||
|
let refill () =
|
||||||
|
buf_off := 0;
|
||||||
|
buf_len := Base.read fd buf 0 (Bytes.length buf)
|
||||||
|
in
|
||||||
|
|
||||||
|
object
|
||||||
|
method input b i len : int =
|
||||||
|
if !buf_len = 0 then refill ();
|
||||||
|
let n = min len !buf_len in
|
||||||
|
if n > 0 then (
|
||||||
|
Bytes.blit buf !buf_off b i n;
|
||||||
|
buf_off := !buf_off + n;
|
||||||
|
buf_len := !buf_len - n
|
||||||
|
);
|
||||||
|
n
|
||||||
|
|
||||||
|
method close () =
|
||||||
|
if close_noerr then (
|
||||||
|
try Unix.close fd with _ -> ()
|
||||||
|
) else
|
||||||
|
Unix.close fd
|
||||||
|
end
|
||||||
118
src/picos/IO_out.ml
Normal file
118
src/picos/IO_out.ml
Normal file
|
|
@ -0,0 +1,118 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
class type t = object
|
||||||
|
method output_char : char -> unit
|
||||||
|
method output : bytes -> int -> int -> unit
|
||||||
|
method flush : unit -> unit
|
||||||
|
method close : unit -> unit
|
||||||
|
end
|
||||||
|
|
||||||
|
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
|
||||||
|
object
|
||||||
|
method flush () = flush ()
|
||||||
|
method close () = close ()
|
||||||
|
method output_char c = output_char c
|
||||||
|
method output bs i len = output bs i len
|
||||||
|
end
|
||||||
|
|
||||||
|
let dummy : t =
|
||||||
|
object
|
||||||
|
method flush () = ()
|
||||||
|
method close () = ()
|
||||||
|
method output_char _ = ()
|
||||||
|
method output _ _ _ = ()
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
|
||||||
|
: t =
|
||||||
|
let buf_off = ref 0 in
|
||||||
|
|
||||||
|
let[@inline] is_full () = !buf_off = Bytes.length buf in
|
||||||
|
|
||||||
|
let flush () =
|
||||||
|
if !buf_off > 0 then (
|
||||||
|
Base.write fd buf 0 !buf_off;
|
||||||
|
buf_off := 0
|
||||||
|
)
|
||||||
|
in
|
||||||
|
|
||||||
|
object
|
||||||
|
method output_char c =
|
||||||
|
if is_full () then flush ();
|
||||||
|
Bytes.set buf !buf_off c;
|
||||||
|
incr buf_off
|
||||||
|
|
||||||
|
method output bs i len : unit =
|
||||||
|
let i = ref i in
|
||||||
|
let len = ref len in
|
||||||
|
|
||||||
|
while !len > 0 do
|
||||||
|
(* make space *)
|
||||||
|
if is_full () then flush ();
|
||||||
|
|
||||||
|
let n = min !len (Bytes.length buf - !buf_off) in
|
||||||
|
Bytes.blit bs !i buf !buf_off n;
|
||||||
|
buf_off := !buf_off + n;
|
||||||
|
i := !i + n;
|
||||||
|
len := !len - n
|
||||||
|
done;
|
||||||
|
(* if full, write eagerly *)
|
||||||
|
if is_full () then flush ()
|
||||||
|
|
||||||
|
method close () =
|
||||||
|
if close_noerr then (
|
||||||
|
try
|
||||||
|
flush ();
|
||||||
|
Unix.close fd
|
||||||
|
with _ -> ()
|
||||||
|
) else (
|
||||||
|
flush ();
|
||||||
|
Unix.close fd
|
||||||
|
)
|
||||||
|
|
||||||
|
method flush = flush
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_buffer (buf : Buffer.t) : t =
|
||||||
|
object
|
||||||
|
method close () = ()
|
||||||
|
method flush () = ()
|
||||||
|
method output_char c = Buffer.add_char buf c
|
||||||
|
method output bs i len = Buffer.add_subbytes buf bs i len
|
||||||
|
end
|
||||||
|
|
||||||
|
(** Output the buffer slice into this channel *)
|
||||||
|
let[@inline] output_char (self : #t) c : unit = self#output_char c
|
||||||
|
|
||||||
|
(** Output the buffer slice into this channel *)
|
||||||
|
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
|
||||||
|
|
||||||
|
let[@inline] output_string (self : #t) (str : string) : unit =
|
||||||
|
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
|
||||||
|
|
||||||
|
let output_line (self : #t) (str : string) : unit =
|
||||||
|
output_string self str;
|
||||||
|
output_char self '\n'
|
||||||
|
|
||||||
|
(** Close the channel. *)
|
||||||
|
let[@inline] close self : unit = self#close ()
|
||||||
|
|
||||||
|
(** Flush (ie. force write) any buffered bytes. *)
|
||||||
|
let[@inline] flush self : unit = self#flush ()
|
||||||
|
|
||||||
|
let output_int self i =
|
||||||
|
let s = string_of_int i in
|
||||||
|
output_string self s
|
||||||
|
|
||||||
|
let output_lines self seq = Seq.iter (output_line self) seq
|
||||||
|
|
||||||
|
let tee (l : t list) : t =
|
||||||
|
match l with
|
||||||
|
| [] -> dummy
|
||||||
|
| [ oc ] -> oc
|
||||||
|
| _ ->
|
||||||
|
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
|
||||||
|
let output_char c = List.iter (fun oc -> output_char oc c) l in
|
||||||
|
let close () = List.iter close l in
|
||||||
|
let flush () = List.iter flush l in
|
||||||
|
create ~flush ~close ~output ~output_char ()
|
||||||
7
src/picos/background_thread.ml
Normal file
7
src/picos/background_thread.ml
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
let is_setup = Global_.has_bg_thread
|
||||||
|
let setup = Global_.setup_bg_thread
|
||||||
|
let shutdown = Global_.shutdown_bg_thread
|
||||||
|
|
||||||
|
let with_setup ev f =
|
||||||
|
setup ev;
|
||||||
|
Fun.protect ~finally:shutdown f
|
||||||
10
src/picos/background_thread.mli
Normal file
10
src/picos/background_thread.mli
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
val setup : Nanoev.t -> unit
|
||||||
|
(** Install this event loop in a background thread *)
|
||||||
|
|
||||||
|
val shutdown : unit -> unit
|
||||||
|
(** Shutdown background thread, assuming {! is_setup} returns [true] *)
|
||||||
|
|
||||||
|
val with_setup : Nanoev.t -> (unit -> 'a) -> 'a
|
||||||
|
|
||||||
|
val is_setup : unit -> bool
|
||||||
|
(** [is_setup()] is [true] iff a background thread is running a nanoev loop *)
|
||||||
96
src/picos/base.ml
Normal file
96
src/picos/base.ml
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
let[@inline] get_loop_exn_ () : Nanoev.t =
|
||||||
|
match Atomic.get Global_.st with
|
||||||
|
| None -> failwith "No nanoev loop installed."
|
||||||
|
| Some st -> st.nanoev
|
||||||
|
|
||||||
|
let[@inline] unwrap_ = function
|
||||||
|
| None -> ()
|
||||||
|
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
|
||||||
|
|
||||||
|
let[@unroll 1] rec retry_read_ fd f =
|
||||||
|
match f () with
|
||||||
|
| res -> res
|
||||||
|
| exception
|
||||||
|
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
|
||||||
|
(* Trace_.message "read must wait"; *)
|
||||||
|
let trigger = Picos.Trigger.create () in
|
||||||
|
let closed_r = ref false in
|
||||||
|
let ev = get_loop_exn_ () in
|
||||||
|
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
|
||||||
|
closed_r := closed;
|
||||||
|
Picos.Trigger.signal trigger);
|
||||||
|
Picos.Trigger.await trigger |> unwrap_;
|
||||||
|
if !closed_r then raise Closed;
|
||||||
|
retry_read_ fd f
|
||||||
|
|
||||||
|
let[@unroll 1] rec retry_write_ fd f =
|
||||||
|
match f () with
|
||||||
|
| res -> res
|
||||||
|
| exception
|
||||||
|
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
|
||||||
|
(* Trace_.message "write must wait"; *)
|
||||||
|
let ev = get_loop_exn_ () in
|
||||||
|
let trigger = Picos.Trigger.create () in
|
||||||
|
let closed_r = ref false in
|
||||||
|
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
|
||||||
|
closed_r := closed;
|
||||||
|
Picos.Trigger.signal trigger);
|
||||||
|
Picos.Trigger.await trigger |> unwrap_;
|
||||||
|
if !closed_r then raise Closed;
|
||||||
|
retry_write_ fd f
|
||||||
|
|
||||||
|
let read fd buf i len : int =
|
||||||
|
try
|
||||||
|
retry_read_ fd (fun () ->
|
||||||
|
(* Trace_.message "read"; *)
|
||||||
|
Unix.read fd buf i len)
|
||||||
|
with Closed -> 0
|
||||||
|
|
||||||
|
let close fd =
|
||||||
|
Unix.close fd;
|
||||||
|
let ev = get_loop_exn_ () in
|
||||||
|
Nanoev.close ev fd
|
||||||
|
|
||||||
|
let accept fd =
|
||||||
|
try
|
||||||
|
retry_read_ fd (fun () ->
|
||||||
|
(* Trace_.message "accept"; *)
|
||||||
|
Unix.accept fd)
|
||||||
|
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
|
||||||
|
raise Closed
|
||||||
|
|
||||||
|
let write_once fd buf i len : int =
|
||||||
|
try
|
||||||
|
retry_write_ fd (fun () ->
|
||||||
|
(* Trace_.message "write"; *)
|
||||||
|
Unix.write fd buf i len)
|
||||||
|
with Closed -> 0
|
||||||
|
|
||||||
|
let rec write fd buf i len =
|
||||||
|
if len > 0 then (
|
||||||
|
let n = write_once fd buf i len in
|
||||||
|
if n < len then write fd buf (i + n) (len - n)
|
||||||
|
)
|
||||||
|
|
||||||
|
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
|
||||||
|
|
||||||
|
let[@inline] max_fds () =
|
||||||
|
match Atomic.get Global_.st with
|
||||||
|
| None -> 1024
|
||||||
|
| Some st -> Nanoev.max_fds st.nanoev
|
||||||
|
|
||||||
|
let sleep t =
|
||||||
|
if t > 0. then (
|
||||||
|
let ev = get_loop_exn_ () in
|
||||||
|
let trigger = Picos.Trigger.create () in
|
||||||
|
Nanoev.run_after_s ev t trigger () (fun trigger () ->
|
||||||
|
Picos.Trigger.signal trigger);
|
||||||
|
Picos.Trigger.await trigger |> unwrap_
|
||||||
|
)
|
||||||
|
|
||||||
|
module Raw = struct
|
||||||
|
let retry_read = retry_read_
|
||||||
|
let retry_write = retry_write_
|
||||||
|
end
|
||||||
37
src/picos/base.mli
Normal file
37
src/picos/base.mli
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
val read : Unix.file_descr -> bytes -> int -> int -> int
|
||||||
|
(** Read from the non blocking FD.
|
||||||
|
@raise Nanoev.Closed if the FD is closed
|
||||||
|
@raise Unix.Unix_error for other errors *)
|
||||||
|
|
||||||
|
val write_once : Unix.file_descr -> bytes -> int -> int -> int
|
||||||
|
(** Write into the non blocking FD.
|
||||||
|
@raise Nanoev.Closed if the FD is closed
|
||||||
|
@raise Unix.Unix_error for other errors *)
|
||||||
|
|
||||||
|
val write : Unix.file_descr -> bytes -> int -> int -> unit
|
||||||
|
|
||||||
|
val close : Unix.file_descr -> unit
|
||||||
|
(** Close the file descriptor
|
||||||
|
@raise Unix.Unix_error when it fails *)
|
||||||
|
|
||||||
|
val connect : Unix.file_descr -> Unix.sockaddr -> unit
|
||||||
|
(** Connect this FD to the remote address.
|
||||||
|
@raise Nanoev.Closed if the FD is closed.
|
||||||
|
@raise Unix.Unix_error for other errors *)
|
||||||
|
|
||||||
|
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
|
||||||
|
(** Accept a connection on this fd.
|
||||||
|
@raise Nanoev.Closed if the FD is closed.
|
||||||
|
@raise Unix.Unix_error for other errors *)
|
||||||
|
|
||||||
|
val max_fds : unit -> int
|
||||||
|
(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds}
|
||||||
|
*)
|
||||||
|
|
||||||
|
val sleep : float -> unit
|
||||||
|
(** Suspend current fiber for [n] seconds *)
|
||||||
|
|
||||||
|
module Raw : sig
|
||||||
|
val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a
|
||||||
|
val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a
|
||||||
|
end
|
||||||
6
src/picos/common_.ml
Normal file
6
src/picos/common_.ml
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
module Trace_ = Nanoev.Trace_
|
||||||
|
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
|
let _default_buf_size = 4 * 1024
|
||||||
|
|
||||||
|
exception Closed = Nanoev.Closed
|
||||||
|
|
@ -2,4 +2,4 @@
|
||||||
(name nanoev_picos)
|
(name nanoev_picos)
|
||||||
(public_name nanoev.picos)
|
(public_name nanoev.picos)
|
||||||
(optional) ; picos
|
(optional) ; picos
|
||||||
(libraries threads picos nanoev))
|
(libraries threads picos iostream nanoev))
|
||||||
|
|
|
||||||
59
src/picos/global_.ml
Normal file
59
src/picos/global_.ml
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
type st =
|
||||||
|
| None
|
||||||
|
| Some of {
|
||||||
|
active: bool Atomic.t;
|
||||||
|
nanoev: Nanoev.t;
|
||||||
|
th: Thread.t;
|
||||||
|
}
|
||||||
|
|
||||||
|
let st : st Atomic.t = Atomic.make None
|
||||||
|
let lock = Mutex.create ()
|
||||||
|
|
||||||
|
let with_lock lock f =
|
||||||
|
Mutex.lock lock;
|
||||||
|
match f () with
|
||||||
|
| exception e ->
|
||||||
|
Mutex.unlock lock;
|
||||||
|
raise e
|
||||||
|
| x ->
|
||||||
|
Mutex.unlock lock;
|
||||||
|
x
|
||||||
|
|
||||||
|
let bg_thread_ ~active ~evloop () : unit =
|
||||||
|
Trace_.set_thread_name "nanoev.picos.bg-thread";
|
||||||
|
while Atomic.get active do
|
||||||
|
Nanoev.step evloop
|
||||||
|
done
|
||||||
|
|
||||||
|
let[@inline] has_bg_thread () = Atomic.get st <> None
|
||||||
|
|
||||||
|
let setup_bg_thread (ev : Nanoev.t) : unit =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
(* shutdown existing thread, if any *)
|
||||||
|
(match Atomic.get st with
|
||||||
|
| Some st ->
|
||||||
|
Atomic.set st.active false;
|
||||||
|
Nanoev.wakeup_from_outside st.nanoev;
|
||||||
|
Thread.join st.th
|
||||||
|
| None -> ());
|
||||||
|
|
||||||
|
(* start new bg thread *)
|
||||||
|
let active = Atomic.make true in
|
||||||
|
Atomic.set st
|
||||||
|
@@ Some
|
||||||
|
{
|
||||||
|
active;
|
||||||
|
nanoev = ev;
|
||||||
|
th = Thread.create (bg_thread_ ~active ~evloop:ev) ();
|
||||||
|
}
|
||||||
|
|
||||||
|
let shutdown_bg_thread () =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
match Atomic.exchange st None with
|
||||||
|
| None -> ()
|
||||||
|
| Some st ->
|
||||||
|
Atomic.set st.active false;
|
||||||
|
Nanoev.wakeup_from_outside st.nanoev;
|
||||||
|
Thread.join st.th
|
||||||
|
|
@ -1,161 +1,7 @@
|
||||||
open struct
|
module Background_thread = Background_thread
|
||||||
module Trace_ = Nanoev.Trace_
|
module Base = Base
|
||||||
|
include Base
|
||||||
let ( let@ ) = ( @@ )
|
module IO_in = IO_in
|
||||||
end
|
module IO_out = IO_out
|
||||||
|
module Net_client = Net_client
|
||||||
exception Closed = Nanoev.Closed
|
module Net_server = Net_server
|
||||||
|
|
||||||
module Global_ = struct
|
|
||||||
type st =
|
|
||||||
| None
|
|
||||||
| Some of {
|
|
||||||
active: bool Atomic.t;
|
|
||||||
nanoev: Nanoev.t;
|
|
||||||
th: Thread.t;
|
|
||||||
}
|
|
||||||
|
|
||||||
let st : st Atomic.t = Atomic.make None
|
|
||||||
let lock = Mutex.create ()
|
|
||||||
|
|
||||||
let with_lock lock f =
|
|
||||||
Mutex.lock lock;
|
|
||||||
match f () with
|
|
||||||
| exception e ->
|
|
||||||
Mutex.unlock lock;
|
|
||||||
raise e
|
|
||||||
| x ->
|
|
||||||
Mutex.unlock lock;
|
|
||||||
x
|
|
||||||
|
|
||||||
let bg_thread_ ~active ~evloop () : unit =
|
|
||||||
Trace_.set_thread_name "nanoev.picos.bg-thread";
|
|
||||||
while Atomic.get active do
|
|
||||||
Nanoev.step evloop
|
|
||||||
done
|
|
||||||
|
|
||||||
let[@inline] has_bg_thread () = Atomic.get st <> None
|
|
||||||
|
|
||||||
let setup_bg_thread (ev : Nanoev.t) : unit =
|
|
||||||
let@ () = with_lock lock in
|
|
||||||
(* shutdown existing thread, if any *)
|
|
||||||
(match Atomic.get st with
|
|
||||||
| Some st ->
|
|
||||||
Atomic.set st.active false;
|
|
||||||
Nanoev.wakeup_from_outside st.nanoev;
|
|
||||||
Thread.join st.th
|
|
||||||
| None -> ());
|
|
||||||
|
|
||||||
(* start new bg thread *)
|
|
||||||
let active = Atomic.make true in
|
|
||||||
Atomic.set st
|
|
||||||
@@ Some
|
|
||||||
{
|
|
||||||
active;
|
|
||||||
nanoev = ev;
|
|
||||||
th = Thread.create (bg_thread_ ~active ~evloop:ev) ();
|
|
||||||
}
|
|
||||||
|
|
||||||
let shutdown_bg_thread () =
|
|
||||||
let@ () = with_lock lock in
|
|
||||||
match Atomic.exchange st None with
|
|
||||||
| None -> ()
|
|
||||||
| Some st ->
|
|
||||||
Atomic.set st.active false;
|
|
||||||
Nanoev.wakeup_from_outside st.nanoev;
|
|
||||||
Thread.join st.th
|
|
||||||
end
|
|
||||||
|
|
||||||
module Background_thread = struct
|
|
||||||
let is_setup = Global_.has_bg_thread
|
|
||||||
let setup = Global_.setup_bg_thread
|
|
||||||
let shutdown = Global_.shutdown_bg_thread
|
|
||||||
|
|
||||||
let with_setup ev f =
|
|
||||||
setup ev;
|
|
||||||
Fun.protect ~finally:shutdown f
|
|
||||||
end
|
|
||||||
|
|
||||||
let[@inline] get_loop_exn_ () : Nanoev.t =
|
|
||||||
match Atomic.get Global_.st with
|
|
||||||
| None -> failwith "No nanoev loop installed."
|
|
||||||
| Some st -> st.nanoev
|
|
||||||
|
|
||||||
let[@inline] unwrap_ = function
|
|
||||||
| None -> ()
|
|
||||||
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
|
|
||||||
|
|
||||||
let[@unroll 1] rec retry_read_ fd f =
|
|
||||||
match f () with
|
|
||||||
| res -> res
|
|
||||||
| exception
|
|
||||||
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
|
|
||||||
(* Trace_.message "read must wait"; *)
|
|
||||||
let trigger = Picos.Trigger.create () in
|
|
||||||
let closed_r = ref false in
|
|
||||||
let ev = get_loop_exn_ () in
|
|
||||||
Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r ->
|
|
||||||
closed_r := closed;
|
|
||||||
Picos.Trigger.signal trigger);
|
|
||||||
Picos.Trigger.await trigger |> unwrap_;
|
|
||||||
if !closed_r then raise Closed;
|
|
||||||
retry_read_ fd f
|
|
||||||
|
|
||||||
let[@unroll 1] rec retry_write_ fd f =
|
|
||||||
match f () with
|
|
||||||
| res -> res
|
|
||||||
| exception
|
|
||||||
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
|
|
||||||
(* Trace_.message "write must wait"; *)
|
|
||||||
let ev = get_loop_exn_ () in
|
|
||||||
let trigger = Picos.Trigger.create () in
|
|
||||||
let closed_r = ref false in
|
|
||||||
Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r ->
|
|
||||||
closed_r := closed;
|
|
||||||
Picos.Trigger.signal trigger);
|
|
||||||
Picos.Trigger.await trigger |> unwrap_;
|
|
||||||
if !closed_r then raise Closed;
|
|
||||||
retry_write_ fd f
|
|
||||||
|
|
||||||
let read fd buf i len : int =
|
|
||||||
try
|
|
||||||
retry_read_ fd (fun () ->
|
|
||||||
(* Trace_.message "read"; *)
|
|
||||||
Unix.read fd buf i len)
|
|
||||||
with Closed -> 0
|
|
||||||
|
|
||||||
let close fd =
|
|
||||||
Unix.close fd;
|
|
||||||
let ev = get_loop_exn_ () in
|
|
||||||
Nanoev.close ev fd
|
|
||||||
|
|
||||||
let accept fd =
|
|
||||||
try
|
|
||||||
retry_read_ fd (fun () ->
|
|
||||||
(* Trace_.message "accept"; *)
|
|
||||||
Unix.accept fd)
|
|
||||||
with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) ->
|
|
||||||
raise Closed
|
|
||||||
|
|
||||||
let write fd buf i len : int =
|
|
||||||
try
|
|
||||||
retry_write_ fd (fun () ->
|
|
||||||
(* Trace_.message "write"; *)
|
|
||||||
Unix.write fd buf i len)
|
|
||||||
with Closed -> 0
|
|
||||||
|
|
||||||
let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr)
|
|
||||||
|
|
||||||
let[@inline] max_fds () =
|
|
||||||
match Atomic.get Global_.st with
|
|
||||||
| None -> 1024
|
|
||||||
| Some st -> Nanoev.max_fds st.nanoev
|
|
||||||
|
|
||||||
let sleep t =
|
|
||||||
if t > 0. then (
|
|
||||||
let ev = get_loop_exn_ () in
|
|
||||||
let trigger = Picos.Trigger.create () in
|
|
||||||
Nanoev.run_after_s ev t trigger () (fun trigger () ->
|
|
||||||
Picos.Trigger.signal trigger);
|
|
||||||
Picos.Trigger.await trigger |> unwrap_
|
|
||||||
)
|
|
||||||
|
|
|
||||||
|
|
@ -1,47 +1,18 @@
|
||||||
(** Basic interface with picos *)
|
(** Basic interface with picos *)
|
||||||
|
|
||||||
module Background_thread : sig
|
module Background_thread = Background_thread
|
||||||
val setup : Nanoev.t -> unit
|
|
||||||
(** Install this event loop in a background thread *)
|
|
||||||
|
|
||||||
val shutdown : unit -> unit
|
|
||||||
(** Shutdown background thread, assuming {! is_setup} returns [true] *)
|
|
||||||
|
|
||||||
val with_setup : Nanoev.t -> (unit -> 'a) -> 'a
|
|
||||||
|
|
||||||
val is_setup : unit -> bool
|
|
||||||
(** [is_setup()] is [true] iff a background thread is running a nanoev loop *)
|
|
||||||
end
|
|
||||||
|
|
||||||
(** {2 Non blocking IO primitives} *)
|
(** {2 Non blocking IO primitives} *)
|
||||||
|
|
||||||
val read : Unix.file_descr -> bytes -> int -> int -> int
|
module Base = Base
|
||||||
(** Read from the non blocking FD.
|
|
||||||
@raise Nanoev.Closed if the FD is closed
|
|
||||||
@raise Unix.Unix_error for other errors *)
|
|
||||||
|
|
||||||
val write : Unix.file_descr -> bytes -> int -> int -> int
|
include module type of struct
|
||||||
(** Write into the non blocking FD.
|
include Base
|
||||||
@raise Nanoev.Closed if the FD is closed
|
end
|
||||||
@raise Unix.Unix_error for other errors *)
|
|
||||||
|
|
||||||
val close : Unix.file_descr -> unit
|
(** {2 Building blocks on top of {!Base}} *)
|
||||||
(** Close the file descriptor
|
|
||||||
@raise Unix.Unix_error when it fails *)
|
|
||||||
|
|
||||||
val connect : Unix.file_descr -> Unix.sockaddr -> unit
|
module IO_in = IO_in
|
||||||
(** Connect this FD to the remote address.
|
module IO_out = IO_out
|
||||||
@raise Nanoev.Closed if the FD is closed.
|
module Net_client = Net_client
|
||||||
@raise Unix.Unix_error for other errors *)
|
module Net_server = Net_server
|
||||||
|
|
||||||
val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr
|
|
||||||
(** Accept a connection on this fd.
|
|
||||||
@raise Nanoev.Closed if the FD is closed.
|
|
||||||
@raise Unix.Unix_error for other errors *)
|
|
||||||
|
|
||||||
val max_fds : unit -> int
|
|
||||||
(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds}
|
|
||||||
*)
|
|
||||||
|
|
||||||
val sleep : float -> unit
|
|
||||||
(** Suspend current fiber for [n] seconds *)
|
|
||||||
|
|
|
||||||
20
src/picos/net_client.ml
Normal file
20
src/picos/net_client.ml
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
let connect addr : Unix.file_descr =
|
||||||
|
let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
|
||||||
|
Unix.set_nonblock sock;
|
||||||
|
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
|
||||||
|
|
||||||
|
(* connect asynchronously *)
|
||||||
|
Base.Raw.retry_write sock (fun () -> Unix.connect sock addr);
|
||||||
|
sock
|
||||||
|
|
||||||
|
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
|
||||||
|
let sock = connect addr in
|
||||||
|
|
||||||
|
let ic = IO_in.of_unix_fd sock in
|
||||||
|
let oc = IO_out.of_unix_fd sock in
|
||||||
|
|
||||||
|
let finally () = try Unix.close sock with _ -> () in
|
||||||
|
let@ () = Fun.protect ~finally in
|
||||||
|
f ic oc
|
||||||
48
src/picos/net_server.ml
Normal file
48
src/picos/net_server.ml
Normal file
|
|
@ -0,0 +1,48 @@
|
||||||
|
type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
active: bool Atomic.t;
|
||||||
|
sock: Unix.file_descr;
|
||||||
|
client_handler: client_handler;
|
||||||
|
spawn: (unit -> unit) -> unit Picos.Computation.t;
|
||||||
|
mutable running: unit Picos.Computation.t option;
|
||||||
|
}
|
||||||
|
|
||||||
|
let shutdown (self : t) =
|
||||||
|
if Atomic.exchange self.active false then
|
||||||
|
Option.iter Picos.Computation.await self.running
|
||||||
|
|
||||||
|
open struct
|
||||||
|
let run (self : t) () : unit =
|
||||||
|
while Atomic.get self.active do
|
||||||
|
let client_sock, client_addr = Base.accept self.sock in
|
||||||
|
let comp =
|
||||||
|
self.spawn (fun () ->
|
||||||
|
let ic = IO_in.of_unix_fd client_sock in
|
||||||
|
let oc = IO_out.of_unix_fd client_sock in
|
||||||
|
self.client_handler client_addr ic oc)
|
||||||
|
in
|
||||||
|
ignore (comp : _ Picos.Computation.t)
|
||||||
|
done
|
||||||
|
end
|
||||||
|
|
||||||
|
let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t
|
||||||
|
=
|
||||||
|
let domain = Unix.domain_of_sockaddr addr in
|
||||||
|
let sock = Unix.socket domain Unix.SOCK_STREAM 0 in
|
||||||
|
Unix.bind sock addr;
|
||||||
|
Unix.listen sock backlog;
|
||||||
|
Unix.set_nonblock sock;
|
||||||
|
Unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||||
|
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
|
||||||
|
|
||||||
|
let server =
|
||||||
|
{ active = Atomic.make true; spawn; sock; client_handler; running = None }
|
||||||
|
in
|
||||||
|
|
||||||
|
server.running <- Some (spawn (run server));
|
||||||
|
server
|
||||||
|
|
||||||
|
let with_ ?backlog ~spawn ~client_handler addr f =
|
||||||
|
let server = establish ?backlog ~spawn ~client_handler addr in
|
||||||
|
Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server)
|
||||||
19
src/picos/net_server.mli
Normal file
19
src/picos/net_server.mli
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
|
||||||
|
type t
|
||||||
|
|
||||||
|
val shutdown : t -> unit
|
||||||
|
|
||||||
|
val establish :
|
||||||
|
?backlog:int ->
|
||||||
|
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
|
||||||
|
client_handler:client_handler ->
|
||||||
|
Unix.sockaddr ->
|
||||||
|
t
|
||||||
|
|
||||||
|
val with_ :
|
||||||
|
?backlog:int ->
|
||||||
|
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
|
||||||
|
client_handler:client_handler ->
|
||||||
|
Unix.sockaddr ->
|
||||||
|
(t -> 'a) ->
|
||||||
|
'a
|
||||||
|
|
@ -23,7 +23,7 @@ module Out = struct
|
||||||
let i = ref i in
|
let i = ref i in
|
||||||
let len = ref len0 in
|
let len = ref len0 in
|
||||||
while !len > 0 do
|
while !len > 0 do
|
||||||
match EV.write fd bs !i !len with
|
match EV.write_once fd bs !i !len with
|
||||||
| 0 -> failwith "write failed"
|
| 0 -> failwith "write failed"
|
||||||
| n ->
|
| n ->
|
||||||
i := !i + n;
|
i := !i + n;
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue