??在多線程環(huán)境中,多個線程之間互相協(xié)作,以達(dá)到高效實現(xiàn)程序功能的目的,比如某些多線程程序要求線程執(zhí)行有先后順序、獲取某個線程的執(zhí)行結(jié)果,要想實現(xiàn)多個線程之間的協(xié)同,就需要線程之間互相通信,線程通信主要分為一下四類:
- 1)文件共享
- 2)網(wǎng)絡(luò)共享
- 3)共享變量
- 4)JDK提供的線程協(xié)調(diào)API(主要有:
suspend/resume、wait/notify、park/unpark)
一、文件共享
??一個線程將數(shù)據(jù)寫入到文件中,另一個線程再去讀取文件,實現(xiàn)數(shù)據(jù)的共享,最終達(dá)到線程通信的目的。

代碼示例:
public class FileShareComm {
public static void main(String[] args) {
// 線程1 - 寫入數(shù)據(jù)
new Thread(() -> {
System.out.println("線程1啟動");
try {
while (true) {
Files.write(Paths.get("data.log"),
("當(dāng)前時間" + String.valueOf(System.currentTimeMillis())).getBytes());
Thread.sleep(1000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 線程2 - 讀取數(shù)據(jù)
new Thread(() -> {
System.out.println("線程2啟動");
try {
while (true) {
Thread.sleep(1000L);
byte[] allBytes = Files.readAllBytes(Paths.get("data.log"));
System.out.println(new String(allBytes));
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
程序運行結(jié)果:
線程1啟動
線程2啟動
當(dāng)前時間1570711321681
當(dāng)前時間1570711322768
當(dāng)前時間1570711323768
當(dāng)前時間1570711324775
二、網(wǎng)絡(luò)共享
??通俗地說就是網(wǎng)絡(luò)上不同計算機(jī)之間通過套接字(Socket)進(jìn)行通信,一個Socket一般由IP和Port組成。

三、共享變量
??多個線程對某個內(nèi)存中數(shù)據(jù)進(jìn)去讀取和寫入,實現(xiàn)線程通信。

代碼示例:
public class VariableShareComm {
// 共享變量
public static String content = "空";
public static void main(String[] args) {
// 線程1 - 寫入數(shù)據(jù)
new Thread(() -> {
System.out.println("線程1啟動!");
try {
while (true) {
content = "當(dāng)前時間" + String.valueOf(System.currentTimeMillis());
Thread.sleep(1000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 線程2 - 讀取數(shù)據(jù)
new Thread(() -> {
System.out.println("線程2啟動!");
try {
while (true) {
Thread.sleep(1000L);
System.out.println(content);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
程序運行結(jié)果:
線程1啟動!
線程2啟動!
當(dāng)前時間1570712442853
當(dāng)前時間1570712443859
當(dāng)前時間1570712444868
當(dāng)前時間1570712445883
四、線程協(xié)作 — JDK API
??JDK中對于需要多線程協(xié)作完成某一任務(wù)的場景,提供了對應(yīng)API支持,主要有suspend/resume、wait/notify、park/unpark。
??關(guān)于多線程協(xié)作有個經(jīng)典的場景:生產(chǎn)者 - 消費者模型(線程阻塞、線程喚醒)
示例:線程1去買包子,沒有包子,則暫停執(zhí)行,等待通知;線程2生產(chǎn)出包子,通知線程1繼續(xù)執(zhí)行。

??下面演示如何用各個JDK API實現(xiàn)生產(chǎn)者-消費者模型。
1、被棄用的suspend和resume
??調(diào)用suspend掛起目標(biāo)線程,通過resume可以恢復(fù)線程執(zhí)行,由于supend/resume即要求resume在suspend之后調(diào)用,并且suspend被調(diào)用后不會釋放鎖,因此容易寫出死鎖的代碼,所以被棄用。
(1)死鎖的場景1:suspend不釋放鎖,resume需要獲取鎖
代碼示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 死鎖的suspend/resume: suspend并不會像wait一樣釋放鎖,因此容易寫出死鎖代碼 */
public void suspendResumeDeadLockTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
System.out.println("1、進(jìn)入等待");
// 當(dāng)前線程拿到鎖,然后掛起(還是RUNNABLE狀態(tài))
synchronized (this) {
Thread.currentThread().suspend();
}
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
//System.out.println("consumerThread's status " + consumerThread.getState().toString());
baozidian = new Object();
synchronized (this) {
consumerThread.resume();
}
System.out.println("3、通知消費者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
demo6.suspendResumeDeadLockTest();
}
}
執(zhí)行結(jié)果:

【代碼解析】
??由于resume會在休眠3秒之后被調(diào)用,所以保證了resume在suspend之后執(zhí)行,consumerThread的run方法體內(nèi)執(zhí)行suspend之前要先拿到demo6對象鎖,3秒后創(chuàng)建baozidian對象,主線程要調(diào)用resume方法通知consumerThread線程,但是由于執(zhí)行suspend時沒有釋放demo5對象鎖,所以這里主線程沒辦法拿到鎖,導(dǎo)致沒法執(zhí)行resume,結(jié)果是consumerThread永遠(yuǎn)處于掛起狀態(tài)。
(2)死鎖的場景2:resume在suspend之前執(zhí)行
代碼示例:
/** 導(dǎo)致永久掛起的suspend/resume */
public void suspendResumeDeadLockTest2() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
System.out.println("1、進(jìn)入等待");
try {
Thread.sleep(5000L); // 為這個線程加上一點延時
} catch (Exception e) {
e.printStackTrace();
}
// 這里的掛起執(zhí)行在resume后面
Thread.currentThread().suspend();
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
consumerThread.resume();
System.out.println("3、通知消費者");
consumerThread.join();
}
執(zhí)行結(jié)果:

【代碼解析】
??由于consumerThread的run方法體內(nèi)執(zhí)行suspend之前會先休眠5秒,所以導(dǎo)致resume會先執(zhí)行,suspend后執(zhí)行,后面的程序不會再次resume,同樣會導(dǎo)致consumerThread永遠(yuǎn)處于掛起狀態(tài)。
(3)正常的suspend/resume
代碼示例:
/** 正常的suspend/resume */
public void suspendResumeTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
System.out.println("1、進(jìn)入等待");
Thread.currentThread().suspend();
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L); // 延遲3秒,保證調(diào)用resume()之前已經(jīng)調(diào)用suspend()完畢
baozidian = new Object();
consumerThread.resume();
System.out.println("3、通知消費者");
}
執(zhí)行結(jié)果:

2、wait/notify機(jī)制
??wait/notify依賴于Java對象監(jiān)視器鎖,而監(jiān)視器鎖又是跟sychronized配合使用的,因此wait/notify必須寫在同步塊中,并且wait/notify方法只能由同一對象鎖的持有者線程調(diào)用,否則會拋出IllegalMonitorStateException異常。
??特別注意,使用sychronized時,用到的監(jiān)視器鎖是監(jiān)視對象obj對應(yīng)的監(jiān)視器鎖,所以調(diào)用wait方法時,必須調(diào)用obj.wait(),這樣obj的對象監(jiān)視器才會去釋放對應(yīng)的監(jiān)視器鎖。
synchronized (obj) {
try {
System.out.println("1、進(jìn)入等待");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
(1)正常的wait/notify
??wait和notify方法都必須在sychronized塊中被調(diào)用,且sychronized和調(diào)用方法時必須使用相同的鎖對象,notify必須在wait被調(diào)用之后再調(diào)用,代碼如下:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 正常的wait/notify */
public void waitNotifyTest() throws Exception {
// 啟動線程
new Thread(() -> {
if (baozidian == null) {
synchronized (this) {
try {
System.out.println("1、進(jìn)入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、買到包子,回家");
}).start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消費者");
}
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 2、wait/notify
demo6.waitNotifyTest();
}
}
執(zhí)行結(jié)果:

(2)死鎖的wait/notify
??wait方法會導(dǎo)致當(dāng)前線程等待,加入對應(yīng)的對象的監(jiān)視器等待集合中,并且釋放當(dāng)前持有的對象鎖,notify/notifyAll方法會喚醒一個或者所有正在等待該對象鎖的線程。
??雖然wait會自動解鎖,但是對調(diào)用順序有要求,如果在notify被調(diào)用之后,才開始wait方法的調(diào)用,線程會永遠(yuǎn)處于WAITING狀態(tài);如果調(diào)用notify/notifyAll時對象鎖的等待集合中沒有等待的線程,自然通知不到任一個線程,只有在通知前有線程調(diào)用了wait進(jìn)入等待集合中,才能真正通知到等待的線程。
代碼示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 會導(dǎo)致程序永久等待的wait/notify */
public void waitNotifyDeadLockTest() throws Exception {
// 啟動線程
new Thread(() -> {
if (baozidian == null) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (this) {
try {
System.out.println("1、進(jìn)入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、買到包子,回家");
}).start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消費者");
}
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 2、wait/notify
demo6.waitNotifyDeadLockTest();
}
}
執(zhí)行結(jié)果:

3、park/unpark機(jī)制
??park/unpark是JDK API的另一種線程通信機(jī)制,一個線程調(diào)用了park則等待頒發(fā)一個“許可”,如果當(dāng)前存在未被使用的許可,則線程可以直接獲取許可直接運行;如果當(dāng)前許可數(shù)為0,則需要等待其他線程調(diào)用unpark頒發(fā)許可。
??需要許可和頒發(fā)許可的線程沒有強(qiáng)依賴關(guān)系,任何一個線程頒發(fā)的許可都可以被任意需要許可的線程使用,任何一個線程都可以頒發(fā)許可。因此,park和unpark對調(diào)用順序沒有要求,同時由于park/unpark不像wait/notify那樣是基于鎖監(jiān)視器的,所以park/unpark不會釋放當(dāng)前線程持有的鎖。
??一個線程多次調(diào)用park時,只有第一次調(diào)用生效,不會因為多次park而去獲取多個許可證,因為底層是基于一個布爾值的CAS原子操作。
(1)正常的park/unpark
代碼示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 正常的park/unpark */
public void parkUnparkTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
if (baozidian == null) {
System.out.println("1、進(jìn)入等待");
LockSupport.park();
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
System.out.println();
LockSupport.unpark(consumerThread);
System.out.println("3、通知消費者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 3、park/unpark
demo6.parkUnparkTest();
}
}
執(zhí)行結(jié)果:

(2)死鎖的park/unpark
??由于park時不會釋放鎖,所以如果執(zhí)行park所在的代碼塊是需要先獲取鎖的同步代碼塊,并且unpark()需要獲取相同的鎖時,會觸發(fā)死鎖。
代碼示例:
public class Demo6 {
public static Object baozidian = null; // 包子店
/** 死鎖的park/unpark */
public void parkUnparkDeadLockTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() ->{
if (baozidian == null) {
System.out.println("1、進(jìn)入等待");
synchronized (this) {
LockSupport.park();
}
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
synchronized (this) {
LockSupport.unpark(consumerThread);
}
System.out.println("3、通知消費者");
}
public static void main(String[] args) throws Exception {
Demo6 demo6 = new Demo6();
// 3、park/unpark
demo6.parkUnparkDeadLockTest();
}
}
4、總結(jié)
| 通訊方式 | 等待通知時狀態(tài) | 死鎖場景 | 優(yōu)點 | 缺點 |
|---|---|---|---|---|
suspend/resume |
RUNNABLE | 1、resume在suspend之前被調(diào)用 2、suspend和resume所在的同步代碼塊使用相同的鎖導(dǎo)致死鎖 |
~ | 很容易觸發(fā)死鎖 |
wait/notify |
WAITING | notify在wait之前使用 | 基于對象監(jiān)視器鎖,調(diào)用wait時會釋放鎖 |
執(zhí)行順序有要求 |
park/unpark |
WAITING | park和unpark所在同步代碼塊使用相同的鎖導(dǎo)致死鎖 | 對執(zhí)行順序沒要求 |
park不會釋放鎖,可能導(dǎo)致死鎖 |
五、偽喚醒
??一般情況下,當(dāng)線程運行需要的等待某個條件還不具備是,線程會調(diào)用上述的suspend、wait、park方法將線程掛起,然后等待另一個線程滿足這個條件后再通知掛起線程,如果使用if語句來判斷是否進(jìn)入等待狀態(tài),可能會引起偽喚醒問題,問題代碼模板示例:
sychronized(lock) {
if (<條件判斷>) {
lock.wait();
}
// 執(zhí)行后續(xù)操作
}
1、什么是偽喚醒?
??偽喚醒是指線程并非因為notify、notifyAll、unpark等api調(diào)用而喚醒,是更底層的原因?qū)е碌?,此時條件判斷還不滿足,但是卻因為偽喚醒運行后續(xù)的代碼,導(dǎo)致程序運行異常或錯誤。
2、如何解決偽喚醒問題?
??不用if語句來判斷,而是在循環(huán)中檢查等待條件,這樣確保程序在偽喚醒的條件下依然不會在條件沒滿足的情況下去執(zhí)行后續(xù)操作,而是再次將線程掛起,如下所示:
// wait
sychronized(obj) {
while (<條件判斷>) {
obj.wait();
}
// 執(zhí)行后續(xù)操作
}
// park
while(<條件判斷>) {
LockSupport.park();
// 執(zhí)行后續(xù)操作
}
實例代碼演示:
/** 正常的wait/notify */
public void waitNotifyTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
while (baozidian == null) {
synchronized (this) {
try {
System.out.println("1、進(jìn)入等待");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
System.out.println("consumerThread's status " + consumerThread.getState().toString());
baozidian = new Object();
synchronized (this) {
this.notifyAll();
System.out.println("3、通知消費者");
}
}
/** 正常的park/unpark */
public void parkUnparkTest() throws Exception {
// 啟動線程
Thread consumerThread = new Thread(() -> {
while (baozidian == null) {
System.out.println("1、進(jìn)入等待");
LockSupport.park();
}
System.out.println("2、買到包子,回家");
});
consumerThread.start();
// 3秒之后,生產(chǎn)一個包子
Thread.sleep(3000L);
baozidian = new Object();
//System.out.println(consumerThread.getState().toString());
LockSupport.unpark(consumerThread);
System.out.println("3、通知消費者");
}