Merge commit '03596c1a08f9b9fa063f22c40d80afc73d14ed08'

This commit is contained in:
Simon Cruanes 2023-08-08 15:39:55 -04:00
commit 20b85c9926
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 16 additions and 12 deletions

View file

@ -187,7 +187,7 @@ end
(** A TCP server abstraction. *) (** A TCP server abstraction. *)
module TCP_server = struct module TCP_server = struct
type conn_handler = { type conn_handler = {
handle: Input.t -> Output.t -> unit; (** Handle client connection *) handle: client_addr:Unix.sockaddr -> Input.t -> Output.t -> unit; (** Handle client connection *)
} }
type t = { type t = {

View file

@ -164,6 +164,7 @@ module Request = struct
type 'body t = { type 'body t = {
meth: Meth.t; meth: Meth.t;
host: string; host: string;
client_addr: Unix.sockaddr;
headers: Headers.t; headers: Headers.t;
http_version: int * int; http_version: int * int;
path: string; path: string;
@ -245,7 +246,7 @@ module Request = struct
bad_reqf 400 "body is too short by %d bytes" size) bad_reqf 400 "body is too short by %d bytes" size)
(* parse request, but not body (yet) *) (* parse request, but not body (yet) *)
let parse_req_start ~get_time_s ~buf (bs : byte_stream) : let parse_req_start ~client_addr ~get_time_s ~buf (bs : byte_stream) :
unit t option resp_result = unit t option resp_result =
try try
let line = Byte_stream.read_line ~buf bs in let line = Byte_stream.read_line ~buf bs in
@ -281,6 +282,7 @@ module Request = struct
meth; meth;
query; query;
host; host;
client_addr;
path; path;
path_components; path_components;
headers; headers;
@ -340,8 +342,8 @@ module Request = struct
| e -> bad_reqf 500 "failed to read body: %s" (Printexc.to_string e) | e -> bad_reqf 500 "failed to read body: %s" (Printexc.to_string e)
module Internal_ = struct module Internal_ = struct
let parse_req_start ?(buf = Buf.create ()) ~get_time_s bs = let parse_req_start ?(buf = Buf.create ()) ~client_addr ~get_time_s bs =
parse_req_start ~get_time_s ~buf bs |> unwrap_resp_result parse_req_start ~client_addr ~get_time_s ~buf bs |> unwrap_resp_result
let parse_body ?(buf = Buf.create ()) req bs : _ t = let parse_body ?(buf = Buf.create ()) req bs : _ t =
parse_body_ ~tr_stream:(fun s -> s) ~buf { req with body = bs } parse_body_ ~tr_stream:(fun s -> s) ~buf { req with body = bs }
@ -918,14 +920,14 @@ module Unix_tcp_server_ = struct
after_init tcp_server; after_init tcp_server;
(* how to handle a single client *) (* how to handle a single client *)
let handle_client_unix_ (client_sock : Unix.file_descr) : unit = let handle_client_unix_ (client_sock : Unix.file_descr) (client_addr : Unix.sockaddr) : unit =
Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout); Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
let oc = let oc =
IO.Output.of_out_channel @@ Unix.out_channel_of_descr client_sock IO.Output.of_out_channel @@ Unix.out_channel_of_descr client_sock
in in
let ic = IO.Input.of_unix_fd client_sock in let ic = IO.Input.of_unix_fd client_sock in
handle.handle ic oc; handle.handle ~client_addr ic oc;
_debug (fun k -> k "done with client, exiting"); _debug (fun k -> k "done with client, exiting");
(try Unix.close client_sock (try Unix.close client_sock
with e -> with e ->
@ -938,14 +940,14 @@ module Unix_tcp_server_ = struct
(* limit concurrency *) (* limit concurrency *)
Sem_.acquire 1 self.sem_max_connections; Sem_.acquire 1 self.sem_max_connections;
try try
let client_sock, _ = Unix.accept sock in let client_sock, client_addr = Unix.accept sock in
Unix.setsockopt client_sock Unix.TCP_NODELAY true; Unix.setsockopt client_sock Unix.TCP_NODELAY true;
(* Block INT/HUP while cloning to avoid children handling them. (* Block INT/HUP while cloning to avoid children handling them.
When thread gets them, our Unix.accept raises neatly. *) When thread gets them, our Unix.accept raises neatly. *)
ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]); ignore Unix.(sigprocmask SIG_BLOCK Sys.[ sigint; sighup ]);
self.new_thread (fun () -> self.new_thread (fun () ->
try try
handle_client_unix_ client_sock; handle_client_unix_ client_sock client_addr;
Sem_.release 1 self.sem_max_connections Sem_.release 1 self.sem_max_connections
with e -> with e ->
(try Unix.close client_sock with _ -> ()); (try Unix.close client_sock with _ -> ());
@ -1015,7 +1017,7 @@ let find_map f l =
aux f l aux f l
(* handle client on [ic] and [oc] *) (* handle client on [ic] and [oc] *)
let client_handle_for (self : t) ic oc : unit = let client_handle_for (self : t) ~client_addr ic oc : unit =
Pool.with_resource self.buf_pool @@ fun buf -> Pool.with_resource self.buf_pool @@ fun buf ->
Pool.with_resource self.buf_pool @@ fun buf_res -> Pool.with_resource self.buf_pool @@ fun buf_res ->
let is = Byte_stream.of_input ~buf_size:self.buf_size ic in let is = Byte_stream.of_input ~buf_size:self.buf_size ic in
@ -1023,7 +1025,7 @@ let client_handle_for (self : t) ic oc : unit =
while !continue && running self do while !continue && running self do
_debug (fun k -> k "read next request"); _debug (fun k -> k "read next request");
let (module B) = self.backend in let (module B) = self.backend in
match Request.parse_req_start ~get_time_s:B.get_time_s ~buf is with match Request.parse_req_start ~client_addr ~get_time_s:B.get_time_s ~buf is with
| Ok None -> continue := false (* client is done *) | Ok None -> continue := false (* client is done *)
| Error (c, s) -> | Error (c, s) ->
(* connection error, close *) (* connection error, close *)

View file

@ -67,6 +67,7 @@ module Request : sig
meth: Meth.t; (** HTTP method for this request. *) meth: Meth.t; (** HTTP method for this request. *)
host: string; host: string;
(** Host header, mandatory. It can also be found in {!headers}. *) (** Host header, mandatory. It can also be found in {!headers}. *)
client_addr : Unix.sockaddr; (** Client address. Available since NEXT_RELEASE. *)
headers: Headers.t; (** List of headers. *) headers: Headers.t; (** List of headers. *)
http_version: int * int; http_version: int * int;
(** HTTP version. This should be either [1, 0] or [1, 1]. *) (** HTTP version. This should be either [1, 0] or [1, 1]. *)
@ -161,7 +162,7 @@ module Request : sig
(* for testing purpose, do not use. There is no guarantee of stability. *) (* for testing purpose, do not use. There is no guarantee of stability. *)
module Internal_ : sig module Internal_ : sig
val parse_req_start : val parse_req_start :
?buf:buf -> get_time_s:(unit -> float) -> byte_stream -> unit t option ?buf:buf -> client_addr:Unix.sockaddr -> get_time_s:(unit -> float) -> byte_stream -> unit t option
val parse_body : ?buf:buf -> unit t -> byte_stream -> byte_stream t val parse_body : ?buf:buf -> unit t -> byte_stream -> byte_stream t
end end

View file

@ -10,7 +10,8 @@ let () =
salutationsSOMEJUNK" salutationsSOMEJUNK"
in in
let str = Tiny_httpd.Byte_stream.of_string q in let str = Tiny_httpd.Byte_stream.of_string q in
let r = Request.Internal_.parse_req_start ~get_time_s:(fun _ -> 0.) str in let client_addr = Unix.(ADDR_INET (inet_addr_loopback, 1024)) in
let r = Request.Internal_.parse_req_start ~client_addr ~get_time_s:(fun _ -> 0.) str in
match r with match r with
| None -> failwith "should parse" | None -> failwith "should parse"
| Some req -> | Some req ->