Class BroadcastChannel<T>
- Type Parameters:
T- the value type
Unlike AsyncChannel (point-to-point, each value consumed by
one receiver), a BroadcastChannel delivers every value to
every subscriber that has called subscribe().
def broadcast = BroadcastChannel.create()
def sub1 = broadcast.subscribe()
def sub2 = broadcast.subscribe()
async {
broadcast.send('hello')
broadcast.send('world')
broadcast.close()
}
// Both subscribers receive both values
for await (msg in sub1) { println "Sub1: $msg" }
for await (msg in sub2) { println "Sub2: $msg" }
Inspired by GPars' DataflowBroadcast.
- Since:
- 6.0.0
- See Also:
-
Method Summary
Modifier and TypeMethodDescriptionReturns aFlow.Publisherview of this broadcast channel.voidclose()Closes this broadcast channel and all subscriber channels.static <T> BroadcastChannel<T>create()Creates a new broadcast channel.intReturns the number of current subscribers.booleanisClosed()Returnstrueif this broadcast channel has been closed.Sends a value to all current subscribers.Creates a new subscriber channel.subscribe(int bufferSize) Creates a new subscriber channel with the specified buffer capacity.
-
Method Details
-
create
Creates a new broadcast channel.- Type Parameters:
T- the value type- Returns:
- a new BroadcastChannel
-
subscribe
Creates a new subscriber channel. The returnedAsyncChannelwill receive all values sent to this broadcast from this point forward. Each subscriber is independent — values are buffered per subscriber.- Returns:
- a new subscriber channel
-
subscribe
Creates a new subscriber channel with the specified buffer capacity.- Parameters:
bufferSize- the buffer capacity for this subscriber- Returns:
- a new subscriber channel
-
send
Sends a value to all current subscribers.- Parameters:
value- the value to broadcast- Returns:
- an Awaitable that completes when all subscribers have accepted the value
- Throws:
ChannelClosedException- if the broadcast channel is closed
-
close
public void close()Closes this broadcast channel and all subscriber channels. -
isClosed
public boolean isClosed()Returnstrueif this broadcast channel has been closed. -
getSubscriberCount
public int getSubscriberCount()Returns the number of current subscribers. -
asPublisher
Returns aFlow.Publisherview of this broadcast channel. Each call toFlow.Publisher.subscribe(Flow.Subscriber)on the returned publisher creates a newAsyncChannelsubscriber under the hood, draining values to the downstream subscriber according to its requested demand.Semantics:
- Cold per-subscribe binding: each subscription starts seeing values
from the moment it subscribes (consistent with
subscribe()). - Backpressure: respects
request(n); the worker blocks the broadcast send when no demand exists (sender-side backpressure). - Cancellation: closes this subscriber's channel and removes it from the broadcast's subscriber set.
- Completion: signals
onCompletewhen the broadcast channel is closed and the per-subscriber buffer drained.
Backpressure policy (important). This bridge uses lossless, sender-gated backpressure:
send(Object)awaits delivery to every live subscriber, and each per-subscriber channel has a bounded buffer (default 16). A subscriber that never callsrequest(n), or that requests slowly, will fill its buffer; once full, the subscriber's channel suspends its backingsend, which in turn stallsBroadcastChannel.send(...)for all subscribers. In other words, the slowest subscriber controls producer throughput.This is intentional and matches the point-to-point semantics of
subscribe(): values are neither dropped nor reordered. If you need decoupled per-subscriber policies (drop-newest, drop-oldest, latest-only, or unbounded buffering), wrap the publisher with a Reactive Streams operator of your choice, or use a subscriber that drains promptly withrequest(Long.MAX_VALUE).- Returns:
- a
Flow.Publisherbacked by per-subscriber channels - Since:
- 6.0.0
- Cold per-subscribe binding: each subscription starts seeing values
from the moment it subscribes (consistent with
-