public final class Agent<T>
extends Object
A thread-safe mutable-value container inspired by Clojure's agents and
GPars' Agent.
An Agent wraps a value that can be read by any thread but
modified only through serialised update functions. Updates are queued
and applied one at a time on a dedicated executor, guaranteeing that
the value is never corrupted by concurrent writes.
Reading the current value via get() is non-blocking and returns a snapshot. Sending an update via send(Function) is also non-blocking — the function is queued and applied asynchronously. Use sendAndGet(Function) to obtain an Awaitable that completes with the new value after the update is applied.
// Groovy:
def counter = Agent.create(0)
counter.send { it + 1
counter.send { it + 1 }
assert await(counter.getAsync()) == 2
// Java:
Agent counter = Agent.create(0);
counter.send(n -> n + 1);
Awaitable result = counter.sendAndGet(n -> n + 1);
}
T - the value type| Modifiers | Name | Description |
|---|---|---|
static int |
DEFAULT_CHANGES_BUFFER |
Default per-subscriber buffer size for changes(). |
| Type Params | Return Type | Name and description |
|---|---|---|
|
public Flow.Publisher<T> |
changes()Returns a Flow.Publisher that emits the agent's value after every successful update. |
<T> |
public static Agent<T> |
create(T initialValue)Creates an agent with the given initial value, using a single-thread executor for serialised updates. |
<T> |
public static Agent<T> |
create(T initialValue, Pool pool)Creates an agent backed by the given pool for update execution. |
|
public T |
get()Returns the current value. |
|
public Awaitable<T> |
getAsync()Returns the current value as an Awaitable. |
|
public void |
send(Function<T, T> updateFn)Queues an update function to be applied to the current value. |
|
public Awaitable<T> |
sendAndGet(Function<T, T> updateFn)Queues an update function and returns an Awaitable that completes with the new value after the update is applied. |
|
public void |
shutdown()Shuts down the agent's update executor. |
|
public String |
toString()Returns the current value in a diagnostic form. |
Default per-subscriber buffer size for changes().
Returns a Flow.Publisher that emits the agent's value after every successful update. The publisher is hot and per-subscriber:
onComplete) when shutdown() is
called. If changes() is first called after
shutdown(), the returned publisher is already closed
and subscribers receive onComplete immediately.Typical use:
for await (newValue in agent.changes()) {
log.info "Agent value is now {", newValue
}
}
Creates an agent with the given initial value, using a single-thread executor for serialised updates.
initialValue - the starting valueT - the value typeCreates an agent backed by the given pool for update execution. Updates are still serialised (only one at a time), but they run on the pool's threads.
initialValue - the starting valuepool - the pool to use for updatesT - the value typeReturns the current value. This is a non-blocking snapshot read.
Returns the current value as an Awaitable. The awaitable completes after all previously queued updates have been applied, ensuring a consistent read.
Queues an update function to be applied to the current value. The function receives the current value and returns the new value.
Updates are applied asynchronously and serialised: only one update runs at a time.
updateFn - a function from current value to new valueQueues an update function and returns an Awaitable that completes with the new value after the update is applied.
updateFn - a function from current value to new value Shuts down the agent's update executor. No further updates will
be accepted. Pending updates are executed before shutdown completes.
The changes publisher (if any subscribers attached) is closed after
pending updates drain, signalling onComplete to all live
subscribers. Calling shutdown() more than once is a no-op.
Returns the current value in a diagnostic form.