diff --git a/dune-project b/dune-project index f56ce53f..3eed4894 100644 --- a/dune-project +++ b/dune-project @@ -39,3 +39,14 @@ (iostream-camlzip (>= 0.2.1)) (logs :with-test) (odoc :with-doc))) + +(package + (name tiny_httpd_moonpool) + (synopsis "Moonpool+picos_stdio backend for Tiny_httpd") + (depends + seq + (tiny_httpd (= :version)) + (moonpool (>= 0.7)) + (moonpool-io (>= 0.7)) + (ocaml (>= 5.0)) + (odoc :with-doc))) diff --git a/src/moonpool-io/dune b/src/moonpool-io/dune new file mode 100644 index 00000000..f9880adb --- /dev/null +++ b/src/moonpool-io/dune @@ -0,0 +1,6 @@ + + +(library + (name tiny_httpd_moonpool) + (public_name tiny_httpd_moonpool) + (libraries tiny_httpd moonpool moonpool.sync moonpool.fib moonpool-io)) diff --git a/src/moonpool-io/io_server.ml b/src/moonpool-io/io_server.ml new file mode 100644 index 00000000..362511ba --- /dev/null +++ b/src/moonpool-io/io_server.ml @@ -0,0 +1,220 @@ +open Tiny_httpd_core +module A = Atomic +module MIO = Moonpool_io +module Sem = Moonpool_sync.Semaphore.Counting +module Fd = Moonpool_io.Fd + +module IO_helper = struct + module Slice = Iostream.Slice + + module Output = struct + include IO.Output + + class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) : + t = + object + inherit t_from_output ~bytes:buf.bytes () + + method private output_underlying bs i len0 = + let i = ref i in + let len = ref len0 in + while !len > 0 do + match MIO.Unix.write fd bs !i !len with + | 0 -> failwith "write failed" + | n -> + i := !i + n; + len := !len - n + done + + method private close_underlying () = + if not !closed then ( + closed := true; + if close_noerr then ( + try MIO.Unix.close fd with _ -> () + ) else + MIO.Unix.close fd + ) + end + end + + module Input = struct + include IO.Input + + let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) : + t = + let eof = ref false in + object + inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes () + + method private refill (slice : Slice.t) = + if not !eof then ( + slice.off <- 0; + let continue = ref true in + while !continue do + match + MIO.Unix.read fd slice.bytes 0 (Bytes.length slice.bytes) + with + | n -> + slice.len <- n; + continue := false + done; + (* Printf.eprintf "read returned %d B\n%!" !n; *) + if slice.len = 0 then eof := true + ) + + method close () = + if not !closed then ( + closed := true; + eof := true; + if close_noerr then ( + try MIO.Unix.close fd with _ -> () + ) else + MIO.Unix.close fd + ) + end + end +end + +open struct + let get_addr_ (fd : Fd.t) = + match Unix.getsockname (Fd.unsafe_get fd) with + | Unix.ADDR_INET (addr, port) -> addr, port + | _ -> invalid_arg "httpd: address is not INET" + + let shutdown_silent_ (fd : Fd.t) : unit = + try MIO.Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> () + + let close_silent_ (fd : Fd.t) : unit = try MIO.Unix.close fd with _ -> () +end + +type t = { + addr: string; + port: int; + buf_pool: Buf.t Pool.t; + slice_pool: IO.Slice.t Pool.t; + max_connections: int; + sem_max_connections: Sem.t; + (** semaphore to restrict the number of active concurrent connections *) + mutable sock: Fd.t option; (** Socket *) + new_thread: (unit -> unit) -> unit; + timeout: float; + running: bool A.t; (* TODO: use an atomic? *) +} + +let to_tcp_server (self : t) : IO.TCP_server.builder = + { + IO.TCP_server.serve = + (fun ~after_init ~handle () : unit -> + let sock, should_bind = + match self.sock with + | Some s -> + (* Because we're getting a socket from the caller (e.g. systemd) *) + s, false + | None -> + let sock = + Unix.socket + (if Util.is_ipv6_str self.addr then + Unix.PF_INET6 + else + Unix.PF_INET) + Unix.SOCK_STREAM 0 + in + let fd = Fd.create sock in + fd, true (* Because we're creating the socket ourselves *) + in + MIO.Unix.clear_nonblock sock; + MIO.Unix.setsockopt_optint sock Unix.SO_LINGER None; + if should_bind then ( + let inet_addr = Unix.inet_addr_of_string self.addr in + MIO.Unix.setsockopt sock Unix.SO_REUSEADDR true; + MIO.Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port)); + let n_listen = 2 * self.max_connections in + MIO.Unix.listen sock n_listen + ); + + self.sock <- Some sock; + + let tcp_server = + { + IO.TCP_server.stop = (fun () -> Atomic.set self.running false); + running = (fun () -> Atomic.get self.running); + active_connections = + (fun () -> + self.max_connections - Sem.get_value self.sem_max_connections); + endpoint = + (fun () -> + let addr, port = get_addr_ sock in + Unix.string_of_inet_addr addr, port); + } + in + after_init tcp_server; + + (* how to handle a single client *) + let handle_client_ (client_sock : Fd.t) (client_addr : Unix.sockaddr) : + unit = + Log.debug (fun k -> + k "t[%d]: serving new client on %s" + (Thread.id @@ Thread.self ()) + (Util.show_sockaddr client_addr)); + + MIO.Unix.set_nonblock client_sock; + MIO.Unix.setsockopt client_sock Unix.TCP_NODELAY true; + MIO.Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout); + MIO.Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout); + + Pool.with_resource self.slice_pool @@ fun ic_buf -> + Pool.with_resource self.slice_pool @@ fun oc_buf -> + let closed = ref false in + + let oc = + new IO_helper.Output.of_unix_fd + ~close_noerr:true ~closed ~buf:oc_buf client_sock + in + let ic = + IO_helper.Input.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf + client_sock + in + handle.handle ~client_addr ic oc + in + + MIO.Unix.set_nonblock sock; + while Atomic.get self.running do + match MIO.Unix.accept sock with + | client_sock, client_addr -> + (* limit concurrency *) + Sem.acquire self.sem_max_connections; + self.new_thread (fun () -> + try + handle_client_ client_sock client_addr; + Log.debug (fun k -> + k "t[%d]: done with client on %s, exiting" + (Thread.id @@ Thread.self ()) + @@ Util.show_sockaddr client_addr); + shutdown_silent_ client_sock; + close_silent_ client_sock; + Sem.release self.sem_max_connections + with e -> + let bt = Printexc.get_raw_backtrace () in + shutdown_silent_ client_sock; + close_silent_ client_sock; + Sem.release self.sem_max_connections; + Log.error (fun k -> + k + "@[Handler: uncaught exception for client %s:@ %s@ \ + %s@]" + (Util.show_sockaddr client_addr) + (Printexc.to_string e) + (Printexc.raw_backtrace_to_string bt))) + | exception e -> + Log.error (fun k -> + k "Unix.accept raised an exception: %s" (Printexc.to_string e)); + Atomic.set self.running false + done; + + (* Wait for all threads to be done: this only works if all threads are done. *) + MIO.Unix.close sock; + while Sem.get_value self.sem_max_connections < self.max_connections do + Sem.acquire self.sem_max_connections + done; + ()); + } diff --git a/src/moonpool-io/tiny_httpd_moonpool.ml b/src/moonpool-io/tiny_httpd_moonpool.ml new file mode 100644 index 00000000..e432ccec --- /dev/null +++ b/src/moonpool-io/tiny_httpd_moonpool.ml @@ -0,0 +1,52 @@ +include Tiny_httpd +module Fd = Io_server.Fd + +open struct + let get_max_connection_ ?(max_connections = 64) () : int = + let max_connections = max 4 max_connections in + max_connections + + let clear_slice (slice : IO.Slice.t) = + Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00'; + slice.off <- 0; + slice.len <- 0 +end + +let create ?max_connections ?(timeout = 0.0) ?buf_size + ?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080) + ?(sock : Fd.t option) ?middlewares ~(runner : Moonpool.Runner.t) () : t = + let new_thread f = + ignore (Moonpool_fib.spawn_top ~on:runner f : _ Moonpool_fib.t) + in + let max_connections = get_max_connection_ ?max_connections () in + let server = + { + Io_server.addr; + new_thread; + buf_pool = + Pool.create ~clear:Buf.clear_and_zero + ~mk_item:(fun () -> Buf.create ?size:buf_size ()) + (); + slice_pool = + Pool.create ~clear:clear_slice + ~mk_item: + (let buf_size = Option.value buf_size ~default:4096 in + fun () -> IO.Slice.create buf_size) + (); + running = Atomic.make true; + port; + sock; + max_connections; + sem_max_connections = Io_server.Sem.make max_connections; + timeout; + } + in + let tcp_server_builder = Io_server.to_tcp_server server in + let module B = struct + let init_addr () = addr + let init_port () = port + let get_time_s = get_time_s + let tcp_server () = tcp_server_builder + end in + let backend = (module B : IO_BACKEND) in + Server.create_from ?buf_size ?middlewares ~backend () diff --git a/tiny_httpd_moonpool.opam b/tiny_httpd_moonpool.opam new file mode 100644 index 00000000..b067b7d8 --- /dev/null +++ b/tiny_httpd_moonpool.opam @@ -0,0 +1,35 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.17" +synopsis: "Moonpool+picos_stdio backend for Tiny_httpd" +maintainer: ["c-cube"] +authors: ["c-cube"] +license: "MIT" +homepage: "https://github.com/c-cube/tiny_httpd/" +bug-reports: "https://github.com/c-cube/tiny_httpd/issues" +depends: [ + "dune" {>= "2.9"} + "seq" + "tiny_httpd" {= version} + "moonpool" {>= "0.7"} + "moonpool-io" {>= "0.7"} + "ocaml" {>= "5.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"