Preface
I have long wanted to do a RocketMQ source-code analysis series, but it involves so many components that it was hard to know where to start. Recently I noticed that the NameServer source code is relatively simple, so I decided to use it as the entry point and gradually complete the series; consider it a goal set for the start of 2020. Without further ado, let's get straight to it.
NameServer Initialization Flow
public static void main(String[] args) {
main0(args);
}
The RocketMQ source tree is cleanly divided into modules; all of the NameServer code lives in the namesrv module. The startup entry point is the main method of the NamesrvStartup class, which simply delegates to main0. First, let's look at the source:
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
The main job here is creating a NamesrvController object, the NameServer's top-level controller, responsible for the lifecycle of all its services. It is created by the createNamesrvController method; let's see what happens during creation.
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
As we can see, the method first creates a NettyServerConfig object. That class belongs to the remoting module, which implements network communication between RocketMQ components on top of Netty; producers, consumers, and brokers all use it. I won't expand on the Netty details here. Incidentally, setListenPort(9876) shows that the NameServer listens on port 9876 by default.
Next, commandLine.hasOption('c') and commandLine.hasOption('p') handle the startup arguments: -c specifies the location of a configuration file, while -p prints the values of all configuration items and is intended for debugging only.
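When a `-c` file is loaded, MixAll.properties2Object copies its entries onto the config objects by matching property keys to setters via reflection. The gist can be sketched like this (a simplified, hypothetical re-implementation for illustration only, not the real MixAll code; the class and method names here are mine):

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Sketch of the properties-to-object idea: for each property key, find a
// matching setXxx setter on the target object and invoke it, converting the
// string value to the setter's parameter type.
public class PropertiesBinder {
    public static void bind(Properties props, Object target) throws Exception {
        for (String key : props.stringPropertyNames()) {
            String setter = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
            for (Method m : target.getClass().getMethods()) {
                if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                    Class<?> p = m.getParameterTypes()[0];
                    String raw = props.getProperty(key);
                    Object value = raw;
                    if (p == int.class || p == Integer.class) value = Integer.parseInt(raw);
                    else if (p == long.class || p == Long.class) value = Long.parseLong(raw);
                    else if (p == boolean.class || p == Boolean.class) value = Boolean.parseBoolean(raw);
                    m.invoke(target, value);
                    break;
                }
            }
        }
    }

    // A toy config class standing in for NettyServerConfig.
    public static class DemoConfig {
        private int listenPort;
        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    }
}
```

This is why a line like `listenPort=10911` in the `-c` file can override the hard-coded default without any parsing code per field.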
Finally, new NamesrvController(namesrvConfig, nettyServerConfig) creates the NamesrvController object, completing the controller's construction. The controller is then started via the start(controller) method.
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
The operations on the controller fall into three parts: initialize() for initialization, start() to begin running, and shutdown() invoked when the process exits. Let's go through them one by one.
First, the initialize() method:
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
*** code omitted ***
return true;
}
The important parts here are the following:
- new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService) creates the remotingServer object. Its main work is initializing the ServerBootstrap, EventLoopGroups, and other elements that, if you know Netty, you will recognize as the essentials of Netty network programming; I won't expand on them here.
- Two scheduled tasks registered via scheduledExecutorService.scheduleAtFixedRate: one runs NamesrvController.this.routeInfoManager.scanNotActiveBroker() every 10 seconds, the other runs NamesrvController.this.kvConfigManager.printAllPeriodically() every 10 minutes. kvConfigManager.printAllPeriodically() just prints configuration information, so I won't dwell on it, but routeInfoManager.scanNotActiveBroker() deserves a closer look.
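The registration of those two housekeeping tasks can be sketched in isolation (a minimal illustration; the class name and the Runnable stand-ins are mine, but the delays and periods match the source):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Registers two fixed-rate tasks the way initialize() does: a broker scan
// every 10 seconds (after a 5-second initial delay) and a config dump every
// 10 minutes (after a 1-minute initial delay).
public class HousekeepingSketch {
    public static ScheduledExecutorService start(Runnable scanNotActiveBroker,
                                                 Runnable printAllPeriodically) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
        ses.scheduleAtFixedRate(printAllPeriodically, 1, 10, TimeUnit.MINUTES);
        return ses;
    }
}
```

Note that scheduleAtFixedRate measures the period from the start of each run, so a slow scan does not push back the schedule the way scheduleWithFixedDelay would.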
RouteInfoManager is a core object responsible for storing and managing the cluster's routing information. Its definition contains the following fields; these five maps are the heart of the NameServer's data storage, and the comments explain each one.
// Topic-to-queue information; each queue's QueueData entry also records the brokerName.
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker information per brokerName. Each BrokerData holds the cluster name,
// the brokerName, and brokerAddrs, a map whose key is the broker ID and whose
// value is that broker's physical address.
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Mapping from cluster name to the set of brokerNames in that cluster.
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Dynamic per-broker state, including the last heartbeat time and the routing data version.
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// The filter-server addresses for each broker, used for server-side message filtering.
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
So what does RouteInfoManager's scanNotActiveBroker() method do? Let's take a look:
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
From the description of brokerLiveTable above, we can see that this method iterates over the map and removes any broker whose last heartbeat is older than the threshold BROKER_CHANNEL_EXPIRED_TIME: it closes the connection via RemotingUtil.closeChannel, then calls onChannelDestroy to purge the related entries from the other four maps. Note that since none of these maps are concurrent containers, onChannelDestroy performs its cleanup under a ReentrantReadWriteLock. As for why concurrent containers were not used in the first place, my guess is that cleanup is comparatively rare while ordinary map access is frequent, so this design saves overhead overall.
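The locking pattern described above can be sketched as follows (a minimal illustration under my own class and field names, not the real RouteInfoManager): plain HashMaps guarded by a single ReentrantReadWriteLock, so the frequent read path shares a cheap read lock while the rare eviction path takes the exclusive write lock.

```java
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: a heartbeat table protected by a read-write lock, with an eviction
// method analogous in spirit to scanNotActiveBroker().
public class RouteTableSketch {
    private final HashMap<String, Long> brokerLiveTable = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void recordHeartbeat(String brokerAddr, long now) {
        lock.writeLock().lock();
        try {
            brokerLiveTable.put(brokerAddr, now);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public Long lastHeartbeat(String brokerAddr) {
        lock.readLock().lock();
        try {
            return brokerLiveTable.get(brokerAddr);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Remove every broker whose last heartbeat is older than expireMs;
    // returns how many entries were evicted.
    public int evictExpired(long now, long expireMs) {
        lock.writeLock().lock();
        try {
            int before = brokerLiveTable.size();
            brokerLiveTable.values().removeIf(last -> last + expireMs < now);
            return before - brokerLiveTable.size();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The trade-off matches the guess above: a read-write lock over a plain HashMap keeps concurrent reads cheap, at the cost of writers briefly blocking everyone.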
As an aside, initialize() also calls this.registerProcessor(), which internally does the following:
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
This calls the registerDefaultProcessor method of the remotingServer object:
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
In essence it assigns a Pair object to the defaultRequestProcessor field; all subsequent business processing goes through this object.
That completes the analysis of initialize(); next, the start() method:
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
Again this calls the start() method of the previously initialized remotingServer:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
If you are familiar with Netty, this is immediately recognizable: this long stretch of code is almost the standard server-side boilerplate. serverBootstrap starts listening on the local port, and the crucial business-payload handling is delegated to the NettyConnectManageHandler and NettyServerHandler classes on the pipeline, which will be analyzed later.
The method also calls this.nettyEventExecutor.start(). Here nettyEventExecutor is a NettyEventExecutor object; that class is an inner class of NettyRemotingAbstract implementing Runnable, and its run method is implemented as follows:
@Override
public void run() {
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
Here a loop keeps taking events from the eventQueue and, depending on the event type, invokes a different method on the listener. Tracing the listener back to its source, it is the brokerHousekeepingService object passed in when NettyRemotingServer was constructed; its class is defined as:
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
As you can see, whether the event is IDLE, CLOSE, or EXCEPTION, it ultimately calls the onChannelDestroy method of RouteInfoManager that we encountered earlier.
In other words, this.nettyEventExecutor.start() spins up a new thread that likewise performs periodic RouteInfoManager cleanup.
Having finished start(), let's finally look at the shutdown() method registered on the controller:
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
All it does is shut down the various thread pools used earlier. This graceful approach prevents in-flight work from being lost when the process exits, and it is well worth learning from.
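The general pattern behind such graceful executor shutdown can be sketched as follows (this is the standard java.util.concurrent idiom, not RocketMQ's exact code): stop accepting new tasks, give in-flight tasks a grace period, then interrupt whatever remains.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Graceful shutdown sketch: shutdown() lets queued tasks drain, and only if
// they overrun the grace period do we escalate to shutdownNow().
public class GracefulShutdown {
    public static boolean shutdown(ExecutorService pool, long graceMillis) {
        pool.shutdown(); // no new tasks accepted; queued tasks keep running
        try {
            if (!pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // interrupt stragglers
                return pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```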
How NameServer Interacts with Other Components
We have covered NameServer initialization, but not how Broker, Producer, and Consumer interact with NameServer. In fact, NameServer communicates with every component over Netty, so the Netty-based plumbing is identical no matter which component is on the other end; what differs is the business handling of the requests each component sends.
Returning to the remotingServer start() method from before:
@Override
public void start() {
*** code omitted ***
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
*** code omitted ***
}
If you know Netty, you know that every packet received on the local port is processed by the handler chain. Apart from NettyEncoder, NettyDecoder, and the Netty-provided IdleStateHandler, which handle encoding, decoding, and idle detection, the ones we need to focus on are the two custom handlers: NettyConnectManageHandler and NettyServerHandler.
First, the NettyConnectManageHandler class. It extends Netty's ChannelDuplexHandler and overrides channelRegistered, channelUnregistered, channelActive, channelInactive, userEventTriggered, and exceptionCaught to handle channel registration, connection, disconnection, and related events. The source is as follows:
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
}
Let's analyze channelActive as the example. When a broker establishes a channel to the namesrv, this method is invoked. Besides the usual super.channelActive(ctx), which passes the message on down the handler chain (to the NettyServerHandler class), it also calls NettyRemotingServer.this.putNettyEvent. Let's look at that method:
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecutor.putNettyEvent(event);
}
Which in turn calls the putNettyEvent method of the nettyEventExecutor object:
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
Its main job is to put an event object, here a new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()), into the eventQueue blocking queue. As analyzed earlier, once the NameServer starts, a NettyEventExecutor object loops endlessly, taking events from eventQueue and invoking the appropriate listener method based on each event's type.
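The producer/consumer hand-off described above can be sketched in a few lines (an illustrative simplification with my own names, not RocketMQ's classes): the I/O thread offers events into a capacity-limited queue, dropping when full as putNettyEvent does, and the consumer thread polls with a timeout so it can periodically re-check its stop flag.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Bounded event-queue sketch mirroring the putNettyEvent / consumer-loop pair.
public class EventQueueSketch {
    private final LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final int maxSize;

    public EventQueueSketch(int maxSize) { this.maxSize = maxSize; }

    // Like putNettyEvent: drop the event instead of blocking when over capacity.
    public boolean put(String event) {
        if (eventQueue.size() <= maxSize) {
            eventQueue.add(event);
            return true;
        }
        return false; // dropped; the real code logs a warning here
    }

    // Like the consumer loop: poll with a timeout rather than block forever,
    // so a stop flag can be re-checked between polls. Returns null on timeout.
    public String take() throws InterruptedException {
        return eventQueue.poll(100, TimeUnit.MILLISECONDS);
    }
}
```

Dropping rather than blocking on overflow is a deliberate choice: the producer is the Netty I/O thread, which must never stall on housekeeping events.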
Next, the NettyServerHandler class. It extends Netty's SimpleChannelInboundHandler<RemotingCommand> but implements only a single method, channelRead0. The source:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
The method delegates to processMessageReceived(ctx, msg); let's take a look:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
This handles two message types, REQUEST_COMMAND and RESPONSE_COMMAND. Taking REQUEST_COMMAND, i.e. request messages, as our example, they are handled by processRequestCommand(ctx, cmd):
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
This method is one big if-else block. processorTable is used in the Broker to register processors for specific request codes, but the NameServer does not populate it, so the pair obtained here is the defaultRequestProcessor object initialized earlier. The if branch mainly creates a task and submits it to a thread pool. Inside the task, doBeforeRpcHooks and doAfterRpcHooks run pre- and post-processing hooks around the request, while the actual request handling happens in pair.getObject1().processRequest(ctx, cmd). Let's see what that method does.
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
This is a very typical request-routing dispatch: based on request.getCode(), each request is routed to the corresponding handler. For example, the code a Broker uses to register with the NameServer is REGISTER_BROKER, and the code a client uses to fetch routing information is GET_ROUTEINTO_BY_TOPIC.
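The switch above is the classic code-to-handler dispatch pattern. The same idea can be sketched with a handler map (an illustration only; the class name is mine and the request codes below are made up, not RocketMQ's actual RequestCode values):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Table-driven request dispatch: register a handler per request code, and
// route each incoming request by looking its code up in the table.
public class RequestDispatcher {
    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();

    public void register(int code, Function<String, String> handler) {
        handlers.put(code, handler);
    }

    // Returns the handler's response, or null for an unknown code,
    // mirroring the default branch of the switch above.
    public String dispatch(int code, String request) {
        Function<String, String> h = handlers.get(code);
        return h == null ? null : h.apply(request);
    }
}
```

A map-based table and a switch are interchangeable here; the switch in DefaultRequestProcessor keeps every route visible in one place, which suits a fixed, known set of request codes.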
Having looked at processRequest, consider the following snippet again:
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
As you can see, if the received request is not a oneway RPC, the response object produced by the processing above is written back as the reply. That roughly completes the NameServer's business flow.
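One detail worth isolating from processRequestCommand above is its flow control: when the worker pool throws RejectedExecutionException, the server fails fast with a SYSTEM_BUSY reply instead of blocking the I/O thread. A simplified sketch of that idea (my own pool sizing and names, not RocketMQ's actual configuration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Bounded worker pool for request tasks: when the queue is full, execute()
// throws RejectedExecutionException and the caller can reply "system busy".
public class FlowControlSketch {
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(2),           // small queue => back-pressure
        new ThreadPoolExecutor.AbortPolicy()); // reject instead of blocking

    /** Returns true if accepted, false if rejected (caller sends SYSTEM_BUSY). */
    public boolean submit(Runnable requestTask) {
        try {
            pool.execute(requestTask);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }
}
```

Rejecting under load and telling the client to back off is what keeps the NameServer responsive even when a burst of requests outpaces its workers.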
Summary
This article walked through the NameServer's initialization flow and request handling at the source level, giving a broad picture of how the NameServer works. Exploring the namesrv module also meant touching part of the remoting module, which should help when analyzing the other components later. Incidentally, reading this code went quite quickly, largely because I am comfortable with Netty, so investing in lower-level fundamentals really does pay off more and more over time. Also, I am genuinely determined this time to finish the RocketMQ series; it is a big undertaking, and I hope I can see it through.