From 0df0642dd186f3e814ad72bd08fbf8745051759b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 14 Feb 2024 22:10:13 -0500 Subject: [PATCH] lwt: add TCP client/server functions that use lwt channels --- src/lwt/IO.ml | 1 - src/lwt/moonpool_lwt.ml | 64 ++-------------------------------------- src/lwt/moonpool_lwt.mli | 15 ++++++++++ src/lwt/tcp_client.ml | 57 +++++++++++++++++++++++++++++++++++ src/lwt/tcp_server.ml | 38 ++++++++++++++++++++++++ 5 files changed, 112 insertions(+), 63 deletions(-) create mode 100644 src/lwt/tcp_client.ml create mode 100644 src/lwt/tcp_server.ml diff --git a/src/lwt/IO.ml b/src/lwt/IO.ml index 2775a400..09ae6d07 100644 --- a/src/lwt/IO.ml +++ b/src/lwt/IO.ml @@ -1,4 +1,3 @@ -open Common_ open Base let rec read fd buf i len : int = diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 8dbc0aee..1d92ddab 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,66 +1,6 @@ -open Common_ include Base module IO = IO module IO_out = IO_out module IO_in = IO_in - -module TCP_server = struct - type t = Lwt_io.server - - let establish ?backlog ?no_close ~runner addr handler : t = - let server = - Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr - (fun client_addr client_sock -> - let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - - let fut = - M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) - in - - let lwt_fut = lwt_of_fut fut in - lwt_fut) - in - await_lwt server - - let shutdown self = await_lwt @@ Lwt_io.shutdown_server self -end - -module TCP_client = struct - let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = - let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - Unix.set_nonblock sock; - Unix.setsockopt sock Unix.TCP_NODELAY true; - - (* connect asynchronously *) - while - try - Unix.connect sock addr; - false - with - | Unix.Unix_error - ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) - -> - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~run:_ ~resume sus -> - Perform_action_in_lwt.schedule - @@ Action.Wait_writable - ( sock, - fun ev -> - resume sus @@ Ok (); - Lwt_engine.stop_event ev )); - }; - true - do - () - done; - - let ic = IO_in.of_unix_fd sock in - let oc = IO_out.of_unix_fd sock in - - let finally () = try Unix.close sock with _ -> () in - let@ () = Fun.protect ~finally in - f ic oc -end +module TCP_server = Tcp_server +module TCP_client = Tcp_client diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index 509a9d72..cff4dbd1 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -66,12 +66,27 @@ module TCP_server : sig (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> t + val establish' : + ?backlog:(* ?server_fd:Unix.file_descr -> *) + int -> + ?no_close:bool -> + runner:Moonpool.Runner.t -> + Unix.sockaddr -> + (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> + t + val shutdown : t -> unit end module TCP_client : sig + val connect : Unix.sockaddr -> Unix.file_descr + val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a (** Open a connection. *) + + val with_connect' : + Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a + (** Open a connection. *) end (** {2 Helpers on the lwt side} *) diff --git a/src/lwt/tcp_client.ml b/src/lwt/tcp_client.ml new file mode 100644 index 00000000..c7db3880 --- /dev/null +++ b/src/lwt/tcp_client.ml @@ -0,0 +1,57 @@ +open Common_ +open Base + +let connect addr : Unix.file_descr = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.TCP_NODELAY true; + + (* connect asynchronously *) + while + try + Unix.connect sock addr; + false + with + | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) + -> + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( sock, + fun ev -> + resume sus @@ Ok (); + Lwt_engine.stop_event ev )); + }; + true + do + () + done; + sock + +let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = connect addr in + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc + +let with_connect' addr (f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) + : 'a = + let sock = connect addr in + + let ic = Lwt_io.of_unix_fd ~mode:Lwt_io.input sock in + let oc = Lwt_io.of_unix_fd ~mode:Lwt_io.output sock in + + let finally () = + (try Lwt_io.close ic |> await_lwt with _ -> ()); + (try Lwt_io.close oc |> await_lwt with _ -> ()); + try Unix.close sock with _ -> () + in + let@ () = Fun.protect ~finally in + f ic oc diff --git a/src/lwt/tcp_server.ml b/src/lwt/tcp_server.ml new file mode 100644 index 00000000..2b6605b0 --- /dev/null +++ b/src/lwt/tcp_server.ml @@ -0,0 +1,38 @@ +open Common_ +open Base + +type t = Lwt_io.server + +let establish' ?backlog ?no_close ~runner addr handler : t = + let server = + Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr + (fun client_addr client_sock -> + let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in + let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in + + let fut = + M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) + in + + let lwt_fut = lwt_of_fut fut in + lwt_fut) + in + await_lwt server + +let establish ?backlog ?no_close ~runner addr handler : t = + let server = + Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr + (fun client_addr client_sock -> + let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in + let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in + + let fut = + M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) + in + + let lwt_fut = lwt_of_fut fut in + lwt_fut) + in + await_lwt server + +let shutdown self = await_lwt @@ Lwt_io.shutdown_server self