一文詳解wait與notify

內(nèi)容簡(jiǎn)介

本章需要重點(diǎn)掌握的技術(shù)點(diǎn)如下:
1)使用wait/notify實(shí)現(xiàn)線程間的通信。
2)生產(chǎn)者/消費(fèi)者模式的實(shí)現(xiàn)。
3)方法join的使用。
4)ThreadLocal類的使用。

3.1 等待/通知機(jī)制

3.1.1 不使用等待/通知機(jī)制實(shí)現(xiàn)線程間的通信

public class MyList {
    private List list = new ArrayList();
    public void add(){
        list.add("");
    }
    public int size(){
        return list.size();
    }
}
public class ThreadA extends Thread {
    private MyList myList;
    public ThreadA(MyList myList) {
        this.myList = myList;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                myList.add();
                System.out.println("添加了" + (i + 1) + "個(gè)元素");
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ThreadB extends Thread {

    private volatile MyList myList;

    public ThreadB(MyList myList) {
        this.myList = myList;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // System.out.print("");
                if (myList.size() == 5) {
                    System.out.println("==5,線程b退出來");
                    throw new InterruptedException();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class Run {
    public static void main(String[] args) {
        MyList myList = new MyList();
        ThreadA a = new ThreadA(myList);
        a.setName("a");
        a.start();
        ThreadB b = new ThreadB(myList);
        b.setName("b");
        b.start();
    }
}
添加了1個(gè)元素
添加了2個(gè)元素
添加了3個(gè)元素
添加了4個(gè)元素
添加了5個(gè)元素
==5,線程b退出來
java.lang.InterruptedException
    at part6.ThreadB.run(ThreadB.java:18)
添加了6個(gè)元素
添加了7個(gè)元素
添加了8個(gè)元素
添加了9個(gè)元素
添加了10個(gè)元素

雖然兩個(gè)線程實(shí)現(xiàn)了通信,但是是通過while語句輪詢檢測(cè)實(shí)現(xiàn)的,輪詢時(shí)間過小浪費(fèi)CPU資源,輪詢時(shí)間過大有可能會(huì)取不到想要的數(shù)據(jù),這時(shí)候就可以用到wait/notify機(jī)制。

3.1.3 等待/通知機(jī)制的實(shí)現(xiàn)

wait()的作用是使當(dāng)前執(zhí)行代碼的線程進(jìn)行等待。wait()是Object類的方法, 該方法用來將當(dāng)前線程置入“預(yù)執(zhí)行隊(duì)列”中,在調(diào)用wait()所在代碼行處停止執(zhí)行,直到接到通知或被中斷 。在調(diào)用wait()之前,線程必須獲取該對(duì)象的對(duì)象級(jí)別鎖,只能在同步方法或同步代碼塊中調(diào)用wait()方法。執(zhí)行wait()之后,當(dāng)前線程釋放鎖。如果調(diào)用wait()時(shí)沒有持有適當(dāng)?shù)逆i,則會(huì)拋出IllegalMonitorStateException。

方法notify(),也要在同步方法或同步代碼塊中調(diào)用,用來通知那些等待該對(duì)象鎖的其他線程,如果多個(gè)線程等待,則由線程規(guī)劃器隨機(jī)挑選一個(gè)呈wait狀態(tài)的線程,對(duì)其發(fā)起通知。在執(zhí)行notify()方法后,當(dāng)前線程不會(huì)馬上釋放對(duì)象鎖,當(dāng)前線程釋放該對(duì)象鎖之后,呈wait狀態(tài)的線程才可以獲取鎖。如果調(diào)用notify()時(shí)沒有持有適當(dāng)?shù)逆i,則會(huì)拋出IllegalMonitorStateException。

public class Test {
    public static void main(String[] args) throws InterruptedException {
        String lock = new String();
        System.out.println("sync上面");
        synchronized (lock){
            System.out.println("sync第一行");
            lock.wait();
            System.out.println("wait下面");
        }
        System.out.println("end");
    }
}
sync上面
sync第一行

根據(jù)上面的執(zhí)行結(jié)果來看,線程進(jìn)入wait狀態(tài),不會(huì)繼續(xù)向下運(yùn)行,讓它繼續(xù)執(zhí)行,可以使用notify()notifyAll()

public class Mythread1 extends Thread {
    private Object lock;
    public Mythread1(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                System.out.println("start wait time =" + System.currentTimeMillis());
                lock.wait();
                System.out.println("end wait time   =" + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Mythread2 extends Thread {

    private Object lock;

    public Mythread2(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println("start notify time =" + System.currentTimeMillis());
            lock.notify();
            System.out.println("end   notify time =" + System.currentTimeMillis());
        }
    }
}
public class Run {

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Mythread1 mythread1 = new Mythread1(lock);
        mythread1.start();
        Thread.sleep(3000);
        Mythread2 mythread2 = new Mythread2(lock);
        mythread2.start();
    }
}
start wait time =1588821820237
start notify time =1588821823237
end   notify time =1588821823237
end wait time   =1588821823237

從上面打印信息來看,Mythread1先執(zhí)行進(jìn)入wait狀態(tài),3秒后被notify喚醒。

public class MyList {
    private static List list = new ArrayList();
    public static void add() {
        list.add("");
    }
    public static int size() {
        return list.size();
    }
}
public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait   end " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadB extends Thread {
    private Object lock;
    public ThreadB(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    System.out.println("添加了" + (i + 1) + "個(gè)元素");
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("notify end");
                    }
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Run {
    public static void main(String[] args) throws InterruptedException {
        Object object = new Object();
        ThreadA a = new ThreadA(object);
        a.start();
        Thread.sleep(50);
        ThreadB b = new ThreadB(object);
        b.start();
    }
}
wait begin 1588829402319
添加了1個(gè)元素
添加了2個(gè)元素
添加了3個(gè)元素
添加了4個(gè)元素
添加了5個(gè)元素
notify end
添加了6個(gè)元素
添加了7個(gè)元素
添加了8個(gè)元素
添加了9個(gè)元素
添加了10個(gè)元素
wait end 1588829412382

日志wait end...在最后輸出,也驗(yàn)證了我們之前的結(jié)論:noitfy()方法執(zhí)行完之后不會(huì)立即釋放鎖。

synchronized可以將任何一個(gè)Object對(duì)象作為同步對(duì)象來對(duì)待,而Java為每個(gè)Object都實(shí)現(xiàn)了wait()notify(),他們必須用在被synchronized同步的Object的臨界區(qū)內(nèi)。
wait()可以使處于臨界區(qū)內(nèi)的線程進(jìn)入等待狀態(tài),同時(shí)釋放被同步的對(duì)象的鎖。
notify()可以喚醒一個(gè)因調(diào)用了wait()操作而處于阻塞狀態(tài)中的線程進(jìn)入就緒狀態(tài)。被重新喚醒的線程會(huì)試圖重新獲取臨界區(qū)的控制權(quán)。

線程狀態(tài)切換示意圖

1)new Thread()后,調(diào)用start(),系統(tǒng)會(huì)為此線程分配CPU資源,使其處于Runnable狀態(tài),當(dāng)線程搶占到CPU資源,此線程就處于Running狀態(tài)。
2)Runnable狀態(tài)和Runnable可相互切換,線程在Running狀態(tài)時(shí),調(diào)用yield()使當(dāng)前線程重新回到Runnable狀態(tài)。
3)Blocked是線程因?yàn)槟撤N原因放棄CPU使用權(quán),暫時(shí)停止運(yùn)行。直到線程進(jìn)入就緒狀態(tài),才有機(jī)會(huì)轉(zhuǎn)到運(yùn)行狀態(tài)。

線程進(jìn)入Runnable狀態(tài)大致有以下5中情況:
1.調(diào)用sleep()方法后經(jīng)過的時(shí)間查過了指定的休眠時(shí)間。
2.線程調(diào)用的阻塞IO已經(jīng)返回,阻塞方法執(zhí)行完畢。
3.線程成功獲得視圖同步的監(jiān)視器。
4.線程正在等待某個(gè)通知,其他線程發(fā)出了通知。
5.處于掛起狀態(tài)的線程調(diào)用了resume()。
線程進(jìn)入Blocked狀態(tài)大致有以下5中情況:
1.線程調(diào)用sleep(),主動(dòng)放棄占用的處理器資源。
2.線程調(diào)用的阻塞IO方法,在該方法返回前,該線程被阻塞。
3.線程視圖獲得一個(gè)同步監(jiān)視器,但同步監(jiān)視器正被其他線程持有。
4.線程等待某個(gè)通知。
5.調(diào)用suspend()將該線程掛起。

3.1.4 方法wait()鎖釋放與notify()鎖不釋放

下面我們進(jìn)行試驗(yàn)wait鎖釋放,代碼如下:

public class Service {
    public void testMethod(Object lock){
        try {
            synchronized (lock){
                System.out.println("begin wait()");
                lock.wait();
                System.out.println("  end wait()");
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class ThreadB extends Thread {
    private Object lock;
    public ThreadB(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class Test {
    public static void main(String[] args) {
        Object lock = new Object();
        ThreadA a = new ThreadA(lock);
        a.start();
        ThreadB b = new ThreadB(lock);
        b.start();
    }
}

執(zhí)行結(jié)果如下:

begin wait()
begin wait()
結(jié)論:當(dāng)wait()被執(zhí)行后,鎖被自動(dòng)釋放。

下面來驗(yàn)證:notify()被執(zhí)行后不釋放鎖。

public class Service {

    public void testMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin wait() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis());
                lock.wait();
                System.out.println("  end wait() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void synNotifyMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin notify() ThreadName=" + Thread.currentThread().getName() + "  time=" + System.currentTimeMillis() + " time=" + System.currentTimeMillis());
                lock.notify();
                Thread.sleep(5000);
                System.out.println("  end notify() ThreadName=" + Thread.currentThread().getName() + "  time=" + System.currentTimeMillis() + " time=" + System.currentTimeMillis());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class NotifyThread extends Thread {
    private Object lock;
    public NotifyThread(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.synNotifyMethod(lock);
    }
}
public class SynNotifyMethodThread extends Thread {
    private Object lock;
    public SynNotifyMethodThread(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.synNotifyMethod(lock);
    }
}
public class Test {

    public static void main(String[] args) {
        Object lock = new Object();
        ThreadA a = new ThreadA(lock);
        a.start();
        NotifyThread notifyThread = new NotifyThread(lock);
        notifyThread.start();
        SynNotifyMethodThread syn = new SynNotifyMethodThread(lock);
        syn.start();
    }
}
begin wait() ThreadName=Thread-0 time=1590474000435
begin notify() ThreadName=Thread-2  time=1590474000436 time=1590474000436
  end notify() ThreadName=Thread-2  time=1590474005436 time=1590474005436
  end wait() ThreadName=Thread-0 time=1590474005436
begin notify() ThreadName=Thread-1  time=1590474005436 time=1590474005436
  end notify() ThreadName=Thread-1  time=1590474010436 time=1590474010436
結(jié)論:必須執(zhí)行完notify方法所在的同步synchronized代碼塊之后才釋放鎖。

3.1.5 當(dāng)interrupt方法遇到wait方法

當(dāng)線程呈wait狀態(tài)時(shí),調(diào)用線程對(duì)象的interrupt方法時(shí),會(huì)出現(xiàn)InterruptException異常。
public class Service {

    public void testMethod(Object lock){
        try {
            synchronized (lock){
                System.out.println("begin wait");
                lock.wait();
                System.out.println("  end wait");
            }
        }catch (Exception e){

        }
    }
}
public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock){
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class Test {
    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(5000);
            a.interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
begin wait
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)

結(jié)論:
執(zhí)行同步代碼塊的過程中,執(zhí)行鎖所屬對(duì)象的wait()方法后,這個(gè)線程會(huì)釋放對(duì)象鎖,而此線程對(duì)象會(huì)進(jìn)入線程等待池中,等待被喚醒,如果在這個(gè)過程中斷該線程時(shí),會(huì)出現(xiàn)InterruptException異常。

3.1.6 只通知一個(gè)線程與通知所有線程

下面來驗(yàn)證:調(diào)用notify()時(shí),一次只隨機(jī)通知一個(gè)線程進(jìn)行喚醒。

public class Service extends Thread {

    public void testMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println("begin wait ThreadName=" + Thread.currentThread().getName());
                lock.wait();
                System.out.println("  end wait ThreadName=" + Thread.currentThread().getName());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class ThreadB extends Thread {
    private Object lock;
    public ThreadB(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class ThreadC extends Thread {
    private Object lock;
    public ThreadC(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        Service service = new Service();
        service.testMethod(lock);
    }
}
public class NotifyThread extends Thread {
    private Object lock;
    public NotifyThread(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        synchronized (lock) {
            lock.notify();
        }
    }
}
public class Test {

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        ThreadA a = new ThreadA(lock);
        a.start();
        ThreadB b = new ThreadB(lock);
        b.start();
        ThreadC c = new ThreadC(lock);
        c.start();
        Thread.sleep(1000);
        NotifyThread notifyThread = new NotifyThread(lock);
        notifyThread.start();
    }
}
begin wait ThreadName=Thread-0
begin wait ThreadName=Thread-3
begin wait ThreadName=Thread-1
  end wait ThreadName=Thread-0
結(jié)論:調(diào)用notify方法后,隨機(jī)通知一個(gè)等待該對(duì)象的對(duì)象鎖的其他線程。

那么如何一次性喚醒所有等待該對(duì)象的對(duì)象鎖的其他線程呢?修改NotifyThread.java代碼如下:

public class NotifyThread extends Thread {
    private Object lock;
    public NotifyThread(Object lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }
}
begin wait ThreadName=Thread-0
begin wait ThreadName=Thread-1
begin wait ThreadName=Thread-2
  end wait ThreadName=Thread-2
  end wait ThreadName=Thread-1
  end wait ThreadName=Thread-0

3.1.8 方法wait(long)的使用

wait(long)方法的功能是等待某一時(shí)間內(nèi)是否有線程對(duì)鎖進(jìn)行喚醒,如果超過這個(gè)時(shí)間則自動(dòng)喚醒。
public class MyRunnable {
    static private Object lock = new Object();
    static private Runnable runnable = new Runnable() {
        public void run() {
            try {
                synchronized (lock) {
                    System.out.println("wait begin timer=" + System.currentTimeMillis());
                    lock.wait(5000);
                    System.out.println("wait   end timer=" + System.currentTimeMillis());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    static private Runnable runnable2 = new Runnable() {
        public void run() {
            synchronized (lock){
                System.out.println("notify begin timer=" + System.currentTimeMillis());
                lock.notify();
                System.out.println("notify   end timer=" + System.currentTimeMillis());
            }
        }
    };
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(runnable);
        t.start();
    }
}
wait begin timer=1590475975460
wait   end timer=1590475980460

3.1.9 等待wait的條件發(fā)生變化。

在使用wait/notify模式時(shí),還需要注意另一正情況,也就是wait的等待條件發(fā)生變化,容易造成程序邏輯的混亂。

public class ValueObject {
    public static List<String> list = new ArrayList<String>();
}
public class Add {
    private String lock;
    public Add(String lock) {
        this.lock = lock;
    }
    public void add() {
        synchronized (lock) {
            ValueObject.list.add("lalala");
            lock.notifyAll();
        }
    }
}
public class AddThread extends Thread {
    public Add add;

    public AddThread(Add add) {
        this.add = add;
    }
    @Override
    public void run() {
        add.add();
    }
}
public class Subtract {
    private String lock;

    public Subtract(String lock) {
        this.lock = lock;
    }
    public void subtract() {
        try {
            synchronized (lock) {
                if (ValueObject.list.size() == 0) {
                    System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                    lock.wait();
                    System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                }
                System.out.println("do remove threadName=" + Thread.currentThread().getName());
                ValueObject.list.remove(0);
                System.out.println("do remove end threadName=" + Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class SubtractThread extends Thread {
    public Subtract subtract;
    public SubtractThread(Subtract subtract) {
        this.subtract = subtract;
    }
    @Override
    public void run() {
        subtract.subtract();
    }
}
public class Run {

    public static void main(String[] args) throws InterruptedException {
        String lock = "";
        Add add = new Add(lock);
        Subtract subtract = new Subtract(lock);
        SubtractThread subtractThread = new SubtractThread(subtract);
        subtractThread.setName("sub1");
        subtractThread.start();

        SubtractThread subtractThread2 = new SubtractThread(subtract);
        subtractThread2.setName("sub2");
        subtractThread2.start();
        Thread.sleep(1000);
        AddThread addThread = new AddThread(add);
        addThread.setName("add");
        addThread.start();
    }
}
wait begin ThreadName=sub2
wait begin ThreadName=sub1
wait   end ThreadName=sub1
do remove threadName=sub1
do remove end threadName=sub1
wait   end ThreadName=sub2
do remove threadName=sub2
Exception in thread "sub2" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.remove(ArrayList.java:496)

出現(xiàn)上述問題的原因是Thread.sleep()之前,sub1,sub2線程都都執(zhí)行了wait方法,進(jìn)入WAITING狀態(tài),在add線程執(zhí)行完成之后通過notifyAdd()喚醒兩個(gè)線程,在一個(gè)線程執(zhí)行完remove之后,list的size為零,這時(shí)候,另一個(gè)線程再進(jìn)行remove操作的時(shí)候就拋出了IndexOutOfBoundsException。
那么下面我們修改Subtract.java代碼如下:

public class Subtract {
    private String lock;

    public Subtract(String lock) {
        this.lock = lock;
    }
    public void subtract() {
        try {
            synchronized (lock) {
                while (ValueObject.list.size() == 0) {
                    System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                    lock.wait();
                    System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                }
                System.out.println("do remove threadName=" + Thread.currentThread().getName());
                ValueObject.list.remove(0);
                System.out.println("do remove end threadName=" + Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
wait begin ThreadName=sub1
wait begin ThreadName=sub2
wait   end ThreadName=sub2
do remove threadName=sub2
do remove end threadName=sub2
wait   end ThreadName=sub1
wait begin ThreadName=sub1

通過上面打印信息可以看出,當(dāng)兩個(gè)線程同時(shí)被喚醒之后,兩個(gè)SubtractThread線程其中一個(gè)執(zhí)行完remove之后,另一個(gè)線程執(zhí)行完wait()方法之后的代碼,因滿足ValueObject.list.size() == 0條件,重新進(jìn)入WAITING狀態(tài)。

3.1.11 生產(chǎn)者/消費(fèi)者模式的實(shí)現(xiàn)。

等待/通知模式的經(jīng)典案例就是生產(chǎn)者/消費(fèi)者模式。

3.1.11.1 一生產(chǎn)與一消費(fèi):操作值

public class ValueObject {
    public static String value = "";
}
public class Producer {
    private String lock;
    public Producer(String lock) {
        this.lock = lock;
    }
    public void setValue() {
        try {
            synchronized (lock){
                if (!"".equals(ValueObject.value)) {
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("put value = " + value);
                ValueObject.value = value;
                lock.notify();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ProducerThread extends Thread {
    private Producer producer;
    public ProducerThread(Producer producer) {
        this.producer = producer;
    }
    @Override
    public void run() {
        while (true) {
            producer.setValue();
        }
    }
}
public class Consumer {
    private String lock;
    public Consumer(String lock) {
        this.lock = lock;
    }
    public void getValue() {
        synchronized (lock) {
            if ("".equals(ValueObject.value)) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("get Value = " + ValueObject.value);
            ValueObject.value = "";
            lock.notify();
        }
    }
}

public class ConsumerThread extends Thread {
    private Consumer consumer;
    public ConsumerThread(Consumer consumer) {
        this.consumer = consumer;
    }
    @Override
    public void run() {
        while (true) {
            consumer.getValue();
        }
    }
}
public class Run {
    public static void main(String[] args) {
        String lock = "";
        Producer producer = new Producer(lock);
        Consumer consumer = new Consumer(lock);
        ProducerThread producerThread = new ProducerThread(producer);
        ConsumerThread consumerThread = new ConsumerThread(consumer);
        producerThread.start();
        consumerThread.start();
    }
}
put value = 1590487918891_263135745700279
get Value = 1590487918891_263135745700279

put value = 1590487918891_263135745708299
get Value = 1590487918891_263135745708299

put value = 1590487918891_263135745716318
get Value = 1590487918891_263135745716318

put value = 1590487918891_263135745729149
get Value = 1590487918891_263135745729149

put value = 1590487918891_263135745736848
get Value = 1590487918891_263135745736848

put value = 1590487918891_263135745744547
get Value = 1590487918891_263135745744547

···

當(dāng)前只是一個(gè)生產(chǎn)者與一個(gè)消費(fèi)者進(jìn)行的數(shù)據(jù)交互。但是如果在此實(shí)驗(yàn)的基礎(chǔ)上,出現(xiàn)多個(gè)生產(chǎn)與消費(fèi)者,那么在運(yùn)行的時(shí)候極有可能出現(xiàn)“假死”的情況,也就是所有的線程都呈WAITING狀態(tài)。

3.1.11.2 多生產(chǎn)與多消費(fèi):假死

public class Producer {
    private Object lock;
    private List<String> list;
    public Producer(Object lock, List<String> list) {
        this.lock = lock;
        this.list = list;
    }
    public void setValue() {
        try {
            synchronized (lock) {
                if (list.size() != 0) {
                    System.out.println("生產(chǎn)者 :" + Thread.currentThread().getName() + " 處于WAITING");
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("生產(chǎn)者 :" + Thread.currentThread().getName() + " 處于RUNNABLE");
                list.add(value);
                lock.notify();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ProducerThread extends Thread {
    private Producer producer;
    public ProducerThread(Producer producer) {
        this.producer = producer;
    }
    @Override
    public void run() {
        while (true) {
            producer.setValue();
        }
    }
}
public class Consumer {
    private Object lock;
    private List<String> list;
    public Consumer(Object lock, List<String> list) {
        this.lock = lock;
        this.list = list;
    }
    public void getValue() {
        try {
            synchronized (lock) {
                if (list.size() == 0) {
                    System.out.println("消費(fèi)者 :" + Thread.currentThread().getName() + " 處于WAITING");
                    lock.wait();
                }
                System.out.println("消費(fèi)者 :" + Thread.currentThread().getName() + " 處于RUNNABLE");
                list.remove(0);
                lock.notify();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ConsumerThread extends Thread {
    private Consumer consumer;
    public ConsumerThread(Consumer consumer) {
        this.consumer = consumer;
    }
    @Override
    public void run() {
        while (true) {
            consumer.getValue();
        }
    }
}
public class Run {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        List<String> list = new ArrayList<String>();
        Consumer consumer = new Consumer(lock, list);
        Producer producer = new Producer(lock, list);
        ConsumerThread[] consumerThread = new ConsumerThread[2];
        ProducerThread[] producerThread = new ProducerThread[2];
        for (int i = 0; i < 2; i++) {
            consumerThread[i] = new ConsumerThread(consumer);
            consumerThread[i].setName("消費(fèi)者" + (i + 1));
            producerThread[i] = new ProducerThread(producer);
            producerThread[i].setName("生產(chǎn)者" + (i + 1));
            consumerThread[i].start();
            producerThread[i].start();
        }
        Thread.sleep(5000);
        // Thread.currentThread().getThreadGroup().activeCount() 活動(dòng)線程數(shù)
        Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
        // 每個(gè)活動(dòng)線程的線程組及其子組復(fù)制到指定的數(shù)組中
        Thread.currentThread().getThreadGroup().enumerate(threadArray);
        for (Thread thread : threadArray) {
            System.out.println(thread.getName() + " " + thread.getState());
        }
    }
}
···
生產(chǎn)者 :生產(chǎn)者2 處于RUNNABLE
生產(chǎn)者 :生產(chǎn)者2 處于WAITING
生產(chǎn)者 :生產(chǎn)者1 處于WAITING
消費(fèi)者 :消費(fèi)者1 處于RUNNABLE
消費(fèi)者 :消費(fèi)者1 處于WAITING
消費(fèi)者 :消費(fèi)者2 處于RUNNABLE
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.remove(ArrayList.java:496)
    at part17.Consumer.getValue(Consumer.java:20)
    at part17.ConsumerThread.run(ConsumerThread.java:11)
消費(fèi)者 :消費(fèi)者2 處于WAITING
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
消費(fèi)者1 WAITING
生產(chǎn)者1 WAITING
消費(fèi)者2 WAITING
生產(chǎn)者2 WAITING

出現(xiàn)都在等待狀態(tài)的原因是:代碼中確實(shí)已經(jīng)通過notify/wait進(jìn)行通信了,但不保證notify喚醒的也許是同類(生產(chǎn)者喚醒生產(chǎn)者、消費(fèi)者喚醒消費(fèi)者),如果是按照這種情況運(yùn)行,那么就會(huì)造成該情況的出現(xiàn)。解決方案就是使用notifyAll方法即可。

3.1.11.3 一生產(chǎn)與一消費(fèi):操作棧

生產(chǎn)者向堆棧List對(duì)象中放入數(shù)據(jù),使消費(fèi)者從List堆棧中取出數(shù)據(jù)。

public class MyStack {
    private List<String> list = new ArrayList();

    public synchronized void push() {
        System.out.println("do push threadName=" + Thread.currentThread().getName());
        try {
            if (list.size() == 1) {
                wait();
            }
            list.add("anyString=" + Math.random());
            notify();
            System.out.println("push size = " + list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized String pop() {
        String returnValue = "";
        System.out.println("do pop threadName=" + Thread.currentThread().getName());
        try {
            if (list.size() == 0) {
                System.out.println("pop操作中的:" + Thread.currentThread().getName() + "線程呈WAITING");
                wait();
            }
            returnValue = "" + list.get(0);
            list.remove(0);
            notify();
            System.out.println("pop=" + list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnValue;
    }
}
public class Producer {
    private MyStack myStack;
    public Producer(MyStack myStack) {
        this.myStack = myStack;
    }
    public void pushService() {
        myStack.push();
    }
}
public class ProducerThread extends Thread {
    private Producer producer;
    public ProducerThread(Producer producer) {
        this.producer = producer;
    }
    @Override
    public void run() {
        while (true) {
            producer.pushService();
        }
    }
}
public class Consumer {
    private MyStack myStack;
    public Consumer(MyStack myStack) {
        this.myStack = myStack;
    }
    public void popService() {
        myStack.pop();
    }
}
public class ConsumerThread extends Thread {
    private Consumer consumer;
    public ConsumerThread(Consumer consumer) {
        this.consumer = consumer;
    }
    @Override
    public void run() {
        while (true){
            consumer.popService();
        }
    }
}
public class Run {
    public static void main(String[] args) {
        MyStack myStack = new MyStack();
        Producer producer = new Producer(myStack);
        Consumer consumer = new Consumer(myStack);
        ProducerThread producerThread = new ProducerThread(producer);
        ConsumerThread consumerThread = new ConsumerThread(consumer);
        producerThread.start();
        consumerThread.start();
    }
}
···
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do pop threadName=Thread-1
pop=0
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-1
···

3.1.11.4 一生產(chǎn)與多消費(fèi)-操作棧:解決wait條件改變與假死

使用上一節(jié)的代碼,修改Run方法如下:

public class Run {
    public static void main(String[] args) {
        MyStack myStack = new MyStack();
        Producer producer = new Producer(myStack);
        Consumer consumer1 = new Consumer(myStack);
        Consumer consumer2 = new Consumer(myStack);
        Consumer consumer3 = new Consumer(myStack);
        Consumer consumer4 = new Consumer(myStack);
        Consumer consumer5 = new Consumer(myStack);
        ProducerThread producerThread = new ProducerThread(producer);
        producerThread.start();
        ConsumerThread consumerThread1 = new ConsumerThread(consumer1);
        ConsumerThread consumerThread2 = new ConsumerThread(consumer2);
        ConsumerThread consumerThread3 = new ConsumerThread(consumer3);
        ConsumerThread consumerThread4 = new ConsumerThread(consumer4);
        ConsumerThread consumerThread5 = new ConsumerThread(consumer5);
        consumerThread1.start();
        consumerThread2.start();
        consumerThread3.start();
        consumerThread4.start();
        consumerThread5.start();
    }
}
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do pop threadName=Thread-5
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
do pop threadName=Thread-4
pop操作中的:Thread-4線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
Exception in thread "Thread-4" do pop threadName=Thread-3
pop操作中的:Thread-3線程呈WAITING
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.ykc.m.MyStack.pop(MyStack.java:31)
    at com.ykc.m.Consumer.popService(Consumer.java:9)
    at com.ykc.m.ConsumerThread.run(ConsumerThread.java:11)
do pop threadName=Thread-2
pop操作中的:Thread-2線程呈WAITING
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING

由于條件發(fā)生改變沒有及時(shí)得到響應(yīng),所以多個(gè)呈wait狀態(tài)的線程被喚醒,執(zhí)行remove方法時(shí)出現(xiàn)異常,我們可以吧if改成while即可,代碼如下:

public class MyStack {
    private List<String> list = new ArrayList();

    public synchronized void push() {
        System.out.println("do push threadName=" + Thread.currentThread().getName());
        try {
            while (list.size() == 1) {
                wait();
            }
            list.add("anyString=" + Math.random());
            notify();
            System.out.println("push size = " + list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized String pop() {
        String returnValue = "";
        System.out.println("do pop threadName=" + Thread.currentThread().getName());
        try {
            while (list.size() == 0) {
                System.out.println("pop操作中的:" + Thread.currentThread().getName() + "線程呈WAITING");
                wait();
            }
            returnValue = "" + list.get(0);
            list.remove(0);
            notify();
            System.out.println("pop=" + list.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnValue;
    }
}
···
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
do pop threadName=Thread-2
pop操作中的:Thread-2線程呈WAITING
do pop threadName=Thread-4
pop操作中的:Thread-4線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-3
pop操作中的:Thread-3線程呈WAITING
pop操作中的:Thread-1線程呈WAITING
pop操作中的:Thread-5線程呈WAITING

異常的問題是解決了,卻又出現(xiàn)了“假死”,之前我們有驗(yàn)證過,把notify方法,改為notifyAll即可,有疑問的可以重新看3.1.11.2小節(jié)。

3.1.11.4 多生產(chǎn)與多消費(fèi)-操作棧:解決wait條件改變與假死

仍然使用上一章節(jié)代碼,修改我們的Run.java的代碼如下:

public class Run {
    public static void main(String[] args) {
        MyStack myStack = new MyStack();
        Producer producer1 = new Producer(myStack);
        Producer producer2 = new Producer(myStack);
        Producer producer3 = new Producer(myStack);
        Producer producer4 = new Producer(myStack);
        Producer producer5 = new Producer(myStack);
        Consumer consumer1 = new Consumer(myStack);
        Consumer consumer2 = new Consumer(myStack);
        Consumer consumer3 = new Consumer(myStack);
        Consumer consumer4 = new Consumer(myStack);
        Consumer consumer5 = new Consumer(myStack);
        ProducerThread producerThread1 = new ProducerThread(producer1);
        ProducerThread producerThread2 = new ProducerThread(producer2);
        ProducerThread producerThread3 = new ProducerThread(producer3);
        ProducerThread producerThread4 = new ProducerThread(producer4);
        ProducerThread producerThread5 = new ProducerThread(producer5);
        producerThread1.start();
        producerThread2.start();
        producerThread3.start();
        producerThread4.start();
        producerThread5.start();
        ConsumerThread consumerThread1 = new ConsumerThread(consumer1);
        ConsumerThread consumerThread2 = new ConsumerThread(consumer2);
        ConsumerThread consumerThread3 = new ConsumerThread(consumer3);
        ConsumerThread consumerThread4 = new ConsumerThread(consumer4);
        ConsumerThread consumerThread5 = new ConsumerThread(consumer5);
        consumerThread1.start();
        consumerThread2.start();
        consumerThread3.start();
        consumerThread4.start();
        consumerThread5.start();
    }
}
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do push threadName=Thread-2
do push threadName=Thread-4
do pop threadName=Thread-8
pop=0
do pop threadName=Thread-8
pop操作中的:Thread-8線程呈WAITING
push size = 1
do push threadName=Thread-4
do pop threadName=Thread-5
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
push size = 1
do push threadName=Thread-2
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
push size = 1
do pop threadName=Thread-9
pop=0
···

3.1.12 通過管道進(jìn)行線程間通信:字節(jié)流

PipeStream是一種特殊的流,用于在不同線程間直接傳送數(shù)據(jù)。一個(gè)線程發(fā)送數(shù)據(jù)輸出管道,另一個(gè)線程從輸入管道中讀取數(shù)據(jù)。
1)PipedInputStreamPipedOutputStream
2)PipedReaderPipedWriter

class WriteData {
    void writeMethod(PipedOutputStream out) throws IOException {
        try {
            System.out.println("write");
            for (int i = 0; i < 10; i++) {
                String outData = "" + (i + 1);
                out.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            out.close();
        }
    }
}
public class WriteThread extends Thread {
    private WriteData write;
    private PipedOutputStream out;
    public WriteThread(WriteData write, PipedOutputStream out) {
        this.write = write;
        this.out = out;
    }
    @Override
    public void run() {
        try {
            write.writeMethod(out);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class ReadData {
    public void readMethod(PipedInputStream input) throws IOException {
        try {
            System.out.println("read");
            byte[] byteArray = new byte[20];
            int readLength = input.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray, 0, readLength);
                System.out.print(newData);
                readLength = input.read(byteArray);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            input.close();
        }
    }
}
public class ReadThread extends Thread {
    private ReadData read;
    private PipedInputStream in;
    public ReadThread(ReadData read, PipedInputStream in) {
        this.read = read;
        this.in = in;
    }
    @Override
    public void run() {
        try {
            read.readMethod(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Run {
    public static void main(String[] args) throws IOException, InterruptedException {
        WriteData writeData = new WriteData();
        ReadData readData = new ReadData();
        PipedInputStream inputStream = new PipedInputStream();
        PipedOutputStream outputStream = new PipedOutputStream();
        outputStream.connect(inputStream);
        WriteThread writeThread = new WriteThread(writeData, outputStream);
        writeThread.start();
        Thread.sleep(1000);
        ReadThread readThread = new ReadThread(readData, inputStream);
        readThread.start();
    }
}
write
12345678910
read
12345678910

3.1.13通過管道進(jìn)行線程間通信:字符流

public class WriteData {
    public void writeMethod(PipedWriter out) {
        try {
            System.out.print("write :");
            for (int i = 0; i < 11; i++) {
                String outData = "" + (i + 1);
                System.out.print(outData);
                out.write(outData);
            }
            System.out.println();
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class WriteThread extends Thread {
    private WriteData write;
    private PipedWriter out;
    public WriteThread(WriteData write, PipedWriter out) {
        this.write = write;
        this.out = out;
    }
    @Override
    public void run() {
        write.writeMethod(out);
    }
}
public class ReadData {
    public void readMethod(PipedReader input) {
        try {
            System.out.print(" read :");
            char[] byteArray = new char[20];
            int readLength = input.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray, 0, readLength);
                System.out.print(newData);
                readLength = input.read(byteArray);
            }
            input.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ReadThread extends Thread {
    private ReadData read;
    private PipedReader in;
    public ReadThread(ReadData read, PipedReader in) {
        this.read = read;
        this.in = in;
    }
    @Override
    public void run() {
        read.readMethod(in);
    }
}
public class Run {
    public static void main(String[] args) {
        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();
            PipedReader inputStream = new PipedReader();
            PipedWriter outputStream = new PipedWriter();
            outputStream.connect(inputStream);
            WriteThread writeThread = new WriteThread(writeData, outputStream);
            writeThread.start();
            Thread.sleep(100);
            ReadThread readThread = new ReadThread(readData, inputStream);
            readThread.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
write :1234567891011
 read :1234567891011

3.2 join方法的使用

主線程創(chuàng)建并啟動(dòng)子線程,如果子線程需要進(jìn)行大量的耗時(shí)運(yùn)算,主線程往往將早于子線程結(jié)束之前結(jié)束,這時(shí),如果主線程想要等待子線程執(zhí)行完成之后再結(jié)束,就要用到join()了。join()的作用是等待線程對(duì)象銷毀。

public class MyThread extends Thread {
    @Override
    public void run() {
        int secondValue = (int) (Math.random() * 10000);
        System.out.println(secondValue);
    }
}
public class Test {
    public static void main(String[] args) {
        MyThread threadTest = new MyThread();
        threadTest.start();
        System.out.println("想在線程運(yùn)行結(jié)束后打印···");
    }
}
想在線程運(yùn)行結(jié)束后打印···
9607

修改Test.java代碼如下:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyThread threadTest = new MyThread();
        threadTest.start();
        threadTest.join();
        System.out.println("想在線程運(yùn)行結(jié)束后打印···");
    }
}
5277
想在線程運(yùn)行結(jié)束后打印···

join()的作用是使所屬線程對(duì)象x正常執(zhí)行run()方法中的任務(wù),而使當(dāng)前線程z進(jìn)行無限期的阻塞,等待線程x銷毀后,再繼續(xù)執(zhí)行線程z后面的代碼。在join過程中,如果當(dāng)前線程對(duì)象被中斷,則當(dāng)前會(huì)出現(xiàn)InterruptedException。

join()具有使線程排隊(duì)運(yùn)行的作用,有些類似同步的運(yùn)行效果。join與synchronized的區(qū)別是:join()在內(nèi)部使用wait()方法進(jìn)行等待,而synchronized使用的是對(duì)象監(jiān)視器來實(shí)現(xiàn)同步。

3.2.1 join(long)與sleep(long)的區(qū)別

join(long)的功能在內(nèi)部是使用wait(long)來實(shí)現(xiàn)的,所以join(long)方法具有釋放鎖的特點(diǎn)。join(long)源碼如下:

    public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

3.2.2 方法join()后面的代碼提前運(yùn)行

public class ThreadB extends Thread {
    @Override
    synchronized public void run() {
        try {
            System.out.println("begin B threadName=" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println("  end B threadName=" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadA extends Thread {
    private ThreadB threadB;
    public ThreadA(ThreadB threadB) {
        this.threadB = threadB;
    }
    @Override
    public void run() {
        synchronized (threadB) {
            try {
                System.out.println("begin A threadName = " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
                Thread.sleep(5000);
                System.out.println("  end A threadName = " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class Run {
    public static void main(String[] args) {
        try {
            ThreadB threadB = new ThreadB();
            ThreadA threadA = new ThreadA(threadB);
            threadA.start();
            threadB.start();
            threadB.join(2000);
            System.out.println("main end " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
begin A threadName = Thread-1 1590631573130
  end A threadName = Thread-1 1590631578130
main end 1590631578130
begin B threadName=Thread-0 1590631578130
  end B threadName=Thread-0 1590631583130

那么下面我們來分析一下問題出現(xiàn)的原因,修改Run.java代碼如下:

public class Run {
    public static void main(String[] args) {
        ThreadB threadB = new ThreadB();
        ThreadA threadA = new ThreadA(threadB);
        threadA.start();
        threadB.start();
        System.out.println("main end " + System.currentTimeMillis());
    }
}
main end 1590631921788
begin A threadName = Thread-1 1590631921788
  end A threadName = Thread-1 1590631926790
begin B threadName=Thread-0 1590631926790
  end B threadName=Thread-0 1590631931790

從執(zhí)行結(jié)果來看,“main end ···”每次都是第一個(gè)打印的。那么針對(duì)上次的執(zhí)行結(jié)果來看,join(2000)幾乎是每次都是先運(yùn)行的,也就是最先搶到ThreadB的鎖,然后釋放。基本的過程如下:
1)threadB.join(2000)先搶到ThreadB的鎖,然后釋放。
2)ThreadA搶到ThreadB的鎖,執(zhí)行synchronized代碼塊內(nèi)的代碼,進(jìn)行sleep(5000),5s結(jié)束后打印end。
3)join(2000)和ThreadB再次爭(zhēng)搶鎖,時(shí)間已過2000ms,釋放鎖,然后打印“main end”。
4)ThreadB獲得鎖之后執(zhí)行run方法。

3.3 ThreadLocal的使用

ThreadLocal是除了加鎖這種同步方式之外,保證多線程訪問不會(huì)出現(xiàn)非線程安全問題,當(dāng)我們?cè)趧?chuàng)建一個(gè)變量后,每個(gè)線程對(duì)其進(jìn)行訪問的時(shí),訪問的都是線程自己的變量這樣就不會(huì)存在線程不安全問題。
Map里面存儲(chǔ)線程本地對(duì)象(key)和線程的變量副本(value)
但是,Thread內(nèi)部的Map是由ThreadLocal維護(hù)的,由ThreadLocal負(fù)責(zé)向map獲取和設(shè)置線程的變量值。
下面我看幾個(gè)常用方法如下:

    /**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
    public T get() {
        // 獲取當(dāng)前線程對(duì)象
        Thread t = Thread.currentThread();
        // 獲取當(dāng)前線程對(duì)象的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        // 判斷map是否存在
        if (map != null) {
            // 當(dāng)前ThreadLocal實(shí)例對(duì)象為key,獲取之前存儲(chǔ)的實(shí)體
            ThreadLocalMap.Entry e = map.getEntry(this);
            // 判斷當(dāng)前實(shí)體是否存在
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 若map不存在則進(jìn)行初始化值
        return setInitialValue();
    }
    
   /**
     * Get the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param  t the current thread
     * @return the map
     */
    ThreadLocalMap getMap(Thread t) {
        // 返回當(dāng)前線程的ThreadLocalMap 
        return t.threadLocals;
    }

    /**
     * Variant of set() to establish initialValue. Used instead
     * of set() in case user has overridden the set() method.
     *
     * @return the initial value
     */
    private T setInitialValue() {
        // null
        T value = initialValue();
        // 獲取當(dāng)前線程
        Thread t = Thread.currentThread();
        // 獲取當(dāng)前線程對(duì)象的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            // map不為空時(shí),初始化存儲(chǔ)對(duì)象實(shí)體value為null
            map.set(this, value);
        else
            // map為空時(shí),以當(dāng)前線程的threadLocals為key
            createMap(t, value);
        return value;
    }

    protected T initialValue() {
        return null;
    }

   /**
     * Create the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the map
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

    /**
     * Sets the current thread's copy of this thread-local variable
     * to the specified value.  Most subclasses will have no need to
     * override this method, relying solely on the {@link #initialValue}
     * method to set the values of thread-locals.
     *
     * @param value the value to be stored in the current thread's copy of
     *        this thread-local.
     */
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

    /**
     * Removes the current thread's value for this thread-local
     * variable.  If this thread-local variable is subsequently
     * {@linkplain #get read} by the current thread, its value will be
     * reinitialized by invoking its {@link #initialValue} method,
     * unless its value is {@linkplain #set set} by the current thread
     * in the interim.  This may result in multiple invocations of the
     * {@code initialValue} method in the current thread.
     *
     * @since 1.5
     */
     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }

注意點(diǎn):在線程run() 方法中不顯示的調(diào)用remove() 清理與線程相關(guān)的ThreadLocal 信息,線程復(fù)用會(huì)產(chǎn)生臟數(shù)據(jù)(線程池會(huì)重用Thread對(duì)象,如果先一個(gè)線程不調(diào)用set() 設(shè)置初始值,那么與Thread綁定的類靜態(tài)屬性也會(huì)被重用)。

3.3.1 驗(yàn)證線程變量的隔離性

public class ThreadLocalTools {
    public static ThreadLocal threadLocal = new ThreadLocal();
}
public class ThreadA extends Thread {
    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                ThreadLocalTools.threadLocal.set("ThreadA " + i);
                System.out.println("ThreadA get Value = " + ThreadLocalTools.threadLocal.get());
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadB extends Thread {
    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                ThreadLocalTools.threadLocal.set("ThreadB " + i);
                System.out.println("ThreadB get Value = " + ThreadLocalTools.threadLocal.get());
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Run {
    public static void main(String[] args) throws InterruptedException {
        ThreadA a = new ThreadA();
        ThreadB b = new ThreadB();
        a.start();
        b.start();
        for (int i = 0; i < 100; i++) {
            ThreadLocalTools.threadLocal.set("main thread " + i);
            System.out.println("main thread getValue " + ThreadLocalTools.threadLocal.get());
            Thread.sleep(200);
        }
    }
}
···
main thread getValue main thread 97
ThreadA get Value = ThreadA 97
ThreadB get Value = ThreadB 97
main thread getValue main thread 98
ThreadA get Value = ThreadA 98
ThreadB get Value = ThreadB 98
main thread getValue main thread 99
ThreadA get Value = ThreadA 99
ThreadB get Value = ThreadB 99

3.4 InheritableThreadLocal的使用

使用InheritableThreadLocal可以再子線程中取得父線程繼承下來的值。

public class InheritableThreadLocalExt extends InheritableThreadLocal {
    @Override
    protected Object initialValue() {
        return System.currentTimeMillis();
    }
}
public class Tools {
    public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread獲取的值 = " + Tools.t1.get());
    }
}
public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(" main線程取值    = " + Tools.t1.get());
        Thread.sleep(1000);
        MyThread myThread = new MyThread();
        myThread.start();
    }
}
 main線程取值    = 1590647232214
MyThread獲取的值 = 1590647232214

參考文獻(xiàn)

《Java多線程編程核心技術(shù)》高紅巖

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容