需求:多个线程同时执行一个任务,只要一个成功就退出,其他线程任务可以取消或无视。

注意:如果要暂停线程池中其他未执行完成的线程,需要先记录下来,然后调用其打断方法。

CountDownLatch(计数器)

一种同步工具类,可以延迟线程的进度直到其到达终止状态。可以用来确保某些活动直到其他活动都完成后才继续执行。

应用场景:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。

  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。

  • 等待直到某个操作的所有参与者都就绪再继续执行。

实现原理:

闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。 countDown 方法用来递减计数器,表示有一个事件已经发生了, await 方法用来等待计数器达到零。如果计数器的值非零,那么 await 方法会一直阻塞直到计数器为零,或者等待中的线程中断、等待超时。当某个线程调用await()方法时,由于tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* <b> 只要有一个线程运行结束后,就可以结束。忽视其他未完成线程
* <p> CountDownLatch
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyOneByCountDownLatch {
private final CountDownLatch countDownLatch = new CountDownLatch(1);

public void test(int n, final AtomicInteger val) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage());
return;
}
countDownLatch.countDown();

val.compareAndSet(0, n);
log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n);
}

public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

ThreadOnlyOneByCountDownLatch threadOnlyOne = new ThreadOnlyOneByCountDownLatch();
final AtomicInteger val = new AtomicInteger(0);

threadPool.execute(() -> threadOnlyOne.test(1, val));
threadPool.execute(() -> threadOnlyOne.test(4, val));
threadPool.execute(() -> threadOnlyOne.test(7, val));
threadPool.execute(() -> threadOnlyOne.test(2, val));
threadPool.execute(() -> threadOnlyOne.test(6, val));

threadOnlyOne.countDownLatch.await();

log.info("countDownLatch结果={}", val.get());
}
}

CyclicBarrier(循环栅栏)

栅栏能阻塞一组线程直到某个事件发生,直到所有线程都到达栅栏点,栅栏才会打开。所以栅栏一般用于多个线程需要相互等待的情况

闭锁用于等待事件,而栅栏用于等待其他线程。

可以使一定数量的参与方(线程)反复地在栅栏位置汇集,await方法将阻塞直到所有线程都到达栅栏位置。

原理实现:

CyclicBarrier 维护了一个计数器,和一个 generation 每次调用await都会有将计数器减一,并且产生一个新的 generation ,只要计数器不为零,所有前置线程都会触发 await(); 内部会调用 LockSupport.park(this); 方法将线程加入等待队列, 所有线程就绪,会调用 trip.signalAll(); 唤醒所有线程,同时执行一个用户自定义的 Runnable 策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* <b> 一个线程完成就继续,忽视其他未完成线程
* <p> CyclicBarrier 两个屏障
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyOneByCyclicBarrier {
// 建立两个屏障
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public void test(int n, final AtomicInteger val) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
cyclicBarrier.await(); //放开栅栏1

} catch (Exception e) {
log.error("------thread = {}, num={}, msg = {}", Thread.currentThread().getName(), n, e.getLocalizedMessage());
return;
}

val.compareAndSet(0, n);
log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n);
}

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

ThreadOnlyOneByCyclicBarrier threadOnlyOne = new ThreadOnlyOneByCyclicBarrier();
final AtomicInteger val = new AtomicInteger(0);

threadPool.execute(() -> threadOnlyOne.test(1, val));
threadPool.execute(() -> threadOnlyOne.test(4, val));
threadPool.execute(() -> threadOnlyOne.test(7, val));

threadOnlyOne.cyclicBarrier.await(); //放开栅栏1
log.info("结果={}", val.get());
}
}

Semaphore(信号量)

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理着一组虚拟的许可,通过 acquire 获取一个许可,如果没有许可则一直阻塞。其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* <b> 一个线程完成就继续,忽视其他未完成线程
* <p> Semaphore 信号量
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyOneBySemaphore {
static final Semaphore semaphore = new Semaphore(0);

public void test(int n, final AtomicInteger val) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage());
return;
}
semaphore.release(); // 释放信号量
val.compareAndSet(0, n);
log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n);
}

public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

ThreadOnlyOneBySemaphore threadOnlyOne = new ThreadOnlyOneBySemaphore();
final AtomicInteger val = new AtomicInteger(0);

threadPool.execute(() -> threadOnlyOne.test(1, val));
threadPool.execute(() -> threadOnlyOne.test(4, val));
threadPool.execute(() -> threadOnlyOne.test(7, val));

semaphore.acquire();

log.info("semaphore结果={}", val.get());
}
}

Queue(阻塞队列)

通过队列的阻塞等待实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* <b> 其中一个线程运行结束后就可以结束,忽视其他未完成线程
* <p>
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyOneByQueue {
static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

public void test(int n) {
try {
Thread.sleep(RandomUtil.randomInt(5000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage());
return;
}
queue.add(n);
log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n);
}

public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

ThreadOnlyOneByQueue threadOnlyOne = new ThreadOnlyOneByQueue();

threadPool.execute(() -> threadOnlyOne.test(1));
threadPool.execute(() -> threadOnlyOne.test(4));
threadPool.execute(() -> threadOnlyOne.test(7));

log.info("结果={}", queue.take());
}
}

如何打断其他线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* <b> 其中一个线程运行结束后就可以结束
* <p> 终止其他未结束的线程,需要先将所有线程记录下来,然后调用其打断方法。
*
* @Author Haeng
* @Email haeng2030@gmail.com
* @Date 2025/1/14 18:20
*/
@Slf4j
public class ThreadOnlyOneByQueueAndInterruptOther {
static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);

public static void test(int n, Set<Thread> runThread) {
Thread currentThread = Thread.currentThread();
runThread.add(currentThread);
try {
Thread.sleep(n * RandomUtil.randomInt(1000));
} catch (InterruptedException e) {
log.error("------thread = {}, msg = {}", currentThread.getName(), e.getLocalizedMessage());
return;
}
queue.add(n);
runThread.remove(currentThread);
log.info("------thread = {}, num={} ", currentThread.getName(), n);
}

public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

// 记录此次运行的线程
final Set<Thread> runThread = new HashSet<>();

threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(411, runThread));
threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(114, runThread));
threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(70, runThread));

log.info("结果={}", queue.take());

if (!runThread.isEmpty())// 打断还在运行中的其他线程
runThread.parallelStream().forEach(Thread::interrupt);
}
}