Duty Cycles (职责周期)

Duty Cycles (职责周期)


概述

Duty Cycle(职责周期)是 Agrona 中一个核心概念,它定义了一个工作单元在执行过程中应该完成的任务。在高性能系统中,合理设计 Duty Cycle 对于实现低延迟和高吞吐量至关重要。

什么是 Duty Cycle?

Duty Cycle 本质上是一个可重复执行的工作单元。在 Agrona 的上下文中,它通常指的是 Agent 在每次被调度时执行的工作量。

如果你玩过电脑游戏,你很可能已经与 Duty Cycle(也称为事件循环 Event Loop)打过交道。Duty Cycle 是系统组件执行的主循环。在循环中执行一些任务,然后可以选择等待一段时间。

游戏循环示例

对于简单的电脑游戏,Duty Cycle 可能是这样的:


                EpochClock clock = new SystemEpochClock();
                while (true)
                {
                    long time = clock.time();
                    processInput();    // 处理输入
                    update();          // 更新游戏状态
                    render();          // 渲染画面
                    Thread.sleep(MS_PER_FRAME - (clock.time() - time));  // 控制帧率
                }
                

这里的 sleep 允许游戏开发者控制两件事:

  • 游戏的功耗: 通过限制CPU使用率降低能耗
  • 每秒帧数: 确保一致的游戏体验

这很重要,因为不同的 CPU 具有不同的性能特征,游戏开发者希望用户在不同硬件上获得一致的体验。

在 Aeron/Agrona 中的应用

在典型的 Aeron 应用程序中,我们也使用 Duty Cycle。它们在 Agrona Agent 内部运行,游戏示例中的 sleep空闲策略(Idle Strategy)管理。Duty Cycle 直接影响:

  • 服务能力: 每秒处理的消息数(吞吐量)
  • CPU 消耗: 进程的资源使用情况

典型的 Duty Cycle 类型

下面是两种典型的 Duty Cycle:

1. 业务逻辑 Duty Cycle (Business Logic Duty Cycle)

这是由输入消息驱动的典型 Duty Cycle。为了实现高吞吐量,应用的 sleep(即空闲策略)几乎不会延迟 Duty Cycle。


                while (true)
                {
                    Command command = adaptInputBuffer();           // 从输入缓冲区适配命令
                    routeToAppropriateBusinessLogic(command);       // 路由到适当的业务逻辑
                }
                

routeToAppropriateBusinessLogic() 的调用通常会执行类似于在有状态对象(如复制状态机 Replicated State Machine)上调用方法的操作:


                public void doSomething(Command command)
                {
                    processInput();    // 处理输入
                    emitEvents();      // 发出事件
                }
                

ASCII 流程图:


                业务逻辑 Duty Cycle 流程
                ┌─────────────────────────────────────────────────────┐
                │                  开始循环                            │
                └───────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   从输入缓冲区适配命令             │
                    │   adaptInputBuffer()              │
                    └───────────────┬───────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   路由到业务逻辑                   │
                    │   routeToBusinessLogic(command)   │
                    └───────────────┬───────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   处理输入                         │
                    │   processInput()                  │
                    └───────────────┬───────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   发出事件                         │
                    │   emitEvents()                    │
                    └───────────────┬───────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   应用空闲策略(几乎无延迟)         │
                    └───────────────┬───────────────────┘
                                    │
                                    └──────► 返回循环开始
                

流程解析:

  1. 适配输入: 从网络或队列中接收并解析命令
  2. 路由处理: 根据命令类型路由到对应的业务逻辑处理器
  3. 处理输入: 执行业务逻辑,更新状态
  4. 发出事件: 生成输出事件,通知其他组件
  5. 空闲策略: 由于是高吞吐场景,通常使用 BusySpinIdleStrategy 或 BackoffIdleStrategy 的激进配置

2. 连接管理 Duty Cycle (Connectivity Duty Cycle)

这种 Duty Cycle 不是由消息驱动的,而是由管理与某物连接的需求驱动的。没有必要每秒调用数千次,因此 Duty Cycle 中应用的 sleep 可能是数百毫秒。


                while (true)
                {
                    checkConnectionStatus();     // 检查连接状态
                    reconnectIfNeeded();        // 如果需要则重新连接
                    Thread.sleep(100);          // 休眠 100ms
                }
                

ASCII 流程图:


                连接管理 Duty Cycle 流程
                ┌─────────────────────────────────────────────────────┐
                │                  开始循环                            │
                └───────────────────┬─────────────────────────────────┘
                                    │
                                    ▼
                    ┌───────────────────────────────────┐
                    │   检查连接状态                     │
                    │   checkConnectionStatus()         │
                    └───────────────┬───────────────────┘
                                    │
                                    ▼
                            ┌───────────────┐
                            │  连接是否正常? │
                            └───┬───────┬───┘
                                │ 是    │ 否
                                │       │
                                │       ▼
                                │   ┌────────────────────┐
                                │   │  尝试重新连接       │
                                │   │  reconnect()       │
                                │   └──────┬─────────────┘
                                │          │
                                ▼          ▼
                    ┌───────────────────────────────────┐
                    │   休眠 100-500ms                   │
                    │   Thread.sleep(100)               │
                    │   (使用 SleepingIdleStrategy)     │
                    └───────────────┬───────────────────┘
                                    │
                                    └──────► 返回循环开始
                

流程解析:

  1. 检查连接: 定期检查网络连接、心跳等状态
  2. 条件重连: 只有在检测到连接问题时才执行重连
  3. 长时间休眠: 由于不需要高频率执行,使用较长的休眠时间(100-500ms)
  4. 空闲策略: 通常使用 SleepingIdleStrategy 来降低 CPU 使用率

核心概念


                graph TB
                    A[Duty Cycle 概念] --> B[工作量定义]
                    A --> C[执行频率]
                    A --> D[性能优化]

                    B --> B1[单次任务]
                    B --> B2[批量任务]
                    B --> B3[复合任务]

                    C --> C1[紧密循环]
                    C --> C2[定时触发]
                    C --> C3[事件驱动]

                    D --> D1[减少上下文切换]
                    D --> D2[提高缓存命中率]
                    D --> D3[优化CPU流水线]

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style B fill:#50c878,stroke:#2d7a4a,color:#fff
                    style C fill:#f39c12,stroke:#b8730a,color:#fff
                    style D fill:#e74c3c,stroke:#a93529,color:#fff
                

概念说明

工作量定义

  • 单次任务:每次 Duty Cycle 执行一个独立的工作项
  • 批量任务:每次 Duty Cycle 批量处理多个工作项,提高吞吐量
  • 复合任务:在一个 Duty Cycle 中完成多个相关联的任务

执行频率

  • 紧密循环(Tight Loop):持续快速执行,适合低延迟场景
  • 定时触发:按固定时间间隔执行,适合周期性任务
  • 事件驱动:根据外部事件触发执行,适合响应式系统

性能优化

  • 减少线程上下文切换开销
  • 提高CPU缓存命中率
  • 优化CPU流水线执行效率

Duty Cycle 的工作原理

基本执行流程


                sequenceDiagram
                    participant Scheduler as 调度器
                    participant Agent as Agent
                    participant Work as 工作队列
                    participant Idle as 空闲策略

                    loop 持续执行
                        Scheduler->>Agent: 调用 doWork()
                        Agent->>Work: 检查是否有工作
                        alt 有工作可做
                            Work-->>Agent: 返回工作项数量
                            Agent->>Agent: 执行工作
                            Agent-->>Scheduler: 返回工作项数量 > 0
                            Scheduler->>Scheduler: 继续紧密循环
                        else 无工作可做
                            Work-->>Agent: 返回 0
                            Agent-->>Scheduler: 返回 0
                            Scheduler->>Idle: 应用空闲策略
                            Idle->>Idle: 等待/降低CPU使用
                        end
                    end
                

流程说明

  1. 调度器启动:调度器启动 Agent 的执行循环
  2. 检查工作:Agent 检查是否有待处理的工作
  3. 执行工作:如果有工作,执行并返回已完成的工作项数量
  4. 应用策略:如果没有工作,应用空闲策略来节省CPU资源
  5. 循环继续:重复上述过程

Duty Cycle 接口

在 Agrona 中,Duty Cycle 主要通过以下接口实现:


                /**
                 * Agent 接口 - 定义了 Duty Cycle 的核心方法
                 */
                public interface Agent
                {
                    /**
                     * 执行一个 Duty Cycle 的工作
                     *
                     * @return 完成的工作项数量,0 表示没有工作完成
                     * @throws Exception 如果在执行过程中发生错误
                     */
                    int doWork() throws Exception;

                    /**
                     * Agent 的角色名称
                     *
                     * @return 角色名称,用于日志和监控
                     */
                    String roleName();

                    /**
                     * 当 Agent 启动时调用
                     */
                    void onStart();

                    /**
                     * 当 Agent 关闭时调用
                     */
                    void onClose();
                }
                

实现示例

简单的 Duty Cycle 实现


                import org.agrona.concurrent.Agent;
                import java.util.Queue;
                import java.util.concurrent.ConcurrentLinkedQueue;

                /**
                 * 简单的消息处理 Agent
                 */
                public class MessageProcessorAgent implements Agent
                {
                    private final Queue<String> messageQueue = new ConcurrentLinkedQueue<>();
                    private long processedCount = 0;

                    @Override
                    public int doWork() throws Exception
                    {
                        int workDone = 0;

                        // 批量处理消息,最多处理10条
                        for (int i = 0; i < 10; i++)
                        {
                            final String message = messageQueue.poll();
                            if (message == null)
                            {
                                break; // 没有更多消息,退出循环
                            }

                            // 处理消息
                            processMessage(message);
                            workDone++;
                            processedCount++;
                        }

                        return workDone;
                    }

                    @Override
                    public String roleName()
                    {
                        return "MessageProcessor";
                    }

                    @Override
                    public void onStart()
                    {
                        System.out.println("MessageProcessor Agent 启动");
                    }

                    @Override
                    public void onClose()
                    {
                        System.out.println("MessageProcessor Agent 关闭,总共处理了 "
                            + processedCount + " 条消息");
                    }

                    /**
                     * 添加消息到队列
                     */
                    public void addMessage(final String message)
                    {
                        messageQueue.offer(message);
                    }

                    /**
                     * 处理单条消息
                     */
                    private void processMessage(final String message)
                    {
                        // 实际的消息处理逻辑
                        // 这里只是简单打印
                        // System.out.println("处理消息: " + message);
                    }
                }
                

批量处理的 Duty Cycle


                import org.agrona.concurrent.Agent;
                import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;

                /**
                 * 批量处理订单的 Agent
                 */
                public class OrderBatchProcessor implements Agent
                {
                    private static final int BATCH_SIZE = 100; // 批量大小
                    private final ManyToOneConcurrentArrayQueue<Order> orderQueue;
                    private final Order[] batchBuffer = new Order[BATCH_SIZE];

                    public OrderBatchProcessor(final int queueSize)
                    {
                        this.orderQueue = new ManyToOneConcurrentArrayQueue<>(queueSize);
                    }

                    @Override
                    public int doWork() throws Exception
                    {
                        // 从队列中批量取出订单
                        final int batchCount = orderQueue.drain(
                            order -> true,  // 接受所有订单
                            batchBuffer,    // 放入批量缓冲区
                            BATCH_SIZE      // 最多取出 BATCH_SIZE 个
                        );

                        if (batchCount > 0)
                        {
                            // 批量处理订单
                            processBatch(batchBuffer, batchCount);
                        }

                        return batchCount;
                    }

                    @Override
                    public String roleName()
                    {
                        return "OrderBatchProcessor";
                    }

                    @Override
                    public void onStart()
                    {
                        System.out.println("订单批处理器启动");
                    }

                    @Override
                    public void onClose()
                    {
                        // 处理剩余的订单
                        final int remaining = doWork();
                        System.out.println("订单批处理器关闭,处理了剩余的 " + remaining + " 个订单");
                    }

                    /**
                     * 批量处理订单
                     */
                    private void processBatch(final Order[] orders, final int count)
                    {
                        for (int i = 0; i < count; i++)
                        {
                            final Order order = orders[i];
                            // 处理订单逻辑
                            processOrder(order);
                            orders[i] = null; // 清理引用,帮助GC
                        }
                    }

                    /**
                     * 处理单个订单
                     */
                    private void processOrder(final Order order)
                    {
                        // 订单处理逻辑
                    }

                    /**
                     * 提交新订单
                     */
                    public boolean submitOrder(final Order order)
                    {
                        return orderQueue.offer(order);
                    }

                    /**
                     * 订单类(示例)
                     */
                    static class Order
                    {
                        final long orderId;
                        final String symbol;
                        final int quantity;
                        final double price;

                        Order(final long orderId, final String symbol,
                              final int quantity, final double price)
                        {
                            this.orderId = orderId;
                            this.symbol = symbol;
                            this.quantity = quantity;
                            this.price = price;
                        }
                    }
                }
                

Duty Cycle 的设计模式

1. 单任务模式


                graph LR
                    A[Duty Cycle] --> B[检查工作]
                    B --> C{有工作?}
                    C -->|是| D[处理一项工作]
                    C -->|否| E[返回 0]
                    D --> F[返回 1]

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style D fill:#50c878,stroke:#2d7a4a,color:#fff
                    style E fill:#e74c3c,stroke:#a93529,color:#fff
                

适用场景

  • 每个工作项处理时间较长
  • 需要保证公平性,避免饥饿
  • 对延迟要求极高,希望快速响应

2. 批量处理模式


                graph LR
                    A[Duty Cycle] --> B[批量获取工作]
                    B --> C{获取数量?}
                    C -->|> 0| D[批量处理]
                    C -->|= 0| E[返回 0]
                    D --> F[返回处理数量]

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style D fill:#50c878,stroke:#2d7a4a,color:#fff
                    style E fill:#e74c3c,stroke:#a93529,color:#fff
                

适用场景

  • 工作项处理成本较低
  • 追求高吞吐量
  • 可以接受适度的延迟增加

3. 复合任务模式


                graph TB
                    A[Duty Cycle] --> B[任务1: 接收消息]
                    A --> C[任务2: 处理消息]
                    A --> D[任务3: 发送响应]

                    B --> E{有消息?}
                    C --> F{有待处理?}
                    D --> G{有待发送?}

                    E -->|是| B1[work += n]
                    E -->|否| B2[work += 0]
                    F -->|是| C1[work += n]
                    F -->|否| C2[work += 0]
                    G -->|是| D1[work += n]
                    G -->|否| D2[work += 0]

                    B1 --> H[返回总 work]
                    B2 --> H
                    C1 --> H
                    C2 --> H
                    D1 --> H
                    D2 --> H

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style B fill:#50c878,stroke:#2d7a4a,color:#fff
                    style C fill:#f39c12,stroke:#b8730a,color:#fff
                    style D fill:#e74c3c,stroke:#a93529,color:#fff
                

适用场景

  • 多个相关任务需要在同一个 Duty Cycle 中完成
  • 任务之间有依赖关系
  • 希望减少上下文切换

性能优化技巧

1. 批量处理优化


                @Override
                public int doWork() throws Exception
                {
                    int totalWork = 0;

                    // 批量处理,避免频繁调用
                    for (int i = 0; i < MAX_BATCH_SIZE; i++)
                    {
                        final Message msg = queue.poll();
                        if (msg == null)
                        {
                            break;
                        }

                        processMessage(msg);
                        totalWork++;
                    }

                    return totalWork;
                }
                

优势

  • 减少方法调用开销
  • 提高CPU缓存命中率
  • 摊销固定成本

2. 预取优化


                @Override
                public int doWork() throws Exception
                {
                    int work = 0;

                    // 预取多个元素
                    work += processInputQueue();
                    work += processOutputQueue();
                    work += processControlQueue();

                    return work;
                }
                

优势

  • 充分利用CPU流水线
  • 减少分支预测失败
  • 提高指令级并行度

3. 避免不必要的工作


                @Override
                public int doWork() throws Exception
                {
                    // 快速路径:检查是否有工作
                    if (queue.isEmpty())
                    {
                        return 0;  // 立即返回,避免不必要的开销
                    }

                    // 慢速路径:执行实际工作
                    return processQueue();
                }
                

常见陷阱

1. 工作量过大


                // 错误示例:一次处理所有工作
                @Override
                public int doWork() throws Exception
                {
                    int work = 0;
                    while (!queue.isEmpty())  // 可能导致长时间占用CPU
                    {
                        processMessage(queue.poll());
                        work++;
                    }
                    return work;
                }
                

问题

  • 可能导致其他 Agent 饥饿
  • 无法及时响应关闭信号
  • 影响系统整体响应性

解决方案


                // 正确示例:限制每次处理的数量
                @Override
                public int doWork() throws Exception
                {
                    int work = 0;
                    final int limit = 100;  // 设置上限

                    for (int i = 0; i < limit && !queue.isEmpty(); i++)
                    {
                        processMessage(queue.poll());
                        work++;
                    }

                    return work;
                }
                

2. 返回值不准确


                // 错误示例:返回值与实际工作不符
                @Override
                public int doWork() throws Exception
                {
                    processMessage();
                    return 1;  // 总是返回1,即使没有实际工作
                }
                

问题

  • 导致空闲策略无法正确工作
  • 浪费CPU资源
  • 影响性能监控

3. 阻塞操作


                // 错误示例:在 doWork 中执行阻塞操作
                @Override
                public int doWork() throws Exception
                {
                    Thread.sleep(100);  // 阻塞!
                    return processMessages();
                }
                

问题

  • 违反非阻塞原则
  • 影响系统响应性
  • 无法充分利用CPU

最佳实践

1. 明确返回值语义


                @Override
                public int doWork() throws Exception
                {
                    int workDone = 0;

                    // 明确记录完成的工作量
                    workDone += receiveMessages();
                    workDone += processMessages();
                    workDone += sendResponses();

                    return workDone;  // 返回实际完成的工作项数量
                }
                

2. 合理设置批量大小


                // 根据实际情况调整批量大小
                private static final int SMALL_BATCH = 10;   // 低延迟场景
                private static final int MEDIUM_BATCH = 100; // 平衡场景
                private static final int LARGE_BATCH = 1000; // 高吞吐场景
                

3. 监控和测量


                public class MonitoredAgent implements Agent
                {
                    private final Agent delegate;
                    private long totalCycles = 0;
                    private long totalWork = 0;

                    @Override
                    public int doWork() throws Exception
                    {
                        totalCycles++;
                        final int work = delegate.doWork();
                        totalWork += work;
                        return work;
                    }

                    public double getAverageWorkPerCycle()
                    {
                        return totalCycles > 0 ? (double)totalWork / totalCycles : 0.0;
                    }
                }
                

总结

Duty Cycle 是 Agrona 高性能架构的核心概念:

  1. 定义工作单元:明确每次执行应完成的工作量
  2. 优化性能:通过批量处理、预取等技术提升效率
  3. 配合空闲策略:在没有工作时合理使用CPU资源
  4. 避免常见陷阱:不阻塞、准确返回、限制工作量

掌握 Duty Cycle 的设计和优化技巧,是构建高性能系统的关键。