生產(chǎn)者-消費(fèi)者模式 系列 之一 Sychronized,Notify,Wait 篇

生產(chǎn)者,消費(fèi)者模式可謂是Java多線程中比較經(jīng)典的例子.該系列文章希望以該模式的實(shí)現(xiàn)為起點(diǎn),將Java中對于多線程同步和通訊技術(shù)做一個(gè)總結(jié).這第一個(gè)坑肯定要留給包括 Sychronized , Notify,和Wait 方法.關(guān)于這幾個(gè)技術(shù)點(diǎn)的介紹,網(wǎng)上一搜一大把,這里不多說.只是把我自己學(xué)習(xí)過程中的一些理解難點(diǎn)做個(gè)說明,希望對看到這篇文章的人有所幫助.
Sychronized : 知道鎖一般包括類級鎖和對象級鎖.鎖類型不同,結(jié)果也是不同的. 實(shí)驗(yàn)一下下面的代碼就知道了.看注釋.

public class SynchronziedTest {  
  
    public static void main(String[] args){  
        C c1 = new C(new Share(),"Consumer01");//注意,這里每個(gè)線程有不同的Share實(shí)例.  
        C c2 = new C(new Share(),"Consumer02");  
        C c3 = new C(new Share(),"Consumer03");  
        P p1 = new P(new Share(),"Producer01");  
        c1.start();  
        c2.start();  
        c3.start();  
        p1.start();  
          
    }  
}  
class Share{  
      
    private static int i;  
    private final static Object _lock = new Object();  
      
    public void add(){  
        synchronized(this._lock){  //將此處改為this.getClass()效果相同,因?yàn)樗麄儷@得的是類級鎖.更改為this就錯(cuò)了,它獲得是對象鎖.   
          ++i;  
          System.out.println(Thread.currentThread().getName() + " Remaining Size = " + i);  
        }  
          
    }  
    public void get(){  
        synchronized(this._lock){ //將此處改為this.getClass()效果相同,因?yàn)樗麄儷@得的是類級鎖.更改為this就錯(cuò)了,它獲得是對象鎖.            if (i > 0){  
                try {  
                    Thread.sleep(1000); //讓線程睡會(huì),更改為synchronized(this)后,增大別的線程(i>0)為true 的機(jī)會(huì).  
                } catch (InterruptedException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
                --i;  
                System.out.println(Thread.currentThread().getName() + " Remaining Size = " + i);  
            }  
              
        }  
    }  
}  
class C extends Thread{  
    private Share _repository;  
    public C(Share s,String threadName){  
        super(threadName);  
        this._repository = s;  
    }  
    public void run() {  
         while(true){  
             this._repository.get();  
             try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
         }  
    }  
}  
  
class P extends Thread{  
    private Share _repository;  
    public P(Share s,String threadName){  
        super(threadName);  
        this._repository = s;  
          
    }  
    public void run() {  
        while(true){  
            this._repository.add();  
            try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
    }  
}  

Wait : 阻塞當(dāng)前持有調(diào)用該方法的對象的鎖的線程,并釋放鎖.當(dāng)被同一個(gè)對象的notify方法喚醒時(shí),只有獲得CPU資源后,該線程將從調(diào)用wait方法執(zhí)行點(diǎn)繼續(xù)執(zhí)行.
Notify : 通知被同一個(gè)對象的wait方法所阻塞的一個(gè)線程.使其在獲得CPU資源后從上次的執(zhí)行點(diǎn)(即wait方法處)繼續(xù)執(zhí)行.

  • 生產(chǎn)者消費(fèi)者場景假定
  • 同時(shí)只能有一個(gè)生產(chǎn)者進(jìn)行生產(chǎn).生產(chǎn)的同時(shí),不能有消費(fèi)者在消費(fèi).
  • 同時(shí)只有一個(gè)消費(fèi)者在消費(fèi).消費(fèi)的同時(shí),不能有生產(chǎn)者生產(chǎn).
  • 生產(chǎn)者最多能生產(chǎn)10個(gè)產(chǎn)品. 如果當(dāng)前產(chǎn)品數(shù)超過10個(gè),生產(chǎn)者將等待直到產(chǎn)品數(shù)小于10才開始生產(chǎn).
  • 當(dāng)前如果沒有可消費(fèi)產(chǎn)品時(shí),消費(fèi)者將等待直到有產(chǎn)品可消費(fèi)為止.
import java.util.Stack;  
  
public class TypicalCPTest {  
  
  
    public static void main(String[] args) {  
          
        Repository s = new Repository();  
        Maker f1 = new Maker(s,"P001");  
        Maker f2 = new Maker(s,"P002");  
          
        f1.start();  
        f2.start();  
  
        Taker c1 = new Taker(s,"C001");  
        Taker c2 = new Taker(s,"C002");  
        Taker c3 = new Taker(s,"C003");  
        c1.start();  
        c2.start();  
        c3.start();  
          
    }  
  
}  
class Repository{  
      
    private  final static int MAX_ELEMENT = 10;  
      
    private Stack<String> _store = new Stack<String>();  
      
    public  void add(String in) throws InterruptedException{  
          
        synchronized(this){  
            while (this._store.size() >= MAX_ELEMENT) {  
                System.out.println(Thread.currentThread().getName() + " is waiting on add.");  
                this.wait(); //如果參數(shù)中數(shù)量達(dá)到10個(gè)的最大值.生產(chǎn)者線程等待.  
                System.out.println(Thread.currentThread().getName() + " is after waiting on add.");  
            }  
            this._store.push(in);  
            System.out.println(Thread.currentThread().getName() + " is adding product "+ in+". Remaining size is "+ this._store.size());  
            this.notify(); //喚醒那些因?yàn)楫a(chǎn)品庫為零時(shí)等待的消費(fèi)者進(jìn)程.  
        }  
  
  
    }  
      
    public   String  get() throws InterruptedException{  
        String rtn = "";  
        synchronized(this){  
            while (this._store.isEmpty()) {  
                System.out.println(Thread.currentThread().getName() + " is waiting on get.");  
                this.wait();//如果產(chǎn)品庫為空,消費(fèi)者線程等待.直到產(chǎn)品庫中有產(chǎn)品時(shí)被喚醒.  
                System.out.println(Thread.currentThread().getName() + " is after waiting on get.");  
                  
            }  
              
            rtn = this._store.pop();  
            System.out.println(Thread.currentThread().getName() + " is getting product "+ rtn+". Remaining size is "+ this._store.size());  
            this.notify();//喚醒因產(chǎn)品庫為空可能導(dǎo)致等待的生產(chǎn)者線程.  
        }  
        return rtn;  
    }  
  
}  
  
class Maker implements Runnable {  
         
    private Repository _store;  
    private String _name;  
    private Thread _thread;  
    public Maker(Repository s,String name) {  
    super();  
    this._store = s;  
    this._name = name;  
    this._thread = new Thread(this,name);  
    }  
      
    public void start(){  
        this._thread.start();  
    }  
      
    @Override  
    public void run() {  
        int i = 0;  
        while(true){  
            try {  
                this._store.add(this._name + " Product "+ ++i);  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (Exception e){  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
}  
  
  
class Taker implements Runnable {  
        
    private Repository _store;  
    private Thread _thread;  
    public Taker(Repository s,String name) {  
        super();  
        this._store =s;  
        this._thread = new Thread(this,name);  
    }  
    public void start(){  
        this._thread.start();  
    }  
      
    @Override  
    public void run() {  
        while(true){  
            try {  
                this._store.get();  
                Thread.sleep(5000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
}  

運(yùn)行結(jié)果:

P001 is adding product P001 Product 1. Remaining size is 1  
C001 is getting product P001 Product 1. Remaining size is 0  
P002 is adding product P002 Product 1. Remaining size is 1  
C003 is getting product P002 Product 1. Remaining size is 0  
C002 is waiting on get.  
P002 is adding product P002 Product 2. Remaining size is 1  
C002 is after waiting on get.  
C002 is getting product P002 Product 2. Remaining size is 0  
P001 is adding product P001 Product 2. Remaining size is 1  
P001 is adding product P001 Product 3. Remaining size is 2  
P002 is adding product P002 Product 3. Remaining size is 3  
P002 is adding product P002 Product 4. Remaining size is 4  
P001 is adding product P001 Product 4. Remaining size is 5  
P001 is adding product P001 Product 5. Remaining size is 6  
P002 is adding product P002 Product 5. Remaining size is 7  
P001 is adding product P001 Product 6. Remaining size is 8  
P002 is adding product P002 Product 6. Remaining size is 9  
C003 is getting product P002 Product 6. Remaining size is 8  
C001 is getting product P001 Product 6. Remaining size is 7  
C002 is getting product P002 Product 5. Remaining size is 6  
P002 is adding product P002 Product 7. Remaining size is 7  
P001 is adding product P001 Product 7. Remaining size is 8  
P002 is adding product P002 Product 8. Remaining size is 9  
P001 is adding product P001 Product 8. Remaining size is 10  
P001 is waiting on add.  
P002 is waiting on add.  
C001 is getting product P001 Product 8. Remaining size is 9  
P001 is after waiting on add.  
P001 is adding product P001 Product 9. Remaining size is 10  
P002 is after waiting on add.  
P002 is waiting on add.  
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容