feat lwt: non-blocking TCP_client.with_connect

This commit is contained in:
Simon Cruanes 2024-02-08 23:05:12 -05:00
parent 38b84e0c03
commit f6d67028cf
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 44 additions and 0 deletions

View file

@ -29,3 +29,42 @@ module TCP_server = struct
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self let shutdown self = await_lwt @@ Lwt_io.shutdown_server self
end end
module TCP_client = struct
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~ls ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( sock,
fun ev ->
resume ~ls sus @@ Ok ();
Lwt_engine.stop_event ev ));
};
true
do
()
done;
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
end

View file

@ -69,6 +69,11 @@ module TCP_server : sig
val shutdown : t -> unit val shutdown : t -> unit
end end
module TCP_client : sig
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection. *)
end
(** {2 Helpers on the lwt side} *) (** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t