需求:多线程任务(查询、文件、API)同时执行,等所有线程执行完成后才能继续向下,执行其他任务
submit() 多线程
底层通过实现 Future 接口实现返回值,获取 callable 接口。
只有等线程执行完成获取到结果后才能继续执行后面的流程,会阻塞直到结果返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { this.callable = new RunnableAdapter<T>(runnable, value); this.state = NEW; }
|
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
@Slf4j public class ThreadOnlyAllBySubmit { public int test(int n) { try { Thread.sleep(RandomUtil.randomInt(n * 2000)); } catch (InterruptedException e) { log.error("------thread = {}, num={}, msg = {}", Thread.currentThread().getName(), n, e.getLocalizedMessage()); return 0; }
return n; }
public static void main(String[] args) { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
ThreadOnlyAllBySubmit threadOnlyOne = new ThreadOnlyAllBySubmit();
Future<Integer> res1 = threadPool.submit(() -> threadOnlyOne.test(1)); Future<Integer> res2 = threadPool.submit(() -> threadOnlyOne.test(4)); Future<Integer> res3 = threadPool.submit(() -> threadOnlyOne.test(7));
try { log.warn("res1:: isDone={}, res={}", res1.isDone(), res1.get(5, TimeUnit.SECONDS)); } catch (Exception e) { log.error("+++++ meg1 = {}", e.getMessage()); } try { log.warn("res2:: isDone={}, res={}", res2.isDone(), res2.get(5, TimeUnit.SECONDS)); } catch (Exception e) { log.error("+++++ meg2 = {}", e.getMessage()); } try { log.warn("res3:: isDone={}, res={}", res3.isDone(), res3.get(5, TimeUnit.SECONDS)); } catch (Exception e) { log.error("+++++ meg3 = {}", e.getMessage()); } } }
|
Semaphore(信号量)
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟的许可,通过 acquire 获取一个许可,如果没有许可则一直阻塞。其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@Slf4j public class ThreadOnlyAllBySemaphore { static final Semaphore semaphore = new Semaphore(0);
public void test(int n, final AtomicInteger val) { try { Thread.sleep(RandomUtil.randomInt(5000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage()); return; } semaphore.release(); val.addAndGet(n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
int threadNum = 5; ThreadOnlyAllBySemaphore threadOnlyOne = new ThreadOnlyAllBySemaphore(); final AtomicInteger val = new AtomicInteger(0);
threadPool.execute(() -> threadOnlyOne.test(1, val)); threadPool.execute(() -> threadOnlyOne.test(4, val)); threadPool.execute(() -> threadOnlyOne.test(7, val)); threadPool.execute(() -> threadOnlyOne.test(2, val)); threadPool.execute(() -> threadOnlyOne.test(6, val));
semaphore.acquire(threadNum);
log.info("semaphore结果={}", val.get()); } }
|
CountDownLatch(计数器)
一种同步工具类,可以延迟线程的进度直到其到达终止状态。可以用来确保某些活动直到其他活动都完成后才继续执行。
应用场景:
实现原理:
闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。 countDown 方法用来递减计数器,表示有一个事件已经发生了, await 方法用来等待计数器达到零。如果计数器的值非零,那么 await 方法会一直阻塞直到计数器为零,或者等待中的线程中断、等待超时。当某个线程调用await()方法时,由于tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
@Slf4j public class ThreadOnlyAllByCountDownLatch { private int num = 4; private final CountDownLatch countDownLatch;
public ThreadOnlyAllByCountDownLatch(int num) { this.num = num; countDownLatch = new CountDownLatch(num); }
public void test(int n, final AtomicInteger val) { try { Thread.sleep(RandomUtil.randomInt(5000)); } catch (InterruptedException e) { log.error("------thread = {}, msg = {}", Thread.currentThread().getName(), e.getLocalizedMessage()); return; } countDownLatch.countDown();
val.addAndGet(n); }
public static void main(String[] args) throws InterruptedException { final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
int threadNum = 5; ThreadOnlyAllByCountDownLatch threadOnlyOne = new ThreadOnlyAllByCountDownLatch(threadNum); final AtomicInteger val = new AtomicInteger(0);
threadPool.execute(() -> threadOnlyOne.test(1, val)); threadPool.execute(() -> threadOnlyOne.test(4, val)); threadPool.execute(() -> threadOnlyOne.test(7, val)); threadPool.execute(() -> threadOnlyOne.test(2, val)); threadPool.execute(() -> threadOnlyOne.test(6, val));
threadOnlyOne.countDownLatch.await();
log.info("countDownLatch结果={}", val.get()); } }
|