如何将多个 Agent 组合到单个线程上

如何将多个 Agent 组合到单个线程上


问题描述

你已经将进程构建为多个 Agent,但希望其中几个 Agent 在同一个线程上运行。

解决方案

将你的 Agent 组合成一个 CompositeAgent,然后在 AgentRunner 中调度这个 CompositeAgent

示例代码


                Agent agent1 = new Agent1();
                Agent agent2 = new Agent2();
                Agent composite = new CompositeAgent(agent1, agent2);
                AgentRunner compositeAgentRunner = new AgentRunner(idleStrategy,
                        Throwable::printStackTrace,
                        errorCounter, composite);
                AgentRunner.startOnThread(compositeAgentRunner);
                

详细说明

CompositeAgent 工作原理


                单线程多Agent执行模型:
                ┌────────────────────────────────────────────────────┐
                │              Agent Runner Thread                   │
                │                                                    │
                │  ┌──────────────────────────────────────────┐    │
                │  │         Composite Agent                  │    │
                │  │                                          │    │
                │  │  while (running) {                       │    │
                │  │    ┌──────────────────────────────┐     │    │
                │  │    │ 1. Agent1.doWork()           │     │    │
                │  │    │    └─> 返回工作量: work1     │     │    │
                │  │    └────────────┬─────────────────┘     │    │
                │  │                 │                        │    │
                │  │                 ▼                        │    │
                │  │    ┌──────────────────────────────┐     │    │
                │  │    │ 2. Agent2.doWork()           │     │    │
                │  │    │    └─> 返回工作量: work2     │     │    │
                │  │    └────────────┬─────────────────┘     │    │
                │  │                 │                        │    │
                │  │                 ▼                        │    │
                │  │    ┌──────────────────────────────┐     │    │
                │  │    │ 3. 合计工作量                │     │    │
                │  │    │    totalWork = work1 + work2 │     │    │
                │  │    └────────────┬─────────────────┘     │    │
                │  │                 │                        │    │
                │  │                 ▼                        │    │
                │  │    ┌──────────────────────────────┐     │    │
                │  │    │ 4. 应用空闲策略              │     │    │
                │  │    │    idleStrategy.idle(        │     │    │
                │  │    │        totalWork)             │     │    │
                │  │    └──────────────────────────────┘     │    │
                │  │  }                                       │    │
                │  └──────────────────────────────────────────┘    │
                └────────────────────────────────────────────────────┘
                

执行顺序

CompositeAgent 中的 Agent 按照构造函数中添加的顺序依次执行它们的 duty cycle:


                执行时间线:
                ─────────────────────────────────────────────────────►
                │         │         │         │         │         │
                │ Agent1  │ Agent2  │ Agent1  │ Agent2  │ Agent1  │
                │ doWork  │ doWork  │ doWork  │ doWork  │ doWork  │
                │         │         │         │         │         │
                └─────────┴─────────┴─────────┴─────────┴─────────┘
                  循环1      循环1     循环2     循环2     循环3

                注意: Agent 是顺序执行,不是并行!
                

完整实现示例


                import org.agrona.concurrent.Agent;
                import org.agrona.concurrent.AgentRunner;
                import org.agrona.concurrent.CompositeAgent;
                import org.agrona.concurrent.IdleStrategy;
                import org.agrona.concurrent.BackoffIdleStrategy;

                public class CompositeAgentExample {

                    /**
                     * 第一个Agent: 处理订单
                     */
                    static class OrderProcessingAgent implements Agent {
                        private int processedOrders = 0;

                        @Override
                        public int doWork() {
                            // 模拟处理订单
                            if (hasOrders()) {
                                processOrder();
                                processedOrders++;
                                return 1;  // 完成了1个工作单元
                            }
                            return 0;  // 无工作
                        }

                        @Override
                        public String roleName() {
                            return "order-processor";
                        }

                        private boolean hasOrders() {
                            // 检查是否有订单
                            return Math.random() > 0.5;
                        }

                        private void processOrder() {
                            // 处理订单逻辑
                            System.out.println("处理订单 #" + processedOrders);
                        }
                    }

                    /**
                     * 第二个Agent: 发送心跳
                     */
                    static class HeartbeatAgent implements Agent {
                        private long lastHeartbeat = 0;
                        private final long intervalNs = 1_000_000_000L; // 1秒

                        @Override
                        public int doWork() {
                            long now = System.nanoTime();
                            if (now - lastHeartbeat > intervalNs) {
                                sendHeartbeat();
                                lastHeartbeat = now;
                                return 1;  // 完成了1个工作单元
                            }
                            return 0;  // 无工作
                        }

                        @Override
                        public String roleName() {
                            return "heartbeat-sender";
                        }

                        private void sendHeartbeat() {
                            System.out.println("发送心跳: " + System.currentTimeMillis());
                        }
                    }

                    public static void main(String[] args) {
                        // 创建两个Agent
                        Agent orderAgent = new OrderProcessingAgent();
                        Agent heartbeatAgent = new HeartbeatAgent();

                        // 组合成 CompositeAgent
                        Agent composite = new CompositeAgent(orderAgent, heartbeatAgent);

                        // 创建空闲策略
                        IdleStrategy idleStrategy = new BackoffIdleStrategy(
                            100, 1000, 1_000_000, 10_000_000
                        );

                        // 创建 AgentRunner
                        AgentRunner runner = new AgentRunner(
                            idleStrategy,
                            Throwable::printStackTrace,
                            null,  // 错误计数器(可选)
                            composite
                        );

                        // 在新线程上启动
                        AgentRunner.startOnThread(runner);

                        // 运行10秒后关闭
                        try {
                            Thread.sleep(10_000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        runner.close();
                    }
                }
                

Media Driver 中的实际应用

当你使用 SHARED 线程策略运行 Media Driver 时,Conductor、Sender 和 Receiver Agent 就是在 CompositeAgent 中调度的:


                Media Driver SHARED 模式:
                ┌─────────────────────────────────────────────┐
                │          单个线程                           │
                │                                             │
                │  ┌───────────────────────────────────┐     │
                │  │  CompositeAgent                   │     │
                │  │  ├─ ConductorAgent               │     │
                │  │  │   └─ 管理连接和资源            │     │
                │  │  │                                │     │
                │  │  ├─ SenderAgent                  │     │
                │  │  │   └─ 发送网络数据包            │     │
                │  │  │                                │     │
                │  │  └─ ReceiverAgent                │     │
                │  │      └─ 接收网络数据包            │     │
                │  └───────────────────────────────────┘     │
                │                                             │
                │  优点: 减少线程开销                        │
                │  缺点: 三个任务共享CPU时间                 │
                └─────────────────────────────────────────────┘

                对比 DEDICATED 模式:
                ┌─────────┐  ┌─────────┐  ┌─────────┐
                │Conductor│  │ Sender  │  │Receiver │
                │ Thread  │  │ Thread  │  │ Thread  │
                └─────────┘  └─────────┘  └─────────┘
                  优点: 并行处理
                  缺点: 更多线程,上下文切换开销
                

线程命名

默认的线程名称是各个 Agent 的 roleName() 的组合。如果需要,可以使用 roleName() 方法设置 CompositeAgent 的线程名称:


                public class CustomCompositeAgent extends CompositeAgent {

                    public CustomCompositeAgent(Agent... agents) {
                        super(agents);
                    }

                    @Override
                    public String roleName() {
                        return "my-custom-composite-thread";
                    }
                }

                // 使用
                Agent composite = new CustomCompositeAgent(agent1, agent2);
                // 线程名将是: "my-custom-composite-thread"
                

工作量聚合机制


                /**
                 * CompositeAgent 如何聚合工作量
                 */
                public class WorkAggregationExample {

                    // CompositeAgent 的 doWork 实现(简化版)
                    public int doWork() {
                        int workCount = 0;

                        // 依次执行所有Agent
                        for (Agent agent : agents) {
                            workCount += agent.doWork();
                        }

                        // 返回总工作量
                        return workCount;
                    }

                    /**
                     * 示例场景:
                     * Agent1.doWork() 返回 3
                     * Agent2.doWork() 返回 0
                     * Agent3.doWork() 返回 5
                     *
                     * CompositeAgent.doWork() 返回 8
                     *
                     * IdleStrategy 会根据 8 决定是否空闲
                     * (大于0,表示有工作,不休眠)
                     */
                }
                

性能考虑


                CompositeAgent 性能特性:
                ┌──────────────────────────────────────────┐
                │ 优势:                                    │
                │ ✓ 减少线程数量                          │
                │ ✓ 降低上下文切换开销                    │
                │ ✓ 简化线程管理                          │
                │ ✓ 共享空闲策略,统一调度                │
                │                                          │
                │ 劣势:                                    │
                │ ✗ Agent 串行执行,无法并行               │
                │ ✗ 一个Agent阻塞会影响其他Agent          │
                │ ✗ 不适合CPU密集型Agent                  │
                │                                          │
                │ 适用场景:                                │
                │ • 多个轻量级Agent                       │
                │ • Agent之间无依赖关系                   │
                │ • 希望减少线程开销                      │
                │ • 任务执行时间短(< 1ms)                 │
                └──────────────────────────────────────────┘
                

替代方案: AgentInvoker

另一种方法是使用 AgentInvoker,在另一个 Agent 的 duty cycle 中运行 invoke():


                public class AgentInvokerExample {

                    /**
                     * 主Agent
                     */
                    static class MainAgent implements Agent {
                        private final AgentInvoker invoker;

                        public MainAgent(Agent delegateAgent) {
                            this.invoker = new AgentInvoker(
                                Throwable::printStackTrace,
                                null,
                                delegateAgent
                            );
                            invoker.start();
                        }

                        @Override
                        public int doWork() {
                            // 先做主Agent的工作
                            int work = doMainWork();

                            // 然后调用其他Agent
                            work += invoker.invoke();

                            return work;
                        }

                        private int doMainWork() {
                            // 主要工作逻辑
                            return 1;
                        }

                        @Override
                        public String roleName() {
                            return "main-agent";
                        }

                        @Override
                        public void onClose() {
                            invoker.close();
                        }
                    }

                    /**
                     * 被委托的Agent
                     */
                    static class DelegateAgent implements Agent {
                        @Override
                        public int doWork() {
                            System.out.println("DelegateAgent 执行");
                            return 1;
                        }

                        @Override
                        public String roleName() {
                            return "delegate-agent";
                        }
                    }

                    public static void main(String[] args) {
                        Agent delegateAgent = new DelegateAgent();
                        Agent mainAgent = new MainAgent(delegateAgent);

                        // 只需要启动MainAgent
                        AgentRunner runner = new AgentRunner(
                            new BackoffIdleStrategy(100, 1000, 1_000_000, 10_000_000),
                            Throwable::printStackTrace,
                            null,
                            mainAgent
                        );

                        AgentRunner.startOnThread(runner);
                    }
                }
                

CompositeAgent vs AgentInvoker 对比


                ┌─────────────────────────────────────────────────┐
                │ CompositeAgent                                  │
                ├─────────────────────────────────────────────────┤
                │ • 声明式组合多个Agent                           │
                │ • 自动管理Agent生命周期                         │
                │ • 适合同等地位的Agent                           │
                │ • 代码更简洁                                    │
                └─────────────────────────────────────────────────┘

                ┌─────────────────────────────────────────────────┐
                │ AgentInvoker                                    │
                ├─────────────────────────────────────────────────┤
                │ • 主从关系明确                                  │
                │ • 主Agent控制调用时机                           │
                │ • 适合有依赖关系的Agent                         │
                │ • 更灵活的控制                                  │
                └─────────────────────────────────────────────────┘

                选择建议:
                • 多个平等Agent → CompositeAgent
                • 主从关系Agent → AgentInvoker
                

动态Agent管理


                /**
                 * 可以动态添加/移除Agent的CompositeAgent
                 */
                public class DynamicCompositeAgent implements Agent {
                    private final List<Agent> agents = new CopyOnWriteArrayList<>();
                    private volatile String roleName = "dynamic-composite";

                    public void addAgent(Agent agent) {
                        agents.add(agent);
                        updateRoleName();
                    }

                    public void removeAgent(Agent agent) {
                        agents.remove(agent);
                        agent.onClose();
                        updateRoleName();
                    }

                    @Override
                    public int doWork() {
                        int workCount = 0;
                        for (Agent agent : agents) {
                            workCount += agent.doWork();
                        }
                        return workCount;
                    }

                    @Override
                    public String roleName() {
                        return roleName;
                    }

                    @Override
                    public void onClose() {
                        agents.forEach(Agent::onClose);
                        agents.clear();
                    }

                    private void updateRoleName() {
                        roleName = agents.stream()
                            .map(Agent::roleName)
                            .collect(Collectors.joining("+"));
                    }
                }
                

错误处理


                /**
                 * 带错误处理的CompositeAgent
                 */
                public class ResilientCompositeAgent implements Agent {
                    private final Agent[] agents;
                    private final ErrorHandler errorHandler;

                    public interface ErrorHandler {
                        void onError(Agent agent, Throwable error);
                    }

                    public ResilientCompositeAgent(ErrorHandler errorHandler, Agent... agents) {
                        this.agents = agents;
                        this.errorHandler = errorHandler;
                    }

                    @Override
                    public int doWork() {
                        int workCount = 0;

                        for (Agent agent : agents) {
                            try {
                                workCount += agent.doWork();
                            } catch (Exception e) {
                                errorHandler.onError(agent, e);
                                // 继续执行其他Agent
                            }
                        }

                        return workCount;
                    }

                    @Override
                    public String roleName() {
                        return Arrays.stream(agents)
                            .map(Agent::roleName)
                            .collect(Collectors.joining("+"));
                    }
                }

                // 使用示例
                ErrorHandler handler = (agent, error) -> {
                    System.err.println("Agent " + agent.roleName() + " 失败: " + error);
                };

                Agent resilient = new ResilientCompositeAgent(
                    handler,
                    new Agent1(),
                    new Agent2(),
                    new Agent3()
                );
                

相关链接

总结

CompositeAgent 是将多个 Agent 组合到单个线程的便捷方式:

核心特性:

  • ✅ Agent 按构造顺序顺序执行
  • ✅ 共享同一个线程和空闲策略
  • ✅ 工作量自动聚合
  • ✅ 减少线程开销

使用场景:

  • 多个轻量级 Agent
  • 希望减少线程数量
  • Agent 无需并行执行
  • Media Driver SHARED 模式

替代方案:

  • AgentInvoker: 适合主从关系的 Agent
  • 独立线程: 适合需要并行的 Agent

注意事项:

  • Agent 是串行执行,不是并行
  • 一个 Agent 阻塞会影响其他 Agent
  • 默认线程名是所有 Agent roleName 的组合
  • 可以重写 roleName() 自定义线程名

选择 CompositeAgent 还是独立线程,取决于你的性能需求和Agent的工作特性。