如何终止 Agrona Agent
问题描述
一个进程由一个或多个 Agrona Agent 构建。Agent 中发生了终端错误(terminal error),你希望该 Agent 终止。
解决方案
在 Agent 的 duty cycle 中,抛出 AgentTerminationException。
示例代码
Agent 的 duty cycle 示例:
int doWork()
{
if (terminalError)
{
throw new AgentTerminationException("reason");
}
...
}
详细说明
Agent 终止机制
Agent 终止流程:
┌──────────────────────────────────────────────────┐
│ Agent Runner 主循环 │
│ │
│ while (isRunning) { │
│ ┌──────────────────────────────┐ │
│ │ try { │ │
│ │ agent.doWork() │ │
│ │ } │ │
│ └──────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ catch (AgentTermination) { │ │
│ │ isRunning = false │◄────┐ │
│ │ handleError(ex) │ │ │
│ │ } │ │ │
│ └──────────────────────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────────────┐ │ │
│ │ 应用空闲策略 │ │ │
│ └──────────────────────────────┘ │ │
│ } │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────────────┐ │ │
│ │ Agent 已终止 │ │ │
│ │ 线程退出 │ │ │
│ └─────────────────────────┘ │ │
│ │ │
│ Agent 抛出 AgentTerminationException ───┘ │
└──────────────────────────────────────────────────┘
AgentRunner 实现详解
1. 运行循环
典型情况下,AgentRunner 用于管理在专用线程(或通过 CompositeAgent 共享)上运行的 Agent。在 AgentRunner 的 run() 方法中,duty cycle 在一个 while 循环中执行,该循环检查 isRunning 标志:
while (isRunning)
{
doDutyCycle(idleStrategy, agent);
}
2. 异常捕获
如果你的 Agent 抛出 AgentTerminationException,isRunning 标志会被设置为 false:
private void doDutyCycle(final IdleStrategy idleStrategy, final Agent agent)
{
try
{
...
}
catch (final AgentTerminationException ex)
{
isRunning = false;
handleError(ex);
}
...
}
AgentInvoker 实现
AgentRunner 的主要替代方案 AgentInvoker 有类似的结构:
1. 启动
public void start()
{
...
isRunning = true;
...
}
2. 调用与终止
同样,如果 Agent 在 AgentInvoker.invoke() 方法调用期间抛出 AgentTerminationException,isRunning 标志会被设置为 false:
public int invoke()
{
try
{
...
}
catch (final AgentTerminationException ex)
{
isRunning = false;
...
}
...
}
由于 AgentInvoker 需要调用者运行 invoke(),你可以通过调用 AgentInvoker.isRunning() 方法来确认 isRunning 的状态。
完整实现示例
import org.agrona.concurrent.*;
/**
* 演示Agent终止机制
*/
public class AgentTerminationExample {
/**
* 会终止的Agent
*/
static class TerminatingAgent implements Agent {
private int workCount = 0;
private final int maxWork;
private volatile boolean errorOccurred = false;
public TerminatingAgent(int maxWork) {
this.maxWork = maxWork;
}
@Override
public int doWork() {
// 检查是否发生错误
if (errorOccurred) {
throw new AgentTerminationException(
"检测到错误,Agent终止"
);
}
// 检查是否达到工作上限
if (workCount >= maxWork) {
throw new AgentTerminationException(
"达到最大工作量: " + maxWork
);
}
// 执行正常工作
workCount++;
System.out.println("执行工作 #" + workCount);
// 模拟可能的错误
if (Math.random() < 0.1) { // 10%概率发生错误
errorOccurred = true;
}
return 1;
}
@Override
public String roleName() {
return "terminating-agent";
}
public void triggerError() {
errorOccurred = true;
}
}
/**
* 使用 AgentRunner
*/
public static void exampleWithAgentRunner() {
TerminatingAgent agent = new TerminatingAgent(100);
AgentRunner runner = new AgentRunner(
new SleepingIdleStrategy(100),
error -> System.err.println("错误: " + error.getMessage()),
null,
agent
);
// 启动Agent
AgentRunner.startOnThread(runner);
// 等待Agent终止
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 检查状态
System.out.println("Agent 是否在运行: " + runner.isRunning());
runner.close();
}
/**
* 使用 AgentInvoker
*/
public static void exampleWithAgentInvoker() {
TerminatingAgent agent = new TerminatingAgent(10);
AgentInvoker invoker = new AgentInvoker(
error -> System.err.println("错误: " + error.getMessage()),
null,
agent
);
invoker.start();
// 手动调用直到Agent终止
while (invoker.isRunning()) {
invoker.invoke();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Agent 已终止");
invoker.close();
}
public static void main(String[] args) {
System.out.println("=== AgentRunner 示例 ===");
exampleWithAgentRunner();
System.out.println("\n=== AgentInvoker 示例 ===");
exampleWithAgentInvoker();
}
}
终止场景分析
常见终止场景:
┌─────────────────────────────────────────────┐
│ 1. 资源耗尽 │
│ ├─ 内存不足 │
│ ├─ 磁盘空间不足 │
│ └─ 文件句柄耗尽 │
│ │
│ 2. 业务逻辑错误 │
│ ├─ 无效的消息格式 │
│ ├─ 数据一致性冲突 │
│ └─ 业务规则违反 │
│ │
│ 3. 外部依赖失败 │
│ ├─ 数据库连接失败 │
│ ├─ 网络中断 │
│ └─ 第三方服务不可用 │
│ │
│ 4. 配置错误 │
│ ├─ 无效的配置参数 │
│ ├─ 权限不足 │
│ └─ 环境不匹配 │
│ │
│ 5. 主动终止 │
│ ├─ 达到工作上限 │
│ ├─ 完成预定任务 │
│ └─ 收到终止信号 │
└─────────────────────────────────────────────┘
错误处理策略
/**
* 带重试机制的Agent
*/
public class ResilientAgent implements Agent {
private int errorCount = 0;
private final int maxRetries;
public ResilientAgent(int maxRetries) {
this.maxRetries = maxRetries;
}
@Override
public int doWork() {
try {
// 尝试执行工作
return doWorkInternal();
} catch (RecoverableException e) {
// 可恢复的错误,增加错误计数
errorCount++;
if (errorCount > maxRetries) {
// 超过重试次数,终止Agent
throw new AgentTerminationException(
"超过最大重试次数: " + maxRetries,
e
);
}
// 记录错误但继续运行
System.err.println("可恢复错误 (" + errorCount + "/" +
maxRetries + "): " + e.getMessage());
return 0;
} catch (FatalException e) {
// 致命错误,立即终止
throw new AgentTerminationException(
"致命错误: " + e.getMessage(),
e
);
}
}
private int doWorkInternal() throws RecoverableException, FatalException {
// 实际工作逻辑
return 1;
}
@Override
public String roleName() {
return "resilient-agent";
}
// 自定义异常
static class RecoverableException extends Exception {
public RecoverableException(String message) {
super(message);
}
}
static class FatalException extends Exception {
public FatalException(String message) {
super(message);
}
}
}
优雅关闭
/**
* 支持优雅关闭的Agent
*/
public class GracefulAgent implements Agent {
private volatile boolean shutdownRequested = false;
private final Queue<Task> pendingTasks = new ConcurrentLinkedQueue<>();
@Override
public int doWork() {
// 检查是否请求关闭
if (shutdownRequested) {
// 处理剩余任务
if (pendingTasks.isEmpty()) {
throw new AgentTerminationException(
"优雅关闭: 所有任务已完成"
);
} else {
System.out.println("关闭中,剩余任务: " +
pendingTasks.size());
}
}
// 处理任务
Task task = pendingTasks.poll();
if (task != null) {
task.execute();
return 1;
}
return 0;
}
@Override
public String roleName() {
return "graceful-agent";
}
@Override
public void onClose() {
// 清理资源
pendingTasks.clear();
System.out.println("Agent 已关闭");
}
public void requestShutdown() {
shutdownRequested = true;
}
public void addTask(Task task) {
pendingTasks.offer(task);
}
interface Task {
void execute();
}
}
监控Agent状态
/**
* Agent状态监控
*/
public class AgentMonitor {
public static void monitorAgent(AgentRunner runner) {
// 创建监控线程
Thread monitor = new Thread(() -> {
while (runner.isRunning()) {
System.out.println("Agent 状态: 运行中");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Agent 状态: 已终止");
});
monitor.setDaemon(true);
monitor.start();
}
public static void monitorAgentInvoker(AgentInvoker invoker) {
// 在主循环中检查状态
int iterations = 0;
while (invoker.isRunning()) {
int work = invoker.invoke();
iterations++;
if (iterations % 100 == 0) {
System.out.println(
"迭代: " + iterations + ", 工作量: " + work
);
}
}
System.out.println("Agent 在 " + iterations + " 次迭代后终止");
}
}
级联终止
/**
* 管理多个Agent的协调器
*/
public class AgentCoordinator {
private final List<AgentRunner> runners = new ArrayList<>();
public void addAgent(AgentRunner runner) {
runners.add(runner);
}
/**
* 当一个Agent终止时,终止所有Agent
*/
public void startWithCascadingShutdown() {
// 监控所有Agent
Thread monitor = new Thread(() -> {
while (true) {
// 检查是否有Agent终止
for (AgentRunner runner : runners) {
if (!runner.isRunning()) {
System.out.println(
"检测到Agent终止,关闭所有Agent"
);
shutdownAll();
return;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
});
monitor.setDaemon(true);
monitor.start();
}
private void shutdownAll() {
for (AgentRunner runner : runners) {
runner.close();
}
}
}
与外部信号集成
/**
* 响应外部信号的Agent
*/
public class SignalAwareAgent implements Agent {
private volatile boolean terminateSignalReceived = false;
@Override
public int doWork() {
if (terminateSignalReceived) {
throw new AgentTerminationException(
"收到外部终止信号"
);
}
// 正常工作
return doNormalWork();
}
@Override
public String roleName() {
return "signal-aware-agent";
}
public void receiveTerminateSignal() {
terminateSignalReceived = true;
}
private int doNormalWork() {
// 实际工作逻辑
return 1;
}
}
// 使用示例
public class SignalExample {
public static void main(String[] args) {
SignalAwareAgent agent = new SignalAwareAgent();
AgentRunner runner = new AgentRunner(
new BusySpinIdleStrategy(),
Throwable::printStackTrace,
null,
agent
);
AgentRunner.startOnThread(runner);
// 注册shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("收到JVM关闭信号");
agent.receiveTerminateSignal();
}));
// 或者响应用户输入
Scanner scanner = new Scanner(System.in);
System.out.println("按回车键终止Agent...");
scanner.nextLine();
agent.receiveTerminateSignal();
}
}
相关链接
总结
使用 AgentTerminationException 终止 Agrona Agent:
核心机制:
- ✅ 在
doWork()中抛出AgentTerminationException - ✅
AgentRunner和AgentInvoker都会捕获此异常 - ✅
isRunning标志被设置为 false - ✅ Agent 循环退出,线程终止
使用场景:
- 资源耗尽
- 致命错误
- 业务逻辑要求终止
- 达到工作上限
- 外部信号触发
最佳实践:
- 提供清晰的终止原因
- 实现优雅关闭(处理待处理任务)
- 区分可恢复错误和致命错误
- 实现重试机制
- 监控 Agent 状态
- 清理资源(实现
onClose())
注意事项:
AgentTerminationException是正常的终止机制,不是错误- 终止后 Agent 无法重启,需要创建新实例
- 在
CompositeAgent中,一个 Agent 终止不会影响其他 Agent - 使用
AgentInvoker时,可以通过isRunning()检查状态
选择合适的终止策略,确保系统能够优雅地处理 Agent 生命周期。