eio: reimplement some buffering

This commit is contained in:
Simon Cruanes 2023-06-04 00:02:20 -04:00
parent bfe5e9c358
commit 8620fe688d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -15,28 +15,68 @@ let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in let max_connections = max 4 max_connections in
max_connections max_connections
let read_buf_size = 4 * 1024 let buf_size = 16 * 1024
let write_buf_size = 8 * 1024
let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t = let ic_of_flow (flow : Eio.Net.stream_socket) : IO.In_channel.t =
let cstruct = Cstruct.create write_buf_size in let cstruct = Cstruct.create buf_size in
let len_slice = ref 0 in
let offset = ref 0 in
let input buf i len = let input buf i len =
if len = 0 then if len = 0 then
0 0
else ( else (
let n = flow#read_into (Cstruct.sub cstruct 0 (min len write_buf_size)) in let available = ref (!len_slice - !offset) in
Cstruct.blit_to_bytes cstruct 0 buf i n; if !available = 0 then (
n let n = flow#read_into cstruct in
offset := 0;
len_slice := n;
available := n
);
let n = min !available len in
if n > 0 then (
Cstruct.blit_to_bytes cstruct !offset buf i n;
offset := !offset + n;
n
) else
0
) )
in in
let close () = flow#shutdown `Receive in let close () = flow#shutdown `Receive in
{ IO.In_channel.input; close } { IO.In_channel.input; close }
let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t = (*
let output buf i len = let output buf i len =
if len > 0 then ( let i = ref i in
let i = ref i in let len = ref len in
let len = ref len in
while !len > 0 do
let available = Cstruct.length cstruct - !offset in
let n = min !len available in
Cstruct.blit_from_bytes buf !i cstruct !offset n;
offset := !offset + n;
i := !i + n;
len := !len - n;
if !offset = Cstruct.length cstruct then (
flow#write [ cstruct ];
offset := 0
)
done
in
*)
let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t =
(* write buffer *)
let wbuf = Bytes.create buf_size in
let offset = ref 0 in
let flush () =
if !offset > 0 then (
let i = ref 0 in
let len = ref !offset in
let src = let src =
object object
@ -45,18 +85,34 @@ let oc_of_flow (flow : Eio.Net.stream_socket) : IO.Out_channel.t =
method read_into (cstruct : Cstruct.t) : int = method read_into (cstruct : Cstruct.t) : int =
if !len = 0 then raise End_of_file; if !len = 0 then raise End_of_file;
let n = min !len (Cstruct.length cstruct) in let n = min !len (Cstruct.length cstruct) in
Cstruct.blit_from_bytes buf !i cstruct 0 n; Cstruct.blit_from_bytes wbuf !i cstruct 0 n;
i := !i + n; i := !i + n;
len := !len - n; len := !len - n;
n n
end end
in in
flow#copy src flow#copy src;
offset := 0
) )
in in
let output buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let available = Bytes.length wbuf - !offset in
let n = min !len available in
Bytes.blit buf !i wbuf !offset n;
offset := !offset + n;
i := !i + n;
len := !len - n;
if !offset = Bytes.length wbuf then flush ()
done
in
let close () = flow#shutdown `Send in let close () = flow#shutdown `Send in
let flush () = () in
{ IO.Out_channel.close; flush; output } { IO.Out_channel.close; flush; output }
let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
@ -69,13 +125,11 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
let clock = Eio.Stdenv.clock stdenv in let clock = Eio.Stdenv.clock stdenv in
Eio.Time.now clock Eio.Time.now clock
let spawn f : unit =
Eio.Switch.run @@ fun sub_sw -> Eio.Fiber.fork ~sw:sub_sw f
let tcp_server () : IO.TCP_server.builder = let tcp_server () : IO.TCP_server.builder =
{ {
IO.TCP_server.serve = IO.TCP_server.serve =
(fun ~after_init ~handle () : unit -> (fun ~after_init ~handle () : unit ->
(* FIXME: parse *)
let ip_addr = Eio.Net.Ipaddr.V4.any in let ip_addr = Eio.Net.Ipaddr.V4.any in
let running = Atomic.make true in let running = Atomic.make true in
let active_conns = Atomic.make 0 in let active_conns = Atomic.make 0 in
@ -105,8 +159,6 @@ let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
after_init tcp_server; after_init tcp_server;
while Atomic.get running do while Atomic.get running do
Eio.Switch.check sw;
Eio.Net.accept_fork ~sw Eio.Net.accept_fork ~sw
~on_error:(fun exn -> ~on_error:(fun exn ->
H._debug (fun k -> H._debug (fun k ->