多線程引發(fā)的問(wèn)題
- 各線程共享同一變量存在競(jìng)爭(zhēng),變量值的改變無(wú)法預(yù)測(cè)
- 可以通過(guò)在方法前面加synchronized來(lái)保證結(jié)果的正確,可是syschronized效率太低,性能太差了,代碼示例
Counter類:
public class Counter {
private int count;
public synchronized void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
this.count++;
System.out.println(this.count);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
Main類:
public class Main {
public static void main(String[] args) {
Counter counter=new Counter();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
counter.add();
}
}).start();
}
}
解釋:三個(gè)線程都將對(duì)Counter類中的count做200次加法,然而線程有可能會(huì)同時(shí)進(jìn)行,從而輸出同樣的值,最后的結(jié)果也不是600,在方法上加上synchronized關(guān)鍵字后,每次只用一個(gè)線程能進(jìn)入這個(gè)方法,解決了共享問(wèn)題,但是性能太低,效率太差了,所以一般是在會(huì)出現(xiàn)競(jìng)爭(zhēng)的代碼前使用synchronized來(lái)實(shí)現(xiàn),代碼如下
public class Counter {
private int count;
public void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
synchronized (this){//競(jìng)爭(zhēng)條件
this.count++;
System.out.println(this.count);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- 除了使用synchronized關(guān)鍵字,jdk還給我們提供了另一種方法,使用AtomicInter類來(lái)進(jìn)行同步,AtomicInteger是個(gè)類,必須要實(shí)例出來(lái)
public class Counter {
private AtomicInteger count=new AtomicInteger(0);//解決并發(fā)問(wèn)題
public void add(){
try {
for (int i = 0; i < 200; i++) {
Thread.sleep(100);
System.out.println( count.incrementAndGet());//原子操作的自增1
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- synchronsized是內(nèi)部鎖,下面介紹內(nèi)部鎖的可重入性
當(dāng)一個(gè)線程請(qǐng)求其它的線程已經(jīng)占有的鎖時(shí),請(qǐng)求線程將被阻塞。然而內(nèi)部鎖是可重進(jìn)入的,因此線程在試圖獲得它自己占用的鎖是,請(qǐng)求會(huì)成功。重進(jìn)入意味著請(qǐng)求是基于“每一個(gè)線程”,而不是基于“每一次調(diào)用”(互斥鎖是基于每次調(diào)用的)。重進(jìn)入的實(shí)現(xiàn)是通過(guò)為每一個(gè)鎖關(guān)聯(lián)一個(gè)請(qǐng)求技術(shù)器和一個(gè)占有他的線程。當(dāng)計(jì)數(shù)為0時(shí),認(rèn)為鎖是未被占用的。線程請(qǐng)求一個(gè)未被占有的鎖時(shí)候,JVM將記錄鎖的占有者,并且將請(qǐng)求計(jì)數(shù)設(shè)置為1。如果同一個(gè)線程再次請(qǐng)求這個(gè)鎖,計(jì)數(shù)將遞增;每次占用線程退出語(yǔ)句塊時(shí),計(jì)數(shù)器值將遞減,直到計(jì)數(shù)器達(dá)到0時(shí)候,鎖被釋放。
指令排序問(wèn)題
編譯器或運(yùn)行時(shí)環(huán)境為了優(yōu)化程序性能而采取的對(duì)指令進(jìn)行重新排序執(zhí)行的一種手段。
也就是說(shuō),對(duì)于下面兩條語(yǔ)句:
int a = 10;
int b = 20;
在計(jì)算機(jī)執(zhí)行上面兩句話的時(shí)候,有可能第二條語(yǔ)句會(huì)先于第一條語(yǔ)句執(zhí)行。所以,千萬(wàn)不要隨意假設(shè)指令執(zhí)行的順序。
代碼示例:
Visiblity1:
public class Visiblity1 {
public static int number;
public static boolean read;
}
ReadThread:
public class ReadThread extends Thread{
@Override
public void run() {
while (!Visiblity1.read){
Thread.yield();
System.out.println(Visiblity1.number);
}
}
}
Test:
public class Test {
public static void main(String[] args) {
new ReadThread().start(); //結(jié)果又三種可能 42 0 不輸出
Visiblity1.number=42;
Visiblity1.read=true;
}
}
解析:結(jié)可能有三種情況,42,0,或者不輸出(未進(jìn)入循環(huán))
第一種可能是預(yù)想情況
第二中是因?yàn)橘x值語(yǔ)句在循環(huán)語(yǔ)句前執(zhí)行
第三種是因?yàn)橹噶畹呐判蚩赡懿皇前凑枕樞驁?zhí)行的
cpu緩存問(wèn)題
cpu會(huì)從緩存去數(shù)據(jù),而不訪問(wèn)內(nèi)存
public class Visiblity {
public static boolean bChanged;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
if(bChanged==true){
System.out.println("!=");
System.exit(0);
}
}
}
}).start();
Thread.sleep(10);//一定要加
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
bChanged=true;
}
}
}).start();
}
}
結(jié)果是一直等待,因?yàn)榈谝粋€(gè)線程的bChange變量一直從緩存中取
如果加上public volatile static boolean bChanged;關(guān)鍵字,告訴cpu必須去訪問(wèn)內(nèi)存而不是從緩存區(qū)數(shù)據(jù)
- volatile 保證變量的修改讓所有線程可見 阻止指令排序
- volatile是古老的關(guān)鍵字,synchronized已經(jīng)優(yōu)化的很好了,不要去刻意使用volatile
- 所以說(shuō)你傳入synchronized能解決可見性和原子性,volatile只能解決可見性
線程封閉
- 使用final關(guān)鍵字,不要共享變量(就是一句廢話)
- 桟封閉 比如 變量在方法內(nèi)部聲明和修改,簡(jiǎn)單來(lái)說(shuō)就是使用局部變量(變量定義在里面)
- ThreadLocal線程綁定,就是變量在不同線程上的副本(變量定義在外面,但是訪問(wèn)修改的時(shí)候每個(gè)線程都會(huì)有一個(gè)副本)
使用ThreadLocal是實(shí)現(xiàn)線程封閉的最好方法。ThreadLocal內(nèi)部維護(hù)了一個(gè)Map,Map的key是每個(gè)線程的名稱,而Map的值就是我們要封閉的對(duì)象。每個(gè)線程中的對(duì)象都對(duì)應(yīng)著Map中一個(gè)值,也就是ThreadLocal利用Map實(shí)現(xiàn)了對(duì)象的線程封閉。管理鏈接方面經(jīng)常用
public class Visiblity {
private static ThreadLocal<LocalTest> threadLocal=new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
LocalTest local=new LocalTest();
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
threadLocal.set(local);
LocalTest l=threadLocal.get();
l.setNum(20);
System.out.println(Thread.currentThread().getName()+"==="+threadLocal.get().getNum());
Thread.yield();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (;;){
threadLocal.set(local);
LocalTest l=threadLocal.get();
l.setNum(30);
System.out.println(Thread.currentThread().getName()+"==="+threadLocal.get().getNum());
Thread.yield();
}
}
}).start();
}
}
java同步和并發(fā)容器
- 同步容器 Vector,Hashtable 里面的方法都加了synchronized,性能太差
同步容器依舊會(huì)有問(wèn)題,迭代過(guò)程中發(fā)生修改,會(huì)報(bào)并發(fā)修改異常,可以修改線程時(shí)獨(dú)立拷貝一份集合
并發(fā)修改異常不僅是多線程會(huì)有,單線程也會(huì)有,迭代list集合,直接使用list的remove方法而不使用迭代器的remove方法會(huì)拋出并發(fā)修改異常 - 并發(fā)容器 :
- ConcurrentHashMap是分段鎖,普通的HashTable雖然 線程安全 但是它是直接在每個(gè)方法前加synchronized關(guān)鍵字,效率低,
分段鎖每次訪問(wèn)只允許一個(gè)線程修改哈希表的映射關(guān)系,但ConcurrentHashMap size()方法有可能返回的不是及時(shí)的值
public class multithreading.Test {
public static void main(String[] args) {
ConcurrentHashMap map=new ConcurrentHashMap();
map.putIfAbsent(1,2);//只用當(dāng)key不存在才去設(shè)置
HashTable table=new HashTable();
if(table.get(1)==null){
table.put(1,2);
}//這樣子寫多線程肯定出問(wèn)題
}
}
ConcurrentHashMap是jdk1.5才出來(lái)的,解決了HashTable的效率問(wèn)題,但是會(huì)可能出現(xiàn)誤差,,ConcurrentHashMap是弱一致性的。 多線程環(huán)境下,直接使用判空在去設(shè)值肯定會(huì)出問(wèn)題。
- CopyOnWriteArrayList/Set 讀操作站占絕大部分,寫操作比較少
- 阻塞隊(duì)列BlockingQueue 見的不多
閉鎖 柵欄 信號(hào)量
- 閉鎖:一個(gè)線程依賴于其他線程的業(yè)務(wù),某個(gè)服務(wù)只有等到另一個(gè)操作結(jié)束后才能執(zhí)行
CountDowmLatch 例子(一家人都到了才能吃飯)
class multithreading.Test{
private static CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException
{
new Thread()
{
public void run()
{
fatherToRes();
latch.countDown();
};
}.start();
new Thread()
{
public void run()
{
motherToRes();
latch.countDown();
};
}.start();
new Thread()
{
public void run()
{
meToRes();
latch.countDown();
};
}.start();
latch.await();
togetherToEat();
}
}
解析CountDowmLatch是一種靈活的閉鎖實(shí)現(xiàn),包含一個(gè)計(jì)數(shù)器,該計(jì)算器初始化為一個(gè)正數(shù),表示需要等待事件的數(shù)量。countDown方法遞減計(jì)數(shù)器,表示有一個(gè)事件發(fā)生,而await方法等待計(jì)數(shù)器到達(dá)0,表示所有需要等待的事情都已經(jīng)完成。
- 柵欄:所有線程必須要拿到某個(gè)狀態(tài)之后這些線程才能進(jìn)行操作
public class multithreading.Test {
public static void main(String[] args) {
int N=4;
CyclicBarrier barrier=new CyclicBarrier(N);
for (int i = 0; i < N; i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier cyclicBarrier){
this.barrier=cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"正在輸入");
try {
Thread.sleep(5000);//以睡眠來(lái)模擬輸入
System.out.println("線程"+Thread.currentThread().getName()+"數(shù)據(jù)寫入完畢");
barrier.await();//只用柵欄里的所有線程都執(zhí)行到這里,其他線程才能執(zhí)行
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程寫入完畢");
}
}
}
- 信號(hào)量
public class multithreading.Test {
public static void main(String[] args) {
int N=8;//工人數(shù)
Semaphore semaphore=new Semaphore(5);//機(jī)器數(shù)目
for (int i = 0; i < N; i++) {
new Worker(i,semaphore).start();
}
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num ,Semaphore semaphore){
this.num=num;
this.semaphore=semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一它機(jī)器");
Thread.sleep(2000);
System.out.println("工人"+this.num+"釋放機(jī)器");
semaphore.release();
}catch (Exception e){
e.printStackTrace();;
}
}
}
} public class multithreading.Test {
public static void main(String[] args) {
int N=8;//工人數(shù)
Semaphore semaphore=new Semaphore(5);//機(jī)器數(shù)目
for (int i = 0; i < N; i++) {
new Worker(i,semaphore).start();
}
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num ,Semaphore semaphore){
this.num=num;
this.semaphore=semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一它機(jī)器");
Thread.sleep(2000);
System.out.println("工人"+this.num+"釋放機(jī)器");
semaphore.release();
}catch (Exception e){
e.printStackTrace();;
}
}
}
}
柵欄與閉鎖的關(guān)鍵區(qū)別在于,所有的線程必須同時(shí)到達(dá)柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待等待時(shí)間,而柵欄用于等待線程。
場(chǎng)景對(duì)比:
l 閉鎖場(chǎng)景:幾個(gè)人相約去公園游玩,在家做好準(zhǔn)備,約定在某一時(shí)刻同時(shí)出發(fā)去公園,準(zhǔn)備工作進(jìn)行的快的不能提前出門,到點(diǎn)出門。
l 柵欄場(chǎng)景:幾個(gè)人相約去公園游玩,幾個(gè)人去到公園門口,要等全部到達(dá)公園門口后才一起進(jìn)入公園。
l 信號(hào)量場(chǎng)景:幾個(gè)人相約去公園游玩,等大家都到公園后,發(fā)現(xiàn)來(lái)的太遲了,公園游客飽和,公園限制入場(chǎng)游客的數(shù)量。游客在門口等待,出來(lái)一人,再進(jìn)入一人,只能一個(gè)一個(gè)進(jìn)入。
線程池
Executor是一個(gè)接口 ExecutorService是Executor的子類
public class multithreading.Test {
public static void main(String[] args)throws Exception {
Executor executor=Executors.newFixedThreadPool(100);//創(chuàng)建線程池
ServerSocket serverSocket=new ServerSocket(8888);
while (true){
Socket socket=serverSocket.accept();
Runnable task=new Runnable() {
@Override
public void run() {
handleRequest(socket);
}
};
//new Thread(task).start();
executor.execute(task);//交給線程池
}
}
private static void handleRequest(Socket socket) {
}
}
使用線程池對(duì)線程進(jìn)行管理能夠控制線程的數(shù)量
一些其他的線程池:
- Executors.newScheduledThreadPool()線程池中只能放一個(gè),如果這個(gè)線程掛了,會(huì)在線程池中馬上起另一個(gè)線程,保證只有一個(gè)線程且一直可用
- Executors.newCachedThreadPool() 線程數(shù)量沒(méi)有限制
- ThreadPoolExecutor java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類,構(gòu)造器中各個(gè)參數(shù)的含義如下
- corePoolSize核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
- maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;
- keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
- unit:參數(shù)keepAliveTime的時(shí)間單位,
- workQueue:一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響,一般來(lái)說(shuō),這里的阻塞隊(duì)列有以下幾種選擇:
- threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程;
- handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略
各種鎖
- 死鎖
互相等待,鎖的嵌套會(huì)出現(xiàn)死鎖
避免死鎖:盡量不要去寫鎖的嵌套,鎖嵌套的順序相同 - 顯示鎖
Lock接口
public class multithreading.Test {
public static void main(String[] args)throws Exception {
Lock lock=new ReentrantLock();//創(chuàng)建可重用鎖
try {
lock.lock();
System.out.println("開始同步");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
釋放鎖的操作一定要寫在finally里,否則出現(xiàn)異常會(huì)直接鎖死,一般還是用synchronized,需要在復(fù)雜的情況下自定義超時(shí)時(shí)間的情況下才會(huì)使用lock
- ReentrantReadWriteLock 可重入讀寫鎖
設(shè)想以下情景:我們?cè)谙到y(tǒng)中有一個(gè)多線程訪問(wèn)的緩存,多個(gè)線程都可以對(duì)緩存進(jìn)行讀或?qū)懖僮?,但是讀操作遠(yuǎn)遠(yuǎn)多于寫操作,要求寫操作要線程安全,且寫操作執(zhí)行完成要求對(duì)當(dāng)前的所有讀操作馬上可見。
分析上面的需求:因?yàn)橛卸鄠€(gè)線程可能會(huì)執(zhí)行寫操作,因此多個(gè)線程的寫操作必須同步串行執(zhí)行;而寫操作執(zhí)行完成要求對(duì)當(dāng)前的所有讀操作馬上可見,這就意味著當(dāng)有線程正在讀的時(shí)候,要阻塞寫操作,當(dāng)正在執(zhí)行寫操作時(shí),要阻塞讀操作。一個(gè)簡(jiǎn)單的實(shí)現(xiàn)就是將數(shù)據(jù)直接加上互斥鎖,同一時(shí)刻不管是讀還是寫線程,都只能有一個(gè)線程操作數(shù)據(jù)。但是這樣的問(wèn)題就是如果當(dāng)前只有N個(gè)讀線程,沒(méi)有寫線程,這N個(gè)讀線程也要傻呵呵的排隊(duì)讀,盡管其實(shí)是可以安全并發(fā)提高效率的。因此理想的實(shí)現(xiàn)是:
當(dāng)有寫線程時(shí),則寫線程獨(dú)占同步狀態(tài)。
當(dāng)沒(méi)有寫線程時(shí)只有讀線程時(shí),則多個(gè)讀線程可以共享同步狀態(tài)。
讀寫鎖就是為了實(shí)現(xiàn)這種效果而生
public class ReadWriteCache {
private static Map<String, Object> data = new HashMap<>();
private static ReadWriteLock lock = new ReentrantReadWriteLock(false);
private static Lock rlock = lock.readLock();
private static Lock wlock = lock.writeLock();
public static Object get(String key) {
rlock.lock();
try {
return data.get(key);
} finally {
rlock.unlock();
}
}
public static Object put(String key, Object value) {
wlock.lock();
try {
return data.put(key, value);
} finally {
wlock.unlock();
}
}
}
- 公平鎖 誰(shuí)下來(lái),誰(shuí)就先被鎖,先來(lái)后到
- 樂(lè)觀鎖 悲觀鎖 分布式鎖 數(shù)據(jù)庫(kù)層面的
樂(lè)觀鎖的實(shí)現(xiàn):在表中加一個(gè)版本字段,每次讀的時(shí)候判斷版本
悲觀鎖:select * from user for update 相當(dāng)于synchronized
分布式鎖 redis zk