mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-16 07:46:11 -05:00
Compare commits
10 commits
d9254717ca
...
e17d39637f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e17d39637f | ||
|
|
ed0eda226c | ||
|
|
2b00a0cea1 | ||
|
|
3a5eaaa44d | ||
|
|
f0ea8c294d | ||
|
|
dd88008a0a | ||
|
|
c51a0a6bd4 | ||
|
|
deb96302e1 | ||
|
|
a20208ec37 | ||
|
|
389f237993 |
18 changed files with 137 additions and 50 deletions
2
.github/workflows/gh-pages.yml
vendored
2
.github/workflows/gh-pages.yml
vendored
|
|
@ -13,7 +13,7 @@ jobs:
|
||||||
- uses: actions/checkout@main
|
- uses: actions/checkout@main
|
||||||
|
|
||||||
- name: Use OCaml
|
- name: Use OCaml
|
||||||
uses: ocaml/setup-ocaml@v2
|
uses: ocaml/setup-ocaml@v3
|
||||||
with:
|
with:
|
||||||
ocaml-compiler: '5.0'
|
ocaml-compiler: '5.0'
|
||||||
dune-cache: true
|
dune-cache: true
|
||||||
|
|
|
||||||
6
.github/workflows/main.yml
vendored
6
.github/workflows/main.yml
vendored
|
|
@ -23,7 +23,7 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@main
|
- uses: actions/checkout@main
|
||||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||||
uses: ocaml/setup-ocaml@v2
|
uses: ocaml/setup-ocaml@v3
|
||||||
with:
|
with:
|
||||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||||
dune-cache: true
|
dune-cache: true
|
||||||
|
|
@ -58,7 +58,7 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@main
|
- uses: actions/checkout@main
|
||||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||||
uses: ocaml/setup-ocaml@v2
|
uses: ocaml/setup-ocaml@v3
|
||||||
with:
|
with:
|
||||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||||
dune-cache: true
|
dune-cache: true
|
||||||
|
|
@ -83,7 +83,7 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@main
|
- uses: actions/checkout@main
|
||||||
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
- name: Use OCaml ${{ matrix.ocaml-compiler }}
|
||||||
uses: ocaml/setup-ocaml@v2
|
uses: ocaml/setup-ocaml@v3
|
||||||
with:
|
with:
|
||||||
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
ocaml-compiler: ${{ matrix.ocaml-compiler }}
|
||||||
dune-cache: true
|
dune-cache: true
|
||||||
|
|
|
||||||
13
CHANGES.md
13
CHANGES.md
|
|
@ -1,4 +1,17 @@
|
||||||
|
|
||||||
|
# 0.8
|
||||||
|
|
||||||
|
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
|
||||||
|
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
|
||||||
|
- feat(exn_bt): in show/pp, do print the backtrace when present
|
||||||
|
- feat: block signals in workers if asked to
|
||||||
|
- relax bound on picos to 0.5-0.6
|
||||||
|
- feat fib: `spawn_ignore` now has `?on` optional param
|
||||||
|
- change Moonpool.Chan so it's bounded (stil experimental)
|
||||||
|
|
||||||
|
- fix task local storage: type was too specific
|
||||||
|
- fix fiber: use a single fut/computation in fibers
|
||||||
|
|
||||||
# 0.7
|
# 0.7
|
||||||
|
|
||||||
- add `Moonpool_fiber.spawn_top_ignore`
|
- add `Moonpool_fiber.spawn_top_ignore`
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
(using mdx 0.2)
|
(using mdx 0.2)
|
||||||
|
|
||||||
(name moonpool)
|
(name moonpool)
|
||||||
(version 0.7)
|
(version 0.8)
|
||||||
(generate_opam_files true)
|
(generate_opam_files true)
|
||||||
(source
|
(source
|
||||||
(github c-cube/moonpool))
|
(github c-cube/moonpool))
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# This file is generated by dune, edit dune-project instead
|
# This file is generated by dune, edit dune-project instead
|
||||||
opam-version: "2.0"
|
opam-version: "2.0"
|
||||||
version: "0.7"
|
version: "0.8"
|
||||||
synopsis: "Async IO for moonpool, relying on picos (experimental)"
|
synopsis: "Async IO for moonpool, relying on picos (experimental)"
|
||||||
maintainer: ["Simon Cruanes"]
|
maintainer: ["Simon Cruanes"]
|
||||||
authors: ["Simon Cruanes"]
|
authors: ["Simon Cruanes"]
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# This file is generated by dune, edit dune-project instead
|
# This file is generated by dune, edit dune-project instead
|
||||||
opam-version: "2.0"
|
opam-version: "2.0"
|
||||||
version: "0.7"
|
version: "0.8"
|
||||||
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
|
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
|
||||||
maintainer: ["Simon Cruanes"]
|
maintainer: ["Simon Cruanes"]
|
||||||
authors: ["Simon Cruanes"]
|
authors: ["Simon Cruanes"]
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# This file is generated by dune, edit dune-project instead
|
# This file is generated by dune, edit dune-project instead
|
||||||
opam-version: "2.0"
|
opam-version: "2.0"
|
||||||
version: "0.7"
|
version: "0.8"
|
||||||
synopsis: "Pools of threads supported by a pool of domains"
|
synopsis: "Pools of threads supported by a pool of domains"
|
||||||
maintainer: ["Simon Cruanes"]
|
maintainer: ["Simon Cruanes"]
|
||||||
authors: ["Simon Cruanes"]
|
authors: ["Simon Cruanes"]
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
The channels have bounded size. Push/pop return futures or can use effects
|
The channels have bounded size. Push/pop return futures or can use effects
|
||||||
to provide an [await]-friendly version.
|
to provide an [await]-friendly version.
|
||||||
|
|
||||||
The channels became bounded since @NEXT_RELEASE .
|
The channels became bounded since @0.7 .
|
||||||
*)
|
*)
|
||||||
|
|
||||||
type 'a t
|
type 'a t
|
||||||
|
|
@ -35,13 +35,13 @@ val push : 'a t -> 'a -> unit
|
||||||
(** Push the value into the channel, suspending the current task
|
(** Push the value into the channel, suspending the current task
|
||||||
if the channel is currently full.
|
if the channel is currently full.
|
||||||
@raise Closed if the channel is closed
|
@raise Closed if the channel is closed
|
||||||
@since NEXT_RELEASE *)
|
@since 0.7 *)
|
||||||
|
|
||||||
val pop : 'a t -> 'a
|
val pop : 'a t -> 'a
|
||||||
(** Pop an element. This might suspend the current task if the
|
(** Pop an element. This might suspend the current task if the
|
||||||
channel is currently empty.
|
channel is currently empty.
|
||||||
@raise Closed if the channel is empty and closed.
|
@raise Closed if the channel is empty and closed.
|
||||||
@since NEXT_RELEASE *)
|
@since 0.7 *)
|
||||||
|
|
||||||
(*
|
(*
|
||||||
val pop_block_exn : 'a t -> 'a
|
val pop_block_exn : 'a t -> 'a
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,15 @@ type t = exn * Printexc.raw_backtrace
|
||||||
let[@inline] make exn bt : t = exn, bt
|
let[@inline] make exn bt : t = exn, bt
|
||||||
let[@inline] exn (e, _) = e
|
let[@inline] exn (e, _) = e
|
||||||
let[@inline] bt (_, bt) = bt
|
let[@inline] bt (_, bt) = bt
|
||||||
let show self = Printexc.to_string (exn self)
|
|
||||||
|
let show self =
|
||||||
|
let bt = Printexc.raw_backtrace_to_string (bt self) in
|
||||||
|
let exn = Printexc.to_string (exn self) in
|
||||||
|
if bt = "" then
|
||||||
|
exn
|
||||||
|
else
|
||||||
|
Printf.sprintf "%s\n%s" exn bt
|
||||||
|
|
||||||
let pp out self = Format.pp_print_string out (show self)
|
let pp out self = Format.pp_print_string out (show self)
|
||||||
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt
|
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -165,7 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
|
||||||
create the thread and push it into [receive_threads] *)
|
create the thread and push it into [receive_threads] *)
|
||||||
let create_thread_in_domain () =
|
let create_thread_in_domain () =
|
||||||
let st = { idx = i; dom_idx; st = pool } in
|
let st = { idx = i; dom_idx; st = pool } in
|
||||||
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
|
let thread =
|
||||||
|
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
|
||||||
|
in
|
||||||
(* send the thread from the domain back to us *)
|
(* send the thread from the domain back to us *)
|
||||||
Bb_queue.push receive_threads (i, thread)
|
Bb_queue.push receive_threads (i, thread)
|
||||||
in
|
in
|
||||||
|
|
|
||||||
|
|
@ -3,23 +3,23 @@ module C = Picos.Computation
|
||||||
|
|
||||||
type 'a or_error = ('a, Exn_bt.t) result
|
type 'a or_error = ('a, Exn_bt.t) result
|
||||||
type 'a waiter = 'a or_error -> unit
|
type 'a waiter = 'a or_error -> unit
|
||||||
type 'a t = { st: 'a C.t } [@@unboxed]
|
type 'a t = 'a C.t
|
||||||
type 'a promise = 'a t
|
type 'a promise = 'a t
|
||||||
|
|
||||||
let[@inline] make_promise () : _ t =
|
let[@inline] make_promise () : _ t =
|
||||||
let fut = { st = C.create ~mode:`LIFO () } in
|
let fut = C.create ~mode:`LIFO () in
|
||||||
fut
|
fut
|
||||||
|
|
||||||
let make () =
|
let make () =
|
||||||
let fut = make_promise () in
|
let fut = make_promise () in
|
||||||
fut, fut
|
fut, fut
|
||||||
|
|
||||||
let[@inline] return x : _ t = { st = C.returned x }
|
let[@inline] return x : _ t = C.returned x
|
||||||
|
|
||||||
let[@inline] fail exn bt : _ t =
|
let[@inline] fail exn bt : _ t =
|
||||||
let st = C.create () in
|
let fut = C.create () in
|
||||||
C.cancel st exn bt;
|
C.cancel fut exn bt;
|
||||||
{ st }
|
fut
|
||||||
|
|
||||||
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
|
|
@ -27,32 +27,32 @@ let[@inline] of_result = function
|
||||||
| Ok x -> return x
|
| Ok x -> return x
|
||||||
| Error ebt -> fail_exn_bt ebt
|
| Error ebt -> fail_exn_bt ebt
|
||||||
|
|
||||||
let[@inline] is_resolved self : bool = not (C.is_running self.st)
|
let[@inline] is_resolved self : bool = not (C.is_running self)
|
||||||
let is_done = is_resolved
|
let is_done = is_resolved
|
||||||
let[@inline] peek self : _ option = C.peek self.st
|
let peek : 'a t -> _ option = C.peek
|
||||||
let[@inline] raise_if_failed self : unit = C.check self.st
|
let raise_if_failed : _ t -> unit = C.check
|
||||||
|
|
||||||
let[@inline] is_success self =
|
let[@inline] is_success self =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| _ -> true
|
| _ -> true
|
||||||
| exception _ -> false
|
| exception _ -> false
|
||||||
|
|
||||||
let[@inline] is_failed self = C.is_canceled self.st
|
let is_failed : _ t -> bool = C.is_canceled
|
||||||
|
|
||||||
exception Not_ready
|
exception Not_ready
|
||||||
|
|
||||||
let[@inline] get_or_fail self =
|
let[@inline] get_or_fail self =
|
||||||
match C.peek self.st with
|
match C.peek self with
|
||||||
| Some x -> x
|
| Some x -> x
|
||||||
| None -> raise Not_ready
|
| None -> raise Not_ready
|
||||||
|
|
||||||
let[@inline] get_or_fail_exn self =
|
let[@inline] get_or_fail_exn self =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running -> raise Not_ready
|
| exception C.Running -> raise Not_ready
|
||||||
|
|
||||||
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running -> assert false
|
| exception C.Running -> assert false
|
||||||
|
|
||||||
|
|
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
|
||||||
let trigger =
|
let trigger =
|
||||||
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
|
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
|
||||||
in
|
in
|
||||||
if not (C.try_attach self.st trigger) then on_result_cb_ () f self
|
if not (C.try_attach self trigger) then on_result_cb_ () f self
|
||||||
|
|
||||||
let on_result_ignore_cb_ _tr f (self : _ t) =
|
let on_result_ignore_cb_ _tr f (self : _ t) =
|
||||||
f (Picos.Computation.canceled self.st)
|
f (Picos.Computation.canceled self)
|
||||||
|
|
||||||
let on_result_ignore (self : _ t) f : unit =
|
let on_result_ignore (self : _ t) f : unit =
|
||||||
if Picos.Computation.is_running self.st then (
|
if Picos.Computation.is_running self then (
|
||||||
let trigger =
|
let trigger =
|
||||||
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
||||||
in
|
in
|
||||||
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self
|
if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
|
||||||
) else
|
) else
|
||||||
on_result_ignore_cb_ () f self
|
on_result_ignore_cb_ () f self
|
||||||
|
|
||||||
let[@inline] fulfill_idempotent self r =
|
let[@inline] fulfill_idempotent self r =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.return self.st x
|
| Ok x -> C.return self x
|
||||||
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
| Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
let fulfill (self : _ t) (r : _ result) : unit =
|
let fulfill (self : _ t) (r : _ result) : unit =
|
||||||
let ok =
|
let ok =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.try_return self.st x
|
| Ok x -> C.try_return self x
|
||||||
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
| Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
in
|
in
|
||||||
if not ok then raise Already_fulfilled
|
if not ok then raise Already_fulfilled
|
||||||
|
|
||||||
|
|
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
|
||||||
let task () =
|
let task () =
|
||||||
try
|
try
|
||||||
let res = f () in
|
let res = f () in
|
||||||
C.return fut.st res
|
C.return fut res
|
||||||
with exn ->
|
with exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
C.cancel fut.st exn bt
|
C.cancel fut exn bt
|
||||||
in
|
in
|
||||||
|
|
||||||
Runner.run_async on task;
|
Runner.run_async on task;
|
||||||
|
|
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
|
||||||
let push_queue_ _tr q () = Bb_queue.push q ()
|
let push_queue_ _tr q () = Bb_queue.push q ()
|
||||||
|
|
||||||
let wait_block_exn (self : 'a t) : 'a =
|
let wait_block_exn (self : 'a t) : 'a =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x (* fast path *)
|
| x -> x (* fast path *)
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let real_block () =
|
let real_block () =
|
||||||
|
|
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
||||||
assert attached;
|
assert attached;
|
||||||
|
|
||||||
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
||||||
if C.try_attach self.st trigger then Bb_queue.pop q;
|
if C.try_attach self trigger then Bb_queue.pop q;
|
||||||
|
|
||||||
(* trigger was signaled! computation must be done*)
|
(* trigger was signaled! computation must be done*)
|
||||||
peek_or_assert_ self
|
peek_or_assert_ self
|
||||||
|
|
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
||||||
if i = 0 then
|
if i = 0 then
|
||||||
real_block ()
|
real_block ()
|
||||||
else (
|
else (
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
Domain_.relax ();
|
Domain_.relax ();
|
||||||
|
|
@ -426,12 +426,12 @@ let wait_block self =
|
||||||
|
|
||||||
let await (self : 'a t) : 'a =
|
let await (self : 'a t) : 'a =
|
||||||
(* fast path: peek *)
|
(* fast path: peek *)
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| res -> res
|
| res -> res
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let trigger = Trigger.create () in
|
let trigger = Trigger.create () in
|
||||||
(* suspend until the future is resolved *)
|
(* suspend until the future is resolved *)
|
||||||
if C.try_attach self.st trigger then
|
if C.try_attach self trigger then
|
||||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||||
|
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
|
|
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
|
||||||
|
|
||||||
module Private_ = struct
|
module Private_ = struct
|
||||||
let[@inline] unsafe_promise_of_fut x = x
|
let[@inline] unsafe_promise_of_fut x = x
|
||||||
let[@inline] as_computation self = self.st
|
let[@inline] as_computation self = self
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -19,13 +19,13 @@
|
||||||
|
|
||||||
type 'a or_error = ('a, Exn_bt.t) result
|
type 'a or_error = ('a, Exn_bt.t) result
|
||||||
|
|
||||||
type 'a t
|
type 'a t = 'a Picos.Computation.t
|
||||||
(** A future with a result of type ['a]. *)
|
(** A future with a result of type ['a]. *)
|
||||||
|
|
||||||
type 'a promise = private 'a t
|
type 'a promise = private 'a t
|
||||||
(** A promise, which can be fulfilled exactly once to set
|
(** A promise, which can be fulfilled exactly once to set
|
||||||
the corresponding future.
|
the corresponding future.
|
||||||
This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *)
|
This is a private alias of ['a t] since 0.7, previously it was opaque. *)
|
||||||
|
|
||||||
val make : unit -> 'a t * 'a promise
|
val make : unit -> 'a t * 'a promise
|
||||||
(** Make a new future with the associated promise. *)
|
(** Make a new future with the associated promise. *)
|
||||||
|
|
@ -38,7 +38,7 @@ val make_promise : unit -> 'a promise
|
||||||
{[let prom = Fut.make_promise();;
|
{[let prom = Fut.make_promise();;
|
||||||
let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
|
let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
|
||||||
]}
|
]}
|
||||||
@since NEXT_RELEASE *)
|
@since 0.7 *)
|
||||||
|
|
||||||
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||||
(** [on_result fut f] registers [f] to be called in the future
|
(** [on_result fut f] registers [f] to be called in the future
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,24 @@ let with_handler ~ops:_ self f = f ()
|
||||||
|
|
||||||
[@@@endif]
|
[@@@endif]
|
||||||
|
|
||||||
let worker_loop (type st) ~(ops : st ops) (self : st) : unit =
|
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
|
||||||
|
if block_signals then (
|
||||||
|
try
|
||||||
|
ignore
|
||||||
|
(Unix.sigprocmask SIG_BLOCK
|
||||||
|
[
|
||||||
|
Sys.sigterm;
|
||||||
|
Sys.sigpipe;
|
||||||
|
Sys.sigint;
|
||||||
|
Sys.sigchld;
|
||||||
|
Sys.sigalrm;
|
||||||
|
Sys.sigusr1;
|
||||||
|
Sys.sigusr2;
|
||||||
|
]
|
||||||
|
: _ list)
|
||||||
|
with _ -> ()
|
||||||
|
);
|
||||||
|
|
||||||
let cur_fiber : fiber ref = ref _dummy_fiber in
|
let cur_fiber : fiber ref = ref _dummy_fiber in
|
||||||
let runner = ops.runner self in
|
let runner = ops.runner self in
|
||||||
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
|
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
|
||||||
|
|
|
||||||
37
src/core/worker_loop_.mli
Normal file
37
src/core/worker_loop_.mli
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
(** Internal module that is used for workers.
|
||||||
|
|
||||||
|
A thread pool should use this [worker_loop] to run tasks,
|
||||||
|
handle effects, etc. *)
|
||||||
|
|
||||||
|
open Types_
|
||||||
|
|
||||||
|
type task_full =
|
||||||
|
| T_start of {
|
||||||
|
fiber: fiber;
|
||||||
|
f: unit -> unit;
|
||||||
|
}
|
||||||
|
| T_resume : {
|
||||||
|
fiber: fiber;
|
||||||
|
k: unit -> unit;
|
||||||
|
}
|
||||||
|
-> task_full
|
||||||
|
|
||||||
|
val _dummy_task : task_full
|
||||||
|
|
||||||
|
type around_task =
|
||||||
|
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
|
||||||
|
|
||||||
|
exception No_more_tasks
|
||||||
|
|
||||||
|
type 'st ops = {
|
||||||
|
schedule: 'st -> task_full -> unit;
|
||||||
|
get_next_task: 'st -> task_full;
|
||||||
|
get_thread_state: unit -> 'st;
|
||||||
|
around_task: 'st -> around_task;
|
||||||
|
on_exn: 'st -> Exn_bt.t -> unit;
|
||||||
|
runner: 'st -> Runner.t;
|
||||||
|
before_start: 'st -> unit;
|
||||||
|
cleanup: 'st -> unit;
|
||||||
|
}
|
||||||
|
|
||||||
|
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit
|
||||||
|
|
@ -310,7 +310,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
|
||||||
(* function called in domain with index [i], to
|
(* function called in domain with index [i], to
|
||||||
create the thread and push it into [receive_threads] *)
|
create the thread and push it into [receive_threads] *)
|
||||||
let create_thread_in_domain () =
|
let create_thread_in_domain () =
|
||||||
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
|
let thread =
|
||||||
|
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
|
||||||
|
in
|
||||||
(* send the thread from the domain back to us *)
|
(* send the thread from the domain back to us *)
|
||||||
Bb_queue.push receive_threads (idx, thread)
|
Bb_queue.push receive_threads (idx, thread)
|
||||||
in
|
in
|
||||||
|
|
|
||||||
|
|
@ -147,7 +147,7 @@ val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
|
||||||
(** [spawn_ignore f] is [ignore (spawn f)].
|
(** [spawn_ignore f] is [ignore (spawn f)].
|
||||||
The fiber will still affect termination of the parent, ie. the
|
The fiber will still affect termination of the parent, ie. the
|
||||||
parent will exit only after this new fiber exits.
|
parent will exit only after this new fiber exits.
|
||||||
@param on the optional runner to use, added since NEXT_RELEASE *)
|
@param on the optional runner to use, added since 0.7 *)
|
||||||
|
|
||||||
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
|
||||||
(** Like {!spawn_top} but ignores the result.
|
(** Like {!spawn_top} but ignores the result.
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
exception Oh_no of Exn_bt.t
|
exception Oh_no of Exn_bt.t
|
||||||
|
|
||||||
let main (f : Runner.t -> 'a) : 'a =
|
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
|
||||||
let worker_st =
|
let worker_st =
|
||||||
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
|
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
|
||||||
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
|
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
|
||||||
|
|
@ -13,6 +13,7 @@ let main (f : Runner.t -> 'a) : 'a =
|
||||||
|
|
||||||
(* run the main thread *)
|
(* run the main thread *)
|
||||||
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
Moonpool.Private.Worker_loop_.worker_loop worker_st
|
||||||
|
~block_signals (* do not disturb existing thread *)
|
||||||
~ops:Fifo_pool.Private_.worker_ops;
|
~ops:Fifo_pool.Private_.worker_ops;
|
||||||
|
|
||||||
match Fiber.peek fiber with
|
match Fiber.peek fiber with
|
||||||
|
|
@ -20,3 +21,6 @@ let main (f : Runner.t -> 'a) : 'a =
|
||||||
| Some (Error ebt) -> Exn_bt.raise ebt
|
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||||
| None -> assert false
|
| None -> assert false
|
||||||
with Oh_no ebt -> Exn_bt.raise ebt
|
with Oh_no ebt -> Exn_bt.raise ebt
|
||||||
|
|
||||||
|
let main f =
|
||||||
|
main' () f ~block_signals:false (* do not disturb existing thread *)
|
||||||
|
|
|
||||||
|
|
@ -23,3 +23,7 @@ val main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||||
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
|
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
|
||||||
|
|
||||||
This scope can run background tasks as well, in a cooperative fashion. *)
|
This scope can run background tasks as well, in a cooperative fashion. *)
|
||||||
|
|
||||||
|
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
|
||||||
|
(** Same as {!main} but with room for optional arguments.
|
||||||
|
@since 0.7 *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue