Dubbo的Filter在使用的過(guò)程中是我們擴(kuò)展最頻繁的內(nèi)容,而且Dubbo的很多特性實(shí)現(xiàn)也都離不開(kāi)Filter的工作,今天一起來(lái)看一下Filter的具體實(shí)現(xiàn)。
Filter(過(guò)濾器)在很多框架中都有使用過(guò)這個(gè)概念,基本上的作用都是類(lèi)似的,在請(qǐng)求處理前或者處理后做一些通用的邏輯,而且Filter可以有多個(gè),支持層層嵌套。
Dubbo的Filter概念基本上符合我們正常的預(yù)期理解,而且Dubbo官方針對(duì)Filter做了很多的原生支持,目前大致有20來(lái)個(gè)吧,包括我們熟知的RpcContext,accesslog功能都是通過(guò)filter來(lái)實(shí)現(xiàn)了,下面一起詳細(xì)看一下Filter的實(shí)現(xiàn)。
Dubbo的Filter實(shí)現(xiàn)入口是在ProtocolFilterWrapper,因?yàn)镻rotocolFilterWrapper是Protocol的包裝類(lèi),所以會(huì)在加載的Extension的時(shí)候被自動(dòng)包裝進(jìn)來(lái)(理解這里的前提是理解Dubbo的SPI機(jī)制),然后我們看一下這個(gè)Filter鏈?zhǔn)侨绾螛?gòu)造的。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//向注冊(cè)中心引用服務(wù)的時(shí)候并不會(huì)進(jìn)行filter調(diào)用鏈
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//獲得所有激活的Filter(已經(jīng)排好序的)
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
//復(fù)制引用,構(gòu)建filter調(diào)用鏈
final Invoker<T> next = last;
//這里只是構(gòu)造一個(gè)最簡(jiǎn)化的Invoker作為調(diào)用鏈的載體Invoker
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
看到上面的內(nèi)容,我們大致能明白實(shí)現(xiàn)是這樣子的,通過(guò)獲取所有可以被激活的Filter鏈,然后根據(jù)一定順序構(gòu)造出一個(gè)Filter的調(diào)用鏈,最后的調(diào)用鏈大致是這樣子:Filter1->Filter2->Filter3->......->Invoker,這個(gè)構(gòu)造Filter鏈的邏輯非常簡(jiǎn)單,重點(diǎn)就在于如何獲取被激活的Filter鏈。
//將key在url中對(duì)應(yīng)的配置值切換成字符串信息數(shù)組
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
//所有用戶(hù)自己配置的filter信息(有些Filter是默認(rèn)激活的,有些是配置激活的,這里這里的names就指的配置激活的filter信息)
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
//如果這些名稱(chēng)里不包括去除default的標(biāo)志(-default),換言之就是加載Dubbo提供的默認(rèn)Filter
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
//加載extension信息
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
//name指的是SPI讀取的配置文件的key
String name = entry.getKey();
Activate activate = entry.getValue();
//group主要是區(qū)分實(shí)在provider端生效還是consumer端生效
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
//這里以Filter為例:三個(gè)判斷條件的含義依次是:
//1.用戶(hù)配置的filter列表中不包含當(dāng)前ext
//2.用戶(hù)配置的filter列表中不包含當(dāng)前ext的加-的key
//3.如果用戶(hù)的配置信息(url中體現(xiàn))中有可以激活的配置key并且數(shù)據(jù)不為0,false,null,N/A,也就是說(shuō)有正常的使用
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
//根據(jù)Activate注解上的order排序
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
//進(jìn)行到此步驟的時(shí)候Dubbo提供的原生的Filter已經(jīng)被添加完畢了,下面處理用戶(hù)自己擴(kuò)展的Filter
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
//如果單個(gè)name不是以-開(kāi)頭并且所有的key里面并不包含-'name'(也就是說(shuō)如果配置成了"dubbo,-dubbo"這種的可以,這個(gè)if是進(jìn)不去的)
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
//可以通過(guò)default關(guān)鍵字替換Dubbo原生的Filter鏈,主要用來(lái)控制調(diào)用鏈順序
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
//加入用戶(hù)自己定義的擴(kuò)展Filter
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
基本上到這里就能看到Filter鏈?zhǔn)侨绾伪患虞d進(jìn)來(lái)的,這里設(shè)計(jì)的非常靈活,忍不住要感嘆一下:通過(guò)簡(jiǎn)單的配置‘-’可以手動(dòng)剔除Dubbo原生的一定加載Filter,通過(guò)default來(lái)代替Dubbo原生的一定會(huì)加載的Filter從而來(lái)控制順序。這些設(shè)計(jì)雖然都是很小的功能點(diǎn),但是總體的感覺(jué)是十分靈活,考慮的比較周到,非常值得我這種菜鳥(niǎo)學(xué)習(xí)。
知道了Filter構(gòu)造的過(guò)程之后,我們就詳細(xì)看幾個(gè)比較重要的Filter信息。
Filter在作用端區(qū)分的話(huà)主要是區(qū)分為consumer和provider,下面我們就以這個(gè)為區(qū)分來(lái)分別介紹一些重點(diǎn)的Filter。
Cunsumer
ConsumerContextFilter (默認(rèn)觸發(fā))
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//在當(dāng)前的RpcContext中記錄本地調(diào)用的一次狀態(tài)信息
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation)invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
}
其實(shí)簡(jiǎn)單來(lái)看這個(gè)Filter的話(huà)是十分簡(jiǎn)單,它又是怎么將客戶(hù)端設(shè)置的隱式參數(shù)傳遞給服務(wù)端呢?載體就是Invocation對(duì)象,在客戶(hù)端調(diào)用Invoker.invoke方法時(shí)候,會(huì)去取當(dāng)前狀態(tài)記錄器RpcContext中的attachments屬性,然后設(shè)置到RpcInvocation對(duì)象中,在RpcInvocation傳遞到provider的時(shí)候會(huì)通過(guò)另外一個(gè)過(guò)濾器ContextFilter將RpcInvocation對(duì)象重新設(shè)置回RpcContext中供服務(wù)端邏輯重新獲取隱式參數(shù)。這就是為什么RpcContext只能記錄一次請(qǐng)求的狀態(tài)信息,因?yàn)樵诘诙握{(diào)用的時(shí)候參數(shù)已經(jīng)被新的RpcInvocation覆蓋掉,第一次的請(qǐng)求信息對(duì)于第二次執(zhí)行是不可見(jiàn)的。
ActiveLimitFilter (當(dāng)配置了actives并且值不為0的時(shí)候觸發(fā))
ActiveLimitFilte主要用于限制同一個(gè)客戶(hù)端對(duì)于一個(gè)服務(wù)端方法的并發(fā)調(diào)用量。(客戶(hù)端限流)
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
//主要記錄每臺(tái)機(jī)器針對(duì)某個(gè)方法的并發(fā)數(shù)量
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
if (active >= max) {
synchronized (count) {
//這個(gè)while循環(huán)是必要的,因?yàn)樵谝淮蝫ait結(jié)束后,可能線(xiàn)程調(diào)用已經(jīng)結(jié)束了,騰出來(lái)consumer的空間
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
//如果wait方法被中斷的話(huà),remain這時(shí)候有可能大于0
//如果其中一個(gè)線(xiàn)程運(yùn)行結(jié)束自后調(diào)用notify方法的話(huà),也有可能remain大于0
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("...");
}
}
}
}
}
try {
//調(diào)用開(kāi)始和結(jié)束后增減并發(fā)數(shù)量
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
//這里很關(guān)鍵,因?yàn)橐粋€(gè)調(diào)用完成后要通知正在等待執(zhí)行的隊(duì)列
if(max>0){
synchronized (count) {
count.notify();
}
}
}
}
FutureFilter
Future主要是處理事件信息,主要有以下幾個(gè)事件:
- oninvoke 在方法調(diào)用前觸發(fā)(如果調(diào)用出現(xiàn)異常則會(huì)直接觸發(fā)onthrow方法)
- onreturn 在方法返回會(huì)觸發(fā)(如果調(diào)用出現(xiàn)異常則會(huì)直接觸發(fā)onthrow方法)
- onthrow 調(diào)用出現(xiàn)異常時(shí)候觸發(fā)
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 這里主要處理回調(diào)邏輯,主要區(qū)分三個(gè)時(shí)間:oninvoke:調(diào)用前觸發(fā),onreturn:調(diào)用后觸發(fā) onthrow:出現(xiàn)異常情況時(shí)候觸發(fā)
fireInvokeCallback(invoker, invocation);
//需要在調(diào)用前配置好是否有返回值,已供invoker判斷是否需要返回future.
Result result = invoker.invoke(invocation);
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
return result;
}
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
if (onInvokeMethod == null && onInvokeInst == null ){
return ;
}
if (onInvokeMethod == null || onInvokeInst == null ){
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onInvokeMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
}
//由于JDK的安全檢查耗時(shí)較多.所以通過(guò)setAccessible(true)的方式關(guān)閉安全檢查就可以達(dá)到提升反射速度的目的
if (onInvokeMethod != null && ! onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
//從之類(lèi)可以看出oninvoke的方法參數(shù)要與調(diào)用的方法參數(shù)一致
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
//fireReturnCallback的邏輯與fireThrowCallback基本一樣,所以不用看了
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final Method onthrowMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
if (onthrowMethod == null && onthrowInst == null ){
return ;
}
if (onthrowMethod == null || onthrowInst == null ){
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onthrow callback config , but no such "+(onthrowMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
}
if (onthrowMethod != null && ! onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes() ;
if (rParaTypes[0].isAssignableFrom(exception.getClass())){
try {
//因?yàn)閛nthrow方法的參數(shù)第一個(gè)值必須為異常信息,所以這里需要構(gòu)造參數(shù)列表
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length >1 ) {
//原調(diào)用方法只有一個(gè)參數(shù)而且這個(gè)參數(shù)是數(shù)組(單獨(dú)拎出來(lái)計(jì)算的好處是這樣可以少?gòu)?fù)制一個(gè)數(shù)組)
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)){
params = new Object[2];
params[0] = exception;
params[1] = args ;
}else {//原調(diào)用方法有多于一個(gè)參數(shù)
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {//原調(diào)用方法沒(méi)有參數(shù)
params = new Object[] { exception };
}
onthrowMethod.invoke(onthrowInst,params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), exception);
}
}
同步異步的主要處理區(qū)別就是同步調(diào)用的話(huà),事件觸發(fā)是直接調(diào)用的,沒(méi)有任何邏輯;異步的話(huà)就是首先獲取到調(diào)用產(chǎn)生的Future對(duì)象,然后復(fù)寫(xiě)Future的done()方法,將fireThrowCallback和fireReturnCallback邏輯引入即可。
Provider
ContextFilter
ContextFilter和ConsumerContextFilter是結(jié)合使用的,之前的介紹中已經(jīng)看了ConsumerContextFilter,下面再簡(jiǎn)單看一下ContextFilter,來(lái)驗(yàn)證剛才講到的邏輯。
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
//隱式參數(shù)重剔除一些核心消息
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
}
//這里又重新將invocation和attachments信息設(shè)置到RpcContext,這里設(shè)置以后provider的代碼就可以獲取到consumer端傳遞的一些隱式參數(shù)了
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setAttachments(attachments)
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation)invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.removeContext();
}
}
EchoFilter
回響測(cè)試主要用來(lái)檢測(cè)服務(wù)是否正常(網(wǎng)絡(luò)狀態(tài)),單純的檢測(cè)網(wǎng)絡(luò)情況的話(huà)其實(shí)不需要執(zhí)行真正的業(yè)務(wù)邏輯的,所以通過(guò)Filter驗(yàn)證一下即可.
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if(inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1 )
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
ExecuteLimitFilter
服務(wù)端接口限制限流的具體執(zhí)行邏輯就是在ExecuteLimitFilter中,因?yàn)榉?wù)端不需要考慮重試等待邏輯,一旦當(dāng)前執(zhí)行的線(xiàn)程數(shù)量大于指定數(shù)量,就直接返回失敗了,所以實(shí)現(xiàn)邏輯相對(duì)于A(yíng)ctiveLimitFilter倒是簡(jiǎn)便了不少。
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
if (count.getActive() >= max) {
throw new RpcException("...");
}
}
long begin = System.currentTimeMillis();
boolean isException = false;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isException = true;
if(t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
}
}
ExceptionFilter
Dubbo 對(duì)于異常的處理有自己的一套規(guī)則:
- 如果是checked異常則直接拋出
- 如果是unchecked異常但是在接口上有聲明,也會(huì)直接拋出
- 如果異常類(lèi)和接口類(lèi)在同一jar包里,直接拋出
- 如果是JDK自帶的異常,直接拋出
- 如果是Dubbo的異常,直接拋出
- 其余的都包裝成RuntimeException然后拋出(避免異常在Client出不能反序列化問(wèn)題)
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
Result result = invoker.invoke(invocation);
if (result.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = result.getException();
// 如果是checked異常,直接拋出
if (! (exception instanceof RuntimeException) && (exception instanceof Exception)) {
return result;
}
// 運(yùn)行時(shí)異常,并且在方法簽名上有聲明,直接拋出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}
// 未在方法簽名上定義的異常,在服務(wù)器端打印ERROR日志
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// 異常類(lèi)和接口類(lèi)在同一jar包里,直接拋出
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)){
return result;
}
// 是JDK自帶的異常,直接拋出
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return result;
}
// 是Dubbo本身的異常,直接拋出
if (exception instanceof RpcException) {
return result;
}
// 否則,包裝成RuntimeException拋給客戶(hù)端
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return result;
}
}
return result;
} catch (RuntimeException e) {
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
throw e;
}
}
基本到這里的話(huà)把比較重要的Filter內(nèi)容都有講解到了,我們可以根據(jù)自己的需求非常輕易地?cái)U(kuò)展適合自己業(yè)務(wù)使用的Filter。
本文最后,還是習(xí)慣性的撒花~~~~