diff --git a/src/lwt/IO.ml b/src/lwt/IO.ml new file mode 100644 index 00000000..cc962e67 --- /dev/null +++ b/src/lwt/IO.ml @@ -0,0 +1,71 @@ +open Common_ +open Base + +let rec read fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_readable + ( fd, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + }; + read fd buf i len + | n -> n + ) + +let rec write_once fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + (* wait for FD to be ready *) + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( fd, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + }; + write_once fd buf i len + | n -> n + ) + +let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done + +(** Sleep for the given amount of seconds *) +let sleep_s (f : float) : unit = + if f > 0. then + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Sleep + ( f, + false, + fun cancel -> + resume ~ls sus @@ Ok (); + Lwt_engine.stop_event cancel )); + } diff --git a/src/lwt/IO_in.ml b/src/lwt/IO_in.ml new file mode 100644 index 00000000..a0e2744a --- /dev/null +++ b/src/lwt/IO_in.ml @@ -0,0 +1,154 @@ +open Common_ + +class type t = + object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the + stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) + end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := IO.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/lwt/IO_out.ml b/src/lwt/IO_out.ml new file mode 100644 index 00000000..9c1207dc --- /dev/null +++ b/src/lwt/IO_out.ml @@ -0,0 +1,119 @@ +open Common_ + +class type t = + object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit + end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + IO.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/lwt/base.ml b/src/lwt/base.ml new file mode 100644 index 00000000..4ed374fa --- /dev/null +++ b/src/lwt/base.ml @@ -0,0 +1,158 @@ +open Common_ +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(** Action scheduled from outside the loop *) +module Action = struct + type event = Lwt_engine.event + type cb = event -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb + | Wait_writable of Unix.file_descr * cb + | Sleep of float * bool * cb + (* TODO: provide actions with cancellation, alongside a "select" operation *) + (* | Cancel of event *) + | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t + | Wakeup : 'a Lwt.u * 'a -> t + | Wakeup_exn : _ Lwt.u * exn -> t + | Other of (unit -> unit) + + (** Perform the action from within the Lwt thread *) + let perform (self : t) : unit = + match self with + | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) + | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) + | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) + (* | Cancel ev -> Lwt_engine.stop_event ev *) + | On_termination (fut, f) -> + Lwt.on_any fut + (fun x -> f @@ Ok x) + (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) + | Wakeup (prom, x) -> Lwt.wakeup prom x + | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e + | Other f -> f () +end + +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old = []; + false + ) else + true + do + () + done; + !is_first +end + +module Perform_action_in_lwt = struct + open struct + let actions_ : Action_queue.t = Action_queue.create () + + (** Gets the current set of notifications and perform them from inside the + Lwt thread *) + let perform_pending_actions () : unit = + let l = Action_queue.pop_all actions_ in + List.iter Action.perform l + + let notification : int = + Lwt_unix.make_notification ~once:false perform_pending_actions + end + + let schedule (a : Action.t) : unit = + let is_first = Action_queue.push actions_ a in + if is_first then Lwt_unix.send_notification notification +end + +let get_runner () : M.Runner.t = + match M.Runner.get_current_runner () with + | Some r -> r + | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" + +let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = + let lwt_fut, lwt_prom = Lwt.wait () in + M.Fut.on_result fut (function + | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) + | Error (exn, _) -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); + lwt_fut + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); + fut + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + M.Private.Suspend_.suspend + { + handle = + (fun ~ls ~run:_ ~resume sus -> + let on_lwt_done _ = resume ~ls sus @@ Ok () in + Perform_action_in_lwt.( + schedule Action.(On_termination (fut, on_lwt_done)))); + }; + + (match Lwt.poll fut with + | Some x -> x + | None -> assert false) + +let run_in_lwt f : _ M.Fut.t = + let fut, prom = M.Fut.make () in + Perform_action_in_lwt.schedule + (Action.Other + (fun () -> + let lwt_fut = f () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom @@ Ok x) + (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); + fut + +let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f + +let detach_in_runner ~runner f : _ Lwt.t = + let fut, promise = Lwt.wait () in + M.Runner.run_async runner (fun () -> + match f () with + | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) + | exception exn -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); + fut + +let main_with_runner ~runner (f : unit -> 'a) : 'a = + let lwt_fut, lwt_prom = Lwt.wait () in + + let _fiber = + Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) + in + + Lwt_main.run lwt_fut + +let main f = + let@ runner = M.Ws_pool.with_ () in + main_with_runner ~runner f diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml index 2d46076a..d1fdf6d2 100644 --- a/src/lwt/common_.ml +++ b/src/lwt/common_.ml @@ -2,3 +2,4 @@ module M = Moonpool module Exn_bt = M.Exn_bt let ( let@ ) = ( @@ ) +let _default_buf_size = 4 * 1024 diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 0637c738..95166d12 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,229 +1,31 @@ open Common_ -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls +include Base +module IO = IO +module IO_out = IO_out +module IO_in = IO_in -(** Action scheduled from outside the loop *) -module Action = struct - type event = Lwt_engine.event - type cb = event -> unit +module TCP_server = struct + type t = Lwt_io.server - (** Action that we ask the lwt loop to perform, from the outside *) - type t = - | Wait_readable of Unix.file_descr * cb - | Wait_writable of Unix.file_descr * cb - | Sleep of float * bool * cb - (* TODO: provide actions with cancellation, alongside a "select" operation *) - (* | Cancel of event *) - | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t - | Wakeup : 'a Lwt.u * 'a -> t - | Wakeup_exn : _ Lwt.u * exn -> t - | Other of (unit -> unit) + let establish ?backlog ?no_close ~runner addr handler : t = + let server = + Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr + (fun client_addr client_sock -> + let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in + let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - (** Perform the action from within the Lwt thread *) - let perform (self : t) : unit = - match self with - | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) - | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) - | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) - (* | Cancel ev -> Lwt_engine.stop_event ev *) - | On_termination (fut, f) -> - Lwt.on_any fut - (fun x -> f @@ Ok x) - (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) - | Wakeup (prom, x) -> Lwt.wakeup prom x - | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e - | Other f -> f () -end - -module Action_queue = struct - type t = { q: Action.t list Atomic.t } [@@unboxed] - - let create () : t = { q = Atomic.make [] } - let pop_all (self : t) : _ list = Atomic.exchange self.q [] - - (** Push the action and return whether the queue was previously empty *) - let push (self : t) (a : Action.t) : bool = - let is_first = ref true in - while - let old = Atomic.get self.q in - if Atomic.compare_and_set self.q old (a :: old) then ( - is_first := old = []; - false - ) else - true - do - () - done; - !is_first -end - -module Perform_action_in_lwt = struct - open struct - let actions_ : Action_queue.t = Action_queue.create () - - (** Gets the current set of notifications and perform them from inside the - Lwt thread *) - let perform_pending_actions () : unit = - let l = Action_queue.pop_all actions_ in - List.iter Action.perform l - - let notification : int = - Lwt_unix.make_notification ~once:false perform_pending_actions - end - - let schedule (a : Action.t) : unit = - let is_first = Action_queue.push actions_ a in - if is_first then Lwt_unix.send_notification notification -end - -let get_runner () : M.Runner.t = - match M.Runner.get_current_runner () with - | Some r -> r - | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" - -let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = - let lwt_fut, lwt_prom = Lwt.wait () in - M.Fut.on_result fut (function - | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) - | Error (exn, _) -> - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); - lwt_fut - -let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = - match Lwt.poll lwt_fut with - | Some x -> M.Fut.return x - | None -> - let fut, prom = M.Fut.make () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom (Ok x)) - (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); - fut - -let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> - (* suspend fiber, wake it up when [fut] resolves *) - M.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - let on_lwt_done _ = resume ~ls sus @@ Ok () in - Perform_action_in_lwt.( - schedule Action.(On_termination (fut, on_lwt_done)))); - }; - - (match Lwt.poll fut with - | Some x -> x - | None -> assert false) - -let run_in_lwt f : _ M.Fut.t = - let fut, prom = M.Fut.make () in - Perform_action_in_lwt.schedule - (Action.Other - (fun () -> - let lwt_fut = f () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom @@ Ok x) - (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); - fut - -let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f - -let detach_in_runner ~runner f : _ Lwt.t = - let fut, promise = Lwt.wait () in - M.Runner.run_async runner (fun () -> - match f () with - | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) - | exception exn -> - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); - fut - -let main_with_runner ~runner (f : unit -> 'a) : 'a = - let lwt_fut, lwt_prom = Lwt.wait () in - - let _fiber = - Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) - in - - Lwt_main.run lwt_fut - -let main f = - let@ runner = M.Ws_pool.with_ () in - main_with_runner ~runner f - -module IO = struct - let rec read fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.read fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - (* wait for FD to be ready *) - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - Perform_action_in_lwt.schedule - @@ Action.Wait_readable - ( fd, - fun cancel -> - resume ~ls sus @@ Ok (); - Lwt_engine.stop_event cancel )); - }; - read fd buf i len - | n -> n - ) - - let rec write_once fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.write fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - (* wait for FD to be ready *) - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - Perform_action_in_lwt.schedule - @@ Action.Wait_writable - ( fd, - fun cancel -> - resume ~ls sus @@ Ok (); - Lwt_engine.stop_event cancel )); - }; - write_once fd buf i len - | n -> n - ) - - let write fd buf i len : unit = - let i = ref i in - let len = ref len in - while !len > 0 do - let n = write_once fd buf !i !len in - i := !i + n; - len := !len - n - done - - (** Sleep for the given amount of seconds *) - let sleep_s (f : float) : unit = - if f > 0. then - Moonpool.Private.Suspend_.suspend - { - handle = - (fun ~ls ~run:_ ~resume sus -> - Perform_action_in_lwt.schedule - @@ Action.Sleep - ( f, - false, - fun cancel -> - resume ~ls sus @@ Ok (); - Lwt_engine.stop_event cancel )); - } + let fut = + M.Fut.spawn ~name:"tcp.server.handler" ~on:runner (fun () -> + handler client_addr ic oc) + in + + let lwt_fut = lwt_of_fut fut in + lwt_fut) + in + Printf.printf "awaiting server\n%!"; + let s = await_lwt server in + Printf.printf "got server\n%!"; + s + + let shutdown self = await_lwt @@ Lwt_io.shutdown_server self end diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index f7a5ec2c..af5b495e 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -33,6 +33,17 @@ val get_runner : unit -> Moonpool.Runner.t Must be run from within a fiber. @raise Failure if not run within a fiber *) +(** {2 IO} *) + +(** IO using the Lwt event loop. + + These IO operations work on non-blocking file descriptors + and rely on a [Lwt_engine] event loop being active (meaning, + [Lwt_main.run] is currently running in some thread). + + Calling these functions must be done from a moonpool runner and + will suspend the current task/fut/fiber if the FD is not ready. +*) module IO : sig val read : Unix.file_descr -> bytes -> int -> int -> int val write_once : Unix.file_descr -> bytes -> int -> int -> int @@ -40,6 +51,24 @@ module IO : sig val sleep_s : float -> unit end +module IO_in = IO_in +module IO_out = IO_out + +module TCP_server : sig + type t = Lwt_io.server + + val establish : + ?backlog:(* ?server_fd:Unix.file_descr -> *) + int -> + ?no_close:bool -> + runner:Moonpool.Runner.t -> + Unix.sockaddr -> + (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> + t + + val shutdown : t -> unit +end + (** {2 Helpers on the lwt side} *) val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t