Compare commits

..

1 commit

Author SHA1 Message Date
Simon Cruanes
d9254717ca
Merge 01cdb66f1f into 06f3bdadb9 2025-01-18 16:17:30 +00:00
18 changed files with 50 additions and 137 deletions

View file

@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@main
- name: Use OCaml
uses: ocaml/setup-ocaml@v3
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: '5.0'
dune-cache: true

View file

@ -23,7 +23,7 @@ jobs:
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
@ -58,7 +58,7 @@ jobs:
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true
@ -83,7 +83,7 @@ jobs:
steps:
- uses: actions/checkout@main
- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v3
uses: ocaml/setup-ocaml@v2
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}
dune-cache: true

View file

@ -1,17 +1,4 @@
# 0.8
- api(fut): make alias `'a Fut.t = 'a Picos.Computation.t` public
- feat: add `Fut.make_promise`, have `'a promise = private 'a fut`
- feat(exn_bt): in show/pp, do print the backtrace when present
- feat: block signals in workers if asked to
- relax bound on picos to 0.5-0.6
- feat fib: `spawn_ignore` now has `?on` optional param
- change Moonpool.Chan so it's bounded (stil experimental)
- fix task local storage: type was too specific
- fix fiber: use a single fut/computation in fibers
# 0.7
- add `Moonpool_fiber.spawn_top_ignore`

View file

@ -2,7 +2,7 @@
(using mdx 0.2)
(name moonpool)
(version 0.8)
(version 0.7)
(generate_opam_files true)
(source
(github c-cube/moonpool))

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
version: "0.7"
synopsis: "Async IO for moonpool, relying on picos (experimental)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
version: "0.7"
synopsis: "Event loop for moonpool based on Lwt-engine (experimental)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]

View file

@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.8"
version: "0.7"
synopsis: "Pools of threads supported by a pool of domains"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]

View file

@ -3,7 +3,7 @@
The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version.
The channels became bounded since @0.7 .
The channels became bounded since @NEXT_RELEASE .
*)
type 'a t
@ -35,13 +35,13 @@ val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task
if the channel is currently full.
@raise Closed if the channel is closed
@since 0.7 *)
@since NEXT_RELEASE *)
val pop : 'a t -> 'a
(** Pop an element. This might suspend the current task if the
channel is currently empty.
@raise Closed if the channel is empty and closed.
@since 0.7 *)
@since NEXT_RELEASE *)
(*
val pop_block_exn : 'a t -> 'a

View file

@ -3,15 +3,7 @@ type t = exn * Printexc.raw_backtrace
let[@inline] make exn bt : t = exn, bt
let[@inline] exn (e, _) = e
let[@inline] bt (_, bt) = bt
let show self =
let bt = Printexc.raw_backtrace_to_string (bt self) in
let exn = Printexc.to_string (exn self) in
if bt = "" then
exn
else
Printf.sprintf "%s\n%s" exn bt
let show self = Printexc.to_string (exn self)
let pp out self = Format.pp_print_string out (show self)
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt

View file

@ -165,9 +165,7 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
create the thread and push it into [receive_threads] *)
let create_thread_in_domain () =
let st = { idx = i; dom_idx; st = pool } in
let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
(* send the thread from the domain back to us *)
Bb_queue.push receive_threads (i, thread)
in

View file

@ -3,23 +3,23 @@ module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result
type 'a waiter = 'a or_error -> unit
type 'a t = 'a C.t
type 'a t = { st: 'a C.t } [@@unboxed]
type 'a promise = 'a t
let[@inline] make_promise () : _ t =
let fut = C.create ~mode:`LIFO () in
let fut = { st = C.create ~mode:`LIFO () } in
fut
let make () =
let fut = make_promise () in
fut, fut
let[@inline] return x : _ t = C.returned x
let[@inline] return x : _ t = { st = C.returned x }
let[@inline] fail exn bt : _ t =
let fut = C.create () in
C.cancel fut exn bt;
fut
let st = C.create () in
C.cancel st exn bt;
{ st }
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
@ -27,32 +27,32 @@ let[@inline] of_result = function
| Ok x -> return x
| Error ebt -> fail_exn_bt ebt
let[@inline] is_resolved self : bool = not (C.is_running self)
let[@inline] is_resolved self : bool = not (C.is_running self.st)
let is_done = is_resolved
let peek : 'a t -> _ option = C.peek
let raise_if_failed : _ t -> unit = C.check
let[@inline] peek self : _ option = C.peek self.st
let[@inline] raise_if_failed self : unit = C.check self.st
let[@inline] is_success self =
match C.peek_exn self with
match C.peek_exn self.st with
| _ -> true
| exception _ -> false
let is_failed : _ t -> bool = C.is_canceled
let[@inline] is_failed self = C.is_canceled self.st
exception Not_ready
let[@inline] get_or_fail self =
match C.peek self with
match C.peek self.st with
| Some x -> x
| None -> raise Not_ready
let[@inline] get_or_fail_exn self =
match C.peek_exn self with
match C.peek_exn self.st with
| x -> x
| exception C.Running -> raise Not_ready
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
match C.peek_exn self with
match C.peek_exn self.st with
| x -> x
| exception C.Running -> assert false
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
let trigger =
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
in
if not (C.try_attach self trigger) then on_result_cb_ () f self
if not (C.try_attach self.st trigger) then on_result_cb_ () f self
let on_result_ignore_cb_ _tr f (self : _ t) =
f (Picos.Computation.canceled self)
f (Picos.Computation.canceled self.st)
let on_result_ignore (self : _ t) f : unit =
if Picos.Computation.is_running self then (
if Picos.Computation.is_running self.st then (
let trigger =
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
in
if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self
) else
on_result_ignore_cb_ () f self
let[@inline] fulfill_idempotent self r =
match r with
| Ok x -> C.return self x
| Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
| Ok x -> C.return self.st x
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit =
let ok =
match r with
| Ok x -> C.try_return self x
| Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
| Ok x -> C.try_return self.st x
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in
if not ok then raise Already_fulfilled
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
let task () =
try
let res = f () in
C.return fut res
C.return fut.st res
with exn ->
let bt = Printexc.get_raw_backtrace () in
C.cancel fut exn bt
C.cancel fut.st exn bt
in
Runner.run_async on task;
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
let push_queue_ _tr q () = Bb_queue.push q ()
let wait_block_exn (self : 'a t) : 'a =
match C.peek_exn self with
match C.peek_exn self.st with
| x -> x (* fast path *)
| exception C.Running ->
let real_block () =
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
assert attached;
(* blockingly wait for trigger if computation didn't complete in the mean time *)
if C.try_attach self trigger then Bb_queue.pop q;
if C.try_attach self.st trigger then Bb_queue.pop q;
(* trigger was signaled! computation must be done*)
peek_or_assert_ self
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
if i = 0 then
real_block ()
else (
match C.peek_exn self with
match C.peek_exn self.st with
| x -> x
| exception C.Running ->
Domain_.relax ();
@ -426,12 +426,12 @@ let wait_block self =
let await (self : 'a t) : 'a =
(* fast path: peek *)
match C.peek_exn self with
match C.peek_exn self.st with
| res -> res
| exception C.Running ->
let trigger = Trigger.create () in
(* suspend until the future is resolved *)
if C.try_attach self trigger then
if C.try_attach self.st trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *)
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
module Private_ = struct
let[@inline] unsafe_promise_of_fut x = x
let[@inline] as_computation self = self
let[@inline] as_computation self = self.st
end

View file

@ -19,13 +19,13 @@
type 'a or_error = ('a, Exn_bt.t) result
type 'a t = 'a Picos.Computation.t
type 'a t
(** A future with a result of type ['a]. *)
type 'a promise = private 'a t
(** A promise, which can be fulfilled exactly once to set
the corresponding future.
This is a private alias of ['a t] since 0.7, previously it was opaque. *)
This is a private alias of ['a t] since NEXT_RELEASE, previously it was opaque. *)
val make : unit -> 'a t * 'a promise
(** Make a new future with the associated promise. *)
@ -38,7 +38,7 @@ val make_promise : unit -> 'a promise
{[let prom = Fut.make_promise();;
let fut = (prom : _ Fut.promise :> _ Fut.t) ;;
]}
@since 0.7 *)
@since NEXT_RELEASE *)
val on_result : 'a t -> ('a or_error -> unit) -> unit
(** [on_result fut f] registers [f] to be called in the future

View file

@ -102,24 +102,7 @@ let with_handler ~ops:_ self f = f ()
[@@@endif]
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
if block_signals then (
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let worker_loop (type st) ~(ops : st ops) (self : st) : unit =
let cur_fiber : fiber ref = ref _dummy_fiber in
let runner = ops.runner self in
TLS.set Runner.For_runner_implementors.k_cur_runner runner;

View file

@ -1,37 +0,0 @@
(** Internal module that is used for workers.
A thread pool should use this [worker_loop] to run tasks,
handle effects, etc. *)
open Types_
type task_full =
| T_start of {
fiber: fiber;
f: unit -> unit;
}
| T_resume : {
fiber: fiber;
k: unit -> unit;
}
-> task_full
val _dummy_task : task_full
type around_task =
| AT_pair : (Runner.t -> 'a) * (Runner.t -> 'a -> unit) -> around_task
exception No_more_tasks
type 'st ops = {
schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t;
before_start: 'st -> unit;
cleanup: 'st -> unit;
}
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -310,9 +310,7 @@ let create ?(on_init_thread = default_thread_init_exit_)
(* function called in domain with index [i], to
create the thread and push it into [receive_threads] *)
let create_thread_in_domain () =
let thread =
Thread.create (WL.worker_loop ~block_signals:true ~ops:worker_ops) st
in
let thread = Thread.create (WL.worker_loop ~ops:worker_ops) st in
(* send the thread from the domain back to us *)
Bb_queue.push receive_threads (idx, thread)
in

View file

@ -147,7 +147,7 @@ val spawn_ignore : ?on:Runner.t -> ?protect:bool -> (unit -> _) -> unit
(** [spawn_ignore f] is [ignore (spawn f)].
The fiber will still affect termination of the parent, ie. the
parent will exit only after this new fiber exits.
@param on the optional runner to use, added since 0.7 *)
@param on the optional runner to use, added since NEXT_RELEASE *)
val spawn_top_ignore : on:Runner.t -> (unit -> _) -> unit
(** Like {!spawn_top} but ignores the result.

View file

@ -1,6 +1,6 @@
exception Oh_no of Exn_bt.t
let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
let main (f : Runner.t -> 'a) : 'a =
let worker_st =
Fifo_pool.Private_.create_single_threaded_state ~thread:(Thread.self ())
~on_exn:(fun e bt -> raise (Oh_no (Exn_bt.make e bt)))
@ -13,7 +13,6 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
(* run the main thread *)
Moonpool.Private.Worker_loop_.worker_loop worker_st
~block_signals (* do not disturb existing thread *)
~ops:Fifo_pool.Private_.worker_ops;
match Fiber.peek fiber with
@ -21,6 +20,3 @@ let main' ?(block_signals = false) () (f : Runner.t -> 'a) : 'a =
| Some (Error ebt) -> Exn_bt.raise ebt
| None -> assert false
with Oh_no ebt -> Exn_bt.raise ebt
let main f =
main' () f ~block_signals:false (* do not disturb existing thread *)

View file

@ -23,7 +23,3 @@ val main : (Moonpool.Runner.t -> 'a) -> 'a
(** [main f] runs [f()] in a scope that handles effects, including {!Fiber.await}.
This scope can run background tasks as well, in a cooperative fashion. *)
val main' : ?block_signals:bool -> unit -> (Moonpool.Runner.t -> 'a) -> 'a
(** Same as {!main} but with room for optional arguments.
@since 0.7 *)