4、Java并發(fā)編程入門與高并發(fā)面試-線程安全性

慕課網(wǎng) Jimin老師 Java并發(fā)編程入門與高并發(fā)面試 學(xué)習(xí)筆記
Java并發(fā)編程入門與高并發(fā)面試

線程安全性定義:
當(dāng)多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式或者這些進程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行為,那么就稱這個類是線程安全的

原子性:提供了互斥訪問,同一時刻只能有一一個線程來對它進行操作
可見性: 一個線程對主內(nèi)存的修改可以及時的被其他線程觀察到
有序性:一個線程觀察其他線程中的指令執(zhí)行順序,由于指令重排序的存在,該觀察結(jié)果- -般雜亂無序

原子性-Atomic包
◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
package com.huhao.concurrency.example.count;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 代碼模擬并發(fā)測試
 * Atomic包演示
 */
@Slf4j
@ ThreadSafe
public class CountExample2 {

    //請求總數(shù)
    public static int clientTotal = 5000;

    //同時并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
                    semaphore.acquire();
                    add();
                    //add執(zhí)行完后,釋放當(dāng)前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保證線程減到0
        countDownLatch.await();
        //關(guān)閉線程池
        exec.shutdown();
        log.error("count:{}", count);//count:4952
    }

    private static void add() {
        count.incrementAndGet();
    }
}

原因詳解:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
 * eg:2+1=3
 * @param var1 :當(dāng)前count對象
 * @param var2:當(dāng)前的值  2
 * @param var4 :加的值   1
 * 
 * var5是底層方法獲取到的值。如果沒有別的線程處理更改的時候,正常應(yīng)該var5=2,
 * compareAndSwapInt 來判斷,直到var2與var5相同時,才會var5+var4=2+1,返回3
 * @return the updated value
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
AtomicLong、LongAdder

由于getAndAddInt方法,一直循環(huán)比較,在線程數(shù)少的時候,比較的成功率比較高,在多的時候,失敗的次數(shù)會增多,導(dǎo)致一直循環(huán),效率較低
并發(fā)量大的時候,使用LongAdder效率會高一點。如果小的話,也可以用AtomicInteger

AtomicReference 、AtomicReferenceFieldUpdater
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
 * 代碼模擬并發(fā)測試
 * AtomicReference包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicReference {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2);//count=2
        count.compareAndSet(0, 1);//count=2,不是0,則不執(zhí)行
        count.compareAndSet(1, 3);//count=2,不是0,則不執(zhí)行
        count.compareAndSet(2, 4);//count=4
        count.compareAndSet(3, 5);//count=4,不是0,則不執(zhí)行
        log.info("count:{}", count.get());//count:4
    }
}
package com.huhao.concurrency.example.atomic;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 代碼模擬并發(fā)測試
 * AtomicReferenceFieldUpdater包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicExampleAtomicReference {

    //AtomicExampleAtomicExampleAtomicReference的count字段,必須用volatile修飾
    private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
            updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");

    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1,{}", example5.getCount());   // update success 1,120
            if (updater.compareAndSet(example5, 100, 120)) {
                log.info("update success 2,{}", example5.getCount());
            } else {
                log.info("update failed,{}", example5.getCount());  // update failed,120
            }
        }
    }
}
AtomicStampReference : CAS的ABA問題

aba問題:cas操作時候,其他線程將變量值A(chǔ),從A改到B,又改回A。本線程在比較的時候,發(fā)現(xiàn)A變量沒有變,于是就講A值進行了交換操作,其實該值以及被其他線程更新過,與最初設(shè)計初衷不符。
解決:在變量更新時候,將變量版本號加1

AtomicBoolean

只執(zhí)行一次


import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 代碼模擬并發(fā)測試
 * AtomicBoolean包演示
 */
@Slf4j
@ThreadSafe
public class AtomicExampleAtomicBoolean {

    //請求總數(shù)
    public static int clientTotal = 5000;

    //同時并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;

    public static AtomicBoolean isHappened = new AtomicBoolean();


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
                    semaphore.acquire();
                    test();
                    //add執(zhí)行完后,釋放當(dāng)前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保證線程減到0
        countDownLatch.await();
        //關(guān)閉線程池
        exec.shutdown();
        log.error("count:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            //5000次線程,只會執(zhí)行一次
            log.info("exec");
        }
    }
}

原子性-synchronized

◆修飾代碼塊:大括號括起來的代碼,作用于調(diào)用的對象
◆修飾方法:整個方法,作用于調(diào)用的對象
◆修飾靜態(tài)方法:整個靜態(tài)方法,作用于所有對象
◆修飾類:括號括起來的部分,作用于所有對象

package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 *  Synchronized測試修飾代碼塊和方法
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample {

    /**
     * 一般的循環(huán),多線程測下看看對不對
     */
    private void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修飾代碼塊
     * 大括號括起來的代碼,作用于調(diào)用的對象
     */
    private void synchronizedBlock() {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修飾代碼塊
     * 大括號括起來的代碼,作用于調(diào)用的對象
     */
    private void synchronizedBlock(int j) {
        synchronized (this) {
            for (int i = 0; i < 4; i++) {
                log.info("synchronizedBlock {} - {}", j, i);
            }
        }
    }

    /**
     * 修飾方法
     * 整個方法,作用于調(diào)用的對象
     */
    private synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //請求總數(shù)
    public static int clientTotal = 4;

    //同時并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
        SynchronizedExample synchronizedExample = new SynchronizedExample();
        SynchronizedExample synchronizedExample2 = new SynchronizedExample();

        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        /**
         * 線程同步執(zhí)行測試是不是同步一個個的執(zhí)行
         */
        for (int index = 0; index < clientTotal; index++) {
            exec.execute(() -> {
                try {
                    //線程請求,如果線程數(shù)已經(jīng)到了,可能會阻塞,等有線程釋放了再執(zhí)行
                    semaphore.acquire();
                    /**
                     *  沒有加sync的,
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 0
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 1
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 2
                     *  notSyncBlock - 3
                     */
//                    synchronizedExample.notSyncBlock();


                    /**
                     * 加了sync的,同步執(zhí)行
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     * test1 - 1
                     * test1 - 2
                     * test1 - 3
                     * test1 - 0
                     */
//                    synchronizedExample.synchronizedBlock();

                    /**
                     * example和example2會同時調(diào)用,但是本身是順序執(zhí)行的
                     * 同一個對象,是同步執(zhí)行
                     * 不同對象間,不干擾
                     */
//                    synchronizedExample.synchronizedBlock(0);
//                    synchronizedExample2.synchronizedBlock(1);
//                    synchronizedExample2.synchronizedBlock(2);

                    //add執(zhí)行完后,釋放當(dāng)前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("exception", e);
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            });
        }
        //保證線程減到0
        countDownLatch.await();
        //關(guān)閉線程池
        exec.shutdown();
//        synchronizedExample.synchronizedMethod();
    }
}
package com.huhao.concurrency.example.sync;

import com.huhao.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

/**
 * Synchronized 測試修飾靜態(tài)方法和類
 */
@Slf4j
@ThreadSafe
public class SynchronizedExample2 {

    /**
     * 一般的循環(huán),多線程測下看看對不對
     */
    public void notSyncBlock() {
        for (int i = 0; i < 10; i++) {
            log.info("notSyncBlock - {}", i);
        }
    }

    /**
     * 修飾類
     */
    public static void synchronizedBlock() {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 4; i++) {
                log.info("test1 - {}", i);
            }
        }
    }

    /**
     * 修飾靜態(tài)方法
     */
    public static synchronized void synchronizedMethod() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

    //請求總數(shù)
    public static int clientTotal = 4;

    //同時并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 3;

    public static void main(String[] args) throws InterruptedException {
    }
}

對比:

◆synchronized :不可中斷鎖,適合競爭不激烈,可讀性好
◆Lock :可中斷鎖,多樣化同步,競爭激烈時能維持常態(tài)
◆Atomic :競爭激烈時能維持常態(tài),比Lock性能好;只能同步一個值

可見性

導(dǎo)致共享變量在線程間不可見的原因
◆線程交叉執(zhí)行
◆重排序結(jié)合線程交叉執(zhí)行
◆共享變量更新后的值沒有在工作內(nèi)存與主存間及時更新

可見性- synchronized

JMM關(guān)于synchronized的兩條規(guī)定:
◆線程解鎖前 ,必須把共享變量的最新值刷新到主內(nèi)存
◆線程加鎖時 ,將清空工作內(nèi)存中共享變量的值,從而使用共享變量時需要從主內(nèi)存中重新讀取最新的值(注意,
加鎖與解鎖是同一把鎖)

可見性- volatile

通過加入內(nèi)存屏障和禁止重排序優(yōu)化來實現(xiàn)
◆對volatile變量寫操作時 ,會在寫操作后加入-條store屏障指令,將本地內(nèi)存中的共享變量值刷新到主內(nèi)存
◆對volatile變量讀操作時 ,會在讀操作前加入-條load屏障指令,從主內(nèi)存中讀取共享變量

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容