diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index 457b0c35..ede82674 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -20,10 +20,10 @@ jobs: allow-prerelease-opam: true - name: Deps - run: opam install odig moonpool + run: opam install odig moonpool moonpool-lwt - name: Build - run: opam exec -- odig odoc --cache-dir=_doc/ moonpool + run: opam exec -- odig odoc --cache-dir=_doc/ moonpool moonpool-lwt - name: Deploy uses: peaceiris/actions-gh-pages@v3 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b7f5dbb7..157b43d7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: ocaml-compiler: - '4.08' - '4.14' - - '5.0' + - '5.1' runs-on: ${{ matrix.os }} steps: @@ -32,7 +32,10 @@ jobs: dune-cache: true allow-prerelease-opam: true + - run: opam install -t moonpool moonpool-lwt --deps-only + if: matrix.ocaml-compiler == '5.1' - run: opam install -t moonpool --deps-only + if: matrix.ocaml-compiler != '5.1' - run: opam exec -- dune build @install - run: opam exec -- dune runtest - run: opam install thread-local-storage trace diff --git a/benchs/fib_rec.ml b/benchs/fib_rec.ml index 66eded93..82d588cf 100644 --- a/benchs/fib_rec.ml +++ b/benchs/fib_rec.ml @@ -14,7 +14,7 @@ let cutoff = ref 20 let rec fib ~on x : int Fut.t = if x <= !cutoff then - Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) + Fut.spawn ~on (fun () -> fib_direct x) else let open Fut.Infix in let+ t1 = fib ~on (x - 1) and+ t2 = fib ~on (x - 2) in @@ -31,14 +31,14 @@ let fib_fj ~on x : int Fut.t = n1 + n2 ) in - Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) + Fut.spawn ~on (fun () -> fib_rec x) let fib_await ~on x : int Fut.t = let rec fib_rec x : int Fut.t = if x <= !cutoff then - Fut.spawn ~name:"fib" ~on (fun () -> fib_direct x) + Fut.spawn ~on (fun () -> fib_direct x) else - Fut.spawn ~name:"fib" ~on (fun () -> + Fut.spawn ~on (fun () -> let n1 = fib_rec (x - 1) in let n2 = fib_rec (x - 2) in let n1 = Fut.await n1 in diff --git a/benchs/pi.ml b/benchs/pi.ml index 4eae7eb0..63ddc2ca 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -76,7 +76,7 @@ let run_fork_join ~kind num_steps : float = let step = 1. /. float num_steps in let global_sum = Lock.create 0. in - Ws_pool.run_wait_block ~name:"pi.fj" pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> FJ.for_ ~chunk_size:(3 + (num_steps / num_tasks)) num_steps diff --git a/dune-project b/dune-project index 55cb93f1..050eb9d8 100644 --- a/dune-project +++ b/dune-project @@ -33,4 +33,15 @@ (tags (thread pool domain futures fork-join))) +(package + (name moonpool-lwt) + (synopsis "Event loop for moonpool based on Lwt-engine") + (allow_empty) ; on < 5.0 + (depends + (moonpool (= :version)) + (ocaml (>= 5.0)) + lwt + base-unix + (odoc :with-doc))) + ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project diff --git a/moonpool-lwt.opam b/moonpool-lwt.opam new file mode 100644 index 00000000..e2c9eb4f --- /dev/null +++ b/moonpool-lwt.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.5.1" +synopsis: "Event loop for moonpool based on Lwt-engine" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +homepage: "https://github.com/c-cube/moonpool" +bug-reports: "https://github.com/c-cube/moonpool/issues" +depends: [ + "dune" {>= "3.0"} + "moonpool" {= version} + "ocaml" {>= "5.0"} + "lwt" + "base-unix" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/moonpool.git" diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index d2757324..58177b3f 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -6,7 +6,6 @@ let k_storage = Task_local_storage.Private_.Storage.k_storage type task_full = { f: unit -> unit; - name: string; ls: Task_local_storage.storage; } @@ -44,18 +43,17 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = !cur_ls in - let run_another_task ls ~name task' = + let run_another_task ls task' = let ls' = Task_local_storage.Private_.Storage.copy ls in - schedule_ self { f = task'; name; ls = ls' } + schedule_ self { f = task'; ls = ls' } in let run_task (task : task_full) : unit = cur_ls := task.ls; let _ctx = before_task runner in - cur_span := Tracing_.enter_span task.name; let resume ls k res = - schedule_ self { f = (fun () -> k res); name = task.name; ls } + schedule_ self { f = (fun () -> k res); ls } in (* run the task now, catching errors, handling effects *) @@ -105,12 +103,12 @@ type ('a, 'b) create_args = ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?num_threads:int -> - ?name:string -> + ?name:string -> 'a let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) - ?around_task ?num_threads ?name () : t = + ?around_task ?num_threads ?name () : t = (* wrapper *) let around_task = match around_task with @@ -131,7 +129,7 @@ let create ?(on_init_thread = default_thread_init_exit_) { threads = Array.make num_threads dummy; q = Bb_queue.create () } in - let run_async ~name ~ls f = schedule_ pool { f; name; ls } in + let run_async ~ls f = schedule_ pool { f; ls } in let runner = Runner.For_runner_implementors.create diff --git a/src/core/fut.ml b/src/core/fut.ml index 2c7d6896..5cbcb366 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -5,21 +5,13 @@ type 'a waiter = 'a or_error -> unit type 'a state = | Done of 'a or_error - | Waiting of { - waiters: 'a waiter list; - name: string; - } + | Waiting of { waiters: 'a waiter list } type 'a t = { st: 'a state A.t } [@@unboxed] type 'a promise = 'a t -let[@inline] get_name_ (self : _ t) = - match A.get self.st with - | Done _ -> "" - | Waiting { name; _ } -> name - -let make ?(name = "") () = - let fut = { st = A.make (Waiting { waiters = []; name }) } in +let make () = + let fut = { st = A.make (Waiting { waiters = [] }) } in fut, fut let[@inline] of_result x : _ t = { st = A.make (Done x) } @@ -72,8 +64,8 @@ let on_result (self : _ t) (f : _ waiter) : unit = | Done x -> f x; false - | Waiting { waiters = l; name } -> - not (A.compare_and_set self.st st (Waiting { waiters = f :: l; name })) + | Waiting { waiters = l } -> + not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) do Domain_.relax () done @@ -86,7 +78,7 @@ let fulfill (self : _ t) (r : _ result) : unit = let st = A.get self.st in match st with | Done _ -> raise Already_fulfilled - | Waiting { waiters = l; name = _ } -> + | Waiting { waiters = l } -> let did_swap = A.compare_and_set self.st st (Done r) in if did_swap then ( (* success, now call all the waiters *) @@ -105,7 +97,7 @@ let[@inline] fulfill_idempotent self r = (* ### combinators ### *) -let spawn ?name ?ls ~on f : _ t = +let spawn ?ls ~on f : _ t = let fut, promise = make () in let task () = @@ -118,13 +110,13 @@ let spawn ?name ?ls ~on f : _ t = fulfill promise res in - Runner.run_async ?name ?ls on task; + Runner.run_async ?ls on task; fut -let spawn_on_current_runner ?name ?ls f : _ t = +let spawn_on_current_runner ?ls f : _ t = match Runner.get_current_runner () with | None -> failwith "Fut.spawn_on_current_runner: not running on a runner" - | Some on -> spawn ?name ?ls ~on f + | Some on -> spawn ?ls ~on f let reify_error (f : 'a t) : 'a or_error t = match peek f with @@ -150,22 +142,20 @@ let map ?on ~f fut : _ t = | Error e_bt -> Error e_bt in - let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, None -> of_result @@ map_immediate_ res | Some res, Some runner -> - let fut2, promise = make ~name () in - Runner.run_async ~name runner (fun () -> - fulfill promise @@ map_immediate_ res); + let fut2, promise = make () in + Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res); fut2 | None, None -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> fulfill promise @@ map_immediate_ res); fut2 | None, Some runner -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> - Runner.run_async ~name runner (fun () -> + Runner.run_async runner (fun () -> fulfill promise @@ map_immediate_ res)); fut2 @@ -174,7 +164,7 @@ let join (fut : 'a t t) : 'a t = | Some (Ok f) -> f | Some (Error (e, bt)) -> fail e bt | None -> - let fut2, promise = make ~name:(get_name_ fut) () in + let fut2, promise = make () in on_result fut (function | Ok sub_fut -> on_result sub_fut (fulfill promise) | Error _ as e -> fulfill promise e); @@ -197,20 +187,19 @@ let bind ?on ~f fut : _ t = on_result f_res_fut (fun r -> fulfill promise r) in - let name = get_name_ fut in match peek fut, get_runner_ ?on () with | Some res, Some runner -> - let fut2, promise = make ~name () in - Runner.run_async ~name runner (bind_and_fulfill res promise); + let fut2, promise = make () in + Runner.run_async runner (bind_and_fulfill res promise); fut2 | Some res, None -> apply_f_to_res res | None, Some runner -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun r -> - Runner.run_async ~name runner (bind_and_fulfill r promise)); + Runner.run_async runner (bind_and_fulfill r promise)); fut2 | None, None -> - let fut2, promise = make ~name () in + let fut2, promise = make () in on_result fut (fun res -> bind_and_fulfill res promise ()); fut2 @@ -234,7 +223,7 @@ let both a b : _ t = | Some (Ok x), Some (Ok y) -> return (x, y) | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let st = A.make `Neither in on_result a (function @@ -267,7 +256,7 @@ let choose a b : _ t = | _, Some (Ok y) -> return (Either.Right y) | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let one_failure = A.make false in on_result a (function @@ -290,7 +279,7 @@ let choose_same a b : _ t = | _, Some (Ok y) -> return y | Some (Error (e, bt)), Some (Error _) -> fail e bt | _ -> - let fut, promise = make ~name:(get_name_ a) () in + let fut, promise = make () in let one_failure = A.make false in on_result a (function diff --git a/src/core/fut.mli b/src/core/fut.mli index a82975f3..7c0d4466 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -26,9 +26,8 @@ type 'a promise (** A promise, which can be fulfilled exactly once to set the corresponding future *) -val make : ?name:string -> unit -> 'a t * 'a promise -(** Make a new future with the associated promise. - @param name name for the future, used for tracing. since NEXT_RELEASE. *) +val make : unit -> 'a t * 'a promise +(** Make a new future with the associated promise. *) val on_result : 'a t -> ('a or_error -> unit) -> unit (** [on_result fut f] registers [f] to be called in the future @@ -95,16 +94,12 @@ val is_failed : _ t -> bool (** {2 Combinators} *) val spawn : - ?name:string -> - ?ls:Task_local_storage.storage -> - on:Runner.t -> - (unit -> 'a) -> - 'a t + ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a t (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will hold its result. *) val spawn_on_current_runner : - ?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t + ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a t (** This must be run from inside a runner, and schedules the new task on it as well. diff --git a/src/core/immediate_runner.ml b/src/core/immediate_runner.ml index c260f439..9412fd35 100644 --- a/src/core/immediate_runner.ml +++ b/src/core/immediate_runner.ml @@ -4,19 +4,16 @@ include Runner (* convenient alias *) let k_ls = Task_local_storage.Private_.Storage.k_storage -let run_async_ ~name ~ls f = +let run_async_ ~ls f = let cur_ls = ref ls in TLS.set k_ls (Some cur_ls); cur_ls := ls; - let sp = Tracing_.enter_span name in try let x = f () in - Tracing_.exit_span sp; TLS.set k_ls None; x with e -> let bt = Printexc.get_raw_backtrace () in - Tracing_.exit_span sp; TLS.set k_ls None; Printexc.raise_with_backtrace e bt diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index c69b5581..aed377ea 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -34,6 +34,7 @@ module Private = struct module Ws_deque_ = Ws_deque_ module Suspend_ = Suspend_ module Domain_ = Domain_ + module Tracing_ = Tracing_ let num_domains = Domain_pool_.n_domains end diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index 23ee52d8..cb1c2991 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -26,25 +26,14 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) val run_async : - ?name:string -> - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> unit) -> - unit + ?ls:Task_local_storage.storage -> Runner.t -> (unit -> unit) -> unit (** [run_async runner task] schedules the task to run on the given runner. This means [task()] will be executed at some point in the future, possibly in another thread. - @param name if provided and [Trace] is present in dependencies, a span - will be created when the task starts, and will stop when the task is over. - (since NEXT_RELEASE) @since 0.5 *) val run_wait_block : - ?name:string -> - ?ls:Task_local_storage.storage -> - Runner.t -> - (unit -> 'a) -> - 'a + ?ls:Task_local_storage.storage -> Runner.t -> (unit -> 'a) -> 'a (** [run_wait_block runner f] schedules [f] for later execution on the runner, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -63,21 +52,14 @@ val recommended_thread_count : unit -> int @since 0.5 *) val spawn : - ?name:string -> - ?ls:Task_local_storage.storage -> - on:Runner.t -> - (unit -> 'a) -> - 'a Fut.t + ?ls:Task_local_storage.storage -> on:Runner.t -> (unit -> 'a) -> 'a Fut.t (** [spawn ~on f] runs [f()] on the runner (a thread pool typically) and returns a future result for it. See {!Fut.spawn}. - @param name if provided and [Trace] is present in dependencies, - a span will be created for the future. (since 0.6) @since 0.5 *) val spawn_on_current_runner : - ?name:string -> ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t + ?ls:Task_local_storage.storage -> (unit -> 'a) -> 'a Fut.t (** See {!Fut.spawn_on_current_runner}. - @param name see {!spawn}. since 0.6. @since 0.5 *) [@@@ifge 5.0] @@ -240,6 +222,8 @@ module Private : sig module Domain_ = Domain_ (** Utils for domains *) + module Tracing_ = Tracing_ + val num_domains : unit -> int (** Number of domains in the backing domain pool *) end diff --git a/src/core/runner.ml b/src/core/runner.ml index 207ea56d..360ec6ba 100644 --- a/src/core/runner.ml +++ b/src/core/runner.ml @@ -3,7 +3,7 @@ module TLS = Thread_local_storage_ type task = unit -> unit type t = { - run_async: name:string -> ls:Task_local_storage.storage -> task -> unit; + run_async: ls:Task_local_storage.storage -> task -> unit; shutdown: wait:bool -> unit -> unit; size: unit -> int; num_tasks: unit -> int; @@ -11,9 +11,9 @@ type t = { exception Shutdown -let[@inline] run_async ?(name = "") - ?(ls = Task_local_storage.Private_.Storage.create ()) (self : t) f : unit = - self.run_async ~name ~ls f +let[@inline] run_async ?(ls = Task_local_storage.Private_.Storage.create ()) + (self : t) f : unit = + self.run_async ~ls f let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true () @@ -23,9 +23,9 @@ let[@inline] shutdown_without_waiting (self : t) : unit = let[@inline] num_tasks (self : t) : int = self.num_tasks () let[@inline] size (self : t) : int = self.size () -let run_wait_block ?name ?ls self (f : unit -> 'a) : 'a = +let run_wait_block ?ls self (f : unit -> 'a) : 'a = let q = Bb_queue.create () in - run_async ?name ?ls self (fun () -> + run_async ?ls self (fun () -> try let x = f () in Bb_queue.push q (Ok x) diff --git a/src/core/runner.mli b/src/core/runner.mli index 5b937c09..331e8b50 100644 --- a/src/core/runner.mli +++ b/src/core/runner.mli @@ -33,19 +33,14 @@ val shutdown_without_waiting : t -> unit exception Shutdown -val run_async : - ?name:string -> ?ls:Task_local_storage.storage -> t -> task -> unit +val run_async : ?ls:Task_local_storage.storage -> t -> task -> unit (** [run_async pool f] schedules [f] for later execution on the runner in one of the threads. [f()] will run on one of the runner's worker threads/domains. - @param name if provided and [Trace] is present in dependencies, a span - will be created when the task starts, and will stop when the task is over. - (since NEXT_RELEASE) @param ls if provided, run the task with this initial local storage @raise Shutdown if the runner was shut down before [run_async] was called. *) -val run_wait_block : - ?name:string -> ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a +val run_wait_block : ?ls:Task_local_storage.storage -> t -> (unit -> 'a) -> 'a (** [run_wait_block pool f] schedules [f] for later execution on the pool, like {!run_async}. It then blocks the current thread until [f()] is done executing, @@ -65,7 +60,7 @@ module For_runner_implementors : sig size:(unit -> int) -> num_tasks:(unit -> int) -> shutdown:(wait:bool -> unit -> unit) -> - run_async:(name:string -> ls:Task_local_storage.storage -> task -> unit) -> + run_async:(ls:Task_local_storage.storage -> task -> unit) -> unit -> t (** Create a new runner. diff --git a/src/core/suspend_.ml b/src/core/suspend_.ml index 4d15ac77..193d3639 100644 --- a/src/core/suspend_.ml +++ b/src/core/suspend_.ml @@ -1,5 +1,3 @@ -module A = Atomic_ - type suspension = unit Exn_bt.result -> unit type task = unit -> unit @@ -7,7 +5,7 @@ type task = unit -> unit type suspension_handler = { handle: - run:(name:string -> task -> unit) -> + run:(task -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) -> suspension -> unit; @@ -16,6 +14,8 @@ type suspension_handler = { [@@@ocaml.alert "-unstable"] +module A = Atomic_ + type _ Effect.t += | Suspend : suspension_handler -> unit Effect.t | Yield : unit Effect.t @@ -27,8 +27,7 @@ type with_suspend_handler = | WSH : { on_suspend: unit -> 'state; (** on_suspend called when [f()] suspends itself. *) - run: 'state -> name:string -> task -> unit; - (** run used to schedule new tasks *) + run: 'state -> task -> unit; (** run used to schedule new tasks *) resume: 'state -> suspension -> unit Exn_bt.result -> unit; (** resume run the suspension. Must be called exactly once. *) } diff --git a/src/core/suspend_.mli b/src/core/suspend_.mli index 1fff43ac..45d4bc97 100644 --- a/src/core/suspend_.mli +++ b/src/core/suspend_.mli @@ -3,8 +3,6 @@ This module is an implementation detail of Moonpool and should not be used outside of it, except by experts to implement {!Runner}. *) -open Types_ - type suspension = unit Exn_bt.result -> unit (** A suspended computation *) @@ -14,7 +12,7 @@ type task = unit -> unit type suspension_handler = { handle: - run:(name:string -> task -> unit) -> + run:(task -> unit) -> resume:(suspension -> unit Exn_bt.result -> unit) -> suspension -> unit; @@ -24,7 +22,6 @@ type suspension_handler = { The handler is given a few things: - - the name (if any) of the current computation - the suspended computation (which can be resumed with a result eventually); - a [run] function that can be used to start tasks to perform some @@ -70,8 +67,7 @@ type with_suspend_handler = | WSH : { on_suspend: unit -> 'state; (** on_suspend called when [f()] suspends itself. *) - run: 'state -> name:string -> task -> unit; - (** run used to schedule new tasks *) + run: 'state -> task -> unit; (** run used to schedule new tasks *) resume: 'state -> suspension -> unit Exn_bt.result -> unit; (** resume run the suspension. Must be called exactly once. *) } diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index d1fd7cf3..5627a4bb 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -16,7 +16,6 @@ end type task_full = { f: task; - name: string; ls: Task_local_storage.storage; } @@ -26,7 +25,6 @@ type worker_state = { pool_id_: Id.t; (** Unique per pool *) mutable thread: Thread.t; q: task_full WSQ.t; (** Work stealing queue *) - mutable cur_span: int64; cur_ls: Task_local_storage.storage ref; (** Task storage *) rng: Random.State.t; } @@ -75,10 +73,10 @@ let[@inline] try_wake_someone_ (self : state) : unit = ) (** Run [task] as is, on the pool. *) -let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task) +let schedule_task_ (self : state) ~ls (w : worker_state option) (f : task) : unit = (* Printf.printf "schedule task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) - let task = { f; name; ls } in + let task = { f; ls } in match w with | Some w when Id.equal self.id_ w.pool_id_ -> (* we're on this same pool, schedule in the worker's state. Otherwise @@ -107,33 +105,26 @@ let schedule_task_ (self : state) ~name ~ls (w : worker_state option) (f : task) raise Shutdown (** Run this task, now. Must be called from a worker. *) -let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task : +let run_task_now_ (self : state) ~runner (w : worker_state) ~ls task : unit = (* Printf.printf "run task now (%d)\n%!" (Thread.id @@ Thread.self ()); *) let (AT_pair (before_task, after_task)) = self.around_task in w.cur_ls := ls; let _ctx = before_task runner in - w.cur_span <- Tracing_.enter_span name; - let[@inline] exit_span_ () = - Tracing_.exit_span w.cur_span; - w.cur_span <- Tracing_.dummy_span - in - - let on_suspend () = - exit_span_ (); + let[@inline] on_suspend () = !(w.cur_ls) in - let run_another_task ls ~name task' = + let run_another_task ls task' = let w = find_current_worker_ () in let ls' = Task_local_storage.Private_.Storage.copy ls in - schedule_task_ self w ~name ~ls:ls' task' + schedule_task_ self w ~ls:ls' task' in let resume ls k r = let w = find_current_worker_ () in - schedule_task_ self w ~name ~ls (fun () -> k r) + schedule_task_ self w ~ls (fun () -> k r) in (* run the task now, catching errors *) @@ -152,13 +143,12 @@ let run_task_now_ (self : state) ~runner (w : worker_state) ~name ~ls task : let bt = Printexc.get_raw_backtrace () in self.on_exn e bt); - exit_span_ (); after_task runner _ctx; w.cur_ls := Task_local_storage.Private_.Storage.dummy -let[@inline] run_async_ (self : state) ~name ~ls (f : task) : unit = +let[@inline] run_async_ (self : state) ~ls (f : task) : unit = let w = find_current_worker_ () in - schedule_task_ self w ~name ~ls f + schedule_task_ self w ~ls f (* TODO: function to schedule many tasks from the outside. - build a queue @@ -204,7 +194,7 @@ let worker_run_self_tasks_ (self : state) ~runner w : unit = match WSQ.pop w.q with | Some task -> try_wake_someone_ self; - run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f + run_task_now_ self ~runner w ~ls:task.ls task.f | None -> continue := false done @@ -217,7 +207,7 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = worker_run_self_tasks_ self ~runner w; try_steal () and run_task task : unit = - run_task_now_ self ~runner w ~name:task.name ~ls:task.ls task.f; + run_task_now_ self ~runner w ~ls:task.ls task.f; main () and try_steal () = match try_to_steal_work_once_ self w with @@ -276,7 +266,7 @@ type ('a, 'b) create_args = 'a (** Arguments used in {!create}. See {!create} for explanations. *) -let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; name = "DUMMY_TASK" } +let dummy_task_ = { f = ignore; ls = Task_local_storage.Private_.Storage.dummy ; } let create ?(on_init_thread = default_thread_init_exit_) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) @@ -301,7 +291,6 @@ let create ?(on_init_thread = default_thread_init_exit_) { pool_id_; thread = dummy; - cur_span = Tracing_.dummy_span; q = WSQ.create ~dummy:dummy_task_ (); rng = Random.State.make [| i |]; cur_ls = ref Task_local_storage.Private_.Storage.dummy; @@ -326,7 +315,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let runner = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async:(fun ~name ~ls f -> run_async_ pool ~name ~ls f) + ~run_async:(fun ~ls f -> run_async_ pool ~ls f) ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) () diff --git a/src/fib/fiber.ml b/src/fib/fiber.ml index 25a4485e..ebfd3319 100644 --- a/src/fib/fiber.ml +++ b/src/fib/fiber.ml @@ -148,9 +148,9 @@ let add_child_ ~protect (self : _ t) (child : _ t) = let k_current_fiber : any option Task_local_storage.key = Task_local_storage.new_key ~init:(fun () -> None) () -let spawn_ ?name ~on (f : _ -> 'a) : 'a t = +let spawn_ ~on (f : _ -> 'a) : 'a t = let id = Handle.generate_fresh () in - let res, _promise = Fut.make ?name () in + let res, _promise = Fut.make () in let fib = { state = A.make @@ Alive { children = FM.empty; on_cancel = [] }; @@ -172,17 +172,17 @@ let spawn_ ?name ~on (f : _ -> 'a) : 'a t = resolve_as_failed_ fib ebt in - Runner.run_async on ?name run; + Runner.run_async on run; fib -let[@inline] spawn_top ?name ~on f : _ t = spawn_ ?name ~on f +let[@inline] spawn_top ~on f : _ t = spawn_ ~on f -let spawn_link ?name ~protect f : _ t = +let spawn_link ~protect f : _ t = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.spawn_link: must be run from inside a fiber." | Some (Any parent) -> - let child = spawn_ ?name ~on:parent.runner f in + let child = spawn_ ~on:parent.runner f in add_child_ ~protect parent child; child diff --git a/src/fib/fiber.mli b/src/fib/fiber.mli index dc60b001..5b01948a 100644 --- a/src/fib/fiber.mli +++ b/src/fib/fiber.mli @@ -55,12 +55,12 @@ val on_result : 'a t -> 'a callback -> unit with the result. If the fiber is done already then the callback is invoked immediately with its result. *) -val spawn_top : ?name:string -> on:Runner.t -> (unit -> 'a) -> 'a t +val spawn_top : on:Runner.t -> (unit -> 'a) -> 'a t (** [spawn_top ~on f] spawns a new (toplevel) fiber onto the given runner. This fiber is not the child of any other fiber: its lifetime is only determined by the lifetime of [f()]. *) -val spawn_link : ?name:string -> protect:bool -> (unit -> 'a) -> 'a t +val spawn_link : protect:bool -> (unit -> 'a) -> 'a t (** [spawn_link ~protect f] spawns a sub-fiber [f_child] from a running fiber [parent]. The sub-fiber [f_child] is attached to the current fiber and fails diff --git a/src/forkjoin/moonpool_forkjoin.ml b/src/forkjoin/moonpool_forkjoin.ml index 27aa1984..052ca7f2 100644 --- a/src/forkjoin/moonpool_forkjoin.ml +++ b/src/forkjoin/moonpool_forkjoin.ml @@ -135,7 +135,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = let len_range = min chunk_size (n - offset) in assert (offset + len_range <= n); - run ~name:"" (fun () -> task_for ~offset ~len_range); + run (fun () -> task_for ~offset ~len_range); i := !i + len_range done in diff --git a/src/lwt/IO.ml b/src/lwt/IO.ml new file mode 100644 index 00000000..4a8acc69 --- /dev/null +++ b/src/lwt/IO.ml @@ -0,0 +1,74 @@ +open Base + +let await_readable fd : unit = + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_readable + ( fd, + fun cancel -> + resume sus @@ Ok (); + Lwt_engine.stop_event cancel )); + } + +let rec read fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.read fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + await_readable fd; + read fd buf i len + | n -> n + ) + +let await_writable fd = + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Wait_writable + ( fd, + fun cancel -> + resume sus @@ Ok (); + Lwt_engine.stop_event cancel )); + } + +let rec write_once fd buf i len : int = + if len = 0 then + 0 + else ( + match Unix.write fd buf i len with + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + await_writable fd; + write_once fd buf i len + | n -> n + ) + +let write fd buf i len : unit = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = write_once fd buf !i !len in + i := !i + n; + len := !len - n + done + +(** Sleep for the given amount of seconds *) +let sleep_s (f : float) : unit = + if f > 0. then + Moonpool.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + Perform_action_in_lwt.schedule + @@ Action.Sleep + ( f, + false, + fun cancel -> + resume sus @@ Ok (); + Lwt_engine.stop_event cancel )); + } diff --git a/src/lwt/IO_in.ml b/src/lwt/IO_in.ml new file mode 100644 index 00000000..a0e2744a --- /dev/null +++ b/src/lwt/IO_in.ml @@ -0,0 +1,154 @@ +open Common_ + +class type t = + object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the + stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) + end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := IO.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/lwt/IO_out.ml b/src/lwt/IO_out.ml new file mode 100644 index 00000000..9c1207dc --- /dev/null +++ b/src/lwt/IO_out.ml @@ -0,0 +1,119 @@ +open Common_ + +class type t = + object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit + end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + IO.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/lwt/base.ml b/src/lwt/base.ml new file mode 100644 index 00000000..88e7ed3d --- /dev/null +++ b/src/lwt/base.ml @@ -0,0 +1,163 @@ +open Common_ +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(** Action scheduled from outside the loop *) +module Action = struct + type event = Lwt_engine.event + type cb = event -> unit + + (** Action that we ask the lwt loop to perform, from the outside *) + type t = + | Wait_readable of Unix.file_descr * cb + | Wait_writable of Unix.file_descr * cb + | Sleep of float * bool * cb + (* TODO: provide actions with cancellation, alongside a "select" operation *) + (* | Cancel of event *) + | On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t + | Wakeup : 'a Lwt.u * 'a -> t + | Wakeup_exn : _ Lwt.u * exn -> t + | Other of (unit -> unit) + + (** Perform the action from within the Lwt thread *) + let perform (self : t) : unit = + match self with + | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) + | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) + | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) + (* | Cancel ev -> Lwt_engine.stop_event ev *) + | On_termination (fut, f) -> + Lwt.on_any fut + (fun x -> f @@ Ok x) + (fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn)) + | Wakeup (prom, x) -> Lwt.wakeup prom x + | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e + | Other f -> f () +end + +module Action_queue = struct + type t = { q: Action.t list Atomic.t } [@@unboxed] + + let create () : t = { q = Atomic.make [] } + let pop_all (self : t) : _ list = Atomic.exchange self.q [] + + (** Push the action and return whether the queue was previously empty *) + let push (self : t) (a : Action.t) : bool = + let is_first = ref true in + while + let old = Atomic.get self.q in + if Atomic.compare_and_set self.q old (a :: old) then ( + is_first := old == []; + false + ) else + true + do + () + done; + !is_first +end + +module Perform_action_in_lwt = struct + open struct + let actions_ : Action_queue.t = Action_queue.create () + + (** Gets the current set of notifications and perform them from inside the + Lwt thread *) + let perform_pending_actions () : unit = + let@ _sp = + Moonpool.Private.Tracing_.with_span + "moonpool-lwt.perform-pending-actions" + in + + let l = Action_queue.pop_all actions_ in + List.iter Action.perform l + + let notification : int = + Lwt_unix.make_notification ~once:false perform_pending_actions + end + + let schedule (a : Action.t) : unit = + let is_first = Action_queue.push actions_ a in + if is_first then Lwt_unix.send_notification notification +end + +let get_runner () : M.Runner.t = + match M.Runner.get_current_runner () with + | Some r -> r + | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" + +let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = + let lwt_fut, lwt_prom = Lwt.wait () in + M.Fut.on_result fut (function + | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) + | Error (exn, _) -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); + lwt_fut + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10))); + fut + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + M.Private.Suspend_.suspend + { + handle = + (fun ~run:_ ~resume sus -> + let on_lwt_done _ = resume sus @@ Ok () in + Perform_action_in_lwt.( + schedule Action.(On_termination (fut, on_lwt_done)))); + }; + + (match Lwt.poll fut with + | Some x -> x + | None -> assert false) + +let run_in_lwt f : _ M.Fut.t = + let fut, prom = M.Fut.make () in + Perform_action_in_lwt.schedule + (Action.Other + (fun () -> + let lwt_fut = f () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom @@ Ok x) + (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); + fut + +let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f + +let detach_in_runner ~runner f : _ Lwt.t = + let fut, promise = Lwt.wait () in + M.Runner.run_async runner (fun () -> + match f () with + | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) + | exception exn -> + Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); + fut + +let main_with_runner ~runner (f : unit -> 'a) : 'a = + let lwt_fut, lwt_prom = Lwt.wait () in + + let _fiber = + Fiber.spawn_top ~on:runner (fun () -> + try + let x = f () in + Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) + with exn -> + Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) + in + + Lwt_main.run lwt_fut + +let main f = + let@ runner = M.Ws_pool.with_ () in + main_with_runner ~runner f diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml new file mode 100644 index 00000000..d1fdf6d2 --- /dev/null +++ b/src/lwt/common_.ml @@ -0,0 +1,5 @@ +module M = Moonpool +module Exn_bt = M.Exn_bt + +let ( let@ ) = ( @@ ) +let _default_buf_size = 4 * 1024 diff --git a/src/lwt/dune b/src/lwt/dune new file mode 100644 index 00000000..f3191c3c --- /dev/null +++ b/src/lwt/dune @@ -0,0 +1,8 @@ + +(library + (name moonpool_lwt) + (public_name moonpool-lwt) + (private_modules common_) + (enabled_if + (>= %{ocaml_version} 5.0)) + (libraries moonpool moonpool.fib lwt lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml new file mode 100644 index 00000000..1d92ddab --- /dev/null +++ b/src/lwt/moonpool_lwt.ml @@ -0,0 +1,6 @@ +include Base +module IO = IO +module IO_out = IO_out +module IO_in = IO_in +module TCP_server = Tcp_server +module TCP_client = Tcp_client diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli new file mode 100644 index 00000000..ac218e0c --- /dev/null +++ b/src/lwt/moonpool_lwt.mli @@ -0,0 +1,144 @@ +(** Lwt_engine-based event loop for Moonpool. + + In what follows, we mean by "lwt thread" the thread + running [Lwt_main.run] (so, the thread where the Lwt event + loop and all Lwt callbacks execute). + + @since NEXT_RELEASE *) + +module Fiber = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(** {2 Basic conversions} *) + +val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t +(** [fut_of_lwt lwt_fut] makes a thread-safe moonpool future that + completes when [lwt_fut] does. This must be run from within + the Lwt thread. *) + +val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t +(** [lwt_of_fut fut] makes a lwt future that completes when + [fut] does. This must be called from the Lwt thread, and the result + must always be used only from inside the Lwt thread. *) + +(** {2 Helpers on the moonpool side} *) + +val await_lwt : 'a Lwt.t -> 'a +(** [await_lwt fut] awaits a Lwt future from inside a task running on + a moonpool runner. This must be run from within a Moonpool runner + so that the await-ing effect is handled. *) + +val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t +(** [run_in_lwt f] runs [f()] from within the Lwt thread + and returns a thread-safe future. This can be run from anywhere. *) + +val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a +(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and + awaits its result. Must be run from inside a moonpool runner + so that the await-in effect is handled. + + This is similar to [Moonpool.await @@ run_in_lwt f]. *) + +val get_runner : unit -> Moonpool.Runner.t +(** Returns the runner from within which this is called. + Must be run from within a fiber. + @raise Failure if not run within a fiber *) + +(** {2 IO} *) + +(** IO using the Lwt event loop. + + These IO operations work on non-blocking file descriptors + and rely on a [Lwt_engine] event loop being active (meaning, + [Lwt_main.run] is currently running in some thread). + + Calling these functions must be done from a moonpool runner. + A function like [read] will first try to perform the IO action + directly (here, call {!Unix.read}); if the action fails because + the FD is not ready, then [await_readable] is called: + it suspends the fiber and subscribes it to Lwt to be awakened + when the FD becomes ready. +*) +module IO : sig + val read : Unix.file_descr -> bytes -> int -> int -> int + (** Read from the file descriptor *) + + val await_readable : Unix.file_descr -> unit + (** Suspend the fiber until the FD is readable *) + + val write_once : Unix.file_descr -> bytes -> int -> int -> int + (** Perform one write into the file descriptor *) + + val await_writable : Unix.file_descr -> unit + (** Suspend the fiber until the FD is writable *) + + val write : Unix.file_descr -> bytes -> int -> int -> unit + (** Loop around {!write_once} to write the entire slice. *) + + val sleep_s : float -> unit + (** Suspend the fiber for [n] seconds. *) +end + +module IO_in = IO_in +(** Input channel *) + +module IO_out = IO_out +(** Output channel *) + +module TCP_server : sig + type t = Lwt_io.server + + val establish_lwt : + ?backlog:(* ?server_fd:Unix.file_descr -> *) + int -> + ?no_close:bool -> + runner:Moonpool.Runner.t -> + Unix.sockaddr -> + (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> + t + (** [establish ~runner addr handler] runs a TCP server in the Lwt + thread. When a client connects, a moonpool fiber is started on [runner] + to handle it. *) + + val establish : + ?backlog:(* ?server_fd:Unix.file_descr -> *) + int -> + ?no_close:bool -> + runner:Moonpool.Runner.t -> + Unix.sockaddr -> + (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> + t + (** Like {!establish_lwt} but uses {!IO} to directly handle + reads and writes on client sockets. *) + + val shutdown : t -> unit + (** Shutdown the server *) +end + +module TCP_client : sig + val connect : Unix.sockaddr -> Unix.file_descr + + val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a + (** Open a connection, and use {!IO} to read and write from + the socket in a non blocking way. *) + + val with_connect_lwt : + Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a + (** Open a connection. *) +end + +(** {2 Helpers on the lwt side} *) + +val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t +(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, + and returns a lwt future. This must be run from within the thread + running [Lwt_main]. *) + +(** {2 Wrappers around Lwt_main} *) + +val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a +(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] inside + a fiber in [runner]. *) + +val main : (unit -> 'a) -> 'a +(** Like {!main_with_runner} but with a default choice of runner. *) diff --git a/src/lwt/tcp_client.ml b/src/lwt/tcp_client.ml new file mode 100644 index 00000000..8aec16f2 --- /dev/null +++ b/src/lwt/tcp_client.ml @@ -0,0 +1,53 @@ +open Common_ +open Base + +let connect addr : Unix.file_descr = + let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.TCP_NODELAY true; + + (* connect asynchronously *) + while + try + Unix.connect sock addr; + false + with + | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) + -> + IO.await_writable sock; + true + do + () + done; + sock + +let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = connect addr in + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc + +let with_connect_lwt addr + (f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a = + let sock = connect addr in + + let ic = + run_in_lwt_and_await (fun () -> + Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock) + in + let oc = + run_in_lwt_and_await (fun () -> + Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock) + in + + let finally () = + (try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ()); + (try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ()); + try Unix.close sock with _ -> () + in + let@ () = Fun.protect ~finally in + f ic oc diff --git a/src/lwt/tcp_server.ml b/src/lwt/tcp_server.ml new file mode 100644 index 00000000..22fa9253 --- /dev/null +++ b/src/lwt/tcp_server.ml @@ -0,0 +1,38 @@ +open Common_ +open Base + +type t = Lwt_io.server + +let establish_lwt ?backlog ?no_close ~runner addr handler : t = + let server = + Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr + (fun client_addr client_sock -> + let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in + let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in + + let fut = + M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) + in + + let lwt_fut = lwt_of_fut fut in + lwt_fut) + in + await_lwt server + +let establish ?backlog ?no_close ~runner addr handler : t = + let server = + Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr + (fun client_addr client_sock -> + let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in + let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in + + let fut = + M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) + in + + let lwt_fut = lwt_of_fut fut in + lwt_fut) + in + await_lwt server + +let shutdown self = await_lwt @@ Lwt_io.shutdown_server self diff --git a/src/private/tracing_.dummy.ml b/src/private/tracing_.dummy.ml index d685a5b3..ba6d0aff 100644 --- a/src/private/tracing_.dummy.ml +++ b/src/private/tracing_.dummy.ml @@ -3,3 +3,4 @@ let dummy_span = 0L let enter_span _name = dummy_span let exit_span = ignore let set_thread_name = ignore +let with_span _ f = f dummy_span diff --git a/src/private/tracing_.mli b/src/private/tracing_.mli index 35379332..d4634697 100644 --- a/src/private/tracing_.mli +++ b/src/private/tracing_.mli @@ -1,5 +1,6 @@ val dummy_span : int64 val enter_span : string -> int64 val exit_span : int64 -> unit +val with_span : string -> (int64 -> 'a) -> 'a val enabled : unit -> bool val set_thread_name : string -> unit diff --git a/src/private/tracing_.real.ml b/src/private/tracing_.real.ml index f71ec418..4a928e27 100644 --- a/src/private/tracing_.real.ml +++ b/src/private/tracing_.real.ml @@ -12,3 +12,14 @@ let[@inline] enter_span name : int64 = Trace.enter_span ~__FILE__:dummy_file_ ~__LINE__:0 name let[@inline] exit_span sp = if sp <> dummy_span then Trace.exit_span sp + +let with_span name f = + let sp = enter_span name in + try + let x = f sp in + exit_span sp; + x + with exn -> + let bt = Printexc.get_raw_backtrace () in + exit_span sp; + Printexc.raise_with_backtrace exn bt diff --git a/test/effect-based/t_fib1.ml b/test/effect-based/t_fib1.ml index 5a9d66e6..a2f62e82 100644 --- a/test/effect-based/t_fib1.ml +++ b/test/effect-based/t_fib1.ml @@ -20,7 +20,7 @@ let fib ~on x : int Fut.t = Fut.await t1 + Fut.await t2 ) in - Fut.spawn ~name:"fib" ~on (fun () -> fib_rec x) + Fut.spawn ~on (fun () -> fib_rec x) (* NOTE: for tracy support let () = Tracy_client_trace.setup () diff --git a/test/lwt/dune b/test/lwt/dune new file mode 100644 index 00000000..f52b77d7 --- /dev/null +++ b/test/lwt/dune @@ -0,0 +1,3 @@ +(executables + (names echo_server echo_client hash_server hash_client) + (libraries moonpool moonpool-lwt lwt lwt.unix trace.core trace-tef)) diff --git a/test/lwt/echo_client.ml b/test/lwt/echo_client.ml new file mode 100644 index 00000000..c4446d1d --- /dev/null +++ b/test/lwt/echo_client.ml @@ -0,0 +1,86 @@ +module M = Moonpool +module M_lwt = Moonpool_lwt +module Trace = Trace_core + +let ( let@ ) = ( @@ ) + +let main ~port ~runner ~n ~n_conn () : unit Lwt.t = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + + let remaining = Atomic.make n in + let all_done = Atomic.make 0 in + + let fut_exit, prom_exit = M.Fut.make () in + + Printf.printf "connecting to port %d\n%!" port; + let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + + let rec run_task () = + (* Printf.printf "running task\n%!"; *) + let n = Atomic.fetch_and_add remaining (-1) in + if n > 0 then ( + (let _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client" + in + Trace.message "connecting new client…"; + M_lwt.TCP_client.with_connect addr @@ fun ic oc -> + let buf = Bytes.create 32 in + + for _j = 1 to 100 do + let _sp = + Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ + "write.loop" + in + + M_lwt.IO_out.output_string oc "hello"; + M_lwt.IO_out.flush oc; + + (* read back something *) + M_lwt.IO_in.really_input ic buf 0 (String.length "hello"); + Trace.exit_manual_span _sp; + () + done; + Trace.exit_manual_span _sp); + + (* run another task *) M.Runner.run_async runner run_task + ) else ( + (* if we're the last to exit, resolve the promise *) + let n_already_done = Atomic.fetch_and_add all_done 1 in + if n_already_done = n_conn - 1 then ( + Printf.printf "all done\n%!"; + M.Fut.fulfill prom_exit @@ Ok () + ) + ) + in + + (* start the first [n_conn] tasks *) + for _i = 1 to n_conn do + M.Runner.run_async runner run_task + done; + + (* exit when [fut_exit] is resolved *) + M_lwt.lwt_of_fut fut_exit + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + + let port = ref 0 in + let j = ref 4 in + let n_conn = ref 100 in + let n = ref 50_000 in + + let opts = + [ + "-p", Arg.Set_int port, " port"; + "-j", Arg.Set_int j, " number of threads"; + "-n", Arg.Set_int n, " total number of connections"; + "--n-conn", Arg.Set_int n_conn, " number of parallel connections"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo client"; + + let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in + Lwt_engine.set @@ new Lwt_engine.libev (); + Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn () diff --git a/test/lwt/echo_server.ml b/test/lwt/echo_server.ml new file mode 100644 index 00000000..ceff3cd2 --- /dev/null +++ b/test/lwt/echo_server.ml @@ -0,0 +1,66 @@ +module M = Moonpool +module M_lwt = Moonpool_lwt +module Trace = Trace_core + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +let str_of_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +let main ~port ~runner () : unit Lwt.t = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + + let lwt_fut, _lwt_prom = Lwt.wait () in + + (* TODO: handle exit?? *) + Printf.printf "listening on port %d\n%!" port; + + let handle_client client_addr ic oc = + let _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" + ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) + in + + let buf = Bytes.create 32 in + let continue = ref true in + while !continue do + Trace.message "read"; + let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in + if n = 0 then + continue := false + else ( + Trace.messagef (fun k -> k "got %dB" n); + M_lwt.IO_out.output oc buf 0 n; + M_lwt.IO_out.flush oc; + Trace.message "write" + ) + done; + Trace.exit_manual_span _sp; + Trace.message "exit handle client" + in + + let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in + let _server = M_lwt.TCP_server.establish ~runner addr handle_client in + + lwt_fut + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + let port = ref 0 in + let j = ref 4 in + + let opts = + [ + "-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo server"; + + let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in + Lwt_engine.set @@ new Lwt_engine.libev (); + Lwt_main.run @@ main ~runner ~port:!port () diff --git a/test/lwt/hash_client.ml b/test/lwt/hash_client.ml new file mode 100644 index 00000000..8d6db790 --- /dev/null +++ b/test/lwt/hash_client.ml @@ -0,0 +1,69 @@ +module M = Moonpool +module M_lwt = Moonpool_lwt +module Trace = Trace_core + +let ( let@ ) = ( @@ ) + +let main ~port ~runner ~dir ~n_conn () : unit Lwt.t = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + + Printf.printf "hash dir=%S\n%!" dir; + + Printf.printf "connecting to port %d\n%!" port; + let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in + + (* TODO: *) + let run_task () : unit = + let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in + M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc -> + let rec walk file : unit = + if not (Sys.file_exists file) then + () + else if Sys.is_regular_file file then ( + M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file); + let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in + Printf.printf "%s\n%!" res + ) else if Sys.is_directory file then ( + let _sp = + Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir" + ~data:(fun () -> [ "d", `String file ]) + in + + Printf.printf "explore %S\n%!" file; + let d = Sys.readdir file in + Array.sort String.compare d; + Array.iter (fun sub -> walk (Filename.concat file sub)) d + ) + in + walk dir; + Trace.exit_manual_span _sp + in + + (* start the first [n_conn] tasks *) + let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in + + Lwt.join (List.map M_lwt.lwt_of_fut futs) + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + + let port = ref 1234 in + let j = ref 4 in + let n_conn = ref 100 in + let dir = ref "." in + + let opts = + [ + "-p", Arg.Set_int port, " port"; + "-j", Arg.Set_int j, " number of threads"; + "-d", Arg.Set_string dir, " directory to hash"; + "--n-conn", Arg.Set_int n_conn, " number of parallel connections"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo client"; + + let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in + Lwt_engine.set @@ new Lwt_engine.libev (); + Lwt_main.run @@ main ~runner ~port:!port ~dir:!dir ~n_conn:!n_conn () diff --git a/test/lwt/hash_server.ml b/test/lwt/hash_server.ml new file mode 100644 index 00000000..9480be75 --- /dev/null +++ b/test/lwt/hash_server.ml @@ -0,0 +1,235 @@ +(* vendored from https://github.com/dbuenzli/uuidm *) + +let sha_1 s = + (* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *) + let sha_1_pad s = + let len = String.length s in + let blen = 8 * len in + let rem = len mod 64 in + let mlen = + if rem > 55 then + len + 128 - rem + else + len + 64 - rem + in + let m = Bytes.create mlen in + Bytes.blit_string s 0 m 0 len; + Bytes.fill m len (mlen - len) '\x00'; + Bytes.set m len '\x80'; + if Sys.word_size > 32 then ( + Bytes.set m (mlen - 8) (Char.unsafe_chr ((blen lsr 56) land 0xFF)); + Bytes.set m (mlen - 7) (Char.unsafe_chr ((blen lsr 48) land 0xFF)); + Bytes.set m (mlen - 6) (Char.unsafe_chr ((blen lsr 40) land 0xFF)); + Bytes.set m (mlen - 5) (Char.unsafe_chr ((blen lsr 32) land 0xFF)) + ); + Bytes.set m (mlen - 4) (Char.unsafe_chr ((blen lsr 24) land 0xFF)); + Bytes.set m (mlen - 3) (Char.unsafe_chr ((blen lsr 16) land 0xFF)); + Bytes.set m (mlen - 2) (Char.unsafe_chr ((blen lsr 8) land 0xFF)); + Bytes.set m (mlen - 1) (Char.unsafe_chr (blen land 0xFF)); + m + in + (* Operations on int32 *) + let ( &&& ) = ( land ) in + let ( lor ) = Int32.logor in + let ( lxor ) = Int32.logxor in + let ( land ) = Int32.logand in + let ( ++ ) = Int32.add in + let lnot = Int32.lognot in + let sr = Int32.shift_right in + let sl = Int32.shift_left in + let cls n x = sl x n lor Int32.shift_right_logical x (32 - n) in + (* Start *) + let m = sha_1_pad s in + let w = Array.make 16 0l in + let h0 = ref 0x67452301l in + let h1 = ref 0xEFCDAB89l in + let h2 = ref 0x98BADCFEl in + let h3 = ref 0x10325476l in + let h4 = ref 0xC3D2E1F0l in + let a = ref 0l in + let b = ref 0l in + let c = ref 0l in + let d = ref 0l in + let e = ref 0l in + for i = 0 to (Bytes.length m / 64) - 1 do + (* For each block *) + (* Fill w *) + let base = i * 64 in + for j = 0 to 15 do + let k = base + (j * 4) in + w.(j) <- + sl (Int32.of_int (Char.code @@ Bytes.get m k)) 24 + lor sl (Int32.of_int (Char.code @@ Bytes.get m (k + 1))) 16 + lor sl (Int32.of_int (Char.code @@ Bytes.get m (k + 2))) 8 + lor Int32.of_int (Char.code @@ Bytes.get m (k + 3)) + done; + (* Loop *) + a := !h0; + b := !h1; + c := !h2; + d := !h3; + e := !h4; + for t = 0 to 79 do + let f, k = + if t <= 19 then + !b land !c lor (lnot !b land !d), 0x5A827999l + else if t <= 39 then + !b lxor !c lxor !d, 0x6ED9EBA1l + else if t <= 59 then + !b land !c lor (!b land !d) lor (!c land !d), 0x8F1BBCDCl + else + !b lxor !c lxor !d, 0xCA62C1D6l + in + let s = t &&& 0xF in + if t >= 16 then + w.(s) <- + cls 1 + (w.(s + 13 &&& 0xF) + lxor w.(s + 8 &&& 0xF) + lxor w.(s + 2 &&& 0xF) + lxor w.(s)); + let temp = cls 5 !a ++ f ++ !e ++ w.(s) ++ k in + e := !d; + d := !c; + c := cls 30 !b; + b := !a; + a := temp + done; + (* Update *) + h0 := !h0 ++ !a; + h1 := !h1 ++ !b; + h2 := !h2 ++ !c; + h3 := !h3 ++ !d; + h4 := !h4 ++ !e + done; + let h = Bytes.create 20 in + let i2s h k i = + Bytes.set h k (Char.unsafe_chr (Int32.to_int (sr i 24) &&& 0xFF)); + Bytes.set h (k + 1) (Char.unsafe_chr (Int32.to_int (sr i 16) &&& 0xFF)); + Bytes.set h (k + 2) (Char.unsafe_chr (Int32.to_int (sr i 8) &&& 0xFF)); + Bytes.set h (k + 3) (Char.unsafe_chr (Int32.to_int i &&& 0xFF)) + in + i2s h 0 !h0; + i2s h 4 !h1; + i2s h 8 !h2; + i2s h 12 !h3; + i2s h 16 !h4; + Bytes.unsafe_to_string h + +(*--------------------------------------------------------------------------- + Copyright (c) 2008 The uuidm programmers + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + ---------------------------------------------------------------------------*) + +(* server that reads from sockets lists of files, and returns hashes of these files *) + +module M = Moonpool +module M_lwt = Moonpool_lwt +module Trace = Trace_core + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +let to_hex s = + let i2h i = String.get (spf "%x" i) 0 in + let n = String.length s in + let bs = Bytes.create (n * 2) in + for i = 0 to n - 1 do + Bytes.set bs (2 * i) (i2h ((Char.code s.[i] land 0b1111_0000) lsr 4)); + Bytes.set bs ((2 * i) + 1) (i2h (Char.code s.[i] land 0b0000_1111)) + done; + Bytes.unsafe_to_string bs + +let str_of_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +[@@@ocaml.warning "-48"] + +let read_file filename : string = + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "read-file" ~data:(fun () -> + [ "f", `String filename ]) + in + In_channel.with_open_bin filename In_channel.input_all + +let main ~port ~runner () : unit Lwt.t = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in + + let lwt_fut, _lwt_prom = Lwt.wait () in + + (* TODO: handle exit?? *) + Printf.printf "listening on port %d\n%!" port; + + let handle_client client_addr ic oc = + let _sp = + Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" + ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) + in + + try + while true do + Trace.message "read"; + let filename = + M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) + |> String.trim + in + Trace.messagef (fun k -> k "hash %S" filename); + + match read_file filename with + | exception e -> + Printf.eprintf "error while reading %S:\n%s\n" filename + (Printexc.to_string e); + M_lwt.run_in_lwt_and_await (fun () -> + Lwt_io.write_line oc (spf "%s: error" filename)); + M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc) + | content -> + (* got the content, now hash it *) + let hash = + let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in + sha_1 content |> to_hex + in + + M_lwt.run_in_lwt_and_await (fun () -> + Lwt_io.write_line oc (spf "%s: %s" filename hash)); + M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc) + done + with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) -> + Trace.exit_manual_span _sp; + Trace.message "exit handle client" + in + + let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in + let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in + + lwt_fut + +let () = + let@ () = Trace_tef.with_setup () in + Trace.set_thread_name "main"; + let port = ref 1234 in + let j = ref 4 in + + let opts = + [ + "-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads"; + ] + |> Arg.align + in + Arg.parse opts ignore "echo server"; + + let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in + Lwt_engine.set @@ new Lwt_engine.libev (); + Lwt_main.run @@ main ~runner ~port:!port ()