mirror of
https://github.com/c-cube/moonpool.git
synced 2026-03-10 07:35:55 -04:00
* feat: depend on picos, use picos.exn_bt * refactor: remove dla * non optional dependency on thread-local-storage it's a dep of picos anyway * wip: use picos computations * disable t_fib1 test, way too flaky * feat `fut`: wrap picos computations * detail in fut * gitignore * refactor core: use picos for schedulers; add Worker_loop_ we factor most of the thread workers' logic in `Worker_loop_`, which is now shared between Ws_pool and Fifo_pool * github actions * feat fut: add `on_result_ignore` * details * wip: port to picos * test: wip porting tests * fix fut: trigger failing to attach doesn't signal it * fix pool: only return No_more_tasks when local and global q empty * format * chore: fix CI by installing picos first * more CI * test: re-enable t_fib1 but with a single core fifo pool it should be deterministic now! * fixes after reviews * bump minimal OCaml version to 4.13 * use `exn_bt`, not `picos.exn_bt` * feat: optional dep on hmap, for inheritable FLS data * format * chore: depend on picos explicitly * feat: move hmap-fls to Fiber.Fls * change API for local FLS hmap * refactor: move optional hmap FLS stuff into core/task_local_storage * add Task_local_storage.remove_in_local_hmap * chore: try to fix CI * format * chore: CI * fix * feat: add `Fls.with_in_local_hmap` * chore: depend on hmap for tests * fix test for FLS use the inheritable keys * chore: CI * require OCaml 4.14 :/ * feat: add `moonpool.sync` with await-friendly abstractions based on picos_sync * fix: catch TLS.Not_set * fix: `LS.get` shouldn't raise * fix * update to merged picos PR * chore: CI * fix dep * feat: add `Event.of_fut` * chore: CI * remove dep on now defunct `exn_bt` * feat: add moonpool-io * chore: CI * version constraint on moonpool-io * add Event.Infix * move to picos_io
63 lines
1.6 KiB
OCaml
63 lines
1.6 KiB
OCaml
open Types_
|
|
|
|
type fiber = Picos.Fiber.t
|
|
type task = unit -> unit
|
|
|
|
type t = runner = {
|
|
run_async: fiber:fiber -> task -> unit;
|
|
shutdown: wait:bool -> unit -> unit;
|
|
size: unit -> int;
|
|
num_tasks: unit -> int;
|
|
}
|
|
|
|
exception Shutdown
|
|
|
|
let[@inline] run_async ?fiber (self : t) f : unit =
|
|
let fiber =
|
|
match fiber with
|
|
| Some f -> f
|
|
| None ->
|
|
let comp = Picos.Computation.create () in
|
|
Picos.Fiber.create ~forbid:false comp
|
|
in
|
|
self.run_async ~fiber f
|
|
|
|
let[@inline] shutdown (self : t) : unit = self.shutdown ~wait:true ()
|
|
|
|
let[@inline] shutdown_without_waiting (self : t) : unit =
|
|
self.shutdown ~wait:false ()
|
|
|
|
let[@inline] num_tasks (self : t) : int = self.num_tasks ()
|
|
let[@inline] size (self : t) : int = self.size ()
|
|
|
|
let run_wait_block ?fiber self (f : unit -> 'a) : 'a =
|
|
let q = Bb_queue.create () in
|
|
run_async ?fiber self (fun () ->
|
|
try
|
|
let x = f () in
|
|
Bb_queue.push q (Ok x)
|
|
with exn ->
|
|
let bt = Printexc.get_raw_backtrace () in
|
|
Bb_queue.push q (Error (exn, bt)));
|
|
match Bb_queue.pop q with
|
|
| Ok x -> x
|
|
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
|
|
|
|
module For_runner_implementors = struct
|
|
let create ~size ~num_tasks ~shutdown ~run_async () : t =
|
|
{ size; num_tasks; shutdown; run_async }
|
|
|
|
let k_cur_runner : t TLS.t = Types_.k_cur_runner
|
|
end
|
|
|
|
let dummy : t =
|
|
For_runner_implementors.create
|
|
~size:(fun () -> 0)
|
|
~num_tasks:(fun () -> 0)
|
|
~shutdown:(fun ~wait:_ () -> ())
|
|
~run_async:(fun ~fiber:_ _ ->
|
|
failwith "Runner.dummy: cannot actually run tasks")
|
|
()
|
|
|
|
let get_current_runner = get_current_runner
|
|
let get_current_fiber = get_current_fiber
|