Direct Buffer (直接缓冲区)
概述
DirectBuffer 是 Agrona 中用于高效内存访问的核心组件。它提供了对堆内(on-heap)和堆外(off-heap)内存的统一访问接口,支持类型安全的读写操作,同时避免了不必要的对象分配。
DirectBuffer 接口
package org.agrona;
/**
* DirectBuffer 接口 - 提供对底层字节数组或ByteBuffer的抽象
*/
public interface DirectBuffer
{
// 获取容量
int capacity();
// 检查边界
void checkLimit(int limit);
// 获取字节
byte getByte(int index);
// 获取短整型
short getShort(int index);
short getShort(int index, ByteOrder byteOrder);
// 获取整型
int getInt(int index);
int getInt(int index, ByteOrder byteOrder);
// 获取长整型
long getLong(int index);
long getLong(int index, ByteOrder byteOrder);
// 获取字符串
String getStringAscii(int index);
int getStringAscii(int index, Appendable appendable);
// 获取字节数组
void getBytes(int index, byte[] dst);
void getBytes(int index, byte[] dst, int offset, int length);
void getBytes(int index, ByteBuffer dstBuffer, int length);
// ... 更多方法
}
MutableDirectBuffer 接口
package org.agrona;
/**
* MutableDirectBuffer - 可变的DirectBuffer
*/
public interface MutableDirectBuffer extends DirectBuffer
{
// 设置字节
void putByte(int index, byte value);
// 设置短整型
void putShort(int index, short value);
void putShort(int index, short value, ByteOrder byteOrder);
// 设置整型
void putInt(int index, int value);
void putInt(int index, int value, ByteOrder byteOrder);
// 设置长整型
void putLong(int index, long value);
void putLong(int index, long value, ByteOrder byteOrder);
// 设置字符串
int putStringAscii(int index, String value);
int putStringAscii(int index, CharSequence value);
// 设置字节数组
void putBytes(int index, byte[] src);
void putBytes(int index, byte[] src, int offset, int length);
void putBytes(int index, ByteBuffer srcBuffer, int length);
// ... 更多方法
}
核心实现类
1. UnsafeBuffer
最常用的实现,使用sun.misc.Unsafe进行直接内存访问:
import org.agrona.concurrent.UnsafeBuffer;
import java.nio.ByteBuffer;
/**
* UnsafeBuffer 使用示例
*/
public class UnsafeBufferExample
{
public static void main(String[] args)
{
// 方式1: 包装字节数组
final byte[] byteArray = new byte[1024];
final UnsafeBuffer buffer1 = new UnsafeBuffer(byteArray);
// 方式2: 包装ByteBuffer
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
final UnsafeBuffer buffer2 = new UnsafeBuffer(byteBuffer);
// 方式3: 直接分配
final UnsafeBuffer buffer3 = new UnsafeBuffer(
BufferUtil.allocateDirectAligned(1024, 64)
);
// 写入数据
buffer1.putInt(0, 12345);
buffer1.putLong(4, 67890L);
buffer1.putStringAscii(12, "Hello World");
// 读取数据
final int intValue = buffer1.getInt(0);
final long longValue = buffer1.getLong(4);
final String stringValue = buffer1.getStringAscii(12);
System.out.println("Int: " + intValue);
System.out.println("Long: " + longValue);
System.out.println("String: " + stringValue);
}
}
2. ExpandableArrayBuffer
可自动扩展的缓冲区:
import org.agrona.ExpandableArrayBuffer;
public class ExpandableBufferExample
{
public static void main(String[] args)
{
// 初始容量为256字节
final ExpandableArrayBuffer buffer =
new ExpandableArrayBuffer(256);
// 写入大量数据 - 自动扩展
for (int i = 0; i < 1000; i++)
{
buffer.putInt(i * 4, i);
}
System.out.println("Capacity: " + buffer.capacity());
// 读取数据
for (int i = 0; i < 10; i++)
{
System.out.println("Value[" + i + "]: " +
buffer.getInt(i * 4));
}
}
}
使用场景
1. 零拷贝消息编解码
import org.agrona.concurrent.UnsafeBuffer;
/**
* 消息编解码器 - 零拷贝
*/
public class MessageCodec
{
// 消息头部长度
private static final int HEADER_LENGTH = 8;
/**
* 编码消息
*/
public static int encode(
final MutableDirectBuffer buffer,
final int offset,
final String payload)
{
int position = offset;
// 写入消息类型
buffer.putInt(position, 1);
position += 4;
// 写入消息长度
final int payloadLength = payload.length();
buffer.putInt(position, payloadLength);
position += 4;
// 写入载荷
position += buffer.putStringAscii(position, payload);
return position - offset;
}
/**
* 解码消息
*/
public static String decode(
final DirectBuffer buffer,
final int offset)
{
int position = offset;
// 读取消息类型
final int messageType = buffer.getInt(position);
position += 4;
// 读取消息长度
final int payloadLength = buffer.getInt(position);
position += 4;
// 读取载荷
return buffer.getStringAscii(position, payloadLength);
}
/**
* 获取消息长度
*/
public static int messageLength(
final DirectBuffer buffer,
final int offset)
{
final int payloadLength = buffer.getInt(offset + 4);
return HEADER_LENGTH + payloadLength;
}
}
2. 高性能数据序列化
/**
* 订单数据结构
*/
public class Order
{
// 字段偏移量
private static final int ORDER_ID_OFFSET = 0;
private static final int SYMBOL_OFFSET = 8;
private static final int SYMBOL_LENGTH = 16;
private static final int PRICE_OFFSET = 24;
private static final int QUANTITY_OFFSET = 32;
private static final int SIZE = 40;
/**
* 将订单写入缓冲区
*/
public static void encode(
final MutableDirectBuffer buffer,
final int offset,
final long orderId,
final String symbol,
final double price,
final int quantity)
{
buffer.putLong(offset + ORDER_ID_OFFSET, orderId);
// 写入固定长度的symbol (填充或截断)
buffer.setMemory(
offset + SYMBOL_OFFSET,
SYMBOL_LENGTH,
(byte)0
);
buffer.putStringAscii(offset + SYMBOL_OFFSET, symbol);
buffer.putDouble(offset + PRICE_OFFSET, price);
buffer.putInt(offset + QUANTITY_OFFSET, quantity);
}
/**
* 从缓冲区读取订单
*/
public static void decode(
final DirectBuffer buffer,
final int offset,
final OrderData orderData)
{
orderData.orderId = buffer.getLong(offset + ORDER_ID_OFFSET);
orderData.symbol = buffer.getStringAscii(
offset + SYMBOL_OFFSET,
SYMBOL_LENGTH
).trim();
orderData.price = buffer.getDouble(offset + PRICE_OFFSET);
orderData.quantity = buffer.getInt(offset + QUANTITY_OFFSET);
}
public static int size()
{
return SIZE;
}
/**
* 订单数据持有类
*/
public static class OrderData
{
public long orderId;
public String symbol;
public double price;
public int quantity;
@Override
public String toString()
{
return String.format(
"Order{id=%d, symbol='%s', price=%.2f, qty=%d}",
orderId, symbol, price, quantity
);
}
}
}
3. 环形缓冲区
import org.agrona.concurrent.UnsafeBuffer;
/**
* 简单的环形缓冲区实现
*/
public class SimpleRingBuffer
{
private final MutableDirectBuffer buffer;
private final int capacity;
private long head = 0;
private long tail = 0;
public SimpleRingBuffer(final int capacity)
{
this.capacity = capacity;
this.buffer = new UnsafeBuffer(
ByteBuffer.allocateDirect(capacity)
);
}
/**
* 写入数据
*/
public boolean write(final byte[] data)
{
final int length = data.length;
// 检查空间
if (tail - head + length > capacity)
{
return false;
}
// 写入长度
final int position = (int)(tail % capacity);
buffer.putInt(position, length);
// 写入数据
final int dataPosition = (position + 4) % capacity;
if (dataPosition + length <= capacity)
{
// 一次性写入
buffer.putBytes(dataPosition, data);
}
else
{
// 分两次写入(跨越边界)
final int firstPart = capacity - dataPosition;
buffer.putBytes(dataPosition, data, 0, firstPart);
buffer.putBytes(0, data, firstPart, length - firstPart);
}
tail += 4 + length;
return true;
}
/**
* 读取数据
*/
public byte[] read()
{
if (head >= tail)
{
return null;
}
// 读取长度
final int position = (int)(head % capacity);
final int length = buffer.getInt(position);
// 读取数据
final byte[] data = new byte[length];
final int dataPosition = (position + 4) % capacity;
if (dataPosition + length <= capacity)
{
buffer.getBytes(dataPosition, data);
}
else
{
final int firstPart = capacity - dataPosition;
buffer.getBytes(dataPosition, data, 0, firstPart);
buffer.getBytes(0, data, firstPart, length - firstPart);
}
head += 4 + length;
return data;
}
}
性能优化技巧
1. 内存对齐
// 使用64字节对齐(缓存行大小)
final ByteBuffer alignedBuffer =
BufferUtil.allocateDirectAligned(1024, 64);
2. 批量操作
// 批量复制比逐个元素快
buffer.putBytes(offset, sourceArray, 0, length);
// 而不是:
for (int i = 0; i < length; i++)
{
buffer.putByte(offset + i, sourceArray[i]);
}
3. 避免边界检查
// Agrona可以禁用边界检查以提高性能
// JVM参数: -Dagrona.disable.bounds.checks=true
// 但在开发阶段建议保持开启
常见陷阱
1. 字节序问题
// 明确指定字节序
buffer.putInt(0, value, ByteOrder.LITTLE_ENDIAN);
// 或使用默认的本机字节序
buffer.putInt(0, value); // 使用ByteOrder.nativeOrder()
2. 内存泄漏
// DirectByteBuffer需要手动释放
public void cleanup()
{
// 使用Agrona的IoUtil
IoUtil.unmap(byteBuffer);
}
3. 并发访问
// DirectBuffer不是线程安全的
// 需要外部同步或使用Agrona的并发数据结构
最佳实践
- 选择合适的实现: UnsafeBuffer用于固定大小,ExpandableArrayBuffer用于动态大小
- 内存对齐: 对于高性能场景,使用缓存行对齐
- 零拷贝: 充分利用DirectBuffer实现零拷贝
- 类型安全: 使用类型化的get/put方法而不是直接操作字节
- 资源管理: 及时释放堆外内存
总结
DirectBuffer是Agrona实现高性能的关键组件:
- 提供统一的内存访问接口
- 支持零拷贝操作
- 避免不必要的对象分配
- 类型安全的读写方法
正确使用DirectBuffer可以显著提升系统性能。