add Pool.run_wait_block; rename Pool.run into Pool.run_async

This commit is contained in:
Simon Cruanes 2023-06-24 14:50:43 -04:00
parent ffd37642b9
commit ab718b22f9
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 49 additions and 6 deletions

View file

@ -19,7 +19,7 @@ The user can create several thread pools. These pools use regular posix threads,
but the threads are spread across multiple domains (on OCaml 5), which enables but the threads are spread across multiple domains (on OCaml 5), which enables
parallelism. parallelism.
The function `Pool.run pool task` runs `task()` on one of the workers The function `Pool.run_async pool task` runs `task()` on one of the workers
of `pool`, as soon as one is available. No result is returned. of `pool`, as soon as one is available. No result is returned.
```ocaml ```ocaml
@ -28,7 +28,7 @@ of `pool`, as soon as one is available. No result is returned.
val pool : Moonpool.Pool.t = <abstr> val pool : Moonpool.Pool.t = <abstr>
# begin # begin
Moonpool.Pool.run pool Moonpool.Pool.run_async pool
(fun () -> (fun () ->
Thread.delay 0.1; Thread.delay 0.1;
print_endline "running from the pool"); print_endline "running from the pool");
@ -40,6 +40,21 @@ running from the pool
- : unit = () - : unit = ()
``` ```
To wait until the task is done, you can use `Pool.run_wait_block` instead:
```ocaml
# begin
Moonpool.Pool.run_wait_block pool
(fun () ->
Thread.delay 0.1;
print_endline "running from the pool");
print_endline "running from the caller (after waiting)";
end ;;
running from the pool
running from the caller (after waiting)
- : unit = ()
```
The function `Fut.spawn ~on f` schedules `f ()` on the pool `on`, and immediately The function `Fut.spawn ~on f` schedules `f ()` on the pool `on`, and immediately
returns a _future_ which will eventually hold the result (or an exception). returns a _future_ which will eventually hold the result (or an exception).

View file

@ -54,13 +54,28 @@ let run_direct_ (self : t) (task : task) : unit =
(** Run [task]. It will be wrapped with an effect handler to (** Run [task]. It will be wrapped with an effect handler to
support {!Fut.await}. *) support {!Fut.await}. *)
let run (self : t) (task : task) : unit = let run_async (self : t) (task : task) : unit =
let task' () = let task' () =
(* run [f()] and handle [suspend] in it *) (* run [f()] and handle [suspend] in it *)
Suspend_.with_suspend task ~run:(run_direct_ self) Suspend_.with_suspend task ~run:(run_direct_ self)
in in
run_direct_ self task' run_direct_ self task'
let run = run_async
let run_wait_block self task : unit =
let q = Bb_queue.create () in
run_async self (fun () ->
try
task ();
Bb_queue.push q (Ok ())
with exn ->
let bt = Printexc.get_raw_backtrace () in
Bb_queue.push q (Error (exn, bt)));
match Bb_queue.pop q with
| Ok () -> ()
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
let[@inline] size self = Array.length self.threads let[@inline] size self = Array.length self.threads
let num_tasks (self : t) : int = let num_tasks (self : t) : int =

View file

@ -71,8 +71,21 @@ val shutdown_without_waiting : t -> unit
exception Shutdown exception Shutdown
val run : t -> (unit -> unit) -> unit val run_async : t -> (unit -> unit) -> unit
(** [run pool f] schedules [f] for later execution on the pool (** [run_async pool f] schedules [f] for later execution on the pool
in one of the threads. [f()] will run on one of the pool's in one of the threads. [f()] will run on one of the pool's
worker threads. worker threads.
@raise Shutdown if the pool was shut down before [run] was called. *) @raise Shutdown if the pool was shut down before [run] was called.
@since 0.3 *)
val run : t -> (unit -> unit) -> unit
[@@deprecated "use run_async"]
(** deprecated alias to {!run_async} *)
val run_wait_block : t -> (unit -> unit) -> unit
(** [run_wait_block pool f] schedules [f] for later execution
on the pool, like {!run_async}.
It then blocks the current thread until [f()] is done executing.
{b NOTE} be careful with deadlocks (see notes in {!Fut.wait_block}).
@since 0.3 *)