Direct Buffer (直接缓冲区)

Direct Buffer (直接缓冲区)


概述

DirectBuffer 是 Agrona 中用于高效内存访问的核心组件。它提供了对堆内(on-heap)和堆外(off-heap)内存的统一访问接口,支持类型安全的读写操作,同时避免了不必要的对象分配。

DirectBuffer 接口


                package org.agrona;

                /**
                 * DirectBuffer 接口 - 提供对底层字节数组或ByteBuffer的抽象
                 */
                public interface DirectBuffer
                {
                    // 获取容量
                    int capacity();

                    // 检查边界
                    void checkLimit(int limit);

                    // 获取字节
                    byte getByte(int index);

                    // 获取短整型
                    short getShort(int index);
                    short getShort(int index, ByteOrder byteOrder);

                    // 获取整型
                    int getInt(int index);
                    int getInt(int index, ByteOrder byteOrder);

                    // 获取长整型
                    long getLong(int index);
                    long getLong(int index, ByteOrder byteOrder);

                    // 获取字符串
                    String getStringAscii(int index);
                    int getStringAscii(int index, Appendable appendable);

                    // 获取字节数组
                    void getBytes(int index, byte[] dst);
                    void getBytes(int index, byte[] dst, int offset, int length);
                    void getBytes(int index, ByteBuffer dstBuffer, int length);

                    // ... 更多方法
                }
                

MutableDirectBuffer 接口


                package org.agrona;

                /**
                 * MutableDirectBuffer - 可变的DirectBuffer
                 */
                public interface MutableDirectBuffer extends DirectBuffer
                {
                    // 设置字节
                    void putByte(int index, byte value);

                    // 设置短整型
                    void putShort(int index, short value);
                    void putShort(int index, short value, ByteOrder byteOrder);

                    // 设置整型
                    void putInt(int index, int value);
                    void putInt(int index, int value, ByteOrder byteOrder);

                    // 设置长整型
                    void putLong(int index, long value);
                    void putLong(int index, long value, ByteOrder byteOrder);

                    // 设置字符串
                    int putStringAscii(int index, String value);
                    int putStringAscii(int index, CharSequence value);

                    // 设置字节数组
                    void putBytes(int index, byte[] src);
                    void putBytes(int index, byte[] src, int offset, int length);
                    void putBytes(int index, ByteBuffer srcBuffer, int length);

                    // ... 更多方法
                }
                

核心实现类

1. UnsafeBuffer

最常用的实现,使用sun.misc.Unsafe进行直接内存访问:


                import org.agrona.concurrent.UnsafeBuffer;
                import java.nio.ByteBuffer;

                /**
                 * UnsafeBuffer 使用示例
                 */
                public class UnsafeBufferExample
                {
                    public static void main(String[] args)
                    {
                        // 方式1: 包装字节数组
                        final byte[] byteArray = new byte[1024];
                        final UnsafeBuffer buffer1 = new UnsafeBuffer(byteArray);

                        // 方式2: 包装ByteBuffer
                        final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                        final UnsafeBuffer buffer2 = new UnsafeBuffer(byteBuffer);

                        // 方式3: 直接分配
                        final UnsafeBuffer buffer3 = new UnsafeBuffer(
                            BufferUtil.allocateDirectAligned(1024, 64)
                        );

                        // 写入数据
                        buffer1.putInt(0, 12345);
                        buffer1.putLong(4, 67890L);
                        buffer1.putStringAscii(12, "Hello World");

                        // 读取数据
                        final int intValue = buffer1.getInt(0);
                        final long longValue = buffer1.getLong(4);
                        final String stringValue = buffer1.getStringAscii(12);

                        System.out.println("Int: " + intValue);
                        System.out.println("Long: " + longValue);
                        System.out.println("String: " + stringValue);
                    }
                }
                

2. ExpandableArrayBuffer

可自动扩展的缓冲区:


                import org.agrona.ExpandableArrayBuffer;

                public class ExpandableBufferExample
                {
                    public static void main(String[] args)
                    {
                        // 初始容量为256字节
                        final ExpandableArrayBuffer buffer =
                            new ExpandableArrayBuffer(256);

                        // 写入大量数据 - 自动扩展
                        for (int i = 0; i < 1000; i++)
                        {
                            buffer.putInt(i * 4, i);
                        }

                        System.out.println("Capacity: " + buffer.capacity());

                        // 读取数据
                        for (int i = 0; i < 10; i++)
                        {
                            System.out.println("Value[" + i + "]: " +
                                buffer.getInt(i * 4));
                        }
                    }
                }
                

使用场景

1. 零拷贝消息编解码


                import org.agrona.concurrent.UnsafeBuffer;

                /**
                 * 消息编解码器 - 零拷贝
                 */
                public class MessageCodec
                {
                    // 消息头部长度
                    private static final int HEADER_LENGTH = 8;

                    /**
                     * 编码消息
                     */
                    public static int encode(
                        final MutableDirectBuffer buffer,
                        final int offset,
                        final String payload)
                    {
                        int position = offset;

                        // 写入消息类型
                        buffer.putInt(position, 1);
                        position += 4;

                        // 写入消息长度
                        final int payloadLength = payload.length();
                        buffer.putInt(position, payloadLength);
                        position += 4;

                        // 写入载荷
                        position += buffer.putStringAscii(position, payload);

                        return position - offset;
                    }

                    /**
                     * 解码消息
                     */
                    public static String decode(
                        final DirectBuffer buffer,
                        final int offset)
                    {
                        int position = offset;

                        // 读取消息类型
                        final int messageType = buffer.getInt(position);
                        position += 4;

                        // 读取消息长度
                        final int payloadLength = buffer.getInt(position);
                        position += 4;

                        // 读取载荷
                        return buffer.getStringAscii(position, payloadLength);
                    }

                    /**
                     * 获取消息长度
                     */
                    public static int messageLength(
                        final DirectBuffer buffer,
                        final int offset)
                    {
                        final int payloadLength = buffer.getInt(offset + 4);
                        return HEADER_LENGTH + payloadLength;
                    }
                }
                

2. 高性能数据序列化


                /**
                 * 订单数据结构
                 */
                public class Order
                {
                    // 字段偏移量
                    private static final int ORDER_ID_OFFSET = 0;
                    private static final int SYMBOL_OFFSET = 8;
                    private static final int SYMBOL_LENGTH = 16;
                    private static final int PRICE_OFFSET = 24;
                    private static final int QUANTITY_OFFSET = 32;
                    private static final int SIZE = 40;

                    /**
                     * 将订单写入缓冲区
                     */
                    public static void encode(
                        final MutableDirectBuffer buffer,
                        final int offset,
                        final long orderId,
                        final String symbol,
                        final double price,
                        final int quantity)
                    {
                        buffer.putLong(offset + ORDER_ID_OFFSET, orderId);

                        // 写入固定长度的symbol (填充或截断)
                        buffer.setMemory(
                            offset + SYMBOL_OFFSET,
                            SYMBOL_LENGTH,
                            (byte)0
                        );
                        buffer.putStringAscii(offset + SYMBOL_OFFSET, symbol);

                        buffer.putDouble(offset + PRICE_OFFSET, price);
                        buffer.putInt(offset + QUANTITY_OFFSET, quantity);
                    }

                    /**
                     * 从缓冲区读取订单
                     */
                    public static void decode(
                        final DirectBuffer buffer,
                        final int offset,
                        final OrderData orderData)
                    {
                        orderData.orderId = buffer.getLong(offset + ORDER_ID_OFFSET);
                        orderData.symbol = buffer.getStringAscii(
                            offset + SYMBOL_OFFSET,
                            SYMBOL_LENGTH
                        ).trim();
                        orderData.price = buffer.getDouble(offset + PRICE_OFFSET);
                        orderData.quantity = buffer.getInt(offset + QUANTITY_OFFSET);
                    }

                    public static int size()
                    {
                        return SIZE;
                    }

                    /**
                     * 订单数据持有类
                     */
                    public static class OrderData
                    {
                        public long orderId;
                        public String symbol;
                        public double price;
                        public int quantity;

                        @Override
                        public String toString()
                        {
                            return String.format(
                                "Order{id=%d, symbol='%s', price=%.2f, qty=%d}",
                                orderId, symbol, price, quantity
                            );
                        }
                    }
                }
                

3. 环形缓冲区


                import org.agrona.concurrent.UnsafeBuffer;

                /**
                 * 简单的环形缓冲区实现
                 */
                public class SimpleRingBuffer
                {
                    private final MutableDirectBuffer buffer;
                    private final int capacity;
                    private long head = 0;
                    private long tail = 0;

                    public SimpleRingBuffer(final int capacity)
                    {
                        this.capacity = capacity;
                        this.buffer = new UnsafeBuffer(
                            ByteBuffer.allocateDirect(capacity)
                        );
                    }

                    /**
                     * 写入数据
                     */
                    public boolean write(final byte[] data)
                    {
                        final int length = data.length;

                        // 检查空间
                        if (tail - head + length > capacity)
                        {
                            return false;
                        }

                        // 写入长度
                        final int position = (int)(tail % capacity);
                        buffer.putInt(position, length);

                        // 写入数据
                        final int dataPosition = (position + 4) % capacity;

                        if (dataPosition + length <= capacity)
                        {
                            // 一次性写入
                            buffer.putBytes(dataPosition, data);
                        }
                        else
                        {
                            // 分两次写入(跨越边界)
                            final int firstPart = capacity - dataPosition;
                            buffer.putBytes(dataPosition, data, 0, firstPart);
                            buffer.putBytes(0, data, firstPart, length - firstPart);
                        }

                        tail += 4 + length;
                        return true;
                    }

                    /**
                     * 读取数据
                     */
                    public byte[] read()
                    {
                        if (head >= tail)
                        {
                            return null;
                        }

                        // 读取长度
                        final int position = (int)(head % capacity);
                        final int length = buffer.getInt(position);

                        // 读取数据
                        final byte[] data = new byte[length];
                        final int dataPosition = (position + 4) % capacity;

                        if (dataPosition + length <= capacity)
                        {
                            buffer.getBytes(dataPosition, data);
                        }
                        else
                        {
                            final int firstPart = capacity - dataPosition;
                            buffer.getBytes(dataPosition, data, 0, firstPart);
                            buffer.getBytes(0, data, firstPart, length - firstPart);
                        }

                        head += 4 + length;
                        return data;
                    }
                }
                

性能优化技巧

1. 内存对齐


                // 使用64字节对齐(缓存行大小)
                final ByteBuffer alignedBuffer =
                    BufferUtil.allocateDirectAligned(1024, 64);
                

2. 批量操作


                // 批量复制比逐个元素快
                buffer.putBytes(offset, sourceArray, 0, length);

                // 而不是:
                for (int i = 0; i < length; i++)
                {
                    buffer.putByte(offset + i, sourceArray[i]);
                }
                

3. 避免边界检查


                // Agrona可以禁用边界检查以提高性能
                // JVM参数: -Dagrona.disable.bounds.checks=true

                // 但在开发阶段建议保持开启
                

常见陷阱

1. 字节序问题


                // 明确指定字节序
                buffer.putInt(0, value, ByteOrder.LITTLE_ENDIAN);

                // 或使用默认的本机字节序
                buffer.putInt(0, value);  // 使用ByteOrder.nativeOrder()
                

2. 内存泄漏


                // DirectByteBuffer需要手动释放
                public void cleanup()
                {
                    // 使用Agrona的IoUtil
                    IoUtil.unmap(byteBuffer);
                }
                

3. 并发访问


                // DirectBuffer不是线程安全的
                // 需要外部同步或使用Agrona的并发数据结构
                

最佳实践

  1. 选择合适的实现: UnsafeBuffer用于固定大小,ExpandableArrayBuffer用于动态大小
  2. 内存对齐: 对于高性能场景,使用缓存行对齐
  3. 零拷贝: 充分利用DirectBuffer实现零拷贝
  4. 类型安全: 使用类型化的get/put方法而不是直接操作字节
  5. 资源管理: 及时释放堆外内存

总结

DirectBuffer是Agrona实现高性能的关键组件:

  • 提供统一的内存访问接口
  • 支持零拷贝操作
  • 避免不必要的对象分配
  • 类型安全的读写方法

正确使用DirectBuffer可以显著提升系统性能。