Java并發(fā)(3)-- 線程間通信&生產(chǎn)者消費者問題和哲學家就餐問題

守望先鋒 獵空

本文主要分兩個章節(jié),先對線程間通信機制的介紹,然后通過對生產(chǎn)者問題和哲學家問題的解決對線程的基礎部分收尾

  1. 線程間通信機制
    1.1 使用同步機制
    1.2 使用輪詢機制
    1.3 使用wait/notify
    1.4 使用Lock/Condition
  2. 兩個經(jīng)典問題
    2.1 哲學家問題死鎖的解決
    2.2 生產(chǎn)者消費者問題

線程間通信機制

同步機制

使用關鍵字volatilesynchronized ,前面幾篇文章已經(jīng)說明了這個問題,這里不再重復

使用輪詢機制

public class SpinLockTest {


    private static CountDownLatch latch = new CountDownLatch(100);
    private AtomicReference<Thread> ref = new AtomicReference<>();

    public void lock() {
        Thread currentThread = Thread.currentThread();
        while (!ref.compareAndSet(null, currentThread)) {
        }
    }

    public void unLock() {
        Thread thread = Thread.currentThread();
        ref.compareAndSet(thread,null);
    }


    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        SpinLockTest test = new SpinLockTest();
        int count[] = {0};
        for (int i = 0; i < 100 ; i++) {
            service.execute(new Thread(() -> {
                test.lock();
                count[0] ++;
                test.unLock();
                latch.countDown();
            }));
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(count[0]);
    }

}

我們使用了CountDownLatch做一個100次的倒計時,如果倒計時0時,結束阻塞。理想情況下,100個線程應該會讓最后的結果變成100,而結果和我們預料的一致,假設第一個被調(diào)度的線程為A,ref.compareAndSet()返回true(當前是nullexpect的是也是null,ref的值被設置成currentThread的值)。當A線程沒有unlock()時,如果來一個B線程,不滿足while中CAS的條件,開始while循環(huán),B線程會一直詢問有鎖嗎,有鎖嗎......直到A線程unlock為止。

B線程詢問有鎖嗎?

我們這個例子中也實現(xiàn)了一個自旋鎖:一個線程從在阻塞到切換成為別的線程的過程,如果只是執(zhí)行簡單的任務的話,切換線程上下文的時間反而比執(zhí)行任務的時間還要長。所以我們可以采取自旋鎖的方法進行線程的同步。

使用wait/notify

wait()notify()是定義在Object上的native方法,具體的內(nèi)容有賴于各個平臺的實現(xiàn)。

wait/notfity具體使用

  1. wait()和notify()
    wait()函數(shù)調(diào)用之后線程被掛起。調(diào)用了notify()、notifyAll()之后會喚醒一個等待這個對象鎖的線程,但是只有當退出對象鎖的區(qū)域才行。
    對象調(diào)用notify()之后只會有一個線程去競爭鎖,notifyAll()會讓所有等待這個對象鎖的線程去競爭鎖。
  2. 具體使用
    Java中給出了一個使用wait()很明確的套路,就是使用這樣的一個結構:
synchronized(object){
  //某種條件
  while(condition){
      //do something  
      wait();
     //do something else
  }
}

首先記住以下原則:

  1. wait()notify()方法必須定義在synchronized方法塊中
  2. wait()通常情況下放在while塊中,這主要是因為虛假喚醒問題
    下面看一段例子:
public class LockReleaseTest {

    private static Object object = new Object();

    private static class A extends  Thread{

        private Object object;
        public A(Object object){
            this.object = object;
        }
        public void run(){
            synchronized (object) {
                while (!Thread.interrupted()) {
                    try {
                        //讓A線程直接wait
                        System.out.println("A進入同步代碼塊");
                        //wait 將線程掛起 從哪里跌倒從哪里爬起來 如果喚醒了 從這里繼續(xù)運行
                        object.wait();
                        System.out.println("線程A獲得了鎖");

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("A退出同步代碼塊 退出run()");
        }
    }

    private static class B extends  Thread{

        private Object object;
        public B(Object object){
            this.object = object;
        }
        public void run() {
            synchronized (object) {
                while (!Thread.interrupted()) {
                    System.out.println("B進入同步代碼塊");
                    object.notify();
                    System.out.println("B通知A 從掛起中醒來,但是沒有釋放鎖");
                    try {
                        TimeUnit.MILLISECONDS.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程B退出同步代碼塊");
                }
                System.out.println("線程B釋放了鎖");
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        A a = new A(object);
        B b = new B(object);

        a.start();
        b.start();
    }
}

A,B分別是兩個線程,他們都通過一個公共的object進行同步(通過構造函數(shù)傳入的),運行之后的結果如下所示:

A進入同步代碼塊
B進入同步代碼塊
B通知A 從掛起中醒來,但是沒有釋放鎖
線程B退出同步代碼塊
線程B釋放了鎖
線程A獲得了鎖
A退出同步代碼塊 退出run()

A進入同步代碼塊,但是調(diào)用了wait()函數(shù)之后,線程A就掛起了。但是B線程卻可以正常運行。這說明即使A線程調(diào)用了wait(),函數(shù)沒有退出run()但是A線程還是放棄了鎖,并且被B線程獲得。此時object.notify()運行,卻沒有讓A線程立即恢復,只有當B線程休眠結束并且退出同步代碼塊,A線程才能繼續(xù)運行,這就解釋了上面的問題,notify()執(zhí)行之后沒有立刻釋放鎖,只能等待解釋同步代碼塊。
調(diào)用wait()方法的時候一定是獲得了同步鎖的,如果沒有在synchronized塊中調(diào)用wait()方法將拋出異常。

使用Lock 和 Condition

個人認為LockCondition是比設計在Object上的wait()notify()更容易理解的api,所有使用wait&notify的地方都還可以使用Lock&Condition處理。

生產(chǎn)者消費者問題

生產(chǎn)著消費者問題的場景是:消費者消費生產(chǎn)者生產(chǎn)出并且放在隊列里面的產(chǎn)品,如果產(chǎn)品用完了消費者需要等待,如果隊列滿了,生產(chǎn)者等待。

  1. 先使用wait&notify完成:
public class ProducerAndConsumer1 {


    private static final Queue<Content> contents = new LinkedList<>();
    static class Content{
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content(){
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }
        public String toString(){
            return " start " + start + " end " + end;
        }
    }


    @SuppressWarnings("Duplicates")
    static class Producer implements Runnable{

        private int maxCount;

        public Producer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run(){
            while(true){
                synchronized (contents){
                  //使用while + wait的語義: 判斷是否還要繼續(xù)等待
                    while(contents.size() == maxCount){
                        System.out.println("The queue is full");
                        try {
                            contents.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        //模擬生產(chǎn)
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    Content content = new Content();
                    contents.add(content);
                    System.out.println("produced "+content);
                    contents.notifyAll();
                }
            }
        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements  Runnable{
        private int maxCount;
        public Consumer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run(){
            while(true){
                synchronized (contents){
                    //使用while + wait的語義: 判斷是否還要繼續(xù)等待
                    while(contents.size() == 0){
                        System.out.println("The queue is empty");
                        try {
                            contents.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Content content =contents.poll();
                    try {
                        //模擬消費
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("consumed " + content);
                    contents.notifyAll();
                }
            }
        }
    }


    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        int maxCount = 5;
        //3個生產(chǎn)者 3個消費者

        for (int i = 0; i < 3 ; i++) {
            service.execute(new Producer(maxCount));
            service.execute(new Consumer(maxCount));
        }

    }
}
  1. 使用Lock&Condition解決
public class ProducerAndConsumer2 {

    private static final Queue<Content> contents = new LinkedList<>();

    private static final Lock lock = new ReentrantLock();
    private static final Condition fullQueue = lock.newCondition();
    private static final Condition emptyQueue = lock.newCondition();

    static class Content{
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content(){
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }
        public String toString(){
            return " start " + start + " end " + end;
        }
    }


    static class Producer implements Runnable{

        private int maxCount;

        public Producer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run() {
            while (true) {
                lock.lock();
                //使用while + wait的語義: 判斷是否還要繼續(xù)等待
                while (contents.size() == maxCount) {
                    System.out.println("The queue is full");
                    try {
                        fullQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    //模擬生產(chǎn)
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                Content content = new Content();
                contents.add(content);
                System.out.println("produced " + content);
                fullQueue.signalAll();
                emptyQueue.signalAll();

                lock.unlock();
            }

        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements  Runnable{
        private int maxCount;
        public Consumer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run() {
            while (true) {
                lock.lock();
                //使用while + wait的語義: 判斷是否還要繼續(xù)等待
                while (contents.isEmpty()) {
                    System.out.println("The queue is empty");
                    try {
                        emptyQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Content content = contents.poll();
                try {
                    //模擬消費
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumed " + content);
                fullQueue.signalAll();
                emptyQueue.signalAll();

                lock.unlock();
            }
        }
    }

Lock&Condition的用法和上文中相同。

  1. 使用阻塞隊列來完成生產(chǎn)著消費者問題
    使用阻塞隊列能夠很好地幫我們托管同步的問題:
public class ProducerAndConsumer {


    private static final int maxCount = 10;
    private static final BlockingQueue<Content> queue = new LinkedBlockingDeque<>(maxCount);

    static class Content {
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content() {
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }

        public String toString() {
            return " start " + start + " end " + end;
        }
    }


    @SuppressWarnings("Duplicates")
    static class Producer implements Runnable {

        public void run() {
            while (true) {
                try {
                    //模擬生產(chǎn)
                    TimeUnit.MILLISECONDS.sleep(1000);
                    Content content = new Content();
                    queue.put(content);
                    System.out.println("produced " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements Runnable {
        public void run() {
            while (true) {

                try {
                    //模擬消費
                    TimeUnit.MILLISECONDS.sleep(500);
                    Content content = queue.take();
                    System.out.println("consumed " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5 ; i++) {
            service.execute(new Producer());
            service.execute(new Consumer());
        }
    }
}

解決哲學家就餐問題

哲學家就餐問題的一種解法是,可以讓最后一個人拿起的筷子固定就可以解決:

public class DeadLockTest {
    //通過哲學家問題演示一個思索的情況

    public static  class Chopstick {
        private boolean taken = false;

        public synchronized void take() throws InterruptedException {
            //反復檢查是否已經(jīng)被拿走 如果拿走,就算了
            while (taken) {
                wait();
            }
            taken = true;
        }

        public synchronized void drop(){
            taken = false;
            notifyAll();
        }
    }

    public static class Philosopher implements Runnable {

        private Chopstick left;
        private Chopstick right;

        private final int id;
        private final int ponderFactor;
        private Random rand = new Random(47);

        public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
            this.ponderFactor = ponder;
            this.left = left;
            this.right = right;
            id = ident;
        }

        public void pause() throws InterruptedException {
            if (ponderFactor == 0) return;
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
        }

        public String toString(){
            return "Philosopher" + id;
        }
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    System.out.println(this + " " + "thinking");
                    pause();
                    //哲學家開始就餐
                    System.out.println(this + " " + "grabbing right");
                    right.take();

                    System.out.println(this + " " + "grabbing left" );
                    left.take();

                    System.out.println(this + " " + "eating");
                    pause();

                    right.drop();
                    left.drop();
                }
            } catch (InterruptedException e) {
                System.out.println(this + " " + "exiting via interrupt");
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        int ponder = 0;
        int size = 5;
        ExecutorService exec = Executors.newCachedThreadPool();
        Chopstick[] chopsticks = new Chopstick[size];
        for (int i = 0; i < size ; i++) {
            chopsticks[i] = new Chopstick();
        }

        for (int i = 0; i < size ; i++) {
            //會發(fā)生死鎖
//            exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
            //死鎖的解決方式
            if(i < (size - 1)){
                exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
            }else{
                exec.execute(new Philosopher(chopsticks[0], chopsticks[i], i , ponder));
            }
        }

        //如果發(fā)生死鎖就回卡??!
        TimeUnit.SECONDS.sleep(30);
        exec.shutdownNow();
    }
}

參考內(nèi)容
使用線程間通信機制解決問題
Java 中線程間通信機制
阻塞隊列

讀 《Thinking in Java》有感,遂記之

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容