lwt: add TCP client/server functions that use lwt channels

This commit is contained in:
Simon Cruanes 2024-02-14 22:10:13 -05:00
parent e789cbe4f7
commit 0df0642dd1
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 112 additions and 63 deletions

View file

@ -1,4 +1,3 @@
open Common_
open Base
let rec read fd buf i len : int =

View file

@ -1,66 +1,6 @@
open Common_
include Base
module IO = IO
module IO_out = IO_out
module IO_in = IO_in
module TCP_server = struct
type t = Lwt_io.server
let establish ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self
end
module TCP_client = struct
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error
((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( sock,
fun ev ->
resume sus @@ Ok ();
Lwt_engine.stop_event ev ));
};
true
do
()
done;
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
end
module TCP_server = Tcp_server
module TCP_client = Tcp_client

View file

@ -66,12 +66,27 @@ module TCP_server : sig
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t
val establish' :
?backlog:(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t
val shutdown : t -> unit
end
module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection. *)
val with_connect' :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
(** Open a connection. *)
end
(** {2 Helpers on the lwt side} *)

57
src/lwt/tcp_client.ml Normal file
View file

@ -0,0 +1,57 @@
open Common_
open Base
let connect addr : Unix.file_descr =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
Moonpool.Private.Suspend_.suspend
{
handle =
(fun ~run:_ ~resume sus ->
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( sock,
fun ev ->
resume sus @@ Ok ();
Lwt_engine.stop_event ev ));
};
true
do
()
done;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = connect addr in
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
let with_connect' addr (f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a)
: 'a =
let sock = connect addr in
let ic = Lwt_io.of_unix_fd ~mode:Lwt_io.input sock in
let oc = Lwt_io.of_unix_fd ~mode:Lwt_io.output sock in
let finally () =
(try Lwt_io.close ic |> await_lwt with _ -> ());
(try Lwt_io.close oc |> await_lwt with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

38
src/lwt/tcp_server.ml Normal file
View file

@ -0,0 +1,38 @@
open Common_
open Base
type t = Lwt_io.server
let establish' ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let establish ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self