mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 03:05:29 -05:00
feat: add tiny_httpd_moonpool library
This commit is contained in:
parent
e341f48ece
commit
c43ffb5ff4
5 changed files with 324 additions and 0 deletions
11
dune-project
11
dune-project
|
|
@ -39,3 +39,14 @@
|
|||
(iostream-camlzip (>= 0.2.1))
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_moonpool)
|
||||
(synopsis "Moonpool+picos_stdio backend for Tiny_httpd")
|
||||
(depends
|
||||
seq
|
||||
(tiny_httpd (= :version))
|
||||
(moonpool (>= 0.7))
|
||||
(moonpool-io (>= 0.7))
|
||||
(ocaml (>= 5.0))
|
||||
(odoc :with-doc)))
|
||||
|
|
|
|||
6
src/moonpool-io/dune
Normal file
6
src/moonpool-io/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
|
||||
|
||||
(library
|
||||
(name tiny_httpd_moonpool)
|
||||
(public_name tiny_httpd_moonpool)
|
||||
(libraries tiny_httpd moonpool moonpool.sync moonpool.fib moonpool-io))
|
||||
220
src/moonpool-io/io_server.ml
Normal file
220
src/moonpool-io/io_server.ml
Normal file
|
|
@ -0,0 +1,220 @@
|
|||
open Tiny_httpd_core
|
||||
module A = Atomic
|
||||
module MIO = Moonpool_io
|
||||
module Sem = Moonpool_sync.Semaphore.Counting
|
||||
module Fd = Moonpool_io.Fd
|
||||
|
||||
module IO_helper = struct
|
||||
module Slice = Iostream.Slice
|
||||
|
||||
module Output = struct
|
||||
include IO.Output
|
||||
|
||||
class of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
|
||||
t =
|
||||
object
|
||||
inherit t_from_output ~bytes:buf.bytes ()
|
||||
|
||||
method private output_underlying bs i len0 =
|
||||
let i = ref i in
|
||||
let len = ref len0 in
|
||||
while !len > 0 do
|
||||
match MIO.Unix.write fd bs !i !len with
|
||||
| 0 -> failwith "write failed"
|
||||
| n ->
|
||||
i := !i + n;
|
||||
len := !len - n
|
||||
done
|
||||
|
||||
method private close_underlying () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
if close_noerr then (
|
||||
try MIO.Unix.close fd with _ -> ()
|
||||
) else
|
||||
MIO.Unix.close fd
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
module Input = struct
|
||||
include IO.Input
|
||||
|
||||
let of_unix_fd ?(close_noerr = false) ~closed ~(buf : Slice.t) (fd : Fd.t) :
|
||||
t =
|
||||
let eof = ref false in
|
||||
object
|
||||
inherit Iostream.In_buf.t_from_refill ~bytes:buf.bytes ()
|
||||
|
||||
method private refill (slice : Slice.t) =
|
||||
if not !eof then (
|
||||
slice.off <- 0;
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
match
|
||||
MIO.Unix.read fd slice.bytes 0 (Bytes.length slice.bytes)
|
||||
with
|
||||
| n ->
|
||||
slice.len <- n;
|
||||
continue := false
|
||||
done;
|
||||
(* Printf.eprintf "read returned %d B\n%!" !n; *)
|
||||
if slice.len = 0 then eof := true
|
||||
)
|
||||
|
||||
method close () =
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
eof := true;
|
||||
if close_noerr then (
|
||||
try MIO.Unix.close fd with _ -> ()
|
||||
) else
|
||||
MIO.Unix.close fd
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
open struct
|
||||
let get_addr_ (fd : Fd.t) =
|
||||
match Unix.getsockname (Fd.unsafe_get fd) with
|
||||
| Unix.ADDR_INET (addr, port) -> addr, port
|
||||
| _ -> invalid_arg "httpd: address is not INET"
|
||||
|
||||
let shutdown_silent_ (fd : Fd.t) : unit =
|
||||
try MIO.Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ()
|
||||
|
||||
let close_silent_ (fd : Fd.t) : unit = try MIO.Unix.close fd with _ -> ()
|
||||
end
|
||||
|
||||
type t = {
|
||||
addr: string;
|
||||
port: int;
|
||||
buf_pool: Buf.t Pool.t;
|
||||
slice_pool: IO.Slice.t Pool.t;
|
||||
max_connections: int;
|
||||
sem_max_connections: Sem.t;
|
||||
(** semaphore to restrict the number of active concurrent connections *)
|
||||
mutable sock: Fd.t option; (** Socket *)
|
||||
new_thread: (unit -> unit) -> unit;
|
||||
timeout: float;
|
||||
running: bool A.t; (* TODO: use an atomic? *)
|
||||
}
|
||||
|
||||
let to_tcp_server (self : t) : IO.TCP_server.builder =
|
||||
{
|
||||
IO.TCP_server.serve =
|
||||
(fun ~after_init ~handle () : unit ->
|
||||
let sock, should_bind =
|
||||
match self.sock with
|
||||
| Some s ->
|
||||
(* Because we're getting a socket from the caller (e.g. systemd) *)
|
||||
s, false
|
||||
| None ->
|
||||
let sock =
|
||||
Unix.socket
|
||||
(if Util.is_ipv6_str self.addr then
|
||||
Unix.PF_INET6
|
||||
else
|
||||
Unix.PF_INET)
|
||||
Unix.SOCK_STREAM 0
|
||||
in
|
||||
let fd = Fd.create sock in
|
||||
fd, true (* Because we're creating the socket ourselves *)
|
||||
in
|
||||
MIO.Unix.clear_nonblock sock;
|
||||
MIO.Unix.setsockopt_optint sock Unix.SO_LINGER None;
|
||||
if should_bind then (
|
||||
let inet_addr = Unix.inet_addr_of_string self.addr in
|
||||
MIO.Unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||
MIO.Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
|
||||
let n_listen = 2 * self.max_connections in
|
||||
MIO.Unix.listen sock n_listen
|
||||
);
|
||||
|
||||
self.sock <- Some sock;
|
||||
|
||||
let tcp_server =
|
||||
{
|
||||
IO.TCP_server.stop = (fun () -> Atomic.set self.running false);
|
||||
running = (fun () -> Atomic.get self.running);
|
||||
active_connections =
|
||||
(fun () ->
|
||||
self.max_connections - Sem.get_value self.sem_max_connections);
|
||||
endpoint =
|
||||
(fun () ->
|
||||
let addr, port = get_addr_ sock in
|
||||
Unix.string_of_inet_addr addr, port);
|
||||
}
|
||||
in
|
||||
after_init tcp_server;
|
||||
|
||||
(* how to handle a single client *)
|
||||
let handle_client_ (client_sock : Fd.t) (client_addr : Unix.sockaddr) :
|
||||
unit =
|
||||
Log.debug (fun k ->
|
||||
k "t[%d]: serving new client on %s"
|
||||
(Thread.id @@ Thread.self ())
|
||||
(Util.show_sockaddr client_addr));
|
||||
|
||||
MIO.Unix.set_nonblock client_sock;
|
||||
MIO.Unix.setsockopt client_sock Unix.TCP_NODELAY true;
|
||||
MIO.Unix.(setsockopt_float client_sock SO_RCVTIMEO self.timeout);
|
||||
MIO.Unix.(setsockopt_float client_sock SO_SNDTIMEO self.timeout);
|
||||
|
||||
Pool.with_resource self.slice_pool @@ fun ic_buf ->
|
||||
Pool.with_resource self.slice_pool @@ fun oc_buf ->
|
||||
let closed = ref false in
|
||||
|
||||
let oc =
|
||||
new IO_helper.Output.of_unix_fd
|
||||
~close_noerr:true ~closed ~buf:oc_buf client_sock
|
||||
in
|
||||
let ic =
|
||||
IO_helper.Input.of_unix_fd ~close_noerr:true ~closed ~buf:ic_buf
|
||||
client_sock
|
||||
in
|
||||
handle.handle ~client_addr ic oc
|
||||
in
|
||||
|
||||
MIO.Unix.set_nonblock sock;
|
||||
while Atomic.get self.running do
|
||||
match MIO.Unix.accept sock with
|
||||
| client_sock, client_addr ->
|
||||
(* limit concurrency *)
|
||||
Sem.acquire self.sem_max_connections;
|
||||
self.new_thread (fun () ->
|
||||
try
|
||||
handle_client_ client_sock client_addr;
|
||||
Log.debug (fun k ->
|
||||
k "t[%d]: done with client on %s, exiting"
|
||||
(Thread.id @@ Thread.self ())
|
||||
@@ Util.show_sockaddr client_addr);
|
||||
shutdown_silent_ client_sock;
|
||||
close_silent_ client_sock;
|
||||
Sem.release self.sem_max_connections
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
shutdown_silent_ client_sock;
|
||||
close_silent_ client_sock;
|
||||
Sem.release self.sem_max_connections;
|
||||
Log.error (fun k ->
|
||||
k
|
||||
"@[<v>Handler: uncaught exception for client %s:@ %s@ \
|
||||
%s@]"
|
||||
(Util.show_sockaddr client_addr)
|
||||
(Printexc.to_string e)
|
||||
(Printexc.raw_backtrace_to_string bt)))
|
||||
| exception e ->
|
||||
Log.error (fun k ->
|
||||
k "Unix.accept raised an exception: %s" (Printexc.to_string e));
|
||||
Atomic.set self.running false
|
||||
done;
|
||||
|
||||
(* Wait for all threads to be done: this only works if all threads are done. *)
|
||||
MIO.Unix.close sock;
|
||||
while Sem.get_value self.sem_max_connections < self.max_connections do
|
||||
Sem.acquire self.sem_max_connections
|
||||
done;
|
||||
());
|
||||
}
|
||||
52
src/moonpool-io/tiny_httpd_moonpool.ml
Normal file
52
src/moonpool-io/tiny_httpd_moonpool.ml
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
include Tiny_httpd
|
||||
module Fd = Io_server.Fd
|
||||
|
||||
open struct
|
||||
let get_max_connection_ ?(max_connections = 64) () : int =
|
||||
let max_connections = max 4 max_connections in
|
||||
max_connections
|
||||
|
||||
let clear_slice (slice : IO.Slice.t) =
|
||||
Bytes.fill slice.bytes 0 (Bytes.length slice.bytes) '\x00';
|
||||
slice.off <- 0;
|
||||
slice.len <- 0
|
||||
end
|
||||
|
||||
let create ?max_connections ?(timeout = 0.0) ?buf_size
|
||||
?(get_time_s = Unix.gettimeofday) ?(addr = "127.0.0.1") ?(port = 8080)
|
||||
?(sock : Fd.t option) ?middlewares ~(runner : Moonpool.Runner.t) () : t =
|
||||
let new_thread f =
|
||||
ignore (Moonpool_fib.spawn_top ~on:runner f : _ Moonpool_fib.t)
|
||||
in
|
||||
let max_connections = get_max_connection_ ?max_connections () in
|
||||
let server =
|
||||
{
|
||||
Io_server.addr;
|
||||
new_thread;
|
||||
buf_pool =
|
||||
Pool.create ~clear:Buf.clear_and_zero
|
||||
~mk_item:(fun () -> Buf.create ?size:buf_size ())
|
||||
();
|
||||
slice_pool =
|
||||
Pool.create ~clear:clear_slice
|
||||
~mk_item:
|
||||
(let buf_size = Option.value buf_size ~default:4096 in
|
||||
fun () -> IO.Slice.create buf_size)
|
||||
();
|
||||
running = Atomic.make true;
|
||||
port;
|
||||
sock;
|
||||
max_connections;
|
||||
sem_max_connections = Io_server.Sem.make max_connections;
|
||||
timeout;
|
||||
}
|
||||
in
|
||||
let tcp_server_builder = Io_server.to_tcp_server server in
|
||||
let module B = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let get_time_s = get_time_s
|
||||
let tcp_server () = tcp_server_builder
|
||||
end in
|
||||
let backend = (module B : IO_BACKEND) in
|
||||
Server.create_from ?buf_size ?middlewares ~backend ()
|
||||
35
tiny_httpd_moonpool.opam
Normal file
35
tiny_httpd_moonpool.opam
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.17"
|
||||
synopsis: "Moonpool+picos_stdio backend for Tiny_httpd"
|
||||
maintainer: ["c-cube"]
|
||||
authors: ["c-cube"]
|
||||
license: "MIT"
|
||||
homepage: "https://github.com/c-cube/tiny_httpd/"
|
||||
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
|
||||
depends: [
|
||||
"dune" {>= "2.9"}
|
||||
"seq"
|
||||
"tiny_httpd" {= version}
|
||||
"moonpool" {>= "0.7"}
|
||||
"moonpool-io" {>= "0.7"}
|
||||
"ocaml" {>= "5.0"}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
"dune"
|
||||
"build"
|
||||
"-p"
|
||||
name
|
||||
"-j"
|
||||
jobs
|
||||
"--promote-install-files=false"
|
||||
"@install"
|
||||
"@runtest" {with-test}
|
||||
"@doc" {with-doc}
|
||||
]
|
||||
["dune" "install" "-p" name "--create-install-files" name]
|
||||
]
|
||||
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"
|
||||
Loading…
Add table
Reference in a new issue