Agents & Idle Strategies (代理与空闲策略)

Agents & Idle Strategies (代理与空闲策略)


概述

在 Agrona 中,Agent 是执行 Duty Cycle 的基本单元,而 Idle Strategy(空闲策略)则决定了当 Agent 没有工作可做时应该采取什么行动。这两个概念的结合是实现高性能、低延迟系统的关键。

线程的问题 (The Problem with Threads)

在 2006 年,Edward A. Lee 撰写了一篇技术报告,题为 "The Problem with Threads"(线程的问题)。在报告中,他提出:

> 线程作为计算模型具有极高的不确定性,程序员的工作就变成了修剪这种不确定性。虽然许多研究技术通过提供更有效的修剪来改进模型,但我认为这是在倒着解决问题。我们应该从本质上确定性的、可组合的组件开始构建,而不是从需要移除的地方移除不确定性。不确定性应该在需要的地方明确而审慎地引入,而不是在不需要的地方移除。

Agrona 的 Agent 和 Idle Strategy 正是实现这一理念的方式之一。当 Agrona Agent 与 Aeron 一起使用时,允许以安全、一致的方式构建确定性的、资源管理的线程,这对开发者来说易于理解和推理。

核心设计理念


                Agrona Agent 设计哲学
                ┌─────────────────────────────────────────────────────────────┐
                │                     确定性计算模型                           │
                │  (Deterministic Computation Model)                          │
                └───────────────────────────┬─────────────────────────────────┘
                                            │
                        ┌───────────────────┴───────────────────┐
                        │                                       │
                        ▼                                       ▼
                ┌───────────────────┐               ┌───────────────────────┐
                │  可组合的组件      │               │  资源管理的线程        │
                │  (Composable)     │               │  (Resource Managed)   │
                └─────────┬─────────┘               └───────────┬───────────┘
                          │                                     │
                          │         ┌──────────────┐            │
                          └────────►│ Agrona Agent │◄───────────┘
                                    └──────┬───────┘
                                           │
                        ┌──────────────────┼──────────────────┐
                        │                  │                  │
                        ▼                  ▼                  ▼
                ┌──────────────┐  ┌─────────────────┐  ┌──────────────────┐
                │ Duty Cycle   │  │  Idle Strategy  │  │  AgentRunner     │
                │ (工作周期)    │  │  (空闲策略)      │  │  (运行调度器)     │
                └──────────────┘  └─────────────────┘  └──────────────────┘
                

关键特性:

  • 确定性: Agent 的行为是可预测的
  • 可组合: 多个 Agent 可以组合在一起
  • 资源管理: 通过 Idle Strategy 精确控制 CPU 使用
  • 易于推理: 简单的编程模型,易于理解和调试

Agent (代理)

Agent 的定义

Agent 是一个执行特定职责的工作单元,它持续运行并处理分配给它的任务。

Agrona Agent 是应用程序逻辑的容器,在 Duty Cycle 中执行,例如处理来自 Aeron 订阅的消息。Agent 的 Duty Cycle 间隔以及 CPU 消耗由空闲策略控制。Agent 可以被调度在专用线程上,也可以作为单个线程上的复合代理组的一部分运行。

典型的 Duty Cycle 工作流程:

  1. 持续轮询 Agent 的 doWork 函数,直到它返回 0
  2. 一旦返回 0,调用空闲策略(Idle Strategy)
  3. 空闲策略决定如何处理无工作状态(自旋、让出CPU、休眠等)

                graph TB
                    A[Agent] --> B[doWork方法]
                    A --> C[生命周期管理]
                    A --> D[角色标识]

                    B --> B1[返回工作量]
                    B --> B2[非阻塞执行]
                    B --> B3[异常处理]

                    C --> C1[onStart启动]
                    C --> C2[onClose关闭]
                    C --> C3[资源管理]

                    D --> D1[roleName]
                    D --> D2[日志标识]
                    D --> D3[监控标识]

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style B fill:#50c878,stroke:#2d7a4a,color:#fff
                    style C fill:#f39c12,stroke:#b8730a,color:#fff
                    style D fill:#e74c3c,stroke:#a93529,color:#fff
                

Agent 接口详解


                package org.agrona.concurrent;

                /**
                 * Agent 接口定义
                 */
                public interface Agent
                {
                    /**
                     * 执行一个工作周期
                     *
                     * @return 完成的工作项数量,0表示没有工作
                     * @throws Exception 执行过程中的任何异常
                     */
                    int doWork() throws Exception;

                    /**
                     * 获取 Agent 的角色名称
                     *
                     * @return 角色名称字符串
                     */
                    String roleName();

                    /**
                     * Agent 启动时调用
                     */
                    default void onStart()
                    {
                    }

                    /**
                     * Agent 关闭时调用
                     */
                    default void onClose()
                    {
                    }
                }
                

Agent 实现示例

基础 Agent 实现


                import org.agrona.concurrent.Agent;
                import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;

                /**
                 * 基础的消息接收 Agent
                 */
                public class MessageReceiverAgent implements Agent
                {
                    private final ManyToOneConcurrentArrayQueue<Message> inputQueue;
                    private final MessageHandler messageHandler;
                    private long messagesReceived = 0;

                    public MessageReceiverAgent(
                        final ManyToOneConcurrentArrayQueue<Message> inputQueue,
                        final MessageHandler messageHandler)
                    {
                        this.inputQueue = inputQueue;
                        this.messageHandler = messageHandler;
                    }

                    @Override
                    public int doWork() throws Exception
                    {
                        int workDone = 0;

                        // 从队列中取出并处理消息
                        Message message;
                        while ((message = inputQueue.poll()) != null)
                        {
                            messageHandler.onMessage(message);
                            messagesReceived++;
                            workDone++;

                            // 限制每次处理的数量,避免长时间占用
                            if (workDone >= 10)
                            {
                                break;
                            }
                        }

                        return workDone;
                    }

                    @Override
                    public String roleName()
                    {
                        return "MessageReceiver";
                    }

                    @Override
                    public void onStart()
                    {
                        System.out.println("[" + roleName() + "] Agent 启动");
                        messagesReceived = 0;
                    }

                    @Override
                    public void onClose()
                    {
                        System.out.println("[" + roleName() + "] Agent 关闭," +
                            "总共接收了 " + messagesReceived + " 条消息");
                    }

                    public long getMessagesReceived()
                    {
                        return messagesReceived;
                    }

                    /**
                     * 消息处理器接口
                     */
                    public interface MessageHandler
                    {
                        void onMessage(Message message);
                    }

                    /**
                     * 消息类(示例)
                     */
                    public static class Message
                    {
                        private final String content;

                        public Message(final String content)
                        {
                            this.content = content;
                        }

                        public String getContent()
                        {
                            return content;
                        }
                    }
                }
                

复合 Agent 实现


                import org.agrona.concurrent.Agent;

                /**
                 * 复合 Agent - 组合多个子 Agent
                 */
                public class CompositeAgent implements Agent
                {
                    private final Agent[] agents;
                    private final String roleName;

                    public CompositeAgent(final String roleName, final Agent... agents)
                    {
                        this.roleName = roleName;
                        this.agents = agents;
                    }

                    @Override
                    public int doWork() throws Exception
                    {
                        int totalWork = 0;

                        // 依次执行所有子 Agent 的工作
                        for (final Agent agent : agents)
                        {
                            totalWork += agent.doWork();
                        }

                        return totalWork;
                    }

                    @Override
                    public String roleName()
                    {
                        return roleName;
                    }

                    @Override
                    public void onStart()
                    {
                        System.out.println("[" + roleName + "] 复合 Agent 启动");

                        // 启动所有子 Agent
                        for (final Agent agent : agents)
                        {
                            agent.onStart();
                        }
                    }

                    @Override
                    public void onClose()
                    {
                        // 关闭所有子 Agent
                        for (final Agent agent : agents)
                        {
                            agent.onClose();
                        }

                        System.out.println("[" + roleName + "] 复合 Agent 关闭");
                    }
                }
                

Idle Strategy (空闲策略)

空闲策略的重要性

当 Agent 没有工作可做时,IdleStrategy 决定如何使用 CPU 资源。这直接影响:

  • 延迟:响应时间的快慢
  • CPU 使用率:资源利用效率
  • 功耗:能源消耗水平
  • 吞吐量:系统整体处理能力

                graph TB
                    A[空闲策略] --> B[低延迟策略]
                    A --> C[平衡策略]
                    A --> D[低CPU使用策略]

                    B --> B1[BusySpinIdleStrategy]
                    B --> B2[持续自旋]
                    B --> B3[最低延迟/最高CPU]

                    C --> C1[BackoffIdleStrategy]
                    C --> C2[渐进式退避]
                    C --> C3[平衡延迟与CPU]

                    D --> D1[SleepingIdleStrategy]
                    D --> D2[休眠等待]
                    D --> D3[最低CPU/较高延迟]

                    style A fill:#4a90e2,stroke:#2e5c8a,color:#fff
                    style B fill:#e74c3c,stroke:#a93529,color:#fff
                    style C fill:#f39c12,stroke:#b8730a,color:#fff
                    style D fill:#50c878,stroke:#2d7a4a,color:#fff
                

IdleStrategy 接口


                package org.agrona.concurrent;

                /**
                 * 空闲策略接口
                 */
                public interface IdleStrategy
                {
                    /**
                     * 空闲时调用
                     *
                     * @param workCount 最近一次 doWork() 返回的工作量
                     */
                    void idle(int workCount);

                    /**
                     * 空闲时调用(无工作量参数)
                     */
                    void idle();

                    /**
                     * 重置策略状态
                     */
                    void reset();
                }
                

常用的空闲策略

1. BusySpinIdleStrategy (忙等待策略)

特点:持续自旋,不让出 CPU


                package org.agrona.concurrent;

                /**
                 * 忙等待空闲策略
                 * 最低延迟,但CPU使用率100%
                 */
                public final class BusySpinIdleStrategy implements IdleStrategy
                {
                    @Override
                    public void idle(final int workCount)
                    {
                        // 什么都不做,持续自旋
                    }

                    @Override
                    public void idle()
                    {
                        // 什么都不做,持续自旋
                    }

                    @Override
                    public void reset()
                    {
                        // 无状态,无需重置
                    }
                }
                

使用场景

  • 超低延迟要求(微秒级)
  • 独占CPU核心
  • 实时交易系统
  • 高频消息处理

示例


                import org.agrona.concurrent.*;

                public class LowLatencyExample
                {
                    public static void main(String[] args)
                    {
                        final Agent agent = new MyTradingAgent();
                        final IdleStrategy idleStrategy = new BusySpinIdleStrategy();

                        final AgentRunner runner = new AgentRunner(
                            idleStrategy,
                            Throwable::printStackTrace,
                            null,
                            agent
                        );

                        // 在专用线程上运行
                        final Thread thread = new Thread(runner);
                        thread.setName("TradingAgent");
                        thread.start();
                    }
                }
                

2. BackoffIdleStrategy (退避策略)

特点:渐进式退避,平衡延迟和 CPU 使用


                package org.agrona.concurrent;

                /**
                 * 退避空闲策略
                 * 在忙等待、让出CPU和休眠之间渐进切换
                 */
                public final class BackoffIdleStrategy implements IdleStrategy
                {
                    private final long maxSpins;          // 最大自旋次数
                    private final long maxYields;         // 最大让出次数
                    private final long minParkPeriodNs;   // 最小休眠时间(纳秒)
                    private final long maxParkPeriodNs;   // 最大休眠时间(纳秒)

                    private long spins = 0;
                    private long yields = 0;
                    private long parks = 0;

                    public BackoffIdleStrategy(
                        final long maxSpins,
                        final long maxYields,
                        final long minParkPeriodNs,
                        final long maxParkPeriodNs)
                    {
                        this.maxSpins = maxSpins;
                        this.maxYields = maxYields;
                        this.minParkPeriodNs = minParkPeriodNs;
                        this.maxParkPeriodNs = maxParkPeriodNs;
                    }

                    @Override
                    public void idle(final int workCount)
                    {
                        if (workCount > 0)
                        {
                            reset();
                        }
                        else
                        {
                            idle();
                        }
                    }

                    @Override
                    public void idle()
                    {
                        // 第一阶段:自旋
                        if (spins < maxSpins)
                        {
                            spins++;
                            return;
                        }

                        // 第二阶段:让出CPU
                        if (yields < maxYields)
                        {
                            yields++;
                            Thread.yield();
                            return;
                        }

                        // 第三阶段:休眠
                        parks++;
                        final long parkTime = Math.min(
                            minParkPeriodNs << parks,
                            maxParkPeriodNs
                        );

                        LockSupport.parkNanos(parkTime);
                    }

                    @Override
                    public void reset()
                    {
                        spins = 0;
                        yields = 0;
                        parks = 0;
                    }
                }
                

退避过程


                stateDiagram-v2
                    [*] --> 忙自旋
                    忙自旋 --> 让出CPU: 自旋次数 >= maxSpins
                    让出CPU --> 短暂休眠: 让出次数 >= maxYields
                    短暂休眠 --> 长时间休眠: 持续空闲
                    长时间休眠 --> 长时间休眠: 持续空闲

                    忙自旋 --> [*]: 有工作
                    让出CPU --> [*]: 有工作
                    短暂休眠 --> [*]: 有工作
                    长时间休眠 --> [*]: 有工作
                

使用场景

  • 负载波动的系统
  • 需要平衡延迟和CPU使用
  • 多个Agent共享CPU
  • 通用的高性能应用

示例


                // 创建默认的退避策略
                final IdleStrategy idleStrategy = new BackoffIdleStrategy(
                    100,      // 最大自旋次数
                    10,       // 最大让出次数
                    1,        // 最小休眠时间 1ns
                    1000_000  // 最大休眠时间 1ms
                );
                

3. SleepingIdleStrategy (休眠策略)

特点:立即休眠,最低 CPU 使用


                package org.agrona.concurrent;

                /**
                 * 休眠空闲策略
                 * 最低CPU使用,但延迟较高
                 */
                public final class SleepingIdleStrategy implements IdleStrategy
                {
                    private final long sleepPeriodNs;

                    public SleepingIdleStrategy(final long sleepPeriodNs)
                    {
                        this.sleepPeriodNs = sleepPeriodNs;
                    }

                    @Override
                    public void idle(final int workCount)
                    {
                        if (workCount <= 0)
                        {
                            idle();
                        }
                    }

                    @Override
                    public void idle()
                    {
                        LockSupport.parkNanos(sleepPeriodNs);
                    }

                    @Override
                    public void reset()
                    {
                        // 无状态
                    }
                }
                

使用场景

  • 后台批处理任务
  • 非时间敏感的操作
  • 节能场景
  • 资源受限环境

示例


                // 每次空闲休眠1毫秒
                final IdleStrategy idleStrategy = new SleepingIdleStrategy(1_000_000);
                

4. YieldingIdleStrategy (让出策略)

特点:调用 Thread.yield() 让出 CPU


                package org.agrona.concurrent;

                /**
                 * 让出空闲策略
                 * 中等延迟和CPU使用
                 */
                public final class YieldingIdleStrategy implements IdleStrategy
                {
                    @Override
                    public void idle(final int workCount)
                    {
                        if (workCount <= 0)
                        {
                            Thread.yield();
                        }
                    }

                    @Override
                    public void idle()
                    {
                        Thread.yield();
                    }

                    @Override
                    public void reset()
                    {
                        // 无状态
                    }
                }
                

使用场景

  • 需要快速响应但不需要极低延迟
  • CPU核心数充足
  • 多个线程竞争

5. NoOpIdleStrategy (无操作策略)

特点:什么都不做


                package org.agrona.concurrent;

                /**
                 * 无操作空闲策略
                 * 用于测试或特殊场景
                 */
                public final class NoOpIdleStrategy implements IdleStrategy
                {
                    public static final NoOpIdleStrategy INSTANCE = new NoOpIdleStrategy();

                    @Override
                    public void idle(final int workCount)
                    {
                        // 什么都不做
                    }

                    @Override
                    public void idle()
                    {
                        // 什么都不做
                    }

                    @Override
                    public void reset()
                    {
                        // 无状态
                    }
                }
                

空闲策略对比


                graph TB
                    subgraph 延迟对比
                    A1[BusySpinIdleStrategy] --> |最低| L1[延迟: ~100ns]
                    A2[YieldingIdleStrategy] --> |低| L2[延迟: ~1us]
                    A3[BackoffIdleStrategy] --> |中| L3[延迟: 1us-10ms]
                    A4[SleepingIdleStrategy] --> |高| L4[延迟: 1ms+]
                    end

                    subgraph CPU使用对比
                    B1[BusySpinIdleStrategy] --> |100%| C1[CPU使用率]
                    B2[YieldingIdleStrategy] --> |高| C2[CPU使用率]
                    B3[BackoffIdleStrategy] --> |中| C3[CPU使用率]
                    B4[SleepingIdleStrategy] --> |低| C4[CPU使用率]
                    end

                    style A1 fill:#e74c3c,stroke:#a93529,color:#fff
                    style A2 fill:#f39c12,stroke:#b8730a,color:#fff
                    style A3 fill:#3498db,stroke:#2874a6,color:#fff
                    style A4 fill:#50c878,stroke:#2d7a4a,color:#fff
                

| 策略 | 延迟 | CPU使用 | 适用场景 | |------|------|---------|----------| | BusySpinIdleStrategy | 最低(~100ns) | 100% | 超低延迟交易系统 | | YieldingIdleStrategy | 低(~1μs) | 高(70-90%) | 快速响应系统 | | BackoffIdleStrategy | 中(1μs-10ms) | 中(10-70%) | 通用高性能应用 | | SleepingIdleStrategy | 高(1ms+) | 低(1-10%) | 后台批处理 |

AgentRunner (Agent 运行器)

AgentRunner 的作用

AgentRunner 负责在线程中运行 Agent,并应用 IdleStrategy。


                sequenceDiagram
                    participant Thread as 工作线程
                    participant Runner as AgentRunner
                    participant Agent as Agent
                    participant Idle as IdleStrategy

                    Thread->>Runner: start()
                    Runner->>Agent: onStart()

                    loop 运行循环
                        Runner->>Agent: doWork()
                        Agent-->>Runner: 返回 workCount

                        alt workCount > 0
                            Runner->>Idle: reset()
                        else workCount == 0
                            Runner->>Idle: idle()
                        end

                        Runner->>Runner: 检查运行状态
                    end

                    Runner->>Agent: onClose()
                    Runner-->>Thread: 线程退出
                

AgentRunner 实现示例


                import org.agrona.concurrent.*;

                /**
                 * AgentRunner 使用示例
                 */
                public class AgentRunnerExample
                {
                    public static void main(String[] args) throws InterruptedException
                    {
                        // 创建 Agent
                        final Agent agent = new MyAgent();

                        // 创建空闲策略
                        final IdleStrategy idleStrategy = new BackoffIdleStrategy(
                            100, 10, 1, 1_000_000
                        );

                        // 创建错误处理器
                        final ErrorHandler errorHandler = throwable ->
                        {
                            System.err.println("Agent 发生错误: " + throwable.getMessage());
                            throwable.printStackTrace();
                        };

                        // 创建 AgentRunner
                        final AgentRunner runner = new AgentRunner(
                            idleStrategy,       // 空闲策略
                            errorHandler,       // 错误处理器
                            null,               // 异常计数器(可选)
                            agent               // Agent实例
                        );

                        // 在新线程中启动
                        final Thread agentThread = AgentRunner.startOnThread(runner);

                        // 运行一段时间
                        Thread.sleep(5000);

                        // 关闭
                        runner.close();
                        agentThread.join();

                        System.out.println("Agent 已关闭");
                    }

                    static class MyAgent implements Agent
                    {
                        private int workCycle = 0;

                        @Override
                        public int doWork()
                        {
                            // 模拟工作
                            if (workCycle % 100 == 0)
                            {
                                System.out.println("完成工作周期: " + workCycle);
                                workCycle = 0;
                                return 1;  // 有工作完成
                            }

                            workCycle++;
                            return 0;  // 没有工作
                        }

                        @Override
                        public String roleName()
                        {
                            return "MyAgent";
                        }

                        @Override
                        public void onStart()
                        {
                            System.out.println("Agent 启动");
                        }

                        @Override
                        public void onClose()
                        {
                            System.out.println("Agent 关闭");
                        }
                    }
                }
                

高级技巧

1. 动态调整空闲策略


                /**
                 * 动态空闲策略适配器
                 */
                public class AdaptiveIdleStrategy implements IdleStrategy
                {
                    private IdleStrategy current;
                    private final IdleStrategy busySpin = new BusySpinIdleStrategy();
                    private final IdleStrategy backoff = new BackoffIdleStrategy(
                        100, 10, 1, 1_000_000
                    );

                    private long consecutiveIdleCycles = 0;
                    private static final long THRESHOLD = 1000;

                    @Override
                    public void idle(final int workCount)
                    {
                        if (workCount > 0)
                        {
                            consecutiveIdleCycles = 0;
                            current = busySpin;
                        }
                        else
                        {
                            consecutiveIdleCycles++;

                            if (consecutiveIdleCycles > THRESHOLD)
                            {
                                current = backoff;
                            }

                            current.idle();
                        }
                    }

                    @Override
                    public void idle()
                    {
                        current.idle();
                    }

                    @Override
                    public void reset()
                    {
                        consecutiveIdleCycles = 0;
                        if (current != null)
                        {
                            current.reset();
                        }
                    }
                }
                

2. 监控 Agent 性能


                /**
                 * 带监控的 Agent 包装器
                 */
                public class MonitoredAgent implements Agent
                {
                    private final Agent delegate;
                    private long totalCycles = 0;
                    private long busyCycles = 0;
                    private long totalWork = 0;
                    private long startTime;

                    public MonitoredAgent(final Agent delegate)
                    {
                        this.delegate = delegate;
                    }

                    @Override
                    public int doWork() throws Exception
                    {
                        totalCycles++;
                        final int work = delegate.doWork();

                        if (work > 0)
                        {
                            busyCycles++;
                            totalWork += work;
                        }

                        return work;
                    }

                    @Override
                    public String roleName()
                    {
                        return delegate.roleName() + "-Monitored";
                    }

                    @Override
                    public void onStart()
                    {
                        startTime = System.nanoTime();
                        delegate.onStart();
                    }

                    @Override
                    public void onClose()
                    {
                        delegate.onClose();
                        printStatistics();
                    }

                    private void printStatistics()
                    {
                        final long runTime = System.nanoTime() - startTime;
                        final double utilization = totalCycles > 0 ?
                            (double)busyCycles / totalCycles * 100 : 0;

                        System.out.println("=== Agent 统计信息 ===");
                        System.out.println("运行时间: " + runTime / 1_000_000 + " ms");
                        System.out.println("总周期数: " + totalCycles);
                        System.out.println("忙碌周期: " + busyCycles);
                        System.out.println("总工作量: " + totalWork);
                        System.out.println("利用率: " + String.format("%.2f", utilization) + "%");
                        System.out.println("平均每周期工作量: " +
                            String.format("%.2f", (double)totalWork / totalCycles));
                    }
                }
                

最佳实践

1. 选择合适的空闲策略


                // 低延迟场景
                IdleStrategy lowLatency = new BusySpinIdleStrategy();

                // 通用场景
                IdleStrategy general = new BackoffIdleStrategy(100, 10, 1, 1_000_000);

                // 低CPU使用场景
                IdleStrategy lowCpu = new SleepingIdleStrategy(1_000_000);
                

2. Agent 设计原则

  • 单一职责:每个 Agent 专注一个任务
  • 非阻塞:doWork() 方法不应阻塞
  • 批量处理:适当批量处理以提高吞吐量
  • 准确返回:返回值应准确反映实际工作量

3. 线程配置


                // 为Agent配置专用线程
                final Thread thread = AgentRunner.startOnThread(runner);
                thread.setName("MyAgent-Thread");
                thread.setPriority(Thread.MAX_PRIORITY);  // 高优先级

                // 设置CPU亲和性(需要JNI支持)
                // setThreadAffinity(thread, cpuId);
                

调度 Agent (Scheduling Agents)

要启动 Agent 的 Duty Cycle,你需要决定如何调度它。

调度注意事项

重要提示: 并非所有空闲策略都是线程安全的。通常建议每个被调度的 Agent 使用独立的空闲策略实例。


                Agent 调度选项
                ┌──────────────────────────────────────────────────────┐
                │            Agent 调度方式选择                         │
                └────────────────────┬─────────────────────────────────┘
                                     │
                        ┌────────────┴────────────┐
                        │                         │
                        ▼                         ▼
                ┌──────────────────┐      ┌──────────────────────┐
                │  单Agent调度      │      │  多Agent组合调度      │
                └────────┬─────────┘      └──────────┬───────────┘
                         │                           │
                    ┌────┴────┐                 ┌────┴────┐
                    │         │                 │         │
                    ▼         ▼                 ▼         ▼
                ┌───────┐ ┌────────────┐  ┌─────────────────────┐
                │ Agrona│ │ 自定义线程  │  │  CompositeAgent     │
                │ 线程  │ │ ThreadFactory│ │  (多Agent作为单元)  │
                └───────┘ └────────────┘  └─────────────────────┘
                

调度方式详解

1. 使用 Agrona 提供的线程

最简单的方式是让 Agrona 为你的 Agent 创建并管理线程:


                final Agent agent = new MyAgent();
                final IdleStrategy idleStrategy = new BackoffIdleStrategy(100, 10, 1, 1_000_000);

                final AgentRunner runner = new AgentRunner(
                    idleStrategy,
                    throwable -> System.err.println("Error: " + throwable),
                    null,  // 错误计数器(可选)
                    agent
                );

                // 在 Agrona 提供的新线程上启动
                final Thread thread = AgentRunner.startOnThread(runner);
                thread.setName("MyAgent-Thread");
                

优点:

  • 简单直接
  • Agrona 管理线程生命周期
  • 自动处理启动和关闭

2. 使用自定义 ThreadFactory

如果需要更多控制,可以提供自定义的 ThreadFactory:


                final ThreadFactory customFactory = runnable -> {
                    final Thread thread = new Thread(runnable);
                    thread.setName("CustomAgent");
                    thread.setPriority(Thread.MAX_PRIORITY);
                    thread.setDaemon(false);
                    return thread;
                };

                final Thread thread = AgentRunner.startOnThread(runner, customFactory);
                

用途:

  • 设置特定的线程属性(名称、优先级、守护状态等)
  • 集成到现有的线程管理框架
  • 实现自定义的线程监控

3. 使用 CompositeAgent 组合多个 Agent

可以将多个 Agent 组合成一个单元,在同一个线程上调度:


                // 创建多个Agent
                final Agent receiverAgent = new ReceiverAgent();
                final Agent processorAgent = new ProcessorAgent();
                final Agent senderAgent = new SenderAgent();

                // 组合成CompositeAgent
                final Agent compositeAgent = new CompositeAgent(
                    receiverAgent,
                    processorAgent,
                    senderAgent
                );

                // 作为单个单元调度
                final AgentRunner runner = new AgentRunner(
                    new BackoffIdleStrategy(100, 10, 1, 1_000_000),
                    Throwable::printStackTrace,
                    null,
                    compositeAgent
                );

                AgentRunner.startOnThread(runner);
                

ASCII 流程图:


                CompositeAgent 执行流程
                ┌─────────────────────────────────────────┐
                │         AgentRunner 启动                 │
                └───────────────┬─────────────────────────┘
                                │
                                ▼
                        ┌───────────────────┐
                        │  CompositeAgent   │
                        │    doWork()       │
                        └────────┬──────────┘
                                 │
                     ┌───────────┴───────────┐
                     │ 依次调用各Agent的doWork │
                     └───────────┬───────────┘
                                 │
                    ┌────────────┼────────────┐
                    │            │            │
                    ▼            ▼            ▼
                ┌─────────┐  ┌─────────┐  ┌─────────┐
                │Agent 1  │  │Agent 2  │  │Agent 3  │
                │doWork() │  │doWork() │  │doWork() │
                │return 5 │  │return 0 │  │return 3 │
                └────┬────┘  └────┬────┘  └────┬────┘
                     │            │            │
                     └────────────┼────────────┘
                                  │
                                  ▼
                        ┌─────────────────┐
                        │  合计工作量      │
                        │  totalWork = 8  │
                        └────────┬────────┘
                                 │
                                 ▼
                            ┌─────────┐
                            │workCount│
                            │   > 0?  │
                            └─┬─────┬─┘
                              │是   │否
                              │     │
                              ▼     ▼
                        ┌─────────┐ ┌──────────────┐
                        │立即返回  │ │ 应用空闲策略  │
                        │下一周期  │ │idleStrategy  │
                        └─────────┘ └──────────────┘
                

优势:

  • 资源高效: 多个相关Agent共享一个线程
  • 减少上下文切换: 避免线程间切换开销
  • 简化管理: 作为单个单元启动和停止

适用场景:

  • Agent之间有依赖关系
  • Agent处理的是相关的任务
  • 需要在单个线程上保证执行顺序

完整调度示例


                import org.agrona.concurrent.*;

                public class AgentSchedulingExample
                {
                    public static void main(String[] args) throws InterruptedException
                    {
                        // 示例1: 单Agent专用线程
                        scheduleS单Agent();

                        // 示例2: 组合Agent共享线程
                        scheduleCompositeAgent();
                    }

                    private static void scheduleSingleAgent()
                    {
                        final Agent agent = new MyBusinessAgent();
                        final IdleStrategy idleStrategy = new BusySpinIdleStrategy();

                        final AgentRunner runner = new AgentRunner(
                            idleStrategy,
                            throwable -> System.err.println("Error in agent: " + throwable),
                            null,
                            agent
                        );

                        // 使用Agrona线程
                        final Thread thread = AgentRunner.startOnThread(runner);
                        thread.setName("BusinessAgent-Thread");
                    }

                    private static void scheduleCompositeAgent()
                    {
                        // 创建多个Agent
                        final Agent[] agents = {
                            new ReceiverAgent(),
                            new ProcessorAgent(),
                            new SenderAgent()
                        };

                        // 创建CompositeAgent
                        final Agent composite = new CompositeAgent(agents);

                        // 调度
                        final AgentRunner runner = new AgentRunner(
                            new BackoffIdleStrategy(100, 10, 1, 1_000_000),
                            Throwable::printStackTrace,
                            null,
                            composite
                        );

                        AgentRunner.startOnThread(runner).setName("Composite-Thread");
                    }
                }
                

总结

Agent 和 IdleStrategy 是 Agrona 高性能架构的核心:

  1. Agent 定义了工作单元的执行方式
  2. IdleStrategy 优化了空闲时的资源使用
  3. AgentRunner 将两者结合,提供完整的运行框架
  4. 根据具体场景选择合适的空闲策略至关重要

正确使用这些组件,可以构建出既高性能又资源高效的系统。