feat: tiny_httpd_eio library

provides a tiny_httpd server that relies on Eio for non-blocking
sockets and for concurrency using eio fibers.
This commit is contained in:
Simon Cruanes 2025-05-01 15:37:09 -04:00
parent 365889557c
commit b1dfdf1f39
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 286 additions and 0 deletions

View file

@ -39,3 +39,12 @@
(iostream-camlzip (>= 0.2.1)) (iostream-camlzip (>= 0.2.1))
(logs :with-test) (logs :with-test)
(odoc :with-doc))) (odoc :with-doc)))
(package
(name tiny_httpd_eio)
(synopsis "Use eio for tiny_httpd")
(depends
(tiny_httpd (= :version))
eio
(logs :with-test)
(odoc :with-doc)))

8
src/eio/dune Normal file
View file

@ -0,0 +1,8 @@
(library
(name tiny_httpd_eio)
(public_name tiny_httpd_eio)
(synopsis "An EIO-based backend for Tiny_httpd")
(flags :standard -safe-string -warn-error -a+8)
(libraries tiny_httpd eio eio.unix))

207
src/eio/tiny_httpd_eio.ml Normal file
View file

@ -0,0 +1,207 @@
module IO = Tiny_httpd.IO
module H = Tiny_httpd.Server
module Pool = Tiny_httpd.Pool
module Slice = IO.Slice
module Log = Tiny_httpd.Log
let ( let@ ) = ( @@ )
type 'a with_args =
?addr:string ->
?port:int ->
?unix_sock:string ->
?max_connections:int ->
?max_buf_pool_size:int ->
stdenv:Eio_unix.Stdenv.base ->
sw:Eio.Switch.t ->
'a
let get_max_connection_ ?(max_connections = 64) () : int =
let max_connections = max 4 max_connections in
max_connections
let buf_size = 16 * 1024
let eio_ipaddr_to_unix (a : _ Eio.Net.Ipaddr.t) : Unix.inet_addr =
(* TODO: for ipv4 we really could do it faster via sprintf 🙄 *)
Unix.inet_addr_of_string (Format.asprintf "%a" Eio.Net.Ipaddr.pp a)
let eio_sock_addr_to_unix (a : Eio.Net.Sockaddr.stream) : Unix.sockaddr =
match a with
| `Tcp (h, p) -> Unix.ADDR_INET (eio_ipaddr_to_unix h, p)
| `Unix s -> Unix.ADDR_UNIX s
let ic_of_flow ~buf_pool:ic_pool (flow : _ Eio.Net.stream_socket) : IO.Input.t =
let cstruct = Pool.Raw.acquire ic_pool in
object
inherit Iostream.In_buf.t_from_refill ()
method private refill (sl : Slice.t) =
assert (sl.len = 0);
let cap = min (Bytes.length sl.bytes) (Cstruct.length cstruct) in
match Eio.Flow.single_read flow (Cstruct.sub cstruct 0 cap) with
| exception End_of_file ->
Log.debug (fun k -> k "read: eof");
()
| n ->
Log.debug (fun k -> k "read %d bytes..." n);
Cstruct.blit_to_bytes cstruct 0 sl.bytes 0 n;
sl.off <- 0;
sl.len <- n
method close () =
Pool.Raw.release ic_pool cstruct;
Eio.Flow.shutdown flow `Receive
end
let oc_of_flow ~buf_pool:oc_pool (flow : _ Eio.Net.stream_socket) : IO.Output.t
=
(* write buffer *)
let wbuf : Cstruct.t = Pool.Raw.acquire oc_pool in
let offset = ref 0 in
object (self)
method flush () : unit =
if !offset > 0 then (
Eio.Flow.write flow [ Cstruct.sub wbuf 0 !offset ];
offset := 0
)
method output buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let available = Cstruct.length wbuf - !offset in
let n = min !len available in
Cstruct.blit_from_bytes buf !i wbuf !offset n;
offset := !offset + n;
i := !i + n;
len := !len - n;
if !offset = Cstruct.length wbuf then self#flush ()
done
method output_char c =
if !offset = Cstruct.length wbuf then self#flush ();
Cstruct.set_char wbuf !offset c;
incr offset;
if !offset = Cstruct.length wbuf then self#flush ()
method close () =
Pool.Raw.release oc_pool wbuf;
Eio.Flow.shutdown flow `Send
end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () :
(module H.IO_BACKEND) =
let addr, port, (sockaddr : Eio.Net.Sockaddr.stream) =
match addr, port, unix_sock with
| _, _, Some s -> Printf.sprintf "unix:%s" s, 0, `Unix s
| addr, port, None ->
let addr = Option.value ~default:"127.0.0.1" addr in
let sockaddr, port =
match Eio.Net.getaddrinfo stdenv#net addr, port with
| `Tcp (h, _) :: _, None ->
let p = 8080 in
`Tcp (h, p), p
| `Tcp (h, _) :: _, Some p -> `Tcp (h, p), p
| _ ->
failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr
in
addr, port, sockaddr
in
let module M = struct
let init_addr () = addr
let init_port () = port
let get_time_s () = Unix.gettimeofday ()
let max_connections = get_max_connection_ ?max_connections ()
let pool_size =
match max_buf_pool_size with
| Some n -> n
| None -> min 4096 (max_connections * 2)
let cstruct_pool =
Pool.create ~max_size:max_connections
~mk_item:(fun () -> Cstruct.create buf_size)
()
let tcp_server () : IO.TCP_server.builder =
{
IO.TCP_server.serve =
(fun ~after_init ~handle () : unit ->
let running = Atomic.make true in
let active_conns = Atomic.make 0 in
Eio.Switch.on_release sw (fun () -> Atomic.set running false);
let net = Eio.Stdenv.net stdenv in
(* main server socket *)
let sock =
let backlog = max_connections in
Eio.Net.listen ~reuse_addr:true ~reuse_port:true ~backlog ~sw net
sockaddr
in
let tcp_server : IO.TCP_server.t =
{
running = (fun () -> Atomic.get running);
stop =
(fun () ->
Atomic.set running false;
Eio.Switch.fail sw Exit);
endpoint =
(fun () ->
(* TODO: find the real port *)
addr, port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
after_init tcp_server;
while Atomic.get running do
Eio.Net.accept_fork ~sw
~on_error:(fun exn ->
Log.error (fun k ->
k "error in client handler: %s" (Printexc.to_string exn)))
sock
(fun flow client_addr ->
Atomic.incr active_conns;
let@ () =
Fun.protect ~finally:(fun () ->
Log.debug (fun k ->
k "Tiny_httpd_eio: client handler returned");
Atomic.decr active_conns)
in
let ic = ic_of_flow ~buf_pool:cstruct_pool flow in
let oc = oc_of_flow ~buf_pool:cstruct_pool flow in
Log.debug (fun k ->
k "handling client on %a…" Eio.Net.Sockaddr.pp client_addr);
let client_addr_unix = eio_sock_addr_to_unix client_addr in
try handle.handle ~client_addr:client_addr_unix ic oc
with exn ->
let bt = Printexc.get_raw_backtrace () in
Log.error (fun k ->
k "Client handler for %a failed with %s\n%s"
Eio.Net.Sockaddr.pp client_addr
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)))
done);
}
end in
(module M)
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ~stdenv
~sw ?buf_size ?middlewares () : H.t =
let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
~stdenv ~sw ()
in
H.create_from ?buf_size ?middlewares ~backend ()

View file

@ -0,0 +1,31 @@
(** Tiny httpd EIO backend.
This replaces the threads + Unix blocking syscalls of {!Tiny_httpd_server}
with an Eio-based cooperative system.
{b NOTE}: this is very experimental and will absolutely change over time,
especially since Eio itself is also subject to change.
@since NEXT_RELEASE *)
(* TODO: pass in a switch *)
type 'a with_args =
?addr:string ->
?port:int ->
?unix_sock:string ->
?max_connections:int ->
?max_buf_pool_size:int ->
stdenv:Eio_unix.Stdenv.base ->
sw:Eio.Switch.t ->
'a
val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args
(** Create a server *)
val create :
(?buf_size:int ->
?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list ->
unit ->
Tiny_httpd.Server.t)
with_args
(** Create a server *)

31
tiny_httpd_eio.opam Normal file
View file

@ -0,0 +1,31 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.19"
synopsis: "Use eio for tiny_httpd"
maintainer: ["c-cube"]
authors: ["c-cube"]
license: "MIT"
homepage: "https://github.com/c-cube/tiny_httpd/"
bug-reports: "https://github.com/c-cube/tiny_httpd/issues"
depends: [
"dune" {>= "3.2"}
"tiny_httpd" {= version}
"eio"
"logs" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"