內(nèi)容簡(jiǎn)介
本章需要重點(diǎn)掌握的技術(shù)點(diǎn)如下:
1)使用wait/notify實(shí)現(xiàn)線程間的通信。
2)生產(chǎn)者/消費(fèi)者模式的實(shí)現(xiàn)。
3)方法join的使用。
4)ThreadLocal類的使用。
3.1 等待/通知機(jī)制
3.1.1 不使用等待/通知機(jī)制實(shí)現(xiàn)線程間的通信
public class MyList {
private List list = new ArrayList();
public void add(){
list.add("");
}
public int size(){
return list.size();
}
}
public class ThreadA extends Thread {
private MyList myList;
public ThreadA(MyList myList) {
this.myList = myList;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
myList.add();
System.out.println("添加了" + (i + 1) + "個(gè)元素");
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private volatile MyList myList;
public ThreadB(MyList myList) {
this.myList = myList;
}
@Override
public void run() {
try {
while (true) {
// System.out.print("");
if (myList.size() == 5) {
System.out.println("==5,線程b退出來");
throw new InterruptedException();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
MyList myList = new MyList();
ThreadA a = new ThreadA(myList);
a.setName("a");
a.start();
ThreadB b = new ThreadB(myList);
b.setName("b");
b.start();
}
}
添加了1個(gè)元素
添加了2個(gè)元素
添加了3個(gè)元素
添加了4個(gè)元素
添加了5個(gè)元素
==5,線程b退出來
java.lang.InterruptedException
at part6.ThreadB.run(ThreadB.java:18)
添加了6個(gè)元素
添加了7個(gè)元素
添加了8個(gè)元素
添加了9個(gè)元素
添加了10個(gè)元素
雖然兩個(gè)線程實(shí)現(xiàn)了通信,但是是通過while語句輪詢檢測(cè)實(shí)現(xiàn)的,輪詢時(shí)間過小浪費(fèi)CPU資源,輪詢時(shí)間過大有可能會(huì)取不到想要的數(shù)據(jù),這時(shí)候就可以用到wait/notify機(jī)制。
3.1.3 等待/通知機(jī)制的實(shí)現(xiàn)
wait()的作用是使當(dāng)前執(zhí)行代碼的線程進(jìn)行等待。wait()是Object類的方法, 該方法用來將當(dāng)前線程置入“預(yù)執(zhí)行隊(duì)列”中,在調(diào)用wait()所在代碼行處停止執(zhí)行,直到接到通知或被中斷 。在調(diào)用wait()之前,線程必須獲取該對(duì)象的對(duì)象級(jí)別鎖,只能在同步方法或同步代碼塊中調(diào)用wait()方法。執(zhí)行wait()之后,當(dāng)前線程釋放鎖。如果調(diào)用wait()時(shí)沒有持有適當(dāng)?shù)逆i,則會(huì)拋出IllegalMonitorStateException。
方法
notify(),也要在同步方法或同步代碼塊中調(diào)用,用來通知那些等待該對(duì)象鎖的其他線程,如果多個(gè)線程等待,則由線程規(guī)劃器隨機(jī)挑選一個(gè)呈wait狀態(tài)的線程,對(duì)其發(fā)起通知。在執(zhí)行notify()方法后,當(dāng)前線程不會(huì)馬上釋放對(duì)象鎖,當(dāng)前線程釋放該對(duì)象鎖之后,呈wait狀態(tài)的線程才可以獲取鎖。如果調(diào)用notify()時(shí)沒有持有適當(dāng)?shù)逆i,則會(huì)拋出IllegalMonitorStateException。
public class Test {
public static void main(String[] args) throws InterruptedException {
String lock = new String();
System.out.println("sync上面");
synchronized (lock){
System.out.println("sync第一行");
lock.wait();
System.out.println("wait下面");
}
System.out.println("end");
}
}
sync上面
sync第一行
根據(jù)上面的執(zhí)行結(jié)果來看,線程進(jìn)入wait狀態(tài),不會(huì)繼續(xù)向下運(yùn)行,讓它繼續(xù)執(zhí)行,可以使用notify()或notifyAll()。
public class Mythread1 extends Thread {
private Object lock;
public Mythread1(Object lock) {
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
System.out.println("start wait time =" + System.currentTimeMillis());
lock.wait();
System.out.println("end wait time =" + System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Mythread2 extends Thread {
private Object lock;
public Mythread2(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println("start notify time =" + System.currentTimeMillis());
lock.notify();
System.out.println("end notify time =" + System.currentTimeMillis());
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Mythread1 mythread1 = new Mythread1(lock);
mythread1.start();
Thread.sleep(3000);
Mythread2 mythread2 = new Mythread2(lock);
mythread2.start();
}
}
start wait time =1588821820237
start notify time =1588821823237
end notify time =1588821823237
end wait time =1588821823237
從上面打印信息來看,Mythread1先執(zhí)行進(jìn)入wait狀態(tài),3秒后被notify喚醒。
public class MyList {
private static List list = new ArrayList();
public static void add() {
list.add("");
}
public static int size() {
return list.size();
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
if (MyList.size() != 5) {
System.out.println("wait begin " + System.currentTimeMillis());
lock.wait();
System.out.println("wait end " + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
MyList.add();
System.out.println("添加了" + (i + 1) + "個(gè)元素");
if (MyList.size() == 5) {
lock.notify();
System.out.println("notify end");
}
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object object = new Object();
ThreadA a = new ThreadA(object);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(object);
b.start();
}
}
wait begin 1588829402319
添加了1個(gè)元素
添加了2個(gè)元素
添加了3個(gè)元素
添加了4個(gè)元素
添加了5個(gè)元素
notify end
添加了6個(gè)元素
添加了7個(gè)元素
添加了8個(gè)元素
添加了9個(gè)元素
添加了10個(gè)元素
wait end 1588829412382
日志wait end...在最后輸出,也驗(yàn)證了我們之前的結(jié)論:noitfy()方法執(zhí)行完之后不會(huì)立即釋放鎖。
synchronized可以將任何一個(gè)Object對(duì)象作為同步對(duì)象來對(duì)待,而Java為每個(gè)Object都實(shí)現(xiàn)了wait()和notify(),他們必須用在被synchronized同步的Object的臨界區(qū)內(nèi)。
wait()可以使處于臨界區(qū)內(nèi)的線程進(jìn)入等待狀態(tài),同時(shí)釋放被同步的對(duì)象的鎖。
notify()可以喚醒一個(gè)因調(diào)用了wait()操作而處于阻塞狀態(tài)中的線程進(jìn)入就緒狀態(tài)。被重新喚醒的線程會(huì)試圖重新獲取臨界區(qū)的控制權(quán)。
1)new Thread()后,調(diào)用start(),系統(tǒng)會(huì)為此線程分配CPU資源,使其處于Runnable狀態(tài),當(dāng)線程搶占到CPU資源,此線程就處于Running狀態(tài)。
2)Runnable狀態(tài)和Runnable可相互切換,線程在Running狀態(tài)時(shí),調(diào)用yield()使當(dāng)前線程重新回到Runnable狀態(tài)。
3)Blocked是線程因?yàn)槟撤N原因放棄CPU使用權(quán),暫時(shí)停止運(yùn)行。直到線程進(jìn)入就緒狀態(tài),才有機(jī)會(huì)轉(zhuǎn)到運(yùn)行狀態(tài)。
線程進(jìn)入Runnable狀態(tài)大致有以下5中情況:
1.調(diào)用sleep()方法后經(jīng)過的時(shí)間查過了指定的休眠時(shí)間。
2.線程調(diào)用的阻塞IO已經(jīng)返回,阻塞方法執(zhí)行完畢。
3.線程成功獲得視圖同步的監(jiān)視器。
4.線程正在等待某個(gè)通知,其他線程發(fā)出了通知。
5.處于掛起狀態(tài)的線程調(diào)用了resume()。
線程進(jìn)入Blocked狀態(tài)大致有以下5中情況:
1.線程調(diào)用sleep(),主動(dòng)放棄占用的處理器資源。
2.線程調(diào)用的阻塞IO方法,在該方法返回前,該線程被阻塞。
3.線程視圖獲得一個(gè)同步監(jiān)視器,但同步監(jiān)視器正被其他線程持有。
4.線程等待某個(gè)通知。
5.調(diào)用suspend()將該線程掛起。
3.1.4 方法wait()鎖釋放與notify()鎖不釋放
下面我們進(jìn)行試驗(yàn)wait鎖釋放,代碼如下:
public class Service {
public void testMethod(Object lock){
try {
synchronized (lock){
System.out.println("begin wait()");
lock.wait();
System.out.println(" end wait()");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class Test {
public static void main(String[] args) {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
ThreadB b = new ThreadB(lock);
b.start();
}
}
執(zhí)行結(jié)果如下:
begin wait()
begin wait()
結(jié)論:當(dāng)wait()被執(zhí)行后,鎖被自動(dòng)釋放。
下面來驗(yàn)證:notify()被執(zhí)行后不釋放鎖。
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis());
lock.wait();
System.out.println(" end wait() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void synNotifyMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin notify() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis() + " time=" + System.currentTimeMillis());
lock.notify();
Thread.sleep(5000);
System.out.println(" end notify() ThreadName=" + Thread.currentThread().getName() + " time=" + System.currentTimeMillis() + " time=" + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class NotifyThread extends Thread {
private Object lock;
public NotifyThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.synNotifyMethod(lock);
}
}
public class SynNotifyMethodThread extends Thread {
private Object lock;
public SynNotifyMethodThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.synNotifyMethod(lock);
}
}
public class Test {
public static void main(String[] args) {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
NotifyThread notifyThread = new NotifyThread(lock);
notifyThread.start();
SynNotifyMethodThread syn = new SynNotifyMethodThread(lock);
syn.start();
}
}
begin wait() ThreadName=Thread-0 time=1590474000435
begin notify() ThreadName=Thread-2 time=1590474000436 time=1590474000436
end notify() ThreadName=Thread-2 time=1590474005436 time=1590474005436
end wait() ThreadName=Thread-0 time=1590474005436
begin notify() ThreadName=Thread-1 time=1590474005436 time=1590474005436
end notify() ThreadName=Thread-1 time=1590474010436 time=1590474010436
結(jié)論:必須執(zhí)行完notify方法所在的同步synchronized代碼塊之后才釋放鎖。
3.1.5 當(dāng)interrupt方法遇到wait方法
當(dāng)線程呈wait狀態(tài)時(shí),調(diào)用線程對(duì)象的interrupt方法時(shí),會(huì)出現(xiàn)InterruptException異常。
public class Service {
public void testMethod(Object lock){
try {
synchronized (lock){
System.out.println("begin wait");
lock.wait();
System.out.println(" end wait");
}
}catch (Exception e){
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock){
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class Test {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(5000);
a.interrupt();
} catch (Exception e) {
e.printStackTrace();
}
}
}
begin wait
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
結(jié)論:
執(zhí)行同步代碼塊的過程中,執(zhí)行鎖所屬對(duì)象的wait()方法后,這個(gè)線程會(huì)釋放對(duì)象鎖,而此線程對(duì)象會(huì)進(jìn)入線程等待池中,等待被喚醒,如果在這個(gè)過程中斷該線程時(shí),會(huì)出現(xiàn)InterruptException異常。
3.1.6 只通知一個(gè)線程與通知所有線程
下面來驗(yàn)證:調(diào)用notify()時(shí),一次只隨機(jī)通知一個(gè)線程進(jìn)行喚醒。
public class Service extends Thread {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait ThreadName=" + Thread.currentThread().getName());
lock.wait();
System.out.println(" end wait ThreadName=" + Thread.currentThread().getName());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class ThreadC extends Thread {
private Object lock;
public ThreadC(Object lock) {
this.lock = lock;
}
@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class NotifyThread extends Thread {
private Object lock;
public NotifyThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
lock.notify();
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
ThreadB b = new ThreadB(lock);
b.start();
ThreadC c = new ThreadC(lock);
c.start();
Thread.sleep(1000);
NotifyThread notifyThread = new NotifyThread(lock);
notifyThread.start();
}
}
begin wait ThreadName=Thread-0
begin wait ThreadName=Thread-3
begin wait ThreadName=Thread-1
end wait ThreadName=Thread-0
結(jié)論:調(diào)用notify方法后,隨機(jī)通知一個(gè)等待該對(duì)象的對(duì)象鎖的其他線程。
那么如何一次性喚醒所有等待該對(duì)象的對(duì)象鎖的其他線程呢?修改NotifyThread.java代碼如下:
public class NotifyThread extends Thread {
private Object lock;
public NotifyThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
lock.notifyAll();
}
}
}
begin wait ThreadName=Thread-0
begin wait ThreadName=Thread-1
begin wait ThreadName=Thread-2
end wait ThreadName=Thread-2
end wait ThreadName=Thread-1
end wait ThreadName=Thread-0
3.1.8 方法wait(long)的使用
wait(long)方法的功能是等待某一時(shí)間內(nèi)是否有線程對(duì)鎖進(jìn)行喚醒,如果超過這個(gè)時(shí)間則自動(dòng)喚醒。
public class MyRunnable {
static private Object lock = new Object();
static private Runnable runnable = new Runnable() {
public void run() {
try {
synchronized (lock) {
System.out.println("wait begin timer=" + System.currentTimeMillis());
lock.wait(5000);
System.out.println("wait end timer=" + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
static private Runnable runnable2 = new Runnable() {
public void run() {
synchronized (lock){
System.out.println("notify begin timer=" + System.currentTimeMillis());
lock.notify();
System.out.println("notify end timer=" + System.currentTimeMillis());
}
}
};
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(runnable);
t.start();
}
}
wait begin timer=1590475975460
wait end timer=1590475980460
3.1.9 等待wait的條件發(fā)生變化。
在使用wait/notify模式時(shí),還需要注意另一正情況,也就是wait的等待條件發(fā)生變化,容易造成程序邏輯的混亂。
public class ValueObject {
public static List<String> list = new ArrayList<String>();
}
public class Add {
private String lock;
public Add(String lock) {
this.lock = lock;
}
public void add() {
synchronized (lock) {
ValueObject.list.add("lalala");
lock.notifyAll();
}
}
}
public class AddThread extends Thread {
public Add add;
public AddThread(Add add) {
this.add = add;
}
@Override
public void run() {
add.add();
}
}
public class Subtract {
private String lock;
public Subtract(String lock) {
this.lock = lock;
}
public void subtract() {
try {
synchronized (lock) {
if (ValueObject.list.size() == 0) {
System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
lock.wait();
System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
}
System.out.println("do remove threadName=" + Thread.currentThread().getName());
ValueObject.list.remove(0);
System.out.println("do remove end threadName=" + Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class SubtractThread extends Thread {
public Subtract subtract;
public SubtractThread(Subtract subtract) {
this.subtract = subtract;
}
@Override
public void run() {
subtract.subtract();
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
String lock = "";
Add add = new Add(lock);
Subtract subtract = new Subtract(lock);
SubtractThread subtractThread = new SubtractThread(subtract);
subtractThread.setName("sub1");
subtractThread.start();
SubtractThread subtractThread2 = new SubtractThread(subtract);
subtractThread2.setName("sub2");
subtractThread2.start();
Thread.sleep(1000);
AddThread addThread = new AddThread(add);
addThread.setName("add");
addThread.start();
}
}
wait begin ThreadName=sub2
wait begin ThreadName=sub1
wait end ThreadName=sub1
do remove threadName=sub1
do remove end threadName=sub1
wait end ThreadName=sub2
do remove threadName=sub2
Exception in thread "sub2" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.remove(ArrayList.java:496)
出現(xiàn)上述問題的原因是Thread.sleep()之前,sub1,sub2線程都都執(zhí)行了wait方法,進(jìn)入WAITING狀態(tài),在add線程執(zhí)行完成之后通過notifyAdd()喚醒兩個(gè)線程,在一個(gè)線程執(zhí)行完remove之后,list的size為零,這時(shí)候,另一個(gè)線程再進(jìn)行remove操作的時(shí)候就拋出了IndexOutOfBoundsException。
那么下面我們修改Subtract.java代碼如下:
public class Subtract {
private String lock;
public Subtract(String lock) {
this.lock = lock;
}
public void subtract() {
try {
synchronized (lock) {
while (ValueObject.list.size() == 0) {
System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
lock.wait();
System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
}
System.out.println("do remove threadName=" + Thread.currentThread().getName());
ValueObject.list.remove(0);
System.out.println("do remove end threadName=" + Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
wait begin ThreadName=sub1
wait begin ThreadName=sub2
wait end ThreadName=sub2
do remove threadName=sub2
do remove end threadName=sub2
wait end ThreadName=sub1
wait begin ThreadName=sub1
通過上面打印信息可以看出,當(dāng)兩個(gè)線程同時(shí)被喚醒之后,兩個(gè)SubtractThread線程其中一個(gè)執(zhí)行完remove之后,另一個(gè)線程執(zhí)行完wait()方法之后的代碼,因滿足ValueObject.list.size() == 0條件,重新進(jìn)入WAITING狀態(tài)。
3.1.11 生產(chǎn)者/消費(fèi)者模式的實(shí)現(xiàn)。
等待/通知模式的經(jīng)典案例就是
生產(chǎn)者/消費(fèi)者模式。
3.1.11.1 一生產(chǎn)與一消費(fèi):操作值
public class ValueObject {
public static String value = "";
}
public class Producer {
private String lock;
public Producer(String lock) {
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock){
if (!"".equals(ValueObject.value)) {
lock.wait();
}
String value = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("put value = " + value);
ValueObject.value = value;
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProducerThread extends Thread {
private Producer producer;
public ProducerThread(Producer producer) {
this.producer = producer;
}
@Override
public void run() {
while (true) {
producer.setValue();
}
}
}
public class Consumer {
private String lock;
public Consumer(String lock) {
this.lock = lock;
}
public void getValue() {
synchronized (lock) {
if ("".equals(ValueObject.value)) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("get Value = " + ValueObject.value);
ValueObject.value = "";
lock.notify();
}
}
}
public class ConsumerThread extends Thread {
private Consumer consumer;
public ConsumerThread(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
while (true) {
consumer.getValue();
}
}
}
public class Run {
public static void main(String[] args) {
String lock = "";
Producer producer = new Producer(lock);
Consumer consumer = new Consumer(lock);
ProducerThread producerThread = new ProducerThread(producer);
ConsumerThread consumerThread = new ConsumerThread(consumer);
producerThread.start();
consumerThread.start();
}
}
put value = 1590487918891_263135745700279
get Value = 1590487918891_263135745700279
put value = 1590487918891_263135745708299
get Value = 1590487918891_263135745708299
put value = 1590487918891_263135745716318
get Value = 1590487918891_263135745716318
put value = 1590487918891_263135745729149
get Value = 1590487918891_263135745729149
put value = 1590487918891_263135745736848
get Value = 1590487918891_263135745736848
put value = 1590487918891_263135745744547
get Value = 1590487918891_263135745744547
···
當(dāng)前只是一個(gè)生產(chǎn)者與一個(gè)消費(fèi)者進(jìn)行的數(shù)據(jù)交互。但是如果在此實(shí)驗(yàn)的基礎(chǔ)上,出現(xiàn)多個(gè)生產(chǎn)與消費(fèi)者,那么在運(yùn)行的時(shí)候極有可能出現(xiàn)“假死”的情況,也就是所有的線程都呈WAITING狀態(tài)。
3.1.11.2 多生產(chǎn)與多消費(fèi):假死
public class Producer {
private Object lock;
private List<String> list;
public Producer(Object lock, List<String> list) {
this.lock = lock;
this.list = list;
}
public void setValue() {
try {
synchronized (lock) {
if (list.size() != 0) {
System.out.println("生產(chǎn)者 :" + Thread.currentThread().getName() + " 處于WAITING");
lock.wait();
}
String value = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("生產(chǎn)者 :" + Thread.currentThread().getName() + " 處于RUNNABLE");
list.add(value);
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProducerThread extends Thread {
private Producer producer;
public ProducerThread(Producer producer) {
this.producer = producer;
}
@Override
public void run() {
while (true) {
producer.setValue();
}
}
}
public class Consumer {
private Object lock;
private List<String> list;
public Consumer(Object lock, List<String> list) {
this.lock = lock;
this.list = list;
}
public void getValue() {
try {
synchronized (lock) {
if (list.size() == 0) {
System.out.println("消費(fèi)者 :" + Thread.currentThread().getName() + " 處于WAITING");
lock.wait();
}
System.out.println("消費(fèi)者 :" + Thread.currentThread().getName() + " 處于RUNNABLE");
list.remove(0);
lock.notify();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ConsumerThread extends Thread {
private Consumer consumer;
public ConsumerThread(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
while (true) {
consumer.getValue();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
List<String> list = new ArrayList<String>();
Consumer consumer = new Consumer(lock, list);
Producer producer = new Producer(lock, list);
ConsumerThread[] consumerThread = new ConsumerThread[2];
ProducerThread[] producerThread = new ProducerThread[2];
for (int i = 0; i < 2; i++) {
consumerThread[i] = new ConsumerThread(consumer);
consumerThread[i].setName("消費(fèi)者" + (i + 1));
producerThread[i] = new ProducerThread(producer);
producerThread[i].setName("生產(chǎn)者" + (i + 1));
consumerThread[i].start();
producerThread[i].start();
}
Thread.sleep(5000);
// Thread.currentThread().getThreadGroup().activeCount() 活動(dòng)線程數(shù)
Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
// 每個(gè)活動(dòng)線程的線程組及其子組復(fù)制到指定的數(shù)組中
Thread.currentThread().getThreadGroup().enumerate(threadArray);
for (Thread thread : threadArray) {
System.out.println(thread.getName() + " " + thread.getState());
}
}
}
···
生產(chǎn)者 :生產(chǎn)者2 處于RUNNABLE
生產(chǎn)者 :生產(chǎn)者2 處于WAITING
生產(chǎn)者 :生產(chǎn)者1 處于WAITING
消費(fèi)者 :消費(fèi)者1 處于RUNNABLE
消費(fèi)者 :消費(fèi)者1 處于WAITING
消費(fèi)者 :消費(fèi)者2 處于RUNNABLE
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.remove(ArrayList.java:496)
at part17.Consumer.getValue(Consumer.java:20)
at part17.ConsumerThread.run(ConsumerThread.java:11)
消費(fèi)者 :消費(fèi)者2 處于WAITING
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
消費(fèi)者1 WAITING
生產(chǎn)者1 WAITING
消費(fèi)者2 WAITING
生產(chǎn)者2 WAITING
出現(xiàn)都在等待狀態(tài)的原因是:代碼中確實(shí)已經(jīng)通過notify/wait進(jìn)行通信了,但不保證notify喚醒的也許是同類(生產(chǎn)者喚醒生產(chǎn)者、消費(fèi)者喚醒消費(fèi)者),如果是按照這種情況運(yùn)行,那么就會(huì)造成該情況的出現(xiàn)。解決方案就是使用notifyAll方法即可。
3.1.11.3 一生產(chǎn)與一消費(fèi):操作棧
生產(chǎn)者向堆棧List對(duì)象中放入數(shù)據(jù),使消費(fèi)者從List堆棧中取出數(shù)據(jù)。
public class MyStack {
private List<String> list = new ArrayList();
public synchronized void push() {
System.out.println("do push threadName=" + Thread.currentThread().getName());
try {
if (list.size() == 1) {
wait();
}
list.add("anyString=" + Math.random());
notify();
System.out.println("push size = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized String pop() {
String returnValue = "";
System.out.println("do pop threadName=" + Thread.currentThread().getName());
try {
if (list.size() == 0) {
System.out.println("pop操作中的:" + Thread.currentThread().getName() + "線程呈WAITING");
wait();
}
returnValue = "" + list.get(0);
list.remove(0);
notify();
System.out.println("pop=" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
public class Producer {
private MyStack myStack;
public Producer(MyStack myStack) {
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
public class ProducerThread extends Thread {
private Producer producer;
public ProducerThread(Producer producer) {
this.producer = producer;
}
@Override
public void run() {
while (true) {
producer.pushService();
}
}
}
public class Consumer {
private MyStack myStack;
public Consumer(MyStack myStack) {
this.myStack = myStack;
}
public void popService() {
myStack.pop();
}
}
public class ConsumerThread extends Thread {
private Consumer consumer;
public ConsumerThread(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
while (true){
consumer.popService();
}
}
}
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
Producer producer = new Producer(myStack);
Consumer consumer = new Consumer(myStack);
ProducerThread producerThread = new ProducerThread(producer);
ConsumerThread consumerThread = new ConsumerThread(consumer);
producerThread.start();
consumerThread.start();
}
}
···
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do pop threadName=Thread-1
pop=0
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-1
···
3.1.11.4 一生產(chǎn)與多消費(fèi)-操作棧:解決wait條件改變與假死
使用上一節(jié)的代碼,修改Run方法如下:
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
Producer producer = new Producer(myStack);
Consumer consumer1 = new Consumer(myStack);
Consumer consumer2 = new Consumer(myStack);
Consumer consumer3 = new Consumer(myStack);
Consumer consumer4 = new Consumer(myStack);
Consumer consumer5 = new Consumer(myStack);
ProducerThread producerThread = new ProducerThread(producer);
producerThread.start();
ConsumerThread consumerThread1 = new ConsumerThread(consumer1);
ConsumerThread consumerThread2 = new ConsumerThread(consumer2);
ConsumerThread consumerThread3 = new ConsumerThread(consumer3);
ConsumerThread consumerThread4 = new ConsumerThread(consumer4);
ConsumerThread consumerThread5 = new ConsumerThread(consumer5);
consumerThread1.start();
consumerThread2.start();
consumerThread3.start();
consumerThread4.start();
consumerThread5.start();
}
}
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do pop threadName=Thread-5
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
do pop threadName=Thread-4
pop操作中的:Thread-4線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
Exception in thread "Thread-4" do pop threadName=Thread-3
pop操作中的:Thread-3線程呈WAITING
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.ykc.m.MyStack.pop(MyStack.java:31)
at com.ykc.m.Consumer.popService(Consumer.java:9)
at com.ykc.m.ConsumerThread.run(ConsumerThread.java:11)
do pop threadName=Thread-2
pop操作中的:Thread-2線程呈WAITING
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
由于條件發(fā)生改變沒有及時(shí)得到響應(yīng),所以多個(gè)呈wait狀態(tài)的線程被喚醒,執(zhí)行remove方法時(shí)出現(xiàn)異常,我們可以吧if改成while即可,代碼如下:
public class MyStack {
private List<String> list = new ArrayList();
public synchronized void push() {
System.out.println("do push threadName=" + Thread.currentThread().getName());
try {
while (list.size() == 1) {
wait();
}
list.add("anyString=" + Math.random());
notify();
System.out.println("push size = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized String pop() {
String returnValue = "";
System.out.println("do pop threadName=" + Thread.currentThread().getName());
try {
while (list.size() == 0) {
System.out.println("pop操作中的:" + Thread.currentThread().getName() + "線程呈WAITING");
wait();
}
returnValue = "" + list.get(0);
list.remove(0);
notify();
System.out.println("pop=" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
···
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
do pop threadName=Thread-1
pop操作中的:Thread-1線程呈WAITING
do pop threadName=Thread-2
pop操作中的:Thread-2線程呈WAITING
do pop threadName=Thread-4
pop操作中的:Thread-4線程呈WAITING
push size = 1
do push threadName=Thread-0
pop=0
do pop threadName=Thread-3
pop操作中的:Thread-3線程呈WAITING
pop操作中的:Thread-1線程呈WAITING
pop操作中的:Thread-5線程呈WAITING
異常的問題是解決了,卻又出現(xiàn)了“假死”,之前我們有驗(yàn)證過,把notify方法,改為notifyAll即可,有疑問的可以重新看3.1.11.2小節(jié)。
3.1.11.4 多生產(chǎn)與多消費(fèi)-操作棧:解決wait條件改變與假死
仍然使用上一章節(jié)代碼,修改我們的Run.java的代碼如下:
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
Producer producer1 = new Producer(myStack);
Producer producer2 = new Producer(myStack);
Producer producer3 = new Producer(myStack);
Producer producer4 = new Producer(myStack);
Producer producer5 = new Producer(myStack);
Consumer consumer1 = new Consumer(myStack);
Consumer consumer2 = new Consumer(myStack);
Consumer consumer3 = new Consumer(myStack);
Consumer consumer4 = new Consumer(myStack);
Consumer consumer5 = new Consumer(myStack);
ProducerThread producerThread1 = new ProducerThread(producer1);
ProducerThread producerThread2 = new ProducerThread(producer2);
ProducerThread producerThread3 = new ProducerThread(producer3);
ProducerThread producerThread4 = new ProducerThread(producer4);
ProducerThread producerThread5 = new ProducerThread(producer5);
producerThread1.start();
producerThread2.start();
producerThread3.start();
producerThread4.start();
producerThread5.start();
ConsumerThread consumerThread1 = new ConsumerThread(consumer1);
ConsumerThread consumerThread2 = new ConsumerThread(consumer2);
ConsumerThread consumerThread3 = new ConsumerThread(consumer3);
ConsumerThread consumerThread4 = new ConsumerThread(consumer4);
ConsumerThread consumerThread5 = new ConsumerThread(consumer5);
consumerThread1.start();
consumerThread2.start();
consumerThread3.start();
consumerThread4.start();
consumerThread5.start();
}
}
do push threadName=Thread-0
push size = 1
do push threadName=Thread-0
do push threadName=Thread-2
do push threadName=Thread-4
do pop threadName=Thread-8
pop=0
do pop threadName=Thread-8
pop操作中的:Thread-8線程呈WAITING
push size = 1
do push threadName=Thread-4
do pop threadName=Thread-5
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
push size = 1
do push threadName=Thread-2
pop=0
do pop threadName=Thread-5
pop操作中的:Thread-5線程呈WAITING
push size = 1
do pop threadName=Thread-9
pop=0
···
3.1.12 通過管道進(jìn)行線程間通信:字節(jié)流
PipeStream是一種特殊的流,用于在不同線程間直接傳送數(shù)據(jù)。一個(gè)線程發(fā)送數(shù)據(jù)輸出管道,另一個(gè)線程從輸入管道中讀取數(shù)據(jù)。
1)PipedInputStream和PipedOutputStream
2)PipedReader和PipedWriter
class WriteData {
void writeMethod(PipedOutputStream out) throws IOException {
try {
System.out.println("write");
for (int i = 0; i < 10; i++) {
String outData = "" + (i + 1);
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
} catch (Exception e) {
e.printStackTrace();
} finally {
out.close();
}
}
}
public class WriteThread extends Thread {
private WriteData write;
private PipedOutputStream out;
public WriteThread(WriteData write, PipedOutputStream out) {
this.write = write;
this.out = out;
}
@Override
public void run() {
try {
write.writeMethod(out);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ReadData {
public void readMethod(PipedInputStream input) throws IOException {
try {
System.out.println("read");
byte[] byteArray = new byte[20];
int readLength = input.read(byteArray);
while (readLength != -1) {
String newData = new String(byteArray, 0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
input.close();
}
}
}
public class ReadThread extends Thread {
private ReadData read;
private PipedInputStream in;
public ReadThread(ReadData read, PipedInputStream in) {
this.read = read;
this.in = in;
}
@Override
public void run() {
try {
read.readMethod(in);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) throws IOException, InterruptedException {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
outputStream.connect(inputStream);
WriteThread writeThread = new WriteThread(writeData, outputStream);
writeThread.start();
Thread.sleep(1000);
ReadThread readThread = new ReadThread(readData, inputStream);
readThread.start();
}
}
write
12345678910
read
12345678910
3.1.13通過管道進(jìn)行線程間通信:字符流
public class WriteData {
public void writeMethod(PipedWriter out) {
try {
System.out.print("write :");
for (int i = 0; i < 11; i++) {
String outData = "" + (i + 1);
System.out.print(outData);
out.write(outData);
}
System.out.println();
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class WriteThread extends Thread {
private WriteData write;
private PipedWriter out;
public WriteThread(WriteData write, PipedWriter out) {
this.write = write;
this.out = out;
}
@Override
public void run() {
write.writeMethod(out);
}
}
public class ReadData {
public void readMethod(PipedReader input) {
try {
System.out.print(" read :");
char[] byteArray = new char[20];
int readLength = input.read(byteArray);
while (readLength != -1) {
String newData = new String(byteArray, 0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
}
input.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ReadThread extends Thread {
private ReadData read;
private PipedReader in;
public ReadThread(ReadData read, PipedReader in) {
this.read = read;
this.in = in;
}
@Override
public void run() {
read.readMethod(in);
}
}
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedReader inputStream = new PipedReader();
PipedWriter outputStream = new PipedWriter();
outputStream.connect(inputStream);
WriteThread writeThread = new WriteThread(writeData, outputStream);
writeThread.start();
Thread.sleep(100);
ReadThread readThread = new ReadThread(readData, inputStream);
readThread.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
write :1234567891011
read :1234567891011
3.2 join方法的使用
主線程創(chuàng)建并啟動(dòng)子線程,如果子線程需要進(jìn)行大量的耗時(shí)運(yùn)算,主線程往往將早于子線程結(jié)束之前結(jié)束,這時(shí),如果主線程想要等待子線程執(zhí)行完成之后再結(jié)束,就要用到
join()了。join()的作用是等待線程對(duì)象銷毀。
public class MyThread extends Thread {
@Override
public void run() {
int secondValue = (int) (Math.random() * 10000);
System.out.println(secondValue);
}
}
public class Test {
public static void main(String[] args) {
MyThread threadTest = new MyThread();
threadTest.start();
System.out.println("想在線程運(yùn)行結(jié)束后打印···");
}
}
想在線程運(yùn)行結(jié)束后打印···
9607
修改Test.java代碼如下:
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThread threadTest = new MyThread();
threadTest.start();
threadTest.join();
System.out.println("想在線程運(yùn)行結(jié)束后打印···");
}
}
5277
想在線程運(yùn)行結(jié)束后打印···
join()的作用是使所屬線程對(duì)象x正常執(zhí)行run()方法中的任務(wù),而使當(dāng)前線程z進(jìn)行無限期的阻塞,等待線程x銷毀后,再繼續(xù)執(zhí)行線程z后面的代碼。在join過程中,如果當(dāng)前線程對(duì)象被中斷,則當(dāng)前會(huì)出現(xiàn)InterruptedException。
join()具有使線程排隊(duì)運(yùn)行的作用,有些類似同步的運(yùn)行效果。join與synchronized的區(qū)別是:
join()在內(nèi)部使用wait()方法進(jìn)行等待,而synchronized使用的是對(duì)象監(jiān)視器來實(shí)現(xiàn)同步。
3.2.1 join(long)與sleep(long)的區(qū)別
join(long)的功能在內(nèi)部是使用wait(long)來實(shí)現(xiàn)的,所以join(long)方法具有釋放鎖的特點(diǎn)。join(long)源碼如下:
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
3.2.2 方法join()后面的代碼提前運(yùn)行
public class ThreadB extends Thread {
@Override
synchronized public void run() {
try {
System.out.println("begin B threadName=" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" end B threadName=" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadA extends Thread {
private ThreadB threadB;
public ThreadA(ThreadB threadB) {
this.threadB = threadB;
}
@Override
public void run() {
synchronized (threadB) {
try {
System.out.println("begin A threadName = " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" end A threadName = " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadB threadB = new ThreadB();
ThreadA threadA = new ThreadA(threadB);
threadA.start();
threadB.start();
threadB.join(2000);
System.out.println("main end " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
begin A threadName = Thread-1 1590631573130
end A threadName = Thread-1 1590631578130
main end 1590631578130
begin B threadName=Thread-0 1590631578130
end B threadName=Thread-0 1590631583130
那么下面我們來分析一下問題出現(xiàn)的原因,修改Run.java代碼如下:
public class Run {
public static void main(String[] args) {
ThreadB threadB = new ThreadB();
ThreadA threadA = new ThreadA(threadB);
threadA.start();
threadB.start();
System.out.println("main end " + System.currentTimeMillis());
}
}
main end 1590631921788
begin A threadName = Thread-1 1590631921788
end A threadName = Thread-1 1590631926790
begin B threadName=Thread-0 1590631926790
end B threadName=Thread-0 1590631931790
從執(zhí)行結(jié)果來看,“main end ···”每次都是第一個(gè)打印的。那么針對(duì)上次的執(zhí)行結(jié)果來看,join(2000)幾乎是每次都是先運(yùn)行的,也就是最先搶到ThreadB的鎖,然后釋放。基本的過程如下:
1)threadB.join(2000)先搶到ThreadB的鎖,然后釋放。
2)ThreadA搶到ThreadB的鎖,執(zhí)行synchronized代碼塊內(nèi)的代碼,進(jìn)行sleep(5000),5s結(jié)束后打印end。
3)join(2000)和ThreadB再次爭(zhēng)搶鎖,時(shí)間已過2000ms,釋放鎖,然后打印“main end”。
4)ThreadB獲得鎖之后執(zhí)行run方法。
3.3 ThreadLocal的使用
ThreadLocal是除了加鎖這種同步方式之外,保證多線程訪問不會(huì)出現(xiàn)非線程安全問題,當(dāng)我們?cè)趧?chuàng)建一個(gè)變量后,每個(gè)線程對(duì)其進(jìn)行訪問的時(shí),訪問的都是線程自己的變量這樣就不會(huì)存在線程不安全問題。
Map里面存儲(chǔ)線程本地對(duì)象(key)和線程的變量副本(value)
但是,Thread內(nèi)部的Map是由ThreadLocal維護(hù)的,由ThreadLocal負(fù)責(zé)向map獲取和設(shè)置線程的變量值。
下面我看幾個(gè)常用方法如下:
/**
* Returns the value in the current thread's copy of this
* thread-local variable. If the variable has no value for the
* current thread, it is first initialized to the value returned
* by an invocation of the {@link #initialValue} method.
*
* @return the current thread's value of this thread-local
*/
public T get() {
// 獲取當(dāng)前線程對(duì)象
Thread t = Thread.currentThread();
// 獲取當(dāng)前線程對(duì)象的ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 判斷map是否存在
if (map != null) {
// 當(dāng)前ThreadLocal實(shí)例對(duì)象為key,獲取之前存儲(chǔ)的實(shí)體
ThreadLocalMap.Entry e = map.getEntry(this);
// 判斷當(dāng)前實(shí)體是否存在
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 若map不存在則進(jìn)行初始化值
return setInitialValue();
}
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
// 返回當(dāng)前線程的ThreadLocalMap
return t.threadLocals;
}
/**
* Variant of set() to establish initialValue. Used instead
* of set() in case user has overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue() {
// null
T value = initialValue();
// 獲取當(dāng)前線程
Thread t = Thread.currentThread();
// 獲取當(dāng)前線程對(duì)象的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// map不為空時(shí),初始化存儲(chǔ)對(duì)象實(shí)體value為null
map.set(this, value);
else
// map為空時(shí),以當(dāng)前線程的threadLocals為key
createMap(t, value);
return value;
}
protected T initialValue() {
return null;
}
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current thread's copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
/**
* Removes the current thread's value for this thread-local
* variable. If this thread-local variable is subsequently
* {@linkplain #get read} by the current thread, its value will be
* reinitialized by invoking its {@link #initialValue} method,
* unless its value is {@linkplain #set set} by the current thread
* in the interim. This may result in multiple invocations of the
* {@code initialValue} method in the current thread.
*
* @since 1.5
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
注意點(diǎn):
在線程run() 方法中不顯示的調(diào)用remove() 清理與線程相關(guān)的ThreadLocal 信息,線程復(fù)用會(huì)產(chǎn)生臟數(shù)據(jù)(線程池會(huì)重用Thread對(duì)象,如果先一個(gè)線程不調(diào)用set() 設(shè)置初始值,那么與Thread綁定的類靜態(tài)屬性也會(huì)被重用)。
3.3.1 驗(yàn)證線程變量的隔離性
public class ThreadLocalTools {
public static ThreadLocal threadLocal = new ThreadLocal();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
ThreadLocalTools.threadLocal.set("ThreadA " + i);
System.out.println("ThreadA get Value = " + ThreadLocalTools.threadLocal.get());
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
ThreadLocalTools.threadLocal.set("ThreadB " + i);
System.out.println("ThreadB get Value = " + ThreadLocalTools.threadLocal.get());
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
ThreadA a = new ThreadA();
ThreadB b = new ThreadB();
a.start();
b.start();
for (int i = 0; i < 100; i++) {
ThreadLocalTools.threadLocal.set("main thread " + i);
System.out.println("main thread getValue " + ThreadLocalTools.threadLocal.get());
Thread.sleep(200);
}
}
}
···
main thread getValue main thread 97
ThreadA get Value = ThreadA 97
ThreadB get Value = ThreadB 97
main thread getValue main thread 98
ThreadA get Value = ThreadA 98
ThreadB get Value = ThreadB 98
main thread getValue main thread 99
ThreadA get Value = ThreadA 99
ThreadB get Value = ThreadB 99
3.4 InheritableThreadLocal的使用
使用InheritableThreadLocal可以再子線程中取得父線程繼承下來的值。
public class InheritableThreadLocalExt extends InheritableThreadLocal {
@Override
protected Object initialValue() {
return System.currentTimeMillis();
}
}
public class Tools {
public static InheritableThreadLocalExt t1 = new InheritableThreadLocalExt();
}
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread獲取的值 = " + Tools.t1.get());
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println(" main線程取值 = " + Tools.t1.get());
Thread.sleep(1000);
MyThread myThread = new MyThread();
myThread.start();
}
}
main線程取值 = 1590647232214
MyThread獲取的值 = 1590647232214
參考文獻(xiàn)
《Java多線程編程核心技術(shù)》高紅巖