fixes in moonpool-io

This commit is contained in:
Simon Cruanes 2024-07-01 11:42:11 -04:00
parent a147eeb0ab
commit b466e85fb4
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 28 additions and 9 deletions

View file

@ -60,8 +60,6 @@ module Per_fd = struct
mutable writes: IO_wait.t list; mutable writes: IO_wait.t list;
} }
let[@inline] is_empty self = self.reads = [] && self.writes = []
let update_event (self : t) ~(poll : Poll.t) : unit = let update_event (self : t) ~(poll : Poll.t) : unit =
let ev = let ev =
match self.reads, self.writes with match self.reads, self.writes with
@ -71,6 +69,9 @@ module Per_fd = struct
| [], _ :: _ -> Poll.Event.write | [], _ :: _ -> Poll.Event.write
| _ :: _, _ :: _ -> Poll.Event.read_write | _ :: _, _ :: _ -> Poll.Event.read_write
in in
Printf.eprintf "poll.set %d {rd=%b, wr=%b}\n%!"
(Obj.magic self.fd.fd : int)
ev.readable ev.writable;
Poll.set poll self.fd.fd ev Poll.set poll self.fd.fd ev
end end
@ -100,11 +101,11 @@ module IO_tbl = struct
| Read -> | Read ->
self.n_read <- 1 + self.n_read; self.n_read <- 1 + self.n_read;
per_fd.reads <- ev :: per_fd.reads; per_fd.reads <- ev :: per_fd.reads;
if self.n_read = 0 then Per_fd.update_event per_fd ~poll:self.poll if self.n_read = 1 then Per_fd.update_event per_fd ~poll:self.poll
| Write -> | Write ->
self.n_write <- 1 + self.n_write; self.n_write <- 1 + self.n_write;
per_fd.writes <- ev :: per_fd.writes; per_fd.writes <- ev :: per_fd.writes;
if self.n_write = 0 then Per_fd.update_event per_fd ~poll:self.poll if self.n_write = 1 then Per_fd.update_event per_fd ~poll:self.poll
let[@inline] trigger_waiter (io : IO_wait.t) = let[@inline] trigger_waiter (io : IO_wait.t) =
if io.active then io.f io.as_cancel_handle if io.active then io.f io.as_cancel_handle
@ -112,6 +113,7 @@ module IO_tbl = struct
(** Wake up waiters on FDs who received events *) (** Wake up waiters on FDs who received events *)
let handle_ready ~ignore_fd (self : t) : unit = let handle_ready ~ignore_fd (self : t) : unit =
let update_per_fd (per_fd : Per_fd.t) (event : Poll.Event.t) = let update_per_fd (per_fd : Per_fd.t) (event : Poll.Event.t) =
Printf.eprintf "handle ready %d\n%!" (Obj.magic per_fd.fd.fd : int);
if Fd.closed per_fd.fd then if Fd.closed per_fd.fd then
(* cleanup *) (* cleanup *)
Hashtbl.remove self.tbl per_fd.fd.fd Hashtbl.remove self.tbl per_fd.fd.fd
@ -136,6 +138,8 @@ module IO_tbl = struct
let per_fd = Hashtbl.find self.tbl fd in let per_fd = Hashtbl.find self.tbl fd in
update_per_fd per_fd event update_per_fd per_fd event
)); ));
Poll.clear self.poll;
() ()
(** Remove closed FDs *) (** Remove closed FDs *)
@ -165,7 +169,8 @@ module Ev_loop = struct
type t = { type t = {
timer: Timer.t; timer: Timer.t;
actions: Action_queue.t; actions: Action_queue.t;
io_tbl: IO_tbl.t; (** Used for select *) io_tbl: IO_tbl.t;
(** Used to remember which events are tracked on which FD *)
in_blocking_section: bool A.t; in_blocking_section: bool A.t;
(** Is the ev loop thread currently waiting? *) (** Is the ev loop thread currently waiting? *)
pipe_read: Unix.file_descr; (** Main thread only *) pipe_read: Unix.file_descr; (** Main thread only *)
@ -181,7 +186,6 @@ module Ev_loop = struct
Unix.set_nonblock pipe_read; Unix.set_nonblock pipe_read;
let poll = Poll.create () in let poll = Poll.create () in
Poll.set poll pipe_read Poll.Event.read;
{ {
timer = Timer.create (); timer = Timer.create ();
@ -233,14 +237,18 @@ module Ev_loop = struct
let delay_s = Option.value delay_s ~default:10. in let delay_s = Option.value delay_s ~default:10. in
let timeout = Poll.Timeout.after Int64.(of_float (delay_s *. 1e9)) in let timeout = Poll.Timeout.after Int64.(of_float (delay_s *. 1e9)) in
(* run [select] *) (* poll *)
Poll.set self.poll self.pipe_read Poll.Event.read;
A.set self.in_blocking_section true; A.set self.in_blocking_section true;
let has_events = let has_events =
let@ _sp = Tracing_.with_span "moonpool-unix.evloop.select" in let@ _sp = Tracing_.with_span "moonpool-unix.evloop.poll" in
Printf.eprintf "polling…\n%!";
match Poll.wait self.poll timeout with match Poll.wait self.poll timeout with
| `Timeout -> false | `Timeout -> false
| `Ok -> true | `Ok -> true
in in
Printf.eprintf "poll returned with has_events=%b\n%!" has_events;
A.set self.in_blocking_section false; A.set self.in_blocking_section false;
@ -269,6 +277,10 @@ let rec get_or_set_as_current_ (ev : Ev_loop.t) : Ev_loop.t * bool =
let bg_loop_ (ev_loop : Ev_loop.t) = let bg_loop_ (ev_loop : Ev_loop.t) =
let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in let@ _sp = Tracing_.with_span "Moonpool_unix.bg-loop" in
Printf.eprintf "bg loop is thread %d\n%!" (Thread.id @@ Thread.self ());
ignore
(Unix.sigprocmask Unix.SIG_BLOCK [ Sys.sigpipe; Sys.sigterm; Sys.sigint ]
: _ list);
while true do while true do
Ev_loop.run_step_ ev_loop Ev_loop.run_step_ ev_loop
done done

View file

@ -11,10 +11,14 @@ let main ~port ~j () : unit =
let@ _main_runner = MU.main in let@ _main_runner = MU.main in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
Printf.eprintf "main is thread %d\n%!" (Thread.id @@ Thread.self ());
let@ server = let@ server =
MU.TCP_server.with_server ~runner (MU.Sockaddr.any port) MU.TCP_server.with_server ~runner (MU.Sockaddr.any port)
~handle:(fun ~client_addr:addr ic oc -> ~handle:(fun ~client_addr:addr ic oc ->
Trace.message "got new client"; Trace.message "got new client";
Printf.eprintf "handle client on thread %d\n%!"
(Thread.id @@ Thread.self ());
let@ _sp = let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> Trace.with_span ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () ->
[ "addr", `String (MU.Sockaddr.show addr) ]) [ "addr", `String (MU.Sockaddr.show addr) ])
@ -59,4 +63,7 @@ let () =
in in
Arg.parse opts ignore "echo server"; Arg.parse opts ignore "echo server";
main ~port:!port ~j:!j () try main ~port:!port ~j:!j ()
with Sys.Break ->
Printf.eprintf "got ctrl-c, exiting\n%!";
exit 0