Concurrent Collections (并发集合)

Concurrent Collections (并发集合)


概述

Agrona提供了一系列高性能的并发集合,专为低延迟、高吞吐量场景设计。这些集合避免了JDK标准库中的性能瓶颈,如锁竞争和内存分配。

核心并发队列

1. ManyToOneConcurrentArrayQueue

多生产者单消费者无锁队列:


                import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;

                // 创建队列
                final ManyToOneConcurrentArrayQueue<Message> queue =
                    new ManyToOneConcurrentArrayQueue<>(1024);

                // 多个生产者线程可以安全地添加
                queue.offer(new Message("data"));

                // 单个消费者线程读取
                final Message msg = queue.poll();
                

特点:

  • 多个生产者,一个消费者
  • 无锁设计
  • 固定容量
  • 高性能

2. OneToOneConcurrentArrayQueue

单生产者单消费者队列:


                import org.agrona.concurrent.OneToOneConcurrentArrayQueue;

                final OneToOneConcurrentArrayQueue<Order> queue =
                    new OneToOneConcurrentArrayQueue<>(1024);

                // 性能最优的队列类型
                

3. ManyToManyConcurrentArrayQueue

多生产者多消费者队列:


                final ManyToManyConcurrentArrayQueue<Event> queue =
                    new ManyToManyConcurrentArrayQueue<>(1024);
                

广播通信

BroadcastTransmitter & BroadcastReceiver

一对多的广播机制:


                import org.agrona.concurrent.broadcast.*;

                // 发送端
                final BroadcastTransmitter transmitter =
                    new BroadcastTransmitter(buffer);

                transmitter.transmit(
                    messageTypeId,
                    srcBuffer,
                    srcIndex,
                    length
                );

                // 接收端
                final BroadcastReceiver receiver =
                    new BroadcastReceiver(buffer);

                receiver.receive(
                    (msgTypeId, buffer, index, length) -> {
                        // 处理消息
                    }
                );
                

环形缓冲区

RingBuffer

高性能环形缓冲区实现:


                import org.agrona.concurrent.ringbuffer.*;

                final RingBuffer ringBuffer = new ManyToOneRingBuffer(
                    new UnsafeBuffer(ByteBuffer.allocateDirect(1024))
                );

                // 写入
                ringBuffer.write(
                    messageTypeId,
                    srcBuffer,
                    srcIndex,
                    length
                );

                // 读取
                ringBuffer.read(
                    (msgTypeId, buffer, index, length) -> {
                        // 处理消息
                    }
                );
                

最佳实践

  1. 根据线程模型选择合适的队列类型
  2. 预分配足够大的容量避免满载
  3. 使用批量操作提高性能
  4. 避免在关键路径上分配对象

总结

Agrona的并发集合为构建高性能系统提供了核心基础设施。