1、等待/通知機制
線程間通訊可以采用while語句輪詢檢測某一條件,缺點是線程主動操作、讀取、判斷同一個變量,此方法浪費CPU資源。
1.1 等待/通知機制的實現(xiàn)
- wait()方法和notify()方法
1、要在同步方法或同步代碼塊中調(diào)用,即必須在調(diào)用前先獲取該對象鎖,若沒有會拋出IllegalMonitorStateException。
2、執(zhí)行wait()方法后,線程停止運行并釋放鎖,后續(xù)代碼不執(zhí)行;
3、notify()方法用來通知呈wait狀態(tài)的線程,隨機挑選一個對其發(fā)送通知并使其轉(zhuǎn)為就緒狀態(tài),獲取對象鎖并執(zhí)行后續(xù)代碼。執(zhí)行notify()方法不會立即釋放鎖,需要等待線程將程序執(zhí)行完成。
4、notifyAll()方法可以通知所有在等待同一資源的線程全部進入就緒狀態(tài)。
public class MyList
{
//此處必須將變量定義為static,方法定義為public方便訪問
private static List list = new ArrayList();
public static void add()
{
list.add("myString");
}
public static int size()
{
return list.size();
}
}
public class ThreadA extends Thread
{
private Object lock;
public ThreadA(Object lock)
{
super();
this.lock = lock;
}
@Override
public void run()
{
try
{
synchronized(lock)
{
if(MyList.size() != 5)
{
System.out.println("wait begin");
lock.wait();
System.out.println("wait end");
}
}
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ThreadB extends Thread
{
private Object lock;
public ThreadB(Object lock)
{
super();
this.lock = lock;
}
@Override
public void run()
{
try
{
synchronized(lock)
{
for(int i=0; i < 10; I++)
{
MyList.add();
if(MyList.size() == 5)
{
System.out.println("notify begin");
lock.notify();
System.out.println("notify end");
}
System.out.println("add 第" + (i+1) + "個元素");
}
}
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public class Run
{
public static void main(String[] args)
{
try
{
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
- 每個鎖對象都有兩個隊列,一個是就緒隊列,一個是阻塞隊列,就緒隊列存儲將要獲取鎖的線程,阻塞隊列存儲被阻塞的線程。
- wait()方法被執(zhí)行后,鎖被自動釋放,執(zhí)行完notify()方法鎖不會立即釋放,等待notify()方法所在的代碼塊代碼全部執(zhí)行完成。
- wait(long)方法:等待某一段時間內(nèi)是否有線程對鎖進行喚醒,超過這個時間線程會自動喚醒。
1.2 生產(chǎn)者/消費者模式的實現(xiàn)
- 一生產(chǎn)者賦值,一消費者讀取
public class Consumer
{
private Object lock;
public Consumer(Object lock)
{
this.lock = lock;
}
public void getValue()
{
try
{
synchronized (lock)
{
if(Value.getValue().equals(""))
{
lock.wait();
}
System.out.println("Consumer get " + Value.getValue());
Value.setValue("");
lock.notify();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class Produce
{
private Object lock;
public Produce(Object lock)
{
this.lock = lock;
}
public void setValue()
{
try
{
synchronized (lock)
{
if(!Value.getValue().equals(""))
{
lock.wait();
}
String value = System.currentTimeMillis()+"";
System.out.println("Produce set " + value);
Value.setValue(value);
lock.notify();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ConsumerThread extends Thread
{
private Consumer consumer;
public ConsumerThread(Consumer consumer)
{
super();
this.consumer = consumer;
}
@Override
public void run()
{
while(true)
{
consumer.getValue();
}
}
}
public class ProduceThread extends Thread
{
private Produce produce;
public ProduceThread(Produce produce)
{
super();
this.produce = produce;
}
@Override
public void run()
{
while(true)
{
produce.setValue();
}
}
}
public class Value
{
//設(shè)置初值
private static String value = "";
public static void setValue(String valueSet)
{
value = valueSet;
}
public static String getValue()
{
return value;
}
}
public class Main
{
public static void main(String[] args)
{
Object lock = new Object();
Produce produce = new Produce(lock);
ProduceThread produceThread = new ProduceThread(produce);
Consumer consumer = new Consumer(lock);
ConsumerThread consumerThread = new ConsumerThread(consumer);
produceThread.start();
consumerThread.start();
}
}
運作機制:
以value值作為切入點,當value值為""時,消費者線程為wait狀態(tài),生產(chǎn)者獲取鎖對value賦值,完成賦值后notify并釋放鎖,消費者被喚醒,獲取生產(chǎn)者給value賦的值,并將value值設(shè)置回"",notify并釋放鎖。
- 多生產(chǎn)者/多消費者實現(xiàn)
假死:因為線程notify的對象有可能是異類,也有可能是同類,連續(xù)多次喚醒同類,這就有可能導(dǎo)致所有的線程都進入waiting狀態(tài),從而任務(wù)無法正常執(zhí)行即假死。
解決方法:使用notifyAll()方法
public class Stack
{
private List list = new ArrayList();
synchronized public void push()
{
try
{
//此處由if改為while,條件改變時無法得到及時響應(yīng)
//喚醒多個wait狀態(tài)的線程,pop中的remove出錯。
while(list.size() == 1)
{
this.wait();
}
list.add("anyString"));
this.notify();
System.out.println("push " + list.size());
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
synchronized public void pop()
{
try
{
while(list.size() != 1)
{
this.wait();
}
list.remove(0);
//通知同類和異類
this.notifyAll();
System.out.println("pop" + list.size());
}
}
}
*交叉?zhèn)浞輰崿F(xiàn)
public class BackUpTool
{
private boolean backToA = false;
synchronized public void backUpA()
{
try
{
while (backToA == false)
{
wait();
}
System.out.println("Backup to A");
backToA = false;
notifyAll();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
synchronized public void backUpB()
{
try
{
while (backToA == true)
{
wait();
}
System.out.println("Backup to B");
backToA = true;
notifyAll();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ThreadA extends Thread
{
private BackUpTool backUpTool;
public ThreadA(BackUpTool tool)
{
super();
this.backUpTool = tool;
}
@Override
public void run()
{
backUpTool.backUpA();
}
}
public class ThreadB extends Thread
{
private BackUpTool backUpTool;
public ThreadB(BackUpTool tool)
{
super();
this.backUpTool = tool;
}
@Override
public void run()
{
backUpTool.backUpB();
}
}
public class Run
{
public static void main(String[] args)
{
BackUpTool backUpTool = new BackUpTool();
for(int i = 0; i < 20; i++)
{
ThreadA threadA = new ThreadA(backUpTool);
ThreadB threadB = new ThreadB(backUpTool);
threadA.start();
threadB.start();
}
}
}
2、通過管道進行線程間通信
管道流(pipeStream)是一種特殊的流,用于不同線程間直接傳送數(shù)據(jù)。一個線程發(fā)送數(shù)據(jù)到輸出管道,另一個線程從輸入管道讀取數(shù)據(jù)。
字節(jié)流 PipedInputStream、PipedOutputStream
字符流 PipedReader、PipedWriter
import java.io.IOException;
import java.io.PipedOutputStream;
public class WriteData
{
public void wirteMethod(PipedOutputStream out)
{
try
{
System.out.println("Write:");
for(int i=0; i<300;i++)
{
String outData = "" + (i+1);
//0到299寫入到輸出管道
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
out.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
import java.io.PipedOutputStream;
public class ThreadWrite extends Thread
{
private WriteData writeData;
private PipedOutputStream out;
public ThreadWrite(WriteData writeData, PipedOutputStream out)
{
super();
this.writeData = writeData;
this.out = out;
}
@Override
public void run()
{
writeData.wirteMethod(out);
}
}
import java.io.IOException;
import java.io.PipedInputStream;
public class ReadData
{
public void readMethod(PipedInputStream input)
{
try
{
System.out.println("Read:");
byte[] byteArray = new byte[20];
//到輸入管道里讀取數(shù)據(jù)到byteArray數(shù)組中
//返回數(shù)組大小
int readLength = input.read(byteArray);
while(readLength != -1)
{
String newData = new String(byteArray,0,readLength);
System.out.println(newData);
readLength = input.read(byteArray);
}
System.out.println();
input.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
import java.io.PipedInputStream;
public class ThreadReader extends Thread
{
private ReadData readData;
private PipedInputStream input;
public ThreadReader(ReadData readData, PipedInputStream input)
{
super();
this.readData = readData;
this.input = input;
}
@Override
public void run()
{
readData.readMethod(input);
}
}
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Run
{
public static void main(String[] args)
{
try
{
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
inputStream.connect(outputStream);
ThreadReader threadReader = new ThreadReader(readData, inputStream);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
Thread.sleep(2000);
threadReader.start();
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
3、join()方法
3.1 join()方法使用方式
class Run
{
public static void main(String[] args)
{
Thread thread = new Thread();
thread.start();
//加上join語句可以保證在thread線程結(jié)束后再執(zhí)行打印end操作
thread.join();
System.out.println("end");
}
}
方法join()的作用是使所屬的線程對象thread正常執(zhí)行完run()方法中的任務(wù),而使當前的線程main進行無限期的阻塞,等待線程thread銷毀后再繼續(xù)執(zhí)行線程main后面的代碼。
3.2 join(long)與sleep(long)的區(qū)別
方法join(long)的功能在內(nèi)部使用wait(long)來實現(xiàn)的,因此具有釋放鎖的特點;而sleep(long)方法不是釋放鎖。
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
4、類ThreadLocal作用
每個線程綁定自己的值,ThreadLocal類可以存儲每個線程的私有數(shù)據(jù)。每個線程可以獲取到各自設(shè)置的值,互不影響,隔離性。
public class Tools
{
public static ThreadLocal t1 = new ThreadLocal();
}
public class ThreadA extends Thread
{
@Override
public void run()
{
try
{
for(int i = 0; i < 100; i++)
{
Tools.t1.set("ThreadA" + (i+1));
System.out.println("ThreadA get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
public class ThreadB extends Thread
{
@Override
public void run()
{
try
{
for(int i = 0; i < 100; i++)
{
Tools.t1.set("ThreadB" + (i+1));
System.out.println("ThreadB get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
public class Run
{
public static void main(String[] args)
{
try
{
ThreadA threadA = new Thread();
ThreadB threadB = new Thread();
threadA.start();
threadB.start();
for(int i = 0; i < 100; i++)
{
Tools.t1.set("Main" + (i+1));
System.out.println("Main get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
InteritableThreadLocal類可以讓子線程從父類線程中取得值。