使用线程池的优点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

所有线程池都基于 java.util.concurrent.ThreadPoolExecutor,底层是操作系统线程(Platform Threads),创建和切换成本高,需池化复用。

Java 21 引入了虚拟线程(Virtual Threads),让“一个任务一个线程”成为 I/O 场景的最佳实践。

线程池概念结构

核心组件

  • 核心线程数(corePoolSize):线程池中常驻的线程数量,即使空闲也不会被回收(除非设置 allowCoreThreadTimeOut(true))。

  • 最大线程数(maximumPoolSize):线程池允许创建的最大线程数量。当任务队列满且当前线程数 < maximumPoolSize 时,会创建非核心线程执行任务。

  • 非核心线程的存活时长(keepAliveTime):非核心线程在空闲时的存活时间。超过该时间将被回收。

  • 工作队列(BlockingQueue):当核心线程都在忙时,新任务会被放入队列等待。

    • LinkedBlockingQueue:无界队列(默认不设上限,易 OOM)
    • ArrayBlockingQueue:有界队列
    • SynchronousQueue:不存储元素,直接移交任务给线程(常用于 CachedThreadPool)
    • PriorityBlockingQueue:优先级队列
  • 拒绝策略(RejectedExecutionHandler):当线程池和队列都满了,如何处理新提交的任务,常见策略有:

    • AbortPolicy: 它将直接抛出RejectedExecutionException(默认)
    • CallerRunsPolicy: 它直接在execute方法的调用线程中运行被拒绝的任务。如果执行程序已关闭,则会丢弃该任务
    • DiscardOldestPolicy: 它放弃最旧的未处理请求,然后重试execute执行当前任务。
    • DiscardPolicy: 默认情况下它将丢弃被拒绝的任务。

执行流程

深入理解线程池(详解)_线程池的工作原理-CSDN博客

1.查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。

2.查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第三步。

3.查看线程池是否已满,即是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

4.当线程池中的线程数量大于核心线程时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

5.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

线程池状态

线程池的状态和线程的状态是完全不同的,线程池的状态有以下 5 种:

  1. running:运行状态,线程池创建好后会进入此状态,如不手动调用关闭方法,那线程池在整个程序运行期间都是此状态。

  2. shutdown:关闭状态,不再接受新任务提交,但是会将已保存在任务队列中的任务处理完。

  3. stop:停止状态,不再接受新任务提交,并且会中断当前正在执行的任务、放弃任务队列中已有的任务。

  4. tidying:整理状态,所有的任务都执行完毕后(也包括任务队列中的任务执行完),当前线程池中的活动线程数降为 0 时的状态。到此状态之后,会调用线程池的 terminated() 方法。

  5. terminated:销毁状态,当执行完线程池的 terminated() 方法之后就会变为此状态。

image-20240705162257743

image.png
  • shutdown() 不再接收新的任务,但是现有任务继续,且队列中等候的线程也将会执行。除非内部有停止标识

  • shutdownNow() 不再接收新的任务,但是现有任务继续,队列中等候的线程不会再执行,将交给外部线程处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

重写 terminated() 方法,销毁前执行一些其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadPoolStateTransition {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)) {
@Override
protected void terminated() {
super.terminated();
System.out.println("执行 terminated() 方法");
}
};
threadPool.shutdown();
while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("线程池正在运行中");
}
}
}

线程池体系结构

java.util.concurrent.Executor: 负责线程的使用和调度的根接口;

  • ExecutorService: 子接口,线程池的主要接口;

    • AbstractExecutorService: 接口抽象类,维护了通用方法:线程提交和运行
      • ThreadPoolExecutor: 线程池的实现类; 维护了线程池参数、状态、等待队列、锁和方法实现
      • ForkJoinPool:并行线程池的实现类; 维护了线程池参数、状态、等待队列、工作线程
      • DelegatedExecutorService:
  • ScheduledExecutorService: 子接口,负责线程的调度;

    • ScheduledThreadPoolExecutor: 继承了线程池的实现类,实现了负责线程调度的子接口;

工具类: Executors(慎用:最大线程数过大,队列大小未做限制,会导致CPU和内存占用过大出现服务宕机问题)

  • ExecutorService=Executors.newFixedThreadPool(): 固定大小线程池(核心=最大=n);

  • ExecutorService=Executors.newCachedThreadPool(): 缓存线程池,核心=0,最大=MAX_INT,没有核心线程数;

  • ExecutorService=Executors.newSingleThreadExecutor(): 单个线程池, 线程池只有一个线程核心=最大=1;

  • ExecutorService=Executors.newWorkStealingPool(): 并行线程池,默认核心大小为CPU核数(Runtime.getRuntime().availableProcessors()),最大默认 256

  • ScheduledExecutorService=Executors.newScheduledThreadPool(): 延时或定时的执行任务,核心=n,最大=MAX_INT;

spring 提供的 线程池

TaskExecutor 接口

  • AsyncTaskExecutor 接口

    • SchedulingTaskExecutor 接口

      • ThreadPoolTaskExecutor

      • ThreadPoolTaskScheduler

      • ConcurrentTaskExecutor

      • SimpleThreadPoolTaskExecutor

    • SimpleAsyncTaskExecutor

线程池使用指南

使用场景和建议

线程池类型 适用场景 Java 建议(21)
FixedThreadPool CPU 密集、控制并发 仍可用,但 I/O 场景改用虚拟线程
CachedThreadPool 短时突发任务 不推荐,改用虚拟线程或限流版
SingleThreadExecutor 串行任务 仍可用,注意队列大小
ScheduledThreadPool 定时任务 仍为首选
WorkStealingPool 分治计算 仍为首选(CPU 密集)
VirtualThreadPerTask 高并发 I/O Java 21+ 首选方案

总结推荐:

类型 适用场景 推荐实现方式 注意事项
平台线程池(ThreadPoolExecutor) CPU密集型、可控并发 自定义 ThreadPoolExecutor 或谨慎使用 Executors 工具类 控制线程数、队列类型、拒绝策略,注意关闭
虚拟线程池(Virtual Threads) I/O密集型、高并发短任务 Executors.newVirtualThreadPerTaskExecutor() 适合大量阻塞任务,注意异常处理,目前为预览特性

ThreadPoolExecutor

ThreadPoolExecutor(标准线程池),是线程池的核心实现类。可以自定义核心线程数、最大线程数、任务队列、拒绝策略等。

通过 Executors 工厂方法可以快速创建几种常用配置的线程池,但是不建议生产用,如:

  • newFixedThreadPool(int nThreads):固定大小的线程池。
  • newSingleThreadExecutor():单线程的线程池。
  • newCachedThreadPool():可缓存的线程池,线程数量几乎无限制,适合大量短时任务。
  • newScheduledThreadPool(int corePoolSize):支持定时及周期性任务调度。

配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Configuration
@EnableAsync // 开启对异步任务的支持
public class ThreadAsyncConfigurer implements AsyncConfigurer {

/**
* 如果当前运行的线程数小于corePoolSize,那么就创建线程来执行任务(执行时需要获取全局锁)。
* 如果运行的线程大于或等于corePoolSize,那么就把task加入BlockQueue。
* 如果创建的线程数量大于BlockQueue的最大容量,且线程小于maxPoolSize,那么创建新线程来执行该任务。
* 如果创建线程导致当前运行的线程数超过maximumPoolSize,就根据饱和策略来拒绝该任务。
* 如果允许关闭所有核心线程,则等待keepAliveSeconds 将关闭
*/
@Override
public Executor getAsyncExecutor() {
//通过Runtime方法来获取当前服务器cpu内核,根据cpu内核来创建核心线程数和最大线程数
int threadCount = Runtime.getRuntime().availableProcessors();

ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
// 设置核心线程数(若使用的线程数大于核心线程且小于最大线程时,只有当队列满时才创建新的线程执行任务)
// 若小于核心线程数,则创建新线程执行任务
threadPool.setCorePoolSize(threadCount + 1);
// 设置最大线程数,若线程已达到最大线程数,且队列已满,则根据设置的中止策略执行后面的任务
threadPool.setMaxPoolSize(threadCount * 2 + 1);
// 线程池所使用的缓冲队列
threadPool.setQueueCapacity(500);
// 是否允许线程池中corePoolSize线程空闲时间达到keepAliveTime关闭
threadPool.setAllowCoreThreadTimeOut(false);
// 等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60 * 2);
// 线程名称前缀
threadPool.setThreadNamePrefix("taskExector-");
// 设置中止策略(达到最大线程数后,队列已满且无线程可用于处理后面的任务时的策略)
/**
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
*/
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程
threadPool.initialize();

// keepAliveTime 当超过核心线程数的线程执行完任务后,需要等待的时间,超过该时间则销毁
return threadPool;
}

// 或者以下方式
@Lazy
@Bean(name = "executorService", destroyMethod = "shutdown")
public ExecutorService executorService() {
//核心线程数(获取硬件):线程池创建时候初始化的线程数
int corePoolSize = Runtime.getRuntime().availableProcessors();

return new ThreadPoolExecutor(corePoolSize + 1, corePoolSize * 2 + 1, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
}

}

使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.*;

public class ThreadPoolExample {

public static void main(String[] args) {
int corePoolSize = 4;
int maxPoolSize = 8;
long keepAliveTime = 60L;
TimeUnit timeUnit = TimeUnit.SECONDS;
int queueCapacity = 100;

// 使用有界队列
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(queueCapacity);

// 可选:自定义线程工厂(给线程设置有意义的名字)
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 或自定义

// 可选:自定义拒绝策略
RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); // 默认抛异常

// 创建 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
rejectionHandler
);

// 提交任务示例
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("执行任务 " + taskId + ",线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// 优雅关闭线程池
executor.shutdown(); // 不再接受新任务,等待已提交任务完成
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制终止
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

注意事项

  1. 不要盲目使用 Executors 提供的快捷方法,尤其是 newCachedThreadPool(),可能导致线程数无限增长。

  2. 明确任务特性(CPU密集型 or I/O密集型)来设置合理的线程数:

    • CPU密集型:线程数 ≈ CPU核心数

    • I/O密集型:线程数可以设高一些,如 CPU核心数 * (1 + 平均等待时间/平均计算时间)

    合理选择任务队列,比如:

    • 有界队列(如 ArrayBlockingQueue)可以防止资源耗尽,但可能触发拒绝策略;

    • 无界队列(如 LinkedBlockingQueue)可能导致内存问题。

    一定要妥善处理线程池的关闭,调用 shutdown()shutdownNow(),避免资源泄漏。

(1)FixedThreadPool — 固定大小线程池,无界队列

适用:控制并发数,适合 CPU 密集或稳定负载,风险:任务堆积 → OOM

1
2
3
4
5
6
ExecutorService executor = Executors.newFixedThreadPool(4);

// 生产建议:
new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy());

(2)CachedThreadPool — 弹性线程池,大量线程

适用:大量短生命周期任务(如 HTTP 请求),风险:线程爆炸 → CPU/内存耗尽

1
2
3
4
5
6
7
ExecutorService executor = Executors.newCachedThreadPool();

// 生产建议:限制最大线程数 + 监控
new ThreadPoolExecutor(0, 200, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new CustomThreadFactory("cached-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());

(3)SingleThreadExecutor — 单线程串行池,无界队列

适用:日志写入、顺序消费、串行化任务,风险:任务堆积 → OOM

1
2
3
ExecutorService executor = Executors.newSingleThreadExecutor();

//生产建议:改用有界队列 + 拒绝策略

(4)ScheduledThreadPool — 定时任务池

适用:定时心跳、轮询、延迟任务,注意:任务执行超时 → 后续任务延迟堆积(不会并发),未捕获异常 → 任务终止

1
2
3
4
5
6
7
8
9
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

executor.scheduleAtFixedRate(() -> {
try {
heartbeat();
} catch (Exception e) {
log.error("心跳失败", e);
}
}, 0, 30, TimeUnit.SECONDS);

(5)WorkStealingPool(Java 8+)— 工作窃取池

实现ForkJoinPool适用:递归、分治、CPU 密集型计算(如并行流底层),注意:任务应无阻塞、无状态,不适合 I/O 密集型

1
2
ExecutorService executor = Executors.newWorkStealingPool();
// 或 Executors.newWorkStealingPool(4);

ForkJoinTask(并行任务)

  • 可递归拆分的任务(如:排序、搜索、矩阵运算、树遍历);

  • CPU 密集型计算;

  • 大量细粒度任务(任务执行时间 > 调度开销);

  • 需要结果合并的场景(RecursiveTask)。

使用案例

使用案例:不带返回值的fork-join计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 9;
private int start;
private int end;

public PrintTask(int start, int end) {
super();
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start < THRESHOLD) {
for (int i = start; i <= end; i++) {
System.out.println(Thread.currentThread().getName() + ",i=" + i);
}
} else {
int middle = (start + end) / 2;
PrintTask firstTask = new PrintTask(start, middle);
PrintTask secondTask = new PrintTask(middle + 1, end);
invokeAll(firstTask, secondTask);
}
}

public static void main(String[] args) throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new PrintTask(1,50));
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();
}
}

使用案例:带返回值的计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CalculateTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 49;
private int start;
private int end;

public CalculateTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
int result = 0;
for (int i = start; i <= end; i++) {
result += i;
}
return result;
} else {
int middle = (start + end) / 2;
CalculateTask firstTask = new CalculateTask(start, middle);
CalculateTask secondTask = new CalculateTask(middle + 1, end);
invokeAll(firstTask, secondTask);
return firstTask.join() + secondTask.join();
}
}

public static void main(String[] args) throws Exception {
int result1 = 0;
for (int i = 1; i <= 1000000; i++) {
result1 += i;
}
System.out.println("循环计算 1-1000000 累加值:" + result1);

ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> task = pool.submit(new CalculateTask(1, 1000000));
int result2 = task.get();
System.out.println("并行计算 1-1000000 累加值:" + result2);
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();
}
}

调优建议

  1. 合理设置并行度:一般 = CPU 核数;若任务有阻塞,可适当增加;

  2. 任务粒度控制:太小 → 调度开销大;太大 → 无法充分利用多核;

  3. 避免在任务中阻塞:如必须阻塞,使用 ForkJoinPool.managedBlock(...)

  4. 监控 commonPool:避免被阻塞任务拖垮整个应用;

  5. 自定义 ForkJoinPool:隔离业务,避免相互影响。

注意事项

不适用场景,且在 ForkJoinTask 中执行阻塞 I/O 或 synchronized,会严重降低性能!

  • I/O 阻塞任务(会阻塞工作线程,影响窃取效率);

  • 任务不可拆分或拆分成本高;

  • 任务间强依赖,无法并行。

VirtualThreadPerTask

虚拟线程适合高并发、I/O 密集型任务,能极大提升系统吞吐量且资源开销极小。

  • 虚拟线程不是由 OS 管理,而是由 JVM 调度,挂起和恢复开销极低。

  • 每个虚拟线程占用内存极少(约 1KB~2KB),而平台线程通常需要 MB 级栈空间。

  • 虚拟线程适合大量并发的阻塞任务(如 HTTP 请求、数据库访问等)。

使用方式

  • 直接通过 Executors.newVirtualThreadPerTaskExecutor() 创建一个“每任务一个虚拟线程”的执行器。

  • 或者使用 Thread.ofVirtual().factory() 构建虚拟线程工厂,再传入 ThreadPoolExecutor

使用 Executors.newVirtualThreadPerTaskExecutor() 可以简单地为每个任务分配一个虚拟线程,无需手动管理线程数。

1
2
3
4
5
6
7
8
9
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10_000; i++) {
executor.submit(() -> {
// 模拟 I/O:数据库查询、HTTP 调用等
Thread.sleep(1000);
return "result";
});
}
} // 自动关闭
  • 每个任务一个虚拟线程,无池化,自动回收

    • 无需调优线程池参数
    • 无队列堆积、无拒绝策略问题
    • 高吞吐、低延迟(尤其 I/O 场景)

虚拟线程 + 平台线程混合使用(高级场景)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 平台线程池处理 CPU 密集任务
ExecutorService cpuPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);

// 虚拟线程处理 I/O 任务
ExecutorService ioPool = Executors.newVirtualThreadPerTaskExecutor();

// 组合使用
ioPool.submit(() -> {
String data = fetchDataFromNetwork(); // I/O
cpuPool.submit(() -> processData(data)); // CPU
});

使用案例

简单的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Web 服务(Spring Boot 3.2+ 支持虚拟线程)
@RestController
public class MyController {
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

@GetMapping("/data")
public CompletableFuture<String> getData() {
return CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询(阻塞)
try { Thread.sleep(100); } catch (Exception e) {}
return "result";
}, executor);
}
}

注意事项

虚拟线程适合用于大量并发的、会阻塞的任务,例如网络请求、文件读写等。不适合用于计算密集型任务,因为虚拟线程仍然运行在有限的物理线程上,过多计算仍会竞争。

  1. 不适合 CPU 密集型任务

    • 虚拟线程运行在平台线程上,CPU 密集会阻塞 Carrier 线程

    • 应提交给 ForkJoinPool 或固定大小平台线程池

  2. 避免在虚拟线程中使用 synchronized

    • synchronized 会 pin 住底层平台线程,失去虚拟线程优势

    • 改用 ReentrantLockStampedLockSemaphore

  3. ThreadLocal 使用需谨慎

    • 虚拟线程数量巨大 → ThreadLocal 内存泄漏风险

    • 使用后及时 remove(),或改用 ScopedValue(Java 21 新增)

  4. 不要将虚拟线程放入传统线程池

    1
    2
    3
    4
    5
    6
    // 不推荐:虚拟线程放入传统线程池无意义
    ExecutorService pool = Executors.newFixedThreadPool(10);
    pool.submit(() -> Thread.startVirtualThread(task)); // 多此一举

    // 正确:直接使用虚拟线程执行器
    virtualExecutor.submit(task);

线程池原理详解

ThreadPoolExecutor(标准常用)

通过构造器设置核心参数:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)

ThreadPoolExecutor 使用一个 AtomicInteger 类型的字段 ctl 来同时表示 线程池状态工作线程数量

  • 高3位:线程池运行状态(runState)

  • 低29位:工作线程数(workerCount)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final int RUNNING    = -1 << COUNT_BITS; // 接受新任务,处理队列中任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,处理队列中任务
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不处理队列任务,中断进行中任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务终止,workerCount=0,将调用 terminated()
private static final int TERMINATED = 3 << COUNT_BITS; // terminated() 已执行完毕

// 状态转换:
RUNNING → SHUTDOWN (调用 shutdown())

STOP (调用 shutdownNow())

TIDYING (任务和线程都清空)

TERMINATED (terminated() 执行完毕)

内部定义了一个内部类 Worker,继承自 AbstractQueuedSynchronizer(AQS),实现了简单的不可重入独占锁,用于控制线程执行任务时的状态。

Worker 的作用

  • 封装工作线程(Thread);

  • 持有第一个要执行的任务(firstTask);

  • 统计已完成任务数;

  • 通过 AQS 实现任务执行时的“锁定”,避免中断正在运行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 继承AQS,使用CAS实现线程状态控制
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);
}
// 通过CAS判断是否有线程在运行
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放线程
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return getState() != 0; }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

VirtualThreadPerTask(虚拟线程)

虚拟线程是轻量级线程,由 JVM 管理,底层使用少量平台线程(OS线程)调度大量虚拟线程。

维度 传统线程池(平台线程) 虚拟线程(Java 21+)
线程成本 高(MB 级栈,OS 管理) 极低(KB 级,JVM 管理)
最大并发数 通常几百~几千 可达百万级
适用场景 CPU 密集型、定时任务 I/O 密集型(首选)
是否需要池化 必须池化复用 无需池化,按需创建(每个任务新建虚拟线程)
阻塞影响 阻塞 = 浪费线程 阻塞 = 自动挂起,不占平台线程
调优复杂度 高(队列、线程数、拒绝策略) 低(无需调优)
  • 轻量级线程,由 JVM 管理,不是 OS 线程

  • 创建成本极低(可创建百万级)

  • 阻塞时不阻塞底层 OS 线程(自动挂起/恢复)

  • 适用于高并发 I/O 密集型场景

虚拟线程的本质

虚拟线程是 Project Loom 引入的轻量级线程,由 JVM 而非操作系统管理:

  • 每个虚拟线程在 Java 堆中只是一个对象(约几百字节);

  • 虚拟线程映射到少量平台线程(Carrier Threads)上运行;

  • 虚拟线程在执行阻塞操作(如 I/O、sleep、锁等待)时会自动 “卸载”(unmount),让出底层平台线程,等可继续执行时再 “挂载”(mount) 到任意平台线程上继续执行。

虚拟线程的实现

Executors.newVirtualThreadPerTaskExecutor() 返回的是一个 ThreadPerTaskExecutor 实例(内部类):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 实例化 ThreadPerTaskExecutor
public ThreadPerTaskExecutor newVirtualThreadPerTaskExecutor() {
return new ThreadPerTaskExecutor(Thread.ofVirtual().factory());
}

// ThreadPerTaskExecutor 核心逻辑:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory factory;

public ThreadPerTaskExecutor(ThreadFactory factory) {
this.factory = factory;
}

@Override
public void execute(Runnable task) {
Objects.requireNonNull(task);
factory.newThread(task).start(); // 每次新建虚拟线程并启动
}
}

// 创建并启动线程
public void execute(Runnable task) {
Thread thread = Thread.ofVirtual().unstarted(task);
thread.start();
}
  • 每次 execute(task) → 创建一个全新的虚拟线程;

  • 启动该线程执行任务;

  • 任务执行完毕 → 虚拟线程自动销毁(GC 回收);

  • 无队列、无线程池、无复用、无拒绝策略。

虚拟线程的特点

  1. 创建成本极低

    • 创建百万个虚拟线程内存开销仅几十 MB;
    • 对比:创建 1000 个平台线程可能就耗尽内存或 OS 线程限制。
  2. 阻塞不阻塞平台线程

    • 当虚拟线程执行 socket.read(), Thread.sleep(), synchronized, Lock.lock() 等操作时,JVM 会自动将其从 Carrier Thread 上卸载,让 Carrier Thread 去执行其他虚拟线程;
    • 当 I/O 完成或锁可用时,虚拟线程被重新调度执行;
    • 平台线程始终处于忙碌状态,无“线程饥饿”问题。
  3. 无需线程池管理

    • 传统线程池是为了复用昂贵的平台线程;
    • 虚拟线程足够便宜,复用无意义,反而增加复杂度;
    • “一个任务一个线程”变成最优实践。

ForkJoinPool (并行窃取)

Java 7 引入的并发工具类,位于 java.util.concurrent 包中,专为 分治算法(Divide and Conquer)并行任务处理 设计。它在 Java 8 的 parallelStream()CompletableFuture 默认线程池中被广泛使用,是现代 Java 高性能并行计算的核心基础设施。

其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果。 类似map-reduce

核心思想

ForkJoinPool采取工作窃取算法work-stealing),以避免工作线程由于拆分了任务之后的join等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。

  • 工作窃取(Work-Stealing):空闲线程从其他线程的任务队列“偷”任务执行,提高 CPU 利用率;

  • 轻量级任务调度:避免传统线程池中任务入队/出队的锁竞争;

  • 递归任务分解:支持任务“fork”拆分子任务,“join”合并结果。

工作窃取:当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。

ForkJoinPool中的所有线程都会尝试查找并执行提交到线程池中由其他活动创建的任务,如果不存在这些任务,则进行阻塞

在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。

在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。在通常使用队列都在队尾插入,在队头消费以实现FIFO。而为了实现工作窃取,会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。

其中的核心类

ForkJoinPool 线程池主类,管理线程、调度任务
ForkJoinWorkerThread 工作线程,每个线程绑定一个 WorkQueue
WorkQueue 双端队列(Deque),存储待执行任务,支持 LIFO(本线程取)和 FIFO(其他线程偷)
ForkJoinTask 任务抽象基类(类似 Runnable),支持 fork/join
RecursiveTask<T> 有返回值的递归任务(继承 ForkJoinTask)
RecursiveAction 无返回值的递归任务

工作窃取机制

WorkQueue 结构

  • 每个 ForkJoinWorkerThread 拥有一个 WorkQueue

  • WorkQueue双端队列(Deque)

  • 本线程操作队列尾部(LIFO)→ 减少竞争,利于缓存局部性;

  • 其他线程偷任务从队列头部(FIFO)→ 避免偷“刚提交的大任务”,偷“老任务”更公平;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
    private static final int RSLOCK = 1; //
    private static final int RSIGNAL = 1 << 1; // 信号
    private static final int STARTED = 1 << 2; // 开始
    private static final int STOP = 1 << 29; // 停止
    private static final int TERMINATED = 1 << 30; // 销毁
    private static final int SHUTDOWN = 1 << 31; // 关闭

    // 工作队列
    static final class WorkQueue {
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 初始容量 8k
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 最大容量 64M

    volatile int scanState; // versioned, <0: inactive; odd:scanning
    int stackPred; // pool stack (ctl) predecessor
    int nsteals; // 抢断次数
    int hint; // 随机化和窃取的索引
    int config; // pool index and mode
    volatile int qlock; // 1: locked, < 0: terminate; else 0
    volatile int base; // index of next slot for poll
    int top; // index of next slot for push
    ForkJoinTask<?>[] array; // the elements (initially unallocated)
    final ForkJoinPool pool; // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // 工作线程,默认线程数量等于处理器的核心数
    volatile Thread parker; // == owner during call to park; else null
    volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    }

工作窃取(Work-Stealing)流程

线程A空闲


随机选择一个其他线程B的 WorkQueue


从线程B的 WorkQueue 头部窃取一个任务


执行窃取的任务

窃取操作是无锁或低竞争的(通过 CAS + 位运算优化),极大减少线程间同步开销。

内部状态与调度

  1. ctl(控制状态)

    类似 ThreadPoolExecutorForkJoinPool 使用一个 long 类型字段 ctl 表示:

    • 低 32 位:活跃/空闲线程数;

    • 高 32 位:用于唤醒、创建线程等信号。

  2. 任务提交方式

    外部队列是 FIFO,内部队列是 LIFO + 支持窃取。

    • 外部提交(External Submissions):如 pool.submit(task) → 进入 Submission Queue(共享队列);

    • 内部 fork(Internal Forks):如 task.fork() → 进入当前线程的 WorkQueue 尾部。

  3. 线程唤醒机制

    当任务提交到空队列或新任务到来时,ForkJoinPool 会尝试:

    • 唤醒一个空闲线程,或创建新线程(不超过最大并行度);

    • 使用“缓存行填充” + “伪共享避免”优化并发性能。

fork() 与 join() 原理

任务拆分+合并

在ForkJoinPool中,任务的拆分和合并是通过继承自RecursiveAction或RecursiveTask的类来实现的。

开发者需要实现compute方法来定义任务的处理逻辑。

  1. 当一个大任务被拆分成多个小任务时,这些小任务会被提交到ForkJoinPool中并行执行。

  2. 当所有小任务都执行完成后,它们的结果会被合并起来得到大任务的处理结果。

fork()

如果是 ForkJoinWorkerThread 推入自己的 WorkQueue(LIFO)。否则提交到公共队列(common pool 的 submission queue)

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread) t).workQueue.push(this); // 推入当前线程队列尾部
else
ForkJoinPool.common.externalPush(this); // 外部线程提交到共享队列
return this;
}

join()

调用 doJoin() 如果任务未完成,当前线程会尝试帮助执行其他任务,或阻塞等待(通过 ManagedBlockerpark);

不会傻等,而是“帮助执行”或“窃取任务”,提高 CPU 利用率。

1
2
3
4
5
6
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

并行度(Parallelism)

默认并行度 = Runtime.getRuntime().availableProcessors();可通过构造函数指定:

  • 并行度 ≠ 线程数:线程数可能略多(补偿阻塞),但活跃线程数 ≈ 并行度;

  • 可通过 ForkJoinPool.commonPool() 获取全局共享池(默认并行度)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 默认构造器
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), // CPU核心数
defaultForkJoinWorkerThreadFactory, null, false);
}

// 设置并行度参数
private ForkJoinPool(int parallelism, // 并行度:最大 32767
ForkJoinWorkerThreadFactory factory, // 线程创建工厂类
UncaughtExceptionHandler handler, // 异常处理接口(因未捕获异常突然终止时执行)
int mode, // 异步模式:FIFO_QUEUE,LIFO_QUEUE,默认LIFO
String workerNamePrefix) { //工作线程名称前缀
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}


// 默认线程池工厂类
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}

其他说明

  • Java 8+ 中的 parallelStream() 默认使用 ForkJoinPool.commonPool(),任务被自动拆分为 ForkJoinTask 并并行执行。

    1
    list.parallelStream().map(...).filter(...).collect(...); 
  • CompletableFuture(默认异步执行器),默认使用 ForkJoinPool.commonPool()

    1
    2
    3
    4
    CompletableFuture.supplyAsync(() -> task());
    // 如果 commonPool 被阻塞(如大量 IO),会影响所有 parallelStream 和 CompletableFuture,可引入自定义线程池
    ExecutorService pool = new ForkJoinPool(8);
    CompletableFuture.supplyAsync(() -> task(), pool);