線程的基本協(xié)作
多線程間除了競爭訪問同一資源外,也經(jīng)常需要相互協(xié)作的去執(zhí)行一些任務(wù)。而對(duì)于協(xié)作的基本機(jī)制用的最多的無疑是wait/notify。
協(xié)作的場景
- 生產(chǎn)者消費(fèi)者模式:共享隊(duì)列,一個(gè)負(fù)責(zé)放一個(gè)負(fù)責(zé)取。如果隊(duì)列長度有限且滿了之后則等待。
- 同時(shí)開始:類似運(yùn)動(dòng)員比賽,多個(gè)線程同時(shí)開始執(zhí)行。
- 等待結(jié)束:主線程分發(fā)給各個(gè)子任務(wù)去執(zhí)行任務(wù),主任務(wù)開始執(zhí)行前需要等待各個(gè)子任務(wù)執(zhí)行完畢。
- 異步結(jié)果:
- 集合點(diǎn)
wait/notify
wait和notify方法都是Object類提供給所有子類進(jìn)行線程協(xié)作的一種實(shí)現(xiàn)機(jī)制。
wait:
public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;
一個(gè)帶時(shí)間參數(shù),表示最多等待這么長時(shí)間。
一個(gè)不帶,默認(rèn)為0,表示無限期等待。
如果在wait的過程中線程被中斷,則會(huì)拋出InterruptedException。我們在之前關(guān)于Thread類的的博文中也提到過這個(gè)。
wait在等什么?
我們之前說過的每個(gè)對(duì)象都有一把鎖和一個(gè)等待隊(duì)列,一個(gè)線程在進(jìn)入synchronized代碼塊時(shí),會(huì)嘗試獲取鎖,如果獲取不到則會(huì)把當(dāng)前線程加入等待隊(duì)列中,其實(shí),除了對(duì)于鎖的等待隊(duì)列,每個(gè)對(duì)象還有另一個(gè)等待隊(duì)列,即條件隊(duì)列,該隊(duì)列用于線程間的協(xié)作。調(diào)用
wait就會(huì)把當(dāng)前線程放到條件隊(duì)列上并阻塞,表示當(dāng)前線程執(zhí)行不下去了,它需要等待一個(gè)條件,這個(gè)條件它自己改變不了,需要其他線程改變。當(dāng)其他線程改變了條件后,應(yīng)該調(diào)用Object(等待哪個(gè)對(duì)象就用哪個(gè)對(duì)象)的notify方法:
public final native void notify();
public final native void notifyAll();
notify做的事情就是從條件隊(duì)列中選一個(gè)線程,將其從隊(duì)列中移除并喚醒,notifyAll和notify的區(qū)別是,它會(huì)移除條件隊(duì)列中所有的線程并全部喚醒。
public class WaitThread extends Thread {
private volatile boolean fire = false;
@Override
public void run() {
try {
synchronized (this) {
while(!fire) {
wait();
}
}
System.out.println("fired");
} catch(InterruptedException e) {
}
}
public synchronized void fire() {
this.fire = true;
notify();
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}
代碼中的協(xié)作的條件變量式fire,兩個(gè)線程都要訪問該fire變量,容易出現(xiàn)競態(tài)條件所以相關(guān)代碼都被synchronized保護(hù)了。
需要特別注意的是: wait和notify方法的調(diào)用只能再synchronized代碼塊中。如果在調(diào)用wait/notify方法時(shí),當(dāng)前線程沒有對(duì)象鎖的話,那么會(huì)拋出java.lang.IllegalMonitor-StateException。
wait的具體過程:
- 把當(dāng)前線程放入條件等待隊(duì)列,++釋放對(duì)象鎖++,阻塞等待,線程狀態(tài)變?yōu)?code>WAITING或
TIMED_WAITING - 等待時(shí)間到或被其他線程調(diào)用notify/notifyAll從條件隊(duì)列中移除,這時(shí),要重新競爭對(duì)象鎖
- 如果能夠獲得鎖,線程狀態(tài)變?yōu)?code>RUNNABLE,并從wait調(diào)用中返回。
- 否則,該線程加入對(duì)象鎖等待隊(duì)列,線程狀態(tài)變?yōu)?code>BLOCKED,只有在獲得鎖后才會(huì)從wait調(diào)用返回。
這里我們一定要區(qū)別好兩個(gè)等待隊(duì)列,一個(gè)是線程沒有分配到cpu時(shí)間片進(jìn)入到的對(duì)象鎖等待隊(duì)列。另一個(gè)則是線程執(zhí)行遇到條件不滿足的情況進(jìn)入條件等待隊(duì)列。
當(dāng)線程從條件隊(duì)列中返回不代表其等待的條件就滿足了,也有可能是wait方法限定的時(shí)間到達(dá)了。我們在使用wait方法的時(shí)候當(dāng)其跳出后還應(yīng)該再判斷一次。一般我們通過while循環(huán)的方式來做到。
synchronized (obj) {
while(條件不成立)
obj.wait();
…//執(zhí)行條件滿足后的操作
}
調(diào)用notify會(huì)把在條件隊(duì)列中等侍的線程喚醍并從隊(duì)列中移除,但它不會(huì)釋放對(duì)象鎖,也就是說,只有在包含notify的synchronized代碼塊(被synchronized修飾過了)執(zhí)行完后,等待的線程才會(huì)從wait調(diào)用中返回。這一點(diǎn)需要銘記
我們在使用wait時(shí)最難的是搞清楚wait到底等的是什么?,而notify通知的又是什么? 我們需要知道,它們被不同的線程調(diào)用,并共亨相同的鎖和條件等待隊(duì)列(相同對(duì)象的
synchronized代碼塊內(nèi)),它們圍繞一個(gè)共享的條件變量進(jìn)行協(xié)作,這個(gè)條件變量是程序自己維護(hù)的,當(dāng)條件不成立時(shí),線程調(diào)用wait進(jìn)入條件等待隊(duì)列,另一個(gè)線程修改條件后調(diào)用notify,調(diào)用wait的線程被喚醒后需要重新檢查條件變量。從多線程的角度看,它們圍繞共享變量進(jìn)行協(xié)作,從調(diào)用wait的線程角度看,它阻塞等待一個(gè)條件的成立。我們在設(shè)立多線程協(xié)作時(shí),需要想清楚協(xié)作的++共享變量++和++條件++是什么,這是協(xié)作的核心。**
線程的基本協(xié)作示例
我們前面說了線程基本協(xié)作的場景,這里書上給出了對(duì)于這幾種場景的代碼示例:
- 生產(chǎn)者消費(fèi)者模式
- 同時(shí)開始
- 等待結(jié)束
- 異步結(jié)果
- 集合點(diǎn)
生產(chǎn)者/消費(fèi)者模式
隊(duì)列:
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<>(limit);
}
public synchronized void put(E e) throws InterruptedException {
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll();
return e;
}
}
生產(chǎn)者:
static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
while(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch (InterruptedException e) {
}
}
}
消費(fèi)者:
static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int)(Math.random()*100));
}
} catch(InterruptedException e) {
}
}
}
主程序:
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
new Producer(queue).start();
new Consumer(queue).start();
}
在生產(chǎn)者消費(fèi)者模式中,put等待的是隊(duì)滿而take等待的卻是隊(duì)空。但他們都會(huì)進(jìn)入相同的對(duì)象的條件等待隊(duì)列中。由于等待的條件不同,但兩者的共享變量卻都是該隊(duì)列,所以此處不能使用notify,因?yàn)閚otify只能喚醒一個(gè)線程,而由于線程調(diào)度的機(jī)制。喚醒的如果是同類線程的話則起不到協(xié)調(diào)的作用,所以在共用同一個(gè)條件隊(duì)列而等待的條件卻相反時(shí)應(yīng)該使用notifyAll。
Java提供了專門的阻塞隊(duì)列,包括:
- 接口 BlockingQueue和BlockingDeque
- 基于數(shù)組的實(shí)現(xiàn)類 ArrayBlockingQueue
- 基于鏈表的實(shí)現(xiàn)類 LinkedBlockingQueue和LinkedBlockingDeque
- 基于堆的實(shí)現(xiàn)類 PriorityBlockingQueue
在實(shí)際場景中,應(yīng)該優(yōu)先使用這些實(shí)現(xiàn)類。
同時(shí)開始
同時(shí)開始的按例好比運(yùn)動(dòng)員比賽,一個(gè)主線程(裁判)決定著各個(gè)子線程(運(yùn)動(dòng)員)何時(shí)開始。
協(xié)作對(duì)象fired:
static class FireFlag {
private volatile boolean fired = false;
public synchronized void waitForFire() throws InterruptedException {
while(!fired) {
wait();
}
}
public synchronized void fire() {
this.fired = true;
notifyAll();
}
}
運(yùn)動(dòng)員:
static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
裁判:
public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}
等待結(jié)束
Thread類的join方法的實(shí)現(xiàn)其實(shí)就是借助于wait方法。其主要代碼:
while (isAlive()) {
wait(0);
}
該代碼意義是:只要該等待線程活著就會(huì)一直等待,join的結(jié)束依賴與線程運(yùn)行結(jié)束的時(shí)候Java系統(tǒng)調(diào)用notifyAll來通知該等待線程。
使用join有時(shí)比較麻煩需要等待各個(gè)子線程結(jié)束。這里書上給出的例子采用了另一種寫法,主線程與子線程協(xié)作的是一個(gè)數(shù),這個(gè)數(shù)表示未完成的線程的個(gè)數(shù),初始值為子線程個(gè)數(shù),主線程需要等待該值變?yōu)?,每個(gè)子線程結(jié)束后需要將該值減一,當(dāng)為0時(shí)調(diào)用notifyAll。
協(xié)作對(duì)象MyLatch:
public class MyLatch {
private int count;
public MyLatch(int count) {
this.count = count;
}
public synchronized void await() throws InterruptedException {
while(count > 0) {
wait();
}
}
public synchronized void countDown() {
count--;
if(count <= 0) {
notifyAll();
}
}
}
子線程:
static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
//simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch (InterruptedException e) {
}
}
主線程:
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i = 0; i < workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
MyLatch也可以應(yīng)用在“同時(shí)開始”的場景,初始值設(shè)為1。
public class RacerWithLatchDemo {
static class Racer extends Thread {
MyLatch latch;
public Racer(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
this.latch.await();
System.out.println("start run "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch latch = new MyLatch(1);
Thread[] racers = new Thread[num];
for(int i = 0; i < num; i++) {
racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}
Java中提供了一個(gè)專門的同步類CountDownLatch,在實(shí)際開發(fā)中應(yīng)該使用它。
異步結(jié)果
在主從模式中,手工創(chuàng)建線程比較麻煩。一種常見的模式是異步調(diào)用,異步調(diào)用一般返回一個(gè)名為Future的對(duì)象,通過它可以獲得最終的結(jié)果。在Java中表示子任務(wù)的接口是Callable。如下是書上例子:
子任務(wù):Callable
public interface Callable<V> {
V call() throws Exception;
}
異步調(diào)用的結(jié)果:Future
public interface MyFuture <V> {
V get() throws Exception ;
}
通過該接口的get方法返回真正的結(jié)果。如果結(jié)果還沒有計(jì)算完成,get方法會(huì)阻寒直到計(jì)算完成,如果調(diào)用過程發(fā)生異常,則get方法拋出調(diào)用過程中的異常。
方便主線程調(diào)用子任務(wù)的類 MyExecutor
public <V> MyFuture<V> execute(final Callable<V> task)
通過該方法主線程就不需要在創(chuàng)建并管理子線程了。可以方便的獲取到異步調(diào)用的結(jié)果。如下:
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
// 子任務(wù)
Callable<Integer> subTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//…執(zhí)行異步任務(wù)
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
//異步調(diào)用子任務(wù),返回一個(gè)MyFuture對(duì)象
MyFuture<Integer> future = executor.execute(subTask);//內(nèi)部創(chuàng)了個(gè)子線程去執(zhí)行
//…執(zhí)行其他操作
try {
//獲取異步調(diào)用的結(jié)果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}
所以重點(diǎn)是,MyExecutor類的execute方法是怎么實(shí)現(xiàn)的呢 ?它封裝了創(chuàng)建子線程,同步獲取結(jié)果的過程,它會(huì)創(chuàng)建一個(gè)執(zhí)行子線程。
MyFuture類的execute具體實(shí)現(xiàn):
public <V> MyFuture<V> execute(final Callable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws Exception {
synchronized (lock) {
while(!thread.isDone()) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
if(thread.getException() != null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}
execute方法啟動(dòng)一個(gè)執(zhí)行子線程,并返回?cái)y帶執(zhí)行結(jié)果的MyFuture對(duì)象。MyFuture的方法會(huì)阻塞等待知道子線程運(yùn)行結(jié)束返回結(jié)果。
執(zhí)行子線程 ExecuteThread
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}
Java中也已經(jīng)包含了一套完善的方案,有:
- 表示異步結(jié)果的接口Future和實(shí)現(xiàn)類FutureTask。
- 用于執(zhí)行異步任務(wù)的接口Executor,以及有更多功能的子接口ExecutorService。
- 用于創(chuàng)建Executor和ExecutorService的工廠方法類Executors。
集合點(diǎn)
和之前等待結(jié)束和同時(shí)開始案例相似。各線程分開行動(dòng),各自到大一個(gè)集合點(diǎn),在集合點(diǎn)需要集齊所有線程,交換數(shù)據(jù)然后再進(jìn)行下一步動(dòng)作。協(xié)作的共享變量n,初始值為線程總數(shù),當(dāng)有一個(gè)線程到達(dá)集合點(diǎn)n減一。直到變?yōu)?即最后一個(gè)也到達(dá),通過notifyAll來喚醒所有條件等待線程。
協(xié)作對(duì)象:AssemblePoint
協(xié)作對(duì)象public class AssemblePoint {
private int n;
public AssemblePoint(int n) {
this.n = n;
}
public synchronized void await() throws InterruptedException {
if(n > 0) {
n--;
if(n == 0) {
notifyAll();
} else {
while(n != 0) {
wait();
}
}
}
}
}
主線程:AssemblePointDemo
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
//模擬先各自獨(dú)立運(yùn)行
Thread.sleep((int) (Math.random() * 1000));
//??
ap.await();
System.out.println("arrived");
//…?????????
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
final int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i = 0; i < num; i++) {
threads[i] = new Tourist(ap);
threads[i].start();
}
}
}
Java中有一個(gè)專門的同步工具類CyclicBarrier可以替代該AssemblePoint類。
總結(jié)
該節(jié)主要介紹了Java中線程間協(xié)作的基本機(jī)制wait/notify,協(xié)作關(guān)鍵要想淸楚協(xié)作的共享變貴和條件是什么。Java中有專門為協(xié)作而建的阻塞隊(duì)列、同步工具類,以及Executors框架。