add nanoev-picos as a package, also using picos_std

This commit is contained in:
Simon Cruanes 2025-05-02 13:47:40 -04:00
parent caeae5794c
commit 1dcadb3470
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 157 additions and 21 deletions

View file

@ -21,10 +21,26 @@
(depends ocaml dune base-unix)
(depopts
(trace
(>= 0.7))
(>= 0.7)))
(tags
(unix select async)))
(package
(name nanoev-picos)
(synopsis "Use nanoev from picos")
(depends
ocaml
dune
base-unix
(nanoev
(= :version))
(iostream
(>= 0.3))
(picos
(and
(>= 0.5)
(< 0.7)))
(picos_std
(and
(>= 0.5)
(< 0.7))))
@ -39,6 +55,8 @@
dune
base-unix
iomux
(nanoev (= :version))
(nanoev-picos (= :version))
(mtime
(>= 2.0))
(moonpool :with-test)
@ -53,9 +71,9 @@
(depends
ocaml
dune
nanoev
(picos
(>= 0.6))
(nanoev (= :version))
(nanoev-picos (= :version))
picos
picos_std
(tiny_httpd
(>= 0.17)))

34
nanoev-picos.opam Normal file
View file

@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Use nanoev from picos"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["unix" "select" "async"]
homepage: "https://github.com/c-cube/nanoev"
bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"base-unix"
"nanoev" {= version}
"iostream" {>= "0.3"}
"picos" {>= "0.5" & < "0.7"}
"picos_std" {>= "0.5" & < "0.7"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/c-cube/nanoev.git"

View file

@ -12,6 +12,8 @@ depends: [
"dune" {>= "2.7"}
"base-unix"
"iomux"
"nanoev" {= version}
"nanoev-picos" {= version}
"mtime" {>= "2.0"}
"moonpool" {with-test}
"trace" {with-test}

View file

@ -15,8 +15,6 @@ depends: [
]
depopts: [
"trace" {>= "0.7"}
"iostream" {>= "0.3"}
"picos" {>= "0.5" & < "0.7"}
]
build: [
["dune" "subst"] {dev}

View file

@ -10,8 +10,9 @@ bug-reports: "https://github.com/c-cube/nanoev/issues"
depends: [
"ocaml"
"dune" {>= "2.7"}
"nanoev"
"picos" {>= "0.6"}
"nanoev" {= version}
"nanoev-picos" {= version}
"picos"
"picos_std"
"tiny_httpd" {>= "0.17"}
"odoc" {with-doc}

View file

@ -1,5 +1,4 @@
(library
(name nanoev_picos)
(public_name nanoev.picos)
(optional) ; picos
(libraries threads picos iostream nanoev))
(public_name nanoev-picos)
(libraries threads picos picos_std.sync iostream nanoev))

View file

@ -6,7 +6,7 @@ let connect addr : Unix.file_descr =
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
(* connect asynchronously *)
Base.Raw.retry_write sock (fun () -> Unix.connect sock addr);
Base.connect sock addr;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
@ -15,6 +15,9 @@ let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let finally () =
(try Unix.shutdown sock Unix.SHUTDOWN_ALL with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

View file

@ -1,3 +1,5 @@
module Sem = Picos_std_sync.Semaphore.Counting
type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
type t = {
@ -5,30 +7,81 @@ type t = {
sock: Unix.file_descr;
client_handler: client_handler;
spawn: (unit -> unit) -> unit Picos.Computation.t;
max_conns: int;
sem: Sem.t;
mutable running: unit Picos.Computation.t option;
exn_handler: exn -> Printexc.raw_backtrace -> unit;
}
let join (self : t) : unit = Option.iter Picos.Computation.await self.running
let[@inline] join (self : t) : unit =
Option.iter Picos.Computation.await self.running
let[@inline] max_connections self = self.max_conns
let[@inline] n_active_connections (self : t) : int =
self.max_conns - Sem.get_value self.sem
let[@inline] running (self : t) : bool = Atomic.get self.active
let shutdown (self : t) = if Atomic.exchange self.active false then ()
open struct
let default_exn_handler exn bt =
Printf.eprintf "uncaught exception in network server: %s\n%s%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)
let run (self : t) () : unit =
while Atomic.get self.active do
let client_sock, client_addr = Base.accept self.sock in
let comp =
Sem.acquire self.sem;
let cleanup () =
(try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ());
(* TODO: close in nanoev too *)
(try Unix.close client_sock with _ -> ());
Sem.release self.sem
in
let comp : _ Picos.Computation.t =
self.spawn (fun () ->
let ic = IO_in.of_unix_fd client_sock in
let oc = IO_out.of_unix_fd client_sock in
self.client_handler client_addr ic oc)
try
self.client_handler client_addr ic oc;
cleanup ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
cleanup ();
self.exn_handler exn bt)
in
ignore (comp : _ Picos.Computation.t)
done
end
let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t
=
let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler)
~spawn ~(client_handler : client_handler) addr : t =
let ev =
match Atomic.get Global_.st with
| Some { nanoev = ev; _ } -> ev
| None -> invalid_arg "Nanoev_picos.Net_server: no event loop installed"
in
let max_connections =
match max_connections with
| None -> Nanoev.max_fds ev
| Some n -> min (Nanoev.max_fds ev) n
in
let sem = Sem.make max_connections in
let backlog =
match backlog with
| Some n -> max 4 n
| None -> max 4 max_connections
in
let domain = Unix.domain_of_sockaddr addr in
let sock = Unix.socket domain Unix.SOCK_STREAM 0 in
Unix.bind sock addr;
Unix.listen sock backlog;
Unix.set_nonblock sock;
@ -36,12 +89,23 @@ let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t
(try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ());
let server =
{ active = Atomic.make true; spawn; sock; client_handler; running = None }
{
active = Atomic.make true;
max_conns = max_connections;
sem;
spawn;
sock;
client_handler;
running = None;
exn_handler;
}
in
server.running <- Some (spawn (run server));
server
let with_ ?backlog ~spawn ~client_handler addr f =
let server = establish ?backlog ~spawn ~client_handler addr in
let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f =
let server =
establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr
in
Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server)

View file

@ -2,17 +2,34 @@ type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit
type t
val join : t -> unit
(** Wait for server to shutdown *)
val shutdown : t -> unit
(** Ask the server to stop *)
val running : t -> bool
val max_connections : t -> int
val n_active_connections : t -> int
val establish :
?backlog:int ->
?max_connections:int ->
?exn_handler:(exn -> Printexc.raw_backtrace -> unit) ->
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
client_handler:client_handler ->
Unix.sockaddr ->
t
(** Create and start a new server on the given socket address.
@param spawn used to spawn a new computation per client
@param client_handler
the logic for talking to a client, will run in its own computation
@param backlog number of connections waiting in the listening socket
@param max_connections max number of simultaneous connections *)
val with_ :
?backlog:int ->
?max_connections:int ->
?exn_handler:(exn -> Printexc.raw_backtrace -> unit) ->
spawn:((unit -> unit) -> unit Picos.Computation.t) ->
client_handler:client_handler ->
Unix.sockaddr ->