Compare commits

...

2 commits

Author SHA1 Message Date
Simon Cruanes
d957f7b54e
small refactor
Some checks failed
github pages / Deploy doc (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / build-compat (push) Has been cancelled
Build and Test / format (push) Has been cancelled
2025-10-25 21:46:20 -04:00
Simon Cruanes
a26503df0b
refactor chan; fix bug in Chan.try_push
we could return `false` even though we succeeded in pushing a value into
the chan.
2025-10-25 21:21:03 -04:00
4 changed files with 29 additions and 26 deletions

View file

@ -21,7 +21,6 @@ let create ~max_size () : _ t =
} }
let try_push (self : _ t) x : bool = let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then ( if Mutex.try_lock self.mutex then (
if self.closed then ( if self.closed then (
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
let to_awake = Queue.create () in let to_awake = Queue.create () in
Queue.push x self.q; Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake; Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that (* wake up pop triggers if needed. Be careful to do that
outside the critical section*) outside the critical section*)
Queue.iter Trigger.signal to_awake Queue.iter Trigger.signal to_awake;
true
| n when n < self.max_size -> | n when n < self.max_size ->
Queue.push x self.q; Queue.push x self.q;
Mutex.unlock self.mutex Mutex.unlock self.mutex;
| _ -> Mutex.unlock self.mutex true
); | _ ->
!res Mutex.unlock self.mutex;
false
) else
false
let try_pop (type elt) self : elt option = let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then ( if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with match Queue.pop self.q with
| exception Queue.Empty -> | exception Queue.Empty ->
if self.closed then ( Mutex.unlock self.mutex;
Mutex.unlock self.mutex; if self.closed then
raise Closed raise Closed
) else
| x -> res := Some x); None
Mutex.unlock self.mutex | x ->
); Mutex.unlock self.mutex;
!res Some x
) else
None
let close (self : _ t) : unit = let close (self : _ t) : unit =
let q = Queue.create () in let triggers_to_signal = Queue.create () in
Mutex.lock self.mutex; Mutex.lock self.mutex;
if not self.closed then ( if not self.closed then (
self.closed <- true; self.closed <- true;
Queue.transfer self.pop_waiters q; Queue.transfer self.pop_waiters triggers_to_signal;
Queue.transfer self.push_waiters q Queue.transfer self.push_waiters triggers_to_signal
); );
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
Queue.iter Trigger.signal q Queue.iter Trigger.signal triggers_to_signal
let rec push (self : _ t) x : unit = let rec push (self : _ t) x : unit =
Mutex.lock self.mutex; Mutex.lock self.mutex;

View file

@ -1,7 +1,8 @@
(** Channels. (** Channels.
The channels have bounded size. Push/pop return futures or can use effects The channels have bounded size. They use effects/await to provide
to provide an [await]-friendly version. a direct style implementation. Pushing into a full channel,
or popping from an empty one, will suspend the current task.
The channels became bounded since @0.7 . The channels became bounded since @0.7 .
*) *)

View file

@ -431,8 +431,7 @@ let await (self : 'a t) : 'a =
| exception C.Running -> | exception C.Running ->
let trigger = Trigger.create () in let trigger = Trigger.create () in
(* suspend until the future is resolved *) (* suspend until the future is resolved *)
if C.try_attach self trigger then if C.try_attach self trigger then Trigger.await_exn trigger;
Option.iter Exn_bt.raise @@ Trigger.await trigger;
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
get_or_fail_exn self get_or_fail_exn self

View file

@ -64,7 +64,7 @@ module State_ = struct
done; done;
(* wait for the other computation to be done *) (* wait for the other computation to be done *)
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise if !must_await then Trigger.await_exn trigger
| Right_solved _ | Both_solved _ -> assert false | Right_solved _ | Both_solved _ -> assert false
end end
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
i := !i + len_range i := !i + len_range
done; done;
Trigger.await trigger |> Option.iter Exn_bt.raise; Trigger.await_exn trigger;
Option.iter Exn_bt.raise @@ A.get failure; Option.iter Exn_bt.raise @@ A.get failure;
() ()
) )