第5章 Java的鎖

基本概念:

鎖:控制多線程并發(fā)訪問資源;
隊(duì)列同步器:管理同步狀態(tài),實(shí)現(xiàn)鎖;
同步狀態(tài):同步器的操作對(duì)象,int類型;
同步隊(duì)列:同步器通過同步隊(duì)列管理同步狀態(tài);

同步器實(shí)現(xiàn)鎖:

1.自定義同步器;
2.同步器定義如何獲取、釋放同步狀態(tài);
3.鎖通過同步器來實(shí)現(xiàn)語義;

public class Mutex implements Lock {

    //自定義的同步器
    static class MySyncer extends AbstractQueuedSynchronizer {

        //獨(dú)占式獲取同步狀態(tài)
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){
                //設(shè)置當(dāng)前線程獨(dú)占同步器
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //獨(dú)占式釋放同步狀態(tài)
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0)
                throw new IllegalMonitorStateException();
            //設(shè)置同步器無占用線程
            setExclusiveOwnerThread(null);
            //設(shè)置同步狀態(tài)為0
            setState(0);
            return true;
        }

        //同步器是否被獨(dú)占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition(){
            return new ConditionObject();
        }
    }

    //同步器對(duì)象,用來實(shí)現(xiàn)鎖
    private final MySyncer syncer = new MySyncer();

    @Override
    public void lock() {
        syncer.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return syncer.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return syncer.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        syncer.acquireInterruptibly(1);
    }

    @Override
    public void unlock() {
        syncer.release(1);
    }

    @Override
    public Condition newCondition() {
        return syncer.newCondition();
    }
    
} 
同步器get/update同步狀態(tài):
getState(); //獲取同步狀態(tài)
setState(); //設(shè)置同步狀態(tài)
compareAndSetState(int expect, int update); //CAS設(shè)置同步狀態(tài),保證原子性
同步器acquire/release同步狀態(tài)(可重寫):
//模板方法調(diào)用,非阻塞
tryAcquire(int arg); //獨(dú)占式占用同步狀態(tài)
tryRelease(int arg); //獨(dú)占式釋放同步狀態(tài)
tryAcquireShared(int arg); //共享式占用同步狀態(tài)
tryReleaseShared(int arg); //共享式釋放同步狀態(tài)

注:單詞get與acquire區(qū)別,get指直接獲取,acquire指通過努力后獲取/占用;

同步器的模板方法(不可重寫):

鎖通過同步器對(duì)象直接調(diào)用模板方法來實(shí)現(xiàn)語義;
模板方法調(diào)用上面tryXxxx(int arg)方法來實(shí)現(xiàn)狀態(tài)占用/釋放;

//獨(dú)占式,阻塞式
acquire(int arg); //獨(dú)占式占用同步狀態(tài)
acquireInterruptibly(int arg); //獨(dú)占式,可響應(yīng)中斷
tryAcquireNanos(int arg, long nanosTimesout); //獨(dú)占式,可響應(yīng)中斷,超時(shí)等待
release(int arg); //獨(dú)占式釋放同步狀態(tài)
//共享式,阻塞式
acquireShared(int arg); //共享式占用同步狀態(tài)
acquireSharedInterruptibly(int arg); //共享式,可響應(yīng)中斷
tryAcquireSharedNanos(int arg, long nanosTimesout); //共享式, 可響應(yīng)中斷,超時(shí)等待
releaseShared(int arg);//共享式釋放同步狀態(tài)

注:阻塞式指同步器未獲取同步狀態(tài)時(shí)線程會(huì)阻塞,非阻塞指線程不會(huì)阻塞;

同步隊(duì)列:
FIFO同步隊(duì)列

1.同步隊(duì)列是由Node組成的鏈表;
2.當(dāng)線程獲取鎖失敗時(shí),就把thread及等待狀態(tài)信息包裝成node插入隊(duì)列尾部,并更改tail節(jié)點(diǎn),通過CAS操作保證并發(fā)性;
3.當(dāng)head節(jié)點(diǎn)的線程釋放鎖時(shí),就喚醒其next節(jié)點(diǎn),next節(jié)點(diǎn)獲取同步狀態(tài)成功時(shí)就把自己設(shè)為head節(jié)點(diǎn);

獨(dú)占式(共享式)占用和釋放同步狀態(tài):

獨(dú)占式/共享式占用同步狀態(tài)流程圖

區(qū)別:同一時(shí)刻,獨(dú)占式只有一個(gè)線程可以獲取同步狀態(tài),而共享式可以有多個(gè)線程獲取同步狀態(tài);
相同點(diǎn):第一次獲取同步狀態(tài)失敗后都生成節(jié)點(diǎn),插入同步隊(duì)列尾部;都是通過自旋來獲取同步狀態(tài);前驅(qū)為頭節(jié)點(diǎn)時(shí)才能獲取到同步狀態(tài);

自定義同步主鍵TwinsLock-雙線程鎖:

同一時(shí)刻有兩個(gè)線程可以持有鎖,即同步器共享式占用同步狀態(tài),且同步狀態(tài)數(shù)為2;

public class TwinsLock implements Lock {

    //同步器對(duì)象
    private Syncer syncer = new Syncer(2);

    static class Syncer extends AbstractQueuedSynchronizer {

        //初始化同步狀態(tài)的數(shù)量
        public Syncer(int stateCount) {
            setState(stateCount);
        }

        //共享式獲取同步狀態(tài),非阻塞
        @Override
        protected int tryAcquireShared(int arg) {
            for (; ; ) {
                int currentState = getState();
                int newState = currentState - 1;
                if (newState < 0 || compareAndSetState(currentState, newState))
                    return newState;
            }
        }

        //共享式釋放同步狀態(tài),非阻塞
        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                int currentState = getState();
                if (compareAndSetState(currentState, currentState + 1))
                    return true;
            }
        }
    }

    //加鎖
    @Override
    public void lock() {
        //acquireShared()會(huì)調(diào)用tryAcquireShared()
        syncer.acquireShared(1);
    }

    //釋放鎖
    @Override
    public void unlock() {
        syncer.releaseShared(1);
    }
}

lock()調(diào)用模板方法acquireShared()共享式獲取同步狀態(tài), red()調(diào)用用戶自定義的tryAcquireShared()來獲取同步狀態(tài),若獲取成功,則線程擁有了鎖;若tryAcquireShared()獲取同步狀態(tài)失敗,則把線程及其狀態(tài)包裝成node插入同步隊(duì)列的尾部,并進(jìn)行自旋來獲取同步狀態(tài);自旋過程中,當(dāng)node的前置節(jié)點(diǎn)是head節(jié)點(diǎn)且釋放同步狀態(tài)后,當(dāng)前節(jié)點(diǎn)就調(diào)用tryAcquireShared()來獲取同步狀態(tài),若獲取成功則表示線程擁有了鎖,若獲取失敗則繼續(xù)自旋;

重入鎖:

重入鎖指線程獲取到鎖后能夠再次獲取該鎖,而不會(huì)被鎖阻塞;
公平模式:
鎖的獲取順序符合請(qǐng)求的絕對(duì)時(shí)間順序;

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //同步狀態(tài)
        int c = getState();
        if (c == 0) {
            //鎖未被任何線程占用
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                //沒有更早的線程處于WAITTING狀態(tài),且當(dāng)前線程獲取同步狀態(tài)成功
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            //鎖被當(dāng)前線程占用
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

非公平模式:
有新線程請(qǐng)求鎖時(shí),先爭奪一下鎖,沒成功再去排隊(duì);排隊(duì)之后依然滿足FIFO規(guī)則,即前面節(jié)點(diǎn)的線程先獲取鎖;

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //同步狀態(tài)
        int c = getState();
        if (c == 0) {
            //鎖未被任何線程占用
            if (compareAndSetState(0, acquires)) {
                //當(dāng)前線程獲取同步狀態(tài)成功
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            //鎖被當(dāng)前線程占用
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

總結(jié):公平和非公平模式下調(diào)用lock()獲取鎖時(shí),調(diào)用acquire()模板方法,acquire()方法首先都調(diào)用tryAcquire(1)第一次嘗試獲取同步狀態(tài),若鎖此刻鎖未被占用,則公平模式下會(huì)判斷是否有更早的線程處于WAITTING狀態(tài),若有則第一次嘗試獲取鎖失敗,若沒有則嘗試獲取同步狀態(tài);而非公平模式下則直接嘗試獲取同步狀態(tài),不考慮是否有更早的線程是否處于WAITTING狀態(tài);公平模式和非公平模式在第一次嘗試獲取同步狀態(tài)失敗后,都會(huì)把線程及其狀態(tài)包裝成node插入FIFO同步隊(duì)列尾部,之后通過自旋來獲取同步狀態(tài),公平和非公平模式下都需要等待前置節(jié)點(diǎn)來喚醒自己;
性能對(duì)比:公平鎖具有大量的線程切換,因此其吞吐性不如非公平鎖;

讀寫鎖:

排它鎖:同一時(shí)刻只能有一個(gè)線程獲取鎖(如ReentrantLock);
1.讀鎖是非排它鎖,同一時(shí)刻可以有多個(gè)讀線程獲取鎖;但是寫線程獲取鎖時(shí),所有讀線程和其它寫線程均被阻塞;
2.讀寫鎖是一對(duì)鎖,包括讀鎖(可重入共享鎖)和寫鎖(可重入排它鎖);
3.包括公平模式和非公平模式;
4.支持重進(jìn)入,且在獲取寫鎖后還能獲取讀鎖,但獲取讀鎖后不能獲取寫鎖;
5.在讀多于寫的情況下,讀寫鎖比排它鎖具有更好的吞吐性;
讀寫狀態(tài)的設(shè)計(jì):
整型變量的高16位表示讀、低16位表示寫,則線程獲取讀鎖后,同步狀態(tài)S=S+(1<<16),線程獲取寫鎖后,同步狀態(tài)S=S+1;
寫鎖的獲取與釋放:
寫鎖獲取成功的條件:1.寫鎖此刻被當(dāng)前線程擁有;2.讀鎖or寫鎖此刻沒被任何線程擁有;否則獲取寫鎖失敗,線程進(jìn)入WAITTING狀態(tài);

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            //存在讀鎖or寫鎖
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                //存在讀鎖or不是當(dāng)前線程獲取寫鎖,則獲取寫鎖失敗
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            //之前是當(dāng)前線程獲取的寫鎖,因此不存在并發(fā)問題,不需要CAS操作
            setState(c + acquires);
            return true;
        }
        //之前沒任何線程獲取讀鎖or寫鎖,此刻可能有多個(gè)寫線程并發(fā)請(qǐng)求寫鎖,需要CAS操作設(shè)置同步狀態(tài)
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //當(dāng)前線程首次獲取同步狀態(tài)成功
        setExclusiveOwnerThread(current);
        return true;
    }

讀鎖的獲取與釋放:
讀鎖獲取成功的條件:1.寫鎖此刻被當(dāng)前線程擁有;2.寫鎖此刻沒被任何線程擁有;否則獲取寫鎖失敗,線程進(jìn)入WAITTING狀態(tài);
只要"其它線程擁有寫鎖"的情況不出現(xiàn),則當(dāng)前線程就不斷嘗試獲取同步狀態(tài),而不是進(jìn)入等待狀態(tài),是因?yàn)楫?dāng)前線程此刻仍然具有獲取讀鎖的資格,而不用等待資格;但是當(dāng)其它線程擁有寫鎖時(shí),則當(dāng)前線程獲取同步狀態(tài)失敗,失去獲取讀鎖的資格,退出for循環(huán),進(jìn)入WAITTING狀態(tài),并把線程及其狀態(tài)包裝成node插入FIFO同步隊(duì)列尾部,之后通過自旋來獲取同步狀態(tài)

    protected final int tryAcquireShared(int unused) {
        //只要"其它線程擁有寫鎖"的情況不出現(xiàn),則當(dāng)前線程就不斷嘗試獲取同步狀態(tài),
        for (; ; ) {
            Thread current = Thread.currentThread();
            int c = getState();
            int nextC = c + (1 << 16);
            if (nextC < c)
                throw new Error("Maximum lock count exceeded");
            if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
                //存在寫鎖(寫狀態(tài)不為0),且寫鎖擁有線程不是當(dāng)前線程,則獲取讀鎖失敗
                return -1;
            if (compareAndSetState(c, nextC))
                //獲取讀鎖成功
                return 1;
        }
    }

鎖降級(jí):
鎖降級(jí)的過程:擁有寫鎖->預(yù)處理(寫/改)數(shù)據(jù)->擁有讀鎖->釋放寫鎖->使用(讀)數(shù)據(jù)->釋放讀鎖,即讀鎖降級(jí)到寫鎖;
占用讀鎖,釋放寫鎖,讀數(shù)據(jù),這樣做有兩個(gè)好處:1.使用(讀)數(shù)據(jù)期間,其它數(shù)據(jù)只讀的線程可以獲取到讀鎖,而不至于堵塞,提高了效率;2.保證使用(讀)數(shù)據(jù)的過程中數(shù)據(jù)是沒有發(fā)生變化的,因?yàn)樵卺尫抛x鎖前,其它線程無法獲取寫鎖來更改數(shù)據(jù);

public class LockDown {

    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static Lock readLock = lock.readLock();
    static Lock writeLock = lock.writeLock();

    static class Thread1 implements Runnable{
        //線程是否完成了數(shù)據(jù)準(zhǔn)備的更新
        static volatile boolean update = false;

        @Override
        public void run() {

        }

        public void processData(){
            readLock.lock();
            if(update == false){
                readLock.unlock();
                writeLock.lock();
                try {
                    //預(yù)處理(寫/改)數(shù)據(jù)
                    prepareData();
                    update = true;
                    //先獲取讀鎖
                    readLock.lock();
                }finally {
                    //然后釋放寫鎖
                    writeLock.unlock();
                }
            }
            try {
                //使用(讀)數(shù)據(jù)
                useData();
            }finally {
                //最后釋放讀鎖
                readLock.unlock();
            }
        }
        
        public void prepareData(){}

        public void useData(){}
    }

}

注:Java不支持鎖升級(jí),鎖升級(jí)會(huì)引起死鎖;

LockSupport工具:

LockSupport工具類定義了一組public static方法,用來阻塞當(dāng)前線程、喚醒被阻塞的線程;

void park(); //阻塞當(dāng)前線程
void unpark(Thread thread); //喚醒阻塞的線程thread
void park(Object blocker); //blocker是當(dāng)前線程等待的對(duì)象(阻塞對(duì)象,鎖對(duì)象??)
Condition接口:

提供鎖的監(jiān)視器方法:await()、signal()、signalAll(),用于線程間通信;

//Condition對(duì)象通過Lock對(duì)象創(chuàng)建
Condition condition = lock.newCondition();

等待隊(duì)列:
當(dāng)線程調(diào)用condition.await()方法時(shí),就把線程及其狀態(tài)包裝成node插入等待隊(duì)列的尾部,此刻線程進(jìn)入WAITTING狀態(tài);
等待隊(duì)列與同步隊(duì)列不同之處:
1.同步隊(duì)列是線程獲取獲取鎖(同步狀態(tài)失敗)時(shí)插入的隊(duì)列,等待隊(duì)列是線程調(diào)用wait()方法時(shí)插入的隊(duì)列;2.同步隊(duì)列是雙鏈表,而等待隊(duì)列是單鏈表;3.一個(gè)Condition對(duì)象對(duì)應(yīng)一個(gè)等待隊(duì)列,一個(gè)Lock對(duì)象可以有多個(gè)Condition對(duì)象,即可以擁有多個(gè)等待隊(duì)列,但只能擁有一個(gè)同步隊(duì)列;
等待隊(duì)列與與同步隊(duì)列相同之處:
1.共用同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronized.Node來生成節(jié)點(diǎn);2.鏈表結(jié)構(gòu)相似;
等待隊(duì)列的結(jié)構(gòu):
單向FIFO鏈表,head節(jié)點(diǎn)指向鏈表的頭,tail節(jié)點(diǎn)指向鏈表的尾;

等待隊(duì)列的結(jié)構(gòu)

線程進(jìn)入等待狀態(tài):
線程進(jìn)入等待狀態(tài)

1.同步隊(duì)列head節(jié)點(diǎn)的線程構(gòu)造新節(jié)點(diǎn)并加入等待隊(duì)列;2.釋放同步狀態(tài);3.喚醒后繼節(jié)點(diǎn);4.進(jìn)入等待狀態(tài);
線程被通知:
線程被通知

1.等待隊(duì)列的head節(jié)點(diǎn)移動(dòng)到同步隊(duì)列的尾節(jié)點(diǎn)上,線程從WAITTING狀態(tài)進(jìn)入BLOCKED狀態(tài);2.喚醒線程去競(jìng)爭同步狀態(tài);3.非同步隊(duì)列的首節(jié)點(diǎn),獲取同步狀態(tài)失敗,再次阻塞;4.最終通過自旋獲取同步狀態(tài)成功后從await()返回,此刻線程成功獲取了鎖;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容