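RocketMQ's high availability (HA) lies mainly in how the server-side components, Namesrv and Broker, are deployed. Since HA is the goal, both rely on clustering; we look at each in turn.
Namesrv High Availability
A Namesrv cluster achieves high availability by running multiple Namesrv instances. The relationship between the Namesrv nodes is somewhat unusual:
Namesrv nodes have no association with one another: they do not communicate or exchange data, and each simply serves its share of the load. When a node goes down, the others are unaffected and keep serving requests; the Namesrv cluster becomes unavailable only when every node is down.
1. Code for a Broker registering with Namesrv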
public RegisterBrokerResult registerBrokerAll(/* parameters omitted */) {
    RegisterBrokerResult registerBrokerResult = null;
    // Step 1
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                // Step 2
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }
                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            }
        }
    }
    return registerBrokerResult;
}
1. Get the full list of Namesrv nodes, nameServerAddressList.
2. Loop over the Namesrv nodes; the Broker registers itself with each one, which is what keeps the data on all Namesrv nodes consistent.
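The effect of the two steps above can be illustrated with a small in-memory simulation. This is only a sketch: FakeNamesrv and RegisterAllSketch are hypothetical names invented for illustration and are not RocketMQ classes. It shows that every reachable node ends up with the same entry, and that a failure on one node does not stop registration with the others.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one Namesrv node: it only keeps its own routing table.
class FakeNamesrv {
    final Map<String, String> brokerTable = new HashMap<>();
    boolean down = false;

    void register(String brokerName, String brokerAddr) {
        if (down) {
            throw new IllegalStateException("namesrv unreachable");
        }
        brokerTable.put(brokerName, brokerAddr);
    }
}

class RegisterAllSketch {
    // Registers the broker with every node; a failure on one node does not stop the loop.
    // Returns the number of nodes that accepted the registration.
    static int registerBrokerAll(List<FakeNamesrv> nodes, String brokerName, String brokerAddr) {
        int succeeded = 0;
        for (FakeNamesrv node : nodes) {
            try {
                node.register(brokerName, brokerAddr);
                succeeded++;
            } catch (RuntimeException e) {
                // As in the snippet above: report the failure and move on to the next node.
                System.out.println("register failed: " + e.getMessage());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<FakeNamesrv> nodes = new ArrayList<>();
        nodes.add(new FakeNamesrv());
        nodes.add(new FakeNamesrv());
        nodes.get(1).down = true; // simulate one unreachable node
        System.out.println(registerBrokerAll(nodes, "broker-a", "10.0.0.1:10911"));
    }
}
```

Because the nodes never talk to each other, consistency here depends entirely on each Broker repeating its registration with every node.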
2. Producer access to Namesrv
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    ···
    ···
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);
                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null)
                        return channelNew;
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }
    return null;
}
Producers and consumers alike pick an available node from the Namesrv list and connect to it.
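The index arithmetic in the loop above is a round-robin walk over the address list. A minimal standalone sketch of that idea follows; NamesrvChooserSketch and the addresses are made up for illustration. Note one deliberate difference from the snippet: it uses Math.floorMod instead of Math.abs followed by %, because Math.abs(Integer.MIN_VALUE) is still negative, so the abs-then-modulo form can yield a negative index once the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not a RocketMQ class: rotates through a fixed address list.
class NamesrvChooserSketch {
    private final AtomicInteger namesrvIndex;
    private final List<String> addrList;

    NamesrvChooserSketch(List<String> addrList, int startIndex) {
        this.addrList = addrList;
        this.namesrvIndex = new AtomicInteger(startIndex);
    }

    // Advances the shared counter and maps it onto the address list.
    String next() {
        int index = namesrvIndex.incrementAndGet();
        // floorMod keeps the result in [0, size) even after the counter wraps around.
        return addrList.get(Math.floorMod(index, addrList.size()));
    }

    public static void main(String[] args) {
        NamesrvChooserSketch chooser =
            new NamesrvChooserSketch(Arrays.asList("ns1:9876", "ns2:9876", "ns3:9876"), 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

In the real client, the chosen address is kept and reused while its channel stays healthy; the rotation only happens when a new channel has to be created.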
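Broker High Availability
A Broker master-slave group consists of one Master node plus N Slave nodes.
The Master serves both reads and writes; Slaves serve reads only.
Within each master-slave group, the Master continuously sends new CommitLog data to its Slaves, and each Slave continuously reports back to the Master the position up to which its local CommitLog has been synchronized.
Within a cluster, a Master node is one of two types, synchronous or asynchronous: when a Producer sends a message, a synchronous Master waits until the Slave has stored it before returning the send result, whereas an asynchronous Master does not wait.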
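The difference between the two Master types can be sketched as follows. This is an illustrative model only, not RocketMQ's implementation: SyncMasterSketch, its Status enum, and the method names are all assumptions. A synchronous Master blocks the send until the Slave's acknowledged offset reaches the new message (or a timeout expires); an asynchronous Master returns right after the local write.

```java
// Hypothetical model of sync vs. async replication on the Master side.
class SyncMasterSketch {
    enum Status { PUT_OK, SLAVE_TIMEOUT }

    private long slaveAckOffset = 0; // highest offset the slave has confirmed so far

    // Called when the slave reports how far it has replicated.
    synchronized void onSlaveAck(long offset) {
        if (offset > slaveAckOffset) {
            slaveAckOffset = offset;
            notifyAll(); // wake up any send that is waiting for this offset
        }
    }

    // Async master: return immediately. Sync master: wait for the slave to catch up.
    synchronized Status putMessage(long newMaxOffset, boolean syncMaster, long timeoutMillis) {
        if (!syncMaster) {
            return Status.PUT_OK;
        }
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (slaveAckOffset < newMaxOffset) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return Status.SLAVE_TIMEOUT;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Status.SLAVE_TIMEOUT;
            }
        }
        return Status.PUT_OK;
    }

    public static void main(String[] args) {
        SyncMasterSketch master = new SyncMasterSketch();
        System.out.println(master.putMessage(100, false, 50)); // async: no waiting
        System.out.println(master.putMessage(100, true, 50));  // sync: slave never acked
        master.onSlaveAck(100);
        System.out.println(master.putMessage(100, true, 50));  // sync: slave already caught up
    }
}
```

The trade-off is the usual one: the synchronous mode costs latency on every send but avoids losing messages that the Master acknowledged and the Slave never received.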
1. Master node logic
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // Step 1
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }
            // Step 2
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    ···
    ···
    ···
}
1. this.processReadEvent() reads the CommitLog position from which the Slave is requesting synchronization.
2. Check whether the connection has timed out.
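The timeout check in step 2 boils down to one comparison. The sketch below isolates it; HousekeepingSketch is a made-up name, and the 20-second interval is an assumed value chosen for illustration, not one taken from the article.

```java
// Hypothetical helper isolating the housekeeping comparison from the loop above.
class HousekeepingSketch {
    // True when nothing has been read from the peer for longer than the allowed interval.
    static boolean isExpired(long nowMillis, long lastReadTimestamp, long housekeepingIntervalMillis) {
        return nowMillis - lastReadTimestamp > housekeepingIntervalMillis;
    }

    public static void main(String[] args) {
        long interval = 20_000L; // assumed value, for illustration only
        System.out.println(isExpired(25_000L, 10_000L, interval)); // idle for 15 s
        System.out.println(isExpired(31_000L, 10_000L, interval)); // idle for 21 s
    }
}
```

Since the Slave reports its offset periodically even when idle, a long silence on the Master's read side is a reasonable signal that the connection is dead.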
2. WriteSocketService transmits new CommitLog data to the Slave

3. Slave loop
The Slave receives CommitLog data from the Master and reports to the Master the physical position up to which its own local CommitLog has been synchronized.
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Step 1
            if (this.connectMaster()) {
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                this.selector.select(1000);
                // Step 2
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Step 3
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}
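1. Check whether the reporting interval has elapsed and, if so, report the offset to the Master; result indicates whether the report succeeded. This doubles as a heartbeat.
2. The counterpart of the Master's method of the same name: it handles the data read from the Master and tracks the CommitLog offset.
3. If the Master has not returned data for too long, close the connection.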
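The reporting behaviour in the Slave loop can be modelled as "report when the heartbeat interval has elapsed, or when the local offset has moved past what was last reported". The sketch below is a simplified model under that assumption; SlaveReporterSketch and poll are hypothetical names, and the real HAClient splits this logic across isTimeToReportOffset, reportSlaveMaxOffset, and reportSlaveMaxOffsetPlus.

```java
// Hypothetical model of when a slave sends its offset to the master.
class SlaveReporterSketch {
    private final long heartbeatIntervalMillis;
    private long lastWriteTimestamp;
    private long currentReportedOffset = 0;

    SlaveReporterSketch(long heartbeatIntervalMillis, long nowMillis) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.lastWriteTimestamp = nowMillis;
    }

    // Returns the offset to send to the master, or -1 when nothing needs to be sent.
    long poll(long nowMillis, long localMaxOffset) {
        boolean heartbeatDue = nowMillis - lastWriteTimestamp >= heartbeatIntervalMillis;
        boolean advanced = localMaxOffset > currentReportedOffset;
        if (!heartbeatDue && !advanced) {
            return -1;
        }
        currentReportedOffset = Math.max(currentReportedOffset, localMaxOffset);
        lastWriteTimestamp = nowMillis;
        return currentReportedOffset;
    }

    public static void main(String[] args) {
        SlaveReporterSketch reporter = new SlaveReporterSketch(5_000L, 0L);
        System.out.println(reporter.poll(1_000L, 0L));   // nothing new, heartbeat not due
        System.out.println(reporter.poll(2_000L, 128L)); // local offset advanced
        System.out.println(reporter.poll(3_000L, 128L)); // nothing new, heartbeat not due
        System.out.println(reporter.poll(7_000L, 128L)); // heartbeat due, same offset resent
    }
}
```

Resending an unchanged offset is what makes the report act as a heartbeat: the Master sees traffic on the connection even when there is no new data to replicate.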
4. Producer sending messages
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Check whether latency fault tolerance is switched on
    if (this.sendLatencyFaultEnable) {
        try {
            // Look for an available message queue whose brokerName equals lastBrokerName
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Pick a relatively good broker and take one of its queues, ignoring availability
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fall back to picking a queue without any filtering
        return tpInfo.selectOneMessageQueue();
    }
    // Switch is off: pick a queue based on lastBrokerName, without checking the queue's availability
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
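Here is the broker selection logic:
1. First, look for a queue that is available (that is, its broker has not been put into the latencyFaultTolerance object because of excessive latency) and whose broker equals lastBrokerName; when lastBrokerName is null, any available queue qualifies.
2. If step 1 finds no suitable queue, drop the broker == lastBrokerName condition and pick a relatively good broker to send to.
3. If that also fails, fall back to picking a queue without any filtering.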
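The idea behind latencyFaultTolerance can be sketched as a table of "unavailable until" timestamps per broker. The code below is a simplified model, not RocketMQ's implementation: LatencyFaultSketch, markUnavailable, and select are hypothetical names, and the fallback step simply ignores availability rather than ranking brokers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of latency-based broker avoidance.
class LatencyFaultSketch {
    private final Map<String, Long> unavailableUntil = new ConcurrentHashMap<>();

    // Marks a broker as unavailable for a period after a slow or failed send.
    void markUnavailable(String brokerName, long nowMillis, long durationMillis) {
        unavailableUntil.put(brokerName, nowMillis + durationMillis);
    }

    boolean isAvailable(String brokerName, long nowMillis) {
        Long until = unavailableUntil.get(brokerName);
        return until == null || nowMillis >= until;
    }

    // Walks the list from startIndex and returns the first available broker;
    // if none is available, falls back to the broker at startIndex regardless.
    String select(List<String> brokers, int startIndex, long nowMillis) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(startIndex + i, brokers.size()));
            if (isAvailable(candidate, nowMillis)) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(startIndex, brokers.size()));
    }

    public static void main(String[] args) {
        LatencyFaultSketch tolerance = new LatencyFaultSketch();
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        tolerance.markUnavailable("broker-a", 0L, 3_000L);
        System.out.println(tolerance.select(brokers, 0, 1_000L)); // broker-a is skipped
        System.out.println(tolerance.select(brokers, 0, 3_000L)); // penalty has expired
    }
}
```

Keeping a fallback matters: if every broker is currently penalized, refusing to send at all would turn a latency problem into an outage.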
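Summary
1. RocketMQ achieves Broker high availability by running multiple Broker master-slave groups that together form a cluster.
2. Broker master-slave groups are independent of one another; they do not communicate or synchronize data with each other.
3. Namesrv nodes relate to one another much as Broker master-slave groups do: they are mutually independent, share the load, and do not communicate or synchronize data.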