From 91ca51ea429e843e874e0639904d81dd2e77984e Mon Sep 17 00:00:00 2001 From: craff Date: Sat, 11 Dec 2021 20:18:39 -1000 Subject: [PATCH] create output buffer for non blocking socket --- src/Tiny_httpd.ml | 79 +++++++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 735ad2c8..3406deab 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -18,25 +18,46 @@ let _debug k = Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt) ) -let output oc s i len = - let fd = Unix.descr_of_out_channel oc in - let rec fn i len = - try - let _,w,_ = Unix.select [] [fd] [] (-1.0) in - if w <> [] then - begin - let written = Unix.single_write fd s i len in - if written < len then - fn (i+written) (len-written) - end - else assert false - with Sys_blocked_io - | Unix.Unix_error((EAGAIN|EWOULDBLOCK),_,_) -> - fn i len - | Unix.Unix_error(EPIPE,_,_) | Sys_error _ -> - () +type out = { fd : Unix.file_descr + ; buf : Bytes.t + ; mutable pos : int } + +let out_of_descr fd = + let size = Unix.(getsockopt_int fd SO_SNDBUF) in + { fd; buf = Bytes.create size; pos = 0 } + +let rec write_out ({buf;fd;_} as oc) i len = + try + let _,w,_ = Unix.select [] [fd] [] (-1.0) in + if w <> [] then + begin + let written = Unix.single_write fd buf i len in + if written < len then + write_out oc (i+written) (len-written) + end + else assert false + with Sys_blocked_io + | Unix.Unix_error((EAGAIN|EWOULDBLOCK),_,_) -> + write_out oc i len + | Unix.Unix_error(EPIPE,_,_) -> + raise (Sys_error "broken pipe") + +let rec output ({buf;pos;_} as oc) s i len = + let buf_len = Bytes.length buf in + let do_write, to_write = + if len >= buf_len - pos then + true, buf_len - pos + else + false, len in - fn i len + Bytes.blit s i buf pos to_write; + oc.pos <- pos + to_write; + if do_write then + begin + write_out oc 0 buf_len; + oc.pos <- 0; + output oc s (i + to_write) (len - to_write) + end let output_string oc str = let buf = Bytes.unsafe_of_string str in @@ -45,6 +66,10 @@ let output_string oc str = let fprintf oc format = Printf.ksprintf (output_string oc) format +let flush oc = + write_out oc 0 oc.pos; + oc.pos <- 0 + module Buf_ = struct type t = { mutable bytes: bytes; @@ -149,7 +174,7 @@ module Byte_stream = struct ) let to_chan (oc:out_channel) (self:t) = - iter (fun s i len -> output oc s i len) self + iter (fun s i len -> Stdlib.output oc s i len) self let of_bytes ?(i=0) ?len s : t = (* invariant: !i+!len is constant *) @@ -682,7 +707,7 @@ module Response = struct self.code Headers.pp self.headers pp_body self.body (* print a stream as a series of chunks *) - let output_stream_chunked_ (oc:out_channel) (str:byte_stream) : unit = + let output_stream_chunked_ (oc:out) (str:byte_stream) : unit = let continue = ref true in while !continue do (* next chunk *) @@ -697,7 +722,7 @@ module Response = struct done; () - let output_ (oc:out_channel) (self:t) : unit = + let output_ (oc:out) (self:t) : unit = fprintf oc "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code); let body, is_chunked = match self.body with | `String s when String.length s > 1024 * 500 -> @@ -843,7 +868,7 @@ end (* a request handler. handles a single request. *) type cb_path_handler = - out_channel -> + out -> byte_stream Request.t -> resp:(Response.t -> unit) -> unit @@ -911,7 +936,7 @@ let add_path_handler_ | handler -> (* we have a handler, do we accept the request based on its headers? *) begin match accept req with - | Ok () -> Some (Ok (fun _oc req ~resp -> resp (handler (tr_req req)))) + | Ok () -> Some (Ok (fun (_oc:out) req ~resp -> resp (handler (tr_req req)))) | Error _ as e -> Some e end | exception _ -> @@ -942,7 +967,7 @@ let add_route_handler_ | Some handler -> (* we have a handler, do we accept the request based on its headers? *) begin match accept req with - | Ok () -> Some (Ok (fun oc req ~resp -> tr_req oc req ~resp handler)) + | Ok () -> Some (Ok (fun (oc:out) req ~resp -> tr_req oc req ~resp handler)) | Error _ as e -> Some e end | None -> @@ -964,7 +989,7 @@ let[@inline] _opt_iter ~f o = match o with | Some x -> f x let add_route_server_sent_handler ?accept self route f = - let tr_req oc req ~resp f = + let tr_req (oc:out) req ~resp f = let req = Request.read_body_full req in let headers = ref Headers.(empty |> set "content-type" "text/event-stream") in @@ -999,7 +1024,7 @@ let add_route_server_sent_handler ?accept self route f = let close () = raise Exit end in try f req (module SSG : SERVER_SENT_GENERATOR); - with Exit -> close_out oc + with Exit -> Unix.close oc.fd in add_route_handler_ self ?accept ~meth:`GET route ~tr_req f @@ -1029,7 +1054,7 @@ let find_map f l = let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = let _ = Unix.set_nonblock client_sock in - let oc = Unix.out_channel_of_descr client_sock in + let oc = out_of_descr client_sock in let buf = Buf_.create() in let is = Byte_stream.of_descr client_sock in let continue = ref true in