如何将多个 Agent 组合到单个线程上
问题描述
你已经将进程构建为多个 Agent,但希望其中几个 Agent 在同一个线程上运行。
解决方案
将你的 Agent 组合成一个 CompositeAgent,然后在 AgentRunner 中调度这个 CompositeAgent。
示例代码
Agent agent1 = new Agent1();
Agent agent2 = new Agent2();
Agent composite = new CompositeAgent(agent1, agent2);
AgentRunner compositeAgentRunner = new AgentRunner(idleStrategy,
Throwable::printStackTrace,
errorCounter, composite);
AgentRunner.startOnThread(compositeAgentRunner);
详细说明
CompositeAgent 工作原理
单线程多Agent执行模型:
┌────────────────────────────────────────────────────┐
│ Agent Runner Thread │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Composite Agent │ │
│ │ │ │
│ │ while (running) { │ │
│ │ ┌──────────────────────────────┐ │ │
│ │ │ 1. Agent1.doWork() │ │ │
│ │ │ └─> 返回工作量: work1 │ │ │
│ │ └────────────┬─────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────┐ │ │
│ │ │ 2. Agent2.doWork() │ │ │
│ │ │ └─> 返回工作量: work2 │ │ │
│ │ └────────────┬─────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────┐ │ │
│ │ │ 3. 合计工作量 │ │ │
│ │ │ totalWork = work1 + work2 │ │ │
│ │ └────────────┬─────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────┐ │ │
│ │ │ 4. 应用空闲策略 │ │ │
│ │ │ idleStrategy.idle( │ │ │
│ │ │ totalWork) │ │ │
│ │ └──────────────────────────────┘ │ │
│ │ } │ │
│ └──────────────────────────────────────────┘ │
└────────────────────────────────────────────────────┘
执行顺序
CompositeAgent 中的 Agent 按照构造函数中添加的顺序依次执行它们的 duty cycle:
执行时间线:
─────────────────────────────────────────────────────►
│ │ │ │ │ │
│ Agent1 │ Agent2 │ Agent1 │ Agent2 │ Agent1 │
│ doWork │ doWork │ doWork │ doWork │ doWork │
│ │ │ │ │ │
└─────────┴─────────┴─────────┴─────────┴─────────┘
循环1 循环1 循环2 循环2 循环3
注意: Agent 是顺序执行,不是并行!
完整实现示例
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.CompositeAgent;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.BackoffIdleStrategy;
public class CompositeAgentExample {
/**
* 第一个Agent: 处理订单
*/
static class OrderProcessingAgent implements Agent {
private int processedOrders = 0;
@Override
public int doWork() {
// 模拟处理订单
if (hasOrders()) {
processOrder();
processedOrders++;
return 1; // 完成了1个工作单元
}
return 0; // 无工作
}
@Override
public String roleName() {
return "order-processor";
}
private boolean hasOrders() {
// 检查是否有订单
return Math.random() > 0.5;
}
private void processOrder() {
// 处理订单逻辑
System.out.println("处理订单 #" + processedOrders);
}
}
/**
* 第二个Agent: 发送心跳
*/
static class HeartbeatAgent implements Agent {
private long lastHeartbeat = 0;
private final long intervalNs = 1_000_000_000L; // 1秒
@Override
public int doWork() {
long now = System.nanoTime();
if (now - lastHeartbeat > intervalNs) {
sendHeartbeat();
lastHeartbeat = now;
return 1; // 完成了1个工作单元
}
return 0; // 无工作
}
@Override
public String roleName() {
return "heartbeat-sender";
}
private void sendHeartbeat() {
System.out.println("发送心跳: " + System.currentTimeMillis());
}
}
public static void main(String[] args) {
// 创建两个Agent
Agent orderAgent = new OrderProcessingAgent();
Agent heartbeatAgent = new HeartbeatAgent();
// 组合成 CompositeAgent
Agent composite = new CompositeAgent(orderAgent, heartbeatAgent);
// 创建空闲策略
IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, 1000, 1_000_000, 10_000_000
);
// 创建 AgentRunner
AgentRunner runner = new AgentRunner(
idleStrategy,
Throwable::printStackTrace,
null, // 错误计数器(可选)
composite
);
// 在新线程上启动
AgentRunner.startOnThread(runner);
// 运行10秒后关闭
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
runner.close();
}
}
Media Driver 中的实际应用
当你使用 SHARED 线程策略运行 Media Driver 时,Conductor、Sender 和 Receiver Agent 就是在 CompositeAgent 中调度的:
Media Driver SHARED 模式:
┌─────────────────────────────────────────────┐
│ 单个线程 │
│ │
│ ┌───────────────────────────────────┐ │
│ │ CompositeAgent │ │
│ │ ├─ ConductorAgent │ │
│ │ │ └─ 管理连接和资源 │ │
│ │ │ │ │
│ │ ├─ SenderAgent │ │
│ │ │ └─ 发送网络数据包 │ │
│ │ │ │ │
│ │ └─ ReceiverAgent │ │
│ │ └─ 接收网络数据包 │ │
│ └───────────────────────────────────┘ │
│ │
│ 优点: 减少线程开销 │
│ 缺点: 三个任务共享CPU时间 │
└─────────────────────────────────────────────┘
对比 DEDICATED 模式:
┌─────────┐ ┌─────────┐ ┌─────────┐
│Conductor│ │ Sender │ │Receiver │
│ Thread │ │ Thread │ │ Thread │
└─────────┘ └─────────┘ └─────────┘
优点: 并行处理
缺点: 更多线程,上下文切换开销
线程命名
默认的线程名称是各个 Agent 的 roleName() 的组合。如果需要,可以使用 roleName() 方法设置 CompositeAgent 的线程名称:
public class CustomCompositeAgent extends CompositeAgent {
public CustomCompositeAgent(Agent... agents) {
super(agents);
}
@Override
public String roleName() {
return "my-custom-composite-thread";
}
}
// 使用
Agent composite = new CustomCompositeAgent(agent1, agent2);
// 线程名将是: "my-custom-composite-thread"
工作量聚合机制
/**
* CompositeAgent 如何聚合工作量
*/
public class WorkAggregationExample {
// CompositeAgent 的 doWork 实现(简化版)
public int doWork() {
int workCount = 0;
// 依次执行所有Agent
for (Agent agent : agents) {
workCount += agent.doWork();
}
// 返回总工作量
return workCount;
}
/**
* 示例场景:
* Agent1.doWork() 返回 3
* Agent2.doWork() 返回 0
* Agent3.doWork() 返回 5
*
* CompositeAgent.doWork() 返回 8
*
* IdleStrategy 会根据 8 决定是否空闲
* (大于0,表示有工作,不休眠)
*/
}
性能考虑
CompositeAgent 性能特性:
┌──────────────────────────────────────────┐
│ 优势: │
│ ✓ 减少线程数量 │
│ ✓ 降低上下文切换开销 │
│ ✓ 简化线程管理 │
│ ✓ 共享空闲策略,统一调度 │
│ │
│ 劣势: │
│ ✗ Agent 串行执行,无法并行 │
│ ✗ 一个Agent阻塞会影响其他Agent │
│ ✗ 不适合CPU密集型Agent │
│ │
│ 适用场景: │
│ • 多个轻量级Agent │
│ • Agent之间无依赖关系 │
│ • 希望减少线程开销 │
│ • 任务执行时间短(< 1ms) │
└──────────────────────────────────────────┘
替代方案: AgentInvoker
另一种方法是使用 AgentInvoker,在另一个 Agent 的 duty cycle 中运行 invoke():
public class AgentInvokerExample {
/**
* 主Agent
*/
static class MainAgent implements Agent {
private final AgentInvoker invoker;
public MainAgent(Agent delegateAgent) {
this.invoker = new AgentInvoker(
Throwable::printStackTrace,
null,
delegateAgent
);
invoker.start();
}
@Override
public int doWork() {
// 先做主Agent的工作
int work = doMainWork();
// 然后调用其他Agent
work += invoker.invoke();
return work;
}
private int doMainWork() {
// 主要工作逻辑
return 1;
}
@Override
public String roleName() {
return "main-agent";
}
@Override
public void onClose() {
invoker.close();
}
}
/**
* 被委托的Agent
*/
static class DelegateAgent implements Agent {
@Override
public int doWork() {
System.out.println("DelegateAgent 执行");
return 1;
}
@Override
public String roleName() {
return "delegate-agent";
}
}
public static void main(String[] args) {
Agent delegateAgent = new DelegateAgent();
Agent mainAgent = new MainAgent(delegateAgent);
// 只需要启动MainAgent
AgentRunner runner = new AgentRunner(
new BackoffIdleStrategy(100, 1000, 1_000_000, 10_000_000),
Throwable::printStackTrace,
null,
mainAgent
);
AgentRunner.startOnThread(runner);
}
}
CompositeAgent vs AgentInvoker 对比
┌─────────────────────────────────────────────────┐
│ CompositeAgent │
├─────────────────────────────────────────────────┤
│ • 声明式组合多个Agent │
│ • 自动管理Agent生命周期 │
│ • 适合同等地位的Agent │
│ • 代码更简洁 │
└─────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ AgentInvoker │
├─────────────────────────────────────────────────┤
│ • 主从关系明确 │
│ • 主Agent控制调用时机 │
│ • 适合有依赖关系的Agent │
│ • 更灵活的控制 │
└─────────────────────────────────────────────────┘
选择建议:
• 多个平等Agent → CompositeAgent
• 主从关系Agent → AgentInvoker
动态Agent管理
/**
* 可以动态添加/移除Agent的CompositeAgent
*/
public class DynamicCompositeAgent implements Agent {
private final List<Agent> agents = new CopyOnWriteArrayList<>();
private volatile String roleName = "dynamic-composite";
public void addAgent(Agent agent) {
agents.add(agent);
updateRoleName();
}
public void removeAgent(Agent agent) {
agents.remove(agent);
agent.onClose();
updateRoleName();
}
@Override
public int doWork() {
int workCount = 0;
for (Agent agent : agents) {
workCount += agent.doWork();
}
return workCount;
}
@Override
public String roleName() {
return roleName;
}
@Override
public void onClose() {
agents.forEach(Agent::onClose);
agents.clear();
}
private void updateRoleName() {
roleName = agents.stream()
.map(Agent::roleName)
.collect(Collectors.joining("+"));
}
}
错误处理
/**
* 带错误处理的CompositeAgent
*/
public class ResilientCompositeAgent implements Agent {
private final Agent[] agents;
private final ErrorHandler errorHandler;
public interface ErrorHandler {
void onError(Agent agent, Throwable error);
}
public ResilientCompositeAgent(ErrorHandler errorHandler, Agent... agents) {
this.agents = agents;
this.errorHandler = errorHandler;
}
@Override
public int doWork() {
int workCount = 0;
for (Agent agent : agents) {
try {
workCount += agent.doWork();
} catch (Exception e) {
errorHandler.onError(agent, e);
// 继续执行其他Agent
}
}
return workCount;
}
@Override
public String roleName() {
return Arrays.stream(agents)
.map(Agent::roleName)
.collect(Collectors.joining("+"));
}
}
// 使用示例
ErrorHandler handler = (agent, error) -> {
System.err.println("Agent " + agent.roleName() + " 失败: " + error);
};
Agent resilient = new ResilientCompositeAgent(
handler,
new Agent1(),
new Agent2(),
new Agent3()
);
相关链接
总结
CompositeAgent 是将多个 Agent 组合到单个线程的便捷方式:
核心特性:
- ✅ Agent 按构造顺序顺序执行
- ✅ 共享同一个线程和空闲策略
- ✅ 工作量自动聚合
- ✅ 减少线程开销
使用场景:
- 多个轻量级 Agent
- 希望减少线程数量
- Agent 无需并行执行
- Media Driver SHARED 模式
替代方案:
AgentInvoker: 适合主从关系的 Agent- 独立线程: 适合需要并行的 Agent
注意事项:
- Agent 是串行执行,不是并行
- 一个 Agent 阻塞会影响其他 Agent
- 默认线程名是所有 Agent roleName 的组合
- 可以重写
roleName()自定义线程名
选择 CompositeAgent 还是独立线程,取决于你的性能需求和Agent的工作特性。