create output buffer for non blocking socket

This commit is contained in:
craff 2021-12-11 20:18:39 -10:00
parent 68dde5ac58
commit 91ca51ea42

View file

@ -18,25 +18,46 @@ let _debug k =
Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt)
)
let output oc s i len =
let fd = Unix.descr_of_out_channel oc in
let rec fn i len =
try
let _,w,_ = Unix.select [] [fd] [] (-1.0) in
if w <> [] then
begin
let written = Unix.single_write fd s i len in
if written < len then
fn (i+written) (len-written)
end
else assert false
with Sys_blocked_io
| Unix.Unix_error((EAGAIN|EWOULDBLOCK),_,_) ->
fn i len
| Unix.Unix_error(EPIPE,_,_) | Sys_error _ ->
()
type out = { fd : Unix.file_descr
; buf : Bytes.t
; mutable pos : int }
let out_of_descr fd =
let size = Unix.(getsockopt_int fd SO_SNDBUF) in
{ fd; buf = Bytes.create size; pos = 0 }
let rec write_out ({buf;fd;_} as oc) i len =
try
let _,w,_ = Unix.select [] [fd] [] (-1.0) in
if w <> [] then
begin
let written = Unix.single_write fd buf i len in
if written < len then
write_out oc (i+written) (len-written)
end
else assert false
with Sys_blocked_io
| Unix.Unix_error((EAGAIN|EWOULDBLOCK),_,_) ->
write_out oc i len
| Unix.Unix_error(EPIPE,_,_) ->
raise (Sys_error "broken pipe")
let rec output ({buf;pos;_} as oc) s i len =
let buf_len = Bytes.length buf in
let do_write, to_write =
if len >= buf_len - pos then
true, buf_len - pos
else
false, len
in
fn i len
Bytes.blit s i buf pos to_write;
oc.pos <- pos + to_write;
if do_write then
begin
write_out oc 0 buf_len;
oc.pos <- 0;
output oc s (i + to_write) (len - to_write)
end
let output_string oc str =
let buf = Bytes.unsafe_of_string str in
@ -45,6 +66,10 @@ let output_string oc str =
let fprintf oc format =
Printf.ksprintf (output_string oc) format
let flush oc =
write_out oc 0 oc.pos;
oc.pos <- 0
module Buf_ = struct
type t = {
mutable bytes: bytes;
@ -149,7 +174,7 @@ module Byte_stream = struct
)
let to_chan (oc:out_channel) (self:t) =
iter (fun s i len -> output oc s i len) self
iter (fun s i len -> Stdlib.output oc s i len) self
let of_bytes ?(i=0) ?len s : t =
(* invariant: !i+!len is constant *)
@ -682,7 +707,7 @@ module Response = struct
self.code Headers.pp self.headers pp_body self.body
(* print a stream as a series of chunks *)
let output_stream_chunked_ (oc:out_channel) (str:byte_stream) : unit =
let output_stream_chunked_ (oc:out) (str:byte_stream) : unit =
let continue = ref true in
while !continue do
(* next chunk *)
@ -697,7 +722,7 @@ module Response = struct
done;
()
let output_ (oc:out_channel) (self:t) : unit =
let output_ (oc:out) (self:t) : unit =
fprintf oc "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code);
let body, is_chunked = match self.body with
| `String s when String.length s > 1024 * 500 ->
@ -843,7 +868,7 @@ end
(* a request handler. handles a single request. *)
type cb_path_handler =
out_channel ->
out ->
byte_stream Request.t ->
resp:(Response.t -> unit) ->
unit
@ -911,7 +936,7 @@ let add_path_handler_
| handler ->
(* we have a handler, do we accept the request based on its headers? *)
begin match accept req with
| Ok () -> Some (Ok (fun _oc req ~resp -> resp (handler (tr_req req))))
| Ok () -> Some (Ok (fun (_oc:out) req ~resp -> resp (handler (tr_req req))))
| Error _ as e -> Some e
end
| exception _ ->
@ -942,7 +967,7 @@ let add_route_handler_
| Some handler ->
(* we have a handler, do we accept the request based on its headers? *)
begin match accept req with
| Ok () -> Some (Ok (fun oc req ~resp -> tr_req oc req ~resp handler))
| Ok () -> Some (Ok (fun (oc:out) req ~resp -> tr_req oc req ~resp handler))
| Error _ as e -> Some e
end
| None ->
@ -964,7 +989,7 @@ let[@inline] _opt_iter ~f o = match o with
| Some x -> f x
let add_route_server_sent_handler ?accept self route f =
let tr_req oc req ~resp f =
let tr_req (oc:out) req ~resp f =
let req = Request.read_body_full req in
let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in
@ -999,7 +1024,7 @@ let add_route_server_sent_handler ?accept self route f =
let close () = raise Exit
end in
try f req (module SSG : SERVER_SENT_GENERATOR);
with Exit -> close_out oc
with Exit -> Unix.close oc.fd
in
add_route_handler_ self ?accept ~meth:`GET route ~tr_req f
@ -1029,7 +1054,7 @@ let find_map f l =
let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let _ = Unix.set_nonblock client_sock in
let oc = Unix.out_channel_of_descr client_sock in
let oc = out_of_descr client_sock in
let buf = Buf_.create() in
let is = Byte_stream.of_descr client_sock in
let continue = ref true in