
- Java 并發(fā)1:線程的基本概念volatile&synchronized關鍵字
- Java 并發(fā)2 :Java中的原子類&互斥 & 線程中異常處理
- Java 并發(fā)3:線程通信機制和生產(chǎn)者消費者問題&哲學家就餐問題
本文主要分兩個章節(jié),先對線程間通信機制的介紹,然后通過對生產(chǎn)者問題和哲學家問題的解決對線程的基礎部分收尾
- 線程間通信機制
1.1 使用同步機制
1.2 使用輪詢機制
1.3 使用wait/notify
1.4 使用Lock/Condition - 兩個經(jīng)典問題
2.1 哲學家問題死鎖的解決
2.2 生產(chǎn)者消費者問題
線程間通信機制
同步機制
使用關鍵字volatile 和 synchronized ,前面幾篇文章已經(jīng)說明了這個問題,這里不再重復
使用輪詢機制
public class SpinLockTest {
private static CountDownLatch latch = new CountDownLatch(100);
private AtomicReference<Thread> ref = new AtomicReference<>();
public void lock() {
Thread currentThread = Thread.currentThread();
while (!ref.compareAndSet(null, currentThread)) {
}
}
public void unLock() {
Thread thread = Thread.currentThread();
ref.compareAndSet(thread,null);
}
public static void main(String args[]) {
ExecutorService service = Executors.newCachedThreadPool();
SpinLockTest test = new SpinLockTest();
int count[] = {0};
for (int i = 0; i < 100 ; i++) {
service.execute(new Thread(() -> {
test.lock();
count[0] ++;
test.unLock();
latch.countDown();
}));
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count[0]);
}
}
我們使用了CountDownLatch做一個100次的倒計時,如果倒計時0時,結束阻塞。理想情況下,100個線程應該會讓最后的結果變成100,而結果和我們預料的一致,假設第一個被調(diào)度的線程為A,ref.compareAndSet()返回true(當前是null,expect的是也是null,ref的值被設置成currentThread的值)。當A線程沒有unlock()時,如果來一個B線程,不滿足while中CAS的條件,開始while循環(huán),B線程會一直詢問有鎖嗎,有鎖嗎......直到A線程unlock為止。

我們這個例子中也實現(xiàn)了一個自旋鎖:一個線程從在阻塞到切換成為別的線程的過程,如果只是執(zhí)行簡單的任務的話,切換線程上下文的時間反而比執(zhí)行任務的時間還要長。所以我們可以采取自旋鎖的方法進行線程的同步。
使用wait/notify
wait()和notify()是定義在Object上的native方法,具體的內(nèi)容有賴于各個平臺的實現(xiàn)。
wait/notfity具體使用
- wait()和notify()
wait()函數(shù)調(diào)用之后線程被掛起。調(diào)用了notify()、notifyAll()之后會喚醒一個等待這個對象鎖的線程,但是只有當退出對象鎖的區(qū)域才行。
對象調(diào)用notify()之后只會有一個線程去競爭鎖,notifyAll()會讓所有等待這個對象鎖的線程去競爭鎖。 - 具體使用
Java中給出了一個使用wait()很明確的套路,就是使用這樣的一個結構:
synchronized(object){
//某種條件
while(condition){
//do something
wait();
//do something else
}
}
首先記住以下原則:
-
wait()和notify()方法必須定義在synchronized方法塊中 -
wait()通常情況下放在while塊中,這主要是因為虛假喚醒問題
下面看一段例子:
public class LockReleaseTest {
private static Object object = new Object();
private static class A extends Thread{
private Object object;
public A(Object object){
this.object = object;
}
public void run(){
synchronized (object) {
while (!Thread.interrupted()) {
try {
//讓A線程直接wait
System.out.println("A進入同步代碼塊");
//wait 將線程掛起 從哪里跌倒從哪里爬起來 如果喚醒了 從這里繼續(xù)運行
object.wait();
System.out.println("線程A獲得了鎖");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("A退出同步代碼塊 退出run()");
}
}
private static class B extends Thread{
private Object object;
public B(Object object){
this.object = object;
}
public void run() {
synchronized (object) {
while (!Thread.interrupted()) {
System.out.println("B進入同步代碼塊");
object.notify();
System.out.println("B通知A 從掛起中醒來,但是沒有釋放鎖");
try {
TimeUnit.MILLISECONDS.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程B退出同步代碼塊");
}
System.out.println("線程B釋放了鎖");
}
}
}
public static void main(String args[]) throws InterruptedException {
A a = new A(object);
B b = new B(object);
a.start();
b.start();
}
}
A,B分別是兩個線程,他們都通過一個公共的object進行同步(通過構造函數(shù)傳入的),運行之后的結果如下所示:
A進入同步代碼塊
B進入同步代碼塊
B通知A 從掛起中醒來,但是沒有釋放鎖
線程B退出同步代碼塊
線程B釋放了鎖
線程A獲得了鎖
A退出同步代碼塊 退出run()
A進入同步代碼塊,但是調(diào)用了wait()函數(shù)之后,線程A就掛起了。但是B線程卻可以正常運行。這說明即使A線程調(diào)用了wait(),函數(shù)沒有退出run()但是A線程還是放棄了鎖,并且被B線程獲得。此時object.notify()運行,卻沒有讓A線程立即恢復,只有當B線程休眠結束并且退出同步代碼塊,A線程才能繼續(xù)運行,這就解釋了上面的問題,notify()執(zhí)行之后沒有立刻釋放鎖,只能等待解釋同步代碼塊。
調(diào)用wait()方法的時候一定是獲得了同步鎖的,如果沒有在synchronized塊中調(diào)用wait()方法將拋出異常。
使用Lock 和 Condition
個人認為Lock和Condition是比設計在Object上的wait()和notify()更容易理解的api,所有使用wait¬ify的地方都還可以使用Lock&Condition處理。
生產(chǎn)者消費者問題
生產(chǎn)著消費者問題的場景是:消費者消費生產(chǎn)者生產(chǎn)出并且放在隊列里面的產(chǎn)品,如果產(chǎn)品用完了消費者需要等待,如果隊列滿了,生產(chǎn)者等待。
- 先使用wait¬ify完成:
public class ProducerAndConsumer1 {
private static final Queue<Content> contents = new LinkedList<>();
static class Content{
private String start;
private String end;
private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
public Content(){
int index = new Random().nextInt(places.length);
this.start = this.end = places[index];
}
public String toString(){
return " start " + start + " end " + end;
}
}
@SuppressWarnings("Duplicates")
static class Producer implements Runnable{
private int maxCount;
public Producer(int maxCount){
this.maxCount = maxCount;
}
public void run(){
while(true){
synchronized (contents){
//使用while + wait的語義: 判斷是否還要繼續(xù)等待
while(contents.size() == maxCount){
System.out.println("The queue is full");
try {
contents.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//模擬生產(chǎn)
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Content content = new Content();
contents.add(content);
System.out.println("produced "+content);
contents.notifyAll();
}
}
}
}
@SuppressWarnings("Duplicates")
static class Consumer implements Runnable{
private int maxCount;
public Consumer(int maxCount){
this.maxCount = maxCount;
}
public void run(){
while(true){
synchronized (contents){
//使用while + wait的語義: 判斷是否還要繼續(xù)等待
while(contents.size() == 0){
System.out.println("The queue is empty");
try {
contents.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Content content =contents.poll();
try {
//模擬消費
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumed " + content);
contents.notifyAll();
}
}
}
}
public static void main(String args[]) {
ExecutorService service = Executors.newCachedThreadPool();
int maxCount = 5;
//3個生產(chǎn)者 3個消費者
for (int i = 0; i < 3 ; i++) {
service.execute(new Producer(maxCount));
service.execute(new Consumer(maxCount));
}
}
}
- 使用Lock&Condition解決
public class ProducerAndConsumer2 {
private static final Queue<Content> contents = new LinkedList<>();
private static final Lock lock = new ReentrantLock();
private static final Condition fullQueue = lock.newCondition();
private static final Condition emptyQueue = lock.newCondition();
static class Content{
private String start;
private String end;
private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
public Content(){
int index = new Random().nextInt(places.length);
this.start = this.end = places[index];
}
public String toString(){
return " start " + start + " end " + end;
}
}
static class Producer implements Runnable{
private int maxCount;
public Producer(int maxCount){
this.maxCount = maxCount;
}
public void run() {
while (true) {
lock.lock();
//使用while + wait的語義: 判斷是否還要繼續(xù)等待
while (contents.size() == maxCount) {
System.out.println("The queue is full");
try {
fullQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//模擬生產(chǎn)
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Content content = new Content();
contents.add(content);
System.out.println("produced " + content);
fullQueue.signalAll();
emptyQueue.signalAll();
lock.unlock();
}
}
}
@SuppressWarnings("Duplicates")
static class Consumer implements Runnable{
private int maxCount;
public Consumer(int maxCount){
this.maxCount = maxCount;
}
public void run() {
while (true) {
lock.lock();
//使用while + wait的語義: 判斷是否還要繼續(xù)等待
while (contents.isEmpty()) {
System.out.println("The queue is empty");
try {
emptyQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Content content = contents.poll();
try {
//模擬消費
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumed " + content);
fullQueue.signalAll();
emptyQueue.signalAll();
lock.unlock();
}
}
}
Lock&Condition的用法和上文中相同。
- 使用阻塞隊列來完成生產(chǎn)著消費者問題
使用阻塞隊列能夠很好地幫我們托管同步的問題:
public class ProducerAndConsumer {
private static final int maxCount = 10;
private static final BlockingQueue<Content> queue = new LinkedBlockingDeque<>(maxCount);
static class Content {
private String start;
private String end;
private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};
public Content() {
int index = new Random().nextInt(places.length);
this.start = this.end = places[index];
}
public String toString() {
return " start " + start + " end " + end;
}
}
@SuppressWarnings("Duplicates")
static class Producer implements Runnable {
public void run() {
while (true) {
try {
//模擬生產(chǎn)
TimeUnit.MILLISECONDS.sleep(1000);
Content content = new Content();
queue.put(content);
System.out.println("produced " + content);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@SuppressWarnings("Duplicates")
static class Consumer implements Runnable {
public void run() {
while (true) {
try {
//模擬消費
TimeUnit.MILLISECONDS.sleep(500);
Content content = queue.take();
System.out.println("consumed " + content);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String args[]) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5 ; i++) {
service.execute(new Producer());
service.execute(new Consumer());
}
}
}
解決哲學家就餐問題
哲學家就餐問題的一種解法是,可以讓最后一個人拿起的筷子固定就可以解決:
public class DeadLockTest {
//通過哲學家問題演示一個思索的情況
public static class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException {
//反復檢查是否已經(jīng)被拿走 如果拿走,就算了
while (taken) {
wait();
}
taken = true;
}
public synchronized void drop(){
taken = false;
notifyAll();
}
}
public static class Philosopher implements Runnable {
private Chopstick left;
private Chopstick right;
private final int id;
private final int ponderFactor;
private Random rand = new Random(47);
public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
this.ponderFactor = ponder;
this.left = left;
this.right = right;
id = ident;
}
public void pause() throws InterruptedException {
if (ponderFactor == 0) return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
}
public String toString(){
return "Philosopher" + id;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println(this + " " + "thinking");
pause();
//哲學家開始就餐
System.out.println(this + " " + "grabbing right");
right.take();
System.out.println(this + " " + "grabbing left" );
left.take();
System.out.println(this + " " + "eating");
pause();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
System.out.println(this + " " + "exiting via interrupt");
}
}
}
public static void main(String args[]) throws InterruptedException {
int ponder = 0;
int size = 5;
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] chopsticks = new Chopstick[size];
for (int i = 0; i < size ; i++) {
chopsticks[i] = new Chopstick();
}
for (int i = 0; i < size ; i++) {
//會發(fā)生死鎖
// exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
//死鎖的解決方式
if(i < (size - 1)){
exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
}else{
exec.execute(new Philosopher(chopsticks[0], chopsticks[i], i , ponder));
}
}
//如果發(fā)生死鎖就回卡??!
TimeUnit.SECONDS.sleep(30);
exec.shutdownNow();
}
}
參考內(nèi)容
使用線程間通信機制解決問題
Java 中線程間通信機制
阻塞隊列
讀 《Thinking in Java》有感,遂記之