開篇
這篇文章主要是分析Dubbo Consumer在處理Provider的響應(yīng)的流程,整體思路會(huì)按照Dubbo Client的初始化流程和Dubbo Client的響應(yīng)流程兩部分進(jìn)行分析。
Dubbo Client的初始化流程著重分析Client的連接過(guò)程以及處理Handler的封裝關(guān)系。
Dubbo Client的響應(yīng)流程著重分析響應(yīng)過(guò)程的流程,整個(gè)處理流程建立在Dubbo Client的初始化流程基礎(chǔ)上。
這篇文章順便講解了Dubbo 2.6.5的版本Client側(cè)線程過(guò)多的問(wèn)題的原因。
Consumer Client 初始化流程
DubboProtocol
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
// 省略相關(guān)代碼
};
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 創(chuàng)建DubboInvoker對(duì)象過(guò)程中g(shù)etClients初始化Client對(duì)象
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
private ExchangeClient initClient(URL url) {
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
ExchangeClient client;
try {
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 由Exchange層負(fù)責(zé)進(jìn)行連接操作
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
}
return client;
}
}
- DubboProtocol在refer過(guò)程中創(chuàng)建DubboInvoker對(duì)象,在創(chuàng)建DubboInvoker對(duì)象過(guò)程中會(huì)初始化ExchangeClient對(duì)象。
- 初始化ExchangeClient對(duì)象是通過(guò)Exchangers層的connect()方法實(shí)現(xiàn)。
Exchangers
public class Exchangers {
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// getExchanger()返回HeaderExchanger
return getExchanger(url).connect(url, handler);
}
}
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 封裝關(guān)系 DecodeHandler => HeaderExchangeHandler => requestHandler
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
- HeaderExchanger內(nèi)部會(huì)調(diào)用Transporters的connect()方法。
- Handler的封裝關(guān)系 DecodeHandler => HeaderExchangeHandler => requestHandler。
Transporters
public class Transporters {
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取NettyTransporter對(duì)象執(zhí)行connect()方法
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
- Transporters內(nèi)部獲取NettyTransporter對(duì)象,執(zhí)行connect()方法。
- NettyTransporter的connect()方法內(nèi)部構(gòu)造NettyClient對(duì)象,參數(shù)listener為DecodeHandler對(duì)象。
NettyClient
public class NettyClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// 省略其他代碼
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
}
public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// ExtensionLoader.getExtensionLoader()返回AllChannelHandler對(duì)象
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
- NettyClient的構(gòu)造函數(shù)中通過(guò)wrapChannelHandler()方法再次封裝handler。
- ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch()返回AllChannelHandler對(duì)象。
- handler的封裝關(guān)系為MultiMessageHandler => HeartbeatHandler
=> AllChannelHandler => DecodeHandler => HeaderExchangeHandler => requestHandler。 - NettyClient的NettyClientHandler為NettyClient本身。
- nettyClientHandler會(huì)在NettyClient收到響應(yīng)報(bào)文后開始執(zhí)行。
AbstractPeer
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
private final ChannelHandler handler;
private volatile URL url;
public AbstractPeer(URL url, ChannelHandler handler) {
this.url = url;
this.handler = handler;
}
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
}

AbstractPeer
- AbstractPeer是NettyClient的基類,在AbstractPeer的構(gòu)造函數(shù)當(dāng)中handler為MultiMessageHandler,由NettyClient的構(gòu)造函數(shù)傳入。
- AbstractPeer作為Client端響應(yīng)入口,具體的received()方法等執(zhí)行的入口,其他方法可以在實(shí)現(xiàn)類查看。
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
}
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 構(gòu)建executor對(duì)象
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}
- AllChannelHandler的父類WrappedChannelHandler的構(gòu)造函數(shù)中會(huì)創(chuàng)建executor對(duì)象。
- 每個(gè)連接會(huì)有一個(gè)executor對(duì)象,consumer側(cè)的executor是基于連接維度的,每個(gè)connection會(huì)有對(duì)應(yīng)的executor對(duì)象。
Handler封裝關(guān)系

Handler封裝關(guān)系

封裝關(guān)系
Consumer Client 響應(yīng)流程
Consumer Client 響應(yīng)階段一

Consumer 響應(yīng)階段一
- Consumer響應(yīng)階段一的調(diào)用棧如上圖。
- 按照NettyClientHandler => NettyClient =>MultiMessageHandler => HeartbeatHandler => AllChannelHandler的順序進(jìn)行調(diào)用。
NettyClientHandler
public class NettyClientHandler extends ChannelDuplexHandler {
private final URL url;
private final ChannelHandler handler;
public NettyClientHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.disconnected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
- NettyClientHandler的各個(gè)方法負(fù)責(zé)處理各類連接讀取事件。
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public void received(Channel channel, Object message) throws RemotingException {
// 獲取對(duì)應(yīng)的executor線程池對(duì)象
ExecutorService executor = getExecutorService();
try {
// 構(gòu)造ChannelEventRunnable對(duì)象并進(jìn)行投遞
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
}
- AllChannelHandler負(fù)責(zé)往消費(fèi)端線程池投遞ChannelEventRunnable對(duì)象。
- ExecutorService cexecutor = getExecutorService()獲取線程池對(duì)象,每個(gè)連接一個(gè)ExecutorService對(duì)象。
Consumer Client 響應(yīng)階段二

Consumer 響應(yīng)階段二
- Consumer 響應(yīng)階段二的調(diào)用棧如上圖。
- 調(diào)用棧按照ChannelEventRunnable => DecodeHandler => HeaderExchangeHandler => DefaultFuture的順序調(diào)用。
ChannelEventRunnable
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
- ChannelEventRunnable的線程內(nèi)部執(zhí)行run()方法進(jìn)行執(zhí)行流程。
- ChannelEventRunnable的內(nèi)部的handler對(duì)象為DecodeHandler對(duì)象。
- 執(zhí)行DecodeHandler的內(nèi)部會(huì)接著調(diào)用HeaderExchangeHandler對(duì)象方法。
HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 處理請(qǐng)求
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 處理響應(yīng)
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// 處理telnet等請(qǐng)求
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
- HeaderExchangeHandler的received()內(nèi)部區(qū)別請(qǐng)求/響應(yīng)/字符串進(jìn)行不同的處理。
- Consumer處理響應(yīng)的邏輯在handleResponse()方法內(nèi)部。
- handleResponse()方法最終執(zhí)行的是DefaultFuture的方法。
DefaultFuture
public class DefaultFuture implements ResponseFuture {
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
- DefaultFuture負(fù)責(zé)保存響應(yīng)對(duì)象并通過(guò)信號(hào)量喚醒消費(fèi)線程。