4.線程通信

線程協(xié)作-JDK API

JDK中對于需要多線程協(xié)作完成某一任務的場景,提供了對應API支持。
多線程協(xié)作的典型場景是:生產(chǎn)者-消費者模型。(線程阻塞,線程喚醒)

示例:線程1去買包子,沒有包子,則不再執(zhí)行。線程-2生產(chǎn)包子,通知線程-1繼續(xù)執(zhí)行


1.png
  1. 被棄用的suspend和resume
    作用:調(diào)用suspend掛起目標線程,通過resume可以恢復線程執(zhí)行
  • 能正確執(zhí)行的代碼


    2.png
  • 那么為什么被棄用?

因為容易寫出死鎖的代碼,uspend掛起之后并不會釋放鎖

錯誤示例有下面兩種情況


3.png
  1. wait/notify機制
    這些方法只能由同一對象鎖的持有者線程調(diào)用,也就是寫在同步代碼塊(例如synchronized中,而且鎖對象要是同一個)里面,否則會拋出illegalMonitorStateException異常。

wait方法導致當前線程等待,加入該對象(就是鎖對象)的等待集合中,并且放棄當前持有的對象鎖。
notify/notifyAll方法喚醒一個或所有正在等待這個對象鎖的線程。

注意:雖然wait會自動解鎖,但是對順序還是有要求,如果在notify被調(diào)用之后,才開始wait方法的調(diào)用,線程會永遠處于WAITING狀態(tài)。


4.png

也就是說,使用wait/notify必須配合同步代碼塊

  1. park/unpark機制
    線程調(diào)用park則等待"許可",unpark方法為指定線程提供"許可"。
    不要求park和unpark方法的調(diào)用順序。
    多次調(diào)用unpark之后,再調(diào)用park,線程會直接運行,但不會疊加(許可類似標記位,不會累加,每次park之后標記位重置,必須再unpark之后才能后續(xù)正確park),也就是說,連續(xù)多次調(diào)用park方法,第一次會拿到"許可"直接運行,后續(xù)調(diào)用會進入等待。
    但是:park不會使用鎖,這是它的缺點。


    5.png
  1. 總結(jié):
  • park/unpark機制:不會使用鎖,但是順序可變
  • wait/notify機制:wait釋放鎖,但是順序有先后
  • suspend/resume機制:即不釋放鎖,也有先后順序的問題
  1. 補充:偽喚醒


    6.png

在JDK的官方的wait()方法的注釋中明確表示線程可能被“虛假喚醒“,JDK也明確推薦使用while來判斷狀態(tài)信息。那么這種情況的發(fā)生的可能性有多大呢?

使用生產(chǎn)者消費者模型來說明,偽喚醒造成的后果是本來未被喚醒的線程被喚醒了,那么就破壞了生產(chǎn)者消費者中的判斷條件,也就是例子中的while條件number == 0或者number == 1。最終導致的結(jié)果就死0和1不能交替出現(xiàn)。

JDK的兩種同步方案均可能出現(xiàn)這種偽喚醒的問題(API說明明確表示會出現(xiàn)這種現(xiàn)象),這兩種組合是synchronized+wait+notify和ReentrantLock+await+signal。下面的例子中,如果把while換成if,那么就0、1就不能交替出現(xiàn),反制則會,例子中是100個線程進行增加,100個線程進行減少。

在Java并發(fā)編程書上面引用了Thinking In Java的一句話說,大概意思是:任何并發(fā)編程都是通過加鎖來解決。其實JDK并發(fā)編程也是通過加鎖解決,每個對象都有一個對象鎖,并且有一個與這個鎖相關的隊列,來實現(xiàn)并發(fā)編程。區(qū)別在于加鎖的粒度問題,讀讀可以并發(fā)(在讀遠大于寫的場景下比較合適),其他三種情況不能并發(fā)。而關系型數(shù)據(jù)庫也是利用這一思想,只不過做得更加徹底,除了寫寫不能并發(fā),其他三種情況都能并發(fā),這得益于MVCC模型。

public class Resource {

    public int number = 0;

    public synchronized void add() {
        while (number == 1) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++;
        System.err.println(number + "-" + Thread.currentThread().getId());
        notifyAll();
    }

    public synchronized void minus() {
        while (number == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number--;
        System.err.println(number + "-" + Thread.currentThread().getId());
        notifyAll();
    }

    public static class AddThread implements Runnable {

        private Resource resource;

        public AddThread(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            for (;;) resource.add();
        }
    }

    public static class MinusThread implements Runnable {
        private Resource resource;

        public MinusThread(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            for (;;) resource.minus();
        }
    }

    public static void main(String[] args) {
        Resource resource = new Resource();
        for (int i = 0; i < 100; i++) {
            new Thread(new AddThread(resource)).start();
            new Thread(new MinusThread(resource)).start();
        }
    }
}
public class ResourceLock {

    private int number = 0;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public int getNumber() {
        return this.number;
    }

    public void increase() {
        lock.lock();
        try {
            while (number == 1) {
                condition.await();
            }
            number++;
            System.err.println(number + "-" + Thread.currentThread().getId());
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    public void decrease() {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.err.println(number + "-" + Thread.currentThread().getId());
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    static class IncreaseThread implements Runnable {
        private ResourceLock resource;

        public IncreaseThread(ResourceLock resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            for (;;) resource.increase();
        }
    }

    static class DecreaseThread implements Runnable {
        private ResourceLock resource;

        public DecreaseThread(ResourceLock resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            for (;;) resource.decrease();
        }
    }

    public static void main(String[] args) throws Exception {
        ResourceLock resource = new ResourceLock();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 100; i++) {
            es.submit(new IncreaseThread(resource));
            es.submit(new DecreaseThread(resource));
        }
    }

}

所以,偽喚醒的概率很大,建議書寫相關代碼的時候,用while替代if

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容