一文帶你整明白Java的N種鎖

溫馨提示:本文內容較長廢話較多,如有心臟病、精神病史等請酌情查看。

一、概述

本文源碼基于openJDK8u。在閱讀本文前,你需要對并發(fā)有所了解。

在并發(fā)中,為了解決程序中多個進程和線程對資源的搶占問題,在 Java 中引入了鎖的概念。

各種各樣的鎖,對于初碰 Java 并發(fā)的同學來說,面對多達 20 種的鎖,瞬間懵逼,退游戲這把雞勞資不吃了......

其實不要緊張,雖然鎖的種類很多,但是都是根據其特性衍生出來的概念而已,如果你對 Java 鎖不是很清晰,希望這篇文章能夠對你有所幫助。朋友們,如果你不會做飯或者不知道吃什么請關注我麻辣德子感謝您的雙...呸,學好 Java , 拒絕沉迷某音,哈哈哈哈哈~

下面,是一張關于鎖的思維導圖,帶大家有一個總體的認識,配合此圖食用效果更佳哈。

ht2.png

ps:如果看不清楚可以點擊點擊原圖查看哦。

二、synchronized 帶你跑毒

為了方便大家由淺入深(懷疑作者開車但是沒有證據...),我們從大家比較熟悉的 synchronized 說起。對于 Java 使用者來說,synchronized 關鍵字是實現鎖的一種重要方式。

package com.aysaml.demo.test;


import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * SynchronizedDemo 示例
 *
 * @author wangning
 * @date 2019-11-26
 */
public class SynchronizedDemo {

    private static int count = 0;


    private static void addCount() {
        count++;
    }

    public static void main(String[] args) {

        int loopCount = 1000;

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("demo-pool-%d").build();
        ExecutorService executorService = new ThreadPoolExecutor(10, 1000,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        
        for (int i = 0; i < loopCount; i++) {
            Runnable r = SynchronizedDemo::addCount;
            executorService.execute(r);
        }
        executorService.shutdown();
        System.out.println(SynchronizedDemo.count);
    }

}

措不及防的代碼粘貼,哈哈哈。

上面是比較經典的線程并發(fā)問題示例。運行這段代碼,得到的結果多種多樣,996、997、998.....
結果并不像我們預期的那樣是1000,遇到多線程資源競爭時我們可能第一反應的就是加 synchronized,簡單粗暴,于是變成下邊這樣:

private static synchronized void addCount() {
        count++;
    }

在累加方法加上 synchronized,就給這個方法加了鎖,如此,每次執(zhí)行的結果就符合了我們的預期。
Java 內置了一個反編譯工具 javap 可以反編譯字節(jié)碼文件,通過執(zhí)行命令 javap -verbose -p SynchronizedDemo.class 可以看到上述這個類的字節(jié)碼文件。

image.png

查找被加鎖的方法 addCount() 發(fā)現,synchronized 修飾方法 時是通過 ACC_SYNCHRONIZED 標記符號指定該方法是一個同步方法,從而執(zhí)行相應的同步調用。

同樣查看字節(jié)碼,可以知道 synchronized 修飾代碼塊 時 通過monitorentermonitorexit 指令來解決同步問題,其中 monitorenter 指令指向同步代碼塊的開始位置,monitorexit 指令指明同步代碼塊的結束位置。

三、各種鎖的解釋,猥瑣發(fā)育撿槍撿子彈

這部分只是簡單說一下各種鎖以及相關概念,讓大家有一個簡單了解,其中相應的在 Java 中的實現會用較長的篇幅做介紹。

偏向鎖、輕量級鎖、重量級鎖

在程序第一次執(zhí)行到 synchronized 代碼塊的時候,鎖對象變成 偏向鎖 ,即偏向于第一個獲得它的線程的鎖。在程序第二次執(zhí)行到改代碼塊時,線程會判斷此時持有鎖的線程是否就是它自己,如果是就繼續(xù)往下面執(zhí)行。值得注意的是,在第一次執(zhí)行完同步代碼塊時,并不會釋放這個偏向鎖。從效率角度來看,如果第二次執(zhí)行同步代碼塊的線程一直是一個,并不需要重新做加鎖操作,沒有額外開銷,效率極高。

前面說的只有一個線程同步執(zhí)行代碼塊只是理想的狀態(tài)下(如果只有一個線程也不用考慮并發(fā)的問題了,雖然這么說有點不太嚴謹哈...),一旦有第二個線程加入 鎖競爭 ,偏向鎖就自動升級為 輕量級鎖 。而這里不同情況需值得注意:當第二個線程想要獲取鎖時,且這個鎖是偏向鎖時,會判斷當前持有鎖的線程是否仍然存活,如果該持有鎖的線程沒有存活,那么偏向鎖并不會升級為輕量級鎖 。什么是鎖競爭:即一個線程想要獲取另一個線程持有的鎖 。

在此狀態(tài)下各個線程繼續(xù)做鎖競爭,沒有搶到鎖的線程循環(huán)判斷是否能夠成功獲取鎖,這種狀態(tài)稱為 自旋 ,故輕量級鎖是一種 自旋鎖 。虛擬機中有個計數器用來記錄自旋次數,默認允許循環(huán)10次,這個值可以通過虛擬機參數-XX:PreBlockSpin來進行修改。如果鎖競爭特別嚴重,達到這個自旋次數最大的限度,輕量級鎖就會升級為重量級鎖。當其他線程嘗試獲取鎖的時候,發(fā)現現在的鎖是重量級鎖,則直接將自己掛起,等待將來被喚醒。

關于這塊的更多信息,可以參考《Synchronized與三種鎖態(tài)》

公平鎖、非公平鎖

當一個線程持有的鎖釋放時,其他線程按照先后順序,先申請的先得到鎖,那么這個鎖就是公平鎖。反之,如果后申請的線程有可能先獲取到鎖,就是非公平鎖

Java 中的 ReentrantLock 可以通過其構造函數來指定是否是公平鎖,默認是非公平鎖。一般來說,使用非公平鎖可以獲得較大的吞吐量,所以推薦優(yōu)先使用非公平鎖。

file

synchronized 就是一種非公平鎖。

樂觀鎖、悲觀鎖

先說悲觀鎖,即在讀數據的時候總認為其他線程會對數據進行修改,所以采取加鎖的形式,一旦本線程要讀取數據時,就加鎖,其他線程被阻塞,等待鎖的釋放。所以悲觀鎖總結為悲觀加鎖阻塞線程。

讀數據時總認為其他線程不會對數據做修改,在更新數據時會判斷其他線程有沒有更新數據,如果有更新,則重新讀取,再次嘗試更新,循環(huán)上述步驟直到更新成功,即為樂觀鎖。

這樣來看樂觀鎖實際上是沒有鎖的,只是通過一種比較交換的方法來保證數據同步,總結為樂觀無鎖回滾重試

CAS(比較和交換)

CAS, 英文直譯為 compare and swap,即比較和交換。上面樂觀鎖也說了,其實就是一種比較與交換的過程。

簡單描述一下就是:讀取到一個值為 A ,在要將這個值更新為B 之前,檢查是否等于 A (比較),如果是則將 A 更新為 B(交換) ,否則什么都不做。

通過這種方式,可以實現不必使用加鎖的方式,就能保證資源在多線程之間的同步,顯然,不阻塞線程,可以大大提高吞吐量。方式雖好,但是也存在問題。

  • ABA 問題,即如果一個值從 A 變?yōu)?B 再變回 A 時,這樣 CAS 就會認為值沒有發(fā)生變化。
    • 對于這個問題,已經有了使用版本號的解決方式,即每次變量更新的時候變量的版本號+1,即由 A->B->A 就變成了 1A->2B->3A 。
  • 循環(huán)時間長開銷大,如果鎖的競爭比較激烈,就會導致 CAS 不斷的重復執(zhí)行,一直循環(huán),耗費 CPU 資源。
  • 只能保證一個變量的同步,顯然,由于其特性,CAS 只能保證一個共享變量的原子操作。

可重入鎖

可重入鎖即允許多個線程多次獲取同一把鎖,那從鎖本身的角度來看,就是可以重新進入該鎖。比如有一個遞歸函數里面有加鎖操作,如果這個鎖不阻塞自己,就是可重入鎖,故也稱遞歸鎖 。

再看上面的偏向鎖、輕量級鎖、重量級鎖可以知道synchronized關鍵字加鎖是可重入的 ,不僅如此,JDK 中實現 Lock 接口的鎖都是可重入的 。感興趣的讀者可以自行了解怎么實現不可重入鎖,這里只講一下鎖的定義。

可中斷鎖

如果線程A持有鎖,線程B等待獲取該鎖。由于線程A持有鎖的時間過長,線程B不想繼續(xù)等待了,我們可以讓線程B中斷自己或者在別的線程里中斷它,這種就是可中斷鎖。

在 Java 中,synchronized就是不可中斷鎖,而Lock的實現類都是可中斷鎖。

獨享鎖、共享鎖

獨享鎖亦稱互斥鎖,排它鎖,容易理解這種鎖每次只允許一個線程持有。反之,就是共享鎖啦。

讀鎖、寫鎖

上面說獨享鎖和共享鎖,其實讀寫鎖就是其最典型的鎖。寫鎖是獨享鎖,讀鎖是共享鎖。在后面我們會著重說一下Java 中的讀寫鎖實現。

三、CAS 在 Java 中的實現,帶上這把 M4-CAS

通過上面對 CAS 的簡單介紹,相信大家對 CAS 也有了一個比較簡單的概念:通過比較和交換實現單個變量的線程安全 。

JDK 中對 CAS 的實現在 java.util.concurrent.atomic 包中:

image.png
類名 描述
AtomicBoolean 可以用原子方式更新的 boolean 值。
AtomicInteger 可以用原子方式更新的 int 值。
AtomicIntegerArray 可以用原子方式更新其元素的 int 數組。
AtomicIntegerFieldUpdater 基于反射的實用工具,可以對指定類的指定 volatile int 字段進行原子更新。
AtomicLong 可以用原子方式更新的 long 值。
AtomicLongArray 可以用原子方式更新其元素的 long 數組。
AtomicLongFieldUpdater 基于反射的實用工具,可以對指定類的指定 volatile long 字段進行原子更新。
AtomicMarkableReference AtomicMarkableReference 維護帶有標記位的對象引用,可以原子方式對其進行更新。用來解決 ABA 問題,只關心有沒有被修改過。
AtomicReference 可以用原子方式更新的對象引用。
AtomicReferenceArray 可以用原子方式更新其元素的對象引用數組。
AtomicReferenceFieldUpdater 基于反射的實用工具,可以對指定類的指定 volatile 字段進行原子更新。
AtomicStampedReference AtomicStampedReference 維護帶有整數“標志”的對象引用,可以用原子方式對其進行更新。用來解決 ABA 問題,與上面的 AtomicMarkableReference 相比,除了關系有沒有被修改過之外,還關心修改了幾次。

AtomicInteger 為例,看看它是如何保證對 int 的操作線程安全的。

package java.util.concurrent.atomic;
import java.util.function.IntUnaryOperator;
import java.util.function.IntBinaryOperator;
import sun.misc.Unsafe;

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 使用 Unsafe.compareAndSwapInt 進行數據更新
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 內存偏移量,即內存地址
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    /**
     * 帶有初值的構造函數
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    /**
     * 無參構造函數默認值為0
     */
    public AtomicInteger() {
    }

    /**
     * 獲得當前值
     *
     * @return the current value
     */
    public final int get() {
        return value;
    }

    /**
     * 設置所給值,因為value是使用volatile關鍵字修飾,所以一經修改,其他線程會立即看到value的修改
     *
     * @param newValue the new value
     */
    public final void set(int newValue) {
        value = newValue;
    }

    /**
     * 設置給定的值,通過調用Unsafe的延遲設置方法不保證結果被其他線程立即看到
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    /**
     * 原子方式設置新值,返回舊值
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    /**
     * 使用 CAS 方式設置新值,成功返回true
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * 使用 CAS 方式更新值
     *
     * <p><a href="package-summary.html#weakCompareAndSet">May fail
     * spuriously and does not provide ordering guarantees</a>, so is
     * only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful
     */
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * 原子增加
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    /**
     * 原子減1
     *
     * @return the previous value
     */
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    /**
     * 原子增加給定值
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the previous value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the previous value
     * @since 1.8
     */
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the updated value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the updated value
     * @since 1.8
     */
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the previous value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the previous value
     * @since 1.8
     */
    public final int getAndAccumulate(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the updated value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the updated value
     * @since 1.8
     */
    public final int accumulateAndGet(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value
     */
    public String toString() {
        return Integer.toString(get());
    }

    /**
     * Returns the value of this {@code AtomicInteger} as an {@code int}.
     */
    public int intValue() {
        return get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code long}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public long longValue() {
        return (long)get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code float}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public float floatValue() {
        return (float)get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code double}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public double doubleValue() {
        return (double)get();
    }

}

源碼中的注釋寫的很詳細,寫注釋寫到一半決定不做谷歌翻譯了...主要是通過 Unsafe 提供的 compareAndSwapInt(Object var1, long var2, int var4, int var5) 等方法來實現 CAS 原子操作,Unsafe 提供了執(zhí)行低級別、不安全操作的方法,如直接訪問系統(tǒng)內存資源、自主管理內存資源等。

CAS操作包含三個操作數---內存位置、預期原值及新值。執(zhí)行 CAS 操作的時候,將內存位置的值與預期原值比較,如果相匹配,那么處理器會自動將該位置值更新為新值,否則,處理器不做任何操作。我們都知道,CAS 是一條 CPU 的原子指令( cmpxchg 指令),不會造成所謂的數據不一致問題,Unsafe 提供的 CAS 方法(如 compareAndSwapXXX )底層實現即為 CPU 指令 cmpxchg 。

如果小伙伴對其感興趣可以參考 《Java魔法類:Unsafe應用解析》

四、AQS 給你穿上三級甲

AQS,全名 AbstractQueuedSynchronizer,直譯為抽象隊列同步器,是構建鎖或者其他同步組件的基礎框架,可以解決大部分同步問題。實現原理可以簡單理解為:同步狀態(tài)( state ) + FIFO 線程等待隊列 。

  • 資源 state
    AQS使用了一個 int 類型的成員變量 state 來表示同步狀態(tài),使用了 volatile 關鍵字來保證線程間的可見性,當 state > 0 時表示已經獲取了鎖,當 state = 0 時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態(tài)state進行操作,確保對state的操作是安全的。
    ? 而對于不同的鎖,state 也有不同的值:

    • 獨享鎖中 state =0 代表釋放了鎖,state = 1 代表獲取了鎖。
    • 共享鎖中 state 即持有鎖的數量。
    • 可重入鎖 state 即代表重入的次數。
    • 讀寫鎖比較特殊,因 state 是 int 類型的變量,為 32 位,所以采取了中間切割的方式,高 16 位標識讀鎖的數量 ,低 16 位標識寫鎖的數量 。
  • FIFO 線程等待隊列
    實現隊列的方式無外乎兩種,一是使用數組,二是使用 Node 。AQS 使用了 Node 的方式實現隊列。

static final class Node {
        /** 標記一個節(jié)點是在共享模式,默認為共享模式 */
        static final Node SHARED = new Node();
        /** 標記一個節(jié)點為獨占模式 */
        static final Node EXCLUSIVE = null;

        
        static final int CANCELLED =  1;
      
        static final int SIGNAL    = -1;
        
        static final int CONDITION = -2;
       
        static final int PROPAGATE = -3;

        /** 以此變量來表示當前線程的狀態(tài) */
        volatile int waitStatus;

    /** 前驅 */
        volatile Node prev;

    /** 后繼 */
        volatile Node next;

    /** 用于保存線程 */
        volatile Thread thread;
    
    /** 保存下一個處于等待狀態(tài)的Node */
        Node nextWaiter;

        /**
         * 用來判斷是否是共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 無參構造方法,默認為共享模式
        }

        Node(Thread thread, Node mode) {     // 用于構造下一個等待線程Node節(jié)點
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 用于構造帶有本線程狀態(tài)的Node
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

在 AQS 中定義了兩個節(jié)點,分別為頭尾節(jié):

/** 等待隊列的頭結點,作為隊列的初始化節(jié)點,只能通過setHead()方法設置值,
    *   而這個方法將Node的變量值都置空,便于及時GC。當其有值時,必須保證waiteStatus為CANCELLED狀態(tài)。
    */
    private transient volatile Node head;

    /**
     * 用于保存線程等待隊列的尾結點。
     * 通過enq()方法設置值。
     */
    private transient volatile Node tail;

這個隊列的結構見下圖:

image.png

AQS 的一堆方法,按照獲取鎖和解鎖的維度可以分為下面這樣:

獲取鎖相關方法

方法 描述
acquire(int arg) 獨占模式獲取鎖,忽略中斷。
acquireInterruptibly(int arg) 獨占模式獲取鎖,如果被中斷則中止。
acquireShared(int arg) 共享模式獲取鎖,忽略中斷。
acquireSharedInterruptibly(int arg) 共享模式獲取鎖,如果被中斷則中止。
tryAcquire(int arg) 嘗試在獨占模式獲取鎖。由子類自行實現。
tryAcquireNanos(int arg, long nanosTimeout) 嘗試在獨占模式獲取鎖,如果被中斷則中止,如果到了給定超時時間 nanosTimeout ,則會失敗。
tryAcquireShared(int arg) 嘗試在共享模式獲取鎖。
tryAcquireSharedNanos(int arg, long nanosTimeout) 嘗試在共享模式獲取鎖,如果被中斷則中止,如果到了給定超時時間 nanosTimeout ,則會失敗。
addWaiter(Node mode) 將當前線程加入到CLH隊列隊尾。
acquireQueued(final Node node, int arg) 當前線程會根據公平性原則來進行阻塞等待,直到獲取鎖為止;并且返回當前線程在等待過程中有沒有中斷過。
selfInterrupt() 產生一個中斷。

解鎖相關方法

方法 描述
release(int arg) 以獨占模式釋放對象。
releaseShared(int arg) 以共享模式釋放對象。
tryRelease(int arg) 試圖設置狀態(tài)來反映獨占模式下的一個釋放。由子類自行實現。
tryReleaseShared(int arg) 試圖設置狀態(tài)來反映共享模式下的一個釋放。
unparkSuccessor(Node node) 用來喚醒節(jié)點。

五、Lock 帶你吃雞

除了 synchronized 外,在 Java 中還有 Lock 接口的一系列實現來加鎖。

file

如上綠色虛線表示實現,WriteLock、ReadLock、ReentrantLock 都實現了 Lock 接口,三者分別對應讀鎖、寫鎖和可重入鎖,其中 ReadWriteLock 定義了讀鎖和寫鎖,ReentrantReadWriteLock 以靜態(tài)內部類的形式實現了讀寫鎖。

首先創(chuàng)建一個單例讀寫鎖:

package com.aysaml.demo.test;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 單例讀寫鎖
 *
 * @author wangning
 * @date 2019-11-26
 */
public enum Locker {

    instance;

    private Locker() {
    }

    private static final ReadWriteLock lock = new ReentrantReadWriteLock();

    public Lock writeLock() {
        return lock.writeLock();
    }
}

如此可以在上述的累加方法上面加鎖做同步:

private static void addCount() {
        Lock locker = Locker.instance.writeLock();
        locker.lock();
        count++;
        locker.unlock();
    }

看下運行結果,如愿以償(總感覺這個詞用在這不太對,真是實在想不出什么詞了,哈哈,意思你們懂就好)。

file

兩種加鎖方式比較:

synchronized屬于互斥鎖,任何時候只允許一個線程的讀寫操作,其他線程必須等待;
ReadWriteLock允許多個線程獲得讀鎖,但只允許一個線程獲得寫鎖,效率相對較高。

看了上面的 Lock 接口的實現圖,我們知道在 Java 中鎖有三個重要實現,下面一一來看。

讀鎖寫鎖抽象隊列同步器三級甲


上面說了讀寫鎖屬于共享鎖,即允許同一時刻有多個線程獲取鎖。在一些業(yè)務中,讀的操作比寫的操作多,相比較 synchronized 而言,讀寫鎖采用 CAS 方式保證資源同步,所以使用讀寫鎖可以大大增加吞吐量。

在Java 中 ReentrantReadWriteLock 中以內部類的形式實現了讀寫鎖,如下:

image.png

再接著分別看他們的實現:

  • ReadLock
file
  • WriteLock
file

可以看到兩個鎖中的加鎖操作都有一個關鍵的東西 Sync :

image.png

Sync 是 ReentrantReadWriteLock 的一個抽象內部類,它繼承了 AbstractQueuedSynchronizer 并實現了共享與獨享方式的同步操作。讀寫鎖正好是一對共享&獨占鎖,而同步的隊列也有共享和獨占之分,那我們就從它們的加鎖和解鎖分別來看 AQS 的工作流程:

寫鎖(獨占式)

加鎖

public void lock() {
            sync.acquire(1);
        }

這是 WriteLock 的加鎖方法,可以看到加鎖實際上是調用了 Sync 的 acquire(int arg) 方法,而這個方法是在 AQS 中實現的,它使用 final 關鍵字來做修飾,在子類中不可重寫。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire(int arg) 做了什么呢,可以看到執(zhí)行了四個方法:

  • tryAcquire:嘗試獲取鎖,獲取成功則設置鎖狀態(tài)并返回 true ,否則返回 false 。需子類自行實現。
  • addWaiter:將當前線程加入到 CLH 隊列隊尾。已有實現。
  • acquireQueued:當前線程會根據公平性原則來進行阻塞等待,直到獲取鎖為止;并且返回當前線程在等待過程中有沒有中斷過。已有實現。
  • selfInterrupt:產生中斷。已有實現。

前面我們說 AQS 由線程狀態(tài) state 和 線程等待隊列組成,AQS 加鎖解鎖的過程實際上就是對線程狀態(tài)的修改和等待隊列的出入隊列操作,而 AQS 的子類可以通過重寫 tryAcquire(int acquires) 方法來對 state 進行修改操作。于是就有 ReentrantReadWriteLock 中的 Sync 重寫了 tryAcquire 方法:

protected final boolean tryAcquire(int acquires) {
            // 獲取當前線程
            Thread current = Thread.currentThread();
        // 拿到state變量,即鎖的個數
            int c = getState();
        // 獲得寫鎖的數量,前邊說了低16位表示寫鎖個數
            int w = exclusiveCount(c);
        // 若該線程已經持有鎖
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
        // 如果寫鎖數量為0或者持有鎖的線程不是當前線程,返回 false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
        // 如果寫入鎖數量大于最大數量(65535),跑出異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
        // 如果寫線程數為0,并且當前線程需要阻塞那么就返回失??;或者如果通過CAS增加寫線程數失敗也返回失敗。
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
        // 設置當前線程為鎖的擁有者
            setExclusiveOwnerThread(current);
            return true;
        }

再來看下 addWaiter 方法:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
        // 通過 CAS 設置尾結點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
    // 如果不成功則多次嘗試
        enq(node);
        return node;
    }

enq(Node node) 方法如下:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

可以看到 enq 方法使用了死循環(huán)的方式一致嘗試設置尾結點,直到成功。

如此入隊操作就可以簡單理解為:tail 指向新節(jié)點、新節(jié)點的 prev 指向當前最后的節(jié)點,當前最后一個節(jié)點的 next 指向當前節(jié)點。

解鎖

public void unlock() {
            sync.release(1);
        }

解鎖調用了 Sync 的 release 方法,下面看看這個方法都做了什么:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
        // 喚醒節(jié)點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

與加鎖類似,tryRelease(arg) 由 AQS 的子類自行重寫,

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

釋放鎖比較簡單,**釋放鎖的實現是通過 CAS 修改 waitStatus 為 0 來實現的,然后通過 LockSupport.unpark(s.thread) 喚醒線程 **。

為了方便理解,在這里總結一下 WriteLock 的工作流程圖:

? 加鎖操作

file

讀鎖 (共享式)

讀鎖的加鎖操作與寫鎖類似:

public void lock() {
            sync.acquireShared(1);
        }

調用自定義的 tryAcquireShared(arg) 方法獲取同步狀態(tài)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

如果獲取失敗,調用 doAcquireShared(int arg) 方法自旋方式獲取同步狀態(tài)

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再看一下 tryAcquireShared(int unused) 方法:

protected final int tryAcquireShared(int unused) {
        
            Thread current = Thread.currentThread();
            int c = getState();
        // 如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態(tài)
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在這個方法中可以知道,如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態(tài)。如果當前線程獲取了寫鎖或者寫鎖未被獲取,則當前線程增加讀狀態(tài),成功獲取讀鎖。

六、結語

可能讀完之后小伙伴并沒有整明白,反而更懵逼了;或者根本沒有讀完。概念比較多,貼的代碼也比較多,可以配合其中幾個比較重要的圖去理解,幾個比較關鍵的點:鎖的分類思維導圖、AQS、CAS、寫鎖的加鎖實現過程圖。由于筆者水平所限,如上都是在學習的過程中總結、整理所得,僅供參考。

歡迎訪問個人博客 獲取更多知識分享。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容