慕課網(wǎng) Jimin老師 Java并發(fā)編程入門與高并發(fā)面試 學(xué)習(xí)筆記
Java并發(fā)編程入門與高并發(fā)面試
線程安全性定義:
當(dāng)多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些進程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的
◆原子性:提供了互斥訪問,同一時刻只能有一一個線程來對它進行操作
◆可見性: 一個線程對主內(nèi)存的修改可以及時的被其他線程觀察到
◆有序性:一個線程觀察其他線程中的指令執(zhí)行順序,由于指令重排序的存在,該觀察結(jié)果- -般雜亂無序
原子性-Atomic包
◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
package com.huhao.concurrency.example.count;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 代碼模擬并發(fā)測試
* Atomic包演示
*/
@Slf4j
@ ThreadSafe
public class CountExample2 {
//請求總數(shù)
public static int clientTotal = 5000;
//同時并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
semaphore.acquire();
add();
//add執(zhí)行完后,釋放當(dāng)前線程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保證線程減到0
countDownLatch.await();
//關(guān)閉線程池
exec.shutdown();
log.error("count:{}", count);//count:4952
}
private static void add() {
count.incrementAndGet();
}
}
原因詳解:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/**
* eg:2+1=3
* @param var1 :當(dāng)前count對象
* @param var2:當(dāng)前的值 2
* @param var4 :加的值 1
*
* var5是底層方法獲取到的值。如果沒有別的線程處理更改的時候,正常應(yīng)該var5=2,
* compareAndSwapInt 來判斷,直到var2與var5相同時,才會var5+var4=2+1,返回3
* @return the updated value
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
AtomicLong、LongAdder
由于getAndAddInt方法,一直循環(huán)比較,在線程數(shù)少的時候,比較的成功率比較高,在多的時候,失敗的次數(shù)會增多,導(dǎo)致一直循環(huán),效率較低
并發(fā)量大的時候,使用LongAdder效率會高一點。如果小的話,也可以用AtomicInteger
AtomicReference 、AtomicReferenceFieldUpdater
package com.huhao.concurrency.example.atomic;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
/**
* 代碼模擬并發(fā)測試
* AtomicReference包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicReference {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0, 2);//count=2
count.compareAndSet(0, 1);//count=2,不是0,則不執(zhí)行
count.compareAndSet(1, 3);//count=2,不是0,則不執(zhí)行
count.compareAndSet(2, 4);//count=4
count.compareAndSet(3, 5);//count=4,不是0,則不執(zhí)行
log.info("count:{}", count.get());//count:4
}
}
package com.huhao.concurrency.example.atomic;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
/**
* 代碼模擬并發(fā)測試
* AtomicReferenceFieldUpdater包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicExampleAtomicReference {
//AtomicExampleAtomicExampleAtomicReference的count字段,必須用volatile修飾
private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");
@Getter
public volatile int count = 100;
public static void main(String[] args) {
AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 1,{}", example5.getCount()); // update success 1,120
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 2,{}", example5.getCount());
} else {
log.info("update failed,{}", example5.getCount()); // update failed,120
}
}
}
}
AtomicStampReference : CAS的ABA問題
aba問題:cas操作時候,其他線程將變量值A(chǔ),從A改到B,又改回A。本線程在比較的時候,發(fā)現(xiàn)A變量沒有變,于是就講A值進行了交換操作,其實該值以及被其他線程更新過,與最初設(shè)計初衷不符。
解決:在變量更新時候,將變量版本號加1
AtomicBoolean
只執(zhí)行一次
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 代碼模擬并發(fā)測試
* AtomicBoolean包演示
*/
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicBoolean {
//請求總數(shù)
public static int clientTotal = 5000;
//同時并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static AtomicBoolean isHappened = new AtomicBoolean();
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
semaphore.acquire();
test();
//add執(zhí)行完后,釋放當(dāng)前線程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保證線程減到0
countDownLatch.await();
//關(guān)閉線程池
exec.shutdown();
log.error("count:{}", isHappened.get());
}
private static void test() {
if (isHappened.compareAndSet(false, true)) {
//5000次線程,只會執(zhí)行一次
log.info("exec");
}
}
}
原子性-synchronized
◆修飾代碼塊:大括號括起來的代碼,作用于調(diào)用的對象
◆修飾方法:整個方法,作用于調(diào)用的對象
◆修飾靜態(tài)方法:整個靜態(tài)方法,作用于所有對象
◆修飾類:括號括起來的部分,作用于所有對象
package com.huhao.concurrency.example.sync;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Synchronized測試修飾代碼塊和方法
*/
@Slf4j
@ThreadSafe
public class SynchronizedExample {
/**
* 一般的循環(huán),多線程測下看看對不對
*/
private void notSyncBlock() {
for (int i = 0; i < 10; i++) {
log.info("notSyncBlock - {}", i);
}
}
/**
* 修飾代碼塊
* 大括號括起來的代碼,作用于調(diào)用的對象
*/
private void synchronizedBlock() {
synchronized (this) {
for (int i = 0; i < 4; i++) {
log.info("test1 - {}", i);
}
}
}
/**
* 修飾代碼塊
* 大括號括起來的代碼,作用于調(diào)用的對象
*/
private void synchronizedBlock(int j) {
synchronized (this) {
for (int i = 0; i < 4; i++) {
log.info("synchronizedBlock {} - {}", j, i);
}
}
}
/**
* 修飾方法
* 整個方法,作用于調(diào)用的對象
*/
private synchronized void synchronizedMethod() {
for (int i = 0; i < 10; i++) {
log.info("test2 - {}", i);
}
}
//請求總數(shù)
public static int clientTotal = 4;
//同時并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 3;
public static void main(String[] args) throws InterruptedException {
SynchronizedExample synchronizedExample = new SynchronizedExample();
SynchronizedExample synchronizedExample2 = new SynchronizedExample();
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
/**
* 線程同步執(zhí)行測試是不是同步一個個的執(zhí)行
*/
for (int index = 0; index < clientTotal; index++) {
exec.execute(() -> {
try {
//線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
semaphore.acquire();
/**
* 沒有加sync的,
* notSyncBlock - 0
* notSyncBlock - 0
* notSyncBlock - 0
* notSyncBlock - 1
* notSyncBlock - 1
* notSyncBlock - 1
* notSyncBlock - 2
* notSyncBlock - 2
* notSyncBlock - 2
* notSyncBlock - 3
*/
// synchronizedExample.notSyncBlock();
/**
* 加了sync的,同步執(zhí)行
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
* test1 - 1
* test1 - 2
* test1 - 3
* test1 - 0
*/
// synchronizedExample.synchronizedBlock();
/**
* example和example2會同時調(diào)用,但是本身是順序執(zhí)行的
* 同一個對象,是同步執(zhí)行
* 不同對象間,不干擾
*/
// synchronizedExample.synchronizedBlock(0);
// synchronizedExample2.synchronizedBlock(1);
// synchronizedExample2.synchronizedBlock(2);
//add執(zhí)行完后,釋放當(dāng)前線程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//保證線程減到0
countDownLatch.await();
//關(guān)閉線程池
exec.shutdown();
// synchronizedExample.synchronizedMethod();
}
}
package com.huhao.concurrency.example.sync;
import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
* Synchronized 測試修飾靜態(tài)方法和類
*/
@Slf4j
@ThreadSafe
public class SynchronizedExample2 {
/**
* 一般的循環(huán),多線程測下看看對不對
*/
public void notSyncBlock() {
for (int i = 0; i < 10; i++) {
log.info("notSyncBlock - {}", i);
}
}
/**
* 修飾類
*/
public static void synchronizedBlock() {
synchronized (SynchronizedExample2.class) {
for (int i = 0; i < 4; i++) {
log.info("test1 - {}", i);
}
}
}
/**
* 修飾靜態(tài)方法
*/
public static synchronized void synchronizedMethod() {
for (int i = 0; i < 10; i++) {
log.info("test2 - {}", i);
}
}
//請求總數(shù)
public static int clientTotal = 4;
//同時并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 3;
public static void main(String[] args) throws InterruptedException {
}
}
對比:
◆synchronized :不可中斷鎖,適合競爭不激烈,可讀性好
◆Lock :可中斷鎖,多樣化同步,競爭激烈時能維持常態(tài)
◆Atomic :競爭激烈時能維持常態(tài),比Lock性能好;只能同步一個值
可見性
導(dǎo)致共享變量在線程間不可見的原因
◆線程交叉執(zhí)行
◆重排序結(jié)合線程交叉執(zhí)行
◆共享變量更新后的值沒有在工作內(nèi)存與主存間及時更新
可見性- synchronized
JMM關(guān)于synchronized的兩條規(guī)定:
◆線程解鎖前 ,必須把共享變量的最新值刷新到主內(nèi)存
◆線程加鎖時 ,將清空工作內(nèi)存中共享變量的值,從而使用共享變量時需要從主內(nèi)存中重新讀取最新的值(注意,
加鎖與解鎖是同一把鎖)
可見性- volatile
通過加入內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)
◆對volatile變量寫操作時 ,會在寫操作后加入-條store屏障指令,將本地內(nèi)存中的共享變量值刷新到主內(nèi)存
◆對volatile變量讀操作時 ,會在讀操作前加入-條load屏障指令,從主內(nèi)存中讀取共享變量