如何终止 Agrona Agent

如何终止 Agrona Agent


问题描述

一个进程由一个或多个 Agrona Agent 构建。Agent 中发生了终端错误(terminal error),你希望该 Agent 终止。

解决方案

在 Agent 的 duty cycle 中,抛出 AgentTerminationException

示例代码

Agent 的 duty cycle 示例:


                int doWork()
                {
                    if (terminalError)
                    {
                        throw new AgentTerminationException("reason");
                    }
                    ...
                }
                

详细说明

Agent 终止机制


                Agent 终止流程:
                ┌──────────────────────────────────────────────────┐
                │  Agent Runner 主循环                             │
                │                                                  │
                │  while (isRunning) {                             │
                │    ┌──────────────────────────────┐             │
                │    │  try {                       │             │
                │    │    agent.doWork()            │             │
                │    │  }                           │             │
                │    └──────────┬───────────────────┘             │
                │               │                                  │
                │               ▼                                  │
                │    ┌──────────────────────────────┐             │
                │    │  catch (AgentTermination) {  │             │
                │    │    isRunning = false         │◄────┐       │
                │    │    handleError(ex)           │     │       │
                │    │  }                           │     │       │
                │    └──────────────────────────────┘     │       │
                │               │                          │       │
                │               ▼                          │       │
                │    ┌──────────────────────────────┐     │       │
                │    │  应用空闲策略                │     │       │
                │    └──────────────────────────────┘     │       │
                │  }                                       │       │
                │               │                          │       │
                │               ▼                          │       │
                │  ┌─────────────────────────┐            │       │
                │  │  Agent 已终止            │            │       │
                │  │  线程退出                │            │       │
                │  └─────────────────────────┘            │       │
                │                                          │       │
                │  Agent 抛出 AgentTerminationException ───┘       │
                └──────────────────────────────────────────────────┘
                

AgentRunner 实现详解

1. 运行循环

典型情况下,AgentRunner 用于管理在专用线程(或通过 CompositeAgent 共享)上运行的 Agent。在 AgentRunnerrun() 方法中,duty cycle 在一个 while 循环中执行,该循环检查 isRunning 标志:


                while (isRunning)
                {
                    doDutyCycle(idleStrategy, agent);
                }
                

2. 异常捕获

如果你的 Agent 抛出 AgentTerminationException,isRunning 标志会被设置为 false:


                private void doDutyCycle(final IdleStrategy idleStrategy, final Agent agent)
                {
                    try
                    {
                        ...
                    }
                    catch (final AgentTerminationException ex)
                    {
                        isRunning = false;
                        handleError(ex);
                    }
                    ...
                }
                

AgentInvoker 实现

AgentRunner 的主要替代方案 AgentInvoker 有类似的结构:

1. 启动


                public void start()
                {
                    ...
                    isRunning = true;
                    ...
                }
                

2. 调用与终止

同样,如果 AgentAgentInvoker.invoke() 方法调用期间抛出 AgentTerminationException,isRunning 标志会被设置为 false:


                public int invoke()
                {
                    try
                    {
                        ...
                    }
                    catch (final AgentTerminationException ex)
                    {
                        isRunning = false;
                        ...
                    }
                    ...
                }
                

由于 AgentInvoker 需要调用者运行 invoke(),你可以通过调用 AgentInvoker.isRunning() 方法来确认 isRunning 的状态。

完整实现示例


                import org.agrona.concurrent.*;

                /**
                 * 演示Agent终止机制
                 */
                public class AgentTerminationExample {

                    /**
                     * 会终止的Agent
                     */
                    static class TerminatingAgent implements Agent {
                        private int workCount = 0;
                        private final int maxWork;
                        private volatile boolean errorOccurred = false;

                        public TerminatingAgent(int maxWork) {
                            this.maxWork = maxWork;
                        }

                        @Override
                        public int doWork() {
                            // 检查是否发生错误
                            if (errorOccurred) {
                                throw new AgentTerminationException(
                                    "检测到错误,Agent终止"
                                );
                            }

                            // 检查是否达到工作上限
                            if (workCount >= maxWork) {
                                throw new AgentTerminationException(
                                    "达到最大工作量: " + maxWork
                                );
                            }

                            // 执行正常工作
                            workCount++;
                            System.out.println("执行工作 #" + workCount);

                            // 模拟可能的错误
                            if (Math.random() < 0.1) {  // 10%概率发生错误
                                errorOccurred = true;
                            }

                            return 1;
                        }

                        @Override
                        public String roleName() {
                            return "terminating-agent";
                        }

                        public void triggerError() {
                            errorOccurred = true;
                        }
                    }

                    /**
                     * 使用 AgentRunner
                     */
                    public static void exampleWithAgentRunner() {
                        TerminatingAgent agent = new TerminatingAgent(100);

                        AgentRunner runner = new AgentRunner(
                            new SleepingIdleStrategy(100),
                            error -> System.err.println("错误: " + error.getMessage()),
                            null,
                            agent
                        );

                        // 启动Agent
                        AgentRunner.startOnThread(runner);

                        // 等待Agent终止
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        // 检查状态
                        System.out.println("Agent 是否在运行: " + runner.isRunning());

                        runner.close();
                    }

                    /**
                     * 使用 AgentInvoker
                     */
                    public static void exampleWithAgentInvoker() {
                        TerminatingAgent agent = new TerminatingAgent(10);

                        AgentInvoker invoker = new AgentInvoker(
                            error -> System.err.println("错误: " + error.getMessage()),
                            null,
                            agent
                        );

                        invoker.start();

                        // 手动调用直到Agent终止
                        while (invoker.isRunning()) {
                            invoker.invoke();

                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                break;
                            }
                        }

                        System.out.println("Agent 已终止");
                        invoker.close();
                    }

                    public static void main(String[] args) {
                        System.out.println("=== AgentRunner 示例 ===");
                        exampleWithAgentRunner();

                        System.out.println("\n=== AgentInvoker 示例 ===");
                        exampleWithAgentInvoker();
                    }
                }
                

终止场景分析


                常见终止场景:
                ┌─────────────────────────────────────────────┐
                │ 1. 资源耗尽                                 │
                │    ├─ 内存不足                              │
                │    ├─ 磁盘空间不足                          │
                │    └─ 文件句柄耗尽                          │
                │                                             │
                │ 2. 业务逻辑错误                             │
                │    ├─ 无效的消息格式                        │
                │    ├─ 数据一致性冲突                        │
                │    └─ 业务规则违反                          │
                │                                             │
                │ 3. 外部依赖失败                             │
                │    ├─ 数据库连接失败                        │
                │    ├─ 网络中断                              │
                │    └─ 第三方服务不可用                      │
                │                                             │
                │ 4. 配置错误                                 │
                │    ├─ 无效的配置参数                        │
                │    ├─ 权限不足                              │
                │    └─ 环境不匹配                            │
                │                                             │
                │ 5. 主动终止                                 │
                │    ├─ 达到工作上限                          │
                │    ├─ 完成预定任务                          │
                │    └─ 收到终止信号                          │
                └─────────────────────────────────────────────┘
                

错误处理策略


                /**
                 * 带重试机制的Agent
                 */
                public class ResilientAgent implements Agent {
                    private int errorCount = 0;
                    private final int maxRetries;

                    public ResilientAgent(int maxRetries) {
                        this.maxRetries = maxRetries;
                    }

                    @Override
                    public int doWork() {
                        try {
                            // 尝试执行工作
                            return doWorkInternal();
                        } catch (RecoverableException e) {
                            // 可恢复的错误,增加错误计数
                            errorCount++;

                            if (errorCount > maxRetries) {
                                // 超过重试次数,终止Agent
                                throw new AgentTerminationException(
                                    "超过最大重试次数: " + maxRetries,
                                    e
                                );
                            }

                            // 记录错误但继续运行
                            System.err.println("可恢复错误 (" + errorCount + "/" +
                                maxRetries + "): " + e.getMessage());
                            return 0;
                        } catch (FatalException e) {
                            // 致命错误,立即终止
                            throw new AgentTerminationException(
                                "致命错误: " + e.getMessage(),
                                e
                            );
                        }
                    }

                    private int doWorkInternal() throws RecoverableException, FatalException {
                        // 实际工作逻辑
                        return 1;
                    }

                    @Override
                    public String roleName() {
                        return "resilient-agent";
                    }

                    // 自定义异常
                    static class RecoverableException extends Exception {
                        public RecoverableException(String message) {
                            super(message);
                        }
                    }

                    static class FatalException extends Exception {
                        public FatalException(String message) {
                            super(message);
                        }
                    }
                }
                

优雅关闭


                /**
                 * 支持优雅关闭的Agent
                 */
                public class GracefulAgent implements Agent {
                    private volatile boolean shutdownRequested = false;
                    private final Queue<Task> pendingTasks = new ConcurrentLinkedQueue<>();

                    @Override
                    public int doWork() {
                        // 检查是否请求关闭
                        if (shutdownRequested) {
                            // 处理剩余任务
                            if (pendingTasks.isEmpty()) {
                                throw new AgentTerminationException(
                                    "优雅关闭: 所有任务已完成"
                                );
                            } else {
                                System.out.println("关闭中,剩余任务: " +
                                    pendingTasks.size());
                            }
                        }

                        // 处理任务
                        Task task = pendingTasks.poll();
                        if (task != null) {
                            task.execute();
                            return 1;
                        }

                        return 0;
                    }

                    @Override
                    public String roleName() {
                        return "graceful-agent";
                    }

                    @Override
                    public void onClose() {
                        // 清理资源
                        pendingTasks.clear();
                        System.out.println("Agent 已关闭");
                    }

                    public void requestShutdown() {
                        shutdownRequested = true;
                    }

                    public void addTask(Task task) {
                        pendingTasks.offer(task);
                    }

                    interface Task {
                        void execute();
                    }
                }
                

监控Agent状态


                /**
                 * Agent状态监控
                 */
                public class AgentMonitor {

                    public static void monitorAgent(AgentRunner runner) {
                        // 创建监控线程
                        Thread monitor = new Thread(() -> {
                            while (runner.isRunning()) {
                                System.out.println("Agent 状态: 运行中");

                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    break;
                                }
                            }

                            System.out.println("Agent 状态: 已终止");
                        });

                        monitor.setDaemon(true);
                        monitor.start();
                    }

                    public static void monitorAgentInvoker(AgentInvoker invoker) {
                        // 在主循环中检查状态
                        int iterations = 0;
                        while (invoker.isRunning()) {
                            int work = invoker.invoke();
                            iterations++;

                            if (iterations % 100 == 0) {
                                System.out.println(
                                    "迭代: " + iterations + ", 工作量: " + work
                                );
                            }
                        }

                        System.out.println("Agent 在 " + iterations + " 次迭代后终止");
                    }
                }
                

级联终止


                /**
                 * 管理多个Agent的协调器
                 */
                public class AgentCoordinator {
                    private final List<AgentRunner> runners = new ArrayList<>();

                    public void addAgent(AgentRunner runner) {
                        runners.add(runner);
                    }

                    /**
                     * 当一个Agent终止时,终止所有Agent
                     */
                    public void startWithCascadingShutdown() {
                        // 监控所有Agent
                        Thread monitor = new Thread(() -> {
                            while (true) {
                                // 检查是否有Agent终止
                                for (AgentRunner runner : runners) {
                                    if (!runner.isRunning()) {
                                        System.out.println(
                                            "检测到Agent终止,关闭所有Agent"
                                        );
                                        shutdownAll();
                                        return;
                                    }
                                }

                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    break;
                                }
                            }
                        });

                        monitor.setDaemon(true);
                        monitor.start();
                    }

                    private void shutdownAll() {
                        for (AgentRunner runner : runners) {
                            runner.close();
                        }
                    }
                }
                

与外部信号集成


                /**
                 * 响应外部信号的Agent
                 */
                public class SignalAwareAgent implements Agent {
                    private volatile boolean terminateSignalReceived = false;

                    @Override
                    public int doWork() {
                        if (terminateSignalReceived) {
                            throw new AgentTerminationException(
                                "收到外部终止信号"
                            );
                        }

                        // 正常工作
                        return doNormalWork();
                    }

                    @Override
                    public String roleName() {
                        return "signal-aware-agent";
                    }

                    public void receiveTerminateSignal() {
                        terminateSignalReceived = true;
                    }

                    private int doNormalWork() {
                        // 实际工作逻辑
                        return 1;
                    }
                }

                // 使用示例
                public class SignalExample {
                    public static void main(String[] args) {
                        SignalAwareAgent agent = new SignalAwareAgent();

                        AgentRunner runner = new AgentRunner(
                            new BusySpinIdleStrategy(),
                            Throwable::printStackTrace,
                            null,
                            agent
                        );

                        AgentRunner.startOnThread(runner);

                        // 注册shutdown hook
                        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                            System.out.println("收到JVM关闭信号");
                            agent.receiveTerminateSignal();
                        }));

                        // 或者响应用户输入
                        Scanner scanner = new Scanner(System.in);
                        System.out.println("按回车键终止Agent...");
                        scanner.nextLine();
                        agent.receiveTerminateSignal();
                    }
                }
                

相关链接

总结

使用 AgentTerminationException 终止 Agrona Agent:

核心机制:

  • ✅ 在 doWork() 中抛出 AgentTerminationException
  • AgentRunnerAgentInvoker 都会捕获此异常
  • isRunning 标志被设置为 false
  • ✅ Agent 循环退出,线程终止

使用场景:

  • 资源耗尽
  • 致命错误
  • 业务逻辑要求终止
  • 达到工作上限
  • 外部信号触发

最佳实践:

  • 提供清晰的终止原因
  • 实现优雅关闭(处理待处理任务)
  • 区分可恢复错误和致命错误
  • 实现重试机制
  • 监控 Agent 状态
  • 清理资源(实现 onClose())

注意事项:

  • AgentTerminationException 是正常的终止机制,不是错误
  • 终止后 Agent 无法重启,需要创建新实例
  • CompositeAgent 中,一个 Agent 终止不会影响其他 Agent
  • 使用 AgentInvoker 时,可以通过 isRunning() 检查状态

选择合适的终止策略,确保系统能够优雅地处理 Agent 生命周期。