java鎖(7)改進(jìn)讀寫鎖StampedLock詳解

1、StampedLock特性

StampedLock是JDK 8新增的讀寫鎖,跟讀寫鎖不同,它并不是由AQS實現(xiàn)的。它的state為一個long型變量,狀態(tài)的設(shè)計也不同于讀寫鎖,且提供了三種模式來控制 read/write 的獲取,并且內(nèi)部實現(xiàn)了自己的同步等待隊列。

1.1、StampedLock讀寫鎖

寫鎖:使用writeLock方法獲取,當(dāng)鎖不可用時會阻塞,獲取成功后返回一個與這個寫鎖對應(yīng)的stamp,在unlockWrite方法中,需要通過這個stamp來釋放與之對應(yīng)的鎖。在tryWriteLock同樣也會提供這個stamp。當(dāng)在write模式中獲取到寫鎖時,讀鎖不能被獲取,并且所有的樂觀讀鎖驗證(validate方法)都會失敗。
讀鎖:使用readLock方法獲取,當(dāng)超出可用資源時(類似AQS的state設(shè)計)會阻塞。同樣的,在獲取鎖成功后也會返回stamp,作用與上述相同。tryReadLock同樣如此。
樂觀讀鎖:使用tryOptimisticRead方法獲取,只有在寫鎖可用時才能成功獲取樂觀讀鎖,獲取成功后也會返回一個stamp。validate方法可以根據(jù)這個stamp來判斷寫鎖是否被獲取。這種模式可以理解為一個弱化的讀鎖(weak version of a read-lock),它在任何時候都能被破壞。樂觀讀模式常被用在短的只讀的代碼段,用來減少爭用并提高吞吐量。樂觀讀區(qū)域應(yīng)該只讀取字段,并將它們保存在本地變量中,以便在驗證(validate方法)后使用。在樂觀讀模式中字段的讀取可能會不一致,所以可能需要反復(fù)調(diào)用validate()來檢查一致性。例如,當(dāng)首次讀取一個對象或數(shù)組引用,然后訪問其中一個的字段、元素或方法時,這些步驟通常是必需的。

1.2、三種模式的轉(zhuǎn)換

StampedLock可以將三種模式是鎖進(jìn)行有條件的互相轉(zhuǎn)換。
將其他鎖轉(zhuǎn)換為寫鎖tryConvertToWriteLock():
當(dāng)前郵戳為持有寫鎖模式,直接返回當(dāng)前的郵戳;
當(dāng)前郵戳為持有讀鎖模式,則會釋放讀鎖并獲取寫鎖,并返回寫鎖郵戳;
當(dāng)前郵戳持有樂觀鎖,通過CAS立即獲取寫鎖,成功則返回寫鎖郵戳;失敗則返回0;

將其他鎖轉(zhuǎn)換為讀鎖tryConvertToReadLock:
當(dāng)前郵戳為持有寫鎖模式,則會釋放寫鎖并獲取讀鎖,并返回讀鎖郵戳;
當(dāng)前郵戳為持有讀鎖模式,則直接返回當(dāng)前讀鎖郵戳;
當(dāng)前郵戳持有樂觀鎖,通過CAS立即獲取讀鎖,則返回讀鎖郵戳;否則,獲取失敗返回0;

將其他鎖轉(zhuǎn)換為樂觀鎖tryConvertToOptimisticRead:
當(dāng)前郵戳為持有讀或?qū)戞i,則直接釋放讀寫鎖,并返回釋放后的觀察者郵戳值;
當(dāng)前郵戳持有樂觀鎖,若樂觀鎖郵戳有效,則返回觀察者郵戳;

1.3、StampedLock的應(yīng)用場景

StampedLock一般作為線程安全的內(nèi)部工具類。它的使用依賴于對數(shù)據(jù)、對象和方法的內(nèi)部屬性有一定的了解。StampedLock 是不可重入的,所以在鎖的內(nèi)部不能調(diào)用其他嘗試重復(fù)獲取鎖的方。一個stamp如果在很長時間都沒有使用或驗證,在很長一段時間之后可能就會驗證失敗。StampedLocks是可序列化的,但是反序列化后變?yōu)槌跏嫉姆擎i定狀態(tài),所以在遠(yuǎn)程鎖定中是不安全的。

1.4、StampedLock的公平性

StampedLock 的調(diào)度策略不會始終偏向讀線程或?qū)懢€程,所有的"try"方法都是盡最大努力獲取,并不一定遵循任何調(diào)度或公平策略。從"try"方法獲取或轉(zhuǎn)換鎖失敗返回0時,不會攜帶任何鎖的狀態(tài)信息。由于StampedLock支持跨多個鎖模式的協(xié)調(diào)使用,它不會直接實現(xiàn)Lock或ReadWriteLock接口。但是,如果應(yīng)用程序需要Lock的相關(guān)功能,它可以通過asReadLock()、asWriteLock()和asReadWriteLock()方法返回一個Lock視圖。

2、源碼分析

2.1、主要屬性

//獲取CPU的可用線程數(shù)量,用于確定自旋的時候循環(huán)次數(shù)
private static final int NCPU = Runtime.getRuntime().availableProcessors();

//根據(jù)NCPU確定自旋的次數(shù)限制(并不是一定這么多次,因為實際代碼中是隨機(jī)的)
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

//頭節(jié)點上的自旋次數(shù)
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

//頭節(jié)點上的最大自旋次數(shù)
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

//等待自旋鎖溢出的周期數(shù)
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

//在溢出之前讀線程計數(shù)用到的bit位數(shù)
private static final int LG_READERS = 7;

//一個讀狀態(tài)單位:0000 0000 0001
private static final long RUNIT = 1L;
//一個寫狀態(tài)單位:0000 1000 0000
private static final long WBIT  = 1L << LG_READERS;
//讀狀態(tài)標(biāo)識:0000 0111 1111
private static final long RBITS = WBIT - 1L;
//讀鎖計數(shù)的最大值:0000 0111 1110
private static final long RFULL = RBITS - 1L;
//讀線程個數(shù)和寫線程個數(shù)的掩碼:0000 1111 1111
private static final long ABITS = RBITS | WBIT;
//// 讀線程個數(shù)的反數(shù),高25位全部為1:1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
private static final long SBITS = ~RBITS; // note overlap with ABITS

// state的初始值
private static final long ORIGIN = WBIT << 1;

// 中斷標(biāo)識
private static final long INTERRUPTED = 1L;

// 節(jié)點狀態(tài)值,等待/取消
private static final int WAITING   = -1;
private static final int CANCELLED =  1;

// 節(jié)點模式,讀模式/寫模式
private static final int RMODE = 0;
private static final int WMODE = 1;


//等待隊列的頭節(jié)點
private transient volatile WNode whead;
//等待隊列的尾節(jié)點
private transient volatile WNode wtail;

// 鎖狀態(tài)
private transient volatile long state;
////因為讀狀態(tài)只有7位很小,所以當(dāng)超過了128之后將使用此變量來記錄
private transient int readerOverflow;

2.2、node節(jié)點實現(xiàn)

//等待隊列的節(jié)點實現(xiàn)
static final class WNode {
    //前驅(qū)節(jié)點
    volatile WNode prev;
    //后繼節(jié)點
    volatile WNode next;
    //讀線程使用的鏈表
    volatile WNode cowait;    // list of linked readers
    //等待的線程
    volatile Thread thread;   // non-null while possibly parked
    //節(jié)點狀態(tài)
    volatile int status;      // 0, WAITING, or CANCELLED
    //節(jié)點模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

2.3、state狀態(tài)實現(xiàn)

state狀態(tài)說明:

  • bit0—bit6為作為讀鎖計數(shù),當(dāng)超出RFULL(126)時,用readerOverflow作為讀鎖計數(shù)。當(dāng)獲取到讀鎖時,state加RUINT(值為1);當(dāng)釋放讀鎖時,state減去RUINT。
  • bit7為寫鎖標(biāo)識,其值為1表示已獲取寫鎖,值為0表示已獲取讀鎖。當(dāng)線程獲取寫鎖或釋放寫鎖時,都會將state加WBIT;
  • bit8—bit64:表示寫鎖版本,不論獲取寫鎖還是釋放寫鎖,其值都會改變。
    初始狀態(tài)bit8為1,其他位為0。

state常用狀態(tài)標(biāo)識:

  • RUNIT:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001
  • WBIT:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 1000 0000
  • RBITS:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0111 1111
  • RFULL:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0111 1110
  • ABITS:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 1111 1111
  • SBITS:
    1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
  • ORIGIN:
    0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0000

state常用狀態(tài)判斷:

  • 有無線程獲取寫狀態(tài):state < WBIT,true:無,false:有;
  • 讀狀態(tài)是否溢出:(state & ABITS) < RFULL,true;否;false:是;
  • 獲取讀狀態(tài): state + RUNIT(或者readerOverflow + 1)
  • 獲取寫狀態(tài): state + WBIT
  • 釋放讀狀態(tài): state - RUNIT(或者readerOverflow - 1)
  • 釋放寫狀態(tài): (s += WBIT) == 0L ? ORIGIN : s
  • 是否為寫鎖: (state & WBIT) != 0L
  • 是否為讀鎖: (state & RBITS) != 0L

2.4、寫鎖獲取及釋放

獲取寫鎖:

public long writeLock() {
    long s, next;  
    //(state & ABITS)獲取低8位,判斷有無讀/寫鎖存在,
    //有其他讀鎖則bit0-bit6不為0,有其他寫鎖則bit7不為0,
    //因此如果有別的寫鎖或者讀鎖存在將失敗
    //嘗試CAS獲取寫鎖
    //失敗調(diào)用acquireWrite繼續(xù)獲取寫鎖
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}
private long acquireWrite(boolean interruptible, long deadline) {
    //node為當(dāng)前節(jié)點,p為當(dāng)前節(jié)點的前驅(qū)節(jié)點
    WNode node = null, p;
    
    // 第一次自旋——主要進(jìn)行入隊工作
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        //(state&ABITS)為0表示無讀/寫鎖,則嘗試CAS獲取寫鎖
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            // 如果自旋次數(shù)小于0,則計算自旋的次數(shù)
            // 如果當(dāng)前有寫鎖(m == WBIT)獨占,且隊列無元素(wtail == whead),
            // 說明當(dāng)前節(jié)點的前驅(qū)節(jié)點為獲取獨占鎖的節(jié)點,鎖很快就會釋放,
            // 就自旋SPINS次就行了,如果自旋完了還沒輪到自己才入隊
            // 否則自旋次數(shù)為0
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 當(dāng)自旋次數(shù)大于0時,當(dāng)前這次自旋隨機(jī)減一次自旋次數(shù)
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) { // initialize queue
            // 如果隊列未初始化,新建一個空節(jié)點并初始化頭節(jié)點和尾節(jié)點
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 如果新增節(jié)點還未初始化,則新建之,并賦值其前置節(jié)點為尾節(jié)點
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            // 如果新增節(jié)點的前驅(qū)節(jié)點不是尾節(jié)點,
            // 則更新新增節(jié)點的前驅(qū)節(jié)點為新的尾節(jié)點
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 嘗試更新新增節(jié)點為新的尾節(jié)點成功,則退出循環(huán)
            p.next = node;
            break;
        }
    }

    // 第二次自旋——主要進(jìn)行阻塞并等待喚醒
    for (int spins = -1;;) {
        // h為頭節(jié)點,np為新增節(jié)點的前置節(jié)點,pp為前前置節(jié)點,ps為前置節(jié)點的狀態(tài)
        WNode h, np, pp; int ps;
        // 如果頭節(jié)點等于前置節(jié)點,說明快輪到自己了
        if ((h = whead) == p) {
            if (spins < 0)
                // 自旋次數(shù)小于0,則初始化自旋次數(shù)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 自旋次數(shù)小于頭結(jié)點最大自旋次數(shù),則增加自旋次數(shù)
                spins <<= 1;
                
            // 第三次自旋,不斷嘗試獲取寫鎖    
            for (int k = spins;;) { // spin at head
                long s, ns;
                //無讀寫鎖則CAS獲取寫鎖,成功則更新節(jié)點信息
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                // 隨機(jī)立減自旋次數(shù),當(dāng)自旋次數(shù)減為0時跳出循環(huán)再重試
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        //頭節(jié)點為空?
        else if (h != null) { // help release stale waiters
            WNode c; Thread w;
            // 如果頭節(jié)點的cowait鏈表(棧)不為空,喚醒里面的所有節(jié)點
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 如果頭節(jié)點沒有變化
        if (whead == h) {
            // 如果尾節(jié)點有變化,則更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                // 如果尾節(jié)點狀態(tài)為0,則更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 如果尾節(jié)點狀態(tài)為取消,則把它從鏈表中刪除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 有超時時間的處理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                //時間以過期則取消節(jié)點    
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                    
                //設(shè)置線程blocker    
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                //1、當(dāng)前節(jié)點前驅(qū)節(jié)點狀態(tài)為WAITING;
                //2、當(dāng)前節(jié)點的前驅(qū)節(jié)點不是頭節(jié)點或有讀寫鎖已經(jīng)被獲??;
                //3、頭結(jié)點為改變;
                //4、當(dāng)前節(jié)點的前驅(qū)節(jié)點未改變;
                //當(dāng)以上4個條件都滿足時將當(dāng)前節(jié)點進(jìn)行阻塞
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    U.park(false, time);  // emulate LockSupport.park
                node.thread = null;
                //設(shè)置線程blocker為空
                U.putObject(wt, PARKBLOCKER, null);
                //當(dāng)前節(jié)點百中斷則取消節(jié)點
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

寫鎖釋放:

public void unlockWrite(long stamp) {
    WNode h;
    //因為寫鎖是獨占鎖,可以簡單判斷state != stamp;
    //或者bit7為0,即stamp狀態(tài)為無寫鎖
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    
    //修改state狀態(tài),state += WBIT;溢出則初始化為ORIGIN        
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    //頭節(jié)點不為空,且狀態(tài)正常則釋放頭結(jié)點的鎖
    if ((h = whead) != null && h.status != 0)
        release(h);
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        //設(shè)置頭節(jié)點狀態(tài)為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        
        //頭節(jié)點下個節(jié)點為空或狀態(tài)為CANCEL,則從尾節(jié)點前向遍歷,
        //找到頭結(jié)點后面的第一個有效節(jié)點(狀態(tài)為0或WAITTING)
        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        //喚醒下一個有效節(jié)點
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

2.5、讀鎖獲取及釋放

讀鎖獲?。?/strong>

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    //同步隊列為空,讀鎖計數(shù)值未超過最大值(無寫鎖),CAS獲取鎖成功,則直接返回郵戳值;
    //否則acquireRead獲取讀鎖
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}
private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) {
        WNode h;
        //如果同步隊列頭節(jié)點等于尾節(jié)點,說明隊列中沒有節(jié)點或者只有一個節(jié)點
        //則自旋一段時間,等待頭結(jié)點釋放鎖后獲取鎖
        if ((h = whead) == (p = wtail)) {
            for (long m, s, ns;;) {
                //(state & ABITS)小于RFULL,表示無寫鎖,則直接CAS獲取讀鎖;
                //否則若(state & ABITS)小于WBIT表示無寫鎖但讀鎖計數(shù)已溢出,
                //則CAS更新讀鎖計數(shù)獲取讀鎖
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    return ns;
                //(state & ABITS)大于WBIT,表示寫鎖已經(jīng)被占,則自旋    
                else if (m >= WBIT) {
                    //隨機(jī)減自旋次數(shù)
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        //自旋完還沒獲取到讀鎖?
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            //判斷穩(wěn)定性(有沒有被修改),跳出循環(huán)
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        //初始化spins
                        spins = SPINS;
                    }
                }
            }
        }
        //自旋獲取失敗,則進(jìn)行節(jié)點初始化相關(guān)的處理
        //尾節(jié)點為空,則初始化隊列
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        //初始化代表當(dāng)前讀線程的節(jié)點
        else if (node == null)
            node = new WNode(RMODE, p);
        //head==tail或者隊列tail.mode不為讀狀態(tài),
        //那么將當(dāng)前線程的節(jié)點node加入到隊列尾部并跳出外層循環(huán)    
        else if (h == p || p.mode != RMODE) {
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        //如果head!= tail說明隊列中已經(jīng)有線程在等待或者tail.mode是讀狀態(tài)RMODE,
        //那么CAS方式將當(dāng)前線程的節(jié)點node加入到tail節(jié)點的cowait鏈中
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            node.cowait = null;
        else {
            for (;;) {
                WNode pp, c; Thread w;
                ////如果head不為空那么嘗試去解放head的cowait鏈中的節(jié)點
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                //如果tail節(jié)點的前驅(qū)就是head或者h(yuǎn)ead==tail或者tail節(jié)點的前驅(qū)是null
                //也就是說當(dāng)前node所在的節(jié)點(因為node可能在cowait鏈中)
                //的前驅(qū)就是head或者h(yuǎn)ead已經(jīng)被釋放了為null    
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        //如果沒有寫狀態(tài)被占有那么自旋方式嘗試獲取讀狀態(tài),成功則返回stamp
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);
                }
                //判斷是否穩(wěn)定
                if (whead == h && p.prev == pp) {
                    long time;
                    //如果tail的前驅(qū)是null或者h(yuǎn)ead==tail或者tail已經(jīng)被取消了(p.status > 0)
                    //直接將node置為null跳出循環(huán),回到最開的for循環(huán)中去再次嘗試獲取同步狀態(tài)
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    if (deadline == 0L)
                        time = 0L;
                    //如果超時則取消當(dāng)前線程    
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    
                    //tail的前驅(qū)不是head或者當(dāng)前只有寫線程獲取到同步狀態(tài)
                    //判斷穩(wěn)定性
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    //中斷的話取消
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

     //如果隊列中沒有節(jié)點或者tail的mode是WMODE寫狀態(tài),
     //那么node被加入到隊列的tail之后進(jìn)入這個循環(huán)
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        //如果p(node的前驅(qū)節(jié)點)就是head,那么自旋方式嘗試獲取同步狀態(tài)
        if ((h = whead) == p) {
            //第一次循環(huán),設(shè)置自旋次數(shù)
            if (spins < 0)
                spins = HEAD_SPINS;
            //自旋次數(shù)增加    
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                //自旋方式嘗試獲取同步狀態(tài)
                //獲取成功的話將node設(shè)置為head并解放node的cowait鏈中的節(jié)點并返回stamp
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                }
                //如果有寫線程獲取到了同步狀態(tài)(因為可能有寫線程闖入)那么隨機(jī)的--k控制循環(huán)次數(shù)
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        //如果head不為null,解放head的cowait鏈中的節(jié)點
        else if (h != null) {
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
         //判斷穩(wěn)定性
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            //嘗試設(shè)tail的狀態(tài)位WAITING表示后面還有等待的節(jié)點
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            //如果tail已經(jīng)取消了    
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                //超時判定
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                //阻塞等待
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                //中斷處理
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

讀鎖釋放:

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        //寫計數(shù)相關(guān)的bit位有改變,或讀計數(shù)相關(guān)的bit位為0,或有寫鎖,則異常
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        //讀鎖計數(shù)未溢出?    
        if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
       //讀鎖計數(shù)溢出?    
        else if (tryDecReaderOverflow(s) != 0L)
            break;
    }
}

2.6、樂觀鎖獲取

public long tryOptimisticRead() {
    long s;
    //當(dāng)無寫鎖,則返回(state & SBITS),即高位的寫計數(shù);
    //否則返回0,即獲取樂觀鎖失敗
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

2.7、鎖模式的轉(zhuǎn)換

轉(zhuǎn)換為寫鎖:將其他模式(如寫鎖、讀鎖、樂觀鎖)的鎖轉(zhuǎn)換為寫鎖

public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    //寫鎖的狀態(tài)未變?
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        //當(dāng)前無寫/讀鎖?CAS獲取寫鎖
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        }
        //當(dāng)前有寫鎖,且狀態(tài)不變,則直接返回原本的版本郵戳
        else if (m == WBIT) {
            //寫線程不為當(dāng)前線程?錯誤,跳出循環(huán)
            if (a != m)
                break;
            return stamp;
        }
        //無寫鎖,有一個讀鎖,即當(dāng)前線程為獲取讀鎖的線程
        //直接cas將讀鎖釋放并獲取寫鎖,即state=(state - RUNIT + WBIT)
        else if (m == RUNIT && a != 0L) {
            if (U.compareAndSwapLong(this, STATE, s,
                                     next = s - RUNIT + WBIT))
                return next;
        }
        //當(dāng)還有其他線程獲取讀鎖時,自旋等待其他讀鎖釋放
        else
            break;
    }
    return 0L;
}

主要處理:
當(dāng)鎖狀態(tài)為無讀/寫鎖時,通過CAS獲取寫鎖;
當(dāng)當(dāng)前線程已獲取寫鎖時,直接返回原本的郵戳;
當(dāng)只有一個讀鎖,即當(dāng)只有當(dāng)前線程獲取讀鎖,沒有其他的讀鎖時,CAS釋放讀鎖并獲取寫鎖;
當(dāng)獲取讀鎖的線程數(shù)大于1,即除當(dāng)前線程還有其他線程也獲取到讀鎖時,自旋等待其他讀鎖釋放;

轉(zhuǎn)換為讀鎖:將讀/寫鎖轉(zhuǎn)換為讀鎖;

public long tryConvertToReadLock(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    //無寫鎖?
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        
        //無讀/寫鎖,則CAS獲取讀鎖
        if ((m = s & ABITS) == 0L) {
            //郵戳錯誤?退出循環(huán)
            if (a != 0L)
                break;
            //讀鎖計數(shù)未溢出?    
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            //讀鎖計數(shù)溢出?
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        //有寫鎖,且寫鎖為當(dāng)前線程?則釋放讀鎖并喚醒后續(xù)寫鎖線程
        else if (m == WBIT) {
            //寫鎖不為當(dāng)前線程,錯誤,跳出循環(huán)
            if (a != m)
                break;
            state = next = s + (WBIT + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        //當(dāng)前線程已經(jīng)獲取得讀鎖?
        else if (a != 0L && a < WBIT)
            return stamp;
        else
            break;
    }
    return 0L;
}

主要處理:
若無讀/寫鎖,則CAS獲取讀鎖;
若寫鎖為當(dāng)前線程,則CAS釋放寫鎖并獲取讀鎖;
若當(dāng)前線程已經(jīng)獲取讀鎖,則返回原本的郵戳;

轉(zhuǎn)換為樂觀鎖:

public long tryConvertToOptimisticRead(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    U.loadFence();
    for (;;) {
        //寫鎖狀態(tài)變更?
        if (((s = state) & SBITS) != (stamp & SBITS))
            break;
        //無讀/寫鎖?直接返回state    
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            return s;
        }
        //已經(jīng)獲取寫鎖?則直接釋放寫鎖
        else if (m == WBIT) {
            if (a != m)
                break;
            state = next = (s += WBIT) == 0L ? ORIGIN : s;
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        //狀態(tài)為無讀鎖或無寫鎖?
        else if (a == 0L || a >= WBIT)
            break;
        //讀鎖計數(shù)未溢出    
        else if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return next & SBITS;
            }
        }
        //讀鎖計數(shù)溢出?
        else if ((next = tryDecReaderOverflow(s)) != 0L)
            return next & SBITS;
    }
    return 0L;
}

主要處理:
若當(dāng)前線程已經(jīng)獲取寫鎖,則直接釋放寫鎖;
若當(dāng)前線程已經(jīng)獲取讀鎖,則CAS釋放讀鎖;
當(dāng)鎖時,返回狀態(tài)值state;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容