同步博客: My Love
信息來源:阮一峰每周分享
Though normally each child thread just need to complete its own task, sometimes we may want multiple threads to work together to fulfil a task, which involves the inter-thread communication.
盡管通常情況下子線程只需要完成自己的本職工作就可以了,但是有時(shí)我們可能希望多個(gè)線程一起工作來完成一個(gè)任務(wù),這里面就牽扯到了線程間通信。
The methods and classes covered in this article are: thread.join(), object.wait(), object.notify(), CountdownLatch, CyclicBarrier, FutureTask, Callable, etc.
這篇文章中涉及到的方法和類有:thread.join(), object.wait(), object.notify(), CountdownLatch, CyclicBarrier, FutureTask, Callable,等
Here is the code covered in this article
這是本片文章代碼連接。
I'll use several examples to explain how to implement inter-thread communication in Java.
我將使用一些例子來解釋Java中的線程間通信是怎么實(shí)現(xiàn)的。
- How to make two threads execute in sequence?
- How to make two threads intersect orderly in a specified manner?
- There are four threads: A, B, C, and D (D won't be executed until A, B and C all have finished executing, and A, B, and C are to be executed synchronously.).
- Three athletes prepare themselves apart, and then they start to run at the same time after each of them is ready.
- After the child thread completes a task, it returns the result to the main thread.
- 怎樣讓兩個(gè)線程按順序執(zhí)行?
- 怎樣讓兩個(gè)線程以指定的方式有序交替執(zhí)行?
- 有四個(gè)線程:A, B, C, D(D線程在A,B,C線程之前完之前不會(huì)執(zhí)行,并且A、B、C線程會(huì)被同步執(zhí)行)。
- 三名運(yùn)動(dòng)員分開站立,然后在所有人都準(zhǔn)備好后同時(shí)開始跑。
- 子線程完成一個(gè)任務(wù)后,返回執(zhí)行結(jié)果給主線程。
How To Make Two Threads Execute In Sequence? 怎樣讓兩個(gè)線程順序執(zhí)行?
Suppose there are two threads: thread A and thread B. Both of the two threads can print three numbers (1-3) in sequence. Let's take a look at the code:
假設(shè)有兩個(gè)線程:thread A 和 therad B 。兩個(gè)線程都可以順序打印三個(gè)1-3的數(shù)據(jù)。我們看一下代碼是什么樣的:
private static void demo1() {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
printNumber("B");
}
});
A.start();
B.start();
}
The implementation for printNumber(String) is as follows, which is used to print the three numbers of 1, 2 and 3 in sequence:
printNumber(String)的實(shí)現(xiàn)如下所示,該函數(shù)用于順序打印1,2,3這個(gè)三個(gè)數(shù)字:
private static void printNumber(String threadName) {
int i=0;
while (i++ < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + "print:" + i);
}
}
And the result we get is:
B print: 1
A print: 1
B print: 2
A print: 2
B print: 3
A print: 3
You can see that A and B print numbers at the same time.
你可以看到 A 和 B 會(huì)同時(shí)打印數(shù)字。
So, what if we want B to start printing after A has printed over? We can use the thread.join() method, and the code is as follows:
那么,如果我們想要 B 在 A 打印結(jié)束后再開始打印的話呢?我們可以使用thread.join()這個(gè)方法,代碼實(shí)現(xiàn)如下:
private static void demo2() {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("B starts waiting for A");
try {
A.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
printNumber("B");
}
});
B.start();
A.start();
}
Now the result obtained is:
現(xiàn)在得到的結(jié)果是:
B starts waiting for A
A print: 1
A print: 2
A print: 3
B print: 1
B print: 2
B print: 3
So we can see that the A.join() method will make B wait until A finishes printing.
由此我們知道 A.join() 方法會(huì)使 B 等待,直到 A 打印完畢。
How To Make Two Threads Intersect Orderly In a Specified Manner? 怎樣讓兩個(gè)線程以指定的方式有序交替執(zhí)行?
So what if now we want B to start printing 1, 2, 3 just after A has printed 1, and then A continues printing 2, 3? Obviously, we need more fine-grained locks to take the control of the order of execution.
那么要是我們想要 B 在 A 剛完成打印1的時(shí)候開始打印1,2,3,然后 A 再繼續(xù)打印2,3的話該怎么辦呢?顯然,我們需要更細(xì)粒度的鎖來控制執(zhí)行的順序。
Here, we can take the advantage of the object.wait() and object.notify() methods. The code is as below:
這里,我們可以利用 object.wait() 和 object.notify() 這兩個(gè)方法。代碼如下:
/**
* A 1, B 1, B 2, B 3, A 2, A 3
*/
private static void demo3() {
Object lock = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("A 1");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2");
System.out.println("A 3");
}
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
lock.notify();
}
}
});
A.start();
B.start();
}
The results are as follows:
結(jié)果如下:
A 1
A waiting…
B 1
B 2
B 3
A 2
A 3
That's what we want.
這正是我們想要的
What happens? 發(fā)生了什么?
- First we create an object lock shared by A and B:
lock = new Object();- When A gets the lock, it prints 1 first, and then it calls the
lock.wait()method which will make it go into the wait state, and hands over control of the lock then;- B won't be executed until A calls the
lock.wait()method to release control and B gets the lock;- B prints 1, 2, 3 after getting the lock, and then calls the
lock.notify()method to wake up A which is waiting;- A continues printing the remaining 2, 3 after it wakes up.
- 首先我們創(chuàng)建了一個(gè)被 A 和 B 共享的對(duì)象鎖(object lock):
lock = new Object();- 當(dāng) A 獲得鎖時(shí),會(huì)先打印出1,然后會(huì)調(diào)用到
lock.wait()方法,該方法使 A 線程進(jìn)入等待狀態(tài),然后交出鎖的控制權(quán);- B 在 A 調(diào)用到
lock.wait()方法交出鎖的控制權(quán)之前不會(huì)執(zhí)行,在 A 交出鎖的控制權(quán)之后 B 會(huì)獲得這個(gè)鎖;- B 獲取到鎖后打印1,2,3,然后調(diào)用到
lock.notify()方法來喚醒處于等待狀態(tài)的 A;- A 被喚醒后繼續(xù)打印剩余的2,3兩個(gè)數(shù)字。
I add the log to the above code to make it easier to understand.
我給上面的代碼增加了日志信息以幫助容易理解一些。
private static void demo3() {
Object lock = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("INFO: A is waiting for the lock");
synchronized (lock) {
System.out.println("INFO: A got the lock");
System.out.println("A 1");
try {
System.out.println("INFO: A is ready to enter the wait state, giving up control of the lock");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("INFO: B wakes up A, and A regains the lock");
System.out.println("A 2");
System.out.println("A 3");
}
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("INFO: B is waiting for the lock");
synchronized (lock) {
System.out.println("INFO: B got the lock");
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
System.out.println("INFO: B ends printing, and calling the notify method");
lock.notify();
}
}
});
A.start();
B.start();
The result is as follows:
INFO: A is waiting for the lock
INFO: A got the lock
A 1
INFO: A is ready to enter the wait state, giving up control of the lock
INFO: B is waiting for the lock
INFO: B got the lock
B 1
B 2
B 3
INFO: B ends printing, and calling the notify method
INFO: B wakes up A, and A regains the lock
A 2
A 3
D Is Executed After A, B and C All Have Finished Executing Synchronously D在A B C 都同步執(zhí)行完后運(yùn)行
The method thread.join() introduced earlier allows one thread to continue executing after waiting for another thread to finish running. But if we join A, B, and C orderly into the D thread, it will make A, B, and C execute in turn, while we want them three to run synchronously.
前面介紹的 thread.join() 方法允許一個(gè)線程在等待另一個(gè)線程執(zhí)行完之后再繼續(xù)執(zhí)行。但是如果我們依次將 A B C join 到線程 D,這就會(huì)試 A B C 三個(gè)線程輪流執(zhí)行,但是我們想要的是讓他們同步執(zhí)行。
The goal we want to achieve is: The three threads A, B, and C can start to run at the same time, and each will notify D after finishing running independently; D won't start to run until A, B, and C all finish running. So we use CountdownLatch to implement this type of communication. Its basic usage is:
我們想要達(dá)到的目的是: A B C 這三個(gè)線程可以同時(shí)開始執(zhí)行,而且每個(gè)線程執(zhí)行完后都會(huì)喚醒 D; D 只有在 A B C 都執(zhí)行完畢后才會(huì)開始執(zhí)行。因此我們可以使用 CountdownLatch 來實(shí)現(xiàn)這種類型的通信,它的基本用法是:
-
Create a counter, and set an initial value,
CountdownLatch countDownLatch = new CountDownLatch(3);創(chuàng)建一個(gè)計(jì)數(shù)器,賦上初值,
CountdownLatch countDownLatch = new CountDownLatch(3); -
Call the
countDownLatch.await()method in the waiting thread and go into the wait state until the count value becomes 0;在等待線程中調(diào)用
countDownLatch.await()方法,在值變?yōu)?后進(jìn)入等待狀態(tài); -
Call the
countDownLatch.countDown()method in other threads, and the method will reduce the count value by one;在其他線程中調(diào)用
countDownLatch.countDown()方法,這個(gè)方法會(huì)將countDownLatch的值減一。 -
When the
countDown()method in other threads turns the count value to 0, thecountDownLatch.await()method in the waiting thread will exit immediately and continue to execute the following code.當(dāng)其他線程中的
countDown()方法將countDownLatch的值最終減為了0,等待線程中的countDownLatch.await()方法就會(huì)立即退出,然后該線程繼續(xù)執(zhí)行接下來的代碼。
The implementation code is as follows:
代碼實(shí)現(xiàn)如下:
private static void runDAfterABC() {
int worker = 3;
CountDownLatch countDownLatch = new CountDownLatch(worker);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("D is waiting for other three threads");
try {
countDownLatch.await();
System.out.println("All done, D starts working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
for (char threadName='A'; threadName <= 'C'; threadName++) {
final String tN = String.valueOf(threadName);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(tN + "is working");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(tN + "finished");
countDownLatch.countDown();
}
}).start();
}
}
The result is as follows:
結(jié)果如下:
D is waiting for other three threads
A is working
B is working
C is working
A finished
C finished
B finished
All done, D starts working
In fact, CountDownLatch itself is a countdown counter, and we set the initial count value to 3. When D runs, it first call the countDownLatch.await() method to check whether the counter value is 0, and it will stay in wait state if the value is not 0. A, B, and C each will use the countDownLatch.countDown() method to decrement the countdown counter by 1 after they finish running separately. And when all of them three finish running, the counter will be reduced to 0; then, the await() method of D will be triggered to end A, B, and C, and D will start to go on executing.
事實(shí)上, CountDownLatch 它自身是一個(gè)倒數(shù)計(jì)數(shù)器,而且我們將它的初始值設(shè)置為了3。當(dāng)線程 D 開始執(zhí)行時(shí),會(huì)首先調(diào)用到 countDownLatch.await() 方法去檢查計(jì)數(shù)器的值是否為0,如果計(jì)數(shù)器值不是0當(dāng)前線程就進(jìn)入等待狀態(tài)。 A B C 中的每個(gè)線程分別在執(zhí)行完后使用 countDownLatch.countDown() 方法來將倒數(shù)計(jì)數(shù)器的值減1。當(dāng)它們?nèi)齻€(gè)都執(zhí)行完畢時(shí),計(jì)數(shù)器的值就被減為0;然后線程 D 的 await() 方法將被觸發(fā)來結(jié)束 A B C,D會(huì)繼續(xù)執(zhí)行下面的代碼。
Therefore, CountDownLatch is suitable for the situation where one thread needs to wait for multiple threads.
因此, CountDownLatch 非常適合一個(gè)線程需要等待多個(gè)線程的場(chǎng)景。
3 Runners Preparing To Run 三個(gè)跑步者準(zhǔn)備起跑
Three runners prepare themselves apart, and then they start to run at the same time after each of them is ready.
三個(gè)跑步者分開站立,都準(zhǔn)備好后同時(shí)開始起跑。
This time, each of the three threads A, B, and C need to prepare separately, and then they start to run simultaneously after all of them three are ready. How should we achieve that?
這次, A B C 這三個(gè)線程每一個(gè)都需要獨(dú)立準(zhǔn)備,然后它們都準(zhǔn)備好后同時(shí)開始執(zhí)行。我們?cè)撊绾螌?shí)現(xiàn)這個(gè)目標(biāo)呢?
The CountDownLatch above can be used to count down, but when the count is completed, only one of the threads' await() method will get a response, so multiple threads cannot be triggered at the same time.
上面提到的 CountDownLatch 可用于倒計(jì)時(shí),但是當(dāng)計(jì)數(shù)器結(jié)束時(shí),只有一個(gè)線程的 await() 方法會(huì)得到響應(yīng),所以這種情況下多線程不能同時(shí)被觸發(fā)執(zhí)行。
In order to achieve the effect of threads' waiting for each other, we can use the CyclicBarrier data structure, and its basic usage is:
為了達(dá)到線程相互等待的結(jié)果,我們可以使用 CyclicBarrier 這個(gè)數(shù)據(jù)結(jié)構(gòu),它的基本用法是:
-
First create a public object
CyclicBarrier, and set the number of threads waiting at the same time,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);首先創(chuàng)建一個(gè)公有對(duì)象
CyclicBarrier, 同時(shí)設(shè)置等待線程的數(shù)量:CyclicBarrier cyclicBarrier = new CyclicBarrier(3); -
These threads start to prepare simultaneously. After they are ready, they need to wait for others to finish preparing, so call the
cyclicBarrier.await()method to wait for others;這些線程同時(shí)開始準(zhǔn)備,每個(gè)線程準(zhǔn)備好后,它們需要等待其他人完成準(zhǔn)備,通過調(diào)用
cyclicBarrier.await()方法來等待其他線程; -
When the specified threads that need to wait at the same time all call the
cyclicBarrier.await()method, which means that these threads are ready, then these threads will start to continue executing simultaneously.當(dāng)需要等待的指定線程都調(diào)用了
cyclicBarrier.await()方法時(shí),意味著這些線程都應(yīng)完成了準(zhǔn)備工作,然后這些線程就可以同時(shí)開始執(zhí)行了。
The implementation code is as follows. Imagine that there are three runners who need to start to run simultaneously, so they need to wait for others until all of them are ready.
代碼實(shí)現(xiàn)如下。想象一下有三個(gè)運(yùn)動(dòng)員需要同時(shí)起步,因此它們需要等待其他人都準(zhǔn)備好后再起步。
private static void runABCWhenAllReady() {
int runner = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
final Random random = new Random();
for (char runnerName='A'; runnerName <= 'C'; runnerName++) {
final String rN = String.valueOf(runnerName);
new Thread(new Runnable() {
@Override
public void run() {
long prepareTime = random.nextInt(10000) + 100;
System.out.println(rN + "is preparing for time:" + prepareTime);
try {
Thread.sleep(prepareTime);
} catch (Exception e) {
e.printStackTrace();
}
try {
System.out.println(rN + "is prepared, waiting for others");
cyclicBarrier.await(); // The current runner is ready, waiting for others to be ready
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(rN + "starts running"); // All the runners are ready to start running together
}
}).start();
}
}
The result is as follows:
運(yùn)行結(jié)果如下:
A is preparing for time: 4131
B is preparing for time: 6349
C is preparing for time: 8206
A is prepared, waiting for others
B is prepared, waiting for others
C is prepared, waiting for others
C starts running
A starts running
B starts running
Child Thread Returns The Result To The Main Thread 子進(jìn)程返回結(jié)果給主進(jìn)程
In actual development, often we need to create child threads to do some time-consuming tasks, and then pass the execution results back to the main thread. So how to implement this in Java?
在實(shí)際開發(fā)中,我們經(jīng)常需要?jiǎng)?chuàng)建子線程來完成一些耗時(shí)的操作,然后將執(zhí)行結(jié)果傳遞會(huì)主線程。那么在Java中怎么實(shí)現(xiàn)呢?
So generally, when creating the thread, we'll pass the Runnable object to Thread for execution. The definition for Runnable is as follows:
一般情況下,在創(chuàng)建線程時(shí),我們會(huì)將 Runnable 對(duì)象傳遞給 Thread 來執(zhí)行。 Runnable 的定義如下:
public interface Runnable {
public abstract void run();
}
You can see that run() method does not return any results after execution. Then what if you want to return the results? Here you can use another similar interface class Callable:
可以看到 run() 方法在執(zhí)行完畢后沒有返回任何結(jié)果。那么要是你想返回執(zhí)行結(jié)果的話該怎么做呢?你可以使用另一個(gè)叫做 Callable 的類似接口。
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
It can be seen that the biggest difference for Callable is that it returns the generics.
可以看到 Callable 最大的不同之處是它能夠返回泛型。
So the next question is, how to pass the results of the child thread back? Java has a class, FutureTask, which can work with Callable, but note that the get method which is used to get the result will block the main thread.
所以下一個(gè)問題是,怎樣將子線程的結(jié)果傳遞回來?Java 有一個(gè)可以和 Callable 一起使用的類叫 FutureTask,但是注意用于獲取結(jié)果的 get 方法會(huì)阻塞主線程。
For example, we want the child thread to calculate the sum from 1 to 100 and return the result to the main thread.
舉個(gè)栗子,我們想要子線程計(jì)算1到100的和然后將結(jié)果返回給主線程。
private static void doTaskWithResultInWorker() {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task starts");
Thread.sleep(1000);
int result = 0;
for (int i=0; i<=100; i++) {
result += i;
}
System.out.println("Task finished and return result");
return result;
}
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
try {
System.out.println("Before futureTask.get()");
System.out.println("Result:" + futureTask.get());
System.out.println("After futureTask.get()");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
The result is as follows:
運(yùn)行結(jié)果如下:
Before futureTask.get()
Task starts
Task finished and return result
Result: 5050
After futureTask.get()
It can be seen that it blocks the main thread when the main thread calls the futureTask.get() method; then the Callable starts to execute internally and returns the result of the operation; and then the futureTask.get() gets the result and the main thread resumes running.
可以看到主線程調(diào)用 futureTask.get() 時(shí)被阻塞了;然后 Callable 開始在內(nèi)部執(zhí)行并返回操作結(jié)果;然后 futureTask.get() 獲得返回的結(jié)果,主線程繼續(xù)執(zhí)行。
Here we can learn that the FutureTask and Callable can get the result of the child thread directly in the main thread, but they will block the main thread. Of course, if you don't want to block the main thread, you can consider using ExecutorService to put the FutureTask into the thread pool to manage execution.
這里我們可以了解到 FutureTask 和 Callable 可以直接在主線程中獲得子線程的結(jié)果,但是它們會(huì)阻塞主線程。當(dāng)然,如果你不想要阻塞主線程,你可以使用 ExecutorService 來將 FutureTask 放入線程池來管理執(zhí)行。
Summary 總結(jié)
Multithreading is a common feature for modern languages, and inter-thread communication, thread synchronization, and thread safety are very important topics.
多線程是當(dāng)今編程語言的常見功能,線程間通信、線程間同步以及線程安全也是很重要的主題。