Agrona 概述
简介
Agrona 提供了许多有用的高性能通用数据结构和常见支持对象,这些组件被广泛应用于 Simple Binary Encoding (SBE)、Aeron 以及相关项目中。
核心价值
Agrona 是一个专注于高性能、低延迟的 Java 工具库,它为构建高效的并发系统提供了基础组件。这个库的设计理念是避免不必要的内存分配和垃圾回收开销,从而实现极致的性能。
Agrona 在技术栈中的位置
┌─────────────────────┐
│ Aeron Cluster │
│ (分布式共识系统) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Aeron Archive │
│ (消息持久化) │
└──────────┬──────────┘
│
┌──────────────────────┼──────────────────────┐
│ │ │
┌────────▼────────┐ ┌───────▼────────┐ ┌────────▼────────┐
│ Aeron │ │ Simple Binary │ │ 自定义应用 │
│ Transport │ │ Encoding │ │ (金融/游戏等) │
│ (消息传输) │ │ (序列化) │ │ │
└────────┬────────┘ └───────┬────────┘ └────────┬────────┘
│ │ │
└──────────────────────┼──────────────────────┘
│
┌──────────▼──────────┐
│ Agrona │
│ (基础工具库) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Java 虚拟机 (JVM) │
└─────────────────────┘
架构层次解析:
- 基础层 - Agrona: 提供高性能数据结构、并发原语和内存管理
- 中间层 - 专用框架: SBE处理序列化,Aeron处理网络传输
- 上层 - 高级功能: Archive提供持久化,Cluster提供分布式一致性
- 应用层: 基于以上组件构建业务应用
核心架构组件
┌─────────────────────────── Agrona 核心库 ───────────────────────────┐
│ │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ 数据结构层 │ │ 并发工具层 │ │ 缓冲区管理层 │ │
│ ├────────────────┤ ├────────────────┤ ├────────────────┤ │
│ │• 优化的HashMap │ │• 无锁队列 │ │• DirectBuffer │ │
│ │• 原始类型集合 │ │• ManyToOne │ │• UnsafeBuffer │ │
│ │• 高效数组队列 │ │• OneToOne │ │• 堆内/堆外内存 │ │
│ │• 零GC设计 │ │• Broadcast机制 │ │• 零拷贝操作 │ │
│ └────────────────┘ └────────────────┘ └────────────────┘ │
│ │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ Agent 框架层 │ │ 时钟系统层 │ │ ID生成器层 │ │
│ ├────────────────┤ ├────────────────┤ ├────────────────┤ │
│ │• Agent 接口 │ │• SystemClock │ │• SnowflakeID │ │
│ │• DutyCycle │ │• CachedClock │ │• UUIDGen │ │
│ │• IdleStrategy │ │• 纳秒精度时间 │ │• 高性能生成 │ │
│ │• 任务调度 │ │• 低开销时间源 │ │• 无锁算法 │ │
│ └────────────────┘ └────────────────┘ └────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
组件详细说明
1. 数据结构层
提供针对性能优化的专用数据结构:
- Int2ObjectHashMap: 整数到对象的映射,避免装箱开销
- Object2ObjectHashMap: 自定义哈希实现,减少碰撞
- ArrayQueue: 基于数组的高性能队列
2. 并发工具层
无锁并发组件,支持高吞吐量线程通信:
- ManyToOneConcurrentArrayQueue: 多生产者单消费者队列
- OneToOneConcurrentArrayQueue: 单生产者单消费者队列
- BroadcastTransmitter/Receiver: 一对多广播通信
3. 缓冲区管理层
直接内存访问能力:
- DirectBuffer: 抽象的内存缓冲区接口
- UnsafeBuffer: 使用sun.misc.Unsafe的高性能实现
- MutableDirectBuffer: 可变的直接缓冲区
4. Agent 框架层
任务调度和执行模型:
- Agent: 可执行任务的抽象
- DutyCycle: 任务执行周期管理
- IdleStrategy: 空闲时的CPU策略
Agent 执行模型流程
启动Agent Runner
│
▼
┌─────────────────────────────────────────────────────┐
│ Agent Runner 主循环 │
│ │
│ while (running) { │
│ ┌─────────────────────────────────────┐ │
│ │ 1. 执行 agent.doWork() │ │
│ │ ├─ 返回工作量计数 │ │
│ │ └─ 0表示空闲, >0表示有工作 │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ 2. 检查工作量 │ │
│ └──────────────┬──────────────────────┘ │
│ │ │
│ ┌───────────┴───────────┐ │
│ │ │ │
│ 工作量 > 0 工作量 = 0 │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────────┐ │
│ │ 继续执行 │ │ 执行空闲策略 │ │
│ │ (无等待) │ │ idleStrategy() │ │
│ └──────┬──────┘ └────────┬─────────┘ │
│ │ │ │
│ │ ┌──────────┴────────┐ │
│ │ │ │ │
│ │ BusySpin Backoff/Park │
│ │ (持续轮询) (让出CPU) │
│ │ │ │ │
│ └───────────┴───────────────────┘ │
│ │ │
│ ▼ │
│ 检查是否需要继续运行 │
│ │
└─────────────────────────────────────────────────────┘
流程解析:
- 工作执行阶段: Agent的doWork()方法被调用,返回执行的工作单元数
- 工作量评估: 根据返回值决定下一步行动
- 空闲策略选择:
- BusySpinIdleStrategy: 持续轮询,最低延迟但占用CPU
- BackoffIdleStrategy: 渐进式退避,平衡延迟和CPU使用
- ParkIdleStrategy: 让出CPU,最低CPU使用但延迟较高
典型使用场景流程
场景一: 高频交易系统中的订单处理
交易请求到达
│
▼
┌──────────────────────────────────────────┐
│ OneToOneConcurrentArrayQueue │
│ (接收端线程写入订单) │
└────────────┬─────────────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Order Processing Agent │
│ │
│ doWork() { │
│ ┌──────────────────────┐ │
│ │ 1. 从队列读取订单 │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 2. 使用UnsafeBuffer │ │
│ │ 解析订单数据 │ │
│ │ (零拷贝) │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 3. 查询Int2Object │ │
│ │ HashMap获取账户 │ │
│ │ (无装箱开销) │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 4. 执行业务逻辑 │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 5. 写入响应到 │ │
│ │ 输出队列 │ │
│ └──────────────────────┘ │
│ │
│ return processedOrders; // >0 │
│ } │
└──────────────────────────────────────────┘
│
▼
使用BusySpinIdleStrategy
(保持最低延迟)
关键优化点:
- 无锁队列: 避免线程竞争
- 零拷贝: DirectBuffer直接访问内存
- 无GC: 预分配对象池
- 原始类型集合: 避免装箱/拆箱
场景二: 消息广播系统
单一消息源
│
▼
┌──────────────────────────┐
│ BroadcastTransmitter │
│ (使用RingBuffer) │
└─────┬────────────────────┘
│
│ 环形缓冲区结构:
│ ┌───┬───┬───┬───┬───┬───┬───┬───┐
│ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │
│ └───┴───┴───┴───┴───┴───┴───┴───┘
│ ▲ ▲
│ │ │
│ 读指针 写指针
│
├─────────┬─────────┬─────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│Receiver1│ │Receiver2│ │Receiver3│ │ReceiverN│
│(消费者) │ │(消费者) │ │(消费者) │ │(消费者) │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
广播机制特点:
- 单写多读,无锁设计
- 每个接收者独立追踪读取位置
- 自动处理慢消费者(可选丢弃旧消息)
性能特性与原则
1. 避免对象分配
❌ 错误方式 (频繁GC):
for (int i = 0; i < 1000000; i++) {
Message msg = new Message(); // 每次循环创建新对象
msg.setValue(i);
queue.offer(msg);
}
✅ Agrona方式 (零GC):
MutableDirectBuffer buffer = new UnsafeBuffer(
ByteBuffer.allocateDirect(1024)
);
for (int i = 0; i < 1000000; i++) {
buffer.putInt(0, i); // 重用缓冲区
queue.offer(buffer, 0, 4);
}
2. 缓存友好设计
CPU缓存行对齐 (64字节):
┌─────────────────────────────────────┐
│ Padding (56 bytes) │ 避免伪共享
├─────────────────────────────────────┤
│ 关键数据 (8 bytes) │ 独占缓存行
├─────────────────────────────────────┤
│ Padding (56 bytes) │ 避免伪共享
└─────────────────────────────────────┘
3. 无锁算法
使用CAS (Compare-And-Swap) 代替锁:
┌─────────────────────────────────────┐
│ 传统加锁方式: │
│ lock.acquire() ← 线程阻塞 │
│ // 临界区操作 │
│ lock.release() │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ CAS无锁方式: │
│ do { │
│ old = atomicValue.get() │
│ new = computeNew(old) │
│ } while (!atomicValue.CAS(old,new))│
│ ← 失败重试,无阻塞 │
└─────────────────────────────────────┘
主要API示例
使用DirectBuffer进行零拷贝操作
// 分配堆外内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
UnsafeBuffer buffer = new UnsafeBuffer(byteBuffer);
// 写入数据 (无需创建中间对象)
buffer.putLong(0, 123456789L); // 偏移0: long值
buffer.putInt(8, 42); // 偏移8: int值
buffer.putStringAscii(12, "Hello"); // 偏移12: ASCII字符串
// 读取数据 (直接从内存读取)
long value1 = buffer.getLong(0);
int value2 = buffer.getInt(8);
String text = buffer.getStringAscii(12);
使用Agent和IdleStrategy
// 定义Agent
public class MyAgent implements Agent {
@Override
public int doWork() {
// 执行工作并返回工作量
return processMessages();
}
@Override
public String roleName() {
return "message-processor";
}
}
// 启动Agent Runner
IdleStrategy idleStrategy = new BackoffIdleStrategy(
100, 1000, 1_000_000, 100_000_000 // 退避参数
);
AgentRunner runner = new AgentRunner(
idleStrategy,
Throwable::printStackTrace,
null,
new MyAgent()
);
// 在独立线程中运行
AgentRunner.startOnThread(runner);
应用场景
1. 金融交易系统
高频交易需求:
- 微秒级延迟 → BusySpinIdleStrategy
- 无GC停顿 → DirectBuffer + 对象池
- 高吞吐量 → 无锁并发队列
2. 实时数据处理
流式数据处理:
- 持续数据流 → Agent模型
- 背压处理 → 有界队列 + 流控
- 低CPU占用 → 自适应IdleStrategy
3. 游戏服务器
游戏后端需求:
- 确定性延迟 → 避免GC
- 高并发玩家 → 无锁数据结构
- 事件驱动 → Agent + DutyCycle
快速开始
Maven依赖
<dependency>
<groupId>org.agrona</groupId>
<artifactId>agrona</artifactId>
<version>1.21.1</version>
</dependency>
最小示例
import org.agrona.concurrent.*;
import org.agrona.*;
public class QuickStart {
public static void main(String[] args) {
// 创建无锁队列
OneToOneConcurrentArrayQueue<String> queue =
new OneToOneConcurrentArrayQueue<>(1024);
// 生产者
queue.offer("Hello Agrona");
// 消费者
String msg = queue.poll();
System.out.println(msg); // 输出: Hello Agrona
}
}
下一步学习路径
推荐学习顺序:
1. Duty Cycles → 理解任务调度周期
2. Agents & Idle Strategies → 掌握Agent模型
3. Direct Buffer → 学习零拷贝内存操作
4. Concurrent Collections → 使用无锁数据结构
5. Data Structures → 探索专用集合类型
总结
Agrona是构建超低延迟、高吞吐量Java应用的基石。通过以下核心设计实现极致性能:
- ✅ 零垃圾回收: 避免GC停顿
- ✅ 无锁并发: 消除线程竞争
- ✅ 缓存友好: 优化CPU缓存利用
- ✅ 直接内存: 零拷贝数据访问
- ✅ 批处理优化: 摊销固定开销
无论是金融交易、实时通信还是游戏开发,Agrona都能提供坚实的性能保障。