Threads, Agents & Duty Cycles (线程、代理与职责周期)

Threads, Agents & Duty Cycles (线程、代理与职责周期)


概述

本文深入探讨 Agrona 中线程、Agent 和 Duty Cycle 三者之间的关系,以及如何通过合理的设计实现高性能系统。

三者关系


                graph TB
                    T[Thread 线程] --> AR[AgentRunner]
                    AR --> IS[IdleStrategy 空闲策略]
                    AR --> A[Agent 代理]
                    A --> DC[Duty Cycle 职责周期]
                    DC --> DW[doWork 方法]

                    style T fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style AR fill:#50c878,stroke:#2d7a4a,color:#fff
                    style A fill:#f39c12,stroke:#b8730a,color:#fff
                    style DC fill:#e74c3c,stroke:#a93529,color:#fff
                
  • Thread: 提供执行上下文
  • AgentRunner: 管理Agent的生命周期
  • Agent: 定义工作逻辑
  • Duty Cycle: Agent的单次工作周期
  • IdleStrategy: 控制空闲时的行为

线程模型

1. 专用线程模式

每个Agent运行在独占的线程上:


                // 创建Agent
                final Agent agent = new MyHighPriorityAgent();
                final IdleStrategy idleStrategy = new BusySpinIdleStrategy();

                // 创建Runner
                final AgentRunner runner = new AgentRunner(
                    idleStrategy,
                    Throwable::printStackTrace,
                    null,
                    agent
                );

                // 启动专用线程
                final Thread thread = AgentRunner.startOnThread(runner);
                thread.setName("HighPriority-Agent");
                thread.setPriority(Thread.MAX_PRIORITY);
                

优势: 最低延迟、可预测的性能 劣势: 资源消耗大

2. 共享线程模式

多个Agent共享一个线程:


                // 创建复合Agent
                final Agent compositeAgent = new CompositeAgent(
                    "SharedThread",
                    new ReceiveAgent(),
                    new ProcessAgent(),
                    new SendAgent()
                );

                final AgentRunner runner = new AgentRunner(
                    new BackoffIdleStrategy(100, 10, 1, 1_000_000),
                    Throwable::printStackTrace,
                    null,
                    compositeAgent
                );

                AgentRunner.startOnThread(runner);
                

优势: 资源利用率高 劣势: 可能存在优先级问题

实战示例

高性能消息处理系统


                import org.agrona.concurrent.*;

                /**
                 * 完整的消息处理系统示例
                 */
                public class MessageProcessingSystem
                {
                    // 消息队列
                    private final ManyToOneConcurrentArrayQueue<Message> inputQueue;
                    private final ManyToOneConcurrentArrayQueue<Message> outputQueue;

                    // Agents
                    private final Agent receiverAgent;
                    private final Agent processorAgent;
                    private final Agent senderAgent;

                    // Runners
                    private final AgentRunner receiverRunner;
                    private final AgentRunner processorRunner;
                    private final AgentRunner senderRunner;

                    public MessageProcessingSystem(final int queueSize)
                    {
                        this.inputQueue = new ManyToOneConcurrentArrayQueue<>(queueSize);
                        this.outputQueue = new ManyToOneConcurrentArrayQueue<>(queueSize);

                        // 创建Agents
                        this.receiverAgent = new ReceiverAgent(inputQueue);
                        this.processorAgent = new ProcessorAgent(inputQueue, outputQueue);
                        this.senderAgent = new SenderAgent(outputQueue);

                        // 创建Runners (使用不同的空闲策略)
                        this.receiverRunner = new AgentRunner(
                            new BusySpinIdleStrategy(),  // 接收器需要最低延迟
                            this::handleError,
                            null,
                            receiverAgent
                        );

                        this.processorRunner = new AgentRunner(
                            new BackoffIdleStrategy(100, 10, 1, 1_000_000),
                            this::handleError,
                            null,
                            processorAgent
                        );

                        this.senderRunner = new AgentRunner(
                            new YieldingIdleStrategy(),
                            this::handleError,
                            null,
                            senderAgent
                        );
                    }

                    public void start()
                    {
                        // 启动所有Agent
                        AgentRunner.startOnThread(receiverRunner)
                            .setName("Receiver-Thread");
                        AgentRunner.startOnThread(processorRunner)
                            .setName("Processor-Thread");
                        AgentRunner.startOnThread(senderRunner)
                            .setName("Sender-Thread");
                    }

                    public void shutdown()
                    {
                        receiverRunner.close();
                        processorRunner.close();
                        senderRunner.close();
                    }

                    private void handleError(final Throwable throwable)
                    {
                        System.err.println("Error: " + throwable.getMessage());
                        throwable.printStackTrace();
                    }

                    /**
                     * 接收器Agent
                     */
                    static class ReceiverAgent implements Agent
                    {
                        private final ManyToOneConcurrentArrayQueue<Message> outputQueue;

                        ReceiverAgent(final ManyToOneConcurrentArrayQueue<Message> outputQueue)
                        {
                            this.outputQueue = outputQueue;
                        }

                        @Override
                        public int doWork()
                        {
                            // 从网络接收消息
                            final Message message = receiveFromNetwork();
                            if (message != null)
                            {
                                outputQueue.offer(message);
                                return 1;
                            }
                            return 0;
                        }

                        @Override
                        public String roleName()
                        {
                            return "Receiver";
                        }

                        private Message receiveFromNetwork()
                        {
                            // 实际的网络接收逻辑
                            return null;
                        }
                    }

                    /**
                     * 处理器Agent
                     */
                    static class ProcessorAgent implements Agent
                    {
                        private final ManyToOneConcurrentArrayQueue<Message> inputQueue;
                        private final ManyToOneConcurrentArrayQueue<Message> outputQueue;

                        ProcessorAgent(
                            final ManyToOneConcurrentArrayQueue<Message> inputQueue,
                            final ManyToOneConcurrentArrayQueue<Message> outputQueue)
                        {
                            this.inputQueue = inputQueue;
                            this.outputQueue = outputQueue;
                        }

                        @Override
                        public int doWork()
                        {
                            int workDone = 0;

                            // 批量处理
                            for (int i = 0; i < 10; i++)
                            {
                                final Message message = inputQueue.poll();
                                if (message == null)
                                {
                                    break;
                                }

                                // 处理消息
                                processMessage(message);

                                // 放入输出队列
                                outputQueue.offer(message);
                                workDone++;
                            }

                            return workDone;
                        }

                        @Override
                        public String roleName()
                        {
                            return "Processor";
                        }

                        private void processMessage(final Message message)
                        {
                            // 实际的处理逻辑
                        }
                    }

                    /**
                     * 发送器Agent
                     */
                    static class SenderAgent implements Agent
                    {
                        private final ManyToOneConcurrentArrayQueue<Message> inputQueue;

                        SenderAgent(final ManyToOneConcurrentArrayQueue<Message> inputQueue)
                        {
                            this.inputQueue = inputQueue;
                        }

                        @Override
                        public int doWork()
                        {
                            int workDone = 0;

                            for (int i = 0; i < 10; i++)
                            {
                                final Message message = inputQueue.poll();
                                if (message == null)
                                {
                                    break;
                                }

                                sendToNetwork(message);
                                workDone++;
                            }

                            return workDone;
                        }

                        @Override
                        public String roleName()
                        {
                            return "Sender";
                        }

                        private void sendToNetwork(final Message message)
                        {
                            // 实际的网络发送逻辑
                        }
                    }

                    static class Message
                    {
                        // 消息内容
                    }
                }
                

性能优化技巧

1. CPU亲和性


                // 将线程绑定到特定CPU核心
                public static void setThreadAffinity(final Thread thread, final int cpuId)
                {
                    // 使用JNI或Agrona的ThreadHints
                    ThreadHints.onSpinWait();
                }
                

2. 批量处理


                @Override
                public int doWork()
                {
                    int work = 0;
                    
                    // 批量处理提高吞吐量
                    for (int i = 0; i < BATCH_SIZE; i++)
                    {
                        final Task task = queue.poll();
                        if (task == null) break;
                        
                        processTask(task);
                        work++;
                    }
                    
                    return work;
                }
                

3. 预取优化


                @Override
                public int doWork()
                {
                    int totalWork = 0;
                    
                    // 预取多个数据源
                    totalWork += processInput1();
                    totalWork += processInput2();
                    totalWork += processOutput();
                    
                    return totalWork;
                }
                

最佳实践

  1. 专用线程用于低延迟: 超低延迟场景使用专用线程+BusySpinIdleStrategy
  2. 批量处理提高吞吐: 适当批量处理以摊销开销
  3. 监控和调优: 持续监控并根据实际情况调整
  4. 避免阻塞: doWork方法绝对不能阻塞

总结

理解线程、Agent和Duty Cycle的关系是构建高性能Agrona应用的关键。通过合理的设计和配置,可以实现极致的性能表现。