diff --git a/test/lwt/echo_server.ml b/test/lwt/echo_server.ml index 722bcf69..e58a6a01 100644 --- a/test/lwt/echo_server.ml +++ b/test/lwt/echo_server.ml @@ -3,6 +3,7 @@ module M_lwt = Moonpool_lwt module Trace = Trace_core let ( let@ ) = ( @@ ) +let await_lwt = M_lwt.await_lwt let spf = Printf.sprintf let str_of_sockaddr = function @@ -10,31 +11,34 @@ let str_of_sockaddr = function | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let main ~port ~runner () : unit Lwt.t = +let main ~port ~runner:_ () : unit Lwt.t = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let lwt_fut, _lwt_prom = Lwt.wait () in - (* TODO: handle exit?? *) + (* TODO: handle exit?? ctrl-c? *) Printf.printf "listening on port %d\n%!" port; - let handle_client client_addr ic oc = + let handle_client client_addr (ic, oc) : _ Lwt.t = + let@ () = M_lwt.spawn_lwt in let _sp = - Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" + Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) in + Printf.printf "got new client\n%!"; + let buf = Bytes.create 32 in let continue = ref true in while !continue do Trace.message "read"; - let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in + let n = Lwt_io.read_into ic buf 0 (Bytes.length buf) |> await_lwt in if n = 0 then continue := false else ( Trace.messagef (fun k -> k "got %dB" n); - M_lwt.IO_out.output oc buf 0 n; - M_lwt.IO_out.flush oc; + Lwt_io.write_from_exactly oc buf 0 n |> await_lwt; + Lwt_io.flush oc |> await_lwt; Trace.message "write" ) done; @@ -43,7 +47,9 @@ let main ~port ~runner () : unit Lwt.t = in let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in - let _server = M_lwt.TCP_server.establish ~runner addr handle_client in + let _server = + Lwt_io.establish_server_with_client_address addr handle_client + in lwt_fut