【多線程與并發(fā)】:Java中的鎖


鎖的概念

鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖可以防止多個線程同時訪問共享資源(但有些鎖可以允許多個線程并發(fā)的訪問共享資源,如讀寫鎖)。

在JDK1.5之前,Java是通過synchronized關(guān)鍵字實現(xiàn)鎖功能的:隱式地獲取鎖和釋放鎖,但不夠靈活。

在JDK1.5,java.util.concurrent包中新增了Lock接口以及相關(guān)實現(xiàn)類,用來實現(xiàn)鎖功能。它提供了與synchronized關(guān)鍵字類似的同步功能,但功能更強大和靈活:獲取鎖和釋放鎖的可操作性、可中斷地獲取鎖、超時獲取鎖等,見下表:

特性 描述
嘗試非阻塞地獲取鎖 當前線程嘗試獲取鎖,如果這個時刻鎖沒有被其他線程獲取到,則成功獲取并持有鎖
能被中斷地獲取鎖 獲取到鎖的線程能夠響應(yīng)中斷(而synchronized則不會響應(yīng)中斷操作)
超時獲取鎖 在指定的截止時間之前獲取鎖,如果在截止時間到了仍無法獲取鎖,則返回。

Lock接口具體的方法及釋義:

public interface Lock {

    /**
     * 獲取鎖
     * 
     * 如果當前線程無法獲取到鎖(可能其他線程正在持有鎖),則當前線程就會休眠,直到獲取到鎖
     */
    void lock();

    /**
     * 可中斷地獲取鎖
     * 
     * 如果如果當前線程無法獲取到鎖(可能其他線程正在持有鎖),則當前線程就會休眠,
     * 直到發(fā)生下面兩種情況的任意一種:
     * ①獲取到了鎖
     * ②被其他線程中斷
     */
    void lockInterruptibly() throws InterruptedException;
    
    /**
     * 嘗試非阻塞地獲取鎖
     * 
     * lock()和lockInterruptibly()在獲取不到鎖的時候,都會阻塞當前線程,直到獲取到鎖
     * 而該方法則不會阻塞線程,能立即獲取到鎖則返回true,獲取不到則立即返回false
     * 
     * 該方法的常用方式如下:
     * 
     * Lock lock = ...;
     * if (lock.tryLock()) {
     * try {
     * // manipulate protected state
     * } finally {
     * lock.unlock();
     * }
     * } else {
     * // perform alternative actions
     * }}
     * 
     * 這種使用方式,可以保證只在獲取到鎖的時候才去釋放鎖
     */
    boolean tryLock();

    /**
     * 超時獲取鎖
     * 
     * 當前線程在以下三種情況下會返回:
     * ①當前線程在超時時間內(nèi)獲取到了鎖,返回true
     * ②當前線程在超時時間內(nèi)被中斷,返回false(即該方法可以響應(yīng)其他線程對該線程的中斷操作)
     * ③超時時間結(jié)束,沒有獲取到鎖,返回false
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 釋放鎖
     */
    void unlock();


    /**
     * 獲取與該鎖綁定的Condition
     * 
     * 當前線程只有在獲得了鎖,才能調(diào)用Condition的wait()方法(表示我已經(jīng)到了某一條件),
     * 調(diào)用Condition的wait()方法之后,當前線程會釋放鎖
     */
    Condition newCondition();
}

java.util.concurrent.locks的類圖

java.util.concurrent.locks中的類和接口.png

其中:
AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer是同步器,是鎖實現(xiàn)相關(guān)的內(nèi)容。
ReentrantLock(重入鎖)ReentrantReadWriteLock(重入讀寫鎖)是具體的實現(xiàn)類。
LockSupport是一個工具類,提供了基本的線程阻塞和喚醒功能。
Condition是實現(xiàn)線程間實現(xiàn)多條件等待/通知模式用到的。


同步器的實現(xiàn)原理

TODO


重入鎖:ReentrantLock

重入鎖,顧名思義,就是支持重新進入的鎖:即某線程在獲取到鎖之后,可以再次獲取鎖而不會被阻塞。
ReentrantLock類是通過組合自定義同步器來來實現(xiàn)這種重入特性的,除此之外,該類還支持公平地獲取鎖(獲取鎖的順序與請求鎖的順序是相同的,等待時間最長的線程最優(yōu)先獲取到鎖),還支持綁定多個Condition。(synchronized關(guān)鍵字隱式地支持重進入,比如synchronized修飾的遞歸方法,在方法執(zhí)行時,執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲得該鎖,不會出現(xiàn)阻塞自己的情況)。

ReentrantLock內(nèi)部重進入的實現(xiàn)(非公平獲取鎖的情況)代碼如下:

final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }else if (current == getExclusiveOwnerThread()) {
        //如果是當前持有鎖的線程再次獲取鎖,則將同步值進行增加并返回true
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
}

ReentrantLock公平鎖的內(nèi)部實現(xiàn)代碼如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

與非公平獲取鎖的方法nonfairTryAcquire(int acquires)相比,多了一個hasQueuedPredecessors()判斷:同步隊列中當前節(jié)點(當前想要獲取鎖的線程)是否有前驅(qū)節(jié)點,如果該方法返回true,則表示有線程比當前線程更早地請求獲取鎖,因此需要前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。

公平鎖保證了鎖的獲取按照FIFO原則,而代價是進行大量的線程切換;
非公平鎖雖然可能造成線程“饑餓”(即某線程可能需要等很久才得到鎖),但線程切換極少,可以保證更大的吞吐量。


讀寫鎖:ReentrantReadWriteLock

ReentrantLock在在同一時刻,只允許一個線程進行訪問(無論讀還是寫)。而讀寫鎖是指:在同一時刻,允許多個線程進行讀操作,而寫操作則會阻塞其他所有的線程(無論是讀還是寫,都會被阻塞)。讀寫鎖維護了一對鎖:讀鎖和寫鎖,通過分離讀鎖和寫鎖,使得并發(fā)性能相比一般的排他鎖有了很大的提升。

Java中讀寫鎖的實現(xiàn)類是ReentrantReadWriteLock,它支持:①重進入;②公平性選擇;③鎖降級:寫鎖可以降級為讀鎖,其提供了一些便于外界監(jiān)控其內(nèi)部狀態(tài)的方法,如下:

int getReadLockCount()
返回當前讀鎖被獲取的次數(shù)
注意:該次數(shù)并不等于獲取讀鎖的線程數(shù),
因為同一線程可以連續(xù)獲得多次讀鎖,獲取一次,返回值就加1,
比如,僅一個線程,它連續(xù)獲得了n次讀鎖,那么占據(jù)讀鎖的線程數(shù)是1,但該方法返回n

int getReadHoldCount()
返回當前前程獲取讀鎖的次數(shù)

boolean isWriteLock()
判斷讀鎖是否被獲取

int getWriteHoldCount()
返回當前寫鎖被獲取的次數(shù)

使用舉例:

public class Cache{

    //非線程安全的HashMap
    private static Map<String, Object> map = new HashMap<>();
    //讀寫鎖
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    //讀鎖
    private static Lock readLock = reentrantReadWriteLock.readLock();
    //寫鎖
    private static Lock writeLock = reentrantReadWriteLock.writeLock();

    /**
     * 獲取key對應(yīng)的value
     * 
     * 使用讀鎖,使得并發(fā)訪問該方法時不會被阻塞
     */
    public static final Object get(String key){
        readLock.lock();
        try{
            return map.get(key);
        }finally {
            readLock.unlock();
        }
    }

    /**
     * 設(shè)置key對應(yīng)的value
     * 
     * 當有線程對map進行put操作時,使用寫鎖,阻塞其他線程的讀、寫操作,
     * 只有在寫鎖被釋放后,其他讀寫操作才能繼續(xù)
     */
    public static Object put(String key, Object value){
        writeLock.lock();
        try {
            return map.put(key, value);
        }finally {
            writeLock.unlock();
        }
    }

    /**
     * 清空map
     *
     * 當有線程對map進行清空操作時,使用寫鎖,阻塞其他線程的讀、寫操作,
     * 只有在寫鎖被釋放后,其他讀寫操作才能繼續(xù)
     */
    public static void clear(){
        writeLock.lock();
        try {
            map.clear();
        }finally {
            writeLock.unlock();
        }
    }
}

TODO:讀寫鎖的實現(xiàn)原理


LockSupport工具類

LockSupport定義了一組公共靜態(tài)方法,這些方法提供了最基本的線程阻塞和喚醒功能,是構(gòu)建同步組件的基礎(chǔ)工具,它主要有兩類方法:
①以park開頭的方法:阻塞當前線程
②以unpark開頭的方法:喚醒被阻塞的線程

void park()
阻塞當前線程,只有當前線程被中斷或其他線程調(diào)用unpark(Thread thread),才能從park()方法返回

void parkNanos(long nanos)
阻塞當前線程,最長不超過nanos納秒,返回條件在park()的基礎(chǔ)上增加了超時返回

void parkUntil(long deadline)
阻塞當前線程,直到deadline這個時間點(從1970年開始到deadline時間的毫秒數(shù))

void unpark(Thread thread)
喚醒處于阻塞狀態(tài)的thread線程

在JDK1.6中,該類增加了void park(Object blocker)、void parkNanos(Object blocker, long nanos)、void parkUntil(Object blocker, long deadline)方法,相比之前的park方法,多了一個blocker對象,該對象用來標識當前線程在等待的對象(阻塞對象),主要用來問題排查和系統(tǒng)監(jiān)控(對線程dump時,可以提供阻塞對象的信息),可以用來代替原有的park方法。


Condition接口

任意一個Java對象都有一組監(jiān)視器方法(定義在java.lang.Object上):wait()、wait(long timeout)、notify()、notifyAll(),這些方法與sychronized配合使用,可以實現(xiàn)等待/通知模式。
Condition接口也提供了類似的監(jiān)視器方法,與Lock配合使用,可以實現(xiàn)等待/通知模式。

兩者的區(qū)別如下:

對比項 Object Monitor Methods Condition
前置條件 獲取對象的鎖 調(diào)用Lock.lock()獲取鎖→調(diào)用Lock.newCondition()獲取Condition對象
調(diào)用方式 直接調(diào)用,如object.wait() 直接調(diào)用,如condition.await()
等待隊列個數(shù) 1個 多個
當前線程釋放鎖并進入等待狀態(tài) 支持 支持
當前線程釋放鎖并進入等待狀態(tài),在等待狀態(tài)中不響應(yīng)中斷 不支持 支持
當前線程釋放鎖并進入超時等待狀態(tài) 支持 支持
當前線程釋放鎖并進入等待狀態(tài)到將來的某個時間點 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的全部線程 支持 支持

Condition中的方法如下:(一般會將Condition對象作為成員變量)
說明:當前線程調(diào)用await()方法后,當前線程會釋放鎖并在此等候,當其他線程調(diào)用signal()方法通知當前線程后,當前線程才從await()方法中返回,并且在返回前已經(jīng)獲取了鎖(re-acquire)。

public interface Condition {
    
    /**
     * 當前線程進入等待狀態(tài)直到被通知(signalled)或中斷(interrupted)
     * 
     * 如果當前線程從該方法返回,則表明當前線程已經(jīng)獲取了Condition對象所對應(yīng)的鎖
     * 
     * @throws InterruptedException
     */
    void await() throws InterruptedException;

    /**
     * 與await()不同是:該方法對中斷操作不敏感
     * 
     * 如果當前線程在等待的過程中被中斷,當前線程仍會繼續(xù)等待,直到被通知(signalled),
     * 但當前線程會保留線程的中斷狀態(tài)值
     * 
     */
    void awaitUninterruptibly();
    
    /**
     * 當前線程進入等待狀態(tài),直到被通知或被中斷或超時
     * 
     * 返回值表示剩余時間,
     * 如果當前線程在nanosTimeout納秒之前被喚醒,那么返回值就是(nanosTimeout-實際耗時),
     * 如果返回值是0或者負數(shù),則表示等待已超時
     * 
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 該方法等價于awaitNanos(unit.toNanos(time)) > 0
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 當前線程進入等待狀態(tài),直到被通知或被中斷或到達時間點deadline
     * 
     * 如果在沒有到達截止時間就被通知,返回true
     * 如果在到了截止時間仍未被通知,返回false
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    
    /**
     * 喚醒一個等待在Condition上的線程
     * 該線程從等待方法返回前必須獲得與Condition相關(guān)聯(lián)的鎖
     */
    void signal();

    /**
     * 喚醒所有等待在Condition上的線程
     * 每個線程從等待方法返回前必須獲取Condition相關(guān)聯(lián)的鎖
     */
    void signalAll();
}

使用Condition實現(xiàn)一個有界阻塞隊列的例子:當隊列為空時,隊列的獲取操作將會阻塞當前線程,直到隊列中有新增元素;當隊列已滿時,隊列的插入操作就會阻塞插入線程,直到隊列中出現(xiàn)空位。(其實這個例子就是簡化版的ArrayBlockingQueue

class BoundedBlockingQueue<T> {
    //使用數(shù)組維護隊列
    private Object[] queue;
    //當前數(shù)組中的元素個數(shù)
    private int count = 0;
    //當前添加元素到數(shù)組的位置
    private int addIndex = 0;
    //當前移除元素在數(shù)組中的位置
    private int removeIndex = 0;

    private Lock lock = new ReentrantLock();
    private Condition notEmptyCondition = lock.newCondition();
    private Condition notFullCondition = lock.newCondition();


    private BoundedBlockingQueue() {
    }

    public BoundedBlockingQueue(int capacity) {
        queue = new Object[capacity];
    }

    public void put(T t) throws InterruptedException {
        lock.lock();//獲得鎖,保證內(nèi)部數(shù)組修改的可見性和排他性
        try {
            //使用while,而非if:防止過早或意外的通知,
            //加入當前線程釋放了鎖進入等待狀態(tài),然后其他線程進行了signal,
            //則當前線程會從await()方法中返回,再次判斷count == queue.length
            //todo:哪些情況下的過早或意外???
            while (count == queue.length) {
                notFullCondition.await();//釋放鎖,等待隊列不滿,即等待隊列出現(xiàn)空位
            }
            queue[addIndex] = t;
            addIndex++;
            if (addIndex == queue.length) {
                addIndex = 0;
            }
            count++;
            notEmptyCondition.signal();
        } finally {
            //確保會釋放鎖
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmptyCondition.await();//釋放鎖,等待隊列不為空,即等待隊列中至少有一個元素
            }
            Object x = queue[removeIndex];
            removeIndex++;
            if (removeIndex == queue.length) {
                removeIndex = 0;
            }
            count--;
            notFullCondition.signal();//通知那些等待隊列非空的線程,可以向隊列中插入元素了
            return (T) x;
        } finally {
            //確保會釋放鎖
            lock.unlock();
        }
    }
}

TODO:Condition的實現(xiàn)分析


參考

大部分來自《Java并發(fā)編程的藝術(shù)》,部分參考JDK中的注釋說明。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容