fifo pool: format, use with_suspend unconditionally

This commit is contained in:
Simon Cruanes 2024-02-27 21:23:02 -05:00
parent e94c7999de
commit b9cf0616b8
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -25,7 +25,9 @@ let schedule_ (self : state) (task : task_full) : unit =
type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
let cur_ls : Task_local_storage.storage ref = ref Task_local_storage.Private_.Storage.dummy in let cur_ls : Task_local_storage.storage ref =
ref Task_local_storage.Private_.Storage.dummy
in
TLS.set k_storage (Some cur_ls); TLS.set k_storage (Some cur_ls);
TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner; TLS.get Runner.For_runner_implementors.k_cur_runner := Some runner;
@ -52,21 +54,13 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit =
cur_ls := task.ls; cur_ls := task.ls;
let _ctx = before_task runner in let _ctx = before_task runner in
let resume ls k res = let resume ls k res = schedule_ self { f = (fun () -> k res); ls } in
schedule_ self { f = (fun () -> k res); ls }
in
(* run the task now, catching errors, handling effects *) (* run the task now, catching errors, handling effects *)
(try (try
[@@@ifge 5.0] Suspend_.with_suspend
Suspend_.with_suspend (WSH { (WSH { run = run_another_task; resume; on_suspend })
run=run_another_task; task.f
resume;
on_suspend;
}) task.f
[@@@else_]
task.f()
[@@@endif]
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
on_exn e bt); on_exn e bt);
@ -103,12 +97,12 @@ type ('a, 'b) create_args =
?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int -> ?num_threads:int ->
?name:string -> ?name:string ->
'a 'a
let create ?(on_init_thread = default_thread_init_exit_) let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?num_threads ?name () : t = ?around_task ?num_threads ?name () : t =
(* wrapper *) (* wrapper *)
let around_task = let around_task =
match around_task with match around_task with