java并發(fā)之線程通信

??談到并發(fā)我們就會想到多線程,要想實(shí)現(xiàn)多個(gè)線程之間的協(xié)同,如:線程執(zhí)行先后順序、獲取某個(gè)
線程執(zhí)行的結(jié)果等等。都涉及取線程之間相互通信。
線程通信分為以下四類:

  1. 文件共享
  2. 網(wǎng)絡(luò)共享
  3. 共享變量
  4. jdk提供的線程協(xié)調(diào)API?細(xì)分為:suspend/resume、wait/notify、park/uppack

??文件共享、變量共享這里不做討論,需要提一點(diǎn):變量共享這里的變量是指的類內(nèi)的全局變量并不是方法內(nèi)的局部變量。 我們著重討論一下上面提到的幾種線程協(xié)調(diào)API.

??JDK中對于需要多線程協(xié)作完成某一任務(wù)的典型場景是:生產(chǎn)者-消費(fèi)者模型。(線程阻塞、線程喚醒)。
這里為了說明問題,只是簡單的實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型。
示例:線程1去買包子,沒有包子,則不再執(zhí)行。線程2生產(chǎn)出包子,通知線程1繼續(xù)執(zhí)行。


生產(chǎn)-消費(fèi)模型.PNG

下面分別對API里面的 suspend/resume、wait/notify、park/uppack 進(jìn)行分析。

suspend/resume示例。
/**
 * 三種線程協(xié)作通信的方式之suspend/resume
 * @date 2019/2/21
 */
public class ThreadAPITest {
  /** 包子店 */
  public static Object baozidian = null;

  /**
   * 正常的suspend/resume
   * */
  public void suspendResumeTest1() throws Exception {
      // 啟動線程
      Thread consumerThread = new Thread(() -> {
          if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
              System.out.println("1、進(jìn)入等待");
              System.out.println("suspend()前狀態(tài):" + Thread.currentThread().getState().toString());
              Thread.currentThread().suspend();
          }
          System.out.println("4、買到包子,回家");
      });
      consumerThread.start();
      // 3秒之后,生產(chǎn)一個(gè)包子
      Thread.sleep(3000L);
      System.out.println("suspend()后狀態(tài):" + consumerThread.getState().toString());
      baozidian = new Object();
      System.out.println("2、產(chǎn)生一個(gè)包子");
      System.out.println("執(zhí)行resume()。。。");
      consumerThread.resume();
      System.out.println("3、通知消費(fèi)者");
  }
  /**
   * 死鎖的suspend/resume。 suspend并不會像wait一樣釋放鎖,故此容易寫出死鎖代碼
   * */
  public void suspendResumeDeadLockTest2() throws Exception {
      // 啟動線程
      Thread consumerThread = new Thread(() -> {
          if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
              System.out.println("1、進(jìn)入等待");
              // 當(dāng)前線程拿到鎖,然后掛起
              synchronized (this) {
                  System.out.println("獲取到synchronized,進(jìn)入synchronized,執(zhí)行suspend()。。。");
                  Thread.currentThread().suspend();
              }
          }
          System.out.println("4、買到包子,回家");
      });
      consumerThread.start();
      // 3秒之后,生產(chǎn)一個(gè)包子
      Thread.sleep(3000L);
      baozidian = new Object();
      System.out.println("2、產(chǎn)生一個(gè)包子");
      // 爭取到鎖以后,再恢復(fù)consumerThread
      synchronized (this) {
          System.out.println("獲取到synchronized,執(zhí)行suspend()。。。");
          consumerThread.resume();
      }
      System.out.println("3、通知消費(fèi)者");
  }

  /**
   * 導(dǎo)致程序永久掛起的suspend/resume
   * */
  public void suspendResumeDeadLockTest3() throws Exception {
      // 啟動線程
      Thread consumerThread = new Thread(() -> {
          if (baozidian == null) {
              System.out.println("1、沒包子,進(jìn)入等待");
              try { // 為這個(gè)線程加上一點(diǎn)延時(shí)
                  Thread.sleep(5000L);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              // 這里的掛起執(zhí)行在resume后面
              System.out.println("執(zhí)行suspend()。。。");
              Thread.currentThread().suspend();
          }
          System.out.println("4、買到包子,回家");
      });
      consumerThread.start();
      // 3秒之后,生產(chǎn)一個(gè)包子
      Thread.sleep(3000L);
      baozidian = new Object();
      System.out.println("2、產(chǎn)生一個(gè)包子");
      System.out.println("執(zhí)行resume()。。。");
      consumerThread.resume();
      consumerThread.join();
      System.out.println("3、通知消費(fèi)者");

  }
    public static void main(String[] args) throws Exception {
        //1
        new ThreadAPITest().suspendResumeTest1();
        //2
        new ThreadAPITest().suspendResumeDeadLockTest2();
        //3
        new ThreadAPITest().suspendResumeDeadLockTest3();
    }
}

第一個(gè)調(diào)用結(jié)果如下:

1、進(jìn)入等待
suspend()前狀態(tài):RUNNABLE
suspend()后狀態(tài):RUNNABLE
2、產(chǎn)生一個(gè)包子
執(zhí)行resume()。。。
3、通知消費(fèi)者
4、買到包子,回家

??這個(gè)是可以正常調(diào)用的,線程狀態(tài)從NEW-》RUNNABLE后,調(diào)用suspend()線程依然是RUNNABLE,這說明suspend()方法并沒有改變線程的
狀態(tài),只是掛起(暫停)線程而已。

第二個(gè)調(diào)用結(jié)果如下:

1、進(jìn)入等待
獲取到synchronized,進(jìn)入synchronized,執(zhí)行suspend()。。。
2、產(chǎn)生一個(gè)包子
.......................

??從這個(gè)結(jié)果可以看到,線程進(jìn)入synchronized代碼塊-》持有鎖-》執(zhí)行suspend()掛起-》3秒之后,生產(chǎn)一個(gè)包子線程就中止了,并沒有執(zhí)行System.out.println("獲取到synchronized,執(zhí)行suspend()。。。");這一行。這說明suspend()方法掛起時(shí)持有鎖,并沒有釋放鎖。 這樣就產(chǎn)生了死鎖。

第三個(gè)調(diào)用結(jié)果如下:

1、沒包子,進(jìn)入等待
2、產(chǎn)生一個(gè)包子
執(zhí)行resume()。。。
執(zhí)行suspend()。。。

??這個(gè)運(yùn)行是:線程先sleep()5秒-》主線程先執(zhí)行了resume()-》工作線程執(zhí)行suspend().這個(gè)時(shí)候線程就永久掛起了,也就是死鎖。所以suspend()必須在resume()之前執(zhí)行。

綜上suspend/resume方式:

  • suspend()方法掛起時(shí)線程狀態(tài)并沒有改變,此方法也沒有遵循線程狀態(tài)流程
  • 加鎖/不加鎖都對調(diào)用順序有嚴(yán)格要求,否則容易死鎖。
  • 加鎖時(shí)suspend()方法會持鎖,并且不釋放,這樣也容易導(dǎo)致死鎖。
wait/notify示例。
/**
 * 三種線程協(xié)作通信的方式之wait/notify
 * @date 2019/2/21
 */
public class ThreadAPITes2 {
    /** 包子店 */
    public static Object baozidian = null;

    /** 正常的wait/notify */
    public void waitNotifyTest1() throws Exception {
        // 啟動線程
        Thread waitNotify = new Thread(() -> {
            synchronized (this) {
                while (baozidian == null) { // 如果沒包子,則進(jìn)入等待
                    try {
                        System.out.println("1、進(jìn)入等待");
                        System.out.println("wait()前狀態(tài):" + Thread.currentThread().getState().toString());
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("4、買到包子,回家");
        });
        waitNotify.start();
        // 3秒之后,生產(chǎn)一個(gè)包子
        Thread.sleep(3000L);
        System.out.println("2、生產(chǎn)一個(gè)包子");
        baozidian = new Object();
        synchronized (this) {
            System.out.println("wait()后狀態(tài):" + waitNotify.getState().toString());
            this.notifyAll();
            System.out.println("3、通知消費(fèi)者");
        }
    }

    /** 會導(dǎo)致程序永久等待的wait/notify */
    public void waitNotifyDeadLockTest2() throws Exception {
        // 啟動線程
        new Thread(() -> {
            if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                synchronized (this) {
                    try {
                        System.out.println("1、進(jìn)入等待");
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("2、買到包子,回家");
        }).start();
        // 3秒之后,生產(chǎn)一個(gè)包子
        Thread.sleep(3000L);
        baozidian = new Object();
        System.out.println("2、生產(chǎn)一個(gè)包子");
        synchronized (this) {
            System.out.println("執(zhí)行notifyAll()方法");
            this.notifyAll();
            System.out.println("3、通知消費(fèi)者");
        }
    }
    public static void main(String[] args) throws Exception {
        // wait/notify要求再同步關(guān)鍵字里面使用,免去了死鎖的困擾,
        //但是一定要先調(diào)用wait,再調(diào)用notify,否則永久等待了
        //1
        new ThreadAPITes2().waitNotifyTest1();
        //2
        new ThreadAPITes2().waitNotifyDeadLockTest2();
    }
}

第一個(gè)調(diào)用結(jié)果如下:

1、進(jìn)入等待
wait()前狀態(tài):RUNNABLE
2、生產(chǎn)一個(gè)包子
wait()后狀態(tài):WAITING
3、通知消費(fèi)者
4、買到包子,回家

??這是wait()/nofity()方式的正確示例。但有一點(diǎn)需要注意,wait()必須在同步關(guān)鍵字里面使用。執(zhí)行wait()后只是線程等待,等待時(shí)wait()方法會釋放鎖。

第二個(gè)調(diào)用結(jié)果如下:

2、生產(chǎn)一個(gè)包子
執(zhí)行notifyAll()方法
3、通知消費(fèi)者
1、進(jìn)入等待
。。。。。。。。。。。。。。

??這個(gè)執(zhí)行時(shí)先調(diào)用了nofityAll()方法,然后調(diào)用wait()方法進(jìn)入等待。就會產(chǎn)生死鎖。

綜上wait/notify方式:

  • wait()方法等待時(shí)會釋放鎖,這樣就免去了死鎖的困擾。
  • 必須在同步關(guān)鍵字里面使用。
  • 對調(diào)用順序有嚴(yán)格要求,否則容易死鎖。
park/uppark示例
/**
 * 三種線程協(xié)作通信的方式之park/unpark
 * @date 2019/2/21
 */
public class ThreadAPITes3 {
    /** 包子店 */
    public static Object baozidian = null;

    /** 正常的park/unpark */
    public void parkUnparkTest1() throws Exception {
        // 啟動線程
        Thread consumerThread = new Thread(() -> {
            while (baozidian == null) { // 如果沒包子,則進(jìn)入等待
                System.out.println("1、進(jìn)入等待");
                System.out.println("park()前狀態(tài):" + Thread.currentThread().getState().toString());
                LockSupport.park();
            }
            System.out.println("4、買到包子,回家");
        });
        consumerThread.start();
        // 3秒之后,生產(chǎn)一個(gè)包子
        Thread.sleep(3000L);
        baozidian = new Object();
        System.out.println("2、產(chǎn)生一個(gè)包子");
        System.out.println("park()后狀態(tài):" + consumerThread.getState().toString());
        System.out.println("調(diào)用uppark()");
        LockSupport.unpark(consumerThread);
        System.out.println("3、通知消費(fèi)者");
    }

    /** 正常的park/unpark */
    public void parkUnparkTest2() throws Exception {
        // 啟動線程
        Thread consumerThread = new Thread(() -> {
            while (baozidian == null) { // 如果沒包子,則進(jìn)入等待
                try {
                    System.out.println("1、進(jìn)入等待");
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("調(diào)用park()");
                LockSupport.park();
            }
            System.out.println("4、買到包子,回家");
        });
        consumerThread.start();
        // 1秒之后,生產(chǎn)一個(gè)包子
        Thread.sleep(1000L);
        baozidian = new Object();
        System.out.println("2、產(chǎn)生一個(gè)包子");
        System.out.println("調(diào)用uppark()");
        LockSupport.unpark(consumerThread);
        System.out.println("3、通知消費(fèi)者");
    }

    /** 死鎖的park/unpark */
    public void parkUnparkDeadLockTest3() throws Exception {
        // 啟動線程
        Thread consumerThread = new Thread(() -> {
            if (baozidian == null) { // 如果沒包子,則進(jìn)入等待
                System.out.println("1、進(jìn)入等待");
                // 當(dāng)前線程拿到鎖,然后掛起
                synchronized (this) {
                    LockSupport.park();
                }
            }
            System.out.println("4、買到包子,回家");
        });
        consumerThread.start();
        // 3秒之后,生產(chǎn)一個(gè)包子
        Thread.sleep(3000L);
        baozidian = new Object();
        System.out.println("2、產(chǎn)生一個(gè)包子");
        // 爭取到鎖以后,再恢復(fù)consumerThread
        synchronized (this) {
            System.out.println("調(diào)用uppark()");
            LockSupport.unpark(consumerThread);
        }
        System.out.println("3、通知消費(fèi)者");
    }
    public static void main(String[] args) throws Exception {
        // park/unpark沒有順序要求,但是park并不會釋放鎖,
        // 所有再同步代碼中使用要注意
        //1
        new ThreadAPITes3().parkUnparkTest1();
        //2
        new ThreadAPITes3().parkUnparkTest2();
        //3
        new ThreadAPITes3().parkUnparkDeadLockTest3();
    }
}

第一個(gè)調(diào)用結(jié)果如下

1、進(jìn)入等待
park()前狀態(tài):RUNNABLE
2、產(chǎn)生一個(gè)包子
park()后狀態(tài):WAITING
調(diào)用uppark()
3、通知消費(fèi)者
4、買到包子,回家

第二個(gè)調(diào)用結(jié)果如下

1、進(jìn)入等待
2、產(chǎn)生一個(gè)包子
調(diào)用uppark()
3、通知消費(fèi)者
調(diào)用park()
4、買到包子,回家

這兩個(gè)都是正常的調(diào)用,一個(gè)是先調(diào)用park(),一個(gè)是先調(diào)用uppark(),都可以正常運(yùn)行,park()/uppark() 對調(diào)用順序沒有要求。

第三個(gè)調(diào)用結(jié)果如下

1、進(jìn)入等待
2、產(chǎn)生一個(gè)包子
。。。。。。。。。。。

這個(gè)是在同步代碼塊中,先調(diào)用park,在調(diào)用uppark時(shí)獲取不到鎖,說明park()方法不會釋放鎖。這時(shí)就產(chǎn)生死鎖了。

綜上park/uppark方式:

  • park()方法等待時(shí)不會釋放鎖,在同步代碼塊中慎用。
  • park/unpark沒有順序要求。

park/unpark沒有順序要求,它的設(shè)計(jì)原理核心是“許可”:park是等待一個(gè)許可,unpark是為某線程提供一個(gè)許可。也就是說,unpark先提供許可。當(dāng)某線程調(diào)用park時(shí),已經(jīng)有許可了,它就消費(fèi)這個(gè)許可,然后可以繼續(xù)運(yùn)行了。再詳細(xì)的以后再討論。

以上三種方式,suspend/resume已被棄用,wait(),park()注要有以下幾個(gè)注意事項(xiàng)。

  • wait()需釋放鎖,用的時(shí)候需要在synchronized中使用.
  • wait()必須在nofify之前調(diào)用。
  • park/unpark在同步代碼塊中使用要注意park不釋放鎖的問題。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Lock和synchronized的區(qū)別和使用https://blog.csdn.net/ydk888888/ar...
    pluss閱讀 410評論 0 0
  • 參考資料:《Java高并發(fā)程序設(shè)計(jì)》 1.同步控制 1.擴(kuò)展了synchronized功能的:重入鎖 1.簡介 使...
    agile4j閱讀 948評論 0 0
  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過程中...
    勝浩_ae28閱讀 5,257評論 0 23
  • 線程既可共享進(jìn)程的資源(內(nèi)存地址、文件I/O等),又可以獨(dú)立調(diào)度(線程是CPU調(diào)度的最基本單位)。 一、基本概念 ...
    都有米閱讀 2,577評論 0 3
  • 全書加上序加上后記共303頁,而我暫時(shí)只看到了112頁。抽空記錄,畢竟人遺忘的速度驚人,而我不但容易忘記還喜...
    迷亂步伐閱讀 657評論 1 0

友情鏈接更多精彩內(nèi)容