《生產(chǎn)者與消費(fèi)者》

志梳理下,生產(chǎn)者消費(fèi)者模式

簡(jiǎn)單的模型

先從一個(gè)例子開(kāi)始吧,有一些角色我先聲明如下:

  • 餐廳(Restaurant)--->載體
  • 廚師(Chef) --->生產(chǎn)者
  • 服務(wù)員(WaiterPerson) --->消費(fèi)者
  • 食物(Meaf)--->被消費(fèi)

我梳理一下它們的工作流程:

  • 故事的地點(diǎn)發(fā)生在餐廳,它是載體,包括了廚師、服務(wù)員、食物。

  • 廚師在餐廳做飯,做完飯,飯放在櫥窗,通知服務(wù)員端走,送給客人吃完;期間,廚師會(huì)不斷地監(jiān)控櫥窗的食物是否被端走,如果端走則繼續(xù)做新的食物,否則等待。

  • 服務(wù)員也不能閑著,它時(shí)刻留心著櫥窗是否有食物上架,如果沒(méi)有則繼續(xù)等待。如果有食物則端走,并通知廚師,我端走食物了,你可以做新食物了。

那么按照上面的步驟,首先我們看看生產(chǎn)者Chef的基本代碼

   synchronized(this){
       while (restaurant.meal != null) {
              wait(); 
       }
  }

上面代碼表示,倘若食物已經(jīng)做好一份了,廚師不斷監(jiān)控櫥窗上面的食物,如果沒(méi)有被服務(wù)員端走(消費(fèi)),那我廚師就繼續(xù)等待,多休息一會(huì)。注意這里用while而不是if是因?yàn)榉乐苟鄠€(gè)消費(fèi)者產(chǎn)生競(jìng)爭(zhēng)引起并發(fā)問(wèn)題。

 System.out.println("飯做好了,訂單生成...")。
  synchronized(restaurant.waiter){
       restaurant.meal=new Meal();
       restaurant.waiter.notifyAll();
 }

上面代碼表示 ,廚師沒(méi)有等待了,他開(kāi)始做飯(生產(chǎn)),完成生產(chǎn)食物后,廚師通知(notifyAll)正在櫥窗等待食物的服務(wù)員,叫他去端菜(消費(fèi))。

那消費(fèi)者Waiter的流程呢?我想過(guò)程應(yīng)該是和生產(chǎn)者恰好是對(duì)立的。

synchronized(this){
    while(restaurant.meal==null){
           wait();
      }
}

上面代碼表示,服務(wù)員不斷監(jiān)控櫥窗上面的食物有沒(méi)有做好,如果沒(méi)有做好,那我服務(wù)員就繼續(xù)等待。 是吧?和前面的生產(chǎn)者的判斷條件剛好對(duì)立。

 System.out.println("我服務(wù)員把飯端走了...")。
 synchronized(restaurant.chef){
       restaurant.meal=null;
       restaurant.chef.notifyAll();
}

上面代碼表示 ,服務(wù)員被廚師通知端飯(消費(fèi))了,于是他開(kāi)始端飯送個(gè)客人,導(dǎo)致櫥窗上沒(méi)有飯了,之后,服務(wù)員通知(notifyAll)櫥窗口正在等待的廚師去做下一道菜(生產(chǎn))。

通過(guò)上面的例子,我們可以初步了解生產(chǎn)者與消費(fèi)者的工作模式。但是實(shí)際開(kāi)發(fā)場(chǎng)景中,應(yīng)該有不止一個(gè)生產(chǎn)者或者消費(fèi)者,而且食物應(yīng)該很多,那么這個(gè)時(shí)候我們應(yīng)該引入隊(duì)列(Queque)這個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)管理它們了。

利用隊(duì)列管理生產(chǎn)者與消費(fèi)者

我們可以設(shè)想一下,在餐廳中的業(yè)務(wù)場(chǎng)景,廚師chef應(yīng)該作為Runable角色可以有多個(gè),我們可以用Excutor.submit(r)提交很多個(gè)廚師,讓其工作, 而服務(wù)員我們也可以有多個(gè),同理,我們也把他放入線程池去運(yùn)行。 而食物Meal也有多個(gè),并且我們要用一個(gè)數(shù)據(jù)結(jié)構(gòu)存取它,讓它作為廚師和服務(wù)員兩者共同占有的資源又能做好同步處理。在上面的例子中,我們用wait(),notifyAll(),synchronized等方法進(jìn)行食物的同步與通信。它們有一個(gè)明顯的缺點(diǎn),我們發(fā)現(xiàn)代碼很是耦合,晦澀難懂,暫且不談性能。

讓開(kāi)發(fā)者欣慰的事,JDK中提供了BlockQueque接口來(lái)存取“食物”。它是一個(gè)阻塞隊(duì)列的數(shù)據(jù)結(jié)構(gòu)。在這里,我們需要了解兩點(diǎn);

  • 在開(kāi)發(fā)過(guò)程中,"食物"常常指是的IO流。如網(wǎng)絡(luò)編程中,服務(wù)端與客戶端發(fā)送字節(jié)流相互通信?,F(xiàn)在有netty或者nio等異步非阻塞IO的框架,讓并發(fā)性能更佳。

  • 阻塞是為了保證生產(chǎn)者與消費(fèi)者步調(diào)一致,不要產(chǎn)生大量浪費(fèi)的食物,消費(fèi)者吃不完,導(dǎo)致資源耗盡。亦或者消費(fèi)者盲目的去找生產(chǎn)者要食物,太多消費(fèi)者擁擠,也會(huì)消耗資源。所以在剛剛開(kāi)始的時(shí)候,jdk做了這個(gè)BlockQueque來(lái)管理食物和生產(chǎn)者和消費(fèi)者通信。 生產(chǎn)者要生產(chǎn)食物如下面的代碼:

    @Override
      public void run() {
      try {
          while (!Thread.interrupted()) {
              Meal meal = new Meal(++count);
              mBlockQueque.put(meal);// 如果mBlockQueque容量不為empty則阻塞等待。                                             
              TimeUnit.SECONDS.sleep(2);//模擬生產(chǎn)耗時(shí)任務(wù)。
           }
        } catch (InterruptedException e) {
          System.out.println("Chef sleep end interrupted...");
          e.printStackTrace();
        }
    }
    

上面mBlockQueque.put()為阻塞方法(如果櫥窗(隊(duì)列)還有食物未被領(lǐng)取,則等待不生產(chǎn)食物,否則生產(chǎn)食物并添加至櫥窗),如注釋上的說(shuō)明,它的作用類(lèi)似wait()/add();我們跟蹤下源碼:

   /**
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException {@inheritDoc}
     */
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock(); //                                            ---(1)
        try {
            while (!linkFirst(node))
                notFull.await();                                   ---(2)
        } finally {
            lock.unlock();
        }
    }

我解析下上面的代碼:put()是一個(gè)接口方法,它具體的實(shí)現(xiàn)方法之一是putFirst,給鏈表首位添加一個(gè)元素。

  1. (1)此處有l(wèi)ock-finally-unlock組成的臨界區(qū)。它的作用類(lèi)似synchronized,用來(lái)同步。它們之間不同的地方是:

一、用synchronized聲明鎖時(shí),任務(wù)A和任務(wù)B,都要獲取鎖O,如果A首先獲得鎖O,B則一直等待直到A釋放鎖,B一直阻塞著不能被中斷。
二、用lock-finally-unlock聲明鎖時(shí),任務(wù)A和任務(wù)B,都要獲取鎖O,如果A首先獲得鎖O,B可以等待一段時(shí)間,不想等待了,可以自行中斷。A如果想釋放鎖必須在finally后調(diào)用unlock。所以說(shuō)我覺(jué)得lock更加靈活。
但是在大多數(shù)資源競(jìng)爭(zhēng)不太激烈的情況下,我們還是用synchronized足夠了。

  1. (2)此處notFull是Condition的實(shí)例。它提供更好的性能,通過(guò)await()/signal()方法扮演之前的wait()/notify()的角色。 這里代碼是指while判斷鏈表是否超過(guò)容量,返回false時(shí),則調(diào)用await()阻塞等待當(dāng)前任務(wù)線程。

我們分析完生產(chǎn)者chef,我們來(lái)看看消費(fèi)者waiter的改造后的代碼:

@Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Meal meal = mBlockQueque.take();//從隊(duì)列中remove出一個(gè)食物,沒(méi)有食物則阻塞等待
                meal.run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

上述代碼中,mBlockQueque.take()是一個(gè)可阻塞方法。它試圖從櫥窗隊(duì)列上取食物,如果發(fā)現(xiàn)沒(méi)有食物就阻塞消費(fèi)者線程??纯磘ake()的具體實(shí)現(xiàn)的源碼:

 public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null) //                       ---(1)
                notEmpty.await();                                         ---(2)
            return x;
        } finally {
            lock.unlock();
        }
    }

(1)takeFisrt()方法中會(huì)有去調(diào)用unlinkFirst()去隊(duì)列返回一個(gè)食物,如果有食物,就返回,并調(diào)用notFull.signal()喚醒正在阻塞的生產(chǎn)者線程。
(2) notEmpty是另外一個(gè)Condition實(shí)例,它用來(lái)和消費(fèi)者線程通信。如果發(fā)現(xiàn)返回的食物為空,則notEmpty.await()讓消費(fèi)者線程阻塞等待。
至此。我們看到我們把具體的通信交互過(guò)程封裝到了阻塞隊(duì)列BlockQueue里。 生產(chǎn)者只需要調(diào)用take通信,消費(fèi)者只需調(diào)用put通信。如下圖:

通信結(jié)構(gòu)

寫(xiě)到這里了,那生產(chǎn)者與消費(fèi)者模式有哪些實(shí)際應(yīng)用呢? 我想線程池應(yīng)該是應(yīng)用最廣泛的地方。下一篇我將詳細(xì)介紹線程池的原理。

注:部分參考自《Java 編程思想》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容