背景
服務暴露網上已經有很多文章了,大而全,我們這裡主要抓細節。
疑問
暴露過程做了些啥?
是先啟動服務還是先連接註冊中心?
服務下線時,註冊中心怎麼感知?
暴露
我們從 org.apache.dubbo.config.ServiceConfig#doExportUrls() 方法進去:
/**
 * Exports this service once for every configured protocol.
 *
 * <p>Multi-protocol export means several {@code <dubbo:protocol>} entries may be configured, e.g.
 * {@code <dubbo:protocol name="dubbo" port="20880"/>} together with
 * {@code <dubbo:protocol name="rest" port="20881"/>}, so that, for instance, PHP clients and
 * Dubbo clients can be served at the same time.
 */
private void doExportUrls() {
    // Registry URLs are loaded once and shared by every protocol export.
    final List<URL> registries = loadRegistries(true);
    for (ProtocolConfig eachProtocol : protocols) {
        doExportUrlsFor1Protocol(eachProtocol, registries);
    }
}
進入 doExportUrlsFor1Protocol() 中,這個方法大家一定要進去瞅一眼,和我們寫的代碼也差不多:方法太長,而且循環嵌套很深。
//org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
// Exports this service for a single protocol configuration:
//   1. collects all configuration into a key=value parameter map,
//   2. builds the provider URL from protocol name, host, port, path and that map,
//   3. unless scope is "none": exports locally (skipped when scope is remote) and exports
//      remotely (skipped when scope is local), once per registry URL or directly when no
//      registry URL is present,
//   4. publishes the provider URL to the metadata report service when one is available,
//   5. records the final URL in this.urls.
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {// no protocol name configured: default to dubbo
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// put every piece of configuration into the URL as key=value parameters
appendRuntimeParameters(map);
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (ProtocolUtils.isGeneric(generic)) {// generic service
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {// regular service: record revision and method names
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// the HashSet removes duplicate method names (overloads) before joining
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// token: Dubbo supports token validation; only calls carrying the correct token succeed
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
// "default" token setting: generate a random UUID as the token
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// let a ConfiguratorFactory registered for this protocol (if any) adjust the URL
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);// export the service to the local JVM first (analysed below)
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// multiple registries are supported too, e.g. the service can be exposed inside the
// cluster and also to a shared platform for other business lines
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// load the monitor configuration
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// proxy mode used to invoke the concrete bean; javassist by default
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// assemble the invoker: the provider URL travels as the encoded EXPORT_KEY parameter of the registry URL
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// export the service
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// no registry URL: export directly with the provider URL
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// store the published provider information
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
本地暴露 exportLocal(url)
//org.apache.dubbo.config.ServiceConfig#exportLocal
/**
 * Exports the service inside the current JVM.
 *
 * <p>A copy of {@code url} is built with the protocol switched to {@code LOCAL_PROTOCOL} (injvm),
 * the host set to {@code LOCALHOST_VALUE} and the port set to 0; the resulting exporter is
 * recorded in {@code exporters}.
 *
 * @param url the provider URL the local URL is derived from
 */
private void exportLocal(URL url) {
    // The protocol is set to injvm by hand so that the matching Protocol is selected below.
    final URL injvmUrl = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL)
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    final Invoker<?> localInvoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, injvmUrl);
    final Exporter<?> localExporter = protocol.export(localInvoker);
    exporters.add(localExporter);
}
// Adaptive extension of Protocol, obtained from the ExtensionLoader. The walkthrough shows it
// dispatching to different implementations depending on the invoker's URL protocol
// (injvm for the local export, registry for the remote export).
static Protocol protocol = ExtensionLoader
.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// Adaptive extension of ProxyFactory, used to turn the service implementation (ref) into an Invoker.
static ProxyFactory PROXY_FACTORY = ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
protocol 靜態變量為 Protocol 接口的自適應擴展點,調用 protocol.export(Invoker<T> invoker) 時會根據傳入的 invoker 信息(其 URL 的協議)決定去往哪個實現類。而 invoker 傳入的值為
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local),PROXY_FACTORY 靜態變量也是一個 ProxyFactory 的自適應擴展點,從下面可以看到該擴展點為方法級擴展點(@Adaptive 標註在方法上),這裡我們並沒有自定義過 proxy 屬性,所以默認實現為 javassist=JavassistProxyFactory(這裡忽略各種包裝器)。
// Extension point for creating Invokers; the @SPI value names "javassist" as the default extension.
@SPI("javassist")
public interface ProxyFactory {
// Method-level adaptive extension keyed on "proxy".
// NOTE(review): presumably the "proxy" URL parameter selects the implementation — confirm against the Dubbo SPI docs.
@Adaptive({"proxy"})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
}
進到 JavassistProxyFactory 的 getInvoker 實現中。
//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
/**
 * Creates an Invoker that dispatches calls to {@code proxy} through a generated {@code Wrapper}.
 *
 * @param proxy the real service implementation, e.g. HelloServiceImpl@xxx
 * @param type  the service interface, e.g. HelloService
 * @param url   the URL handed to the returned invoker
 * @return an anonymous AbstractProxyInvoker whose doInvoke delegates to the wrapper
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // A class name containing '$' is treated as a proxy-generated class: in that case only the
    // interface type is wrapped, otherwise the implementation class itself is wrapped.
    final Class<?> targetClass;
    if (proxy.getClass().getName().indexOf('$') < 0) {
        targetClass = proxy.getClass();
    } else {
        targetClass = type;
    }
    // Wrap the target class in a Wrapper; per the walkthrough, Wrapper instances are generated with javassist.
    final Wrapper dispatcher = Wrapper.getWrapper(targetClass);
    // The anonymous subclass captures the wrapper; AbstractProxyInvoker implements Invoker.
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return dispatcher.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
上面這種匿名內部類的寫法可能不夠具體,我們通過自定義類的方式去實現它,更具象一點:
//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
/**
 * Same behaviour as the anonymous-class version, but returns the named MyProxyInvoker so the
 * wrapping chain is easier to describe.
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // No '$' in the class name: wrap the implementation class; otherwise wrap the interface type.
    final boolean plainClass = proxy.getClass().getName().indexOf('$') < 0;
    final Class<?> wrapped = plainClass ? proxy.getClass() : type;
    final Wrapper wrapper = Wrapper.getWrapper(wrapped);
    return new MyProxyInvoker(proxy, type, url, wrapper);
}
/**
 * Named equivalent of the anonymous AbstractProxyInvoker created in
 * JavassistProxyFactory#getInvoker: every invocation is delegated to the supplied Wrapper.
 *
 * <p>The class is generic so that it no longer relies on raw types; existing raw
 * {@code new MyProxyInvoker(proxy, type, url, wrapper)} call sites still compile.
 *
 * @param <T> the service interface type
 */
public class MyProxyInvoker<T> extends AbstractProxyInvoker<T> {
    /** Wrapper that performs the actual method dispatch; assigned once in the constructor. */
    private final Wrapper wrapper;

    /**
     * @param proxy   the service implementation to invoke
     * @param type    the service interface
     * @param url     the URL associated with this invoker
     * @param wrapper the Wrapper used to dispatch method calls on {@code proxy}
     */
    public MyProxyInvoker(T proxy, Class<T> type, URL url, Wrapper wrapper) {
        super(proxy, type, url);
        this.wrapper = wrapper;
    }

    @Override
    protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
}
這樣寫的效果是一樣的:JavassistProxyFactory#getInvoker() 方法返回的是 MyProxyInvoker 對象,後面我們就用該對象來描述分析。
回到 Exporter<?> exporter = protocol.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); 中,表達式變成了 Exporter<?> exporter = protocol.export(MyProxyInvoker),MyProxyInvoker 中的 url 對象為 local:
// The URL carried by the invoker: a copy of the provider URL with protocol "injvm",
// host LOCALHOST_VALUE and port 0.
URL local = URLBuilder.from(url)
.setProtocol("injvm")
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
所以 protocol.export() 的實現類為 InjvmProtocol:
/**
 * Exports the invoker inside the JVM by wrapping it in an InjvmExporter that is keyed by the
 * service key of the invoker's URL and backed by {@code exporterMap}.
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final String serviceKey = invoker.getUrl().getServiceKey();
    return new InjvmExporter<T>(invoker, serviceKey, exporterMap);
}
該方法返回 InjvmExporter,最後執行 exporters.add(exporter),將 InjvmExporter(這裡其實外面會包裝一層 ListenerExporterWrapper 包裝器)對象暴露到 map 中,JVM 本地暴露到此結束。
遠程暴露
我們再來看看遠程暴露的區別:
// as with the local export, this returns a MyProxyInvoker instance
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// unlike the local export, the MyProxyInvoker instance is wrapped in a DelegateProviderMetaDataInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// written the same way as the local export; the difference is that url#protocol inside wrapperInvoker is not injvm
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
我們 debug 看看 wrapperInvoker 中的 url#protocol 是啥:
protocol 為 registry,所以流程會進入到 RegistryProtocol#export(同樣這裡也會有 Wrapper 包裝)。我們 debug 進去,這個方法內容太豐富了,這裡我們先只分析服務暴露:
// RegistryProtocol#export (abridged): resolves the registry and provider URLs, registers an
// override listener for the provider URL, exports the service through doLocalExport and returns
// the result wrapped in a DestroyableExporter. The remaining steps are omitted in this excerpt.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// the URL that is to be exported to the registry
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export the service (analysed below)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//...
return new DestroyableExporter<>(exporter);
}
暴露服務 doLocalExport()
//org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
/**
 * Exports the provider through the protocol named in {@code providerUrl}, at most once per
 * cache key: {@code bounds.computeIfAbsent} only runs the export when no entry exists for the key.
 *
 * @param originInvoker the invoker handed to RegistryProtocol#export
 * @param providerUrl   the provider URL, e.g. dubbo://xxx
 * @return the cached or newly created exporter wrapper
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    // A unique key per exported service avoids exporting the same service twice.
    final String cacheKey = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(cacheKey, ignored -> {
        // Wrap the invoker once more, this time together with the provider URL.
        Invoker<?> delegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // protocol.export passes through the various wrappers and ends up in DubboProtocol#export.
        Exporter<T> exported = (Exporter<T>) protocol.export(delegate);
        return new ExporterChangeableWrapper<>(exported, originInvoker);
    });
}
/**
 * Builds the cache key for an exported service: the full string of the provider URL with the
 * "dynamic" and "enabled" parameters removed.
 */
private String getCacheKey(final Invoker<?> originInvoker) {
    return getProviderUrl(originInvoker)
            .removeParameters("dynamic", "enabled")
            .toFullString();
}
這裡 originInvoker 為 DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)),invokerDelegate 再次包裝為 InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)))。我們繼續 debug,到了 ProtocolFilterWrapper#export:
/**
 * Exports the invoker through the wrapped protocol. Invokers whose URL protocol is the registry
 * protocol are passed through untouched; all others are first wrapped in the provider-side
 * filter chain.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final boolean registryExport = REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
    final Invoker<T> target;
    if (registryExport) {
        target = invoker;
    } else {
        target = buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER);
    }
    return protocol.export(target);
}
// Builds the filter chain around an invoker (abridged excerpt): loads the activated Filter
// extensions for the invoker's URL, key and group, walks them from last to first (loop body
// omitted here) and returns a CallbackRegistrationInvoker over the chain head and the filters.
// NOTE(review): the braces in this excerpt are unbalanced because the loop body was elided.
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
//.....
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
buildInvokerChain() 會將 InvokerDelegate 關聯多個 Filter 過濾器,然後包裝為 CallbackRegistrationInvoker 對象返回。我們接著 debug,最後到了 DubboProtocol#export(),此時的 invoker 為 CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)))):
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
// DubboProtocol#export: registers the invoker in exporterMap under its service key, records stub
// event methods when stub events are enabled for a non-callback service, opens the server for
// the URL, applies the serialization optimization and returns the exporter.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// build the service key, e.g. com.poizon.study.api.service.HelloService:20880; it does not depend on the method
String key = serviceKey(url);
// wrap the CallbackRegistrationInvoker in a DubboExporter and store it in the map;
// this map is crucial: it is the entry point used later to look up the service for a call
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// stub events enabled but no stub methods configured: only a warning is logged
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// start the server, i.e. have netty open port 20880
openServer(url);
// load the configured serialization; hessian2 is used by default
optimizeSerialization(url);
return exporter;
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
// Abridged excerpt: stores the server created by createServer(url) in serverMap.
// NOTE(review): the computation of `key` is part of the elided code and is not shown here.
private void openServer(URL url) {
//..... createServer() creates the server
serverMap.put(key, createServer(url));
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
/**
 * Binds an exchange server for {@code url} with this protocol's request handler.
 *
 * @throws RpcException wrapping the RemotingException when the server cannot be started
 */
private ExchangeServer createServer(URL url) {
    try {
        return Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
/**
 * Binds a transport-level server for {@code url} and wraps it in a HeaderExchangeServer.
 * The handler chain is DecodeHandler -> HeaderExchangeHandler -> {@code handler}.
 */
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    final HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    final DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
    return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
}
//org....remoting.Transporters#bind(URL, ChannelHandler...)
/**
 * Binds a server for {@code url}. A single handler is used as-is; any other number of handlers
 * is combined into a ChannelHandlerDispatcher.
 */
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    final ChannelHandler effectiveHandler = handlers.length == 1
            ? handlers[0]
            : new ChannelHandlerDispatcher(handlers);
    // The netty4 implementation is selected by default.
    return getTransporter().bind(url, effectiveHandler);
}
//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
// Client-side factory method: wraps the URL and handler in a new NettyClient.
// NOTE(review): the call chain traced above ends in getTransporter().bind(url, handler), i.e. the
// server-side path; this connect() excerpt is the client-side counterpart — confirm that the
// intended excerpt is not NettyTransporter#bind.
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
//org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
try {
doOpen();
//org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
跟到最後看到了熟悉的 netty 啟動,這裡有好多我們熟悉的配置,比如第一篇文章我們說到的心跳實現 IdleStateHandler,以及心跳默認時間 UrlUtils.getHeartbeat(getUrl()),還有 netty 的自定義 handler nettyClientHandler(沒錯,這個 handler 就是處理 dubbo 消費者請求的)。(注意:上面貼出的是客戶端 NettyClient 的代碼;服務暴露時 Transporters#bind 實際走的是 NettyTransporter#bind → NettyServer#doOpen,結構與之類似。)
總結
總結下,我們一根線走到底,走到了最後的 socket 啟動,並將 DubboExporter 放入了 map 中,最後層層包裝為 DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))))));嵌套雖然多了點,但是 Wrapper 類的功能都是為了擴展小功能,後面我們挑幾個分析。
後面將分析註冊中心和 Wrapper 等功能。