wip: TCP server

This commit is contained in:
Simon Cruanes 2024-06-24 11:24:22 -04:00
parent dc5af2e7e6
commit f6852172e6
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
8 changed files with 98 additions and 72 deletions

View file

@ -1,5 +1,6 @@
open Common_
module Slice = Iostream_types.Slice
module Fut = Moonpool.Fut
let rec read (fd : Fd.t) buf i len : int =
if len = 0 || Fd.closed fd then
@ -145,9 +146,13 @@ module TCP_server = struct
method running : unit -> bool
method run : unit -> unit
method stop : unit -> unit
method await : unit -> unit
end
let run (self : #t) = self#run ()
let[@inline] run (self : #t) = self#run ()
let[@inline] stop (self : #t) = self#stop ()
let[@inline] endpoint (self : #t) = self#endpoint ()
let[@inline] await (self : #t) = self#await ()
type state =
| Created
@ -166,12 +171,19 @@ module TCP_server = struct
t =
let n_active_ = A.make 0 in
let st = A.make Created in
let fut, promise = Fut.make () in
object
method endpoint () = addr
method active_connections () = A.get n_active_
method running () = A.get st = Running
method stop () = if A.exchange st Stopped = Running then ( (* TODO *) )
method stop () =
match A.exchange st Stopped with
| Stopped -> ()
| Created | Running -> Fut.fulfill_idempotent promise @@ Ok ()
method await () = Fut.await fut
method run () =
(* set to Running *)
@ -196,8 +208,10 @@ module TCP_server = struct
Unix.listen sock listen;
sock
with e ->
let bt = Printexc.get_raw_backtrace () in
A.set st Stopped;
raise e
Fut.fulfill_idempotent promise @@ Error (Exn_bt.make e bt);
Printexc.raise_with_backtrace e bt
in
while A.get st = Running do
let client_sock, client_addr = accept_ sock in
@ -233,6 +247,15 @@ module TCP_server = struct
in
after_init self;
self
let with_server ?listen ?buf_pool ?buf_size ~runner ~handle addr (f : _ -> 'a)
: 'a =
let server =
new base_server ?listen ?buf_pool ?buf_size ~runner ~handle addr
in
run server;
let@ () = Fun.protect ~finally:(fun () -> stop server) in
f server
end
module TCP_client = struct

View file

@ -84,8 +84,16 @@ module TCP_server : sig
method stop : unit -> unit
(** Ask the server to stop. This might not take effect immediately,
and is idempotent. After this [server.running()] must return [false]. *)
method await : unit -> unit
(** Wait for the server to stop running *)
end
val stop : #t -> unit
val run : #t -> unit
val endpoint : #t -> Sockaddr.t
val await : #t -> unit
class base_server :
?listen:int ->
?buf_pool:Buf_pool.t ->
@ -105,5 +113,13 @@ module TCP_server : sig
Sockaddr.t ->
t
val run : #t -> unit
val with_server :
?listen:int ->
?buf_pool:Buf_pool.t ->
?buf_size:int ->
runner:Moonpool.Runner.t ->
handle:conn_handler ->
Sockaddr.t ->
(t -> 'a) ->
'a
end

View file

@ -222,50 +222,34 @@ end
let current_ : Ev_loop.t option A.t = A.make None
let rec set_as_current_ (ev : Ev_loop.t) : bool =
let rec get_or_set_as_current_ (ev : Ev_loop.t) : Ev_loop.t * bool =
match A.get current_ with
| Some _ -> false
| Some ev -> ev, false
| None ->
if A.compare_and_set current_ None (Some ev) then
true
ev, true
else
set_as_current_ ev
get_or_set_as_current_ ev
let with_loop ~runner f =
let@ _sp = Tracing_.with_span "Moonpool_unix.main" in
let ev_loop = Ev_loop.create () in
if not (set_as_current_ ev_loop) then
failwith "Moonpool_unix: the event loop is already active";
(* schedule [f] on the pool *)
let fib = Fiber.spawn_top ~on:runner f in
while not (Fiber.is_done fib) do
Ev_loop.run_step_ ev_loop
done;
A.set current_ None;
(* return result of [fib] *)
Moonpool.Fut.get_or_fail_exn @@ Fiber.res fib
let start_background_loop () =
let run () =
let bg_loop_ (ev_loop : Ev_loop.t) =
let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in
let ev_loop = Ev_loop.create () in
if set_as_current_ ev_loop then
while true do
Ev_loop.run_step_ ev_loop
done
in
ignore (Thread.create run () : Thread.t)
let[@inline never] start_background_loop () : Ev_loop.t =
let ev_loop = Ev_loop.create () in
let ev_loop, we_are_it = get_or_set_as_current_ ev_loop in
(* start the background thread *)
if we_are_it then ignore (Thread.create bg_loop_ ev_loop : Thread.t);
ev_loop
(* ### external inputs *)
let[@inline] get_current_ () =
match A.get current_ with
| None -> failwith "Moonpool_unix: event loop is not active"
| Some ev -> ev
| None -> start_background_loop ()
let interrupt_if_in_blocking_section_ (self : Ev_loop.t) =
if A.get self.in_blocking_section then (

View file

@ -8,11 +8,3 @@ val wait_writable :
val run_after_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val run_every_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit
val with_loop : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
(** Run with the event loop processed in the current thread. There can
only be one such loop running at a time.
@raise Failure if another call to {!with_loop} is already in effect. *)
val start_background_loop : unit -> unit
(** Start the event loop in a new background thread. Idempotent. *)

View file

@ -12,8 +12,9 @@ module Fut = Moonpool.Fut
module Fd = Fd
module Cancel_handle = Cancel_handle
module Sockaddr = Sockaddr
include Async_io
let run_after_s = Ev_loop.run_after_s
let run_every_s = Ev_loop.run_every_s
let main = Ev_loop.with_loop
let main = Moonpool_fib.main

View file

@ -5,6 +5,10 @@ let show = function
| Unix.ADDR_INET (addr, port) ->
Printf.sprintf "%s:%d" (Unix.string_of_inet_addr addr) port
let unix str : t = Unix.ADDR_UNIX str
let inet addr port : t = Unix.ADDR_INET (addr, port)
let localhost port : t = inet Unix.inet_addr_loopback port
let any port : t = inet Unix.inet_addr_any port
let pp out (self : t) = Format.pp_print_string out (show self)
let domain = function

View file

@ -1,3 +1,4 @@
(executables
(names t_hash echo_server)
(libraries moonpool moonpool.unix trace.core trace-tef))
;(package moonpool-unix)
(libraries moonpool moonpool-unix trace.core trace-tef))

View file

@ -3,44 +3,49 @@ module MU = Moonpool_unix
module Trace = Trace_core
let ( let@ ) = ( @@ )
let spf = Printf.sprintf
let str_of_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~j () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:j () in
let@ () = MU.main ~runner in
let@ _main_runner = MU.main in
Trace.set_thread_name "main";
Printf.printf "IN MAIN\n%!";
Trace.message "foo1";
MU.TCP_server.with_server ~port ~runner ()
~after_init:(fun self ->
Printf.printf "listening on port %d\n%!" (MU.TCP_server.port self))
~handle_client:(fun _server addr ic oc ->
let@ server =
MU.TCP_server.with_server ~runner (MU.Sockaddr.any port)
~handle:(fun ~client_addr:addr ic oc ->
Trace.message "got new client";
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () ->
[ "addr", `String (str_of_sockaddr addr) ])
[ "addr", `String (MU.Sockaddr.show addr) ])
in
let buf = Bytes.create 32 in
let continue = ref true in
while !continue do
Trace.message "read";
let n = MU.IO_in.input ic buf 0 (Bytes.length buf) in
let n = Iostream.In_buf.input ic buf 0 (Bytes.length buf) in
if n = 0 then continue := false;
Trace.messagef (fun k -> k "got %dB" n);
MU.IO_out.output oc buf 0 n;
MU.IO_out.flush oc;
Iostream.Out_buf.output oc buf 0 n;
Iostream.Out_buf.flush oc;
Trace.message "write"
(* MU.sleep_s 0.02 *)
done)
in
Trace.messagef (fun k ->
k "server established on %s"
(MU.Sockaddr.show @@ MU.TCP_server.endpoint server));
Printf.printf "listening on %s\n%!"
(MU.Sockaddr.show @@ MU.TCP_server.endpoint server);
MU.TCP_server.await server;
()
let () =
Sys.catch_break true;
let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "entry";
let port = ref 0 in