wip: echo server using lwt

This commit is contained in:
Simon Cruanes 2025-07-09 17:43:06 -04:00
parent 295f22e770
commit 543135a0b0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -3,6 +3,7 @@ module M_lwt = Moonpool_lwt
module Trace = Trace_core module Trace = Trace_core
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let await_lwt = M_lwt.await_lwt
let spf = Printf.sprintf let spf = Printf.sprintf
let str_of_sockaddr = function let str_of_sockaddr = function
@ -10,31 +11,34 @@ let str_of_sockaddr = function
| Unix.ADDR_INET (addr, port) -> | Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~runner () : unit Lwt.t = let main ~port ~runner:_ () : unit Lwt.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in let lwt_fut, _lwt_prom = Lwt.wait () in
(* TODO: handle exit?? *) (* TODO: handle exit?? ctrl-c? *)
Printf.printf "listening on port %d\n%!" port; Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc = let handle_client client_addr (ic, oc) : _ Lwt.t =
let@ () = M_lwt.spawn_lwt in
let _sp = let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in in
Printf.printf "got new client\n%!";
let buf = Bytes.create 32 in let buf = Bytes.create 32 in
let continue = ref true in let continue = ref true in
while !continue do while !continue do
Trace.message "read"; Trace.message "read";
let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in let n = Lwt_io.read_into ic buf 0 (Bytes.length buf) |> await_lwt in
if n = 0 then if n = 0 then
continue := false continue := false
else ( else (
Trace.messagef (fun k -> k "got %dB" n); Trace.messagef (fun k -> k "got %dB" n);
M_lwt.IO_out.output oc buf 0 n; Lwt_io.write_from_exactly oc buf 0 n |> await_lwt;
M_lwt.IO_out.flush oc; Lwt_io.flush oc |> await_lwt;
Trace.message "write" Trace.message "write"
) )
done; done;
@ -43,7 +47,9 @@ let main ~port ~runner () : unit Lwt.t =
in in
let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let _server = M_lwt.TCP_server.establish ~runner addr handle_client in let _server =
Lwt_io.establish_server_with_client_address addr handle_client
in
lwt_fut lwt_fut