From 1dcadb34707bfd6b4ec34d183a71de1a5efca350 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 13:47:40 -0400 Subject: [PATCH] add nanoev-picos as a package, also using picos_std --- dune-project | 26 +++++++++++-- nanoev-picos.opam | 34 +++++++++++++++++ nanoev-posix.opam | 2 + nanoev.opam | 2 - nanoev_tiny_httpd.opam | 5 ++- src/picos/dune | 5 +-- src/picos/net_client.ml | 7 +++- src/picos/net_server.ml | 80 ++++++++++++++++++++++++++++++++++++---- src/picos/net_server.mli | 17 +++++++++ 9 files changed, 157 insertions(+), 21 deletions(-) create mode 100644 nanoev-picos.opam diff --git a/dune-project b/dune-project index f0a6568..b9b1aca 100644 --- a/dune-project +++ b/dune-project @@ -21,10 +21,26 @@ (depends ocaml dune base-unix) (depopts (trace - (>= 0.7)) + (>= 0.7))) + (tags + (unix select async))) + +(package + (name nanoev-picos) + (synopsis "Use nanoev from picos") + (depends + ocaml + dune + base-unix + (nanoev + (= :version)) (iostream (>= 0.3)) (picos + (and + (>= 0.5) + (< 0.7))) + (picos_std (and (>= 0.5) (< 0.7)))) @@ -39,6 +55,8 @@ dune base-unix iomux + (nanoev (= :version)) + (nanoev-picos (= :version)) (mtime (>= 2.0)) (moonpool :with-test) @@ -53,9 +71,9 @@ (depends ocaml dune - nanoev - (picos - (>= 0.6)) + (nanoev (= :version)) + (nanoev-picos (= :version)) + picos picos_std (tiny_httpd (>= 0.17))) diff --git a/nanoev-picos.opam b/nanoev-picos.opam new file mode 100644 index 0000000..44e1230 --- /dev/null +++ b/nanoev-picos.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Use nanoev from picos" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["unix" "select" "async"] +homepage: "https://github.com/c-cube/nanoev" +bug-reports: "https://github.com/c-cube/nanoev/issues" +depends: [ + "ocaml" + "dune" {>= "2.7"} + "base-unix" + "nanoev" {= version} + "iostream" {>= "0.3"} + "picos" {>= "0.5" & < "0.7"} + "picos_std" {>= "0.5" & < "0.7"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/nanoev.git" diff --git a/nanoev-posix.opam b/nanoev-posix.opam index 8b82637..b9df267 100644 --- a/nanoev-posix.opam +++ b/nanoev-posix.opam @@ -12,6 +12,8 @@ depends: [ "dune" {>= "2.7"} "base-unix" "iomux" + "nanoev" {= version} + "nanoev-picos" {= version} "mtime" {>= "2.0"} "moonpool" {with-test} "trace" {with-test} diff --git a/nanoev.opam b/nanoev.opam index 9aeadaa..8268f40 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -15,8 +15,6 @@ depends: [ ] depopts: [ "trace" {>= "0.7"} - "iostream" {>= "0.3"} - "picos" {>= "0.5" & < "0.7"} ] build: [ ["dune" "subst"] {dev} diff --git a/nanoev_tiny_httpd.opam b/nanoev_tiny_httpd.opam index ea48089..08833e7 100644 --- a/nanoev_tiny_httpd.opam +++ b/nanoev_tiny_httpd.opam @@ -10,8 +10,9 @@ bug-reports: "https://github.com/c-cube/nanoev/issues" depends: [ "ocaml" "dune" {>= "2.7"} - "nanoev" - "picos" {>= "0.6"} + "nanoev" {= version} + "nanoev-picos" {= version} + "picos" "picos_std" "tiny_httpd" {>= "0.17"} "odoc" {with-doc} diff --git a/src/picos/dune b/src/picos/dune index fb37e29..de66efd 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -1,5 +1,4 @@ (library (name nanoev_picos) - (public_name nanoev.picos) - (optional) ; picos - (libraries threads picos iostream nanoev)) + (public_name nanoev-picos) + (libraries threads picos picos_std.sync iostream nanoev)) diff --git a/src/picos/net_client.ml b/src/picos/net_client.ml index 1bb3cf5..20ec2c4 100644 --- a/src/picos/net_client.ml +++ b/src/picos/net_client.ml @@ -6,7 +6,7 @@ let connect addr : Unix.file_descr = (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); (* connect asynchronously *) - Base.Raw.retry_write sock (fun () -> Unix.connect sock addr); + Base.connect sock addr; sock let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = @@ -15,6 +15,9 @@ let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = let ic = IO_in.of_unix_fd sock in let oc = IO_out.of_unix_fd sock in - let finally () = try Unix.close sock with _ -> () in + let finally () = + (try Unix.shutdown sock Unix.SHUTDOWN_ALL with _ -> ()); + try Unix.close sock with _ -> () + in let@ () = Fun.protect ~finally in f ic oc diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml index ca74b0c..642ba48 100644 --- a/src/picos/net_server.ml +++ b/src/picos/net_server.ml @@ -1,3 +1,5 @@ +module Sem = Picos_std_sync.Semaphore.Counting + type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit type t = { @@ -5,30 +7,81 @@ type t = { sock: Unix.file_descr; client_handler: client_handler; spawn: (unit -> unit) -> unit Picos.Computation.t; + max_conns: int; + sem: Sem.t; mutable running: unit Picos.Computation.t option; + exn_handler: exn -> Printexc.raw_backtrace -> unit; } -let join (self : t) : unit = Option.iter Picos.Computation.await self.running +let[@inline] join (self : t) : unit = + Option.iter Picos.Computation.await self.running + +let[@inline] max_connections self = self.max_conns + +let[@inline] n_active_connections (self : t) : int = + self.max_conns - Sem.get_value self.sem + +let[@inline] running (self : t) : bool = Atomic.get self.active let shutdown (self : t) = if Atomic.exchange self.active false then () open struct + let default_exn_handler exn bt = + Printf.eprintf "uncaught exception in network server: %s\n%s%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) + let run (self : t) () : unit = while Atomic.get self.active do let client_sock, client_addr = Base.accept self.sock in - let comp = + Sem.acquire self.sem; + + let cleanup () = + (try Unix.shutdown client_sock Unix.SHUTDOWN_ALL with _ -> ()); + (* TODO: close in nanoev too *) + (try Unix.close client_sock with _ -> ()); + Sem.release self.sem + in + + let comp : _ Picos.Computation.t = self.spawn (fun () -> let ic = IO_in.of_unix_fd client_sock in let oc = IO_out.of_unix_fd client_sock in - self.client_handler client_addr ic oc) + try + self.client_handler client_addr ic oc; + cleanup () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + self.exn_handler exn bt) in ignore (comp : _ Picos.Computation.t) done end -let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t - = +let establish ?backlog ?max_connections ?(exn_handler = default_exn_handler) + ~spawn ~(client_handler : client_handler) addr : t = + let ev = + match Atomic.get Global_.st with + | Some { nanoev = ev; _ } -> ev + | None -> invalid_arg "Nanoev_picos.Net_server: no event loop installed" + in + + let max_connections = + match max_connections with + | None -> Nanoev.max_fds ev + | Some n -> min (Nanoev.max_fds ev) n + in + let sem = Sem.make max_connections in + + let backlog = + match backlog with + | Some n -> max 4 n + | None -> max 4 max_connections + in + let domain = Unix.domain_of_sockaddr addr in let sock = Unix.socket domain Unix.SOCK_STREAM 0 in + Unix.bind sock addr; Unix.listen sock backlog; Unix.set_nonblock sock; @@ -36,12 +89,23 @@ let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); let server = - { active = Atomic.make true; spawn; sock; client_handler; running = None } + { + active = Atomic.make true; + max_conns = max_connections; + sem; + spawn; + sock; + client_handler; + running = None; + exn_handler; + } in server.running <- Some (spawn (run server)); server -let with_ ?backlog ~spawn ~client_handler addr f = - let server = establish ?backlog ~spawn ~client_handler addr in +let with_ ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr f = + let server = + establish ?backlog ?max_connections ?exn_handler ~spawn ~client_handler addr + in Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server) diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli index 178fc0d..8f9cf62 100644 --- a/src/picos/net_server.mli +++ b/src/picos/net_server.mli @@ -2,17 +2,34 @@ type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit type t val join : t -> unit +(** Wait for server to shutdown *) + val shutdown : t -> unit +(** Ask the server to stop *) + +val running : t -> bool +val max_connections : t -> int +val n_active_connections : t -> int val establish : ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> spawn:((unit -> unit) -> unit Picos.Computation.t) -> client_handler:client_handler -> Unix.sockaddr -> t +(** Create and start a new server on the given socket address. + @param spawn used to spawn a new computation per client + @param client_handler + the logic for talking to a client, will run in its own computation + @param backlog number of connections waiting in the listening socket + @param max_connections max number of simultaneous connections *) val with_ : ?backlog:int -> + ?max_connections:int -> + ?exn_handler:(exn -> Printexc.raw_backtrace -> unit) -> spawn:((unit -> unit) -> unit Picos.Computation.t) -> client_handler:client_handler -> Unix.sockaddr ->