mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-17 08:06:43 -05:00
Compare commits
6 commits
62df67db61
...
5176f1956f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5176f1956f | ||
|
|
3c7cf2394a | ||
|
|
5e29f6fc2e | ||
|
|
e6eb0240d1 | ||
|
|
be3e0d8443 | ||
|
|
07235f0515 |
10 changed files with 191 additions and 139 deletions
|
|
@ -39,8 +39,8 @@ let[@inline] raise_with_bt exn =
|
|||
let bt = Printexc.get_raw_backtrace () in
|
||||
Printexc.raise_with_backtrace exn bt
|
||||
|
||||
let with_handler (type st arg) ~(ops : st ops) (self : st) :
|
||||
(unit -> unit) -> unit =
|
||||
let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit
|
||||
=
|
||||
let current =
|
||||
Some
|
||||
(fun k ->
|
||||
|
|
@ -83,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
|
|||
let fiber = get_current_fiber_exn () in
|
||||
(* when triggers is signaled, reschedule task *)
|
||||
if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then
|
||||
(* trigger was already signaled, run task now *)
|
||||
Picos.Fiber.resume fiber k)
|
||||
(* trigger was already signaled, reschedule task now *)
|
||||
reschedule trigger fiber k)
|
||||
| Picos.Computation.Cancel_after _r ->
|
||||
Some
|
||||
(fun k ->
|
||||
|
|
@ -145,7 +145,7 @@ module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
|
|||
cur_fiber := _dummy_fiber;
|
||||
TLS.set k_cur_fiber _dummy_fiber
|
||||
|
||||
let setup (type st) ~block_signals () : unit =
|
||||
let setup ~block_signals () : unit =
|
||||
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
|
||||
state := Ready;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
module Exn_bt = Moonpool.Exn_bt
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
|
@ -1,12 +1,10 @@
|
|||
(library
|
||||
(name moonpool_lwt)
|
||||
(public_name moonpool-lwt)
|
||||
(private_modules common_)
|
||||
(enabled_if
|
||||
(>= %{ocaml_version} 5.0))
|
||||
(libraries
|
||||
(re_export moonpool)
|
||||
(re_export moonpool.fib)
|
||||
picos
|
||||
(re_export lwt)
|
||||
lwt.unix))
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
open Common_
|
||||
module Exn_bt = Moonpool.Exn_bt
|
||||
|
||||
open struct
|
||||
module BQ = Moonpool.Blocking_queue
|
||||
module WL = Moonpool.Private.Worker_loop_
|
||||
module M = Moonpool
|
||||
end
|
||||
|
|
@ -17,8 +16,9 @@ let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
|
|||
module Scheduler_state = struct
|
||||
type st = {
|
||||
tasks: WL.task_full Queue.t;
|
||||
actions_from_other_threads: (unit -> unit) BQ.t;
|
||||
actions_from_other_threads: (unit -> unit) Queue.t;
|
||||
(** Other threads ask us to run closures in the lwt thread *)
|
||||
mutex: Mutex.t;
|
||||
mutable thread: int;
|
||||
mutable closed: bool;
|
||||
mutable as_runner: Moonpool.Runner.t;
|
||||
|
|
@ -32,7 +32,8 @@ module Scheduler_state = struct
|
|||
let st : st =
|
||||
{
|
||||
tasks = Queue.create ();
|
||||
actions_from_other_threads = BQ.create ();
|
||||
actions_from_other_threads = Queue.create ();
|
||||
mutex = Mutex.create ();
|
||||
thread = Thread.self () |> Thread.id;
|
||||
closed = false;
|
||||
as_runner = Moonpool.Runner.dummy;
|
||||
|
|
@ -41,6 +42,13 @@ module Scheduler_state = struct
|
|||
notification = 0;
|
||||
has_notified = Atomic.make false;
|
||||
}
|
||||
|
||||
let add_action_from_another_thread_ (self : st) f : unit =
|
||||
Mutex.lock st.mutex;
|
||||
Queue.push f self.actions_from_other_threads;
|
||||
Mutex.unlock st.mutex;
|
||||
if not (Atomic.exchange self.has_notified true) then
|
||||
Lwt_unix.send_notification self.notification
|
||||
end
|
||||
|
||||
module Ops = struct
|
||||
|
|
@ -48,16 +56,12 @@ module Ops = struct
|
|||
|
||||
let around_task _ = default_around_task_
|
||||
|
||||
let add_action_from_another_thread_ (self : st) f : unit =
|
||||
BQ.push self.actions_from_other_threads f;
|
||||
if not (Atomic.exchange self.has_notified true) then
|
||||
Lwt_unix.send_notification self.notification
|
||||
|
||||
let schedule (self : st) t =
|
||||
if Thread.id (Thread.self ()) = self.thread then
|
||||
Queue.push t self.tasks
|
||||
else
|
||||
add_action_from_another_thread_ self (fun () -> Queue.push t self.tasks)
|
||||
Scheduler_state.add_action_from_another_thread_ self (fun () ->
|
||||
Queue.push t self.tasks)
|
||||
|
||||
let get_next_task (self : st) =
|
||||
if self.closed then raise WL.No_more_tasks;
|
||||
|
|
@ -109,27 +113,40 @@ open struct
|
|||
()
|
||||
end
|
||||
|
||||
let _dummy_exn_bt : Exn_bt.t =
|
||||
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
|
||||
|
||||
let await_lwt (fut : _ Lwt.t) =
|
||||
match Lwt.poll fut with
|
||||
| Some x -> x
|
||||
| None ->
|
||||
match Lwt.state fut with
|
||||
| Return x -> x
|
||||
| Fail exn -> raise exn
|
||||
| Sleep ->
|
||||
(* suspend fiber, wake it up when [fut] resolves *)
|
||||
let trigger = M.Trigger.create () in
|
||||
let res = ref (Error _dummy_exn_bt) in
|
||||
Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger);
|
||||
M.Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||
Exn_bt.unwrap !res
|
||||
|
||||
(match Lwt.state fut with
|
||||
| Return x -> x
|
||||
| Fail exn -> raise exn
|
||||
| Sleep -> assert false)
|
||||
|
||||
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
M.Fut.on_result fut (function
|
||||
|
||||
(* in lwt thread, resolve [lwt_fut] *)
|
||||
let wakeup_using_res = function
|
||||
| Ok x -> Lwt.wakeup lwt_prom x
|
||||
| Error ebt ->
|
||||
let exn = Exn_bt.exn ebt in
|
||||
Lwt.wakeup_exn lwt_prom exn);
|
||||
Lwt.wakeup_exn lwt_prom exn
|
||||
in
|
||||
|
||||
M.Fut.on_result fut (fun res ->
|
||||
if Thread.id (Thread.self ()) = Scheduler_state.st.thread then
|
||||
(* can safely wakeup from the lwt thread *)
|
||||
wakeup_using_res res
|
||||
else
|
||||
Scheduler_state.add_action_from_another_thread_ Scheduler_state.st
|
||||
(fun () -> wakeup_using_res res));
|
||||
|
||||
lwt_fut
|
||||
|
||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||
|
|
@ -145,33 +162,34 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
|||
fut
|
||||
|
||||
let run_in_hook () =
|
||||
Printf.eprintf "AT %s\n%!" __LOC__;
|
||||
|
||||
(* execute actions sent from other threads *)
|
||||
(* execute actions sent from other threads; first transfer them
|
||||
all atomically to a local queue to reduce contention *)
|
||||
let local_acts = Queue.create () in
|
||||
BQ.transfer Scheduler_state.st.actions_from_other_threads local_acts;
|
||||
Mutex.lock Scheduler_state.st.mutex;
|
||||
Queue.transfer Scheduler_state.st.actions_from_other_threads local_acts;
|
||||
Atomic.set Scheduler_state.st.has_notified false;
|
||||
Mutex.unlock Scheduler_state.st.mutex;
|
||||
|
||||
Queue.iter (fun f -> f ()) local_acts;
|
||||
|
||||
(* run tasks *)
|
||||
Printf.eprintf "AT %s\n%!" __LOC__;
|
||||
FG.run ~max_tasks:1000 ();
|
||||
Printf.eprintf "AT %s\n%!" __LOC__;
|
||||
|
||||
if not (Queue.is_empty Scheduler_state.st.tasks) then
|
||||
ignore (Lwt.pause () : unit Lwt.t);
|
||||
Printf.eprintf "AT %s\n%!" __LOC__;
|
||||
()
|
||||
|
||||
let is_setup_ = ref false
|
||||
let is_setup_ = Atomic.make false
|
||||
|
||||
let setup () =
|
||||
if not !is_setup_ then (
|
||||
is_setup_ := true;
|
||||
if not (Atomic.exchange is_setup_ true) then (
|
||||
(* only one thread does this *)
|
||||
FG.setup ~block_signals:false ();
|
||||
Scheduler_state.st.enter_hook <-
|
||||
Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
|
||||
Scheduler_state.st.leave_hook <-
|
||||
Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
|
||||
(* notification used to wake lwt up *)
|
||||
Scheduler_state.st.notification <-
|
||||
Lwt_unix.make_notification ~once:false run_in_hook
|
||||
)
|
||||
|
|
@ -191,3 +209,7 @@ let lwt_main (f : _ -> 'a) : 'a =
|
|||
Scheduler_state.st.thread <- Thread.self () |> Thread.id;
|
||||
let fut = spawn_lwt (fun () -> f Scheduler_state.st.as_runner) in
|
||||
Lwt_main.run fut
|
||||
|
||||
let lwt_main_runner () =
|
||||
if not (Atomic.get is_setup_) then failwith "lwt_main_runner: not setup yet";
|
||||
Scheduler_state.st.as_runner
|
||||
|
|
|
|||
|
|
@ -36,3 +36,7 @@ val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
|
|||
|
||||
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||
(** Setup, run lwt main, return the result *)
|
||||
|
||||
val lwt_main_runner : unit -> Moonpool.Runner.t
|
||||
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
|
||||
is currently running in some thread. *)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
[@@@ocaml.deprecated "use Picos_std_sync or single threaded solutions"]
|
||||
|
||||
module Mutex = Picos_std_sync.Mutex
|
||||
module Condition = Picos_std_sync.Condition
|
||||
module Lock = Lock
|
||||
|
|
|
|||
|
|
@ -1,93 +1,105 @@
|
|||
module M = Moonpool
|
||||
module M_lwt = Moonpool_lwt
|
||||
module Trace = Trace_core
|
||||
|
||||
let spf = Printf.sprintf
|
||||
let await_lwt = Moonpool_lwt.await_lwt
|
||||
let ( let@ ) = ( @@ )
|
||||
let lock_stdout = M.Lock.create ()
|
||||
|
||||
let main ~port ~runner ~n ~n_conn () : unit Lwt.t =
|
||||
let main ~port ~n ~n_conn ~verbose ~msg_per_conn () : unit =
|
||||
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
|
||||
|
||||
let remaining = Atomic.make n in
|
||||
let all_done = Atomic.make 0 in
|
||||
|
||||
let fut_exit, prom_exit = M.Fut.make () in
|
||||
|
||||
Printf.printf "connecting to port %d\n%!" port;
|
||||
let t0 = Unix.gettimeofday () in
|
||||
Printf.printf
|
||||
"connecting to port %d (%d msg per conn, %d conns total, %d max at a time)\n\
|
||||
%!"
|
||||
port msg_per_conn n n_conn;
|
||||
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
|
||||
|
||||
let rec run_task () =
|
||||
let token_pool = Lwt_pool.create n_conn (fun () -> Lwt.return_unit) in
|
||||
let n_msg_total = ref 0 in
|
||||
|
||||
let run_task () =
|
||||
(* Printf.printf "running task\n%!"; *)
|
||||
let n = Atomic.fetch_and_add remaining (-1) in
|
||||
if n > 0 then (
|
||||
(let _sp =
|
||||
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client"
|
||||
in
|
||||
Trace.message "connecting new client…";
|
||||
M_lwt.TCP_client.with_connect addr @@ fun ic oc ->
|
||||
let buf = Bytes.create 32 in
|
||||
let@ () = Lwt_pool.use token_pool in
|
||||
|
||||
for _j = 1 to 10 do
|
||||
let _sp =
|
||||
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__
|
||||
"write.loop"
|
||||
in
|
||||
let@ () = M_lwt.spawn_lwt in
|
||||
let _sp =
|
||||
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "connect.client"
|
||||
in
|
||||
Trace.message "connecting new client…";
|
||||
|
||||
let s = spf "hello %d" _j in
|
||||
M_lwt.IO_out.output_string oc s;
|
||||
M_lwt.IO_out.flush oc;
|
||||
let ic, oc = Lwt_io.open_connection addr |> await_lwt in
|
||||
|
||||
(* read back something *)
|
||||
M_lwt.IO_in.really_input ic buf 0 (String.length s);
|
||||
(let@ () = M.Lock.with_ lock_stdout in
|
||||
Printf.printf "read: %s\n%!"
|
||||
(Bytes.sub_string buf 0 (String.length s)));
|
||||
Trace.exit_manual_span _sp;
|
||||
()
|
||||
done;
|
||||
Trace.exit_manual_span _sp);
|
||||
let cleanup () =
|
||||
Trace.message "closing connection";
|
||||
Lwt_io.close ic |> await_lwt;
|
||||
Lwt_io.close oc |> await_lwt
|
||||
in
|
||||
|
||||
(* run another task *) M.Runner.run_async runner run_task
|
||||
) else (
|
||||
(* if we're the last to exit, resolve the promise *)
|
||||
let n_already_done = Atomic.fetch_and_add all_done 1 in
|
||||
if n_already_done = n_conn - 1 then (
|
||||
(let@ () = M.Lock.with_ lock_stdout in
|
||||
Printf.printf "all done\n%!");
|
||||
M.Fut.fulfill prom_exit @@ Ok ()
|
||||
)
|
||||
)
|
||||
let@ () = Fun.protect ~finally:cleanup in
|
||||
|
||||
let buf = Bytes.create 32 in
|
||||
|
||||
for _j = 1 to msg_per_conn do
|
||||
let _sp =
|
||||
Trace.enter_manual_span
|
||||
~parent:(Some (Trace.ctx_of_span _sp))
|
||||
~__FILE__ ~__LINE__ "write.loop"
|
||||
in
|
||||
|
||||
let s = spf "hello %d" _j in
|
||||
Lwt_io.write oc s |> await_lwt;
|
||||
Lwt_io.flush oc |> await_lwt;
|
||||
incr n_msg_total;
|
||||
|
||||
(* read back something *)
|
||||
Lwt_io.read_into_exactly ic buf 0 (String.length s) |> await_lwt;
|
||||
if verbose then
|
||||
Printf.printf "read: %s\n%!" (Bytes.sub_string buf 0 (String.length s));
|
||||
Trace.exit_manual_span _sp;
|
||||
()
|
||||
done;
|
||||
Trace.exit_manual_span _sp
|
||||
in
|
||||
|
||||
(* start the first [n_conn] tasks *)
|
||||
for _i = 1 to n_conn do
|
||||
M.Runner.run_async runner run_task
|
||||
done;
|
||||
let futs = List.init n (fun _ -> run_task ()) in
|
||||
Lwt.join futs |> await_lwt;
|
||||
|
||||
(* exit when [fut_exit] is resolved *)
|
||||
M_lwt.lwt_of_fut fut_exit
|
||||
Printf.printf "all done\n%!";
|
||||
let elapsed = Unix.gettimeofday () -. t0 in
|
||||
Printf.printf " sent %d messages in %.4fs (%.2f msg/s)\n%!" !n_msg_total
|
||||
elapsed
|
||||
(float !n_msg_total /. elapsed);
|
||||
()
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
Trace.set_thread_name "main";
|
||||
|
||||
let port = ref 0 in
|
||||
let j = ref 4 in
|
||||
let n_conn = ref 100 in
|
||||
let n = ref 50_000 in
|
||||
let msg_per_conn = ref 10 in
|
||||
let verbose = ref false in
|
||||
|
||||
let opts =
|
||||
[
|
||||
"-p", Arg.Set_int port, " port";
|
||||
"-j", Arg.Set_int j, " number of threads";
|
||||
"-n", Arg.Set_int n, " total number of connections";
|
||||
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
|
||||
( "--msg-per-conn",
|
||||
Arg.Set_int msg_per_conn,
|
||||
" messages sent per connection" );
|
||||
"-v", Arg.Set verbose, " verbose";
|
||||
( "--n-conn",
|
||||
Arg.Set_int n_conn,
|
||||
" maximum number of connections opened simultaneously" );
|
||||
]
|
||||
|> Arg.align
|
||||
in
|
||||
Arg.parse opts ignore "echo client";
|
||||
|
||||
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
|
||||
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
|
||||
Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn ()
|
||||
M_lwt.lwt_main @@ fun _runner ->
|
||||
main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose
|
||||
~msg_per_conn:!msg_per_conn ()
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ let str_of_sockaddr = function
|
|||
| Unix.ADDR_INET (addr, port) ->
|
||||
spf "%s:%d" (Unix.string_of_inet_addr addr) port
|
||||
|
||||
let main ~port ~runner:_ () : unit Lwt.t =
|
||||
let main ~port ~verbose ~runner:_ () : unit Lwt.t =
|
||||
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
|
||||
|
||||
let lwt_fut, _lwt_prom = Lwt.wait () in
|
||||
|
|
@ -26,7 +26,8 @@ let main ~port ~runner:_ () : unit Lwt.t =
|
|||
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
|
||||
in
|
||||
|
||||
Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);
|
||||
if verbose then
|
||||
Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);
|
||||
|
||||
let buf = Bytes.create 32 in
|
||||
let continue = ref true in
|
||||
|
|
@ -42,6 +43,8 @@ let main ~port ~runner:_ () : unit Lwt.t =
|
|||
Trace.message "write"
|
||||
)
|
||||
done;
|
||||
if verbose then
|
||||
Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr);
|
||||
Trace.exit_manual_span _sp;
|
||||
Trace.message "exit handle client"
|
||||
in
|
||||
|
|
@ -58,10 +61,13 @@ let () =
|
|||
Trace.set_thread_name "main";
|
||||
let port = ref 0 in
|
||||
let j = ref 4 in
|
||||
let verbose = ref false in
|
||||
|
||||
let opts =
|
||||
[
|
||||
"-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads";
|
||||
"-v", Arg.Set verbose, " verbose";
|
||||
"-p", Arg.Set_int port, " port";
|
||||
"-j", Arg.Set_int j, " number of threads";
|
||||
]
|
||||
|> Arg.align
|
||||
in
|
||||
|
|
@ -69,4 +75,4 @@ let () =
|
|||
|
||||
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
|
||||
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
|
||||
Lwt_main.run @@ main ~runner ~port:!port ()
|
||||
Lwt_main.run @@ main ~runner ~port:!port ~verbose:!verbose ()
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ module Str_tbl = Hashtbl.Make (struct
|
|||
let hash = Hashtbl.hash
|
||||
end)
|
||||
|
||||
let await_lwt = Moonpool_lwt.await_lwt
|
||||
let ( let@ ) = ( @@ )
|
||||
let lock_stdout = M.Lock.create ()
|
||||
|
||||
let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
|
||||
let main ~port ~ext ~dir ~n_conn () : unit =
|
||||
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
|
||||
|
||||
Printf.printf "hash dir=%S\n%!" dir;
|
||||
|
|
@ -20,12 +20,15 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
|
|||
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
|
||||
|
||||
(* TODO: *)
|
||||
let run_task () : unit =
|
||||
let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in
|
||||
let run_task () : unit Lwt.t =
|
||||
let@ () = M_lwt.spawn_lwt in
|
||||
let _sp =
|
||||
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "run-task"
|
||||
in
|
||||
|
||||
let seen = Str_tbl.create 16 in
|
||||
|
||||
M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc ->
|
||||
let ic, oc = Lwt_io.open_connection addr |> await_lwt in
|
||||
let rec walk file : unit =
|
||||
if not (Sys.file_exists file) then
|
||||
()
|
||||
|
|
@ -33,7 +36,9 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
|
|||
()
|
||||
else if Sys.is_directory file then (
|
||||
let _sp =
|
||||
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir"
|
||||
Trace.enter_manual_span
|
||||
~parent:(Some (Trace.ctx_of_span _sp))
|
||||
~__FILE__ ~__LINE__ "walk-dir"
|
||||
~data:(fun () -> [ "d", `String file ])
|
||||
in
|
||||
|
||||
|
|
@ -45,9 +50,8 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
|
|||
()
|
||||
else (
|
||||
Str_tbl.add seen file ();
|
||||
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file);
|
||||
let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in
|
||||
let@ () = M.Lock.with_ lock_stdout in
|
||||
Lwt_io.write_line oc file |> await_lwt;
|
||||
let res = Lwt_io.read_line ic |> await_lwt in
|
||||
Printf.printf "%s\n%!" res
|
||||
)
|
||||
in
|
||||
|
|
@ -56,16 +60,14 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
|
|||
in
|
||||
|
||||
(* start the first [n_conn] tasks *)
|
||||
let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in
|
||||
|
||||
Lwt.join (List.map M_lwt.lwt_of_fut futs)
|
||||
let futs = List.init n_conn (fun _ -> run_task ()) in
|
||||
Lwt.join futs |> await_lwt
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
Trace.set_thread_name "main";
|
||||
|
||||
let port = ref 1234 in
|
||||
let j = ref 4 in
|
||||
let n_conn = ref 100 in
|
||||
let ext = ref "" in
|
||||
let dir = ref "." in
|
||||
|
|
@ -73,7 +75,6 @@ let () =
|
|||
let opts =
|
||||
[
|
||||
"-p", Arg.Set_int port, " port";
|
||||
"-j", Arg.Set_int j, " number of threads";
|
||||
"-d", Arg.Set_string dir, " directory to hash";
|
||||
"--n-conn", Arg.Set_int n_conn, " number of parallel connections";
|
||||
"--ext", Arg.Set_string ext, " extension to filter files";
|
||||
|
|
@ -82,7 +83,6 @@ let () =
|
|||
in
|
||||
Arg.parse opts ignore "echo client";
|
||||
|
||||
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
|
||||
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
|
||||
Lwt_main.run
|
||||
@@ main ~runner ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn ()
|
||||
M_lwt.lwt_main @@ fun _runner ->
|
||||
main ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn ()
|
||||
|
|
|
|||
|
|
@ -134,10 +134,11 @@ let sha_1 s =
|
|||
|
||||
(* server that reads from sockets lists of files, and returns hashes of these files *)
|
||||
|
||||
module M = Moonpool
|
||||
module M_lwt = Moonpool_lwt
|
||||
module Trace = Trace_core
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
let await_lwt = Moonpool_lwt.await_lwt
|
||||
let ( let@ ) = ( @@ )
|
||||
let spf = Printf.sprintf
|
||||
|
||||
|
|
@ -165,7 +166,7 @@ let read_file filename : string =
|
|||
in
|
||||
In_channel.with_open_bin filename In_channel.input_all
|
||||
|
||||
let main ~port ~runner () : unit Lwt.t =
|
||||
let main ~port ~runner () : unit =
|
||||
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
|
||||
|
||||
let lwt_fut, _lwt_prom = Lwt.wait () in
|
||||
|
|
@ -173,38 +174,39 @@ let main ~port ~runner () : unit Lwt.t =
|
|||
(* TODO: handle exit?? *)
|
||||
Printf.printf "listening on port %d\n%!" port;
|
||||
|
||||
let handle_client client_addr ic oc =
|
||||
let handle_client client_addr (ic, oc) =
|
||||
let@ () = Moonpool_lwt.spawn_lwt in
|
||||
let _sp =
|
||||
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client"
|
||||
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
|
||||
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
|
||||
in
|
||||
|
||||
try
|
||||
while true do
|
||||
Trace.message "read";
|
||||
let filename =
|
||||
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic)
|
||||
|> String.trim
|
||||
in
|
||||
let filename = Lwt_io.read_line ic |> await_lwt |> String.trim in
|
||||
Trace.messagef (fun k -> k "hash %S" filename);
|
||||
|
||||
match read_file filename with
|
||||
| exception e ->
|
||||
Printf.eprintf "error while reading %S:\n%s\n" filename
|
||||
(Printexc.to_string e);
|
||||
M_lwt.run_in_lwt_and_await (fun () ->
|
||||
Lwt_io.write_line oc (spf "%s: error" filename));
|
||||
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
|
||||
Lwt_io.write_line oc (spf "%s: error" filename) |> await_lwt;
|
||||
Lwt_io.flush oc |> await_lwt
|
||||
| content ->
|
||||
(* got the content, now hash it *)
|
||||
let hash =
|
||||
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in
|
||||
(* got the content, now hash it in a background task *)
|
||||
let hash : _ Fut.t =
|
||||
let@ () = Moonpool.spawn ~on:runner in
|
||||
let@ _sp =
|
||||
Trace.with_span ~__FILE__ ~__LINE__ "hash" ~data:(fun () ->
|
||||
[ "file", `String filename ])
|
||||
in
|
||||
sha_1 content |> to_hex
|
||||
in
|
||||
|
||||
M_lwt.run_in_lwt_and_await (fun () ->
|
||||
Lwt_io.write_line oc (spf "%s: %s" filename hash));
|
||||
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
|
||||
let hash = Fut.await hash in
|
||||
Lwt_io.write_line oc (spf "%s: %s" filename hash) |> await_lwt;
|
||||
Lwt_io.flush oc |> await_lwt
|
||||
done
|
||||
with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
|
||||
Trace.exit_manual_span _sp;
|
||||
|
|
@ -212,16 +214,17 @@ let main ~port ~runner () : unit Lwt.t =
|
|||
in
|
||||
|
||||
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
|
||||
let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in
|
||||
Printf.printf "listening on port=%d\n%!" port;
|
||||
let _server =
|
||||
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
|
||||
in
|
||||
|
||||
lwt_fut
|
||||
lwt_fut |> await_lwt
|
||||
|
||||
let () =
|
||||
let@ () = Trace_tef.with_setup () in
|
||||
Trace.set_thread_name "main";
|
||||
let port = ref 1234 in
|
||||
let j = ref 4 in
|
||||
let j = ref 0 in
|
||||
|
||||
let opts =
|
||||
[
|
||||
|
|
@ -231,6 +234,14 @@ let () =
|
|||
in
|
||||
Arg.parse opts ignore "echo server";
|
||||
|
||||
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
|
||||
(* Lwt_engine.set @@ new Lwt_engine.libev (); *)
|
||||
Lwt_main.run @@ main ~runner ~port:!port ()
|
||||
let@ runner =
|
||||
let num_threads =
|
||||
if !j = 0 then
|
||||
None
|
||||
else
|
||||
Some !j
|
||||
in
|
||||
Moonpool.Ws_pool.with_ ?num_threads ()
|
||||
in
|
||||
M_lwt.lwt_main @@ fun _main_runner -> main ~runner ~port:!port ()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue