由dubbo的TpsLimitFilter限流,說說dubbo的Filter實現(xiàn)機制

Dubbo提供了過程攔截(即Filter)功能。dubbo的大多數(shù)功能都基于此功能實現(xiàn)。在dubbo的服務(wù)端,提供了一個限流Filter(TpsLimitFilter),用于在服務(wù)端控制單位時間內(nèi)(默認是60s)的調(diào)用數(shù)量tps。超過此數(shù)量,則服務(wù)端將會報錯。

一、TpsLimitFilter的使用

# 1.1、TpsLimitFilter源碼

@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    new StringBuilder(64)
                            .append("Failed to invoke service ")
                            .append(invoker.getInterface().getName())
                            .append(".")
                            .append(invocation.getMethodName())
                            .append(" because exceed max service tps.")
                            .toString());
        }

        return invoker.invoke(invocation);
    }

}
public class DefaultTPSLimiter implements TPSLimiter {

    private final ConcurrentMap<String, StatItem> stats
        = new ConcurrentHashMap<String, StatItem>();
    
    public boolean isAllowable(URL url, Invocation invocation) {
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                                         Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        if (rate > 0) {
            StatItem statItem = stats.get(serviceKey);
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                                  new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            return statItem.isAllowable(url, invocation);
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
        }

        return true;
    }

}


class StatItem {

    private String name;

    private long lastResetTime;

    private long interval;

    private AtomicInteger token;

    private int rate;

    StatItem(String name, int rate, long interval) {
        this.name = name;
        this.rate = rate;
        this.interval = interval;
        this.lastResetTime = System.currentTimeMillis();
        this.token = new AtomicInteger(rate);
    }

    public boolean isAllowable(URL url, Invocation invocation) {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }

    long getLastResetTime() {
        return lastResetTime;
    }
    
    int getToken() {
        return token.get();
    }
    
    public String toString() {
        return new StringBuilder(32).append("StatItem ")
            .append("[name=").append(name).append(", ")
            .append("rate = ").append(rate).append(", ")
            .append("interval = ").append(interval).append("]")
            .toString();
    }

}

此限流過濾器的思想就是在規(guī)定的時間內(nèi)(dubbo默認是60s),看請求數(shù)是否小于tps的數(shù)量。如果這一次的請求時間距離上一次統(tǒng)計的開始時間在60s內(nèi),那就計數(shù),如果大于tps,就報錯,如果這一次請求時間間隔已經(jīng)大于60s,那么把此次的時間作為統(tǒng)計的開始時間。算法比較簡單。

1.2、如何使用此filter

dubbo沒有把這個TpsLimitFilter放入默認啟動的filter中。下面是dubbo默認啟動的filter類型

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter

那么如何使用這個filter呢?

1.3新建filter文件

所以,我們要應(yīng)用這個限流過濾器,需要在我們的resources目錄下自己新建以filter接口為文件名的文件,如下:


在這里插入圖片描述

里面的內(nèi)容就是tps的類路徑:
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

1.4 使用配置規(guī)則將tps寫入注冊中心的url。

根據(jù)dubbo的user-book的說明,可以通過向注冊中心寫入配置規(guī)則,完成tps的限流操作。

    public static void setLimit(){
        //獲得注冊工程的spi擴展實例
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
        //根據(jù)url的zookeeper,確定是zookeeper注冊中心,通過ip和端口號,連上zookeeper注冊中心
        Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://192.168.25.128:2181"));
          //向注冊中心寫入配置規(guī)則
        registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"
             ));

    
    }

最后一句的/0.0.0.0表示對所有的ip都有效。(即我們的服務(wù)可能會放入很多的機器,那么這些機器都會執(zhí)行tps規(guī)則),我們定義了tps=5次。
這里的setLimit()方法可以在任意地方執(zhí)行。為了方便,我在Main方法中,啟動了服務(wù)端程序后執(zhí)行這個方法。

    private static final Log log = LogFactory.getLog(DubboProviderMain.class);

    public static void main(String[] args) {
        try {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-service.xml");
            context.start();
        //  setRouter();
            setLimit();
            
        } catch (Exception e) {
            log.error("== DubboProvider context start error:",e);
        }
        synchronized (DubboProviderMain.class) {
            while (true) {
                try {
                    DubboProviderMain.class.wait();
                } catch (InterruptedException e) {
                    log.error("== synchronized error:",e);
                }
            }
        }
    }
    

1.5 測試

我們把這個jar打包,分別放入兩個不同的虛擬機中,然后在客戶端執(zhí)行請求。結(jié)果表明,兩個虛擬機在60s內(nèi)各執(zhí)行了5次后,就開始報錯。這也從另一個方面驗證了,TpsLimitFilter限流是針對單機的。
(在測試時候,如果服務(wù)jar包,一個在虛擬機,另一個和web程序都在本地,那么申請都會發(fā)往本地,而不是隨機在兩個jar之間調(diào)用)

二、由此帶來的思考

a、TpsLimitFilter為什么只能通過寫入配置規(guī)則的方式使用,而不能直接在xml中直接寫入?
b、TpsLimitFilter有類上有@Activate注解,為什么不能像其他的內(nèi)置filter一樣,默認開啟?

2.1 @Activate注解

Activate注解可以通過group和value配置激活條件,使得被Activate注解的擴展點實現(xiàn)在滿足上述兩個條件時候被激活使用,通常用于filter的激活。

@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter{}

又比如

@Activate(group = Constants.PROVIDER, value = Constants.TOKEN_KEY)
public class TokenFilter implements Filter {}

又比如

@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {}

對于注解中,沒有value的情況,只需進行g(shù)roup匹配(在filter中,分為provider和consumer)。在由value的情況下,必須group匹配,而且value有值的情況下,才會啟動。

2.2 回答第一個問題,TpsLimitFilter為什么不能直接在xml中寫入。

在dubbo中,有些filter是可以直接在xml中直接寫入的。比如令牌驗證功能:
在dubbo的xml配置中,加入token配置,則相當(dāng)于啟動了TokenFilter功能。

<dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="true"  token="1234567" />

但是,如果我們?nèi)缟厦鎸崿F(xiàn)1.2操作后,在xml中寫入如下tps:

<dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="false"  token="1234567"  filter="andyFilter"  tps="5"
                    />

則會報錯

2018-11-29 18:32:16,582 ERROR [DubboProviderMain.java:39] : == DubboProvider context start error:
org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Failed to import bean definitions from URL location [classpath*:spring/applicationContext-dubbo.xml]
Offending resource: class path resource [spring/applicationContext-service.xml]; nested exception is org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 28 in XML document from URL [file:/E:/javaee/DubboTest/Dubbo-test-service-jar/target/classes/spring/applicationContext-dubbo.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 28; columnNumber: 8; cvc-complex-type.3.2.2: 元素 'dubbo:service' 中不允許出現(xiàn)屬性 'tps'。
    at org.springframework.beans.factory.parsing.FailFastProblemReporter.error(FailFastProblemReporter.java:70)
    at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:85)

大意就是dubbo:service中沒有tps這個元素。
我們知道,spring會去解析dubbo標(biāo)簽,來完成dubbo相關(guān)類的實例化。所以,直接查看dubbo.xsd,看看dubbo的自定義標(biāo)簽service中,是否有這個tps屬性:

    <xsd:complexType name="serviceType">
        <xsd:complexContent>
            <xsd:extension base="abstractServiceType">
                <xsd:choice minOccurs="0" maxOccurs="unbounded">
                    <xsd:element ref="method" minOccurs="0" maxOccurs="unbounded" />
                    <xsd:element ref="parameter" minOccurs="0" maxOccurs="unbounded" />
                    <xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded" />
                </xsd:choice>
                <xsd:attribute name="interface" type="xsd:token" use="required">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ Defines the interface to advertise for this service in the service registry. ]]></xsd:documentation>
                        <xsd:appinfo>
                            <tool:annotation>
                                <tool:expected-type type="java.lang.Class"/>
                            </tool:annotation>
                        </xsd:appinfo>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="ref" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service implementation instance bean id. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="class" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service implementation class name. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="path" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service path. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="provider" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ Deprecated. Replace to protocol. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:anyAttribute namespace="##other" processContents="lax" />
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>


    <xsd:complexType name="abstractServiceType">
        <xsd:complexContent>
            <xsd:extension base="abstractInterfaceType">
                <xsd:attribute name="register" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service can be register to registry. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="version" type="xsd:string" use="optional" default="0.0.0">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service version. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="group" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service group. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="deprecated" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ whether the service is deprecated. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="delay" type="xsd:string" use="optional" default="0">
                    <xsd:annotation>
                        <xsd:documentation>
                            <![CDATA[ The service export delay millisecond. ]]>
                        </xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="export" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation>
                            <![CDATA[ The service is export. ]]>
                        </xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="weight" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation>
                            <![CDATA[ The service weight. ]]>
                        </xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="document" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation>
                            <![CDATA[ The service document. ]]>
                        </xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="dynamic" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ the service registered to the registry is dynamic(true) or static(false). ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="token" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service use token. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="accesslog" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service use accesslog. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="executes" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service allow execute requests. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:attribute name="protocol" type="xsd:string" use="optional">
                    <xsd:annotation>
                        <xsd:documentation><![CDATA[ The service protocol. ]]></xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
                <xsd:anyAttribute namespace="##other" processContents="lax" />
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>

上面是service標(biāo)簽的屬性,從里面可以看出,是沒有tps這個屬性的,但是有token這個屬性。所以TokenFilter可以在dubbo的xml配置,而TpsLimitFilter不行。

2.3 回答TpsLimitFilter有類上有@Activate注解,為什么不能像其他的內(nèi)置filter一樣,默認開啟

這一題,從上面可以得到答案。因為TpsLimitFilter的注解有g(shù)roup和value兩個(其中,value=TPS_LIMIT_RATE_KEY = "tps"),而我們在xml配置文件中沒法寫入tps這是屬性值,所以不能啟動TpsLimitFilter。而我們通過配置規(guī)則向注冊中心寫入tps后,TpsLimitFilter就能啟動了。

registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"

3、源碼分析

dubbo的服務(wù)發(fā)布過程的export過程中,會先經(jīng)過ProtocolFilterWrapper,在這里,完成filter的初始化和配置。

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

    public void destroy() {
        protocol.destroy();
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
    
}

在buildInvokerChain方法中,通過 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);得到激活的filter。
其中,invoker.getUrl()就是發(fā)布父類的url路徑:

dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67080&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543490087854&token=1234567

key=service.filter:后續(xù)會用這個key得到我們自定義的filter。
group=provider:表明是服務(wù)提供端
然后,會運行到下面的方法,這里的values就是我們定義的filter數(shù)組。

 public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
//Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY就是-default。意思是說我們
//在xml中配置filter時候沒有-default,就是要加載默認啟動的filter。這個大if就是加載dubbo提供的自動加載的filter集合
        if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
       //   getExtensionClasses()方法會加載所有的Filter接口的擴展實現(xiàn),包括dubbo提供的和我們自定義的
           getExtensionClasses();
//cachedActivates是一個集合,所有的Filter接口的擴展實現(xiàn)中,有@Activate注解的都會放入這個集合中
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Activate activate = entry.getValue();
//這里group=provider,而activate.group()是類的filter注解中定義的,分為provider和consumer。
//所以在服務(wù)發(fā)布端,只有注解中定義了provider的filter才會通過
                if (isMatchGroup(group, activate.group())) {
                    T ext = getExtension(name);
                    if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
//這里主要關(guān)注 isActive(activate, url),在下面分析
                            && isActive(activate, url)) {
                        exts.add(ext);
                    }
                }
            }
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i ++) {
            String name = names.get(i);
            if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (usrs.size() > 0) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        if (usrs.size() > 0) {
            exts.addAll(usrs);
        }
        return exts;
    }

上面的cachedActivates是所有由@activate注解的filter接口的擴展實現(xiàn),通過斷點,得到其值:

{exception=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), cache=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[cache], before=[], group=[consumer, provider], order=0), genericimpl=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[generic], before=[], group=[consumer], order=20000), deprecated=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[deprecated], before=[], group=[consumer], order=0), classloader=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-30000), echo=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-110000), monitor=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider, consumer], order=0), generic=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-20000), timeout=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), accesslog=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[accesslog], before=[], group=[provider], order=0), token=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[token], before=[], group=[provider], order=0), trace=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), executelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[executes], before=[], group=[provider], order=0), future=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=0), tps=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[tps], before=[], group=[provider], order=0), context=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-10000), activelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[actives], before=[], group=[consumer], order=0), validation=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[validation], before=[], group=[consumer, provider], order=10000), consumercontext=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=-10000)}

可以看到,里面是有tps這個filter的。而且其要求有value值,value的名字是tps。即要求我們的url中要有這個tps的鍵值對。

通過了 if (isMatchGroup(group, activate.group()))后,還需要進行

   if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
//這里主要關(guān)注 isActive(activate, url),在下面分析
                            && isActive(activate, url))

才能把符合條件的filter加入到exts中,這個exts就是后面要啟動的filter集合。
看看 isActive方法

    private boolean isActive(Activate activate, URL url) {
//獲取注解中value()值
        String[] keys = activate.value();
//如果value值是空的,說明不需要進行value過濾,直接放行
        if (keys == null || keys.length == 0) {
            return true;
        }
//否則,就要對服務(wù)發(fā)布的url進行屬性鍵值對的對比,看url中是否有這個value的值,而
//我們的url中沒有tps這個屬性值,因此返回false,故TpsLimitFilter這個過濾器不會啟動
        for (String key : keys) {
            for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if ((k.equals(key) || k.endsWith("." + key))
                        && ConfigUtils.isNotEmpty(v)) {
                    return true;
                }
            }
        }
        return false;
    }

至此,分析完畢,正常情況下,為什么TpsLimitFilter這個filter不會啟動。
而當(dāng)我們通過向注冊中心動態(tài)寫入配置規(guī)則后,根據(jù)dubbo的機制,會導(dǎo)致dubbo重新發(fā)布這個服務(wù)。刷新后發(fā)布的服務(wù)的url是

dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67656&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543490522208&token=1234567&tps=5

里面有tps這個屬性,通過上面的分析,

   if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
//這里主要關(guān)注 isActive(activate, url),在下面分析
                            && isActive(activate, url))

會返回true,導(dǎo)致我們的TpsLimitFilter會加入自動啟動的filter集合中。
至此,分析完畢!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容