Dubbo提供了過程攔截(即Filter)功能。dubbo的大多數(shù)功能都基于此功能實現(xiàn)。在dubbo的服務(wù)端,提供了一個限流Filter(TpsLimitFilter),用于在服務(wù)端控制單位時間內(nèi)(默認是60s)的調(diào)用數(shù)量tps。超過此數(shù)量,則服務(wù)端將會報錯。
一、TpsLimitFilter的使用
# 1.1、TpsLimitFilter源碼
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
new StringBuilder(64)
.append("Failed to invoke service ")
.append(invoker.getInterface().getName())
.append(".")
.append(invocation.getMethodName())
.append(" because exceed max service tps.")
.toString());
}
return invoker.invoke(invocation);
}
}
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable(url, invocation);
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
class StatItem {
private String name;
private long lastResetTime;
private long interval;
private AtomicInteger token;
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}
public boolean isAllowable(URL url, Invocation invocation) {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
long getLastResetTime() {
return lastResetTime;
}
int getToken() {
return token.get();
}
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(", ")
.append("rate = ").append(rate).append(", ")
.append("interval = ").append(interval).append("]")
.toString();
}
}
此限流過濾器的思想就是在規(guī)定的時間內(nèi)(dubbo默認是60s),看請求數(shù)是否小于tps的數(shù)量。如果這一次的請求時間距離上一次統(tǒng)計的開始時間在60s內(nèi),那就計數(shù),如果大于tps,就報錯,如果這一次請求時間間隔已經(jīng)大于60s,那么把此次的時間作為統(tǒng)計的開始時間。算法比較簡單。
1.2、如何使用此filter
dubbo沒有把這個TpsLimitFilter放入默認啟動的filter中。下面是dubbo默認啟動的filter類型
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
那么如何使用這個filter呢?
1.3新建filter文件
所以,我們要應(yīng)用這個限流過濾器,需要在我們的resources目錄下自己新建以filter接口為文件名的文件,如下:

里面的內(nèi)容就是tps的類路徑:
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter
1.4 使用配置規(guī)則將tps寫入注冊中心的url。
根據(jù)dubbo的user-book的說明,可以通過向注冊中心寫入配置規(guī)則,完成tps的限流操作。
public static void setLimit(){
//獲得注冊工程的spi擴展實例
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
//根據(jù)url的zookeeper,確定是zookeeper注冊中心,通過ip和端口號,連上zookeeper注冊中心
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://192.168.25.128:2181"));
//向注冊中心寫入配置規(guī)則
registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"
));
}
最后一句的/0.0.0.0表示對所有的ip都有效。(即我們的服務(wù)可能會放入很多的機器,那么這些機器都會執(zhí)行tps規(guī)則),我們定義了tps=5次。
這里的setLimit()方法可以在任意地方執(zhí)行。為了方便,我在Main方法中,啟動了服務(wù)端程序后執(zhí)行這個方法。
private static final Log log = LogFactory.getLog(DubboProviderMain.class);
public static void main(String[] args) {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-service.xml");
context.start();
// setRouter();
setLimit();
} catch (Exception e) {
log.error("== DubboProvider context start error:",e);
}
synchronized (DubboProviderMain.class) {
while (true) {
try {
DubboProviderMain.class.wait();
} catch (InterruptedException e) {
log.error("== synchronized error:",e);
}
}
}
}
1.5 測試
我們把這個jar打包,分別放入兩個不同的虛擬機中,然后在客戶端執(zhí)行請求。結(jié)果表明,兩個虛擬機在60s內(nèi)各執(zhí)行了5次后,就開始報錯。這也從另一個方面驗證了,TpsLimitFilter限流是針對單機的。
(在測試時候,如果服務(wù)jar包,一個在虛擬機,另一個和web程序都在本地,那么申請都會發(fā)往本地,而不是隨機在兩個jar之間調(diào)用)
二、由此帶來的思考
a、TpsLimitFilter為什么只能通過寫入配置規(guī)則的方式使用,而不能直接在xml中直接寫入?
b、TpsLimitFilter有類上有@Activate注解,為什么不能像其他的內(nèi)置filter一樣,默認開啟?
2.1 @Activate注解
Activate注解可以通過group和value配置激活條件,使得被Activate注解的擴展點實現(xiàn)在滿足上述兩個條件時候被激活使用,通常用于filter的激活。
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter{}
又比如
@Activate(group = Constants.PROVIDER, value = Constants.TOKEN_KEY)
public class TokenFilter implements Filter {}
又比如
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {}
對于注解中,沒有value的情況,只需進行g(shù)roup匹配(在filter中,分為provider和consumer)。在由value的情況下,必須group匹配,而且value有值的情況下,才會啟動。
2.2 回答第一個問題,TpsLimitFilter為什么不能直接在xml中寫入。
在dubbo中,有些filter是可以直接在xml中直接寫入的。比如令牌驗證功能:
在dubbo的xml配置中,加入token配置,則相當(dāng)于啟動了TokenFilter功能。
<dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="true" token="1234567" />
但是,如果我們?nèi)缟厦鎸崿F(xiàn)1.2操作后,在xml中寫入如下tps:
<dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="false" token="1234567" filter="andyFilter" tps="5"
/>
則會報錯
2018-11-29 18:32:16,582 ERROR [DubboProviderMain.java:39] : == DubboProvider context start error:
org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Failed to import bean definitions from URL location [classpath*:spring/applicationContext-dubbo.xml]
Offending resource: class path resource [spring/applicationContext-service.xml]; nested exception is org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 28 in XML document from URL [file:/E:/javaee/DubboTest/Dubbo-test-service-jar/target/classes/spring/applicationContext-dubbo.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 28; columnNumber: 8; cvc-complex-type.3.2.2: 元素 'dubbo:service' 中不允許出現(xiàn)屬性 'tps'。
at org.springframework.beans.factory.parsing.FailFastProblemReporter.error(FailFastProblemReporter.java:70)
at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:85)
大意就是dubbo:service中沒有tps這個元素。
我們知道,spring會去解析dubbo標(biāo)簽,來完成dubbo相關(guān)類的實例化。所以,直接查看dubbo.xsd,看看dubbo的自定義標(biāo)簽service中,是否有這個tps屬性:
<xsd:complexType name="serviceType">
<xsd:complexContent>
<xsd:extension base="abstractServiceType">
<xsd:choice minOccurs="0" maxOccurs="unbounded">
<xsd:element ref="method" minOccurs="0" maxOccurs="unbounded" />
<xsd:element ref="parameter" minOccurs="0" maxOccurs="unbounded" />
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded" />
</xsd:choice>
<xsd:attribute name="interface" type="xsd:token" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[ Defines the interface to advertise for this service in the service registry. ]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation>
<tool:expected-type type="java.lang.Class"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="ref" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service implementation instance bean id. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="class" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service implementation class name. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="path" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service path. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="provider" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ Deprecated. Replace to protocol. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
<xsd:complexType name="abstractServiceType">
<xsd:complexContent>
<xsd:extension base="abstractInterfaceType">
<xsd:attribute name="register" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service can be register to registry. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="version" type="xsd:string" use="optional" default="0.0.0">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service version. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="group" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service group. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="deprecated" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ whether the service is deprecated. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="delay" type="xsd:string" use="optional" default="0">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The service export delay millisecond. ]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="export" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The service is export. ]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="weight" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The service weight. ]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="document" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The service document. ]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="dynamic" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ the service registered to the registry is dynamic(true) or static(false). ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="token" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service use token. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="accesslog" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service use accesslog. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="executes" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service allow execute requests. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="protocol" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The service protocol. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
上面是service標(biāo)簽的屬性,從里面可以看出,是沒有tps這個屬性的,但是有token這個屬性。所以TokenFilter可以在dubbo的xml配置,而TpsLimitFilter不行。
2.3 回答TpsLimitFilter有類上有@Activate注解,為什么不能像其他的內(nèi)置filter一樣,默認開啟
這一題,從上面可以得到答案。因為TpsLimitFilter的注解有g(shù)roup和value兩個(其中,value=TPS_LIMIT_RATE_KEY = "tps"),而我們在xml配置文件中沒法寫入tps這是屬性值,所以不能啟動TpsLimitFilter。而我們通過配置規(guī)則向注冊中心寫入tps后,TpsLimitFilter就能啟動了。
registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"
3、源碼分析
dubbo的服務(wù)發(fā)布過程的export過程中,會先經(jīng)過ProtocolFilterWrapper,在這里,完成filter的初始化和配置。
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol){
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
public int getDefaultPort() {
return protocol.getDefaultPort();
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
public void destroy() {
protocol.destroy();
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
在buildInvokerChain方法中,通過 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);得到激活的filter。
其中,invoker.getUrl()就是發(fā)布父類的url路徑:
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67080&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000×tamp=1543490087854&token=1234567
key=service.filter:后續(xù)會用這個key得到我們自定義的filter。
group=provider:表明是服務(wù)提供端
然后,會運行到下面的方法,這里的values就是我們定義的filter數(shù)組。
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
//Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY就是-default。意思是說我們
//在xml中配置filter時候沒有-default,就是要加載默認啟動的filter。這個大if就是加載dubbo提供的自動加載的filter集合
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
// getExtensionClasses()方法會加載所有的Filter接口的擴展實現(xiàn),包括dubbo提供的和我們自定義的
getExtensionClasses();
//cachedActivates是一個集合,所有的Filter接口的擴展實現(xiàn)中,有@Activate注解的都會放入這個集合中
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
//這里group=provider,而activate.group()是類的filter注解中定義的,分為provider和consumer。
//所以在服務(wù)發(fā)布端,只有注解中定義了provider的filter才會通過
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
//這里主要關(guān)注 isActive(activate, url),在下面分析
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
上面的cachedActivates是所有由@activate注解的filter接口的擴展實現(xiàn),通過斷點,得到其值:
{exception=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), cache=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[cache], before=[], group=[consumer, provider], order=0), genericimpl=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[generic], before=[], group=[consumer], order=20000), deprecated=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[deprecated], before=[], group=[consumer], order=0), classloader=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-30000), echo=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-110000), monitor=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider, consumer], order=0), generic=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-20000), timeout=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), accesslog=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[accesslog], before=[], group=[provider], order=0), token=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[token], before=[], group=[provider], order=0), trace=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), executelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[executes], before=[], group=[provider], order=0), future=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=0), tps=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[tps], before=[], group=[provider], order=0), context=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-10000), activelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[actives], before=[], group=[consumer], order=0), validation=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[validation], before=[], group=[consumer, provider], order=10000), consumercontext=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=-10000)}
可以看到,里面是有tps這個filter的。而且其要求有value值,value的名字是tps。即要求我們的url中要有這個tps的鍵值對。
通過了 if (isMatchGroup(group, activate.group()))后,還需要進行
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
//這里主要關(guān)注 isActive(activate, url),在下面分析
&& isActive(activate, url))
才能把符合條件的filter加入到exts中,這個exts就是后面要啟動的filter集合。
看看 isActive方法
private boolean isActive(Activate activate, URL url) {
//獲取注解中value()值
String[] keys = activate.value();
//如果value值是空的,說明不需要進行value過濾,直接放行
if (keys == null || keys.length == 0) {
return true;
}
//否則,就要對服務(wù)發(fā)布的url進行屬性鍵值對的對比,看url中是否有這個value的值,而
//我們的url中沒有tps這個屬性值,因此返回false,故TpsLimitFilter這個過濾器不會啟動
for (String key : keys) {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if ((k.equals(key) || k.endsWith("." + key))
&& ConfigUtils.isNotEmpty(v)) {
return true;
}
}
}
return false;
}
至此,分析完畢,正常情況下,為什么TpsLimitFilter這個filter不會啟動。
而當(dāng)我們通過向注冊中心動態(tài)寫入配置規(guī)則后,根據(jù)dubbo的機制,會導(dǎo)致dubbo重新發(fā)布這個服務(wù)。刷新后發(fā)布的服務(wù)的url是
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67656&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000×tamp=1543490522208&token=1234567&tps=5
里面有tps這個屬性,通過上面的分析,
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
//這里主要關(guān)注 isActive(activate, url),在下面分析
&& isActive(activate, url))
會返回true,導(dǎo)致我們的TpsLimitFilter會加入自動啟動的filter集合中。
至此,分析完畢!