如何在 Agent Runner 中使用自定义 ThreadFactory
问题描述
你需要使用特定的 ThreadFactory 来管理进程中的线程,并且想要调度 Agrona Agent。
解决方案
在 AgentRunner 构造函数中指定 ThreadFactory。
示例代码
void startWithMyThreadFactory(ThreadFactory factory)
{
Agent agent = new MyAgent();
AgentRunner myAgentRunner = new AgentRunner(idleStrategy,
Throwable::printStackTrace,
errorCounter, agent);
AgentRunner.startOnThread(myAgentRunner, factory);
}
详细说明
ThreadFactory 的作用
线程创建流程:
┌─────────────────────────────────────────────┐
│ AgentRunner.startOnThread() │
│ │
│ 1. 接收 ThreadFactory │
│ ┌────────────────────────┐ │
│ │ factory.newThread( │ │
│ │ runnable) │ │
│ └────────┬───────────────┘ │
│ │ │
│ ▼ │
│ 2. 创建并配置线程 │
│ ┌────────────────────────┐ │
│ │ • 设置线程名 │ │
│ │ • 设置优先级 │ │
│ │ • 设置守护状态 │ │
│ │ • 绑定CPU核心 │ │
│ │ • 设置线程组 │ │
│ └────────┬───────────────┘ │
│ │ │
│ ▼ │
│ 3. 启动线程 │
│ ┌────────────────────────┐ │
│ │ thread.start() │ │
│ └────────────────────────┘ │
└─────────────────────────────────────────────┘
使用场景
这在你有自定义 ThreadFactory 时非常有用,它可以以特定于应用程序的方式管理线程,例如:
- CPU 核心绑定 - 将线程固定到特定 CPU 核心
- 线程命名 - 使用自定义命名规则
- 优先级设置 - 设置特定的线程优先级
- 线程组管理 - 将线程加入特定线程组
- 监控和统计 - 跟踪线程创建和销毁
完整实现示例
1. CPU 核心绑定
import org.agrona.concurrent.*;
import org.agrona.hints.ThreadHints;
/**
* 将线程绑定到特定CPU核心的ThreadFactory
*/
public class AffinityThreadFactory implements ThreadFactory {
private final int cpuId;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public AffinityThreadFactory(String namePrefix, int cpuId) {
this.namePrefix = namePrefix;
this.cpuId = cpuId;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(() -> {
// 设置CPU亲和性
ThreadHints.onSpinWait(); // JDK 9+
// 在某些系统上可以使用JNA绑定CPU
// setThreadAffinity(cpuId);
System.out.println(
Thread.currentThread().getName() +
" 绑定到CPU核心: " + cpuId
);
r.run();
});
thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
thread.setDaemon(false);
return thread;
}
// 使用示例
public static void main(String[] args) {
Agent agent = new MyAgent();
// 创建绑定到CPU 0的ThreadFactory
ThreadFactory factory = new AffinityThreadFactory("worker", 0);
AgentRunner runner = new AgentRunner(
new BusySpinIdleStrategy(),
Throwable::printStackTrace,
null,
agent
);
// 使用自定义ThreadFactory启动
AgentRunner.startOnThread(runner, factory);
}
}
2. 优先级和命名
/**
* 带优先级和命名的ThreadFactory
*/
public class PriorityThreadFactory implements ThreadFactory {
private final int priority;
private final String namePrefix;
private final boolean daemon;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public PriorityThreadFactory(
String namePrefix,
int priority,
boolean daemon) {
this.namePrefix = namePrefix;
this.priority = priority;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 设置线程名
thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
// 设置优先级
thread.setPriority(priority);
// 设置守护线程状态
thread.setDaemon(daemon);
return thread;
}
// 使用示例
public static void main(String[] args) {
Agent criticalAgent = new CriticalAgent();
Agent normalAgent = new NormalAgent();
// 高优先级ThreadFactory
ThreadFactory highPriority = new PriorityThreadFactory(
"critical",
Thread.MAX_PRIORITY,
false
);
// 普通优先级ThreadFactory
ThreadFactory normalPriority = new PriorityThreadFactory(
"normal",
Thread.NORM_PRIORITY,
true
);
// 启动不同优先级的Agent
AgentRunner.startOnThread(
new AgentRunner(new BusySpinIdleStrategy(),
Throwable::printStackTrace, null, criticalAgent),
highPriority
);
AgentRunner.startOnThread(
new AgentRunner(new BackoffIdleStrategy(100, 1000, 1_000_000, 10_000_000),
Throwable::printStackTrace, null, normalAgent),
normalPriority
);
}
}
3. 线程组管理
/**
* 使用线程组的ThreadFactory
*/
public class GroupedThreadFactory implements ThreadFactory {
private final ThreadGroup threadGroup;
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public GroupedThreadFactory(String groupName, String namePrefix) {
this.threadGroup = new ThreadGroup(groupName);
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(
threadGroup,
r,
namePrefix + "-" + threadNumber.getAndIncrement()
);
thread.setDaemon(false);
return thread;
}
public ThreadGroup getThreadGroup() {
return threadGroup;
}
public int getActiveCount() {
return threadGroup.activeCount();
}
// 使用示例
public static void main(String[] args) {
GroupedThreadFactory factory = new GroupedThreadFactory(
"agent-group",
"agent"
);
// 创建多个Agent
for (int i = 0; i < 5; i++) {
Agent agent = new MyAgent();
AgentRunner runner = new AgentRunner(
new SleepingIdleStrategy(1000),
Throwable::printStackTrace,
null,
agent
);
AgentRunner.startOnThread(runner, factory);
}
// 监控线程组
System.out.println("活跃线程数: " + factory.getActiveCount());
}
}
4. 监控和统计
/**
* 带监控的ThreadFactory
*/
public class MonitoredThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger createdThreads = new AtomicInteger(0);
private final AtomicInteger activeThreads = new AtomicInteger(0);
private final ConcurrentHashMap<Long, ThreadInfo> threadInfoMap =
new ConcurrentHashMap<>();
static class ThreadInfo {
final long threadId;
final String threadName;
final long creationTime;
volatile long terminationTime;
ThreadInfo(long threadId, String threadName) {
this.threadId = threadId;
this.threadName = threadName;
this.creationTime = System.currentTimeMillis();
}
}
public MonitoredThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
final int threadNum = createdThreads.incrementAndGet();
final String threadName = namePrefix + "-" + threadNum;
Thread thread = new Thread(() -> {
long threadId = Thread.currentThread().getId();
ThreadInfo info = new ThreadInfo(threadId, threadName);
threadInfoMap.put(threadId, info);
activeThreads.incrementAndGet();
System.out.println("线程启动: " + threadName);
try {
r.run();
} finally {
info.terminationTime = System.currentTimeMillis();
activeThreads.decrementAndGet();
System.out.println("线程终止: " + threadName +
", 运行时长: " +
(info.terminationTime - info.creationTime) + "ms");
}
});
thread.setName(threadName);
thread.setDaemon(false);
return thread;
}
public int getCreatedThreadsCount() {
return createdThreads.get();
}
public int getActiveThreadsCount() {
return activeThreads.get();
}
public void printStatistics() {
System.out.println("=== 线程统计 ===");
System.out.println("已创建线程: " + createdThreads.get());
System.out.println("活跃线程: " + activeThreads.get());
System.out.println("已终止线程: " +
(createdThreads.get() - activeThreads.get()));
threadInfoMap.values().forEach(info -> {
if (info.terminationTime > 0) {
System.out.println(" " + info.threadName +
" - 运行时长: " +
(info.terminationTime - info.creationTime) + "ms");
}
});
}
}
实际应用场景
场景一: 高频交易系统
/**
* 低延迟交易系统的ThreadFactory
*/
public class LowLatencyThreadFactory implements ThreadFactory {
private final int cpuCore;
public LowLatencyThreadFactory(int cpuCore) {
this.cpuCore = cpuCore;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 1. 高优先级
thread.setPriority(Thread.MAX_PRIORITY);
// 2. 非守护线程
thread.setDaemon(false);
// 3. 绑定CPU核心(需要JNA或其他本地库)
thread.setName("trading-core-" + cpuCore);
// 4. 设置未捕获异常处理器
thread.setUncaughtExceptionHandler((t, e) -> {
System.err.println("线程 " + t.getName() + " 异常: " + e);
// 触发告警
alertSystem("Critical thread failure: " + t.getName());
});
return thread;
}
private void alertSystem(String message) {
// 发送告警
System.err.println("ALERT: " + message);
}
}
场景二: 多租户系统
/**
* 多租户隔离的ThreadFactory
*/
public class TenantThreadFactory implements ThreadFactory {
private final String tenantId;
private final ThreadGroup tenantGroup;
public TenantThreadFactory(String tenantId) {
this.tenantId = tenantId;
this.tenantGroup = new ThreadGroup("tenant-" + tenantId);
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(tenantGroup, r);
thread.setName("tenant-" + tenantId + "-worker");
thread.setDaemon(false);
// 设置租户上下文
thread.setContextClassLoader(
getTenantClassLoader(tenantId)
);
return thread;
}
private ClassLoader getTenantClassLoader(String tenantId) {
// 返回租户特定的类加载器
return Thread.currentThread().getContextClassLoader();
}
}
与 Agrona 的集成
/**
* Agrona优化的ThreadFactory
*/
public class AgronaOptimizedThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(() -> {
// 预热JIT编译器
warmup();
// 禁用偏向锁(某些情况下可以提高性能)
// -XX:-UseBiasedLocking
// 运行实际任务
r.run();
});
// Agent名称作为线程名
thread.setName("agrona-agent");
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
private void warmup() {
// 预热关键代码路径
UnsafeBuffer buffer = new UnsafeBuffer(
ByteBuffer.allocateDirect(1024)
);
for (int i = 0; i < 1000; i++) {
buffer.putInt(0, i);
buffer.getInt(0);
}
}
}
默认 vs 自定义 ThreadFactory
默认ThreadFactory:
┌─────────────────────────────────────┐
│ AgentRunner.startOnThread(runner) │
│ │
│ • 线程名: agent-role-name │
│ • 优先级: NORM_PRIORITY │
│ • 守护状态: false │
│ • 线程组: 当前线程组 │
│ • CPU绑定: 无 │
└─────────────────────────────────────┘
自定义ThreadFactory:
┌─────────────────────────────────────┐
│ AgentRunner.startOnThread( │
│ runner, customFactory) │
│ │
│ • 线程名: 自定义 │
│ • 优先级: 自定义 │
│ • 守护状态: 自定义 │
│ • 线程组: 自定义 │
│ • CPU绑定: 可选 │
│ • 其他: 监控/统计/安全... │
└─────────────────────────────────────┘
最佳实践
/**
* 生产级ThreadFactory模板
*/
public class ProductionThreadFactory implements ThreadFactory {
private final String namePrefix;
private final int priority;
private final boolean daemon;
private final UncaughtExceptionHandler exceptionHandler;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private ProductionThreadFactory(Builder builder) {
this.namePrefix = builder.namePrefix;
this.priority = builder.priority;
this.daemon = builder.daemon;
this.exceptionHandler = builder.exceptionHandler;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
thread.setPriority(priority);
thread.setDaemon(daemon);
if (exceptionHandler != null) {
thread.setUncaughtExceptionHandler(exceptionHandler);
}
return thread;
}
// Builder模式
public static class Builder {
private String namePrefix = "worker";
private int priority = Thread.NORM_PRIORITY;
private boolean daemon = false;
private UncaughtExceptionHandler exceptionHandler;
public Builder namePrefix(String namePrefix) {
this.namePrefix = namePrefix;
return this;
}
public Builder priority(int priority) {
this.priority = priority;
return this;
}
public Builder daemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public Builder exceptionHandler(UncaughtExceptionHandler handler) {
this.exceptionHandler = handler;
return this;
}
public ProductionThreadFactory build() {
return new ProductionThreadFactory(this);
}
}
// 使用示例
public static void main(String[] args) {
ThreadFactory factory = new Builder()
.namePrefix("critical-agent")
.priority(Thread.MAX_PRIORITY)
.daemon(false)
.exceptionHandler((t, e) -> {
System.err.println("Thread " + t.getName() + " failed: " + e);
})
.build();
Agent agent = new MyAgent();
AgentRunner runner = new AgentRunner(
new BusySpinIdleStrategy(),
Throwable::printStackTrace,
null,
agent
);
AgentRunner.startOnThread(runner, factory);
}
}
相关链接
总结
使用自定义 ThreadFactory 可以精确控制 Agent 线程的创建和配置:
主要用途:
- ✅ CPU 核心绑定 - 提高缓存局部性
- ✅ 优先级设置 - 控制调度顺序
- ✅ 线程命名 - 便于监控和调试
- ✅ 线程组管理 - 批量操作线程
- ✅ 监控统计 - 跟踪线程生命周期
适用场景:
- 低延迟系统(CPU绑定)
- 多租户系统(隔离)
- 高可用系统(监控告警)
- 资源受限环境(优先级控制)
实现要点:
- 使用
AgentRunner.startOnThread(runner, factory) - ThreadFactory 必须是线程安全的
- 考虑线程命名规范
- 设置未捕获异常处理器
- 根据需求选择守护线程状态
注意事项:
- CPU绑定需要操作系统支持(JNA/JNI)
- 线程优先级只是建议,不保证
- 避免创建过多高优先级线程
- 监控线程资源使用情况
自定义 ThreadFactory 是优化 Agrona Agent 性能和可管理性的强大工具。