diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 95166d12..c3133d2e 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -29,3 +29,42 @@ module TCP_server = struct let shutdown self = await_lwt @@ Lwt_io.shutdown_server self end + +module TCP_client = struct + let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.TCP_NODELAY true; + + (* connect asynchronously *) + while + try + Unix.connect sock addr; + false + with + | Unix.Unix_error + ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) + -> + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( sock, + fun ev -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event ev )); + }; + true + do + () + done; + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc +end diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index af5b495e..509a9d72 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -69,6 +69,11 @@ module TCP_server : sig val shutdown : t -> unit end +module TCP_client : sig + val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a + (** Open a connection. *) +end + (** {2 Helpers on the lwt side} *) val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t