diff --git a/src/unix/async_io.ml b/src/unix/async_io.ml index 0d8c944e..302803c1 100644 --- a/src/unix/async_io.ml +++ b/src/unix/async_io.ml @@ -1,5 +1,6 @@ open Common_ module Slice = Iostream_types.Slice +module Fut = Moonpool.Fut let rec read (fd : Fd.t) buf i len : int = if len = 0 || Fd.closed fd then @@ -145,9 +146,13 @@ module TCP_server = struct method running : unit -> bool method run : unit -> unit method stop : unit -> unit + method await : unit -> unit end - let run (self : #t) = self#run () + let[@inline] run (self : #t) = self#run () + let[@inline] stop (self : #t) = self#stop () + let[@inline] endpoint (self : #t) = self#endpoint () + let[@inline] await (self : #t) = self#await () type state = | Created @@ -166,12 +171,19 @@ module TCP_server = struct t = let n_active_ = A.make 0 in let st = A.make Created in + let fut, promise = Fut.make () in object method endpoint () = addr method active_connections () = A.get n_active_ method running () = A.get st = Running - method stop () = if A.exchange st Stopped = Running then ( (* TODO *) ) + + method stop () = + match A.exchange st Stopped with + | Stopped -> () + | Created | Running -> Fut.fulfill_idempotent promise @@ Ok () + + method await () = Fut.await fut method run () = (* set to Running *) @@ -196,8 +208,10 @@ module TCP_server = struct Unix.listen sock listen; sock with e -> + let bt = Printexc.get_raw_backtrace () in A.set st Stopped; - raise e + Fut.fulfill_idempotent promise @@ Error (Exn_bt.make e bt); + Printexc.raise_with_backtrace e bt in while A.get st = Running do let client_sock, client_addr = accept_ sock in @@ -233,6 +247,15 @@ module TCP_server = struct in after_init self; self + + let with_server ?listen ?buf_pool ?buf_size ~runner ~handle addr (f : _ -> 'a) + : 'a = + let server = + new base_server ?listen ?buf_pool ?buf_size ~runner ~handle addr + in + run server; + let@ () = Fun.protect ~finally:(fun () -> stop server) in + f server end module TCP_client = struct diff --git a/src/unix/async_io.mli b/src/unix/async_io.mli index f044ee3f..af164ef9 100644 --- a/src/unix/async_io.mli +++ b/src/unix/async_io.mli @@ -84,8 +84,16 @@ module TCP_server : sig method stop : unit -> unit (** Ask the server to stop. This might not take effect immediately, and is idempotent. After this [server.running()] must return [false]. *) + + method await : unit -> unit + (** Wait for the server to stop running *) end + val stop : #t -> unit + val run : #t -> unit + val endpoint : #t -> Sockaddr.t + val await : #t -> unit + class base_server : ?listen:int -> ?buf_pool:Buf_pool.t -> @@ -105,5 +113,13 @@ module TCP_server : sig Sockaddr.t -> t - val run : #t -> unit + val with_server : + ?listen:int -> + ?buf_pool:Buf_pool.t -> + ?buf_size:int -> + runner:Moonpool.Runner.t -> + handle:conn_handler -> + Sockaddr.t -> + (t -> 'a) -> + 'a end diff --git a/src/unix/ev_loop.ml b/src/unix/ev_loop.ml index 66623a40..8c265d9d 100644 --- a/src/unix/ev_loop.ml +++ b/src/unix/ev_loop.ml @@ -222,50 +222,34 @@ end let current_ : Ev_loop.t option A.t = A.make None -let rec set_as_current_ (ev : Ev_loop.t) : bool = +let rec get_or_set_as_current_ (ev : Ev_loop.t) : Ev_loop.t * bool = match A.get current_ with - | Some _ -> false + | Some ev -> ev, false | None -> if A.compare_and_set current_ None (Some ev) then - true + ev, true else - set_as_current_ ev + get_or_set_as_current_ ev -let with_loop ~runner f = - let@ _sp = Tracing_.with_span "Moonpool_unix.main" in - - let ev_loop = Ev_loop.create () in - if not (set_as_current_ ev_loop) then - failwith "Moonpool_unix: the event loop is already active"; - - (* schedule [f] on the pool *) - let fib = Fiber.spawn_top ~on:runner f in - - while not (Fiber.is_done fib) do +let bg_loop_ (ev_loop : Ev_loop.t) = + let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in + while true do Ev_loop.run_step_ ev_loop - done; - A.set current_ None; + done - (* return result of [fib] *) - Moonpool.Fut.get_or_fail_exn @@ Fiber.res fib - -let start_background_loop () = - let run () = - let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in - let ev_loop = Ev_loop.create () in - if set_as_current_ ev_loop then - while true do - Ev_loop.run_step_ ev_loop - done - in - ignore (Thread.create run () : Thread.t) +let[@inline never] start_background_loop () : Ev_loop.t = + let ev_loop = Ev_loop.create () in + let ev_loop, we_are_it = get_or_set_as_current_ ev_loop in + (* start the background thread *) + if we_are_it then ignore (Thread.create bg_loop_ ev_loop : Thread.t); + ev_loop (* ### external inputs *) let[@inline] get_current_ () = match A.get current_ with - | None -> failwith "Moonpool_unix: event loop is not active" | Some ev -> ev + | None -> start_background_loop () let interrupt_if_in_blocking_section_ (self : Ev_loop.t) = if A.get self.in_blocking_section then ( diff --git a/src/unix/ev_loop.mli b/src/unix/ev_loop.mli index 48bab911..4f0e1ebe 100644 --- a/src/unix/ev_loop.mli +++ b/src/unix/ev_loop.mli @@ -8,11 +8,3 @@ val wait_writable : val run_after_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit val run_every_s : float -> Cancel_handle.t -> (Cancel_handle.t -> unit) -> unit - -val with_loop : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a -(** Run with the event loop processed in the current thread. There can - only be one such loop running at a time. - @raise Failure if another call to {!with_loop} is already in effect. *) - -val start_background_loop : unit -> unit -(** Start the event loop in a new background thread. Idempotent. *) diff --git a/src/unix/moonpool_unix.ml b/src/unix/moonpool_unix.ml index ca87a3f3..cde342f2 100644 --- a/src/unix/moonpool_unix.ml +++ b/src/unix/moonpool_unix.ml @@ -12,8 +12,9 @@ module Fut = Moonpool.Fut module Fd = Fd module Cancel_handle = Cancel_handle +module Sockaddr = Sockaddr include Async_io let run_after_s = Ev_loop.run_after_s let run_every_s = Ev_loop.run_every_s -let main = Ev_loop.with_loop +let main = Moonpool_fib.main diff --git a/src/unix/sockaddr.ml b/src/unix/sockaddr.ml index 6089cefd..3f7958ac 100644 --- a/src/unix/sockaddr.ml +++ b/src/unix/sockaddr.ml @@ -5,6 +5,10 @@ let show = function | Unix.ADDR_INET (addr, port) -> Printf.sprintf "%s:%d" (Unix.string_of_inet_addr addr) port +let unix str : t = Unix.ADDR_UNIX str +let inet addr port : t = Unix.ADDR_INET (addr, port) +let localhost port : t = inet Unix.inet_addr_loopback port +let any port : t = inet Unix.inet_addr_any port let pp out (self : t) = Format.pp_print_string out (show self) let domain = function diff --git a/test/unix/dune b/test/unix/dune index 9bd54ad2..8098a9e9 100644 --- a/test/unix/dune +++ b/test/unix/dune @@ -1,3 +1,4 @@ (executables (names t_hash echo_server) - (libraries moonpool moonpool.unix trace.core trace-tef)) + ;(package moonpool-unix) + (libraries moonpool moonpool-unix trace.core trace-tef)) diff --git a/test/unix/echo_server.ml b/test/unix/echo_server.ml index e9e0426e..12683bb0 100644 --- a/test/unix/echo_server.ml +++ b/test/unix/echo_server.ml @@ -3,44 +3,49 @@ module MU = Moonpool_unix module Trace = Trace_core let ( let@ ) = ( @@ ) -let spf = Printf.sprintf - -let str_of_sockaddr = function - | Unix.ADDR_UNIX s -> s - | Unix.ADDR_INET (addr, port) -> - spf "%s:%d" (Unix.string_of_inet_addr addr) port let main ~port ~j () : unit = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:j () in - let@ () = MU.main ~runner in + let@ _main_runner = MU.main in Trace.set_thread_name "main"; Printf.printf "IN MAIN\n%!"; + Trace.message "foo1"; - MU.TCP_server.with_server ~port ~runner () - ~after_init:(fun self -> - Printf.printf "listening on port %d\n%!" (MU.TCP_server.port self)) - ~handle_client:(fun _server addr ic oc -> - let@ _sp = - Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> - [ "addr", `String (str_of_sockaddr addr) ]) - in + let@ server = + MU.TCP_server.with_server ~runner (MU.Sockaddr.any port) + ~handle:(fun ~client_addr:addr ic oc -> + Trace.message "got new client"; + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> + [ "addr", `String (MU.Sockaddr.show addr) ]) + in - let buf = Bytes.create 32 in - let continue = ref true in - while !continue do - Trace.message "read"; - let n = MU.IO_in.input ic buf 0 (Bytes.length buf) in - if n = 0 then continue := false; - Trace.messagef (fun k -> k "got %dB" n); - MU.IO_out.output oc buf 0 n; - MU.IO_out.flush oc; - Trace.message "write" - (* MU.sleep_s 0.02 *) - done) + let buf = Bytes.create 32 in + let continue = ref true in + while !continue do + Trace.message "read"; + let n = Iostream.In_buf.input ic buf 0 (Bytes.length buf) in + if n = 0 then continue := false; + Trace.messagef (fun k -> k "got %dB" n); + Iostream.Out_buf.output oc buf 0 n; + Iostream.Out_buf.flush oc; + Trace.message "write" + (* MU.sleep_s 0.02 *) + done) + in + + Trace.messagef (fun k -> + k "server established on %s" + (MU.Sockaddr.show @@ MU.TCP_server.endpoint server)); + Printf.printf "listening on %s\n%!" + (MU.Sockaddr.show @@ MU.TCP_server.endpoint server); + MU.TCP_server.await server; + () let () = + Sys.catch_break true; let@ () = Trace_tef.with_setup () in Trace.set_thread_name "entry"; let port = ref 0 in