mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 03:05:29 -05:00
feat: tiny_httpd_eio library
provides a tiny_httpd server that relies on Eio for non-blocking sockets and for concurrency using eio fibers.
This commit is contained in:
parent
4b4fd2afe1
commit
d40f87f07b
5 changed files with 286 additions and 0 deletions
|
|
@ -39,3 +39,12 @@
|
|||
(iostream-camlzip (>= 0.2.1))
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
||||
(package
|
||||
(name tiny_httpd_eio)
|
||||
(synopsis "Use eio for tiny_httpd")
|
||||
(depends
|
||||
(tiny_httpd (= :version))
|
||||
eio
|
||||
(logs :with-test)
|
||||
(odoc :with-doc)))
|
||||
|
|
|
|||
8
src/eio/dune
Normal file
8
src/eio/dune
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
(library
|
||||
(name tiny_httpd_eio)
|
||||
(public_name tiny_httpd_eio)
|
||||
(synopsis "An EIO-based backend for Tiny_httpd")
|
||||
(flags :standard -safe-string -warn-error -a+8)
|
||||
(libraries tiny_httpd eio eio.unix))
|
||||
|
||||
207
src/eio/tiny_httpd_eio.ml
Normal file
207
src/eio/tiny_httpd_eio.ml
Normal file
|
|
@ -0,0 +1,207 @@
|
|||
module IO = Tiny_httpd.IO
|
||||
module H = Tiny_httpd.Server
|
||||
module Pool = Tiny_httpd.Pool
|
||||
module Slice = IO.Slice
|
||||
module Log = Tiny_httpd.Log
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
stdenv:Eio_unix.Stdenv.base ->
|
||||
sw:Eio.Switch.t ->
|
||||
'a
|
||||
|
||||
let get_max_connection_ ?(max_connections = 64) () : int =
|
||||
let max_connections = max 4 max_connections in
|
||||
max_connections
|
||||
|
||||
let buf_size = 16 * 1024
|
||||
|
||||
let eio_ipaddr_to_unix (a : _ Eio.Net.Ipaddr.t) : Unix.inet_addr =
|
||||
(* TODO: for ipv4 we really could do it faster via sprintf 🙄 *)
|
||||
Unix.inet_addr_of_string (Format.asprintf "%a" Eio.Net.Ipaddr.pp a)
|
||||
|
||||
let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr =
|
||||
match a with
|
||||
| `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p)
|
||||
| `Unix s -> Unix.ADDR_UNIX s
|
||||
|
||||
let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t =
|
||||
let cstruct = Pool.Raw.acquire ic_pool in
|
||||
|
||||
object
|
||||
inherit Iostream.In_buf.t_from_refill ()
|
||||
|
||||
method private refill (sl : Slice.t) =
|
||||
assert (sl.len = 0);
|
||||
let cap = min (Bytes.length sl.bytes) (Cstruct.length cstruct) in
|
||||
|
||||
match Eio.Flow.single_read flow (Cstruct.sub cstruct 0 cap) with
|
||||
| exception End_of_file ->
|
||||
Log.debug (fun k -> k "read: eof");
|
||||
()
|
||||
| n ->
|
||||
Log.debug (fun k -> k "read %d bytes..." n);
|
||||
Cstruct.blit_to_bytes cstruct 0 sl.bytes 0 n;
|
||||
sl.off <- 0;
|
||||
sl.len <- n
|
||||
|
||||
method close () =
|
||||
Pool.Raw.release ic_pool cstruct;
|
||||
Eio.Flow.shutdown flow `Receive
|
||||
end
|
||||
|
||||
let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t
|
||||
=
|
||||
(* write buffer *)
|
||||
let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in
|
||||
let offset = ref 0 in
|
||||
|
||||
object (self)
|
||||
method flush () : unit =
|
||||
if !offset > 0 then (
|
||||
Eio.Flow.write flow [ Cstruct.sub wbuf 0 !offset ];
|
||||
offset := 0
|
||||
)
|
||||
|
||||
method output buf i len =
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
|
||||
while !len > 0 do
|
||||
let available = Cstruct.length wbuf - !offset in
|
||||
let n = min !len available in
|
||||
Cstruct.blit_from_bytes buf !i wbuf !offset n;
|
||||
offset := !offset + n;
|
||||
i := !i + n;
|
||||
len := !len - n;
|
||||
|
||||
if !offset = Cstruct.length wbuf then self#flush ()
|
||||
done
|
||||
|
||||
method output_char c =
|
||||
if !offset = Cstruct.length wbuf then self#flush ();
|
||||
Cstruct.set_char wbuf !offset c;
|
||||
incr offset;
|
||||
if !offset = Cstruct.length wbuf then self#flush ()
|
||||
|
||||
method close () =
|
||||
Pool.Raw.release oc_pool wbuf;
|
||||
Eio.Flow.shutdown flow `Send
|
||||
end
|
||||
|
||||
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
|
||||
~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () :
|
||||
(module H.IO_BACKEND) =
|
||||
let addr, port, (sockaddr : Eio.Net.Sockaddr.stream) =
|
||||
match addr, port, unix_sock with
|
||||
| _, _, Some s -> Printf.sprintf "unix:%s" s, 0, `Unix s
|
||||
| addr, port, None ->
|
||||
let addr = Option.value ~default:"127.0.0.1" addr in
|
||||
let sockaddr, port =
|
||||
match Eio.Net.getaddrinfo stdenv#net addr, port with
|
||||
| `Tcp (h, _) :: _, None ->
|
||||
let p = 8080 in
|
||||
`Tcp (h, p), p
|
||||
| `Tcp (h, _) :: _, Some p -> `Tcp (h, p), p
|
||||
| _ ->
|
||||
failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr
|
||||
in
|
||||
addr, port, sockaddr
|
||||
in
|
||||
|
||||
let module M = struct
|
||||
let init_addr () = addr
|
||||
let init_port () = port
|
||||
let get_time_s () = Unix.gettimeofday ()
|
||||
let max_connections = get_max_connection_ ?max_connections ()
|
||||
|
||||
let pool_size =
|
||||
match max_buf_pool_size with
|
||||
| Some n -> n
|
||||
| None -> min 4096 (max_connections * 2)
|
||||
|
||||
let cstruct_pool =
|
||||
Pool.create ~max_size:max_connections
|
||||
~mk_item:(fun () -> Cstruct.create buf_size)
|
||||
()
|
||||
|
||||
let tcp_server () : IO.TCP_server.builder =
|
||||
{
|
||||
IO.TCP_server.serve =
|
||||
(fun ~after_init ~handle () : unit ->
|
||||
let running = Atomic.make true in
|
||||
let active_conns = Atomic.make 0 in
|
||||
|
||||
Eio.Switch.on_release sw (fun () -> Atomic.set running false);
|
||||
let net = Eio.Stdenv.net stdenv in
|
||||
|
||||
(* main server socket *)
|
||||
let sock =
|
||||
let backlog = max_connections in
|
||||
Eio.Net.listen ~reuse_addr:true ~reuse_port:true ~backlog ~sw net
|
||||
sockaddr
|
||||
in
|
||||
|
||||
let tcp_server : IO.TCP_server.t =
|
||||
{
|
||||
running = (fun () -> Atomic.get running);
|
||||
stop =
|
||||
(fun () ->
|
||||
Atomic.set running false;
|
||||
Eio.Switch.fail sw Exit);
|
||||
endpoint =
|
||||
(fun () ->
|
||||
(* TODO: find the real port *)
|
||||
addr, port);
|
||||
active_connections = (fun () -> Atomic.get active_conns);
|
||||
}
|
||||
in
|
||||
|
||||
after_init tcp_server;
|
||||
|
||||
while Atomic.get running do
|
||||
Eio.Net.accept_fork ~sw
|
||||
~on_error:(fun exn ->
|
||||
Log.error (fun k ->
|
||||
k "error in client handler: %s" (Printexc.to_string exn)))
|
||||
sock
|
||||
(fun flow client_addr ->
|
||||
Atomic.incr active_conns;
|
||||
let@ () =
|
||||
Fun.protect ~finally:(fun () ->
|
||||
Log.debug (fun k ->
|
||||
k "Tiny_httpd_eio: client handler returned");
|
||||
Atomic.decr active_conns)
|
||||
in
|
||||
let ic = ic_of_flow ~buf_pool:cstruct_pool flow in
|
||||
let oc = oc_of_flow ~buf_pool:cstruct_pool flow in
|
||||
|
||||
Log.debug (fun k ->
|
||||
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);
|
||||
let client_addr_unix = eio_sock_addr_to_unix client_addr in
|
||||
try handle.handle ~client_addr:client_addr_unix ic oc
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Log.error (fun k ->
|
||||
k "Client handler for %a failed with %s\n%s"
|
||||
Eio.Net.Sockaddr.pp client_addr
|
||||
(Printexc.to_string exn)
|
||||
(Printexc.raw_backtrace_to_string bt)))
|
||||
done);
|
||||
}
|
||||
end in
|
||||
(module M)
|
||||
|
||||
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ~stdenv
|
||||
~sw ?buf_size ?middlewares () : H.t =
|
||||
let backend =
|
||||
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
|
||||
~stdenv ~sw ()
|
||||
in
|
||||
H.create_from ?buf_size ?middlewares ~backend ()
|
||||
31
src/eio/tiny_httpd_eio.mli
Normal file
31
src/eio/tiny_httpd_eio.mli
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
(** Tiny httpd EIO backend.
|
||||
|
||||
This replaces the threads + Unix blocking syscalls of {!Tiny_httpd_server}
|
||||
with an Eio-based cooperative system.
|
||||
|
||||
{b NOTE}: this is very experimental and will absolutely change over time,
|
||||
especially since Eio itself is also subject to change.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
(* TODO: pass in a switch *)
|
||||
|
||||
type 'a with_args =
|
||||
?addr:string ->
|
||||
?port:int ->
|
||||
?unix_sock:string ->
|
||||
?max_connections:int ->
|
||||
?max_buf_pool_size:int ->
|
||||
stdenv:Eio_unix.Stdenv.base ->
|
||||
sw:Eio.Switch.t ->
|
||||
'a
|
||||
|
||||
val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
|
||||
(** Create a server *)
|
||||
|
||||
val create :
|
||||
(?buf_size:int ->
|
||||
?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
|
||||
unit ->
|
||||
Tiny_httpd.Server.t)
|
||||
with_args
|
||||
(** Create a server *)
|
||||
31
tiny_httpd_eio.opam
Normal file
31
tiny_httpd_eio.opam
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.19"
|
||||
synopsis: "Use eio for tiny_httpd"
|
||||
maintainer: ["c-cube"]
|
||||
authors: ["c-cube"]
|
||||
license: "MIT"
|
||||
homepage: "https://github.com/c-cube/tiny_httpd/"
|
||||
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
|
||||
depends: [
|
||||
"dune" {>= "3.2"}
|
||||
"tiny_httpd" {= version}
|
||||
"eio"
|
||||
"logs" {with-test}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
"dune"
|
||||
"build"
|
||||
"-p"
|
||||
name
|
||||
"-j"
|
||||
jobs
|
||||
"@install"
|
||||
"@runtest" {with-test}
|
||||
"@doc" {with-doc}
|
||||
]
|
||||
]
|
||||
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"
|
||||
Loading…
Add table
Reference in a new issue