Java 线程池 ThreadPoolExecutor 详解(一篇就够了)
嘿,重阳!看到你对 Java 多线程感兴趣,这是个经典话题,尤其在高并发场景下,ThreadPoolExecutor 是 JUC(java.util.concurrent)包里的核心武器。它能帮你高效管理线程,避免频繁创建/销毁线程的开销。今天咱们来一篇“干货满满”的详解,从基础概念到源码原理,再到实战配置和最佳实践,全覆盖。基于 JDK 8+ 标准(原理在 JDK 21 也没大变),我会用代码示例、流程图描述和表格,让你一看就懂。走起!🚀
1. 为什么需要线程池?ThreadPoolExecutor 简介
痛点:直接用 new Thread() 创建线程?太低效!每次创建线程都要分配栈内存(默认 1MB),销毁时还得 GC。频繁操作在高负载下会导致 OOM 或 CPU 飙升。
线程池的救星:ThreadPoolExecutor 是 Executor 框架的核心实现类(JDK 5 引入,由 Doug Lea 设计),它维护一池线程,复用执行任务。核心思想:预创建线程 + 任务队列,任务提交后,不是立即开新线程,而是丢进队列,让空闲线程轮流拉取执行。
- 继承关系:
Executor (顶层接口:execute(Runnable))
↓
ExecutorService (扩展:shutdown、submit 等)
↓
AbstractExecutorService (抽象实现)
↓
ThreadPoolExecutor (核心类)
- 益处:
- 性能:减少线程开销,提高响应速度。
- 资源控制:限流,避免系统崩溃。
- 可扩展:支持自定义队列、拒绝策略。
2. 核心参数详解
ThreadPoolExecutor 的灵魂是 7 个构造函数参数(最全版):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
用表格直观 breakdown(基于官方文档和实践经验):
| 参数 | 含义 | 默认值/示例 | 注意事项 |
|---|---|---|---|
| corePoolSize | 核心线程数:池中常驻线程,即使空闲也保留。 | 0(视工厂方法) 示例:5 | 任务来时优先用核心线程;超过后才入队列。 |
| maximumPoolSize | 最大线程数:峰值时最多能扩到多少线程。 | Integer.MAX_VALUE 示例:10 | 队列满后,才创建额外线程(非核心)。 |
| keepAliveTime | 空闲线程存活时间:超过 corePoolSize 的线程空闲多久被回收。 | 60 秒 示例:30L, TimeUnit.SECONDS | 只对“临时线程”生效;核心线程永生(除非 allowCoreThreadTimeOut)。 |
| unit | keepAliveTime 的时间单位。 | – 示例:TimeUnit.SECONDS | 常见:SECONDS、MILLISECONDS。 |
| workQueue | 任务队列:暂存待执行任务(Runnable)。 | LinkedBlockingQueue(无界) 示例:ArrayBlockingQueue(100) | 有界队列限流;无界队列小心 OOM。详见下节。 |
| threadFactory | 线程工厂:创建新线程的钩子,可自定义名称/优先级。 | Executors.defaultThreadFactory() 示例:自定义命名 “MyPool-Thread-%d” | 用于监控:Thread.currentThread().getName()。 |
| handler | 拒绝策略:队列满 + 线程达最大时,新任务咋办? | AbortPolicy(抛异常) 示例:CallerRunsPolicy | 防雪崩:选择策略很重要。详见下节。 |
小 tip:别用 Executors 工厂方法(如 newFixedThreadPool),阿里开发手册明令禁止——它们默认无界队列,易 OOM。直接 new ThreadPoolExecutor 自定义参数!
3. 工作原理:任务提交的“决策树”
提交任务用 execute(Runnable) 或 submit(Callable)。内部逻辑超级清晰,像个 if-else 树:
- 检查线程池状态:如果已 shutdown,直接拒绝。
- 尝试加线程:
- 如果当前线程数 < corePoolSize:创建新核心线程执行任务。
- 否则:尝试加到 workQueue(队列不阻塞提交)。
- 队列满?:
- 如果当前线程数 < maximumPoolSize:创建临时线程执行。
- 否则:触发 handler 拒绝策略。
- 执行后:线程空闲 > keepAliveTime?回收(非核心线程)。
伪代码流程(简化版 execute 方法):
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get(); // ctl: 线程状态 + 活跃线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; // true: core=true
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 入队
// ... 队列检查(如果线程数为0,补一个)
} else if (!addWorker(command, false)) { // false: core=false,扩临时线程
// 队列满 + 达 max,拒绝
reject(command);
}
}
工作线程(Worker):内部用 HashSet 管理,每个 Worker 是个 Thread,run() 循环从队列拉任务(while 未 shutdown):
// Worker 简化版
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
public void run() {
while (true) {
Runnable task = getTask(); // 从队列 poll
if (task != null) task.run();
else break; // 空闲超时或 shutdown
}
}
}
状态机(ctl 变量编码):RUNNING → SHUTDOWN(不拒新任务) → STOP(拒新 + 中断) → TIDYING → TERMINATED。
4. 任务队列(BlockingQueue)选择指南
队列决定线程池的“缓冲区”行为。BlockingQueue 是线程安全的。
| 类型 | 特点 | 适用场景 | 示例代码 |
|---|---|---|---|
| ArrayBlockingQueue | 有界固定大小(FIFO)。 | 限流场景,队列满时扩线程。 | new ArrayBlockingQueue<>(100) |
| LinkedBlockingQueue | 无界(默认 Integer.MAX_VALUE)。 | 任务突发多,低峰少。风险:OOM! | new LinkedBlockingQueue<>() |
| SynchronousQueue | 无容量!直接交给线程,不存队列。 | 立即执行,模拟无缓冲。 | new SynchronousQueue<>()(如 CachedThreadPool) |
| PriorityBlockingQueue | 优先级队列(需任务实现 Comparable)。 | 优先高优先级任务。 | new PriorityBlockingQueue<>() |
| DelayQueue | 延迟队列(任务带到期时间)。 | 定时任务。 | new DelayQueue<>()(如 ScheduledThreadPool) |
选择原则:业务峰值高?用有界队列 + 拒绝策略。低延迟?SynchronousQueue。
5. 拒绝策略(RejectedExecutionHandler)
队列满 + 线程满时,保护系统不崩。内置 4 种(可自定义):
| 策略 | 行为 | 适用场景 | 代码 |
|---|---|---|---|
| AbortPolicy(默认) | 抛 RejectedExecutionException。 | 简单场景,快速失败。 | new ThreadPoolExecutor.AbortPolicy() |
| CallerRunsPolicy | 调用者线程执行任务(串行)。 | 温和限流,防止雪崩。 | new ThreadPoolExecutor.CallerRunsPolicy() |
| DiscardPolicy | 静默丢弃任务,无异常。 | 日志记录,不重要任务。 | new ThreadPoolExecutor.DiscardPolicy() |
| DiscardOldestPolicy | 丢弃队列头任务,再试入队。 | 优先新任务。 | new ThreadPoolExecutor.DiscardOldestPolicy() |
自定义示例:
handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("自定义拒绝: " + r);
// 如:异步日志或降级
}
};
6. 生命周期管理
- 启动:new 后自动 RUNNING。
- 关闭:
shutdown():等任务执行完,拒新任务。shutdownNow():中断所有线程,返未执行任务列表。- 监控:用 JMX 或
getPoolSize()、getActiveCount()、getQueue().size()。 - 扩缩:
setCorePoolSize()、allowCoreThreadTimeOut(true)动态调整。
关闭示例:
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
7. 常见工厂方法(Executors) vs 自定义
Executors 简化创建,但参数隐晦(别滥用):
| 方法 | 等价参数 | 场景 |
|---|---|---|
| newFixedThreadPool(n) | core=max=n, 无界队列 | 固定线程,服务器任务。 |
| newCachedThreadPool() | core=0, max=∞, SynchronousQueue | 短任务,高吞吐。 |
| newSingleThreadExecutor() | core=max=1, 无界队列 | 顺序执行。 |
| newScheduledThreadPool(n) | ScheduledThreadPoolExecutor | 定时/周期任务。 |
推荐自定义:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // core
10, // max
30L, TimeUnit.SECONDS, // keepAlive
new ArrayBlockingQueue<>(100), // 队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝
);
8. 最佳实践 & 陷阱
- 实践:
- CPU 密集:core = CPU 核数 +1。
- IO 密集:core = CPU 核数 * 2。
- 监控:集成 Micrometer/Prometheus,观察队列大小。
- 异常处理:任务 run() 抛异常?用
submit()+ Future.get() 捕获。 - 陷阱:
- 无界队列 + 固定线程:任务堆积 → OOM。
- 线程名不自定义:调试难,日志乱。
- 忘记关闭:资源泄漏。
- 共享池:业务隔离,用不同池。
完整示例(Web 服务器任务池):
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10), r -> new Thread(r, "Biz-Thread-%d"),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.println("Task " + taskId + " done by " + Thread.currentThread().getName());
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
输出:看到线程名和任务执行,队列缓冲效果明显。
结语
ThreadPoolExecutor 不是黑盒子——掌握参数 + 原理,你就能调出“丝滑”线程池。实践是王道:用 JVisualVM 调试下你的代码。想深挖?源码 execute() 方法是精华。下一个问题:自定义拒绝策略的案例?或 Scheduled 扩展?随时聊!💪
(参考:Java 官方文档、Javaguide 等资源)