mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-16 15:56:21 -05:00
Compare commits
10 commits
d9254717ca
...
e17d39637f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e17d39637f | ||
|
|
ed0eda226c | ||
|
|
2b00a0cea1 | ||
|
|
3a5eaaa44d | ||
|
|
f0ea8c294d | ||
|
|
dd88008a0a | ||
|
|
c51a0a6bd4 | ||
|
|
deb96302e1 | ||
|
|
a20208ec37 | ||
|
|
389f237993 |
18 changed files with 137 additions and 50 deletions
2
.github/workflows/gh-pages.yml
vendored
2
.github/workflows/gh-pages.yml
vendored
|
|
@ -13,7 +13,7 @@ jobs:
|
|||
- uses: actions/checkout@main
|
||||
|
||||
- name: Use OCaml
|
||||
uses: ocaml/setup-ocaml@v2
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: '5.0'
|
||||
dune-cache: true
|
||||
|
|
|
|||
6
.github/workflows/main.yml
vendored
6
.github/workflows/main.yml
vendored
|
|
@ -23,7 +23,7 @@ jobs:
|
|||
steps:
|
||||
- uses: actions/checkout@main
|
||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||
uses: ocaml/setup-ocaml@v2
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||
dune-cache: true
|
||||
|
|
@ -58,7 +58,7 @@ jobs:
|
|||
steps:
|
||||
- uses: actions/checkout@main
|
||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||
uses: ocaml/setup-ocaml@v2
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||
dune-cache: true
|
||||
|
|
@ -83,7 +83,7 @@ jobs:
|
|||
steps:
|
||||
- uses: actions/checkout@main
|
||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||
uses: ocaml/setup-ocaml@v2
|
||||
uses: ocaml/setup-ocaml@v3
|
||||
with:
|
||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||
dune-cache: true
|
||||
|
|
|
|||
13
CHANGES.md
13
CHANGES.md
|
|
@ -1,4 +1,17 @@
|
|||
|
||||
# 0.8
|
||||
|
||||
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
|
||||
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
|
||||
- feat(exn_bt): in show/pp, do print the backtrace when present
|
||||
- feat: block signals in workers if asked to
|
||||
- relax bound on picos to 0.5-0.6
|
||||
- feat fib: `spawn_ignore` now has `?on` optional param
|
||||
- change Moonpool.Chan so it's bounded (stil experimental)
|
||||
|
||||
- fix task local storage: type was too specific
|
||||
- fix fiber: use a single fut/computation in fibers
|
||||
|
||||
# 0.7
|
||||
|
||||
- add `Moonpool_fiber.spawn_top_ignore`
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
(using mdx 0.2)
|
||||
|
||||
(name moonpool)
|
||||
(version 0.7)
|
||||
(version 0.8)
|
||||
(generate_opam_files true)
|
||||
(source
|
||||
(github c-cube/moonpool))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.7"
|
||||
version: "0.8"
|
||||
synopsis: "Async IO for moonpool, relying on picos (experimental)"
|
||||
maintainer: ["Simon Cruanes"]
|
||||
authors: ["Simon Cruanes"]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.7"
|
||||
version: "0.8"
|
||||
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
|
||||
maintainer: ["Simon Cruanes"]
|
||||
authors: ["Simon Cruanes"]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# This file is generated by dune, edit dune-project instead
|
||||
opam-version: "2.0"
|
||||
version: "0.7"
|
||||
version: "0.8"
|
||||
synopsis: "Pools of threads supported by a pool of domains"
|
||||
maintainer: ["Simon Cruanes"]
|
||||
authors: ["Simon Cruanes"]
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
The channels have bounded size. Push/pop return futures or can use effects
|
||||
to provide an [await]-friendly version.
|
||||
|
||||
The channels became bounded since @NEXT_RELEASE .
|
||||
The channels became bounded since @0.7 .
|
||||
*)
|
||||
|
||||
type 'a t
|
||||
|
|
@ -35,13 +35,13 @@ val push : 'a t -> 'a -> unit
|
|||
(** Push the value into the channel, suspending the current task
|
||||
if the channel is currently full.
|
||||
@raise Closed if the channel is closed
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.7 *)
|
||||
|
||||
val pop : 'a t -> 'a
|
||||
(** Pop an element. This might suspend the current task if the
|
||||
channel is currently empty.
|
||||
@raise Closed if the channel is empty and closed.
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.7 *)
|
||||
|
||||
(*
|
||||
val pop_block_exn : 'a t -> 'a
|
||||
|
|
|
|||
|
|
@ -3,7 +3,15 @@ type t = exn * Printexc.raw_backtrace
|
|||
let[@inline] make exn bt : t = exn, bt
|
||||
let[@inline] exn (e, _) = e
|
||||
let[@inline] bt (_, bt) = bt
|
||||
let show self = Printexc.to_string (exn self)
|
||||
|
||||
let show self =
|
||||
let bt = Printexc.raw_backtrace_to_string (bt self) in
|
||||
let exn = Printexc.to_string (exn self) in
|
||||
if bt = "" then
|
||||
exn
|
||||
else
|
||||
Printf.sprintf "%s\n%s" exn bt
|
||||
|
||||
let pp out self = Format.pp_print_string out (show self)
|
||||
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt
|
||||
|
||||
|
|
|
|||
|
|
@ -165,7 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
|||
create the thread and push it into [receive_threads] *)
|
||||
let create_thread_in_domain () =
|
||||
let st = { idx = i; dom_idx; st = pool } in
|
||||
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
|
||||
let thread =
|
||||
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
|
||||
in
|
||||
(* send the thread from the domain back to us *)
|
||||
Bb_queue.push receive_threads (i, thread)
|
||||
in
|
||||
|
|
|
|||
|
|
@ -3,23 +3,23 @@ module C = Picos.Computation
|
|||
|
||||
type 'a or_error = ('a, Exn_bt.t) result
|
||||
type 'a waiter = 'a or_error -> unit
|
||||
type 'a t = { st: 'a C.t } [@@unboxed]
|
||||
type 'a t = 'a C.t
|
||||
type 'a promise = 'a t
|
||||
|
||||
let[@inline] make_promise () : _ t =
|
||||
let fut = { st = C.create ~mode:`LIFO () } in
|
||||
let fut = C.create ~mode:`LIFO () in
|
||||
fut
|
||||
|
||||
let make () =
|
||||
let fut = make_promise () in
|
||||
fut, fut
|
||||
|
||||
let[@inline] return x : _ t = { st = C.returned x }
|
||||
let[@inline] return x : _ t = C.returned x
|
||||
|
||||
let[@inline] fail exn bt : _ t =
|
||||
let st = C.create () in
|
||||
C.cancel st exn bt;
|
||||
{ st }
|
||||
let fut = C.create () in
|
||||
C.cancel fut exn bt;
|
||||
fut
|
||||
|
||||
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
|
||||
|
|
@ -27,32 +27,32 @@ let[@inline] of_result = function
|
|||
| Ok x -> return x
|
||||
| Error ebt -> fail_exn_bt ebt
|
||||
|
||||
let[@inline] is_resolved self : bool = not (C.is_running self.st)
|
||||
let[@inline] is_resolved self : bool = not (C.is_running self)
|
||||
let is_done = is_resolved
|
||||
let[@inline] peek self : _ option = C.peek self.st
|
||||
let[@inline] raise_if_failed self : unit = C.check self.st
|
||||
let peek : 'a t -> _ option = C.peek
|
||||
let raise_if_failed : _ t -> unit = C.check
|
||||
|
||||
let[@inline] is_success self =
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| _ -> true
|
||||
| exception _ -> false
|
||||
|
||||
let[@inline] is_failed self = C.is_canceled self.st
|
||||
let is_failed : _ t -> bool = C.is_canceled
|
||||
|
||||
exception Not_ready
|
||||
|
||||
let[@inline] get_or_fail self =
|
||||
match C.peek self.st with
|
||||
match C.peek self with
|
||||
| Some x -> x
|
||||
| None -> raise Not_ready
|
||||
|
||||
let[@inline] get_or_fail_exn self =
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| x -> x
|
||||
| exception C.Running -> raise Not_ready
|
||||
|
||||
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| x -> x
|
||||
| exception C.Running -> assert false
|
||||
|
||||
|
|
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
|
|||
let trigger =
|
||||
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
|
||||
in
|
||||
if not (C.try_attach self.st trigger) then on_result_cb_ () f self
|
||||
if not (C.try_attach self trigger) then on_result_cb_ () f self
|
||||
|
||||
let on_result_ignore_cb_ _tr f (self : _ t) =
|
||||
f (Picos.Computation.canceled self.st)
|
||||
f (Picos.Computation.canceled self)
|
||||
|
||||
let on_result_ignore (self : _ t) f : unit =
|
||||
if Picos.Computation.is_running self.st then (
|
||||
if Picos.Computation.is_running self then (
|
||||
let trigger =
|
||||
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
||||
in
|
||||
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self
|
||||
if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
|
||||
) else
|
||||
on_result_ignore_cb_ () f self
|
||||
|
||||
let[@inline] fulfill_idempotent self r =
|
||||
match r with
|
||||
| Ok x -> C.return self.st x
|
||||
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
| Ok x -> C.return self x
|
||||
| Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
|
||||
exception Already_fulfilled
|
||||
|
||||
let fulfill (self : _ t) (r : _ result) : unit =
|
||||
let ok =
|
||||
match r with
|
||||
| Ok x -> C.try_return self.st x
|
||||
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
| Ok x -> C.try_return self x
|
||||
| Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||
in
|
||||
if not ok then raise Already_fulfilled
|
||||
|
||||
|
|
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
|
|||
let task () =
|
||||
try
|
||||
let res = f () in
|
||||
C.return fut.st res
|
||||
C.return fut res
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
C.cancel fut.st exn bt
|
||||
C.cancel fut exn bt
|
||||
in
|
||||
|
||||
Runner.run_async on task;
|
||||
|
|
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
|
|||
let push_queue_ _tr q () = Bb_queue.push q ()
|
||||
|
||||
let wait_block_exn (self : 'a t) : 'a =
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| x -> x (* fast path *)
|
||||
| exception C.Running ->
|
||||
let real_block () =
|
||||
|
|
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
|||
assert attached;
|
||||
|
||||
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
||||
if C.try_attach self.st trigger then Bb_queue.pop q;
|
||||
if C.try_attach self trigger then Bb_queue.pop q;
|
||||
|
||||
(* trigger was signaled! computation must be done*)
|
||||
peek_or_assert_ self
|
||||
|
|
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
|||
if i = 0 then
|
||||
real_block ()
|
||||
else (
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| x -> x
|
||||
| exception C.Running ->
|
||||
Domain_.relax ();
|
||||
|
|
@ -426,12 +426,12 @@ let wait_block self =
|
|||
|
||||
let await (self : 'a t) : 'a =
|
||||
(* fast path: peek *)
|
||||
match C.peek_exn self.st with
|
||||
match C.peek_exn self with
|
||||
| res -> res
|
||||
| exception C.Running ->
|
||||
let trigger = Trigger.create () in
|
||||
(* suspend until the future is resolved *)
|
||||
if C.try_attach self.st trigger then
|
||||
if C.try_attach self trigger then
|
||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||
|
||||
(* un-suspended: we should have a result! *)
|
||||
|
|
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
|
|||
|
||||
module Private_ = struct
|
||||
let[@inline] unsafe_promise_of_fut x = x
|
||||
let[@inline] as_computation self = self.st
|
||||
let[@inline] as_computation self = self
|
||||
end
|
||||
|
|
|
|||
|
|
@ -19,13 +19,13 @@
|
|||
|
||||
type 'a or_error = ('a, Exn_bt.t) result
|
||||
|
||||
type 'a t
|
||||
type 'a t = 'a Picos.Computation.t
|
||||
(** A future with a result of type ['a]. *)
|
||||
|
||||
type 'a promise = private 'a t
|
||||
(** A promise, which can be fulfilled exactly once to set
|
||||
the corresponding future.
|
||||
This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *)
|
||||
This is a private alias of ['a t] since 0.7, previously it was opaque. *)
|
||||
|
||||
val make : unit -> 'a t * 'a promise
|
||||
(** Make a new future with the associated promise. *)
|
||||
|
|
@ -38,7 +38,7 @@ val make_promise : unit -> 'a promise
|
|||
{[let prom = Fut.make_promise();;
|
||||
let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
|
||||
]}
|
||||
@since NEXT_RELEASE *)
|
||||
@since 0.7 *)
|
||||
|
||||
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||
(** [on_result fut f] registers [f] to be called in the future
|
||||
|
|
|
|||
|
|
@ -102,7 +102,24 @@ let with_handler ~ops:_ self f = f ()
|
|||
|
||||
[@@@endif]
|
||||
|
||||
let worker_loop (type st) ~(ops : st ops) (self : st) : unit =
|
||||
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
|
||||
if block_signals then (
|
||||
try
|
||||
ignore
|
||||
(Unix.sigprocmask SIG_BLOCK
|
||||
[
|
||||
Sys.sigterm;
|
||||
Sys.sigpipe;
|
||||
Sys.sigint;
|
||||
Sys.sigchld;
|
||||
Sys.sigalrm;
|
||||
Sys.sigusr1;
|
||||
Sys.sigusr2;
|
||||
]
|
||||
: _ list)
|
||||
with _ -> ()
|
||||
);
|
||||
|
||||
let cur_fiber : fiber ref = ref _dummy_fiber in
|
||||
let runner = ops.runner self in
|
||||
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
|
||||
|
|
|
|||
37
src/core/worker_loop_.mli
Normal file
37
src/core/worker_loop_.mli
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
(** Internal module that is used for workers.
|
||||
|
||||
A thread pool should use this [worker_loop] to run tasks,
|
||||
handle effects, etc. *)
|
||||
|
||||
open Types_
|
||||
|
||||
type task_full =
|
||||
| T_start of {
|
||||
fiber: fiber;
|
||||
f: unit -> unit;
|
||||
}
|
||||
| T_resume : {
|
||||
fiber: fiber;
|
||||
k: unit -> unit;
|
||||
}
|
||||
-> task_full
|
||||
|
||||
val _dummy_task : task_full
|
||||
|
||||
type around_task =
|
||||
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
|
||||
|
||||
exception No_more_tasks
|
||||
|
||||
type 'st ops = {
|
||||
schedule: 'st -> task_full -> unit;
|
||||
get_next_task: 'st -> task_full;
|
||||
get_thread_state: unit -> 'st;
|
||||
around_task: 'st -> around_task;
|
||||
on_exn: 'st -> Exn_bt.t -> unit;
|
||||
runner: 'st -> Runner.t;
|
||||
before_start: 'st -> unit;
|
||||
cleanup: 'st -> unit;
|
||||
}
|
||||
|
||||
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit
|
||||
|
|
@ -310,7 +310,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
|||
(* function called in domain with index [i], to
|
||||
create the thread and push it into [receive_threads] *)
|
||||
let create_thread_in_domain () =
|
||||
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
|
||||
let thread =
|
||||
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
|
||||
in
|
||||
(* send the thread from the domain back to us *)
|
||||
Bb_queue.push receive_threads (idx, thread)
|
||||
in
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
|
|||
(** [spawn_ignore f] is [ignore (spawn f)].
|
||||
The fiber will still affect termination of the parent, ie. the
|
||||
parent will exit only after this new fiber exits.
|
||||
@param on the optional runner to use, added since NEXT_RELEASE *)
|
||||
@param on the optional runner to use, added since 0.7 *)
|
||||
|
||||
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
||||
(** Like {!spawn_top} but ignores the result.
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
exception Oh_no of Exn_bt.t
|
||||
|
||||
let main (f : Runner.t -> 'a) : 'a =
|
||||
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
|
||||
let worker_st =
|
||||
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
|
||||
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
|
||||
|
|
@ -13,6 +13,7 @@ let main (f : Runner.t -> 'a) : 'a =
|
|||
|
||||
(* run the main thread *)
|
||||
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
||||
~block_signals (* do not disturb existing thread *)
|
||||
~ops:Fifo_pool.Private_.worker_ops;
|
||||
|
||||
match Fiber.peek fiber with
|
||||
|
|
@ -20,3 +21,6 @@ let main (f : Runner.t -> 'a) : 'a =
|
|||
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||
| None -> assert false
|
||||
with Oh_no ebt -> Exn_bt.raise ebt
|
||||
|
||||
let main f =
|
||||
main' () f ~block_signals:false (* do not disturb existing thread *)
|
||||
|
|
|
|||
|
|
@ -23,3 +23,7 @@ val main : (Moonpool.Runner.t -> 'a) -> 'a
|
|||
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
|
||||
|
||||
This scope can run background tasks as well, in a cooperative fashion. *)
|
||||
|
||||
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
|
||||
(** Same as {!main} but with room for optional arguments.
|
||||
@since 0.7 *)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue