Sentinel源碼分析----調(diào)用流程總覽

Sentinel 是什么?github描述如下

隨著微服務(wù)的流行,服務(wù)和服務(wù)之間的穩(wěn)定性變得越來(lái)越重要。Sentinel 以流量為切入點(diǎn),從流量控制、熔斷降級(jí)、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度保護(hù)服務(wù)的穩(wěn)定性。

本文建立在會(huì)使用Sentinel的基礎(chǔ)上,詳細(xì)的介紹和使用不會(huì)展開(kāi),具體介紹和使用看:Sentinel介紹

一個(gè)簡(jiǎn)單的Demo如下:

        String resourceName = "資源名稱";
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName);
            run();
        } catch (BlockException ex) {
            throw ex;
        } catch (Throwable ex) {
            Tracer.trace(ex);
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }

這就是一個(gè)設(shè)置之后,run方法就會(huì)被Sentinel所監(jiān)控起來(lái),但是這時(shí)候,是沒(méi)有任何效果的,因?yàn)闆](méi)有告訴Sentinel需要去限制什么?在Sentinel中,這個(gè)叫做規(guī)則,即你需要設(shè)置好限制的規(guī)則,Sentinel會(huì)根據(jù)設(shè)置的規(guī)則去限制你的代碼,即上面的run方法,那么下面來(lái)看下Sentinel的整個(gè)調(diào)用流程是如何。

SphU.entry方法為入口,一步步的跟進(jìn)去

    public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);// 1
    }

    //CtSph.java
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);//2
        return entry(resource, count, args);//3
    }
  • 1:entry有很多重載的方法,如果不填,就會(huì)設(shè)置默認(rèn)值,其他參數(shù)后續(xù)分析
  • 2:對(duì)于Sentinel來(lái)說(shuō),限制的是資源,這里將名稱和EntryType構(gòu)造成一個(gè)資源對(duì)象
  • 3:接著調(diào)用entry方法進(jìn)行處理
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();// 1
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());//2
        }

        if (!Constants.ON) {//3
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);//4

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {// 5
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);//6
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
  • 1:從ThreadLocal中獲取一個(gè)上下文對(duì)象,此時(shí)第一次調(diào)用,ThreadLocal為空。當(dāng)然我們?cè)赿emo中可以手動(dòng)指定一個(gè)上下文,那么到這里就不會(huì)為空了
  • 2:第一次為空,所以需要進(jìn)行一個(gè)Context的初始化
  • 3:一個(gè)全局開(kāi)關(guān),可供動(dòng)態(tài)切換,如果關(guān)閉了,則后續(xù)就不會(huì)走規(guī)則校驗(yàn)
  • 4:畫個(gè)重點(diǎn)!?。?!這個(gè)ProcessorSlotChain是Sentinel整個(gè)流程的核心,相當(dāng)于一個(gè)攔截器鏈,所有請(qǐng)求會(huì)經(jīng)過(guò)攔截器鏈進(jìn)行處理,一會(huì)分析
  • 5:chain為空,是由某種情況引起的,具體情況在lookProcessChain
  • 6:開(kāi)始執(zhí)行核心邏輯

獲取上下文及初始化

private static ThreadLocal<Context> contextHolder = new ThreadLocal<Context>();
    public static Context getContext() {
        return contextHolder.get();
    }

從代碼中看到,contextHolder在此之前沒(méi)有做過(guò)初始化,那么會(huì)走到如下方法:

    //com.alibaba.csp.sentinel.CtSph
    MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    //com.alibaba.csp.sentinel.CtSph.MyContextUtil
    private final static class MyContextUtil extends ContextUtil {
        static Context myEnter(String name, String origin, EntryType type) {
            return trueEnter(name, origin);
        }
    }
    //com.alibaba.csp.sentinel.context.ContextUtil
    protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();//1
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;//2
            DefaultNode node = localCacheNameMap.get(name);//3
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {// 4
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    try {
                        LOCK.lock();
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {// 5
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {// 6
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map<String, DefaultNode> newMap = new HashMap<String, DefaultNode>(
                                    contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            //7
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }
  • 1:首先這里從ThreadLocal中獲取,這時(shí)還是空的
  • 2:contextNameNodeMap這里初始化的時(shí)候默認(rèn)加了個(gè)name為Constants.CONTEXT_DEFAULT_NAME的節(jié)點(diǎn)進(jìn)去,所以上述demo會(huì)走到7。這里
  • 3:假設(shè)我們自己指定了上下文名稱(是否要指定上下文需要看情況),那么第一次進(jìn)行,這里為空,會(huì)進(jìn)入下面的判斷
  • 4:localCacheNameMap.size()即上下文的數(shù)量(因?yàn)橐詂ontextName為key),Sentinel限制了上下文的數(shù)量是2000以下,如果大于2000會(huì)返回NULL_CONTEXT,在之前的處理中可以看到NULL_CONTEXT是直接返回的
  • 5:由于4的判斷是在無(wú)鎖的情況下進(jìn)行的,所以需要在加鎖條件下再進(jìn)行一次判斷(類似單例double check的模式),假設(shè)沒(méi)有問(wèn)題則會(huì)走到6
  • 6:這里創(chuàng)建了個(gè)EntranceNode節(jié)點(diǎn),關(guān)于Sentinel中Node的問(wèn)題會(huì)在后續(xù)文章分析
  • 7:創(chuàng)建了個(gè)上下文對(duì)象并且放入了ThreadLocal中,下次同一線程可以直接獲取

注意點(diǎn):

  1. 由于這個(gè)Node節(jié)點(diǎn)內(nèi)部有許多信息,為了限制內(nèi)存占用,會(huì)限制上下文的數(shù)量
  2. 有些情況不關(guān)心上下文,那么就如demo中一樣,直接調(diào)用SphU.entry,那么這時(shí)候會(huì)指定一個(gè)默認(rèn)的供其使用,如果需要區(qū)分上下文,那么則需要在SphU.entry之前調(diào)用ContextUtil.enter方法指定上下文

調(diào)用鏈的創(chuàng)建與觸發(fā)

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);//1
        if (chain == null) {// 2
            synchronized (LOCK) {// 3
                chain = chainMap.get(resourceWrapper);// 4
                if (chain == null) {//5
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {// 6
                        return null;
                    }
                  
                    chain = SlotChainProvider.newSlotChain();// 7
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
  • 1:上面的Node節(jié)點(diǎn)是和上下文關(guān)聯(lián)的,而這個(gè)的ProcessorSlotChain是和資源關(guān)聯(lián)的,即一個(gè)資源會(huì)有一個(gè)ProcessorSlotChain對(duì)象
  • 2~5:使用了double check,判斷map中是否有該資源的ProcessorSlotChain對(duì)應(yīng)
  • 6:同上面上下文的創(chuàng)建,控制內(nèi)存
  • 7:初始化一個(gè)ProcessorSlotChain對(duì)象并放入map中

看下SlotChainProvider.newSlotChain方法

    public static ProcessorSlotChain newSlotChain() {
        if (builder != null) {// 第一次會(huì)為空,需要初始化
            return builder.build();
        }
        // 初始化builder
        resolveSlotChainBuilder();

        if (builder == null) {// 初始化后仍然為空,則設(shè)置為默認(rèn)的Builder
            builder = new DefaultSlotChainBuilder();
        }
        // 通過(guò)builder創(chuàng)建ProcessorSlotChain
        return builder.build();
    }
    // 通過(guò)java SPI的機(jī)制獲取對(duì)應(yīng)的信息
    private static final ServiceLoader<SlotChainBuilder> LOADER = 
            ServiceLoader.load(SlotChainBuilder.class);
    private static void resolveSlotChainBuilder() {
        List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
        boolean hasOther = false;
        // 獲取SPI中配置的實(shí)現(xiàn)
        for (SlotChainBuilder builder : LOADER) {
            // 如果SPI的配置文件中自定義了實(shí)現(xiàn)
            if (builder.getClass() != DefaultSlotChainBuilder.class) {
                hasOther = true;
                list.add(builder);
            }
        }
        // 如果有多個(gè)自定義實(shí)現(xiàn),則默認(rèn)取第一個(gè)
        if (hasOther) {
            builder = list.get(0);
        } else {
            // 沒(méi)有自定義實(shí)現(xiàn)那么取默認(rèn)實(shí)現(xiàn).
            builder = new DefaultSlotChainBuilder();
        }
    }

Sentinel中大量使用了Java 的SPI機(jī)制去進(jìn)行一個(gè)擴(kuò)展,這里就用來(lái)擴(kuò)展Builder,如果我們需要自己去自定義一個(gè)Buidler,去排列調(diào)用鏈中的元素節(jié)點(diǎn),那么可以參考Java SPI機(jī)制去配置,那么Sentinel就選擇自定義的Builder去創(chuàng)建ProcessorSlotChain,而默認(rèn)情況使用的是DefaultSlotChainBuilder,那么看下其build方法

    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot()); 
        chain.addLast(new DegradeSlot());
        return chain;
    }

可以看到DefaultSlotChainBuilder已經(jīng)默認(rèn)排列好了調(diào)用鏈中的節(jié)點(diǎn),其實(shí)內(nèi)部就類似一個(gè)攔截器鏈,Slot是攔截器鏈中的攔截器節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)的功能不同,具體功能如下:

  • NodeSelectorSlot:用于創(chuàng)建Node節(jié)點(diǎn)
  • ClusterBuilderSlot:用于創(chuàng)建ClusterNode節(jié)點(diǎn)
  • LogSlot:目前對(duì)于被規(guī)則限制的情況,交給了StatLogger處理,但是好像沒(méi)啥效果?
  • StatisticSlot:用于統(tǒng)計(jì)當(dāng)前流量通過(guò)的情況
  • SystemSlot:用于系統(tǒng)負(fù)載規(guī)則的處理
  • AuthoritySlot: 用于黑白名單規(guī)則的處理
  • FlowSlot:用于限流規(guī)則的處理
  • DegradeSlot:用于降級(jí)規(guī)則的處理

DefaultProcessorSlotChain這個(gè)調(diào)用鏈或者說(shuō)攔截器鏈,一般來(lái)說(shuō)是數(shù)組或者鏈表實(shí)現(xiàn)的,通過(guò)上面的addLast方法來(lái)看,應(yīng)該用鏈表會(huì)比較合適(這個(gè)類似Netty的pipeline,有addLast的話,應(yīng)該有addFirst,如果有addFirst的話,數(shù)組就不合適了,因?yàn)閿?shù)組插入元素的話比較麻煩,而鏈表就比較容易了)

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }

    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }
}

結(jié)構(gòu)就是鏈表的結(jié)構(gòu),有頭節(jié)點(diǎn),尾節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有個(gè)next引用指向下一個(gè)節(jié)點(diǎn),這里需要注意的是next引用它是在父類里的,這里可以類比一下Netty的pipeline,有少許不同,但是核心都差不多

還有個(gè)點(diǎn)需要看下,和Netty有點(diǎn)類似,以FlowSlot為例

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

可以看到最后執(zhí)行完checkFlow還調(diào)用了一次fireEntry,這個(gè)會(huì)繼續(xù)往后觸發(fā),Netty的pipeline也是這樣的觸發(fā)形式

數(shù)據(jù)統(tǒng)計(jì)

上面介紹了幾個(gè)Slot的作用,以常用的限流規(guī)則為例,我們?cè)诳刂婆_(tái)配置限流規(guī)則:

image.png

例如配置qps為10,那么在FlowSlot會(huì)檢查當(dāng)前qps是否超過(guò)這個(gè)值,沒(méi)超過(guò)則通過(guò)該請(qǐng)求,否則拋出異常,那么有個(gè)疑問(wèn),FlowSlot如何獲取當(dāng)前服務(wù)的一個(gè)qps或者說(shuō)請(qǐng)求量呢?

這時(shí)候就輪到我們的主角StatisticSlot登場(chǎng)了,其entry方法如下

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);// 1

            node.increaseThreadNum();//2
            node.addPassRequest(count);//3

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();//4
                context.getCurEntry().getOriginNode().addPassRequest(count);//5
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();//6
                Constants.ENTRY_NODE.addPassRequest(count);//7
            }

            // ....
        } catch (BlockException e) {
            // ....
            node.increaseBlockQps(count);//8
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);//9
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockQps(count);//10
            }
            // ....
            throw e;
        } catch (Throwable e) {
            // ....
            node.increaseExceptionQps(count);//11
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);//12
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);//13
            }
            throw e;
        }
    }

上面是entry方法做的邏輯,主要關(guān)注的幾點(diǎn)已經(jīng)標(biāo)注出來(lái)了,fireEntry這里是觸發(fā)后續(xù)節(jié)點(diǎn),從DefaultSlotChainBuilder#build方法中可以看到StatisticSlot后還有四個(gè)節(jié)點(diǎn)用來(lái)校驗(yàn)規(guī)則,即這里的fireEntry會(huì)觸發(fā)規(guī)則的校驗(yàn),規(guī)則校驗(yàn)通過(guò)則往下走,失敗的走catch塊。

從2~13的方法名稱中可以知道這里進(jìn)行了流量的統(tǒng)計(jì),例如增加線程數(shù)->increaseThreadNum,增加通過(guò)的請(qǐng)求數(shù)->addPassRequest,增加block請(qǐng)求qps->increaseBlockQps,增加異常qps->increaseExceptionQps,到這里就能知道Sentinel是如何統(tǒng)計(jì)請(qǐng)求數(shù)的(Node的具體原理后續(xù)分析)。

結(jié)合上述調(diào)用鏈的執(zhí)行,整個(gè)流程如下(省略部分Slot的處理):


image.png

總結(jié)

Sentinel的保護(hù)措施是在一個(gè)Slot鏈中,Slot鏈有不同的節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)負(fù)責(zé)不同的事情,例如降級(jí)相關(guān)規(guī)則、系統(tǒng)負(fù)載相關(guān)規(guī)則、限流相關(guān)規(guī)則的處理,如果觸發(fā)了某個(gè)規(guī)則(例如qps已經(jīng)超過(guò)配置的規(guī)則),那么會(huì)拋出異常,而Slot鏈有個(gè)節(jié)點(diǎn)負(fù)責(zé)統(tǒng)計(jì)成功和異常的數(shù)量,然后這時(shí)候就不會(huì)執(zhí)行保護(hù)的代碼,達(dá)到一個(gè)保護(hù)的作用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容