線程協(xié)作-JDK API
JDK中對于需要多線程協(xié)作完成某一任務的場景,提供了對應API支持。
多線程協(xié)作的典型場景是:生產(chǎn)者-消費者模型。(線程阻塞,線程喚醒)
示例:線程1去買包子,沒有包子,則不再執(zhí)行。線程-2生產(chǎn)包子,通知線程-1繼續(xù)執(zhí)行

- 被棄用的suspend和resume
作用:調(diào)用suspend掛起目標線程,通過resume可以恢復線程執(zhí)行
-
能正確執(zhí)行的代碼
2.png
- 那么為什么被棄用?
因為容易寫出死鎖的代碼,uspend掛起之后并不會釋放鎖
錯誤示例有下面兩種情況

- wait/notify機制
這些方法只能由同一對象鎖的持有者線程調(diào)用,也就是寫在同步代碼塊(例如synchronized中,而且鎖對象要是同一個)里面,否則會拋出illegalMonitorStateException異常。
wait方法導致當前線程等待,加入該對象(就是鎖對象)的等待集合中,并且放棄當前持有的對象鎖。
notify/notifyAll方法喚醒一個或所有正在等待這個對象鎖的線程。
注意:雖然wait會自動解鎖,但是對順序還是有要求,如果在notify被調(diào)用之后,才開始wait方法的調(diào)用,線程會永遠處于WAITING狀態(tài)。

也就是說,使用wait/notify必須配合同步代碼塊
-
park/unpark機制
線程調(diào)用park則等待"許可",unpark方法為指定線程提供"許可"。
不要求park和unpark方法的調(diào)用順序。
多次調(diào)用unpark之后,再調(diào)用park,線程會直接運行,但不會疊加(許可類似標記位,不會累加,每次park之后標記位重置,必須再unpark之后才能后續(xù)正確park),也就是說,連續(xù)多次調(diào)用park方法,第一次會拿到"許可"直接運行,后續(xù)調(diào)用會進入等待。
但是:park不會使用鎖,這是它的缺點。
5.png
- 總結(jié):
- park/unpark機制:不會使用鎖,但是順序可變
- wait/notify機制:wait釋放鎖,但是順序有先后
- suspend/resume機制:即不釋放鎖,也有先后順序的問題
-
補充:偽喚醒
6.png
在JDK的官方的wait()方法的注釋中明確表示線程可能被“虛假喚醒“,JDK也明確推薦使用while來判斷狀態(tài)信息。那么這種情況的發(fā)生的可能性有多大呢?
使用生產(chǎn)者消費者模型來說明,偽喚醒造成的后果是本來未被喚醒的線程被喚醒了,那么就破壞了生產(chǎn)者消費者中的判斷條件,也就是例子中的while條件number == 0或者number == 1。最終導致的結(jié)果就死0和1不能交替出現(xiàn)。
JDK的兩種同步方案均可能出現(xiàn)這種偽喚醒的問題(API說明明確表示會出現(xiàn)這種現(xiàn)象),這兩種組合是synchronized+wait+notify和ReentrantLock+await+signal。下面的例子中,如果把while換成if,那么就0、1就不能交替出現(xiàn),反制則會,例子中是100個線程進行增加,100個線程進行減少。
在Java并發(fā)編程書上面引用了Thinking In Java的一句話說,大概意思是:任何并發(fā)編程都是通過加鎖來解決。其實JDK并發(fā)編程也是通過加鎖解決,每個對象都有一個對象鎖,并且有一個與這個鎖相關的隊列,來實現(xiàn)并發(fā)編程。區(qū)別在于加鎖的粒度問題,讀讀可以并發(fā)(在讀遠大于寫的場景下比較合適),其他三種情況不能并發(fā)。而關系型數(shù)據(jù)庫也是利用這一思想,只不過做得更加徹底,除了寫寫不能并發(fā),其他三種情況都能并發(fā),這得益于MVCC模型。
public class Resource {
public int number = 0;
public synchronized void add() {
while (number == 1) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.err.println(number + "-" + Thread.currentThread().getId());
notifyAll();
}
public synchronized void minus() {
while (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.err.println(number + "-" + Thread.currentThread().getId());
notifyAll();
}
public static class AddThread implements Runnable {
private Resource resource;
public AddThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
for (;;) resource.add();
}
}
public static class MinusThread implements Runnable {
private Resource resource;
public MinusThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
for (;;) resource.minus();
}
}
public static void main(String[] args) {
Resource resource = new Resource();
for (int i = 0; i < 100; i++) {
new Thread(new AddThread(resource)).start();
new Thread(new MinusThread(resource)).start();
}
}
}
public class ResourceLock {
private int number = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public int getNumber() {
return this.number;
}
public void increase() {
lock.lock();
try {
while (number == 1) {
condition.await();
}
number++;
System.err.println(number + "-" + Thread.currentThread().getId());
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
public void decrease() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.err.println(number + "-" + Thread.currentThread().getId());
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
static class IncreaseThread implements Runnable {
private ResourceLock resource;
public IncreaseThread(ResourceLock resource) {
this.resource = resource;
}
@Override
public void run() {
for (;;) resource.increase();
}
}
static class DecreaseThread implements Runnable {
private ResourceLock resource;
public DecreaseThread(ResourceLock resource) {
this.resource = resource;
}
@Override
public void run() {
for (;;) resource.decrease();
}
}
public static void main(String[] args) throws Exception {
ResourceLock resource = new ResourceLock();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
es.submit(new IncreaseThread(resource));
es.submit(new DecreaseThread(resource));
}
}
}
所以,偽喚醒的概率很大,建議書寫相關代碼的時候,用while替代if


