Agents & Idle Strategies (代理与空闲策略)
概述
在 Agrona 中,Agent 是执行 Duty Cycle 的基本单元,而 Idle Strategy(空闲策略)则决定了当 Agent 没有工作可做时应该采取什么行动。这两个概念的结合是实现高性能、低延迟系统的关键。
线程的问题 (The Problem with Threads)
在 2006 年,Edward A. Lee 撰写了一篇技术报告,题为 "The Problem with Threads"(线程的问题)。在报告中,他提出:
> 线程作为计算模型具有极高的不确定性,程序员的工作就变成了修剪这种不确定性。虽然许多研究技术通过提供更有效的修剪来改进模型,但我认为这是在倒着解决问题。我们应该从本质上确定性的、可组合的组件开始构建,而不是从需要移除的地方移除不确定性。不确定性应该在需要的地方明确而审慎地引入,而不是在不需要的地方移除。
Agrona 的 Agent 和 Idle Strategy 正是实现这一理念的方式之一。当 Agrona Agent 与 Aeron 一起使用时,允许以安全、一致的方式构建确定性的、资源管理的线程,这对开发者来说易于理解和推理。
核心设计理念
Agrona Agent 设计哲学
┌─────────────────────────────────────────────────────────────┐
│ 确定性计算模型 │
│ (Deterministic Computation Model) │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────────┴───────────────────┐
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────────┐
│ 可组合的组件 │ │ 资源管理的线程 │
│ (Composable) │ │ (Resource Managed) │
└─────────┬─────────┘ └───────────┬───────────┘
│ │
│ ┌──────────────┐ │
└────────►│ Agrona Agent │◄───────────┘
└──────┬───────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Duty Cycle │ │ Idle Strategy │ │ AgentRunner │
│ (工作周期) │ │ (空闲策略) │ │ (运行调度器) │
└──────────────┘ └─────────────────┘ └──────────────────┘
关键特性:
- 确定性: Agent 的行为是可预测的
- 可组合: 多个 Agent 可以组合在一起
- 资源管理: 通过 Idle Strategy 精确控制 CPU 使用
- 易于推理: 简单的编程模型,易于理解和调试
Agent (代理)
Agent 的定义
Agent 是一个执行特定职责的工作单元,它持续运行并处理分配给它的任务。
Agrona Agent 是应用程序逻辑的容器,在 Duty Cycle 中执行,例如处理来自 Aeron 订阅的消息。Agent 的 Duty Cycle 间隔以及 CPU 消耗由空闲策略控制。Agent 可以被调度在专用线程上,也可以作为单个线程上的复合代理组的一部分运行。
典型的 Duty Cycle 工作流程:
- 持续轮询 Agent 的
doWork函数,直到它返回 0 - 一旦返回 0,调用空闲策略(Idle Strategy)
- 空闲策略决定如何处理无工作状态(自旋、让出CPU、休眠等)
graph TB
A[Agent] --> B[doWork方法]
A --> C[生命周期管理]
A --> D[角色标识]
B --> B1[返回工作量]
B --> B2[非阻塞执行]
B --> B3[异常处理]
C --> C1[onStart启动]
C --> C2[onClose关闭]
C --> C3[资源管理]
D --> D1[roleName]
D --> D2[日志标识]
D --> D3[监控标识]
style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
style B fill:#50c878,stroke:#2d7a4a,color:#fff
style C fill:#f39c12,stroke:#b8730a,color:#fff
style D fill:#e74c3c,stroke:#a93529,color:#fff
Agent 接口详解
package org.agrona.concurrent;
/**
* Agent 接口定义
*/
public interface Agent
{
/**
* 执行一个工作周期
*
* @return 完成的工作项数量,0表示没有工作
* @throws Exception 执行过程中的任何异常
*/
int doWork() throws Exception;
/**
* 获取 Agent 的角色名称
*
* @return 角色名称字符串
*/
String roleName();
/**
* Agent 启动时调用
*/
default void onStart()
{
}
/**
* Agent 关闭时调用
*/
default void onClose()
{
}
}
Agent 实现示例
基础 Agent 实现
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
/**
* 基础的消息接收 Agent
*/
public class MessageReceiverAgent implements Agent
{
private final ManyToOneConcurrentArrayQueue<Message> inputQueue;
private final MessageHandler messageHandler;
private long messagesReceived = 0;
public MessageReceiverAgent(
final ManyToOneConcurrentArrayQueue<Message> inputQueue,
final MessageHandler messageHandler)
{
this.inputQueue = inputQueue;
this.messageHandler = messageHandler;
}
@Override
public int doWork() throws Exception
{
int workDone = 0;
// 从队列中取出并处理消息
Message message;
while ((message = inputQueue.poll()) != null)
{
messageHandler.onMessage(message);
messagesReceived++;
workDone++;
// 限制每次处理的数量,避免长时间占用
if (workDone >= 10)
{
break;
}
}
return workDone;
}
@Override
public String roleName()
{
return "MessageReceiver";
}
@Override
public void onStart()
{
System.out.println("[" + roleName() + "] Agent 启动");
messagesReceived = 0;
}
@Override
public void onClose()
{
System.out.println("[" + roleName() + "] Agent 关闭," +
"总共接收了 " + messagesReceived + " 条消息");
}
public long getMessagesReceived()
{
return messagesReceived;
}
/**
* 消息处理器接口
*/
public interface MessageHandler
{
void onMessage(Message message);
}
/**
* 消息类(示例)
*/
public static class Message
{
private final String content;
public Message(final String content)
{
this.content = content;
}
public String getContent()
{
return content;
}
}
}
复合 Agent 实现
import org.agrona.concurrent.Agent;
/**
* 复合 Agent - 组合多个子 Agent
*/
public class CompositeAgent implements Agent
{
private final Agent[] agents;
private final String roleName;
public CompositeAgent(final String roleName, final Agent... agents)
{
this.roleName = roleName;
this.agents = agents;
}
@Override
public int doWork() throws Exception
{
int totalWork = 0;
// 依次执行所有子 Agent 的工作
for (final Agent agent : agents)
{
totalWork += agent.doWork();
}
return totalWork;
}
@Override
public String roleName()
{
return roleName;
}
@Override
public void onStart()
{
System.out.println("[" + roleName + "] 复合 Agent 启动");
// 启动所有子 Agent
for (final Agent agent : agents)
{
agent.onStart();
}
}
@Override
public void onClose()
{
// 关闭所有子 Agent
for (final Agent agent : agents)
{
agent.onClose();
}
System.out.println("[" + roleName + "] 复合 Agent 关闭");
}
}
Idle Strategy (空闲策略)
空闲策略的重要性
当 Agent 没有工作可做时,IdleStrategy 决定如何使用 CPU 资源。这直接影响:
- 延迟:响应时间的快慢
- CPU 使用率:资源利用效率
- 功耗:能源消耗水平
- 吞吐量:系统整体处理能力
graph TB
A[空闲策略] --> B[低延迟策略]
A --> C[平衡策略]
A --> D[低CPU使用策略]
B --> B1[BusySpinIdleStrategy]
B --> B2[持续自旋]
B --> B3[最低延迟/最高CPU]
C --> C1[BackoffIdleStrategy]
C --> C2[渐进式退避]
C --> C3[平衡延迟与CPU]
D --> D1[SleepingIdleStrategy]
D --> D2[休眠等待]
D --> D3[最低CPU/较高延迟]
style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
style B fill:#e74c3c,stroke:#a93529,color:#fff
style C fill:#f39c12,stroke:#b8730a,color:#fff
style D fill:#50c878,stroke:#2d7a4a,color:#fff
IdleStrategy 接口
package org.agrona.concurrent;
/**
* 空闲策略接口
*/
public interface IdleStrategy
{
/**
* 空闲时调用
*
* @param workCount 最近一次 doWork() 返回的工作量
*/
void idle(int workCount);
/**
* 空闲时调用(无工作量参数)
*/
void idle();
/**
* 重置策略状态
*/
void reset();
}
常用的空闲策略
1. BusySpinIdleStrategy (忙等待策略)
特点:持续自旋,不让出 CPU
package org.agrona.concurrent;
/**
* 忙等待空闲策略
* 最低延迟,但CPU使用率100%
*/
public final class BusySpinIdleStrategy implements IdleStrategy
{
@Override
public void idle(final int workCount)
{
// 什么都不做,持续自旋
}
@Override
public void idle()
{
// 什么都不做,持续自旋
}
@Override
public void reset()
{
// 无状态,无需重置
}
}
使用场景:
- 超低延迟要求(微秒级)
- 独占CPU核心
- 实时交易系统
- 高频消息处理
示例:
import org.agrona.concurrent.*;
public class LowLatencyExample
{
public static void main(String[] args)
{
final Agent agent = new MyTradingAgent();
final IdleStrategy idleStrategy = new BusySpinIdleStrategy();
final AgentRunner runner = new AgentRunner(
idleStrategy,
Throwable::printStackTrace,
null,
agent
);
// 在专用线程上运行
final Thread thread = new Thread(runner);
thread.setName("TradingAgent");
thread.start();
}
}
2. BackoffIdleStrategy (退避策略)
特点:渐进式退避,平衡延迟和 CPU 使用
package org.agrona.concurrent;
/**
* 退避空闲策略
* 在忙等待、让出CPU和休眠之间渐进切换
*/
public final class BackoffIdleStrategy implements IdleStrategy
{
private final long maxSpins; // 最大自旋次数
private final long maxYields; // 最大让出次数
private final long minParkPeriodNs; // 最小休眠时间(纳秒)
private final long maxParkPeriodNs; // 最大休眠时间(纳秒)
private long spins = 0;
private long yields = 0;
private long parks = 0;
public BackoffIdleStrategy(
final long maxSpins,
final long maxYields,
final long minParkPeriodNs,
final long maxParkPeriodNs)
{
this.maxSpins = maxSpins;
this.maxYields = maxYields;
this.minParkPeriodNs = minParkPeriodNs;
this.maxParkPeriodNs = maxParkPeriodNs;
}
@Override
public void idle(final int workCount)
{
if (workCount > 0)
{
reset();
}
else
{
idle();
}
}
@Override
public void idle()
{
// 第一阶段:自旋
if (spins < maxSpins)
{
spins++;
return;
}
// 第二阶段:让出CPU
if (yields < maxYields)
{
yields++;
Thread.yield();
return;
}
// 第三阶段:休眠
parks++;
final long parkTime = Math.min(
minParkPeriodNs << parks,
maxParkPeriodNs
);
LockSupport.parkNanos(parkTime);
}
@Override
public void reset()
{
spins = 0;
yields = 0;
parks = 0;
}
}
退避过程:
stateDiagram-v2
[*] --> 忙自旋
忙自旋 --> 让出CPU: 自旋次数 >= maxSpins
让出CPU --> 短暂休眠: 让出次数 >= maxYields
短暂休眠 --> 长时间休眠: 持续空闲
长时间休眠 --> 长时间休眠: 持续空闲
忙自旋 --> [*]: 有工作
让出CPU --> [*]: 有工作
短暂休眠 --> [*]: 有工作
长时间休眠 --> [*]: 有工作
使用场景:
- 负载波动的系统
- 需要平衡延迟和CPU使用
- 多个Agent共享CPU
- 通用的高性能应用
示例:
// 创建默认的退避策略
final IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, // 最大自旋次数
10, // 最大让出次数
1, // 最小休眠时间 1ns
1000_000 // 最大休眠时间 1ms
);
3. SleepingIdleStrategy (休眠策略)
特点:立即休眠,最低 CPU 使用
package org.agrona.concurrent;
/**
* 休眠空闲策略
* 最低CPU使用,但延迟较高
*/
public final class SleepingIdleStrategy implements IdleStrategy
{
private final long sleepPeriodNs;
public SleepingIdleStrategy(final long sleepPeriodNs)
{
this.sleepPeriodNs = sleepPeriodNs;
}
@Override
public void idle(final int workCount)
{
if (workCount <= 0)
{
idle();
}
}
@Override
public void idle()
{
LockSupport.parkNanos(sleepPeriodNs);
}
@Override
public void reset()
{
// 无状态
}
}
使用场景:
- 后台批处理任务
- 非时间敏感的操作
- 节能场景
- 资源受限环境
示例:
// 每次空闲休眠1毫秒
final IdleStrategy idleStrategy = new SleepingIdleStrategy(1_000_000);
4. YieldingIdleStrategy (让出策略)
特点:调用 Thread.yield() 让出 CPU
package org.agrona.concurrent;
/**
* 让出空闲策略
* 中等延迟和CPU使用
*/
public final class YieldingIdleStrategy implements IdleStrategy
{
@Override
public void idle(final int workCount)
{
if (workCount <= 0)
{
Thread.yield();
}
}
@Override
public void idle()
{
Thread.yield();
}
@Override
public void reset()
{
// 无状态
}
}
使用场景:
- 需要快速响应但不需要极低延迟
- CPU核心数充足
- 多个线程竞争
5. NoOpIdleStrategy (无操作策略)
特点:什么都不做
package org.agrona.concurrent;
/**
* 无操作空闲策略
* 用于测试或特殊场景
*/
public final class NoOpIdleStrategy implements IdleStrategy
{
public static final NoOpIdleStrategy INSTANCE = new NoOpIdleStrategy();
@Override
public void idle(final int workCount)
{
// 什么都不做
}
@Override
public void idle()
{
// 什么都不做
}
@Override
public void reset()
{
// 无状态
}
}
空闲策略对比
graph TB
subgraph 延迟对比
A1[BusySpinIdleStrategy] --> |最低| L1[延迟: ~100ns]
A2[YieldingIdleStrategy] --> |低| L2[延迟: ~1us]
A3[BackoffIdleStrategy] --> |中| L3[延迟: 1us-10ms]
A4[SleepingIdleStrategy] --> |高| L4[延迟: 1ms+]
end
subgraph CPU使用对比
B1[BusySpinIdleStrategy] --> |100%| C1[CPU使用率]
B2[YieldingIdleStrategy] --> |高| C2[CPU使用率]
B3[BackoffIdleStrategy] --> |中| C3[CPU使用率]
B4[SleepingIdleStrategy] --> |低| C4[CPU使用率]
end
style A1 fill:#e74c3c,stroke:#a93529,color:#fff
style A2 fill:#f39c12,stroke:#b8730a,color:#fff
style A3 fill:#3498db,stroke:#2874a6,color:#fff
style A4 fill:#50c878,stroke:#2d7a4a,color:#fff
| 策略 | 延迟 | CPU使用 | 适用场景 | |------|------|---------|----------| | BusySpinIdleStrategy | 最低(~100ns) | 100% | 超低延迟交易系统 | | YieldingIdleStrategy | 低(~1μs) | 高(70-90%) | 快速响应系统 | | BackoffIdleStrategy | 中(1μs-10ms) | 中(10-70%) | 通用高性能应用 | | SleepingIdleStrategy | 高(1ms+) | 低(1-10%) | 后台批处理 |
AgentRunner (Agent 运行器)
AgentRunner 的作用
AgentRunner 负责在线程中运行 Agent,并应用 IdleStrategy。
sequenceDiagram
participant Thread as 工作线程
participant Runner as AgentRunner
participant Agent as Agent
participant Idle as IdleStrategy
Thread->>Runner: start()
Runner->>Agent: onStart()
loop 运行循环
Runner->>Agent: doWork()
Agent-->>Runner: 返回 workCount
alt workCount > 0
Runner->>Idle: reset()
else workCount == 0
Runner->>Idle: idle()
end
Runner->>Runner: 检查运行状态
end
Runner->>Agent: onClose()
Runner-->>Thread: 线程退出
AgentRunner 实现示例
import org.agrona.concurrent.*;
/**
* AgentRunner 使用示例
*/
public class AgentRunnerExample
{
public static void main(String[] args) throws InterruptedException
{
// 创建 Agent
final Agent agent = new MyAgent();
// 创建空闲策略
final IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, 10, 1, 1_000_000
);
// 创建错误处理器
final ErrorHandler errorHandler = throwable ->
{
System.err.println("Agent 发生错误: " + throwable.getMessage());
throwable.printStackTrace();
};
// 创建 AgentRunner
final AgentRunner runner = new AgentRunner(
idleStrategy, // 空闲策略
errorHandler, // 错误处理器
null, // 异常计数器(可选)
agent // Agent实例
);
// 在新线程中启动
final Thread agentThread = AgentRunner.startOnThread(runner);
// 运行一段时间
Thread.sleep(5000);
// 关闭
runner.close();
agentThread.join();
System.out.println("Agent 已关闭");
}
static class MyAgent implements Agent
{
private int workCycle = 0;
@Override
public int doWork()
{
// 模拟工作
if (workCycle % 100 == 0)
{
System.out.println("完成工作周期: " + workCycle);
workCycle = 0;
return 1; // 有工作完成
}
workCycle++;
return 0; // 没有工作
}
@Override
public String roleName()
{
return "MyAgent";
}
@Override
public void onStart()
{
System.out.println("Agent 启动");
}
@Override
public void onClose()
{
System.out.println("Agent 关闭");
}
}
}
高级技巧
1. 动态调整空闲策略
/**
* 动态空闲策略适配器
*/
public class AdaptiveIdleStrategy implements IdleStrategy
{
private IdleStrategy current;
private final IdleStrategy busySpin = new BusySpinIdleStrategy();
private final IdleStrategy backoff = new BackoffIdleStrategy(
100, 10, 1, 1_000_000
);
private long consecutiveIdleCycles = 0;
private static final long THRESHOLD = 1000;
@Override
public void idle(final int workCount)
{
if (workCount > 0)
{
consecutiveIdleCycles = 0;
current = busySpin;
}
else
{
consecutiveIdleCycles++;
if (consecutiveIdleCycles > THRESHOLD)
{
current = backoff;
}
current.idle();
}
}
@Override
public void idle()
{
current.idle();
}
@Override
public void reset()
{
consecutiveIdleCycles = 0;
if (current != null)
{
current.reset();
}
}
}
2. 监控 Agent 性能
/**
* 带监控的 Agent 包装器
*/
public class MonitoredAgent implements Agent
{
private final Agent delegate;
private long totalCycles = 0;
private long busyCycles = 0;
private long totalWork = 0;
private long startTime;
public MonitoredAgent(final Agent delegate)
{
this.delegate = delegate;
}
@Override
public int doWork() throws Exception
{
totalCycles++;
final int work = delegate.doWork();
if (work > 0)
{
busyCycles++;
totalWork += work;
}
return work;
}
@Override
public String roleName()
{
return delegate.roleName() + "-Monitored";
}
@Override
public void onStart()
{
startTime = System.nanoTime();
delegate.onStart();
}
@Override
public void onClose()
{
delegate.onClose();
printStatistics();
}
private void printStatistics()
{
final long runTime = System.nanoTime() - startTime;
final double utilization = totalCycles > 0 ?
(double)busyCycles / totalCycles * 100 : 0;
System.out.println("=== Agent 统计信息 ===");
System.out.println("运行时间: " + runTime / 1_000_000 + " ms");
System.out.println("总周期数: " + totalCycles);
System.out.println("忙碌周期: " + busyCycles);
System.out.println("总工作量: " + totalWork);
System.out.println("利用率: " + String.format("%.2f", utilization) + "%");
System.out.println("平均每周期工作量: " +
String.format("%.2f", (double)totalWork / totalCycles));
}
}
最佳实践
1. 选择合适的空闲策略
// 低延迟场景
IdleStrategy lowLatency = new BusySpinIdleStrategy();
// 通用场景
IdleStrategy general = new BackoffIdleStrategy(100, 10, 1, 1_000_000);
// 低CPU使用场景
IdleStrategy lowCpu = new SleepingIdleStrategy(1_000_000);
2. Agent 设计原则
- 单一职责:每个 Agent 专注一个任务
- 非阻塞:doWork() 方法不应阻塞
- 批量处理:适当批量处理以提高吞吐量
- 准确返回:返回值应准确反映实际工作量
3. 线程配置
// 为Agent配置专用线程
final Thread thread = AgentRunner.startOnThread(runner);
thread.setName("MyAgent-Thread");
thread.setPriority(Thread.MAX_PRIORITY); // 高优先级
// 设置CPU亲和性(需要JNI支持)
// setThreadAffinity(thread, cpuId);
调度 Agent (Scheduling Agents)
要启动 Agent 的 Duty Cycle,你需要决定如何调度它。
调度注意事项
重要提示: 并非所有空闲策略都是线程安全的。通常建议每个被调度的 Agent 使用独立的空闲策略实例。
Agent 调度选项
┌──────────────────────────────────────────────────────┐
│ Agent 调度方式选择 │
└────────────────────┬─────────────────────────────────┘
│
┌────────────┴────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────┐
│ 单Agent调度 │ │ 多Agent组合调度 │
└────────┬─────────┘ └──────────┬───────────┘
│ │
┌────┴────┐ ┌────┴────┐
│ │ │ │
▼ ▼ ▼ ▼
┌───────┐ ┌────────────┐ ┌─────────────────────┐
│ Agrona│ │ 自定义线程 │ │ CompositeAgent │
│ 线程 │ │ ThreadFactory│ │ (多Agent作为单元) │
└───────┘ └────────────┘ └─────────────────────┘
调度方式详解
1. 使用 Agrona 提供的线程
最简单的方式是让 Agrona 为你的 Agent 创建并管理线程:
final Agent agent = new MyAgent();
final IdleStrategy idleStrategy = new BackoffIdleStrategy(100, 10, 1, 1_000_000);
final AgentRunner runner = new AgentRunner(
idleStrategy,
throwable -> System.err.println("Error: " + throwable),
null, // 错误计数器(可选)
agent
);
// 在 Agrona 提供的新线程上启动
final Thread thread = AgentRunner.startOnThread(runner);
thread.setName("MyAgent-Thread");
优点:
- 简单直接
- Agrona 管理线程生命周期
- 自动处理启动和关闭
2. 使用自定义 ThreadFactory
如果需要更多控制,可以提供自定义的 ThreadFactory:
final ThreadFactory customFactory = runnable -> {
final Thread thread = new Thread(runnable);
thread.setName("CustomAgent");
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(false);
return thread;
};
final Thread thread = AgentRunner.startOnThread(runner, customFactory);
用途:
- 设置特定的线程属性(名称、优先级、守护状态等)
- 集成到现有的线程管理框架
- 实现自定义的线程监控
3. 使用 CompositeAgent 组合多个 Agent
可以将多个 Agent 组合成一个单元,在同一个线程上调度:
// 创建多个Agent
final Agent receiverAgent = new ReceiverAgent();
final Agent processorAgent = new ProcessorAgent();
final Agent senderAgent = new SenderAgent();
// 组合成CompositeAgent
final Agent compositeAgent = new CompositeAgent(
receiverAgent,
processorAgent,
senderAgent
);
// 作为单个单元调度
final AgentRunner runner = new AgentRunner(
new BackoffIdleStrategy(100, 10, 1, 1_000_000),
Throwable::printStackTrace,
null,
compositeAgent
);
AgentRunner.startOnThread(runner);
ASCII 流程图:
CompositeAgent 执行流程
┌─────────────────────────────────────────┐
│ AgentRunner 启动 │
└───────────────┬─────────────────────────┘
│
▼
┌───────────────────┐
│ CompositeAgent │
│ doWork() │
└────────┬──────────┘
│
┌───────────┴───────────┐
│ 依次调用各Agent的doWork │
└───────────┬───────────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Agent 1 │ │Agent 2 │ │Agent 3 │
│doWork() │ │doWork() │ │doWork() │
│return 5 │ │return 0 │ │return 3 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────┼────────────┘
│
▼
┌─────────────────┐
│ 合计工作量 │
│ totalWork = 8 │
└────────┬────────┘
│
▼
┌─────────┐
│workCount│
│ > 0? │
└─┬─────┬─┘
│是 │否
│ │
▼ ▼
┌─────────┐ ┌──────────────┐
│立即返回 │ │ 应用空闲策略 │
│下一周期 │ │idleStrategy │
└─────────┘ └──────────────┘
优势:
- 资源高效: 多个相关Agent共享一个线程
- 减少上下文切换: 避免线程间切换开销
- 简化管理: 作为单个单元启动和停止
适用场景:
- Agent之间有依赖关系
- Agent处理的是相关的任务
- 需要在单个线程上保证执行顺序
完整调度示例
import org.agrona.concurrent.*;
public class AgentSchedulingExample
{
public static void main(String[] args) throws InterruptedException
{
// 示例1: 单Agent专用线程
scheduleS单Agent();
// 示例2: 组合Agent共享线程
scheduleCompositeAgent();
}
private static void scheduleSingleAgent()
{
final Agent agent = new MyBusinessAgent();
final IdleStrategy idleStrategy = new BusySpinIdleStrategy();
final AgentRunner runner = new AgentRunner(
idleStrategy,
throwable -> System.err.println("Error in agent: " + throwable),
null,
agent
);
// 使用Agrona线程
final Thread thread = AgentRunner.startOnThread(runner);
thread.setName("BusinessAgent-Thread");
}
private static void scheduleCompositeAgent()
{
// 创建多个Agent
final Agent[] agents = {
new ReceiverAgent(),
new ProcessorAgent(),
new SenderAgent()
};
// 创建CompositeAgent
final Agent composite = new CompositeAgent(agents);
// 调度
final AgentRunner runner = new AgentRunner(
new BackoffIdleStrategy(100, 10, 1, 1_000_000),
Throwable::printStackTrace,
null,
composite
);
AgentRunner.startOnThread(runner).setName("Composite-Thread");
}
}
总结
Agent 和 IdleStrategy 是 Agrona 高性能架构的核心:
- Agent 定义了工作单元的执行方式
- IdleStrategy 优化了空闲时的资源使用
- AgentRunner 将两者结合,提供完整的运行框架
- 根据具体场景选择合适的空闲策略至关重要
正确使用这些组件,可以构建出既高性能又资源高效的系统。