diff --git a/examples/fnv1_udp/client.ml b/examples/fnv1_udp/client.ml new file mode 100644 index 0000000..83a01b0 --- /dev/null +++ b/examples/fnv1_udp/client.ml @@ -0,0 +1,104 @@ +module Net = Nanoev_picos.Net +module Trace = Trace_core + +let ( let@ ) = ( @@ ) +let port = ref 9427 +let j = ref (Domain.recommended_domain_count ()) +let n_iters = ref 200 +let n_tasks = ref 100 +let reset_line = "\x1b[2K\r" + +(** Message we send *) +let txt = String.init 100 (fun i -> Char.chr (Char.code 'a' + (i mod 26))) + +let main () : unit = + let n_tasks_done = Atomic.make 0 in + let n_sent = Atomic.make 0 in + let n_recv = Atomic.make 0 in + ignore + (Thread.create + (fun () -> + while true do + Printf.printf "%stasks done: %d sent: %d recv: %d%!" reset_line + (Atomic.get n_tasks_done) (Atomic.get n_sent) (Atomic.get n_recv); + + Trace.counter_int "task-done" (Atomic.get n_tasks_done); + Trace.counter_int "sent" (Atomic.get n_sent); + Trace.counter_int "recv" (Atomic.get n_recv); + + Thread.delay 0.2 + done) + () + : Thread.t); + + (let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + + let remoteaddr = Unix.ADDR_INET (Unix.inet_addr_loopback, !port) in + + let task () = + let[@alert "-deprecated"] _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "task.run" + in + + let sock = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in + Unix.set_nonblock sock; + + let buf_read = Bytes.create 9 in + for _i = 1 to !n_iters do + let ack = Atomic.make false in + + (* response might get lost, so we send a message until + we get a response *) + let fut = + Moonpool.spawn ~on:pool @@ fun () -> + while not (Atomic.get ack) do + let _n = + Net.sendto sock ~addr:remoteaddr ~flags:[] + (Bytes.unsafe_of_string txt) + 0 (String.length txt) + in + Atomic.incr n_sent; + assert (_n = 100); + (* retry later *) + Nanoev_picos.sleep 0.001 + done + in + + Picos.Fiber.(yield ()); + + (* receive a response *) + let _n, _remote = Net.recvfrom sock buf_read 0 9 ~flags:[] in + assert (_n = 8); + Atomic.incr n_recv; + Atomic.set ack true; + + (* cleanup *) + Moonpool.await fut + done; + + Unix.close sock; + Atomic.incr n_tasks_done; + Trace.exit_manual_span _sp + in + + let tasks = Dynarray.create () in + for _i = 1 to !n_tasks do + Dynarray.add_last tasks (Moonpool.spawn ~on:pool task) + done; + Dynarray.iter Moonpool.Fut.wait_block_exn tasks); + Printf.printf "%sall done\n%!" reset_line + +let () = + let@ () = Trace_tef.with_setup () in + Arg.parse + (Arg.align + [ + "-p", Arg.Set_int port, " set port"; + "-j", Arg.Set_int j, " thread pool size"; + "--n-tasks", Arg.Set_int n_tasks, " number of tasks"; + "--n-iters", Arg.Set_int n_iters, " number of iterations per task"; + ]) + ignore ""; + Nanoev_picos.Background_thread.setup @@ Nanoev_posix.create (); + let@ _run = Moonpool_fib.main in + main () diff --git a/examples/fnv1_udp/dune b/examples/fnv1_udp/dune new file mode 100644 index 0000000..7f7f7a5 --- /dev/null +++ b/examples/fnv1_udp/dune @@ -0,0 +1,4 @@ +(executables + (names server client) + (libraries moonpool moonpool.fib nanoev nanoev-picos nanoev-posix trace.core + trace-tef)) diff --git a/examples/fnv1_udp/server.ml b/examples/fnv1_udp/server.ml new file mode 100644 index 0000000..b51d267 --- /dev/null +++ b/examples/fnv1_udp/server.ml @@ -0,0 +1,75 @@ +module Net = Nanoev_picos.Net +module Trace = Trace_core + +let ( let@ ) = ( @@ ) +let port = ref 9427 +let j = ref (Domain.recommended_domain_count ()) + +(** FNV hashing + https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function +*) +module FNV = struct + let fnv_offset_basis = 0xcbf29ce484222325L + let fnv_prime = 0x100000001b3L + + let bytes ?len (x : bytes) : int64 = + let h = ref fnv_offset_basis in + let len = Option.value ~default:(Bytes.length x) len in + for i = 0 to len - 1 do + (h := Int64.(mul !h fnv_prime)); + let byte = Char.code (Bytes.unsafe_get x i) in + h := Int64.(logxor !h (of_int byte)) + done; + Int64.(logand !h max_int) + + let string (x : string) = bytes (Bytes.unsafe_of_string x) +end + +let n_requests = Atomic.make 0 +let n_replies = Atomic.make 0 + +let main () : unit = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in + Unix.set_nonblock sock; + Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, !port)); + + let@ pool = Moonpool.Ws_pool.with_ ~num_threads:!j () in + + if Trace.enabled () then + ignore + (Thread.create + (fun () -> + while true do + Trace.counter_int "n-requests" (Atomic.get n_requests); + Trace.counter_int "n-replies" (Atomic.get n_replies); + + Thread.delay 0.2 + done) + () + : Thread.t); + + let buf = Bytes.create 1024 in + while true do + let n, remoteaddr = Net.recvfrom sock buf 0 (Bytes.length buf) ~flags:[] in + Atomic.incr n_requests; + let buf2 = Bytes.sub buf 0 n in + Moonpool.run_async pool (fun () -> + let h = FNV.bytes buf2 in + let buf_ref = Bytes.create 8 in + Bytes.set_int64_le buf_ref 0 h; + ignore (Net.sendto sock ~addr:remoteaddr buf_ref 0 8 ~flags:[] : int)); + Atomic.incr n_replies + done + +let () = + let@ () = Trace_tef.with_setup () in + Arg.parse + (Arg.align + [ + "-p", Arg.Set_int port, " set port"; + "-j", Arg.Set_int j, " thread pool size"; + ]) + ignore ""; + Nanoev_picos.Background_thread.setup @@ Nanoev_posix.create (); + let@ _run = Moonpool_fib.main in + main ()