From ab718b22f94ae0d3fed7a43a36d8ec675a720d3b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 24 Jun 2023 14:50:43 -0400 Subject: [PATCH] add `Pool.run_wait_block`; rename `Pool.run` into `Pool.run_async` --- README.md | 19 +++++++++++++++++-- src/pool.ml | 17 ++++++++++++++++- src/pool.mli | 19 ++++++++++++++++--- 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index dbcc9867..5c86c99a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ The user can create several thread pools. These pools use regular posix threads, but the threads are spread across multiple domains (on OCaml 5), which enables parallelism. -The function `Pool.run pool task` runs `task()` on one of the workers +The function `Pool.run_async pool task` runs `task()` on one of the workers of `pool`, as soon as one is available. No result is returned. ```ocaml @@ -28,7 +28,7 @@ of `pool`, as soon as one is available. No result is returned. val pool : Moonpool.Pool.t = # begin - Moonpool.Pool.run pool + Moonpool.Pool.run_async pool (fun () -> Thread.delay 0.1; print_endline "running from the pool"); @@ -40,6 +40,21 @@ running from the pool - : unit = () ``` +To wait until the task is done, you can use `Pool.run_wait_block` instead: + +```ocaml +# begin + Moonpool.Pool.run_wait_block pool + (fun () -> + Thread.delay 0.1; + print_endline "running from the pool"); + print_endline "running from the caller (after waiting)"; + end ;; +running from the pool +running from the caller (after waiting) +- : unit = () +``` + The function `Fut.spawn ~on f` schedules `f ()` on the pool `on`, and immediately returns a _future_ which will eventually hold the result (or an exception). diff --git a/src/pool.ml b/src/pool.ml index 81aefce1..386f2209 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -54,13 +54,28 @@ let run_direct_ (self : t) (task : task) : unit = (** Run [task]. It will be wrapped with an effect handler to support {!Fut.await}. *) -let run (self : t) (task : task) : unit = +let run_async (self : t) (task : task) : unit = let task' () = (* run [f()] and handle [suspend] in it *) Suspend_.with_suspend task ~run:(run_direct_ self) in run_direct_ self task' +let run = run_async + +let run_wait_block self task : unit = + let q = Bb_queue.create () in + run_async self (fun () -> + try + task (); + Bb_queue.push q (Ok ()) + with exn -> + let bt = Printexc.get_raw_backtrace () in + Bb_queue.push q (Error (exn, bt))); + match Bb_queue.pop q with + | Ok () -> () + | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt + let[@inline] size self = Array.length self.threads let num_tasks (self : t) : int = diff --git a/src/pool.mli b/src/pool.mli index 114f11ed..473c88e7 100644 --- a/src/pool.mli +++ b/src/pool.mli @@ -71,8 +71,21 @@ val shutdown_without_waiting : t -> unit exception Shutdown -val run : t -> (unit -> unit) -> unit -(** [run pool f] schedules [f] for later execution on the pool +val run_async : t -> (unit -> unit) -> unit +(** [run_async pool f] schedules [f] for later execution on the pool in one of the threads. [f()] will run on one of the pool's worker threads. - @raise Shutdown if the pool was shut down before [run] was called. *) + @raise Shutdown if the pool was shut down before [run] was called. + @since 0.3 *) + +val run : t -> (unit -> unit) -> unit + [@@deprecated "use run_async"] +(** deprecated alias to {!run_async} *) + +val run_wait_block : t -> (unit -> unit) -> unit +(** [run_wait_block pool f] schedules [f] for later execution + on the pool, like {!run_async}. + It then blocks the current thread until [f()] is done executing. + + {b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}). + @since 0.3 *)