Compare commits

...

2 commits

Author SHA1 Message Date
Simon Cruanes
d957f7b54e
small refactor
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-10-25 21:46:20 -04:00
Simon Cruanes
a26503df0b
refactor chan; fix bug in Chan.try_push
we could return `false` even though we succeeded in pushing a value into
the chan.
2025-10-25 21:21:03 -04:00
4 changed files with 29 additions and 26 deletions

View file

@ -21,7 +21,6 @@ let create ~max_size () : _ t =
}
let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
let to_awake = Queue.create () in
Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that
outside the critical section*)
Queue.iter Trigger.signal to_awake
Queue.iter Trigger.signal to_awake;
true
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res
Mutex.unlock self.mutex;
true
| _ ->
Mutex.unlock self.mutex;
false
) else
false
let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with
match Queue.pop self.q with
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
if self.closed then
raise Closed
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res
else
None
| x ->
Mutex.unlock self.mutex;
Some x
) else
None
let close (self : _ t) : unit =
let q = Queue.create () in
let triggers_to_signal = Queue.create () in
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Queue.transfer self.pop_waiters q;
Queue.transfer self.push_waiters q
Queue.transfer self.pop_waiters triggers_to_signal;
Queue.transfer self.push_waiters triggers_to_signal
);
Mutex.unlock self.mutex;
Queue.iter Trigger.signal q
Queue.iter Trigger.signal triggers_to_signal
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;

View file

@ -1,7 +1,8 @@
(** Channels.
The channels have bounded size. Push/pop return futures or can use effects
to provide an [await]-friendly version.
The channels have bounded size. They use effects/await to provide
a direct style implementation. Pushing into a full channel,
or popping from an empty one, will suspend the current task.
The channels became bounded since @0.7 .
*)

View file

@ -431,8 +431,7 @@ let await (self : 'a t) : 'a =
| exception C.Running ->
let trigger = Trigger.create () in
(* suspend until the future is resolved *)
if C.try_attach self trigger then
Option.iter Exn_bt.raise @@ Trigger.await trigger;
if C.try_attach self trigger then Trigger.await_exn trigger;
(* un-suspended: we should have a result! *)
get_or_fail_exn self

View file

@ -64,7 +64,7 @@ module State_ = struct
done;
(* wait for the other computation to be done *)
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
if !must_await then Trigger.await_exn trigger
| Right_solved _ | Both_solved _ -> assert false
end
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
i := !i + len_range
done;
Trigger.await trigger |> Option.iter Exn_bt.raise;
Trigger.await_exn trigger;
Option.iter Exn_bt.raise @@ A.get failure;
()
)