dubbo技術(shù)內(nèi)幕六 RoundRobinLoadBalance

RoundRobinLoadBalance是dubbo里面提供的按照權(quán)重進(jìn)行輪詢的負(fù)載均衡算法,整個(gè)算法設(shè)計(jì)的非常巧妙,如下
1 初始化本地權(quán)重表,根據(jù)情況動(dòng)態(tài)調(diào)整
2 每次動(dòng)態(tài)的更新本地權(quán)重表,更新算法為當(dāng)前invoker的權(quán)重+本地權(quán)重表的old值
3 選取本地權(quán)重最大的invoker,并將其本地權(quán)重表的權(quán)重 減去 本輪所有invoker的權(quán)重和,并返回當(dāng)前的invoker

重復(fù)3
tip 本地權(quán)重表中如果一分鐘內(nèi)某個(gè)權(quán)重信息沒(méi)有更新(即對(duì)應(yīng)的invoker沒(méi)有被調(diào)用,那么回收,重新的初始化)
,舉個(gè)例子,引用于
https://blog.csdn.net/lizz861109/article/details/109517749(侵權(quán)請(qǐng)聯(lián)系我刪除)
圖如下

image.png

源碼分析如下

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    //負(fù)載均衡器的名稱
    public static final String NAME = "roundrobin";
    //更新時(shí)間(就是某個(gè)invoker一直沒(méi)有被調(diào)用,將其刪除,并重新的初始化)
    private static int RECYCLE_PERIOD = 60000;
    //內(nèi)置的權(quán)重輪詢器,主要對(duì)當(dāng)前invoker的默認(rèn)權(quán)重weight(一般不會(huì)變,如果是啟動(dòng) 
  //十分鐘內(nèi)會(huì)變化)和當(dāng)前權(quán)重current(如果被選中,會(huì)減少total的權(quán)重,保證下次被 //選中的概率減?。?    protected static class WeightedRoundRobin {
        //配置權(quán)重
        private int weight;
        //當(dāng)前權(quán)重
        private AtomicLong current = new AtomicLong(0);
        //最后更新時(shí)間
        private long lastUpdate;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
        //每次輪詢的時(shí)候,都會(huì)將當(dāng)前權(quán)重 + 默認(rèn)權(quán)重再次設(shè)置為當(dāng)前權(quán)重
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
       //如果被選中,則當(dāng)前權(quán)重減去 total
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
        public long getLastUpdate() {
            return lastUpdate;
        }
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }
    //緩存
    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
     //cas鎖
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     * 
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //獲取key  service + method 
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
         //如果為空,那么初始化一個(gè)
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
           //獲取當(dāng)前invoker的權(quán)重
            int weight = getWeight(invoker, invocation);
            if (weight < 0) {
                weight = 0;
            }
             //如果當(dāng)前invoker對(duì)應(yīng)的weightedRoundRobin為空,那么new一個(gè),順便使用
           //當(dāng)前invoker的信息來(lái)初始化這個(gè)weightedRoundRobin
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = map.get(identifyString);
            }
            //如果默認(rèn)的weight與實(shí)際的不一致,那么更新(有可能在warmup階段)
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            //將invoker的當(dāng)前權(quán)重 + 默認(rèn)權(quán)重
            long cur = weightedRoundRobin.increaseCurrent();
           //更新時(shí)間
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
       //如果invokers的個(gè)數(shù)與緩存的個(gè)數(shù)對(duì)不上,進(jìn)行更新
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    newMap.putAll(map);
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        //如果某個(gè)invoker一分鐘都沒(méi)有更新了(也就是根本沒(méi)有請(qǐng)求過(guò)來(lái))
                       //將其回收,下次請(qǐng)求進(jìn)來(lái)會(huì)重建
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        //經(jīng)過(guò)上面的操作之后,當(dāng)前權(quán)重最大的invoker被選出來(lái)了
        if (selectedInvoker != null) {
            //更新當(dāng)前權(quán)重(也就是減去totalWeight  )
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }

}

大家結(jié)合圖和代碼就能很好的理解這個(gè)加權(quán)輪詢算法了

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容