From 03676f2e3de5e7f7b0ef34e430b015fbcdcb38ba Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Feb 2024 20:36:10 -0500 Subject: [PATCH] wip: moonpool_unix --- src/unix/IO_unix.ml | 12 ++++++++ src/unix/TCP_client.ml | 0 src/unix/TCP_server.ml | 61 +++++++++++++++++++++++++++++++++++++++ src/unix/TCP_server.mli | 12 ++++++++ src/unix/ev_loop.ml | 1 + src/unix/moonpool_unix.ml | 2 ++ 6 files changed, 88 insertions(+) create mode 100644 src/unix/TCP_client.ml create mode 100644 src/unix/TCP_server.ml create mode 100644 src/unix/TCP_server.mli diff --git a/src/unix/IO_unix.ml b/src/unix/IO_unix.ml index b63292ea..1427c0da 100644 --- a/src/unix/IO_unix.ml +++ b/src/unix/IO_unix.ml @@ -48,3 +48,15 @@ let write fd buf i len : unit = i := !i + n; len := !len - n done + +(** Sleep for the given amount of seconds *) +let sleep_s (f : float) : unit = + if f > 0. then + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + let cancel = Cancel_handle.dummy in + Ev_loop.run_after_s f cancel (fun _cancel -> + resume ~ls sus @@ Ok ())); + } diff --git a/src/unix/TCP_client.ml b/src/unix/TCP_client.ml new file mode 100644 index 00000000..e69de29b diff --git a/src/unix/TCP_server.ml b/src/unix/TCP_server.ml new file mode 100644 index 00000000..9f05c003 --- /dev/null +++ b/src/unix/TCP_server.ml @@ -0,0 +1,61 @@ +open Common_ +module A = M.Atomic + +type t = { + active: bool A.t; + cancel: Cancel_handle.t; + port: int; + fd: Unix.file_descr; + runner: M.Runner.t; +} + +let port self = self.port + +let with_server ?(addr = Unix.inet_addr_any) ?(port = 0) ?(after_init = ignore) + ~runner ~handle_client () : unit = + let active = A.make true in + let cancel = Cancel_handle.create_with (fun () -> A.set active false) in + let server_sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.bind server_sock (Unix.ADDR_INET (addr, port)); + Unix.set_nonblock server_sock; + Unix.listen server_sock 16; + + (* now get the real port *) + let port = + match Unix.getsockname server_sock with + | Unix.ADDR_INET (_, p) -> p + | _ -> assert false + in + + (* Unix.setsockopt sock *) + let finally () = Unix.close server_sock in + let@ () = Fun.protect ~finally in + + let self = { active; cancel; port; fd = server_sock; runner } in + after_init self; + + while A.get self.active do + (* accept new client *) + match Unix.accept server_sock with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for [sock] to be ready *) + let@ _sp = Tracing_.with_span "tcp-server.suspend" in + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Ev_loop.wait_readable server_sock cancel (fun _ -> + resume ~ls sus @@ Ok ())); + } + | client_sock, client_addr -> + (* handle client *) + Unix.setsockopt client_sock Unix.TCP_NODELAY true; + Unix.set_nonblock client_sock; + + let ic = IO_in.of_unix_fd client_sock in + let oc = IO_out.of_unix_fd client_sock in + + M.Runner.run_async runner ~name:"tcp.handle-client" (fun () -> + let@ () = Fun.protect ~finally:(fun () -> Unix.close client_sock) in + handle_client self client_addr ic oc) + done diff --git a/src/unix/TCP_server.mli b/src/unix/TCP_server.mli new file mode 100644 index 00000000..7637967b --- /dev/null +++ b/src/unix/TCP_server.mli @@ -0,0 +1,12 @@ +type t + +val port : t -> int + +val with_server : + ?addr:Unix.inet_addr -> + ?port:int -> + ?after_init:(t -> unit) -> + runner:Moonpool.Runner.t -> + handle_client:(t -> Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> + unit -> + unit diff --git a/src/unix/ev_loop.ml b/src/unix/ev_loop.ml index 4b4932f4..8c25fea5 100644 --- a/src/unix/ev_loop.ml +++ b/src/unix/ev_loop.ml @@ -207,6 +207,7 @@ module Ev_loop = struct let reads, writes = IO_tbl.prepare_select self.io_tbl in A.set self.in_blocking_section true; let reads, writes, _ = + let@ _sp = Tracing_.with_span "moonpool-unix.select" in Unix.select (self.pipe_read :: reads) writes [] delay in A.set self.in_blocking_section false; diff --git a/src/unix/moonpool_unix.ml b/src/unix/moonpool_unix.ml index 5262890c..8acae7a0 100644 --- a/src/unix/moonpool_unix.ml +++ b/src/unix/moonpool_unix.ml @@ -13,6 +13,8 @@ module Fut = Moonpool.Fut module Cancel_handle = Cancel_handle module IO_in = IO_in module IO_out = IO_out +module TCP_client = TCP_client +module TCP_server = TCP_server include IO_unix let run_after_s = Ev_loop.run_after_s