四、Java線程間通信

摘自《Java并發(fā)編程的藝術(shù)》

1 volatile和synchronized關(guān)鍵字

關(guān)鍵字volatile可以用來修飾字段(成員變量),就是告知程序任何對(duì)該變量的訪問均需要從共享內(nèi)存中獲取,而對(duì)它的改變必須同步刷新回共享內(nèi)存,它能保證所有線程對(duì)變量訪問的可見性。

關(guān)鍵字synchronized可以修飾方法或者以同步塊的形式來進(jìn)行使用,它主要確保多個(gè)線程在同一個(gè)時(shí)刻,只能有一個(gè)線程處于方法或者同步塊中,它保證了線程對(duì)變量訪問的可見性和排他性。

通過使用javap工具查看生成的class文件信息來分析synchronized關(guān)鍵字的實(shí)現(xiàn)細(xì)節(jié),代碼如下

public class Synchronized {

    public static void main(String[] args) {
        synchronized (Synchronized.class){
            m();
        }
    }
    public static synchronized void m(){
    }
}

執(zhí)行javap -v Synchronized.class,部分相關(guān)輸出如下所示:

對(duì)于同步塊的實(shí)現(xiàn)使用了 monitorentermonitorexit 指令,而同步方法則是依賴方法修飾符上的ACC_SYNCHRONIZED來完成。無論采用哪種方式,其本質(zhì)是對(duì)一個(gè)對(duì)象的監(jiān)視器進(jìn)行獲取,而這個(gè)獲取過程是排他的,也就是同一時(shí)刻只能有一個(gè)線程獲取到由synchronized所保護(hù)對(duì)象的監(jiān)視器。

任意一個(gè)對(duì)象都擁有自己的監(jiān)視器,當(dāng)這個(gè)對(duì)象由同步塊或者這個(gè)對(duì)象的同步方法調(diào)用時(shí),執(zhí)行方法的線程必須先獲取到該對(duì)象的監(jiān)視器才能進(jìn)入同步塊或者同步方法,而沒有獲取到監(jiān)視器(執(zhí)行該方法)的線程將會(huì)被阻塞在同步塊和同步方法的入口處,進(jìn)入BLOCKED狀態(tài)。

下圖描述了對(duì)象、對(duì)象的監(jiān)視器、同步隊(duì)列和執(zhí)行線程之間的關(guān)系:

從圖中可以看到,任意線程對(duì)Object(Object由synchronized保護(hù))的訪問,首先要獲得Object的監(jiān)視器。如果獲取失敗,線程進(jìn)入同步隊(duì)列,線程狀態(tài)變?yōu)锽LOCKED。當(dāng)訪問Object的前驅(qū)(獲得了鎖的線程)釋放了鎖,則該釋放操作喚醒阻塞在同步隊(duì)列中的線程,使其重新嘗試對(duì)監(jiān)視器的獲取。

2 等待 / 通知機(jī)制

等待/通知機(jī)制是指一個(gè)線程A調(diào)用了對(duì)象O的wait()方法進(jìn)入等待狀態(tài),而另一個(gè)線程B調(diào)用了對(duì)象O的notify()或notifyAll()方法,線程A收到通知后從對(duì)象O的wait()方法返回,進(jìn)而執(zhí)行后續(xù)操作。上述兩個(gè)線程對(duì)象O來完成交互,而對(duì)象上的wait()和notify/notifyAll()的關(guān)系就如同開關(guān)信號(hào)一樣,用來完成等待方通知方之間的交互工作。

下面所示的例子中,創(chuàng)建了兩個(gè)線程——WaitThread和NotifyThread,前者檢查flag值是否為false,如果符合要求,進(jìn)行后續(xù)操作,否則在lock上等待,后者在睡眠了一段時(shí)間后對(duì)lock進(jìn)行通知,示例如下所示。

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();
 
    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }
 
    static class Wait implements Runnable {
        public void run() {
            // 加鎖,擁有l(wèi)ock的Monitor
            synchronized (lock) {
                // 當(dāng)條件不滿足時(shí),繼續(xù)wait,同時(shí)釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread()
                                + " flag is true. wait@ "
                                + new SimpleDateFormat("HH:mm:ss")
                                        .format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                // 條件滿足時(shí),完成工作
                System.out.println(Thread.currentThread()
                        + " flag is false. running@ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }
 
    static class Notify implements Runnable {
        public void run() {
            // 加鎖,擁有l(wèi)ock的Monitor
            synchronized (lock) {
                // 獲取lock的鎖,然后進(jìn)行通知,通知時(shí)不會(huì)釋放lock的鎖,
                // 直到當(dāng)前線程釋放了lock后,WaitThread才能從wait方法中返回
                System.out.println(Thread.currentThread()
                        + " hold lock. notify @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 再次加鎖
            synchronized (lock) {
                System.out.println(Thread.currentThread()
                        + " hold lock again. sleep@ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

輸出如下(輸出內(nèi)容可能不同,主要區(qū)別在時(shí)間上)

Thread[WaitThread,5,main] flag is true. wait@ 16:15:44
Thread[NotifyThread,5,main] hold lock. notify @ 16:15:45
Thread[NotifyThread,5,main] hold lock again. sleep@ 16:15:50
Thread[WaitThread,5,main] flag is false. running@ 16:15:55

上述第3行和第4行輸出的順序可能會(huì)互換,而上述例子主要說明了調(diào)用wait()、notify()以及notifyAll()時(shí)需要注意的細(xì)節(jié),如下。

  1. 使用wait()、notify()和notifyAll()時(shí)需要先對(duì)調(diào)用對(duì)象加鎖
  2. 調(diào)用wait()方法后,線程狀態(tài)由RUNNING變?yōu)閃AITING,并將當(dāng)前線程放置到對(duì)象的等待隊(duì)列。
  3. notify()或notifyAll()方法調(diào)用后,等待線程依舊不會(huì)從wait()返回,需要調(diào)用notify()或notifAll()的線程釋放鎖之后,等待線程才有機(jī)會(huì)從wait()返回。
  4. notify()方法將等待隊(duì)列中的一個(gè)等待線程從等待隊(duì)列中移到同步隊(duì)列中,而notifyAll()方法則是將等待隊(duì)列中所有的線程全部移到同步隊(duì)列,被移動(dòng)的線程狀態(tài)由WAITING變?yōu)锽LOCKED。
  5. 從wait()方法返回的前提是獲得了調(diào)用對(duì)象的鎖。

從上述細(xì)節(jié)中可以看到,等待/通知機(jī)制依托于同步機(jī)制,其目的就是確保等待線程從wait()方法返回時(shí)能夠感知到通知線程對(duì)變量做出的修改。下圖描述了上述示例的過程。

在圖中,WaitThread首先獲取了對(duì)象的鎖,然后調(diào)用對(duì)象的wait()方法,從而放棄了鎖并進(jìn)入了對(duì)象的等待隊(duì)列WaitQueue中,進(jìn)入等待狀態(tài)。由于WaitThread釋放了對(duì)象的鎖,NotifyThread隨后獲取了對(duì)象的鎖,并調(diào)用對(duì)象的notify()方法,將WaitThread從WaitQueue移到SynchronizedQueue中,此時(shí)WaitThread的狀態(tài)變?yōu)樽枞麪顟B(tài)。NotifyThread釋放了鎖之后,WaitThread再次獲取到鎖并從wait()方法返回繼續(xù)執(zhí)行。

2.1 生產(chǎn)者/消費(fèi)者模式

Consumer.java

public class Consumer extends Thread {
    // 每次消費(fèi)的產(chǎn)品數(shù)量
    private int num;
 
    // 所在放置的倉庫
    private Storage storage;
 
    // 構(gòu)造函數(shù),設(shè)置倉庫
    public Consumer(Storage storage) {
        this.storage = storage;
    }
 
    // 線程run函數(shù)
    public void run() {
        consume(num);
    }
 
    // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
    public void consume(int num) {
        storage.consume(num);
    }
 
    // get/set方法
    public int getNum() {
        return num;
    }
 
    public void setNum(int num) {
        this.num = num;
    }
 
    public Storage getStorage() {
        return storage;
    }
 
    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}

Producer.java

public class Producer extends Thread {
    // 每次生產(chǎn)的產(chǎn)品數(shù)量
    private int num;
 
    // 所在放置的倉庫
    private Storage storage;
 
    // 構(gòu)造函數(shù),設(shè)置倉庫
    public Producer(Storage storage) {
        this.storage = storage;
    }
 
    // 線程run函數(shù)
    public void run() {
        produce(num);
    }
 
    // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
    public void produce(int num) {
        storage.produce(num);
    }
 
    // get/set方法
    public int getNum() {
        return num;
    }
 
    public void setNum(int num) {
        this.num = num;
    }
 
    public Storage getStorage() {
        return storage;
    }
 
    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}

Storage.java

public class Storage {
    // 倉庫最大存儲(chǔ)量
    private final int MAX_SIZE = 100;
 
    // 倉庫存儲(chǔ)的載體
    private LinkedList<Object> list = new LinkedList<Object>();
 
    // 生產(chǎn)num個(gè)產(chǎn)品
    public void produce(int num) {
        // 同步代碼段
        synchronized (list) {
            // 如果倉庫剩余容量不足
            while (list.size() + num > MAX_SIZE) {
                System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "\t【庫可以存放存量】:"
                        + list.size() + "\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
                try {
                    // 由于條件不滿足,生產(chǎn)阻塞
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
 
            // 生產(chǎn)條件滿足情況下,生產(chǎn)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                list.add(new Object());
            }
 
            System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t【現(xiàn)倉儲(chǔ)量為】:" + list.size());
            // 通知消費(fèi)者來消費(fèi)
            list.notifyAll();
        }
    }
 
    // 消費(fèi)num個(gè)產(chǎn)品
    public void consume(int num) {
        // 同步代碼段
        synchronized (list) {
            // 如果倉庫存儲(chǔ)量不足
            while (list.size() < num) {
                System.out.println("【要消費(fèi)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:"
                        + list.size() + "\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
                try {
                    // 由于條件不滿足,消費(fèi)阻塞
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
 
            // 消費(fèi)條件滿足情況下,消費(fèi)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                list.remove();
            }
 
            System.out.println("【已經(jīng)消費(fèi)產(chǎn)品數(shù)】:" + num + "\t【現(xiàn)倉儲(chǔ)量為】:" + list.size());
            // 通知生產(chǎn)者生產(chǎn)
            list.notifyAll();
        }
    }
 
    // get/set方法
    public LinkedList<Object> getList() {
        return list;
    }
 
    public void setList(LinkedList<Object> list) {
        this.list = list;
    }
 
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

Test.java

public class Test {
    public static void main(String[] args) {
        // 倉庫對(duì)象
        Storage storage = new Storage();
 
        // 生產(chǎn)者對(duì)象
        Producer p1 = new Producer(storage);
        Producer p2 = new Producer(storage);
        Producer p3 = new Producer(storage);
        Producer p4 = new Producer(storage);
        Producer p5 = new Producer(storage);
        Producer p6 = new Producer(storage);
        Producer p7 = new Producer(storage);
 
        // 消費(fèi)者對(duì)象
        Consumer c1 = new Consumer(storage);
        Consumer c2 = new Consumer(storage);
        Consumer c3 = new Consumer(storage);
 
        // 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);
 
        // 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);
 
        // 線程開始執(zhí)行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
}

3 Thread.join()的使用

如果一個(gè)線程A執(zhí)行了thread.join()語句,其含義是:當(dāng)前線程A等待thread線程終止之后才 從thread.join()返回。線程Thread除了提供join()方法之外,還提供了join(long millis)和join(long millis,int nanos)兩個(gè)具備超時(shí)特性的方法。這兩個(gè)超時(shí)方法表示,如果線程thread在給定的超時(shí)時(shí)間里沒有終止,那么將會(huì)從該超時(shí)方法中返回。

4 ThreadLocal的使用

ThreadLocal,即線程變量,是一個(gè)以ThreadLocal對(duì)象為鍵、任意對(duì)象為值的存儲(chǔ)結(jié)構(gòu)。這 個(gè)結(jié)構(gòu)被附帶在線程上,也就是說一個(gè)線程可以根據(jù)一個(gè)ThreadLocal對(duì)象查詢到綁定在這個(gè)線程上的一個(gè)值。

可以通過set(T)方法來設(shè)置一個(gè)值,在當(dāng)前線程下再通過get()方法獲取到原先設(shè)置的值。

在代碼清單4-15所示的例子中,構(gòu)建了一個(gè)常用的Profiler類,它具有begin()和end()兩個(gè)方法,而end()方法返回從begin()方法調(diào)用開始到end()方法被調(diào)用時(shí)的時(shí)間差,單位是毫秒。

public class Profiler {
    // 第一次get()方法調(diào)用時(shí)會(huì)進(jìn)行初始化(如果set方法沒有調(diào)用),每個(gè)線程會(huì)調(diào)用一次
    private static final ThreadLocal<Long> TIME_THREADLOCAL = 
            new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
            return System.currentTimeMillis();
        }
    };

    public static void main(String[] args) throws InterruptedException {
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Time cost is: " + Profiler.end() + " mills");
    }

    public static final void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static final long end() {
        // 時(shí)間消耗
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
}

輸出結(jié)果如下所示

Cost: 1001 mills
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是我自己在秋招復(fù)習(xí)時(shí)的讀書筆記,整理的知識(shí)點(diǎn),也是為了防止忘記,尊重勞動(dòng)成果,轉(zhuǎn)載注明出處哦!如果你也喜歡,那...
    波波波先森閱讀 11,589評(píng)論 4 56
  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對(duì)應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過程中...
    勝浩_ae28閱讀 5,256評(píng)論 0 23
  • 本文首發(fā)于我的個(gè)人博客:尾尾部落 本文是我刷了幾十篇一線互聯(lián)網(wǎng)校招java后端開發(fā)崗位的面經(jīng)后總結(jié)的多線程相關(guān)題目...
    繁著閱讀 2,125評(píng)論 0 7
  • 本文出自 Eddy Wiki ,轉(zhuǎn)載請(qǐng)注明出處:http://eddy.wiki/interview-java.h...
    eddy_wiki閱讀 2,297評(píng)論 0 14
  • 在Java平臺(tái)中,Object.wait()/Object.wait(long)以及Object.notify()...
    CodeKing2017閱讀 6,832評(píng)論 0 4

友情鏈接更多精彩內(nèi)容