JUC并發(fā)工具之Exchanger源碼解析

原文出處:https://www.zzwzdx.cn

實(shí)現(xiàn)原理

Exchanger(交換者)是用于線程協(xié)作的工具類。Exchanger用于進(jìn)行兩個(gè)線程之間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過exchange()方法交換數(shù)據(jù),當(dāng)一個(gè)線程先執(zhí)行exchange()方法后,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange()方法,當(dāng)這兩個(gè)線程到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù)了。

Exchanger的算法核心是通過一個(gè)可以交換數(shù)據(jù)的slot和一個(gè)可以帶有數(shù)據(jù)item的參與者,在源碼中的定義如下:

for (;;) {
    if (slot is empty) { // offer
        // slot為空時(shí),將item 設(shè)置到Node 中        
        place item in a Node;
        if (can CAS slot from empty to node) {
            // 當(dāng)將node通過CAS交換到slot中時(shí),掛起線程等待被喚醒
            wait for release;
            // 被喚醒后返回node中匹配到的item
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
         // 將slot設(shè)置為空
        // 獲取node中的item,將需要交換的數(shù)據(jù)設(shè)置到匹配的item
        get the item in node;
        set matching item in node;
        // 喚醒等待的線程
        release waiting thread;
    }
    // else retry on CAS failure
}

比如有2條線程A和B,A線程交換數(shù)據(jù)時(shí),發(fā)現(xiàn)slot為空,則將需要交換的數(shù)據(jù)放在slot中等待其它線程進(jìn)來交換數(shù)據(jù),等線程B進(jìn)來,讀取A設(shè)置的數(shù)據(jù),然后設(shè)置線程B需要交換的數(shù)據(jù),然后喚醒A線程,原理就是這么簡(jiǎn)單。但是當(dāng)多個(gè)線程之間進(jìn)行交換數(shù)據(jù)時(shí)就會(huì)出現(xiàn)問題,所以Exchanger加入了slot數(shù)組。

Exchanger中定義了幾個(gè)重要的成員變量,它們分別是:

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;

participant的作用是為每個(gè)線程保留唯一的一個(gè)Node節(jié)點(diǎn)。slot為單個(gè)槽,arena為數(shù)組槽。他們都是Node類型。這里arena存在的意義是當(dāng)有多個(gè)參與者使用同一個(gè)交換場(chǎng)所時(shí),會(huì)存在嚴(yán)重伸縮性問題。既然單個(gè)交換場(chǎng)所存在問題,那么我們就安排多個(gè),也就是數(shù)組arena。通過數(shù)組arena來安排不同的線程使用不同的slot來降低競(jìng)爭(zhēng)問題,并且可以保證最終一定會(huì)成對(duì)交換數(shù)據(jù)。但是Exchanger不是一來就會(huì)生成arena數(shù)組來降低競(jìng)爭(zhēng),只有當(dāng)產(chǎn)生競(jìng)爭(zhēng)時(shí)才會(huì)生成arena數(shù)組。那么怎么將Node與當(dāng)前線程綁定呢?Participant ,Participant 的作用就是為每個(gè)線程保留唯一的一個(gè)Node節(jié)點(diǎn),它繼承ThreadLocal,同時(shí)在Node節(jié)點(diǎn)中記錄在arena中的下標(biāo)index

Node的數(shù)據(jù)結(jié)構(gòu)如下:

@sun.misc.Contended static final class Node {
     // arena的下標(biāo),多個(gè)槽位的時(shí)候利用
    int index; 
    // 上一次記錄的Exchanger.bound
    int bound; 
    // 在當(dāng)前bound下CAS失敗的次數(shù);
    int collides;
    // 用于自旋;
    int hash; 
    // 這個(gè)線程的當(dāng)前項(xiàng),也就是需要交換的數(shù)據(jù);
    Object item; 
    //做releasing操作的線程傳遞的項(xiàng);
    volatile Object match; 
    //掛起時(shí)設(shè)置線程值,其他情況下為null;
    volatile Thread parked;
}

Exchanger的核心方法為exchange(V x),下面我們就來分析下exchange(V x)方法。

exchange(V x)方法

exchange(V x):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。方法定義如下:

public V exchange(V x) throws InterruptedException {
    Object v;
    // 當(dāng)參數(shù)為null時(shí)需要將item設(shè)置為空的對(duì)象
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    // 注意到這里的這個(gè)表達(dá)式是整個(gè)方法的核心
    if ((arena != null ||
            (v = slotExchange(item, false, 0 L)) == null) &&
        ((Thread.interrupted() || // disambiguates null return
            (v = arenaExchange(item, false, 0 L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V) v;
}

仔細(xì)分析上述方法中的if語句,可以得知:

  • 只有當(dāng)arena 為空時(shí),才執(zhí)行slotExchange(item, false, 0 L)方法。
  • 當(dāng)arena不為空時(shí),或者(arena為null且slotExchange方法返回null)時(shí),此時(shí)線程未中斷,才會(huì)執(zhí)行arenaExchange方法;
  • 線程中斷時(shí),就直接拋出線程中斷異常。

下面我們?cè)倏纯磗lotExchange方法,其定義如下:

private final Object slotExchange(Object item, boolean timed, long ns) {
    // 獲取當(dāng)前線程node對(duì)象
    Node p = participant.get();
    // 當(dāng)前線程
    Thread t = Thread.currentThread();
    // 若果線程被中斷,就直接返回null
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    // 自旋
    for (Node q;;) {
        // 將slot值賦給q
        if ((q = slot) != null) {
             // slot 不為null,即表示已有線程已經(jīng)把需要交換的數(shù)據(jù)設(shè)置在slot中了
            // 通過CAS將slot設(shè)置成null
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                // CAS操作成功后,將slot中的item賦值給對(duì)象v,以便返回。
                // 這里也是就讀取之前線程要交換的數(shù)據(jù)
                Object v = q.item;
                // 將當(dāng)前線程需要交給的數(shù)據(jù)設(shè)置在q中的match
                q.match = item;
                 // 獲取被掛起的線程
                Thread w = q.parked;
                if (w != null)
                    // 如果線程不為null,喚醒它
                    U.unpark(w);
                // 返回其他線程給的V
                return v;
            }
            // create arena on contention, but continue until slot null
            // CAS 操作失敗,表示有其它線程競(jìng)爭(zhēng),在此線程之前將數(shù)據(jù)已取走
            // NCPU:CPU的核數(shù)
            // bound == 0 表示arena數(shù)組未初始化過,CAS操作bound將其增加SEQ
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                // 初始化arena數(shù)組
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        // 上面分析過,只有當(dāng)arena不為空才會(huì)執(zhí)行slotExchange方法的
        // 所以表示剛好已有其它線程加入進(jìn)來將arena初始化
        else if (arena != null)
            // 這里就需要去執(zhí)行arenaExchange
            return null; // caller must reroute to arenaExchange
        else {
            // 這里表示當(dāng)前線程是以第一個(gè)線程進(jìn)來交換數(shù)據(jù)
            // 或者表示之前的數(shù)據(jù)交換已進(jìn)行完畢,這里可以看作是第一個(gè)線程
            // 將需要交換的數(shù)據(jù)先存放在當(dāng)前線程變量p中
            p.item = item;
            // 將需要交換的數(shù)據(jù)通過CAS設(shè)置到交換區(qū)slot
            if (U.compareAndSwapObject(this, SLOT, null, p))
                // 交換成功后跳出自旋
                break;
            // CAS操作失敗,表示有其它線程剛好先于當(dāng)前線程將數(shù)據(jù)設(shè)置到交換區(qū)slot
            // 將當(dāng)前線程變量中的item設(shè)置為null,然后自旋獲取其它線程存放在交換區(qū)slot的數(shù)據(jù)
            p.item = null;
        }
    }

    // await release
    // 執(zhí)行到這里表示當(dāng)前線程已將需要的交換的數(shù)據(jù)放置于交換區(qū)slot中了,
    // 等待其它線程交換數(shù)據(jù)然后喚醒當(dāng)前線程
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0 L;
    // 自旋次數(shù)
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // 自旋等待直到p.match不為null,也就是說等待其它線程將需要交換的數(shù)據(jù)放置于交換區(qū)slot
    while ((v = p.match) == null) {
        // 下面的邏輯主要是自旋等待,直到spins遞減到0為止
        if (spins > 0) {
            h ^= h << 1;
            h ^= h >>> 3;
            h ^= h << 10;
            if (h == 0)
                h = SPINS | (int) t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                Thread.yield();
        } else if (slot != p)
            spins = SPINS;
        // 此處表示未設(shè)置超時(shí)或者時(shí)間未超時(shí)
        else if (!t.isInterrupted() && arena == null &&
            (!timed || (ns = end - System.nanoTime()) > 0 L)) {
            // 設(shè)置線程t被當(dāng)前對(duì)象阻塞
            U.putObject(t, BLOCKER, this);
            // 給p掛機(jī)線程的值賦值
            p.parked = t;
            if (slot == p)
                // 如果slot還沒有被置為null,也就表示暫未有線程過來交換數(shù)據(jù),需要將當(dāng)前線程掛起
                U.park(false, ns);
            // 線程被喚醒,將被掛起的線程設(shè)置為null
            p.parked = null;
            // 設(shè)置線程t未被任何對(duì)象阻塞
            U.putObject(t, BLOCKER, null);
        // 不是以上條件時(shí)(可能是arena已不為null或者超時(shí))    
        } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
             // arena不為null則v為null,其它為超時(shí)則v為超市對(duì)象TIMED_OUT,并且跳出循環(huán)
            v = timed && ns <= 0 L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 取走match值,并將p中的match置為null
    U.putOrderedObject(p, MATCH, null);
    // 設(shè)置item為null
    p.item = null;
    p.hash = h;
    // 返回交換值
    return v;
}

再來看arenaExchange方法,此方法被執(zhí)行時(shí)表示多個(gè)線程進(jìn)入交換區(qū)交換數(shù)據(jù),arena數(shù)組已被初始化,此方法中的一些處理方式和slotExchange比較類似,它是通過遍歷arena數(shù)組找到需要交換的數(shù)據(jù)。arenaExchange方法源碼定義如下:

// timed 為true表示設(shè)置了超時(shí)時(shí)間,ns為>0的值,反之沒有設(shè)置超時(shí)時(shí)間
private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;
    // 獲取當(dāng)前線程中的存放的node
    Node p = participant.get();
    //index初始值0
    for (int i = p.index;;) { // access slot at i
        // 遍歷,如果在數(shù)組中找到數(shù)據(jù)則直接交換并喚醒線程,如未找到則將需要交換給其它線程的數(shù)據(jù)放置于數(shù)組中
        int b, m, c;
        long j; // j is raw array offset
        // 其實(shí)這里就是向右遍歷數(shù)組,只是用到了元素在內(nèi)存偏移的偏移量
        // q實(shí)際為arena數(shù)組偏移(i + 1) *  128個(gè)地址位上的node
        Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 如果q不為null,并且CAS操作成功,將下標(biāo)j的元素置為null
        if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            // 表示當(dāng)前線程已發(fā)現(xiàn)有交換的數(shù)據(jù),然后獲取數(shù)據(jù),喚醒等待的線程
            Object v = q.item; // release
            q.match = item;
            Thread w = q.parked;
            if (w != null)
                U.unpark(w);
            return v;
        // q 為null 并且 i 未超過數(shù)組邊界    
        } else if (i <= (m = (b = bound) & MMASK) && q == null) {
             // 將需要給其它線程的item賦予給p中的item
            p.item = item; // offer
            if (U.compareAndSwapObject(a, j, null, p)) {
                // 交換成功
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0 L;
                Thread t = Thread.currentThread(); // wait
                // 自旋直到有其它線程進(jìn)入,遍歷到該元素并與其交換,同時(shí)當(dāng)前線程被喚醒
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;
                    if (v != null) {
                        // 其它線程設(shè)置的需要交換的數(shù)據(jù)match不為null
                        // 將match設(shè)置null,item設(shè)置為null
                        U.putOrderedObject(p, MATCH, null);
                        p.item = null; // clear for next use
                        p.hash = h;
                        return v;
                    } else if (spins > 0) {
                        h ^= h << 1;
                        h ^= h >>> 3;
                        h ^= h << 10; // xorshift
                        if (h == 0) // initialize hash
                            h = SPINS | (int) t.getId();
                        else if (h < 0 && // approx 50% true
                            (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield(); // two yields per wait
                    } else if (U.getObjectVolatile(a, j) != p)
                        // 和slotExchange方法中的類似,arena數(shù)組中的數(shù)據(jù)已被CAS設(shè)置
                       // match值還未設(shè)置,讓其再自旋等待match被設(shè)置
                        spins = SPINS; // releaser hasn't set match yet
                    else if (!t.isInterrupted() && m == 0 &&
                        (!timed ||
                            (ns = end - System.nanoTime()) > 0 L)) {
                        // 設(shè)置線程t被當(dāng)前對(duì)象阻塞
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                         // 線程t賦值
                        p.parked = t; // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            // 數(shù)組中對(duì)象還相等,表示線程還未被喚醒,喚醒線程
                            U.park(false, ns);
                        p.parked = null;
                         // 設(shè)置線程t未被任何對(duì)象阻塞
                        U.putObject(t, BLOCKER, null);
                    } else if (U.getObjectVolatile(a, j) == p &&
                        U.compareAndSwapObject(a, j, p, null)) {
                        // 這里給bound增加加一個(gè)SEQ
                        if (m != 0) // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                        p.item = null;
                        p.hash = h;
                        i = p.index >>>= 1; // descend
                        if (Thread.interrupted())
                            return null;
                        if (timed && m == 0 && ns <= 0 L)
                            return TIMED_OUT;
                        break; // expired; restart
                    }
                }
            } else
                // 交換失敗,表示有其它線程更改了arena數(shù)組中下標(biāo)i的元素
                p.item = null; // clear offer
        } else {
            // 此時(shí)表示下標(biāo)不在bound & MMASK或q不為null但CAS操作失敗
           // 需要更新bound變化后的值
            if (p.bound != b) { // stale; reset
                p.bound = b;
                p.collides = 0;
                // 反向遍歷
                i = (i != m || m == 0) ? m : m - 1;
            } else if ((c = p.collides) < m || m == FULL ||
                !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                 // 記錄CAS失敗的次數(shù)
                p.collides = c + 1;
                // 循環(huán)遍歷
                i = (i == 0) ? m : i - 1; // cyclically traverse
            } else
                // 此時(shí)表示bound值增加了SEQ+1
                i = m + 1; // grow
            // 設(shè)置下標(biāo)
            p.index = i;
        }
    }
}

????看完上面slotExchange方法和arenaExchange方法定義,我們可以看出Exchanger工具類的實(shí)現(xiàn)還是很復(fù)雜的,雖然Exchanger的使用比較簡(jiǎn)單。


關(guān)注下面公眾號(hào),回復(fù) 1024 領(lǐng)取最新大廠面試資料

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容