mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
update to merged picos PR
This commit is contained in:
parent
12978d424d
commit
3df7c8bef9
18 changed files with 58 additions and 49 deletions
|
|
@ -26,6 +26,7 @@
|
||||||
(odoc :with-doc)
|
(odoc :with-doc)
|
||||||
(hmap :with-test)
|
(hmap :with-test)
|
||||||
picos
|
picos
|
||||||
|
picos_sync
|
||||||
(mdx
|
(mdx
|
||||||
(and
|
(and
|
||||||
(>= 1.9.0)
|
(>= 1.9.0)
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ depends: [
|
||||||
"odoc" {with-doc}
|
"odoc" {with-doc}
|
||||||
"hmap" {with-test}
|
"hmap" {with-test}
|
||||||
"picos"
|
"picos"
|
||||||
|
"picos_sync"
|
||||||
"mdx" {>= "1.9.0" & with-test}
|
"mdx" {>= "1.9.0" & with-test}
|
||||||
]
|
]
|
||||||
depopts: [
|
depopts: [
|
||||||
|
|
|
||||||
|
|
@ -175,10 +175,7 @@ let close (self : _ t) : unit =
|
||||||
if A.compare_and_set self.st old_st St_closed then (
|
if A.compare_and_set self.st old_st St_closed then (
|
||||||
(* fail all waiters with [Closed]. *)
|
(* fail all waiters with [Closed]. *)
|
||||||
let bt = Printexc.get_callstack 10 in
|
let bt = Printexc.get_callstack 10 in
|
||||||
Q.iter
|
Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
|
||||||
(fun w ->
|
|
||||||
Fut.fulfill_idempotent w (Error { Exn_bt.exn = Closed; bt }))
|
|
||||||
ws;
|
|
||||||
false
|
false
|
||||||
) else
|
) else
|
||||||
true
|
true
|
||||||
|
|
|
||||||
22
src/core/exn_bt.ml
Normal file
22
src/core/exn_bt.ml
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
type t = exn * Printexc.raw_backtrace
|
||||||
|
|
||||||
|
let[@inline] make exn bt : t = exn, bt
|
||||||
|
let[@inline] exn (e, _) = e
|
||||||
|
let[@inline] bt (_, bt) = bt
|
||||||
|
let show self = Printexc.to_string (exn self)
|
||||||
|
let pp out self = Format.pp_print_string out (show self)
|
||||||
|
let[@inline] raise (e, bt) = Printexc.raise_with_backtrace e bt
|
||||||
|
|
||||||
|
let[@inline] get exn =
|
||||||
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
|
make exn bt
|
||||||
|
|
||||||
|
let[@inline] get_callstack n exn =
|
||||||
|
let bt = Printexc.get_callstack n in
|
||||||
|
make exn bt
|
||||||
|
|
||||||
|
type nonrec 'a result = ('a, t) result
|
||||||
|
|
||||||
|
let[@inline] unwrap = function
|
||||||
|
| Ok x -> x
|
||||||
|
| Error ebt -> raise ebt
|
||||||
|
|
@ -5,12 +5,14 @@
|
||||||
@since 0.6 *)
|
@since 0.6 *)
|
||||||
|
|
||||||
(** An exception bundled with a backtrace *)
|
(** An exception bundled with a backtrace *)
|
||||||
include module type of struct
|
|
||||||
include Exn_bt
|
type t = exn * Printexc.raw_backtrace
|
||||||
end
|
|
||||||
|
|
||||||
val exn : t -> exn
|
val exn : t -> exn
|
||||||
val bt : t -> Printexc.raw_backtrace
|
val bt : t -> Printexc.raw_backtrace
|
||||||
|
val raise : t -> 'a
|
||||||
|
val get : exn -> t
|
||||||
|
val get_callstack : int -> exn -> t
|
||||||
|
|
||||||
val make : exn -> Printexc.raw_backtrace -> t
|
val make : exn -> Printexc.raw_backtrace -> t
|
||||||
(** Trivial builder *)
|
(** Trivial builder *)
|
||||||
|
|
@ -97,7 +97,7 @@ let worker_ops : worker_state WL.ops =
|
||||||
let runner (st : worker_state) = st.st.as_runner in
|
let runner (st : worker_state) = st.st.as_runner in
|
||||||
let around_task st = st.st.around_task in
|
let around_task st = st.st.around_task in
|
||||||
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
||||||
st.st.on_exn ebt.exn ebt.bt
|
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
WL.schedule = schedule_w;
|
WL.schedule = schedule_w;
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,12 @@ let make () =
|
||||||
|
|
||||||
let[@inline] return x : _ t = { st = C.returned x }
|
let[@inline] return x : _ t = { st = C.returned x }
|
||||||
|
|
||||||
let[@inline] fail_exn_bt ebt =
|
let[@inline] fail exn bt : _ t =
|
||||||
let st = C.create () in
|
let st = C.create () in
|
||||||
C.cancel st ebt;
|
C.cancel st exn bt;
|
||||||
{ st }
|
{ st }
|
||||||
|
|
||||||
let[@inline] fail exn bt : _ t = fail_exn_bt { Exn_bt.exn; bt }
|
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
let[@inline] of_result = function
|
let[@inline] of_result = function
|
||||||
| Ok x -> return x
|
| Ok x -> return x
|
||||||
|
|
@ -84,7 +84,7 @@ let on_result_ignore (self : _ t) f : unit =
|
||||||
let[@inline] fulfill_idempotent self r =
|
let[@inline] fulfill_idempotent self r =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.return self.st x
|
| Ok x -> C.return self.st x
|
||||||
| Error ebt -> C.cancel self.st ebt
|
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
|
|
@ -92,7 +92,7 @@ let fulfill (self : _ t) (r : _ result) : unit =
|
||||||
let ok =
|
let ok =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.try_return self.st x
|
| Ok x -> C.try_return self.st x
|
||||||
| Error ebt -> C.try_cancel self.st ebt
|
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
in
|
in
|
||||||
if not ok then raise Already_fulfilled
|
if not ok then raise Already_fulfilled
|
||||||
|
|
||||||
|
|
@ -107,8 +107,7 @@ let spawn ~on f : _ t =
|
||||||
C.return fut.st res
|
C.return fut.st res
|
||||||
with exn ->
|
with exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
let ebt = { Exn_bt.exn; bt } in
|
C.cancel fut.st exn bt
|
||||||
C.cancel fut.st ebt
|
|
||||||
in
|
in
|
||||||
|
|
||||||
Runner.run_async on task;
|
Runner.run_async on task;
|
||||||
|
|
@ -139,7 +138,7 @@ let map ?on ~f fut : _ t =
|
||||||
(try Ok (f x)
|
(try Ok (f x)
|
||||||
with exn ->
|
with exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
Error { Exn_bt.exn; bt })
|
Error (Exn_bt.make exn bt))
|
||||||
| Error e_bt -> Error e_bt
|
| Error e_bt -> Error e_bt
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
@ -421,7 +420,7 @@ let wait_block self =
|
||||||
| x -> Ok x
|
| x -> Ok x
|
||||||
| exception exn ->
|
| exception exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
Error { Exn_bt.exn; bt }
|
Error (Exn_bt.make exn bt)
|
||||||
|
|
||||||
[@@@ifge 5.0]
|
[@@@ifge 5.0]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ module Blocking_queue = Bb_queue
|
||||||
module Background_thread = Background_thread
|
module Background_thread = Background_thread
|
||||||
module Bounded_queue = Bounded_queue
|
module Bounded_queue = Bounded_queue
|
||||||
module Chan = Chan
|
module Chan = Chan
|
||||||
module Exn_bt = Moonpool_exn_bt
|
module Exn_bt = Exn_bt
|
||||||
module Fifo_pool = Fifo_pool
|
module Fifo_pool = Fifo_pool
|
||||||
module Fut = Fut
|
module Fut = Fut
|
||||||
module Lock = Lock
|
module Lock = Lock
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ module Immediate_runner : sig end
|
||||||
|
|
||||||
This is removed since 0.6, and replaced by {!Moonpool_fib.Main}. *)
|
This is removed since 0.6, and replaced by {!Moonpool_fib.Main}. *)
|
||||||
|
|
||||||
module Exn_bt = Moonpool_exn_bt
|
module Exn_bt = Exn_bt
|
||||||
|
|
||||||
exception Shutdown
|
exception Shutdown
|
||||||
(** Exception raised when trying to run tasks on
|
(** Exception raised when trying to run tasks on
|
||||||
|
|
|
||||||
|
|
@ -1,13 +0,0 @@
|
||||||
include Exn_bt
|
|
||||||
|
|
||||||
let[@inline] make exn bt : t = { exn; bt }
|
|
||||||
let[@inline] exn self = self.exn
|
|
||||||
let[@inline] bt self = self.bt
|
|
||||||
let show self = Printexc.to_string (exn self)
|
|
||||||
let pp out self = Format.pp_print_string out (show self)
|
|
||||||
|
|
||||||
type nonrec 'a result = ('a, t) result
|
|
||||||
|
|
||||||
let[@inline] unwrap = function
|
|
||||||
| Ok x -> x
|
|
||||||
| Error ebt -> raise ebt
|
|
||||||
|
|
@ -16,8 +16,7 @@ let k_cur_fiber : fiber TLS.t = TLS.create ()
|
||||||
|
|
||||||
let _dummy_computation : Picos.Computation.packed =
|
let _dummy_computation : Picos.Computation.packed =
|
||||||
let c = Picos.Computation.create () in
|
let c = Picos.Computation.create () in
|
||||||
Picos.Computation.cancel c
|
Picos.Computation.cancel c (Failure "dummy fiber") (Printexc.get_callstack 0);
|
||||||
{ exn = Failure "dummy fiber"; bt = Printexc.get_callstack 0 };
|
|
||||||
Picos.Computation.Packed c
|
Picos.Computation.Packed c
|
||||||
|
|
||||||
let _dummy_fiber = Picos.Fiber.create_packed ~forbid:true _dummy_computation
|
let _dummy_fiber = Picos.Fiber.create_packed ~forbid:true _dummy_computation
|
||||||
|
|
|
||||||
|
|
@ -207,7 +207,7 @@ let worker_ops : worker_state WL.ops =
|
||||||
let runner (st : worker_state) = st.st.as_runner in
|
let runner (st : worker_state) = st.st.as_runner in
|
||||||
let around_task st = st.st.around_task in
|
let around_task st = st.st.around_task in
|
||||||
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
|
||||||
st.st.on_exn ebt.exn ebt.bt
|
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
WL.schedule = schedule_from_w;
|
WL.schedule = schedule_from_w;
|
||||||
|
|
|
||||||
|
|
@ -127,7 +127,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
||||||
Trigger.signal trigger
|
Trigger.signal trigger
|
||||||
| exception exn ->
|
| exception exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
if Option.is_none (A.exchange failure (Some { Exn_bt.exn; bt })) then
|
if Option.is_none (A.exchange failure (Some (Exn_bt.make exn bt))) then
|
||||||
(* first one to fail, and [missing] must be >= 2
|
(* first one to fail, and [missing] must be >= 2
|
||||||
because we're not decreasing it. *)
|
because we're not decreasing it. *)
|
||||||
Trigger.signal trigger
|
Trigger.signal trigger
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,8 @@ let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
M.Fut.on_result fut (function
|
M.Fut.on_result fut (function
|
||||||
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
|
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
|
||||||
| Error { exn; _ } ->
|
| Error ebt ->
|
||||||
|
let exn = Exn_bt.exn ebt in
|
||||||
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
|
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
|
||||||
lwt_fut
|
lwt_fut
|
||||||
|
|
||||||
|
|
@ -108,7 +109,7 @@ let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||||
(fun exn ->
|
(fun exn ->
|
||||||
let bt = Printexc.get_callstack 10 in
|
let bt = Printexc.get_callstack 10 in
|
||||||
M.Fut.fulfill prom (Error { Exn_bt.exn; bt }));
|
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
|
||||||
fut
|
fut
|
||||||
|
|
||||||
let _dummy_exn_bt : Exn_bt.t =
|
let _dummy_exn_bt : Exn_bt.t =
|
||||||
|
|
|
||||||
|
|
@ -2,4 +2,4 @@
|
||||||
(name moonpool_sync)
|
(name moonpool_sync)
|
||||||
(public_name moonpool.sync)
|
(public_name moonpool.sync)
|
||||||
(synopsis "Cooperative synchronization primitives for Moonpool")
|
(synopsis "Cooperative synchronization primitives for Moonpool")
|
||||||
(libraries moonpool picos picos.sync))
|
(libraries moonpool picos picos_std.sync picos_std.event))
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
module Mutex = Picos_sync.Mutex
|
module Mutex = Picos_std_sync.Mutex
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutex: Mutex.t;
|
mutex: Mutex.t;
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,7 @@ val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
|
||||||
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
|
(** [update_map l f] computes [x', y = f (get l)], then puts [x'] in [l]
|
||||||
and returns [y], while protected by the mutex. *)
|
and returns [y], while protected by the mutex. *)
|
||||||
|
|
||||||
val mutex : _ t -> Picos_sync.Mutex.t
|
val mutex : _ t -> Picos_std_sync.Mutex.t
|
||||||
(** Underlying mutex. *)
|
(** Underlying mutex. *)
|
||||||
|
|
||||||
val get : 'a t -> 'a
|
val get : 'a t -> 'a
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
module Mutex = Picos_sync.Mutex
|
module Mutex = Picos_std_sync.Mutex
|
||||||
module Condition = Picos_sync.Condition
|
module Condition = Picos_std_sync.Condition
|
||||||
module Lock = Lock
|
module Lock = Lock
|
||||||
module Event = Picos_sync.Event
|
module Event = Picos_std_event
|
||||||
module Semaphore = Picos_sync.Semaphore
|
module Semaphore = Picos_std_sync.Semaphore
|
||||||
module Lazy = Picos_sync.Lazy
|
module Lazy = Picos_std_sync.Lazy
|
||||||
module Latch = Picos_sync.Latch
|
module Latch = Picos_std_sync.Latch
|
||||||
module Ivar = Picos_sync.Ivar
|
module Ivar = Picos_std_sync.Ivar
|
||||||
module Stream = Picos_sync.Stream
|
module Stream = Picos_std_sync.Stream
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue