mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-09 12:45:46 -05:00
suspend: remove additional parameter, always run tasks in handler
This commit is contained in:
parent
9e0a583a94
commit
68fe7221b8
5 changed files with 15 additions and 36 deletions
|
|
@ -12,20 +12,9 @@ let[@inline] size_ (self : state) = Array.length self.threads
|
||||||
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
|
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
|
||||||
|
|
||||||
(** Run [task] as is, on the pool. *)
|
(** Run [task] as is, on the pool. *)
|
||||||
let run_direct_ (self : state) (task : task) : unit =
|
let schedule_ (self : state) (task : task) : unit =
|
||||||
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
|
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
|
||||||
|
|
||||||
let rec run_async_ (self : state) (task : task) : unit =
|
|
||||||
let task' () =
|
|
||||||
(* run [f()] and handle [suspend] in it *)
|
|
||||||
Suspend_.with_suspend task ~run:(fun ~with_handler task ->
|
|
||||||
if with_handler then
|
|
||||||
run_async_ self task
|
|
||||||
else
|
|
||||||
run_direct_ self task)
|
|
||||||
in
|
|
||||||
run_direct_ self task'
|
|
||||||
|
|
||||||
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
||||||
|
|
||||||
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
||||||
|
|
@ -34,7 +23,7 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
|
||||||
let run_task task : unit =
|
let run_task task : unit =
|
||||||
let _ctx = before_task runner in
|
let _ctx = before_task runner in
|
||||||
(* run the task now, catching errors *)
|
(* run the task now, catching errors *)
|
||||||
(try task ()
|
(try Suspend_.with_suspend task ~run:(fun task' -> schedule_ self task')
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
on_exn e bt);
|
on_exn e bt);
|
||||||
|
|
@ -98,7 +87,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
||||||
let runner =
|
let runner =
|
||||||
Runner.For_runner_implementors.create
|
Runner.For_runner_implementors.create
|
||||||
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
|
~shutdown:(fun ~wait () -> shutdown_ pool ~wait)
|
||||||
~run_async:(fun f -> run_async_ pool f)
|
~run_async:(fun f -> schedule_ pool f)
|
||||||
~size:(fun () -> size_ pool)
|
~size:(fun () -> size_ pool)
|
||||||
~num_tasks:(fun () -> num_tasks_ pool)
|
~num_tasks:(fun () -> num_tasks_ pool)
|
||||||
()
|
()
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ let both f g : _ * _ =
|
||||||
let st = A.make { suspension = None; left = St_none; right = St_none } in
|
let st = A.make { suspension = None; left = St_none; right = St_none } in
|
||||||
|
|
||||||
let start_tasks ~run () : unit =
|
let start_tasks ~run () : unit =
|
||||||
run ~with_handler:true (fun () ->
|
run (fun () ->
|
||||||
try
|
try
|
||||||
let res = f () in
|
let res = f () in
|
||||||
set_left_ st (St_some res)
|
set_left_ st (St_some res)
|
||||||
|
|
@ -70,7 +70,7 @@ let both f g : _ * _ =
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
set_left_ st (St_fail (e, bt)));
|
set_left_ st (St_fail (e, bt)));
|
||||||
|
|
||||||
run ~with_handler:true (fun () ->
|
run (fun () ->
|
||||||
try
|
try
|
||||||
let res = g () in
|
let res = g () in
|
||||||
set_right_ st (St_some res)
|
set_right_ st (St_some res)
|
||||||
|
|
@ -126,7 +126,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
||||||
let len_range = min chunk_size (n - offset) in
|
let len_range = min chunk_size (n - offset) in
|
||||||
assert (offset + len_range <= n);
|
assert (offset + len_range <= n);
|
||||||
|
|
||||||
run ~with_handler:true (fun () -> task_for ~offset ~len_range);
|
run (fun () -> task_for ~offset ~len_range);
|
||||||
i := !i + len_range
|
i := !i + len_range
|
||||||
done
|
done
|
||||||
in
|
in
|
||||||
|
|
|
||||||
|
|
@ -381,9 +381,7 @@ let await (fut : 'a t) : 'a =
|
||||||
Suspend_.handle =
|
Suspend_.handle =
|
||||||
(fun ~run k ->
|
(fun ~run k ->
|
||||||
on_result fut (function
|
on_result fut (function
|
||||||
| Ok _ ->
|
| Ok _ -> run (fun () -> k (Ok ()))
|
||||||
(* run without handler, we're already in a deep effect *)
|
|
||||||
run ~with_handler:false (fun () -> k (Ok ()))
|
|
||||||
| Error (exn, bt) ->
|
| Error (exn, bt) ->
|
||||||
(* fail continuation immediately *)
|
(* fail continuation immediately *)
|
||||||
k (Error (exn, bt))));
|
k (Error (exn, bt))));
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,7 @@ module A = Atomic_
|
||||||
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
||||||
type task = unit -> unit
|
type task = unit -> unit
|
||||||
|
|
||||||
type suspension_handler = {
|
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit }
|
||||||
handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit;
|
|
||||||
}
|
|
||||||
[@@unboxed]
|
[@@unboxed]
|
||||||
|
|
||||||
[@@@ifge 5.0]
|
[@@@ifge 5.0]
|
||||||
|
|
@ -15,8 +13,7 @@ type _ Effect.t += Suspend : suspension_handler -> unit Effect.t
|
||||||
|
|
||||||
let[@inline] suspend h = Effect.perform (Suspend h)
|
let[@inline] suspend h = Effect.perform (Suspend h)
|
||||||
|
|
||||||
let with_suspend ~(run : with_handler:bool -> task -> unit) (f : unit -> unit) :
|
let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit =
|
||||||
unit =
|
|
||||||
let module E = Effect.Deep in
|
let module E = Effect.Deep in
|
||||||
(* effect handler *)
|
(* effect handler *)
|
||||||
let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option =
|
let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option =
|
||||||
|
|
@ -37,14 +34,12 @@ let with_suspend ~(run : with_handler:bool -> task -> unit) (f : unit -> unit) :
|
||||||
(* DLA interop *)
|
(* DLA interop *)
|
||||||
let prepare_for_await () : Dla_.t =
|
let prepare_for_await () : Dla_.t =
|
||||||
(* current state *)
|
(* current state *)
|
||||||
let st : ((with_handler:bool -> task -> unit) * suspension) option A.t =
|
let st : ((task -> unit) * suspension) option A.t = A.make None in
|
||||||
A.make None
|
|
||||||
in
|
|
||||||
|
|
||||||
let release () : unit =
|
let release () : unit =
|
||||||
match A.exchange st None with
|
match A.exchange st None with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some (run, k) -> run ~with_handler:true (fun () -> k (Ok ()))
|
| Some (run, k) -> run (fun () -> k (Ok ()))
|
||||||
and await () : unit =
|
and await () : unit =
|
||||||
suspend { handle = (fun ~run k -> A.set st (Some (run, k))) }
|
suspend { handle = (fun ~run k -> A.set st (Some (run, k))) }
|
||||||
in
|
in
|
||||||
|
|
@ -55,7 +50,7 @@ let prepare_for_await () : Dla_.t =
|
||||||
[@@@ocaml.alert "+unstable"]
|
[@@@ocaml.alert "+unstable"]
|
||||||
[@@@else_]
|
[@@@else_]
|
||||||
|
|
||||||
let with_suspend ~run:_ f = f ()
|
let[@inline] with_suspend ~run:_ f = f ()
|
||||||
let prepare_for_await () = { Dla_.release = ignore; await = ignore }
|
let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore }
|
||||||
|
|
||||||
[@@@endif]
|
[@@@endif]
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,7 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
||||||
|
|
||||||
type task = unit -> unit
|
type task = unit -> unit
|
||||||
|
|
||||||
type suspension_handler = {
|
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit }
|
||||||
handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit;
|
|
||||||
}
|
|
||||||
[@@unboxed]
|
[@@unboxed]
|
||||||
(** The handler that knows what to do with the suspended computation.
|
(** The handler that knows what to do with the suspended computation.
|
||||||
|
|
||||||
|
|
@ -53,8 +51,7 @@ val suspend : suspension_handler -> unit
|
||||||
val prepare_for_await : unit -> Dla_.t
|
val prepare_for_await : unit -> Dla_.t
|
||||||
(** Our stub for DLA. Unstable. *)
|
(** Our stub for DLA. Unstable. *)
|
||||||
|
|
||||||
val with_suspend :
|
val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit
|
||||||
run:(with_handler:bool -> task -> unit) -> (unit -> unit) -> unit
|
|
||||||
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
||||||
will work. If [f()] suspends with suspension handler [h],
|
will work. If [f()] suspends with suspension handler [h],
|
||||||
this calls [h ~run k] where [k] is the suspension.
|
this calls [h ~run k] where [k] is the suspension.
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue