需求:多个线程同时执行一个任务,只要一个成功就退出,其他线程任务可以取消或无视。
注意:如果要暂停线程池中其他未执行完成的线程,需要先记录下来,然后调用其打断方法。
CountDownLatch(计数器)
一种同步工具类,可以延迟线程的进度直到其到达终止状态。可以用来确保某些活动直到其他活动都完成后才继续执行。
应用场景:
实现原理:
闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。 countDown 方法用来递减计数器,表示有一个事件已经发生了, await 方法用来等待计数器达到零。如果计数器的值非零,那么 await 方法会一直阻塞直到计数器为零,或者等待中的线程中断、等待超时。当某个线程调用await()方法时,由于tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@Slf4j public class ThreadOnlyOneByCountDownLatch { private final CountDownLatch countDownLatch = new CountDownLatch(1);
public void test(int n, final AtomicInteger val) { try { Thread.sleep(RandomUtil.randomInt(5000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage()); return; } countDownLatch.countDown();
val.compareAndSet(0, n); log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadOnlyOneByCountDownLatch threadOnlyOne = new ThreadOnlyOneByCountDownLatch(); final AtomicInteger val = new AtomicInteger(0);
threadPool.execute(() -> threadOnlyOne.test(1, val)); threadPool.execute(() -> threadOnlyOne.test(4, val)); threadPool.execute(() -> threadOnlyOne.test(7, val)); threadPool.execute(() -> threadOnlyOne.test(2, val)); threadPool.execute(() -> threadOnlyOne.test(6, val));
threadOnlyOne.countDownLatch.await();
log.info("countDownLatch结果={}", val.get()); } }
|
CyclicBarrier(循环栅栏)
栅栏能阻塞一组线程直到某个事件发生,直到所有线程都到达栅栏点,栅栏才会打开。所以栅栏一般用于多个线程需要相互等待的情况
闭锁用于等待事件,而栅栏用于等待其他线程。
可以使一定数量的参与方(线程)反复地在栅栏位置汇集,await方法将阻塞直到所有线程都到达栅栏位置。
原理实现:
CyclicBarrier 维护了一个计数器,和一个 generation 每次调用await都会有将计数器减一,并且产生一个新的 generation ,只要计数器不为零,所有前置线程都会触发 await(); 内部会调用 LockSupport.park(this); 方法将线程加入等待队列, 所有线程就绪,会调用 trip.signalAll(); 唤醒所有线程,同时执行一个用户自定义的 Runnable 策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Slf4j public class ThreadOnlyOneByCyclicBarrier {
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public void test(int n, final AtomicInteger val) { try { Thread.sleep(RandomUtil.randomInt(5000)); cyclicBarrier.await();
} catch (Exception e) { log.error("------thread = {}, num={}, msg = {}", Thread.currentThread().getName(), n, e.getLocalizedMessage()); return; }
val.compareAndSet(0, n); log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n); }
public static void main(String[] args) throws InterruptedException, BrokenBarrierException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadOnlyOneByCyclicBarrier threadOnlyOne = new ThreadOnlyOneByCyclicBarrier(); final AtomicInteger val = new AtomicInteger(0);
threadPool.execute(() -> threadOnlyOne.test(1, val)); threadPool.execute(() -> threadOnlyOne.test(4, val)); threadPool.execute(() -> threadOnlyOne.test(7, val));
threadOnlyOne.cyclicBarrier.await(); log.info("结果={}", val.get()); } }
|
Semaphore(信号量)
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟的许可,通过 acquire 获取一个许可,如果没有许可则一直阻塞。其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@Slf4j public class ThreadOnlyOneBySemaphore { static final Semaphore semaphore = new Semaphore(0);
public void test(int n, final AtomicInteger val) { try { Thread.sleep(RandomUtil.randomInt(5000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage()); return; } semaphore.release(); val.compareAndSet(0, n); log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadOnlyOneBySemaphore threadOnlyOne = new ThreadOnlyOneBySemaphore(); final AtomicInteger val = new AtomicInteger(0);
threadPool.execute(() -> threadOnlyOne.test(1, val)); threadPool.execute(() -> threadOnlyOne.test(4, val)); threadPool.execute(() -> threadOnlyOne.test(7, val));
semaphore.acquire();
log.info("semaphore结果={}", val.get()); } }
|
Queue(阻塞队列)
通过队列的阻塞等待实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
@Slf4j public class ThreadOnlyOneByQueue { static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
public void test(int n) { try { Thread.sleep(RandomUtil.randomInt(5000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage()); return; } queue.add(n); log.info("------thread = {}, num={} ", Thread.currentThread().getName(), n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadOnlyOneByQueue threadOnlyOne = new ThreadOnlyOneByQueue();
threadPool.execute(() -> threadOnlyOne.test(1)); threadPool.execute(() -> threadOnlyOne.test(4)); threadPool.execute(() -> threadOnlyOne.test(7));
log.info("结果={}", queue.take()); } }
|
如何打断其他线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@Slf4j public class ThreadOnlyOneByQueueAndInterruptOther { static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
public static void test(int n, Set<Thread> runThread) { Thread currentThread = Thread.currentThread(); runThread.add(currentThread); try { Thread.sleep(n * RandomUtil.randomInt(1000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", currentThread.getName(), e.getLocalizedMessage()); return; } queue.add(n); runThread.remove(currentThread); log.info("------thread = {}, num={} ", currentThread.getName(), n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
final Set<Thread> runThread = new HashSet<>();
threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(411, runThread)); threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(114, runThread)); threadPool.execute(() -> ThreadOnlyOneByQueueAndInterruptOther.test(70, runThread));
log.info("结果={}", queue.take());
if (!runThread.isEmpty()) runThread.parallelStream().forEach(Thread::interrupt); } }
|