线程在启动以后,并不是第一时间就会立马执行。而是要等待CPU的一个资源调度,而CPU调度的顺序是通过复杂算法计算得到的。
等启动的线程得到CPU指令后,才和主线程做一个切换,执行run方法。这就造成了每次我们执行的结果都是随机的。
可以通过一些方式控制线程执行的顺序:
join()(优先执行)
调用线程需等待该join()线程执行完成后,才能继续用下其他线程。
底层通过wait和notify实现,保证线程执行顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class ThreadOrderByJoin {
public static void main(String[] args) throws InterruptedException { final Thread thread1 = new Thread(() -> { System.out.println("1产品经理规划新需求"); }); final Thread thread2 = new Thread(() -> { System.out.println("2开发人员开发新需求功能"); }); final Thread thread3 = new Thread(() -> { System.out.println("3测试人员测试新功能"); }); thread1.start(); thread1.join(); thread2.start(); thread2.join(); thread3.start(); } }
|
Object(等待+唤醒)
Object的方法,是让当前线程进入等待状态,释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)。
所有的实例都有一个等待队列,它在实例的wait方法执行后停止操作的线程的队列。
执行wait方法后,线程便会暂停操作,进入等待队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class ThreadOrderByObject { static final Object demo1 = new Object(); static final Object demo2 = new Object();
public static void main(String[] args) { final Thread thread1 = new Thread(() -> { synchronized (demo1) { System.out.println("1产品经理规划新需求"); demo1.notify(); } }); final Thread thread2 = new Thread(() -> { try { synchronized (demo1) { demo1.wait(); } } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("2开发人员开发新需求功能"); synchronized (demo2) { demo2.notify(); } }); final Thread thread3 = new Thread(() -> { try { synchronized (demo2) { demo2.wait(); System.out.println("3测试人员测试新功能"); } } catch (Exception e) { e.printStackTrace(); } }); thread3.start(); thread2.start(); thread1.start(); } }
|
ExecutorService(单线程池)
利用并发包里newSingleThreadExecutor产生一个单线程的线程池,而这个线程池的底层原理就是一个先进先出(FIFO)的队列。
代码中executor.submit依次添加了123线程,按照FIFO的特性,执行顺序也就是123的执行结果,从而保证了执行顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
1 2 3 4 5 6 7 8 9
| static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static void main(String[] args) throws InterruptedException { executorService.submit(thread2); executorService.submit(thread1); executorService.submit(thread3);
executorService.shutdown(); }
|
Condition(条件变量)
通常与锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
-
**await()**方法类似于Object类中的wait()方法。
-
**await(long time,TimeUnit unit)**方法类似于Object类中的wait(long time)方法。
-
**signal()**方法类似于Object类中的notify()方法。
-
**signalAll()**方法类似于Object类中的notifyAll()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class ThreadOrderByCondition { private static final Lock lock = new ReentrantLock(); private static final Condition condition1 = lock.newCondition(); private static final Condition condition2 = lock.newCondition(); private static int threadFlag = 1;
public static void main(String[] args) { new Thread(() -> { lock.lock(); try { System.out.println("1产品经理规划新需求"); condition1.signal(); threadFlag = 1; } finally { lock.unlock(); } }).start();
new Thread(() -> { lock.lock(); try { if (threadFlag != 1) condition1.await(); System.out.println("2开发人员开发新需求功能"); threadFlag = 2; condition2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }).start();
new Thread(() -> { lock.lock(); try { if (threadFlag != 2) condition2.await(); System.out.println("3测试人员测试新功能"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }).start(); } }
|
CountDownLatch(计数)
位于java.util.concurrent包下,利用它可以实现类似计数器的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class ThreadOrderByCountDownLatch {
private static CountDownLatch c1 = new CountDownLatch(1); private static CountDownLatch c2 = new CountDownLatch(1);
public static void main(String[] args) { new Thread(() -> { System.out.println("1产品经理规划新需求"); c1.countDown(); }).start();
new Thread(() -> { try { c1.await(); System.out.println("2开发人员开发新需求功能"); c2.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { c2.await(); System.out.println("3测试人员测试新功能"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
CyclicBarrier(回环栅栏)
通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
当所有等待线程都被释放以后,CyclicBarrier可以被重用。当调用await()方法之后,线程就处于barrier了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class ThreadOrderByCyclicBarrier { static CyclicBarrier barrier1 = new CyclicBarrier(2); static CyclicBarrier barrier2 = new CyclicBarrier(2);
public static void main(String[] args) { new Thread(() -> { try { System.out.println("1产品经理规划新需求"); barrier1.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { barrier1.await(); System.out.println("2开发人员开发新需求功能"); barrier2.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { barrier2.await(); System.out.println("3测试人员测试新功能"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
|
Semaphore(信号量)
是一个计数信号量,包含一组许可证。每个acquire()都会阻塞,直到获取一个可用的许可证,每个release()都会释放持有许可证的线程,并且归还一个可用的许可证。
acquire():当前线程尝去阻塞的获取1个许可证,获取了1个可用的许可证,则会停止等待,继续执行。
release():当前线程释放1个可用的许可证。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class ThreadOrderBySemaphore { private static Semaphore semaphore1 = new Semaphore(0); private static Semaphore semaphore2 = new Semaphore(0);
public static void main(String[] args) { new Thread(() -> { System.out.println("1产品经理规划新需求"); semaphore1.release(); }).start();
new Thread(() -> { try { semaphore1.acquire(); System.out.println("2开发人员开发新需求功能"); semaphore2.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { semaphore2.acquire(); System.out.println("3测试人员测试新功能"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
其他例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| public class Product { public static void main(String[] args) { clerk c = new clerk(); Thread productT = new Thread(new Producer(c)); Thread consumerT = new Thread(new Consumer(c)); productT.start(); consumerT.start(); } }
class clerk { private int product = 0;
public synchronized void addproduct() { if (this.product >= 20) { try { wait(); } catch (Exception e) { e.printStackTrace(); } } else { product++; System.out.println("生产者生产第" + product + "个产品。"); notifyAll(); } }
public synchronized void getproduct() { if (this.product <= 0) { try { wait(); } catch (Exception e) { e.printStackTrace(); } } else { System.out.println("消费者取走了第:" + product + "产品。"); product--; notifyAll(); } } }
class Producer implements Runnable {
private clerk c;
public Producer(clerk c) { this.c = c; }
@Override public void run() { System.out.println("生产者开始生产产品。"); while (true) { try { Thread.sleep((int) (Math.random() * 10) * 100); } catch (Exception e) { e.printStackTrace(); } c.addproduct(); } } }
class Consumer implements Runnable {
private clerk c;
public Consumer(clerk c) { this.c = c; }
@Override public void run() { System.out.println("消费者开始取走产品。"); while (true) { try { Thread.sleep((int) (Math.random() * 10) * 100); } catch (Exception e) { e.printStackTrace(); } c.getproduct(); } } }
|