Compare commits

...

2 commits

Author SHA1 Message Date
Simon Cruanes
604224d1cf
basic UDP socket example
Some checks are pending
github pages / Deploy doc (push) Waiting to run
Build and Test / build (push) Waiting to run
Build and Test / format (push) Waiting to run
2025-05-06 11:02:10 -04:00
Simon Cruanes
0f8e8797e8
add Nanoev_picos.Net with send/recv functions 2025-05-06 11:01:54 -04:00
7 changed files with 211 additions and 3 deletions

104
examples/fnv1_udp/client.ml Normal file
View file

@ -0,0 +1,104 @@
module Net = Nanoev_picos.Net
module Trace = Trace_core
let ( let@ ) = ( @@ )
let port = ref 9427
let j = ref (Domain.recommended_domain_count ())
let n_iters = ref 200
let n_tasks = ref 100
let reset_line = "\x1b[2K\r"
(** Message we send *)
let txt = String.init 100 (fun i -> Char.chr (Char.code 'a' + (i mod 26)))
let main () : unit =
let n_tasks_done = Atomic.make 0 in
let n_sent = Atomic.make 0 in
let n_recv = Atomic.make 0 in
ignore
(Thread.create
(fun () ->
while true do
Printf.printf "%stasks done: %d sent: %d recv: %d%!" reset_line
(Atomic.get n_tasks_done) (Atomic.get n_sent) (Atomic.get n_recv);
Trace.counter_int "task-done" (Atomic.get n_tasks_done);
Trace.counter_int "sent" (Atomic.get n_sent);
Trace.counter_int "recv" (Atomic.get n_recv);
Thread.delay 0.2
done)
()
: Thread.t);
(let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
let remoteaddr = Unix.ADDR_INET (Unix.inet_addr_loopback, !port) in
let task () =
let[@alert "-deprecated"] _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "task.run"
in
let sock = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
Unix.set_nonblock sock;
let buf_read = Bytes.create 9 in
for _i = 1 to !n_iters do
let ack = Atomic.make false in
(* response might get lost, so we send a message until
we get a response *)
let fut =
Moonpool.spawn ~on:pool @@ fun () ->
while not (Atomic.get ack) do
let _n =
Net.sendto sock ~addr:remoteaddr ~flags:[]
(Bytes.unsafe_of_string txt)
0 (String.length txt)
in
Atomic.incr n_sent;
assert (_n = 100);
(* retry later *)
Nanoev_picos.sleep 0.001
done
in
Picos.Fiber.(yield ());
(* receive a response *)
let _n, _remote = Net.recvfrom sock buf_read 0 9 ~flags:[] in
assert (_n = 8);
Atomic.incr n_recv;
Atomic.set ack true;
(* cleanup *)
Moonpool.await fut
done;
Unix.close sock;
Atomic.incr n_tasks_done;
Trace.exit_manual_span _sp
in
let tasks = Dynarray.create () in
for _i = 1 to !n_tasks do
Dynarray.add_last tasks (Moonpool.spawn ~on:pool task)
done;
Dynarray.iter Moonpool.Fut.wait_block_exn tasks);
Printf.printf "%sall done\n%!" reset_line
let () =
let@ () = Trace_tef.with_setup () in
Arg.parse
(Arg.align
[
"-p", Arg.Set_int port, " set port";
"-j", Arg.Set_int j, " thread pool size";
"--n-tasks", Arg.Set_int n_tasks, " number of tasks";
"--n-iters", Arg.Set_int n_iters, " number of iterations per task";
])
ignore "";
Nanoev_picos.Background_thread.setup @@ Nanoev_posix.create ();
let@ _run = Moonpool_fib.main in
main ()

4
examples/fnv1_udp/dune Normal file
View file

@ -0,0 +1,4 @@
(executables
(names server client)
(libraries moonpool moonpool.fib nanoev nanoev-picos nanoev-posix trace.core
trace-tef))

View file

@ -0,0 +1,75 @@
module Net = Nanoev_picos.Net
module Trace = Trace_core
let ( let@ ) = ( @@ )
let port = ref 9427
let j = ref (Domain.recommended_domain_count ())
(** FNV hashing
https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
*)
module FNV = struct
let fnv_offset_basis = 0xcbf29ce484222325L
let fnv_prime = 0x100000001b3L
let bytes ?len (x : bytes) : int64 =
let h = ref fnv_offset_basis in
let len = Option.value ~default:(Bytes.length x) len in
for i = 0 to len - 1 do
(h := Int64.(mul !h fnv_prime));
let byte = Char.code (Bytes.unsafe_get x i) in
h := Int64.(logxor !h (of_int byte))
done;
Int64.(logand !h max_int)
let string (x : string) = bytes (Bytes.unsafe_of_string x)
end
let n_requests = Atomic.make 0
let n_replies = Atomic.make 0
let main () : unit =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
Unix.set_nonblock sock;
Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, !port));
let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
if Trace.enabled () then
ignore
(Thread.create
(fun () ->
while true do
Trace.counter_int "n-requests" (Atomic.get n_requests);
Trace.counter_int "n-replies" (Atomic.get n_replies);
Thread.delay 0.2
done)
()
: Thread.t);
let buf = Bytes.create 1024 in
while true do
let n, remoteaddr = Net.recvfrom sock buf 0 (Bytes.length buf) ~flags:[] in
Atomic.incr n_requests;
let buf2 = Bytes.sub buf 0 n in
Moonpool.run_async pool (fun () ->
let h = FNV.bytes buf2 in
let buf_ref = Bytes.create 8 in
Bytes.set_int64_le buf_ref 0 h;
ignore (Net.sendto sock ~addr:remoteaddr buf_ref 0 8 ~flags:[] : int));
Atomic.incr n_replies
done
let () =
let@ () = Trace_tef.with_setup () in
Arg.parse
(Arg.align
[
"-p", Arg.Set_int port, " set port";
"-j", Arg.Set_int j, " thread pool size";
])
ignore "";
Nanoev_picos.Background_thread.setup @@ Nanoev_posix.create ();
let@ _run = Moonpool_fib.main in
main ()

View file

@ -4,5 +4,6 @@ include Base
module Global_ev = Global_ev
module IO_in = IO_in
module IO_out = IO_out
module Net = Net
module Net_client = Net_client
module Net_server = Net_server

View file

@ -15,5 +15,6 @@ end
module IO_in = IO_in
module IO_out = IO_out
module Net = Net
module Net_client = Net_client
module Net_server = Net_server

11
src/picos/net.ml Normal file
View file

@ -0,0 +1,11 @@
let[@inline] send sock buf i len ~flags : int =
Base.Raw.retry_write sock (fun () -> Unix.send sock buf i len flags)
let[@inline] sendto sock ~addr ~flags buf i len : int =
Base.Raw.retry_write sock (fun () -> Unix.sendto sock buf i len flags addr)
let[@inline] recv sock buf i len ~flags : int =
Base.Raw.retry_read sock (fun () -> Unix.recv sock buf i len flags)
let[@inline] recvfrom sock buf i len ~flags : int * Unix.sockaddr =
Base.Raw.retry_read sock (fun () -> Unix.recvfrom sock buf i len flags)

View file

@ -159,7 +159,12 @@ let wakeup_from_outside (self : st) : unit =
let rec perform_cbs ~closed = function
| Nil -> ()
| Sub (x, y, f, tail) ->
f ~closed x y;
(try f ~closed x y
with exn ->
let bt = Printexc.get_raw_backtrace () in
Printf.eprintf "nanoev-posix: uncaught error %s\n%s%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt));
perform_cbs ~closed tail
(** Change the event loop right now. This must be called only from the owner
@ -320,8 +325,15 @@ let step (self : st) : unit =
let (Timer t) = Heap.peek_min_exn self.timer in
if t.deadline <= now then (
ignore (Heap.pop_min_exn self.timer : timer_ev);
t.f t.x t.y;
true
try
t.f t.x t.y;
true
with exn ->
let bt = Printexc.get_raw_backtrace () in
Printf.eprintf "nanoev-posix: uncaught error %s in timer\n%s%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt);
false
) else
false
)