Do non blocking write ... not satisfactory yet

This commit is contained in:
craff 2021-12-11 19:47:40 -10:00
parent bb74d7d82e
commit 68dde5ac58

View file

@ -18,6 +18,33 @@ let _debug k =
Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt)
)
let output oc s i len =
let fd = Unix.descr_of_out_channel oc in
let rec fn i len =
try
let _,w,_ = Unix.select [] [fd] [] (-1.0) in
if w <> [] then
begin
let written = Unix.single_write fd s i len in
if written < len then
fn (i+written) (len-written)
end
else assert false
with Sys_blocked_io
| Unix.Unix_error((EAGAIN|EWOULDBLOCK),_,_) ->
fn i len
| Unix.Unix_error(EPIPE,_,_) | Sys_error _ ->
()
in
fn i len
let output_string oc str =
let buf = Bytes.unsafe_of_string str in
output oc buf 0 (String.length str)
let fprintf oc format =
Printf.ksprintf (output_string oc) format
module Buf_ = struct
type t = {
mutable bytes: bytes;
@ -68,13 +95,16 @@ module Byte_stream = struct
exception Timeout
let of_descr_ ?(timeout=(-1.0)) ~close fd : t =
let of_descr_ ?(is_sock=false) ?(timeout=(-1.0)) ~close fd : t =
let i = ref 0 in
let len = ref 0 in
let size = Unix.(getsockopt_int fd SO_RCVBUF) in
let size = if is_sock then
Unix.(getsockopt_int fd SO_RCVBUF)
else
0x4000
in
let buf = Bytes.make size ' ' in
Unix.set_nonblock fd;
{ bs_fill_buf=(fun () ->
if !i >= !len then (
i := 0;
@ -97,7 +127,8 @@ module Byte_stream = struct
bs_close=close
}
let of_descr ?timeout fd = of_descr_ ?timeout ~close:(fun() -> Unix.close fd) fd
let of_descr ?timeout fd = of_descr_ ~is_sock:true ?timeout
~close:(fun() -> Unix.close fd) fd
let of_chan ic =
let fd = Unix.descr_of_in_channel ic in
@ -656,7 +687,7 @@ module Response = struct
while !continue do
(* next chunk *)
let s, i, len = str.bs_fill_buf () in
Printf.fprintf oc "%x\r\n" len;
fprintf oc "%x\r\n" len;
output oc s i len;
str.bs_consume len;
if len = 0 then (
@ -667,7 +698,7 @@ module Response = struct
()
let output_ (oc:out_channel) (self:t) : unit =
Printf.fprintf oc "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code);
fprintf oc "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code);
let body, is_chunked = match self.body with
| `String s when String.length s > 1024 * 500 ->
(* chunk-encode large bodies *)
@ -686,7 +717,7 @@ module Response = struct
let self = {self with headers; body} in
_debug (fun k->k "output response: %s"
(Format.asprintf "%a" pp {self with body=`String "<…>"}));
List.iter (fun (k,v) -> Printf.fprintf oc "%s: %s\r\n" k v) headers;
List.iter (fun (k,v) -> fprintf oc "%s: %s\r\n" k v) headers;
output_string oc "\r\n";
begin match body with
| `String "" | `Void -> ()
@ -950,11 +981,11 @@ let add_route_server_sent_handler ?accept self route f =
let send_event ?event ?id ?retry ~data () : unit =
send_response_idempotent_();
_opt_iter event ~f:(fun e -> Printf.fprintf oc "data: %s\n" e);
_opt_iter id ~f:(fun e -> Printf.fprintf oc "id: %s\n" e);
_opt_iter retry ~f:(fun e -> Printf.fprintf oc "retry: %s\n" e);
_opt_iter event ~f:(fun e -> fprintf oc "data: %s\n" e);
_opt_iter id ~f:(fun e -> fprintf oc "id: %s\n" e);
_opt_iter retry ~f:(fun e -> fprintf oc "retry: %s\n" e);
let l = String.split_on_char '\n' data in
List.iter (fun s -> Printf.fprintf oc "data: %s\n" s) l;
List.iter (fun s -> fprintf oc "data: %s\n" s) l;
output_string oc "\n"; (* finish group *)
flush oc
in
@ -997,10 +1028,8 @@ let find_map f l =
in aux f l
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let write_sock = Unix.dup client_sock in
let _ = Unix.set_nonblock client_sock in
let _ = Unix.clear_nonblock write_sock in
let oc = Unix.out_channel_of_descr write_sock in
let oc = Unix.out_channel_of_descr client_sock in
let buf = Buf_.create() in
let is = Byte_stream.of_descr client_sock in
let continue = ref true in