(** IO abstraction. We abstract IO so we can support classic unix blocking IOs with threads, and modern async IO with Eio. {b NOTE}: experimental. @since 0.14 *) open Common_ module Buf = Buf module Slice = Iostream.Slice module A = Atomic_ (** Output channel (byte sink) *) module Output = struct include Iostream.Out_buf class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Unix.file_descr) : t = object inherit t_from_output ~bytes:buf.bytes () method private output_underlying bs i len0 = let i = ref i in let len = ref len0 in while !len > 0 do match Unix.write fd bs !i !len with | 0 -> failwith "write failed" | n -> i := !i + n; len := !len - n | exception Unix.Unix_error ( ( Unix.EBADF | Unix.ENOTCONN | Unix.ESHUTDOWN | Unix.ECONNRESET | Unix.EPIPE ), _, _ ) -> failwith "write failed" | exception Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EAGAIN | Unix.EINTR), _, _) -> ignore (Unix.select [] [ fd ] [] 1.) done method private close_underlying () = if not (A.exchange closed true) then if close_noerr then ( try Unix.close fd with _ -> () ) else Unix.close fd end let output_buf (self : t) (buf : Buf.t) : unit = let b = Buf.bytes_slice buf in output self b 0 (Buf.size buf) (** [chunk_encoding oc] makes a new channel that outputs its content into [oc] in chunk encoding form. @param close_rec if true, closing the result will also close [oc] @param buf a buffer used to accumulate data into chunks. Chunks are emitted when [buf]'s size gets over a certain threshold, or when [flush] is called. *) let chunk_encoding ?(buf = Buf.create ()) ~close_rec (oc : #t) : t = (* write content of [buf] as a chunk if it's big enough. If [force=true] then write content of [buf] if it's simply non empty. *) let write_buf ~force () = let n = Buf.size buf in if (force && n > 0) || n >= 4_096 then ( output_string oc (Printf.sprintf "%x\r\n" n); output oc (Buf.bytes_slice buf) 0 n; output_string oc "\r\n"; Buf.clear buf ) in object method flush () = write_buf ~force:true (); flush oc method close () = write_buf ~force:true (); (* write an empty chunk to close the stream *) output_string oc "0\r\n"; (* write another crlf after the stream (see #56) *) output_string oc "\r\n"; flush oc; if close_rec then close oc method output b i n = Buf.add_bytes buf b i n; write_buf ~force:false () method output_char c = Buf.add_char buf c; write_buf ~force:false () end end (** Input channel (byte source) *) module Input = struct include Iostream.In_buf let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Unix.file_descr) : t = let eof = ref false in let input buf i len : int = let n = ref 0 in if not !eof then ( n := Unix.read fd buf i len; if !n = 0 then eof := true ); !n in object inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes () method private refill (slice : Slice.t) = if not !eof then ( slice.off <- 0; slice.len <- input slice.bytes 0 (Bytes.length slice.bytes); (* Printf.eprintf "read returned %d B\n%!" !n; *) if slice.len = 0 then eof := true ) method close () = if not (A.exchange closed true) then ( eof := true; if close_noerr then ( try Unix.close fd with _ -> () ) else Unix.close fd ) end let[@inline] of_string s : t = (of_string s :> t) let of_slice (slice : Slice.t) : t = object inherit Iostream.In_buf.t_from_refill ~bytes:slice.bytes () method private refill (slice : Slice.t) = slice.off <- 0; slice.len <- 0 method close () = () end (** Read into the given slice. @return the number of bytes read, [0] means end of input. *) let[@inline] input (self : t) buf i len = self#input buf i len (** Close the channel. *) let[@inline] close self : unit = self#close () (** Read exactly [len] bytes. @raise End_of_file if the input did not contain enough data. *) let really_input (self : #t) buf i len : unit = let i = ref i in let len = ref len in while !len > 0 do let n = input self buf !i !len in if n = 0 then raise End_of_file; i := !i + n; len := !len - n done let iter_slice (f : Slice.t -> unit) (self : #t) : unit = let continue = ref true in while !continue do let slice = self#fill_buf () in if slice.len = 0 then ( continue := false; close self ) else ( f slice; Slice.consume slice slice.len ) done let iter f self = iter_slice (fun (slice : Slice.t) -> f slice.bytes slice.off slice.len) self let to_chan oc (self : #t) = iter_slice (fun (slice : Slice.t) -> Stdlib.output oc slice.bytes slice.off slice.len) self let to_chan' (oc : #Iostream.Out.t) (self : #t) : unit = iter_slice (fun (slice : Slice.t) -> Iostream.Out.output oc slice.bytes slice.off slice.len) self (** Output a stream using chunked encoding *) let output_chunked' ?buf (oc : #Iostream.Out_buf.t) (self : #t) : unit = let oc' = Output.chunk_encoding ?buf oc ~close_rec:false in match to_chan' oc' self with | () -> Output.close oc' | exception e -> let bt = Printexc.get_raw_backtrace () in Output.close oc'; Printexc.raise_with_backtrace e bt (** print a stream as a series of chunks *) let output_chunked ?buf (oc : out_channel) (self : #t) : unit = output_chunked' ?buf (Output.of_out_channel oc) self end (** Input channel (byte source) with read-with-timeout *) module Input_with_timeout = struct include Iostream.In_buf class type t = Iostream.In_buf.t_with_timeout exception Timeout = Iostream.Timeout (** Exception for timeouts *) exception Timeout_partial_read of int (** Exception for timeouts with a partial read *) (** fill buffer, but stop at the deadline *) let fill_buf_with_deadline (self : #t) ~(deadline : float) : Slice.t = let timeout = deadline -. Time.now_s () in if timeout <= 0. then raise Timeout; fill_buf_with_timeout self timeout (** fill buffer, but stop at the deadline if provided *) let fill_buf_with_deadline_opt (self : #t) ~(deadline : float option) : Slice.t = match deadline with | None -> fill_buf self | Some d -> fill_buf_with_deadline self ~deadline:d let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Unix.file_descr) : t = let eof = ref false in let input_with_timeout t buf i len : int = let deadline = Time.now_s () +. t in let n = ref 0 in while (not (Atomic.get closed)) && (not !eof) && try n := Unix.read fd buf i len; false with | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> (* sleep *) true | Unix.Unix_error ((Unix.ECONNRESET | Unix.ESHUTDOWN | Unix.EPIPE), _, _) -> (* exit *) false do let now = Time.now_s () in if now >= deadline then raise Timeout; ignore (Unix.select [ fd ] [] [] (deadline -. now) : _ * _ * _) done; !n in object inherit Iostream.In_buf.t_with_timeout_from_refill ~bytes:buf.bytes () method private refill_with_timeout t (slice : Slice.t) = if not !eof then ( slice.off <- 0; slice.len <- input_with_timeout t slice.bytes 0 (Bytes.length slice.bytes); (* Printf.eprintf "read returned %d B\n%!" !n; *) if slice.len = 0 then eof := true ) method close () = if not (A.exchange closed true) then ( eof := true; if close_noerr then ( try Unix.close fd with _ -> () ) else Unix.close fd ) end let of_slice (slice : Slice.t) : t = object inherit Iostream.In_buf.t_with_timeout_from_refill ~bytes:slice.bytes () method private refill_with_timeout _t (slice : Slice.t) = slice.off <- 0; slice.len <- 0 method close () = () end (** Read into the given slice. @return the number of bytes read, [0] means end of input. *) let[@inline] input (self : t) buf i len = self#input buf i len (** Close the channel. *) let[@inline] close self : unit = self#close () let iter_slice = Input.iter_slice let iter = Input.iter let to_chan = Input.to_chan let to_chan' = Input.to_chan' (** Read the whole body @param deadline a deadline before which the operation must complete @raise Timeout if deadline expires (leftovers are in [buf] *) let read_all_using ~buf ~(deadline : float) (self : #t) : string = Buf.clear buf; let continue = ref true in while !continue do let timeout = deadline -. Time.now_s () in if timeout <= 0. then raise Timeout; let slice = fill_buf_with_timeout self timeout in if slice.len = 0 then continue := false else ( assert (slice.len > 0); Buf.add_bytes buf slice.bytes slice.off slice.len; Slice.consume slice slice.len ) done; Buf.contents_and_clear buf (** Read [n] bytes from the input into [bytes]. @raise Timeout_partial_read if timeout occurs before it's done *) let read_exactly_ ?(off = 0) ~too_short ~(deadline : float) (self : #t) (bytes : bytes) (n : int) : unit = assert (Bytes.length bytes >= off + n); let offset = ref off in while !offset < n do let slice = try fill_buf_with_deadline self ~deadline with Timeout -> raise (Timeout_partial_read (!offset - off)) in let n_read = min slice.len (n - !offset) in Bytes.blit slice.bytes slice.off bytes !offset n_read; offset := !offset + n_read; Slice.consume slice n_read; if n_read = 0 then too_short () done let[@inline] really_input (self : #t) ~deadline buf i len = read_exactly_ ~off:i ~deadline self buf len ~too_short:(fun () -> raise End_of_file) (** read a line into the buffer, after clearing it. *) let read_line_into (self : #t) ~(deadline : float) ~buf : unit = Buf.clear buf; let continue = ref true in while !continue do let slice = fill_buf_with_deadline self ~deadline in if slice.len = 0 then ( continue := false; if Buf.size buf = 0 then raise End_of_file ); let j = ref slice.off in while !j < slice.off + slice.len && Bytes.get slice.bytes !j <> '\n' do incr j done; if !j - slice.off < slice.len then ( assert (Bytes.get slice.bytes !j = '\n'); (* line without '\n' *) Buf.add_bytes buf slice.bytes slice.off (!j - slice.off); (* consume line + '\n' *) Slice.consume slice (!j - slice.off + 1); continue := false ) else ( Buf.add_bytes buf slice.bytes slice.off slice.len; Slice.consume slice slice.len ) done let read_line_using ~buf ~deadline (self : #t) : string = read_line_into self ~deadline ~buf; Buf.contents_and_clear buf let read_line_using_opt ~buf ~deadline (self : #t) : string option = match read_line_into self ~buf ~deadline with | () -> Some (Buf.contents_and_clear buf) | exception End_of_file -> None (* helper for making a new input stream that either contains at most [size] bytes, or contains exactly [size] bytes. *) let reading_exactly_ ~skip_on_close ~close_rec ~size ~bytes (arg : #t) : t = let remaining_size = ref size in object inherit t_with_timeout_from_refill ~bytes () method close () = if !remaining_size > 0 && skip_on_close then skip arg !remaining_size; if close_rec then close arg method private refill_with_timeout t (slice : Slice.t) = slice.off <- 0; slice.len <- 0; if !remaining_size > 0 then ( let sub = fill_buf_with_timeout arg t in let n = min !remaining_size (min sub.len (Bytes.length slice.bytes)) in Bytes.blit sub.bytes sub.off slice.bytes 0 n; Slice.consume sub n; remaining_size := !remaining_size - n; slice.len <- n ) end (** new stream with maximum size [max_size]. @param close_rec if true, closing this will also close the input stream *) let limit_size_to ~close_rec ~max_size ~bytes (arg : #t) : t = reading_exactly_ ~size:max_size ~skip_on_close:false ~bytes ~close_rec arg (** New stream that consumes exactly [size] bytes from the input. If fewer bytes are read before [close] is called, we read and discard the remaining quota of bytes before [close] returns. @param close_rec if true, closing this will also close the input stream *) let reading_exactly ~close_rec ~size ~bytes (arg : t) : t = reading_exactly_ ~size ~close_rec ~skip_on_close:true ~bytes arg let read_chunked ~(bytes : bytes) ~fail (ic : #t) : t = let first = ref true in (* small buffer to read the chunk sizes *) let line_buf = Buf.create ~size:32 () in let read_next_chunk_len ~deadline () : int = if !first then first := false else ( let line = read_line_using ~buf:line_buf ~deadline ic in if String.trim line <> "" then raise (fail "expected crlf between chunks") ); let line = read_line_using ~buf:line_buf ~deadline ic in (* parse chunk length, ignore extensions *) let chunk_size = if String.trim line = "" then 0 else ( try let off = ref 0 in let n = Parse_.pos_hex line off in n with _ -> raise (fail (spf "cannot read chunk size from line %S" line)) ) in chunk_size in let eof = ref false in let chunk_size = ref 0 in object inherit t_with_timeout_from_refill ~bytes () method private refill_with_timeout t (slice : Slice.t) : unit = let deadline = Time.now_s () +. t in if !chunk_size = 0 && not !eof then ( chunk_size := read_next_chunk_len ~deadline (); if !chunk_size = 0 then eof := true (* stream is finished *) ); slice.off <- 0; slice.len <- 0; if !chunk_size > 0 then ( (* read the whole chunk, or [Bytes.length bytes] of it *) let to_read = min !chunk_size (Bytes.length slice.bytes) in read_exactly_ ~deadline ~too_short:(fun () -> raise (fail "chunk is too short")) ic slice.bytes to_read; slice.len <- to_read; chunk_size := !chunk_size - to_read ) method close () = eof := true (* do not close underlying stream *) end let output_chunked = Input.output_chunked let output_chunked' = Input.output_chunked' end (** A writer abstraction. *) module Writer = struct type t = { write: Output.t -> unit } [@@unboxed] (** Writer. A writer is a push-based stream of bytes. Give it an output channel and it will write the bytes in it. This is useful for responses: an http endpoint can return a writer as its response's body; the writer is given access to the connection to the client and can write into it as if it were a regular [out_channel], including controlling calls to [flush]. Tiny_httpd will convert these writes into valid HTTP chunks. @since 0.14 *) let[@inline] make ~write () : t = { write } (** Write into the channel. *) let[@inline] write (oc : #Output.t) (self : t) : unit = self.write (oc :> Output.t) (** Empty writer, will output 0 bytes. *) let empty : t = { write = ignore } (** A writer that just emits the bytes from the given string. *) let[@inline] of_string (str : string) : t = let write oc = Iostream.Out.output_string oc str in { write } let[@inline] of_input (ic : #Input.t) : t = { write = (fun oc -> Input.to_chan' oc ic) } end (** A TCP server abstraction. *) module TCP_server = struct type conn_handler = { handle: client_addr:Unix.sockaddr -> Input_with_timeout.t -> Output.t -> unit; (** Handle client connection *) } type t = { endpoint: unit -> string * int; (** Endpoint we listen on. This can only be called from within [serve]. *) active_connections: unit -> int; (** Number of connections currently active *) running: unit -> bool; (** Is the server currently running? *) stop: unit -> unit; (** Ask the server to stop. This might not take effect immediately, and is idempotent. After this [server.running()] must return [false]. *) } (** A running TCP server. This contains some functions that provide information about the running server, including whether it's active (as opposed to stopped), a function to stop it, and statistics about the number of connections. *) type builder = { serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit; (** Blocking call to listen for incoming connections and handle them. Uses the connection handler [handle] to handle individual client connections in individual threads/fibers/tasks. @param after_init is called once with the server after the server has started. *) } (** A TCP server builder implementation. Calling [builder.serve ~after_init ~handle ()] starts a new TCP server on an unspecified endpoint (most likely coming from the function returning this builder) and returns the running server. *) end