如何在 Agent Runner 中使用自定义 ThreadFactory

如何在 Agent Runner 中使用自定义 ThreadFactory


问题描述

你需要使用特定的 ThreadFactory 来管理进程中的线程,并且想要调度 Agrona Agent。

解决方案

AgentRunner 构造函数中指定 ThreadFactory

示例代码


                void startWithMyThreadFactory(ThreadFactory factory)
                {
                    Agent agent = new MyAgent();
                    AgentRunner myAgentRunner = new AgentRunner(idleStrategy,
                            Throwable::printStackTrace,
                            errorCounter, agent);
                    AgentRunner.startOnThread(myAgentRunner, factory);
                }
                

详细说明

ThreadFactory 的作用


                线程创建流程:
                ┌─────────────────────────────────────────────┐
                │  AgentRunner.startOnThread()                │
                │                                             │
                │  1. 接收 ThreadFactory                      │
                │     ┌────────────────────────┐             │
                │     │ factory.newThread(     │             │
                │     │   runnable)            │             │
                │     └────────┬───────────────┘             │
                │              │                              │
                │              ▼                              │
                │  2. 创建并配置线程                          │
                │     ┌────────────────────────┐             │
                │     │ • 设置线程名            │             │
                │     │ • 设置优先级            │             │
                │     │ • 设置守护状态          │             │
                │     │ • 绑定CPU核心          │             │
                │     │ • 设置线程组            │             │
                │     └────────┬───────────────┘             │
                │              │                              │
                │              ▼                              │
                │  3. 启动线程                                │
                │     ┌────────────────────────┐             │
                │     │ thread.start()         │             │
                │     └────────────────────────┘             │
                └─────────────────────────────────────────────┘
                

使用场景

这在你有自定义 ThreadFactory 时非常有用,它可以以特定于应用程序的方式管理线程,例如:

  1. CPU 核心绑定 - 将线程固定到特定 CPU 核心
  2. 线程命名 - 使用自定义命名规则
  3. 优先级设置 - 设置特定的线程优先级
  4. 线程组管理 - 将线程加入特定线程组
  5. 监控和统计 - 跟踪线程创建和销毁

完整实现示例

1. CPU 核心绑定


                import org.agrona.concurrent.*;
                import org.agrona.hints.ThreadHints;

                /**
                 * 将线程绑定到特定CPU核心的ThreadFactory
                 */
                public class AffinityThreadFactory implements ThreadFactory {
                    private final int cpuId;
                    private final AtomicInteger threadNumber = new AtomicInteger(1);
                    private final String namePrefix;

                    public AffinityThreadFactory(String namePrefix, int cpuId) {
                        this.namePrefix = namePrefix;
                        this.cpuId = cpuId;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(() -> {
                            // 设置CPU亲和性
                            ThreadHints.onSpinWait();  // JDK 9+
                            // 在某些系统上可以使用JNA绑定CPU
                            // setThreadAffinity(cpuId);

                            System.out.println(
                                Thread.currentThread().getName() +
                                " 绑定到CPU核心: " + cpuId
                            );

                            r.run();
                        });

                        thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
                        thread.setDaemon(false);

                        return thread;
                    }

                    // 使用示例
                    public static void main(String[] args) {
                        Agent agent = new MyAgent();

                        // 创建绑定到CPU 0的ThreadFactory
                        ThreadFactory factory = new AffinityThreadFactory("worker", 0);

                        AgentRunner runner = new AgentRunner(
                            new BusySpinIdleStrategy(),
                            Throwable::printStackTrace,
                            null,
                            agent
                        );

                        // 使用自定义ThreadFactory启动
                        AgentRunner.startOnThread(runner, factory);
                    }
                }
                

2. 优先级和命名


                /**
                 * 带优先级和命名的ThreadFactory
                 */
                public class PriorityThreadFactory implements ThreadFactory {
                    private final int priority;
                    private final String namePrefix;
                    private final boolean daemon;
                    private final AtomicInteger threadNumber = new AtomicInteger(1);

                    public PriorityThreadFactory(
                        String namePrefix,
                        int priority,
                        boolean daemon) {

                        this.namePrefix = namePrefix;
                        this.priority = priority;
                        this.daemon = daemon;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);

                        // 设置线程名
                        thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());

                        // 设置优先级
                        thread.setPriority(priority);

                        // 设置守护线程状态
                        thread.setDaemon(daemon);

                        return thread;
                    }

                    // 使用示例
                    public static void main(String[] args) {
                        Agent criticalAgent = new CriticalAgent();
                        Agent normalAgent = new NormalAgent();

                        // 高优先级ThreadFactory
                        ThreadFactory highPriority = new PriorityThreadFactory(
                            "critical",
                            Thread.MAX_PRIORITY,
                            false
                        );

                        // 普通优先级ThreadFactory
                        ThreadFactory normalPriority = new PriorityThreadFactory(
                            "normal",
                            Thread.NORM_PRIORITY,
                            true
                        );

                        // 启动不同优先级的Agent
                        AgentRunner.startOnThread(
                            new AgentRunner(new BusySpinIdleStrategy(),
                                Throwable::printStackTrace, null, criticalAgent),
                            highPriority
                        );

                        AgentRunner.startOnThread(
                            new AgentRunner(new BackoffIdleStrategy(100, 1000, 1_000_000, 10_000_000),
                                Throwable::printStackTrace, null, normalAgent),
                            normalPriority
                        );
                    }
                }
                

3. 线程组管理


                /**
                 * 使用线程组的ThreadFactory
                 */
                public class GroupedThreadFactory implements ThreadFactory {
                    private final ThreadGroup threadGroup;
                    private final String namePrefix;
                    private final AtomicInteger threadNumber = new AtomicInteger(1);

                    public GroupedThreadFactory(String groupName, String namePrefix) {
                        this.threadGroup = new ThreadGroup(groupName);
                        this.namePrefix = namePrefix;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(
                            threadGroup,
                            r,
                            namePrefix + "-" + threadNumber.getAndIncrement()
                        );

                        thread.setDaemon(false);
                        return thread;
                    }

                    public ThreadGroup getThreadGroup() {
                        return threadGroup;
                    }

                    public int getActiveCount() {
                        return threadGroup.activeCount();
                    }

                    // 使用示例
                    public static void main(String[] args) {
                        GroupedThreadFactory factory = new GroupedThreadFactory(
                            "agent-group",
                            "agent"
                        );

                        // 创建多个Agent
                        for (int i = 0; i < 5; i++) {
                            Agent agent = new MyAgent();
                            AgentRunner runner = new AgentRunner(
                                new SleepingIdleStrategy(1000),
                                Throwable::printStackTrace,
                                null,
                                agent
                            );

                            AgentRunner.startOnThread(runner, factory);
                        }

                        // 监控线程组
                        System.out.println("活跃线程数: " + factory.getActiveCount());
                    }
                }
                

4. 监控和统计


                /**
                 * 带监控的ThreadFactory
                 */
                public class MonitoredThreadFactory implements ThreadFactory {
                    private final String namePrefix;
                    private final AtomicInteger createdThreads = new AtomicInteger(0);
                    private final AtomicInteger activeThreads = new AtomicInteger(0);
                    private final ConcurrentHashMap<Long, ThreadInfo> threadInfoMap =
                        new ConcurrentHashMap<>();

                    static class ThreadInfo {
                        final long threadId;
                        final String threadName;
                        final long creationTime;
                        volatile long terminationTime;

                        ThreadInfo(long threadId, String threadName) {
                            this.threadId = threadId;
                            this.threadName = threadName;
                            this.creationTime = System.currentTimeMillis();
                        }
                    }

                    public MonitoredThreadFactory(String namePrefix) {
                        this.namePrefix = namePrefix;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        final int threadNum = createdThreads.incrementAndGet();
                        final String threadName = namePrefix + "-" + threadNum;

                        Thread thread = new Thread(() -> {
                            long threadId = Thread.currentThread().getId();
                            ThreadInfo info = new ThreadInfo(threadId, threadName);
                            threadInfoMap.put(threadId, info);
                            activeThreads.incrementAndGet();

                            System.out.println("线程启动: " + threadName);

                            try {
                                r.run();
                            } finally {
                                info.terminationTime = System.currentTimeMillis();
                                activeThreads.decrementAndGet();
                                System.out.println("线程终止: " + threadName +
                                    ", 运行时长: " +
                                    (info.terminationTime - info.creationTime) + "ms");
                            }
                        });

                        thread.setName(threadName);
                        thread.setDaemon(false);

                        return thread;
                    }

                    public int getCreatedThreadsCount() {
                        return createdThreads.get();
                    }

                    public int getActiveThreadsCount() {
                        return activeThreads.get();
                    }

                    public void printStatistics() {
                        System.out.println("=== 线程统计 ===");
                        System.out.println("已创建线程: " + createdThreads.get());
                        System.out.println("活跃线程: " + activeThreads.get());
                        System.out.println("已终止线程: " +
                            (createdThreads.get() - activeThreads.get()));

                        threadInfoMap.values().forEach(info -> {
                            if (info.terminationTime > 0) {
                                System.out.println("  " + info.threadName +
                                    " - 运行时长: " +
                                    (info.terminationTime - info.creationTime) + "ms");
                            }
                        });
                    }
                }
                

实际应用场景

场景一: 高频交易系统


                /**
                 * 低延迟交易系统的ThreadFactory
                 */
                public class LowLatencyThreadFactory implements ThreadFactory {
                    private final int cpuCore;

                    public LowLatencyThreadFactory(int cpuCore) {
                        this.cpuCore = cpuCore;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);

                        // 1. 高优先级
                        thread.setPriority(Thread.MAX_PRIORITY);

                        // 2. 非守护线程
                        thread.setDaemon(false);

                        // 3. 绑定CPU核心(需要JNA或其他本地库)
                        thread.setName("trading-core-" + cpuCore);

                        // 4. 设置未捕获异常处理器
                        thread.setUncaughtExceptionHandler((t, e) -> {
                            System.err.println("线程 " + t.getName() + " 异常: " + e);
                            // 触发告警
                            alertSystem("Critical thread failure: " + t.getName());
                        });

                        return thread;
                    }

                    private void alertSystem(String message) {
                        // 发送告警
                        System.err.println("ALERT: " + message);
                    }
                }
                

场景二: 多租户系统


                /**
                 * 多租户隔离的ThreadFactory
                 */
                public class TenantThreadFactory implements ThreadFactory {
                    private final String tenantId;
                    private final ThreadGroup tenantGroup;

                    public TenantThreadFactory(String tenantId) {
                        this.tenantId = tenantId;
                        this.tenantGroup = new ThreadGroup("tenant-" + tenantId);
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(tenantGroup, r);
                        thread.setName("tenant-" + tenantId + "-worker");
                        thread.setDaemon(false);

                        // 设置租户上下文
                        thread.setContextClassLoader(
                            getTenantClassLoader(tenantId)
                        );

                        return thread;
                    }

                    private ClassLoader getTenantClassLoader(String tenantId) {
                        // 返回租户特定的类加载器
                        return Thread.currentThread().getContextClassLoader();
                    }
                }
                

与 Agrona 的集成


                /**
                 * Agrona优化的ThreadFactory
                 */
                public class AgronaOptimizedThreadFactory implements ThreadFactory {

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(() -> {
                            // 预热JIT编译器
                            warmup();

                            // 禁用偏向锁(某些情况下可以提高性能)
                            // -XX:-UseBiasedLocking

                            // 运行实际任务
                            r.run();
                        });

                        // Agent名称作为线程名
                        thread.setName("agrona-agent");
                        thread.setDaemon(false);
                        thread.setPriority(Thread.NORM_PRIORITY);

                        return thread;
                    }

                    private void warmup() {
                        // 预热关键代码路径
                        UnsafeBuffer buffer = new UnsafeBuffer(
                            ByteBuffer.allocateDirect(1024)
                        );

                        for (int i = 0; i < 1000; i++) {
                            buffer.putInt(0, i);
                            buffer.getInt(0);
                        }
                    }
                }
                

默认 vs 自定义 ThreadFactory


                默认ThreadFactory:
                ┌─────────────────────────────────────┐
                │ AgentRunner.startOnThread(runner)   │
                │                                     │
                │ • 线程名: agent-role-name           │
                │ • 优先级: NORM_PRIORITY             │
                │ • 守护状态: false                   │
                │ • 线程组: 当前线程组                │
                │ • CPU绑定: 无                       │
                └─────────────────────────────────────┘

                自定义ThreadFactory:
                ┌─────────────────────────────────────┐
                │ AgentRunner.startOnThread(          │
                │   runner, customFactory)            │
                │                                     │
                │ • 线程名: 自定义                    │
                │ • 优先级: 自定义                    │
                │ • 守护状态: 自定义                  │
                │ • 线程组: 自定义                    │
                │ • CPU绑定: 可选                     │
                │ • 其他: 监控/统计/安全...           │
                └─────────────────────────────────────┘
                

最佳实践


                /**
                 * 生产级ThreadFactory模板
                 */
                public class ProductionThreadFactory implements ThreadFactory {
                    private final String namePrefix;
                    private final int priority;
                    private final boolean daemon;
                    private final UncaughtExceptionHandler exceptionHandler;
                    private final AtomicInteger threadNumber = new AtomicInteger(1);

                    private ProductionThreadFactory(Builder builder) {
                        this.namePrefix = builder.namePrefix;
                        this.priority = builder.priority;
                        this.daemon = builder.daemon;
                        this.exceptionHandler = builder.exceptionHandler;
                    }

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
                        thread.setPriority(priority);
                        thread.setDaemon(daemon);

                        if (exceptionHandler != null) {
                            thread.setUncaughtExceptionHandler(exceptionHandler);
                        }

                        return thread;
                    }

                    // Builder模式
                    public static class Builder {
                        private String namePrefix = "worker";
                        private int priority = Thread.NORM_PRIORITY;
                        private boolean daemon = false;
                        private UncaughtExceptionHandler exceptionHandler;

                        public Builder namePrefix(String namePrefix) {
                            this.namePrefix = namePrefix;
                            return this;
                        }

                        public Builder priority(int priority) {
                            this.priority = priority;
                            return this;
                        }

                        public Builder daemon(boolean daemon) {
                            this.daemon = daemon;
                            return this;
                        }

                        public Builder exceptionHandler(UncaughtExceptionHandler handler) {
                            this.exceptionHandler = handler;
                            return this;
                        }

                        public ProductionThreadFactory build() {
                            return new ProductionThreadFactory(this);
                        }
                    }

                    // 使用示例
                    public static void main(String[] args) {
                        ThreadFactory factory = new Builder()
                            .namePrefix("critical-agent")
                            .priority(Thread.MAX_PRIORITY)
                            .daemon(false)
                            .exceptionHandler((t, e) -> {
                                System.err.println("Thread " + t.getName() + " failed: " + e);
                            })
                            .build();

                        Agent agent = new MyAgent();
                        AgentRunner runner = new AgentRunner(
                            new BusySpinIdleStrategy(),
                            Throwable::printStackTrace,
                            null,
                            agent
                        );

                        AgentRunner.startOnThread(runner, factory);
                    }
                }
                

相关链接

总结

使用自定义 ThreadFactory 可以精确控制 Agent 线程的创建和配置:

主要用途:

  • CPU 核心绑定 - 提高缓存局部性
  • 优先级设置 - 控制调度顺序
  • 线程命名 - 便于监控和调试
  • 线程组管理 - 批量操作线程
  • 监控统计 - 跟踪线程生命周期

适用场景:

  • 低延迟系统(CPU绑定)
  • 多租户系统(隔离)
  • 高可用系统(监控告警)
  • 资源受限环境(优先级控制)

实现要点:

  • 使用 AgentRunner.startOnThread(runner, factory)
  • ThreadFactory 必须是线程安全的
  • 考虑线程命名规范
  • 设置未捕获异常处理器
  • 根据需求选择守护线程状态

注意事项:

  • CPU绑定需要操作系统支持(JNA/JNI)
  • 线程优先级只是建议,不保证
  • 避免创建过多高优先级线程
  • 监控线程资源使用情况

自定义 ThreadFactory 是优化 Agrona Agent 性能和可管理性的强大工具。