mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-12 22:10:46 -05:00
wip: moonpool_unix
This commit is contained in:
parent
57be8f2130
commit
03676f2e3d
6 changed files with 88 additions and 0 deletions
|
|
@ -48,3 +48,15 @@ let write fd buf i len : unit =
|
|||
i := !i + n;
|
||||
len := !len - n
|
||||
done
|
||||
|
||||
(** Sleep for the given amount of seconds *)
|
||||
let sleep_s (f : float) : unit =
|
||||
if f > 0. then
|
||||
Moonpool.Private.Suspend_.suspend
|
||||
{
|
||||
handle =
|
||||
(fun ~ls ~run:_ ~resume sus ->
|
||||
let cancel = Cancel_handle.dummy in
|
||||
Ev_loop.run_after_s f cancel (fun _cancel ->
|
||||
resume ~ls sus @@ Ok ()));
|
||||
}
|
||||
|
|
|
|||
0
src/unix/TCP_client.ml
Normal file
0
src/unix/TCP_client.ml
Normal file
61
src/unix/TCP_server.ml
Normal file
61
src/unix/TCP_server.ml
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
open Common_
|
||||
module A = M.Atomic
|
||||
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
cancel: Cancel_handle.t;
|
||||
port: int;
|
||||
fd: Unix.file_descr;
|
||||
runner: M.Runner.t;
|
||||
}
|
||||
|
||||
let port self = self.port
|
||||
|
||||
let with_server ?(addr = Unix.inet_addr_any) ?(port = 0) ?(after_init = ignore)
|
||||
~runner ~handle_client () : unit =
|
||||
let active = A.make true in
|
||||
let cancel = Cancel_handle.create_with (fun () -> A.set active false) in
|
||||
let server_sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
|
||||
Unix.bind server_sock (Unix.ADDR_INET (addr, port));
|
||||
Unix.set_nonblock server_sock;
|
||||
Unix.listen server_sock 16;
|
||||
|
||||
(* now get the real port *)
|
||||
let port =
|
||||
match Unix.getsockname server_sock with
|
||||
| Unix.ADDR_INET (_, p) -> p
|
||||
| _ -> assert false
|
||||
in
|
||||
|
||||
(* Unix.setsockopt sock *)
|
||||
let finally () = Unix.close server_sock in
|
||||
let@ () = Fun.protect ~finally in
|
||||
|
||||
let self = { active; cancel; port; fd = server_sock; runner } in
|
||||
after_init self;
|
||||
|
||||
while A.get self.active do
|
||||
(* accept new client *)
|
||||
match Unix.accept server_sock with
|
||||
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
||||
(* wait for [sock] to be ready *)
|
||||
let@ _sp = Tracing_.with_span "tcp-server.suspend" in
|
||||
Moonpool.Private.Suspend_.suspend
|
||||
{
|
||||
handle =
|
||||
(fun ~ls ~run:_ ~resume sus ->
|
||||
Ev_loop.wait_readable server_sock cancel (fun _ ->
|
||||
resume ~ls sus @@ Ok ()));
|
||||
}
|
||||
| client_sock, client_addr ->
|
||||
(* handle client *)
|
||||
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
|
||||
Unix.set_nonblock client_sock;
|
||||
|
||||
let ic = IO_in.of_unix_fd client_sock in
|
||||
let oc = IO_out.of_unix_fd client_sock in
|
||||
|
||||
M.Runner.run_async runner ~name:"tcp.handle-client" (fun () ->
|
||||
let@ () = Fun.protect ~finally:(fun () -> Unix.close client_sock) in
|
||||
handle_client self client_addr ic oc)
|
||||
done
|
||||
12
src/unix/TCP_server.mli
Normal file
12
src/unix/TCP_server.mli
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
type t
|
||||
|
||||
val port : t -> int
|
||||
|
||||
val with_server :
|
||||
?addr:Unix.inet_addr ->
|
||||
?port:int ->
|
||||
?after_init:(t -> unit) ->
|
||||
runner:Moonpool.Runner.t ->
|
||||
handle_client:(t -> Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
|
||||
unit ->
|
||||
unit
|
||||
|
|
@ -207,6 +207,7 @@ module Ev_loop = struct
|
|||
let reads, writes = IO_tbl.prepare_select self.io_tbl in
|
||||
A.set self.in_blocking_section true;
|
||||
let reads, writes, _ =
|
||||
let@ _sp = Tracing_.with_span "moonpool-unix.select" in
|
||||
Unix.select (self.pipe_read :: reads) writes [] delay
|
||||
in
|
||||
A.set self.in_blocking_section false;
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@ module Fut = Moonpool.Fut
|
|||
module Cancel_handle = Cancel_handle
|
||||
module IO_in = IO_in
|
||||
module IO_out = IO_out
|
||||
module TCP_client = TCP_client
|
||||
module TCP_server = TCP_server
|
||||
include IO_unix
|
||||
|
||||
let run_after_s = Ev_loop.run_after_s
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue