From f6d67028cf28ba80588db1af23ffb2ee58e2868d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Feb 2024 23:05:12 -0500 Subject: [PATCH] feat lwt: non-blocking TCP_client.with_connect --- src/lwt/moonpool_lwt.ml | 39 +++++++++++++++++++++++++++++++++++++++ src/lwt/moonpool_lwt.mli | 5 +++++ 2 files changed, 44 insertions(+) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 95166d12..c3133d2e 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -29,3 +29,42 @@ module TCP_server = struct let shutdown self = await_lwt @@ Lwt_io.shutdown_server self end + +module TCP_client = struct + let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.TCP_NODELAY true; + + (* connect asynchronously *) + while + try + Unix.connect sock addr; + false + with + | Unix.Unix_error + ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) + -> + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( sock, + fun ev -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event ev )); + }; + true + do + () + done; + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc +end diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index af5b495e..509a9d72 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -69,6 +69,11 @@ module TCP_server : sig val shutdown : t -> unit end +module TCP_client : sig + val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a + (** Open a connection. *) +end + (** {2 Helpers on the lwt side} *) val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t