Concurrent Collections (并发集合)
概述
Agrona提供了一系列高性能的并发集合,专为低延迟、高吞吐量场景设计。这些集合避免了JDK标准库中的性能瓶颈,如锁竞争和内存分配。
核心并发队列
1. ManyToOneConcurrentArrayQueue
多生产者单消费者无锁队列:
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
// 创建队列
final ManyToOneConcurrentArrayQueue<Message> queue =
new ManyToOneConcurrentArrayQueue<>(1024);
// 多个生产者线程可以安全地添加
queue.offer(new Message("data"));
// 单个消费者线程读取
final Message msg = queue.poll();
特点:
- 多个生产者,一个消费者
- 无锁设计
- 固定容量
- 高性能
2. OneToOneConcurrentArrayQueue
单生产者单消费者队列:
import org.agrona.concurrent.OneToOneConcurrentArrayQueue;
final OneToOneConcurrentArrayQueue<Order> queue =
new OneToOneConcurrentArrayQueue<>(1024);
// 性能最优的队列类型
3. ManyToManyConcurrentArrayQueue
多生产者多消费者队列:
final ManyToManyConcurrentArrayQueue<Event> queue =
new ManyToManyConcurrentArrayQueue<>(1024);
广播通信
BroadcastTransmitter & BroadcastReceiver
一对多的广播机制:
import org.agrona.concurrent.broadcast.*;
// 发送端
final BroadcastTransmitter transmitter =
new BroadcastTransmitter(buffer);
transmitter.transmit(
messageTypeId,
srcBuffer,
srcIndex,
length
);
// 接收端
final BroadcastReceiver receiver =
new BroadcastReceiver(buffer);
receiver.receive(
(msgTypeId, buffer, index, length) -> {
// 处理消息
}
);
环形缓冲区
RingBuffer
高性能环形缓冲区实现:
import org.agrona.concurrent.ringbuffer.*;
final RingBuffer ringBuffer = new ManyToOneRingBuffer(
new UnsafeBuffer(ByteBuffer.allocateDirect(1024))
);
// 写入
ringBuffer.write(
messageTypeId,
srcBuffer,
srcIndex,
length
);
// 读取
ringBuffer.read(
(msgTypeId, buffer, index, length) -> {
// 处理消息
}
);
最佳实践
- 根据线程模型选择合适的队列类型
- 预分配足够大的容量避免满载
- 使用批量操作提高性能
- 避免在关键路径上分配对象
总结
Agrona的并发集合为构建高性能系统提供了核心基础设施。