java多線程同步(wait、notify)生產(chǎn)者消費者簡單示例

一、為何寫

最為一個Android開發(fā)者,如果做得不夠深入可能為不會去處理多線程同步的問題,稍微簡單點可能使用一個線程池就可以搞定了,有關(guān)線程池的介紹可以參考我的另一篇文章:ExecutorService+LruCache+DiskLruCache用一個類打造簡單的圖片加載庫
只是前段時間研究Android音視頻硬解碼,看到開源項目中用到了線程同步,就是在視頻的YUV數(shù)據(jù)的暫存,和解碼為視頻并展示,用到了兩個線程去做,一個線程收集視頻源數(shù)據(jù),一個線程負(fù)責(zé)解碼并播放視頻,一個視頻數(shù)據(jù)池是兩個線程共享的,數(shù)據(jù)池滿了或者空了的時候兩個線程是要做出相應(yīng)處理的,這就涉及到線程同步了。

這里寫圖片描述

學(xué)習(xí)、工作和生活的心態(tài)就要像向日葵,就算是太陽不在也要迎著月亮!

二、名字講解

什么是線程同步?

當(dāng)使用多個線程來訪問同一個數(shù)據(jù)時,非常容易出現(xiàn)線程安全問題(比如多個線程都在操作同一數(shù)據(jù)導(dǎo)致數(shù)據(jù)不一致),所以我們用同步機制來解決這些問題。

實現(xiàn)同步機制有兩個方法:

1、同步代碼塊:

synchronized(同一個數(shù)據(jù)){} 同一個數(shù)據(jù):就是N條線程同時訪問一個數(shù)據(jù)。

2、同步方法:

public synchronized 數(shù)據(jù)返回類型方法名(){}

通過使用同步方法,可非常方便的將某類變成線程安全的類,具有如下特征:
1,該類的對象可以被多個線程安全的訪問。
2,每個線程調(diào)用該對象的任意方法之后,都將得到正確的結(jié)果。
3,每個線程調(diào)用該對象的任意方法之后,該對象狀態(tài)依然保持合理狀態(tài)。
注:synchronized關(guān)鍵字可以修飾方法,也可以修飾代碼塊,但不能修飾構(gòu)造器,屬性等

※不要對線程安全類的所有方法都進行同步,只對那些會改變共享資源方法的進行同步。
線程通訊:
當(dāng)使用synchronized 來修飾某個共享資源時(分同步代碼塊和同步方法兩種情況),當(dāng)某個線程獲得共享資源的鎖后就可以執(zhí)行相應(yīng)的代碼段,直到該線程運行完該代碼段后才釋放對該共享資源的鎖,讓其他線程有機會執(zhí)行對該共享資源的修改。當(dāng)某個線程占有某個共享資源的鎖時,如果另外一個線程也想獲得這把鎖運行就需要使用wait() 和notify()/notifyAll()方法來進行線程通訊了。
Java.lang.object 里的三個方法wait() notify() notifyAll()

wait()
導(dǎo)致當(dāng)前線程等待,直到其他線程調(diào)用同步監(jiān)視器的notify方法或notifyAll方法來喚醒該線程。

wait(mills)
都是等待指定時間后自動蘇醒,調(diào)用wait方法的當(dāng)前線程會釋放該同步監(jiān)視器的鎖定,可以不用notify或notifyAll方法把它喚醒。

notify()
喚醒在同步監(jiān)視器上等待的單個線程,如果所有線程都在同步監(jiān)視器上等待,則會選擇喚醒其中一個線程,選擇是任意性的,只有當(dāng)前線程放棄對該同步監(jiān)視器的鎖定后,也就是使用wait方法后,才可以執(zhí)行被喚醒的線程。

notifyAll()
喚醒在同步監(jiān)視器上等待的所有的線程。只用當(dāng)前線程放棄對該同步監(jiān)視器的鎖定后,也就是使用wait方法后,才可以執(zhí)行被喚醒的線程。

注意,notify方法一定要在synchronized同步里面調(diào)用,還有做異常捕捉。


原子操作:根據(jù)Java規(guī)范,對于基本類型的賦值或者返回值操作,是原子操作。但這里的基本數(shù)據(jù)類型不包括long和double, 因為JVM看到的基本存儲單位是32位,而long 和double都要用64位來表示。所以無法在一個時鐘周期內(nèi)完成。

自增操作(++)不是原子操作,因為它涉及到一次讀和一次寫。

原子操作:由一組相關(guān)的操作完成,這些操作可能會操縱與其它的線程共享的資源,為了保證得到正確的運算結(jié)果,一個線程在執(zhí)行原子操作其間,應(yīng)該采取其他的措施使得其他的線程不能操縱共享資源。

同步代碼塊:為了保證每個線程能夠正常執(zhí)行原子操作,Java引入了同步機制,具體的做法是在代表原子操作的程序代碼前加上synchronized標(biāo)記,這樣的代碼被稱為同步代碼塊。

同步鎖:每個JAVA對象都有且只有一個同步鎖,在任何時刻,最多只允許一個線程擁有這把鎖。

當(dāng)一個線程試圖訪問帶有synchronized(this)標(biāo)記的代碼塊時,必須獲得 this關(guān)鍵字引用的對象的鎖,在以下的兩種情況下,本線程有著不同的命運。
1、 假如這個鎖已經(jīng)被其它的線程占用,JVM就會把這個線程放到本對象的鎖池中。本線程進入阻塞狀態(tài)。鎖池中可能有很多的線程,等到其他的線程釋放了鎖,JVM就會從鎖池中隨機取出一個線程,使這個線程擁有鎖,并且轉(zhuǎn)到就緒狀態(tài)。
2、 假如這個鎖沒有被其他線程占用,本線程會獲得這把鎖,開始執(zhí)行同步代碼塊。 (一般情況下在執(zhí)行同步代碼塊時不會釋放同步鎖,但也有特殊情況會釋放對象鎖 如在執(zhí)行同步代碼塊時,遇到異常而導(dǎo)致線程終止,鎖會被釋放;在執(zhí)行代碼塊時,執(zhí)行了鎖所屬對象的wait()方法,這個線程會釋放對象鎖,進入對象的等待池中)

線程同步的特征:
1、 如果一個同步代碼塊和非同步代碼塊同時操作共享資源,仍然會造成對共享資源的競爭。因為當(dāng)一個線程執(zhí)行一個對象的同步代碼塊時,其他的線程仍然可以執(zhí)行對象的非同步代碼塊。(所謂的線程之間保持同步,是指不同的線程在執(zhí)行同一個對象的同步代碼塊時,因為要獲得對象的同步鎖而互相牽制)
2、 每個對象都有唯一的同步鎖
3、 在靜態(tài)方法前面可以使用synchronized修飾符。
4、 當(dāng)一個線程開始執(zhí)行同步代碼塊時,并不意味著必須以不間斷的方式運行,進入同步代碼塊的線程可以執(zhí)行Thread.sleep()或執(zhí)行Thread.yield()方法,此時它并不釋放對象鎖,只是把運行的機會讓給其他的線程。
5、 Synchronized聲明不會被繼承,如果一個用synchronized修飾的方法被子類覆蓋,那么子類中這個方法不在保持同步,除非用synchronized修飾。

釋放對象的鎖:
1、 執(zhí)行完同步代碼塊就會釋放對象的鎖
2、 在執(zhí)行同步代碼塊的過程中,遇到異常而導(dǎo)致線程終止,鎖也會被釋放
3、 在執(zhí)行同步代碼塊的過程中,執(zhí)行了鎖所屬對象的wait()方法,這個線程會釋放對象鎖,進入對象的等待池。

死鎖:
線程1獨占(鎖定)資源A,等待獲得資源B后,才能繼續(xù)執(zhí)行
同時
線程2獨占(鎖定)資源B,等待獲得資源A后,才能繼續(xù)執(zhí)行
這樣就會發(fā)生死鎖,程序無法正常執(zhí)行

如何避免死鎖
一個通用的經(jīng)驗法則是:當(dāng)幾個線程都要訪問共享資源A、B、C 時,保證每個線程都按照同樣的順序去訪問他們。


注意:
1、線程同步就是線程排隊。同步就是排隊。線程同步的目的就是避免線程“同步”執(zhí)行。
2、只有共享資源的讀寫訪問才需要同步。如果不是共享資源,那么就根本沒有同步的必要。
3、只有“變量”才需要同步訪問。如果共享的資源是固定不變的,那么就相當(dāng)于“常量”,線程同時讀取常量也不需要同步。至少一個線程修改共享資源,這樣的情況下,線程之間就需要同步。
4、多個線程訪問共享資源的代碼有可能是同一份代碼,也有可能是不同的代碼;無論是否執(zhí)行同一份代碼,只要這些線程的代碼訪問同一份可變的共享資源,這些線程之間就需要同步。

5、我們要盡量避免這種直接把synchronized加在函數(shù)定義上的偷懶做法。因為我們要控制同步粒度。同步的代碼段越小越好。synchronized控制的范圍越小越好。

同步鎖:
我們可以給共享資源加一把鎖,這把鎖只有一把鑰匙。哪個線程獲取了這把鑰匙,才有權(quán)利訪問該共享資源。
同步鎖不是加在共享資源上,而是加在訪問共享資源的代碼段上。
訪問同一份共享資源的不同代碼段,應(yīng)該加上同一個同步鎖;如果加的是不同的同步鎖,那么根本就起不到同步的作用,沒有任何意義。
這就是說,同步鎖本身也一定是多個線程之間的共享對象。

三、生產(chǎn)者消費者代碼示例

產(chǎn)品倉庫

package com.danxx.javalib2;

import java.util.LinkedList;
import java.util.Queue;

/**
 * 數(shù)據(jù)存儲倉庫和操作
 * 一個緩沖區(qū),緩沖區(qū)有最大限制,當(dāng)緩沖區(qū)滿
 * 的時候,生產(chǎn)者是不能將產(chǎn)品放入到緩沖區(qū)里面的,
 * 當(dāng)然,當(dāng)緩沖區(qū)是空的時候,消費者也不能從中拿出來產(chǎn)品,
 * 這就涉及到了在多線程中的條件判斷
 * Created by dawish on 2017/7/13.
 */
public class Storage {
    
    private static volatile int goodNumber = 1;
    
    private final static int MAX_SIZE = 20;
    /**
     *  Queue操作解析:
     *  add       增加一個元索                 如果隊列已滿, 則拋出一個IIIegaISlabEepeplian異常
     *  remove    移除并返回隊列頭部的元素     如果隊列為空, 則拋出一個NoSuchElementException異常
     *  element   返回隊列頭部的元素           如果隊列為空, 則拋出一個NoSuchElementException異常
     *  offer     添加一個元素并返回true       如果隊列已滿, 則返回false
     *  poll      移除并返問隊列頭部的元素     如果隊列為空, 則返回null
     *  peek      返回隊列頭部的元素           如果隊列為空, 則返回null
     *  put       添加一個元素                 如果隊列滿,   則阻塞
     *  take      移除并返回隊列頭部的元素     如果隊列為空, 則阻塞
     *
     */
    Queue<String> storage;
    public Storage() {
        storage = new LinkedList<String>();
    }

    /**
     *
     * @param dataValue
     */
    public synchronized void put(String dataValue, String threadName){
        if(storage.size() >= MAX_SIZE){
            try {
                goodNumber = 1;
                super.wait();  //當(dāng)生產(chǎn)滿了后讓生產(chǎn)線程等待
                return;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        storage.add(dataValue + goodNumber++);
        System.out.println(threadName + dataValue + goodNumber);
        super.notify();  //每次添加一個數(shù)據(jù)就喚醒一個消費等待的線程來消費
    }

    /**
     *
     * @return
     * @throws InterruptedException
     */
    public synchronized String get(String threadName) {
        if(storage.size() == 0){
            try {
                super.wait();  //當(dāng)產(chǎn)品倉庫為空的時候讓消費線程等待
                System.out.println(threadName + "wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
        super.notify();  //當(dāng)數(shù)據(jù)不為空的時候就喚醒一個生產(chǎn)線程來生產(chǎn)
        String value = storage.remove();
        return value;
    }

}

生產(chǎn)者

package com.danxx.javalib2;

import java.util.UUID;

/**
 * 生產(chǎn)者
 * Created by dawish on 2017/7/13.
 */
public class Producer extends Thread{

    private Storage storage;//生產(chǎn)者倉庫
    private String name="";
    
    public Producer(Storage storage, String name) {
        this.storage = storage;
        this.name = name;
    }
    public void run(){
        //生產(chǎn)者每隔1s生產(chǎn)1~100消息
        long oldTime = System.currentTimeMillis();
        while(true){
            synchronized(storage){
                if (System.currentTimeMillis() - oldTime >= 1000) {
                    oldTime = System.currentTimeMillis();
                    String msg = UUID.randomUUID().toString();
                    storage.put("-ID:" ,name);
                }
            }
        }
    }
}

消費者

package com.danxx.javalib2;

/**
 * 消費者
 * Created by dawish on 2017/7/13.
 */

public class Consumer extends Thread{

    private Storage storage;//倉庫
    
    private String name="";
    
    public Consumer(Storage storage, String name) {
        this.storage = storage;
        this.name = name;
    }
    public void run(){
        while(true){
            synchronized(storage){
                //消費者去倉庫拿消息的時候,如果發(fā)現(xiàn)倉庫數(shù)據(jù)為空,則等待
                String data = storage.get(name);
                if(data != null){
                    
                    System.out.println(name +"-------------"+ data);
                    
                }
            }
        }
    }
}

main方法

package com.danxx.javalib2;

/**
 *  Java中的多線程會涉及到線程間通信,常見的線程通信方式,例如共享變量、管道流等,
 *  這里我們要實現(xiàn)生產(chǎn)者消費者模式,也需要涉及到線程通信,不過這里我們用到了java中的
 *  wait()、notify()方法:
 *  wait():進入臨界區(qū)的線程在運行到一部分后,發(fā)現(xiàn)進行后面的任務(wù)所需的資源還沒有準(zhǔn)備充分,
 *  所以調(diào)用wait()方法,讓線程阻塞,等待資源,同時釋放臨界區(qū)的鎖,此時線程的狀態(tài)也從RUNNABLE狀態(tài)變?yōu)閃AITING狀態(tài);
 *  notify():準(zhǔn)備資源的線程在準(zhǔn)備好資源后,調(diào)用notify()方法通知需要使用資源的線程,
 *  同時釋放臨界區(qū)的鎖,將臨界區(qū)的鎖交給使用資源的線程。
 *  wait()、notify()這兩個方法,都必須要在臨界區(qū)中調(diào)用,即是在synchronized同步塊中調(diào)用,
 *  不然會拋出IllegalMonitorStateException的異常。
 *  Created by dawish on 2017/7/14.
 */

public class MainApp {
    public static void main(String[] args) {
        Storage storage = new Storage();
        
        Producer producer1 = new Producer(storage, "Producer-1");
        Producer producer2 = new Producer(storage, "Producer-2");
        Producer producer3 = new Producer(storage, "Producer-3");
        Producer producer4 = new Producer(storage, "Producer-4");
        
        Consumer consumer1 = new Consumer(storage, "Consumer-1");
        Consumer consumer2 = new Consumer(storage, "Consumer-2");
        
        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();
        
        consumer1.start();
        consumer2.start();
    }
}

運行結(jié)果(4個生產(chǎn)者2個消費者)

這里寫圖片描述

四、Github地址

https://github.com/Dawish/CustomViews/tree/master/JavaLib

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容