Compare commits

..

6 commits

Author SHA1 Message Date
Simon Cruanes
4b9e480013
test: update readme and the mdx test 2025-10-22 12:00:05 -04:00
Simon Cruanes
f8e553d473
test: update tests, removing the fibers and cancellation tests 2025-10-22 11:59:35 -04:00
Simon Cruanes
7082447073
feat core: add Main, salvaged from moonpool.fib 2025-10-22 11:57:50 -04:00
Simon Cruanes
6715502fdd
remove moonpool.fib
it's complicated and hard to use in practice, because it's not obvious
if a piece of code is running under another fiber or not, so
`Fiber.spawn` might fail because it has no parent.

So in practice we've been using `Fiber.spawn_top`… which has no
interest over just using `Fut.spawn`.
2025-10-22 11:35:46 -04:00
Simon Cruanes
e95f0e421d
doc for Fut 2025-10-22 11:34:01 -04:00
Simon Cruanes
3e2ce57669
doc 2025-10-22 11:33:48 -04:00
4 changed files with 26 additions and 29 deletions

View file

@ -21,6 +21,7 @@ let create ~max_size () : _ t =
}
let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
@ -32,46 +33,42 @@ let try_push (self : _ t) x : bool =
let to_awake = Queue.create () in
Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that
outside the critical section*)
Queue.iter Trigger.signal to_awake;
true
Queue.iter Trigger.signal to_awake
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex;
true
| _ ->
Mutex.unlock self.mutex;
false
) else
false
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res
let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then (
match Queue.pop self.q with
(match Queue.pop self.q with
| exception Queue.Empty ->
Mutex.unlock self.mutex;
if self.closed then
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
else
None
| x ->
Mutex.unlock self.mutex;
Some x
) else
None
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res
let close (self : _ t) : unit =
let triggers_to_signal = Queue.create () in
let q = Queue.create () in
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Queue.transfer self.pop_waiters triggers_to_signal;
Queue.transfer self.push_waiters triggers_to_signal
Queue.transfer self.pop_waiters q;
Queue.transfer self.push_waiters q
);
Mutex.unlock self.mutex;
Queue.iter Trigger.signal triggers_to_signal
Queue.iter Trigger.signal q
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;

View file

@ -1,8 +1,7 @@
(** Channels.
The channels have bounded size. They use effects/await to provide
a direct style implementation. Pushing into a full channel,
or popping from an empty one, will suspend the current task.
The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version.
The channels became bounded since @0.7 .
*)

View file

@ -431,7 +431,8 @@ let await (self : 'a t) : 'a =
| exception C.Running ->
let trigger = Trigger.create () in
(* suspend until the future is resolved *)
if C.try_attach self trigger then Trigger.await_exn trigger;
if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *)
get_or_fail_exn self

View file

@ -64,7 +64,7 @@ module State_ = struct
done;
(* wait for the other computation to be done *)
if !must_await then Trigger.await_exn trigger
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
| Right_solved _ | Both_solved _ -> assert false
end
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
i := !i + len_range
done;
Trigger.await_exn trigger;
Trigger.await trigger |> Option.iter Exn_bt.raise;
Option.iter Exn_bt.raise @@ A.get failure;
()
)