线程在启动以后,并不是第一时间就会立马执行。而是要等待CPU的一个资源调度,而CPU调度的顺序是通过复杂算法计算得到的。

等启动的线程得到CPU指令后,才和主线程做一个切换,执行run方法。这就造成了每次我们执行的结果都是随机的。

可以通过一些方式控制线程执行的顺序:

join()(优先执行)

调用线程需等待该join()线程执行完成后,才能继续用下其他线程。

底层通过waitnotify实现,保证线程执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadOrderByJoin {

public static void main(String[] args) throws InterruptedException {
final Thread thread1 = new Thread(() -> {
System.out.println("1产品经理规划新需求");
});
final Thread thread2 = new Thread(() -> {
System.out.println("2开发人员开发新需求功能");
});
final Thread thread3 = new Thread(() -> {
System.out.println("3测试人员测试新功能");
});
thread1.start();
thread1.join(); // 等待当前线程执行完毕再往下执行
thread2.start();
thread2.join(); // 等待当前线程执行完毕再往下执行
thread3.start();
}
}

Object(等待+唤醒)

Object的方法,是让当前线程进入等待状态,释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)。

所有的实例都有一个等待队列,它在实例的wait方法执行后停止操作的线程的队列。

执行wait方法后,线程便会暂停操作,进入等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ThreadOrderByObject {
static final Object demo1 = new Object();
static final Object demo2 = new Object();

public static void main(String[] args) {
final Thread thread1 = new Thread(() -> {
synchronized (demo1) {
System.out.println("1产品经理规划新需求");
demo1.notify(); //放开 1
}
});
final Thread thread2 = new Thread(() -> {
try {
synchronized (demo1) {
demo1.wait(); // 暂停1
}
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("2开发人员开发新需求功能");
synchronized (demo2) {
demo2.notify(); //放开 2
}
});
final Thread thread3 = new Thread(() -> {
try {
synchronized (demo2) {
demo2.wait(); // 暂停2
System.out.println("3测试人员测试新功能");
}
} catch (Exception e) {
e.printStackTrace();
}
});
// 先启动最后等待的线程,依次向上(wait() 在 notify() 前面执行,否则一直阻塞等待,除非wait(1000))
thread3.start();
thread2.start();
thread1.start();
}
}

ExecutorService(单线程池)

利用并发包里newSingleThreadExecutor产生一个单线程的线程池,而这个线程池的底层原理就是一个先进先出(FIFO)的队列。

代码中executor.submit依次添加了123线程,按照FIFO的特性,执行顺序也就是123的执行结果,从而保证了执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ExecutorService 的源码原理
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
    static ExecutorService executorService = Executors.newSingleThreadExecutor();
// 通过ExecutorService 线程池启动线程,保证顺序执行
public static void main(String[] args) throws InterruptedException {
executorService.submit(thread2);
executorService.submit(thread1);
executorService.submit(thread3);

executorService.shutdown();
}

Condition(条件变量)

通常与锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

  • **await()**方法类似于Object类中的wait()方法。

  • **await(long time,TimeUnit unit)**方法类似于Object类中的wait(long time)方法。

  • **signal()**方法类似于Object类中的notify()方法。

  • **signalAll()**方法类似于Object类中的notifyAll()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ThreadOrderByCondition {
private static final Lock lock = new ReentrantLock();
private static final Condition condition1 = lock.newCondition();
private static final Condition condition2 = lock.newCondition();
private static int threadFlag = 1;//当前正在执行线程的标记,相当于状态标记

public static void main(String[] args) {
new Thread(() -> {
lock.lock();
try {
System.out.println("1产品经理规划新需求");
condition1.signal();
threadFlag = 1;
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
lock.lock();
try {
if (threadFlag != 1)
condition1.await();
System.out.println("2开发人员开发新需求功能");
threadFlag = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
lock.lock();
try {
if (threadFlag != 2)
condition2.await();
System.out.println("3测试人员测试新功能");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
}

CountDownLatch(计数)

位于java.util.concurrent包下,利用它可以实现类似计数器的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ThreadOrderByCountDownLatch {
/**
* 用于判断线程一是否执行,倒计时设置为1,执行后减1
*/
private static CountDownLatch c1 = new CountDownLatch(1);
private static CountDownLatch c2 = new CountDownLatch(1);

public static void main(String[] args) {
new Thread(() -> {
System.out.println("1产品经理规划新需求");
c1.countDown();
}).start();

new Thread(() -> {
try { //等待c1倒计时,计时为0则往下运行
c1.await();
System.out.println("2开发人员开发新需求功能");
c2.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try { //等待c2倒计时,计时为0则往下运行
c2.await();
System.out.println("3测试人员测试新功能");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

CyclicBarrier(回环栅栏)

通过它可以实现让一组线程等待至某个状态之后再全部同时执行。

当所有等待线程都被释放以后,CyclicBarrier可以被重用。当调用await()方法之后,线程就处于barrier了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ThreadOrderByCyclicBarrier {
static CyclicBarrier barrier1 = new CyclicBarrier(2);
static CyclicBarrier barrier2 = new CyclicBarrier(2);

public static void main(String[] args) {
new Thread(() -> {
try {
System.out.println("1产品经理规划新需求"); //放开栅栏1
barrier1.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try { //放开栅栏1
barrier1.await();
System.out.println("2开发人员开发新需求功能"); //放开栅栏2
barrier2.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try { //放开栅栏2
barrier2.await();
System.out.println("3测试人员测试新功能");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

Semaphore(信号量)

是一个计数信号量,包含一组许可证。每个acquire()都会阻塞,直到获取一个可用的许可证,每个release()都会释放持有许可证的线程,并且归还一个可用的许可证。

acquire():当前线程尝去阻塞的获取1个许可证,获取了1个可用的许可证,则会停止等待,继续执行。

release():当前线程释放1个可用的许可证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadOrderBySemaphore {
private static Semaphore semaphore1 = new Semaphore(0);
private static Semaphore semaphore2 = new Semaphore(0);

public static void main(String[] args) {
new Thread(() -> {
System.out.println("1产品经理规划新需求");
semaphore1.release();
}).start();

new Thread(() -> {
try {
semaphore1.acquire();
System.out.println("2开发人员开发新需求功能");
semaphore2.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
semaphore2.acquire();
System.out.println("3测试人员测试新功能");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

其他例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class Product { //生产者、消费者问题
public static void main(String[] args) {
clerk c = new clerk();
Thread productT = new Thread(new Producer(c));//生产者线程
Thread consumerT = new Thread(new Consumer(c));//消费者线程
productT.start();
consumerT.start();
}
}

class clerk { //店员
private int product = 0; //默认 0 个产品

public synchronized void addproduct() { //生产出的产品,交给店员
if (this.product >= 20) {
try {
wait(); //产品过多,稍后再生产
} catch (Exception e) {
e.printStackTrace();
}
} else {
product++;
System.out.println("生产者生产第" + product + "个产品。");
notifyAll(); //通知等待区的消费者可取产品
}
}

public synchronized void getproduct() { //消费者从店员处取产品
if (this.product <= 0) {
try {
wait(); //缺货,稍后再取
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("消费者取走了第:" + product + "产品。");
product--;
notifyAll(); //通知等待取得生产者可以继续生产
}
}
}

class Producer implements Runnable { //生产者线程

private clerk c;

public Producer(clerk c) {
this.c = c;
}

@Override
public void run() {
System.out.println("生产者开始生产产品。");
while (true) {
try {
Thread.sleep((int) (Math.random() * 10) * 100);
} catch (Exception e) {
e.printStackTrace();
}
c.addproduct(); //生产
}
}
}

class Consumer implements Runnable { //消费者线程

private clerk c;

public Consumer(clerk c) {
this.c = c;
}

@Override
public void run() {
System.out.println("消费者开始取走产品。");
while (true) {
try {
Thread.sleep((int) (Math.random() * 10) * 100);
} catch (Exception e) {
e.printStackTrace();
}
c.getproduct(); //取产品
}
}
}