需求:多线程任务(查询、文件、API)同时执行,等所有线程执行完成后才能继续向下,执行其他任务

submit() 多线程

底层通过实现 Future 接口实现返回值,获取 callable 接口。

只有等线程执行完成获取到结果后才能继续执行后面的流程,会阻塞直到结果返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 原理
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

// RunnableAdapter 实现 Callable 接口的 call()
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
this.callable = new RunnableAdapter<T>(runnable, value);
this.state = NEW; // ensure visibility of callable
}

// Callable 的 get() 方法 ,通过运行状态 和 LockSupport 实现线程阻塞

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* <b> 只有所有线程运行结束后才可以结束
* <p>
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyAllBySubmit {
public int test(int n) {
try {
Thread.sleep(RandomUtil.randomInt(n * 2000));
} catch (InterruptedException e) {
log.error("------thread = {}, num={}, msg = {}", Thread.currentThread().getName(), n, e.getLocalizedMessage());
return 0;
}

return n;
}

public static void main(String[] args) {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

ThreadOnlyAllBySubmit threadOnlyOne = new ThreadOnlyAllBySubmit();

// submit 等待线程执行完成,返回值
Future<Integer> res1 = threadPool.submit(() -> threadOnlyOne.test(1));
Future<Integer> res2 = threadPool.submit(() -> threadOnlyOne.test(4));
Future<Integer> res3 = threadPool.submit(() -> threadOnlyOne.test(7));

try {
// get 方法等待线程执行完成后获取结果,超时后没有则抛出异常
log.warn("res1:: isDone={}, res={}", res1.isDone(), res1.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
log.error("+++++ meg1 = {}", e.getMessage());
}
try {
log.warn("res2:: isDone={}, res={}", res2.isDone(), res2.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
log.error("+++++ meg2 = {}", e.getMessage());
}
try {
log.warn("res3:: isDone={}, res={}", res3.isDone(), res3.get(5, TimeUnit.SECONDS));
} catch (Exception e) {
log.error("+++++ meg3 = {}", e.getMessage());
}
}
}

Semaphore(信号量)

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理着一组虚拟的许可,通过 acquire 获取一个许可,如果没有许可则一直阻塞。其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* <b> 只有所有线程运行结束后才可以结束
* <p>
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyAllBySemaphore {
static final Semaphore semaphore = new Semaphore(0);

public void test(int n, final AtomicInteger val) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage());
return;
}
semaphore.release();
val.addAndGet(n);
}


public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

int threadNum = 5;
ThreadOnlyAllBySemaphore threadOnlyOne = new ThreadOnlyAllBySemaphore();
final AtomicInteger val = new AtomicInteger(0);

threadPool.execute(() -> threadOnlyOne.test(1, val));
threadPool.execute(() -> threadOnlyOne.test(4, val));
threadPool.execute(() -> threadOnlyOne.test(7, val));
threadPool.execute(() -> threadOnlyOne.test(2, val));
threadPool.execute(() -> threadOnlyOne.test(6, val));

semaphore.acquire(threadNum);

log.info("semaphore结果={}", val.get());
}
}

CountDownLatch(计数器)

一种同步工具类,可以延迟线程的进度直到其到达终止状态。可以用来确保某些活动直到其他活动都完成后才继续执行。

应用场景:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。

  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。

  • 等待直到某个操作的所有参与者都就绪再继续执行。

实现原理:

闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。 countDown 方法用来递减计数器,表示有一个事件已经发生了, await 方法用来等待计数器达到零。如果计数器的值非零,那么 await 方法会一直阻塞直到计数器为零,或者等待中的线程中断、等待超时。当某个线程调用await()方法时,由于tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* <b> 只有所有线程运行结束后才可以结束
* <p>
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyAllByCountDownLatch {
private int num = 4;
private final CountDownLatch countDownLatch;

public ThreadOnlyAllByCountDownLatch(int num) {
this.num = num;
countDownLatch = new CountDownLatch(num);
}

public void test(int n, final AtomicInteger val) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage());
return;
}
countDownLatch.countDown();

val.addAndGet(n);
}

public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

int threadNum = 5;
ThreadOnlyAllByCountDownLatch threadOnlyOne = new ThreadOnlyAllByCountDownLatch(threadNum);
final AtomicInteger val = new AtomicInteger(0);

threadPool.execute(() -> threadOnlyOne.test(1, val));
threadPool.execute(() -> threadOnlyOne.test(4, val));
threadPool.execute(() -> threadOnlyOne.test(7, val));
threadPool.execute(() -> threadOnlyOne.test(2, val));
threadPool.execute(() -> threadOnlyOne.test(6, val));

threadOnlyOne.countDownLatch.await();

log.info("countDownLatch结果={}", val.get());
}
}