wait()、notify()、notifyAll()這三個(gè)函數(shù)都是Object類(lèi)中的方法,而Object類(lèi)是所有類(lèi)的父類(lèi),所以所有對(duì)象實(shí)例都有該方法.
wait():阻塞當(dāng)前之前直到該對(duì)象(調(diào)用wait函數(shù)的對(duì)象)在另一個(gè)線(xiàn)程調(diào)用了notify()或者notifyAll();
notify():喚醒單個(gè)線(xiàn)程
notifyAll():喚醒所有線(xiàn)程
這三個(gè)方法,都是Java語(yǔ)言提供的實(shí)現(xiàn)線(xiàn)程間阻塞(Blocking)和控制進(jìn)程內(nèi)調(diào)度(inter-process communication)的底層機(jī)制。在解釋如何使用前,先說(shuō)明一下兩點(diǎn):
正如Java內(nèi)任何對(duì)象都能成為鎖(Lock)一樣,任何對(duì)象也都能成為條件隊(duì)列(Condition queue)。而這個(gè)對(duì)象里的wait(), notify()和notifyAll()則是這個(gè)條件隊(duì)列的固有(intrinsic)的方法。
一個(gè)對(duì)象的固有鎖和它的固有條件隊(duì)列是相關(guān)的,為了調(diào)用對(duì)象X內(nèi)條件隊(duì)列的方法,你必須獲得對(duì)象X的鎖。這是因?yàn)榈却隣顟B(tài)條件的機(jī)制和保證狀態(tài)連續(xù)性的機(jī)制是緊密的結(jié)合在一起的。
class TestWait {
public static void main(String[]args) {
TestWaitBean bean = new TestWaitBean("bean");
System.out.println("init bean");
System.out.println("invoke bean wait");
try {
bean.wait();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("wait finished");
}
}
static class TestWaitBean {
String name;
public TestWaitBean(String name) {
this.name = name;
}
}
}
上述代碼運(yùn)行后會(huì)拋出異常java.lang.IllegalMonitorStateException
因?yàn)槲传@取對(duì)象bean的鎖,就去調(diào)用bean.wait()
修改一下代碼,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的阻塞、喚醒
class TestWait {
public static void main(String[]args) {
TestWaitBean bean = new TestWaitBean("bean");
System.out.println("init bean");
System.out.println("invoke bean wait");
WeakUpThread thread = new WeakUpThread(bean);
thread.start();
synchronized (bean) {
try {
bean.wait();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("wait finished");
}
}
}
static class TestWaitBean {
String name;
public TestWaitBean(String name) {
this.name = name;
}
}
static class WeakUpThread extends Thread {
Object lock;
public WeakUpThread(Object lock) {
this.lock = lock;
}
@Override
public void run() {
super.run();
synchronized (lock) {
try {
System.out.println("Current Thread is sleep 2000ms" );
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" weakUp!!!");
lock.notify();
}
}
}
}
}
注意:
- 不管是調(diào)用wait,還是notify、notifyAll,都要是在同步修飾的代碼塊或者方法中,即必須先獲取對(duì)象鎖,在調(diào)用對(duì)象方法
使用wait、notifyAll實(shí)現(xiàn)一個(gè)多線(xiàn)程的生產(chǎn)者、消費(fèi)者
class BlockTest {
public static void main(String []args) {
List<Object>list = new ArrayList<>();
Block block = new Block(list);
list.add(null);
Thread thread1 = new Thread(new PutThread(block, new Person("person1")), "thread1");
Thread thread2 = new Thread(new PutThread(block, new Person("person2")), "thread2");
Thread thread3 = new Thread(new PutThread(block, new Person("person3")), "thread3");
Thread thread4 = new Thread(new PutThread(block, new Person("person4")), "thread4");
Thread thread5 = new Thread(new OutThread(block), "thread5");
Thread thread6 = new Thread(new OutThread(block), "thread6");
Thread thread7 = new Thread(new OutThread(block), "thread7");
thread1.start();
thread2.start();
thread5.start();
thread6.start();
thread7.start();
thread3.start();
thread4.start();
}
static class PutThread implements Runnable {
Block block;
Person person;
public PutThread(Block block, Person person) {
this.block = block;
this.person = person;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " puting the person name is " + person.getName());
block.put(person);
}
}
static class OutThread implements Runnable {
Block block;
public OutThread(Block block) {
this.block = block;
}
@Override
public void run() {
Person person = (Person) block.get();
if(person != null) {
System.out.println(Thread.currentThread().getName() + " get the person name is " + person.getName());
} else {
System.out.println(Thread.currentThread().getName() + " the person is null");
}
}
}
static class Block<T> {
List<T>t;
Object lock = new Object();
int currentIndex = 0;
volatile boolean isRead = false;
public Block(List<T>t){
this.t= t;
}
public void put(T at) {
if(t == null) throw new NullPointerException("t is null");
synchronized (lock) {
try {
if(isRead) {
lock.wait();
}
currentIndex++;
t.add(at);
lock.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public T get() {
synchronized (lock){
T at = null;
try {
if(currentIndex == 0) {
System.out.println(Thread.currentThread().getName() + " 當(dāng)前下標(biāo)已為0 阻塞 等待寫(xiě)入再取");
lock.wait();
}
if(isRead){
System.out.println(Thread.currentThread().getName() + " 當(dāng)前正在讀 阻塞 等待寫(xiě)入再取 index" + currentIndex);
lock.wait();
}
isRead = true;
at = t.remove(currentIndex);
if(at == null) {
System.out.println(Thread.currentThread().getName() + " index" + currentIndex);
return null;
}
currentIndex --;
return at;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.notifyAll();
isRead = false;
return at;
}
}
}
}
static class Person{
private String name = "";
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
*** 上述生產(chǎn)者、消費(fèi)者只能適用于多個(gè)線(xiàn)程直接的put、get ***
因?yàn)閣ait、和notify/notifyAll本身就是互斥的,因?yàn)檎{(diào)用wait后,就會(huì)阻塞當(dāng)前調(diào)用線(xiàn)程,
本身線(xiàn)程的notify/notifyAll也就不會(huì)被調(diào)用,所以說(shuō)上述代碼只能實(shí)現(xiàn)多個(gè)線(xiàn)程直接的場(chǎng)景,
還有就是上述代碼只是簡(jiǎn)單的實(shí)現(xiàn),而且是讀操作加鎖,一般而言應(yīng)該是寫(xiě)操作時(shí)堵塞。