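Introduction
Building a RocketMQ cluster essentially means deploying Masters and Slaves. Depending on how many Masters and Slaves there are, there are several topologies:
- Single Master, no Slave: this is plain standalone mode. Once the Master goes down, the messaging service is unavailable.
- Multiple Masters, no Slave: if one Master goes down, the cluster is still usable. However, because there is no Slave, any unconsumed messages on the failed Master cannot be consumed until that Master recovers.
- Multiple Masters, multiple Slaves: this is the ideal setup. If a Master goes down, sending is unaffected because messages can go to the other Masters. Consumption is also unaffected (extreme cases aside), because the Master and Slave synchronize both messages and configuration.
Note: I read version 3.5.8, and so far I have not found an automatic Master/Slave failover feature in it.
The overall structure is as follows: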

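- Producer:
  - Connects to the NameServer: fetches configuration information, such as Broker information
  - Connects to the Master and Slave: the Producer sends messages to the Master; messages are only ever sent to the Master
- Master:
  - Connects to the Producer: receives the messages the Producer sends (messages are only sent to the Master)
  - Connects to the NameServer: registers its Broker information and exchanges configuration
  - Connects to the Consumer: the Consumer pulls messages from the Master
  - Connects to the Slave: message synchronization
  - No connections between Masters: multiple Masters are independent of each other
- Slave:
  - Connects to the Master: message synchronization
  - Connects to the NameServer: fetches configuration information, e.g. the Slave obtains the Master's information
  - Connects to the Consumer: the Consumer pulls messages from the Slave
  - No connections between Slaves: same as the Masters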
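Overall Flow
1. By default, the Master uses its listen port + 1 as the HA port.
2. The Master pushes messages to the Slave, and the Slave processes them and writes them into its own CommitLog.
3. The Slave reports the offset it has processed back to the Master. It does this as a heartbeat every 5 s, and also whenever its local offset advances.
4. The Master records the reported offset and sets a flag. In SYNC_MASTER mode, this flag is used to decide whether a message has been synchronized to the Slave.
The whole flow, simplified: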

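Source Code Analysis
The analysis below follows the third topology (multiple Masters, multiple Slaves), because it also covers how the other two are handled.
Message Synchronization
Slave
Let's start with the Slave. The Slave synchronizes messages from the Master mainly through the HAClient class. Its core fields are: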
// Master IP:PORT
private final AtomicReference<String> masterAddress = new AtomicReference<String>();
// Buffer used to report the Slave's max offset to the Master
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// The offset the Slave has reported to the Master so far
private long currentReportedOffset = 0;
// Position up to which received data has been dispatched (used to handle sticky/split packets)
private int dispatchPostion = 0;
// Buffer for data received from the Master
private ByteBuffer byteBufferRead = ByteBuffer.allocate(ReadMaxBufferSize);
What each field is for is explained in detail later. First, masterAddress: it holds the Master's address. The Master also initializes an HAClient, but its masterAddress is empty, so that HAClient does nothing.
Now the core run method:
public void run() {
    while (!this.isStoped()) {
        try {
            // If masterAddress is not empty, connect to the Master (opening a SocketChannel) and return true;
            // on the Master itself masterAddress is empty, so this branch is skipped
            if (this.connectMaster()) {
                // Report the max physical offset first, periodically, as a heartbeat
                if (this.isTimeToReportOffset()) { // every 5 s by default
                    // Send the sync progress back to the Master
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // Wait for a response
                this.selector.select(1000);
                // Receive data
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // Whenever the local store has advanced, report the max physical offset
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Check the reverse heartbeat from the Master
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    this.closeMaster();
                }
            }
            else {
                this.waitForRunning(1000 * 5);
            }
        }
        catch (Exception e) {
            this.waitForRunning(1000 * 5);
        }
    }
}
processReadEvent is where ordinary NIO read handling takes place:
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead); // read data sent by the Master
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    return false;
                }
            }
            else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                return false;
            }
        }
        catch (IOException e) {
            return false;
        }
    }
    return true;
}
It reads from the Channel into the buffer, then calls dispatchReadRequest to process the data that was read:
private boolean dispatchReadRequest() {
    final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();
    while (true) {
        // dispatchPostion is where the previous round stopped; diff is the number of bytes
        // received since then that have not been processed yet
        int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= MSG_HEADER_SIZE) { // at least one full header is available
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            // Check that the Master's and Slave's offsets match
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    return false;
                }
            }
            // Sticky/split packet handling:
            // if diff < MSG_HEADER_SIZE + bodySize, the body was split; wait for the next read
            if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
                this.byteBufferRead.get(bodyData);
                // Write the data sent by the Master into the CommitLog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
                if (!reportSlaveMaxOffsetPlus()) { // report the sync progress back to the Master
                    return false;
                }
                continue;
            }
        }
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}
It does two main things:
- Parses the messages sent by the Master
- Writes them into the CommitLog and tells the Master how far it has synchronized
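Here is a minimal, self-contained Java sketch of the same framing idea. It is not HAClient itself, and several pieces are assumptions made for illustration:
- the class name HaFrameDecoder
- the fixed 1 KB buffer
- the in-memory slaveMaxOffset, which stands in for the Slave's CommitLog max offset

Buffer reallocation is also left out. The sketch shows how a dispatch position lets a frame of [8-byte phyOffset][4-byte bodySize][body] that arrives in pieces wait until it is complete.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the Master -> Slave frame layout [8-byte phyOffset][4-byte bodySize][body].
// Buffer compaction/reallocation (reallocateByteBuffer in HAClient) is deliberately left out.
class HaFrameDecoder {
    static final int MSG_HEADER_SIZE = 8 + 4;

    private final ByteBuffer buf = ByteBuffer.allocate(1024); // bytes received so far
    private int dispatchPos = 0;     // start of the first frame not yet dispatched
    private long slaveMaxOffset = 0; // stand-in for the Slave CommitLog max offset (0 = empty store)

    // Helper: encodes one frame the way header + body would appear on the wire.
    static byte[] frame(long phyOffset, byte[] body) {
        ByteBuffer b = ByteBuffer.allocate(MSG_HEADER_SIZE + body.length);
        b.putLong(phyOffset).putInt(body.length).put(body);
        return b.array();
    }

    // Appends one socket read and returns the bodies of all frames it completed.
    List<byte[]> onRead(byte[] chunk) {
        buf.put(chunk);
        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            int diff = buf.position() - dispatchPos;
            if (diff < MSG_HEADER_SIZE) {
                break; // header split across reads: wait for more bytes
            }
            long masterPhyOffset = buf.getLong(dispatchPos);
            int bodySize = buf.getInt(dispatchPos + 8);
            if (slaveMaxOffset != 0 && slaveMaxOffset != masterPhyOffset) {
                throw new IllegalStateException("offset mismatch: " + slaveMaxOffset + " vs " + masterPhyOffset);
            }
            if (diff < MSG_HEADER_SIZE + bodySize) {
                break; // body split across reads: wait for more bytes
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = buf.get(dispatchPos + MSG_HEADER_SIZE + i); // absolute get: position untouched
            }
            slaveMaxOffset = masterPhyOffset + bodySize; // plays the role of appendToCommitLog
            dispatchPos += MSG_HEADER_SIZE + bodySize;
            bodies.add(body);
        }
        return bodies;
    }

    long slaveMaxOffset() {
        return slaveMaxOffset;
    }
}
```

For example, feeding only the first 5 bytes of a 15-byte frame returns nothing. Feeding the remaining 10 bytes together with a second frame then returns both bodies in a single call.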
Next, the implementation of reportSlaveMaxOffsetPlus:
private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    // Whenever the local store has advanced, report the max physical offset
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    if (currentPhyOffset > this.currentReportedOffset) {
        this.currentReportedOffset = currentPhyOffset;
        result = this.reportSlaveMaxOffset(this.currentReportedOffset); // write back to the Master through the Channel
        if (!result) {
            this.closeMaster();
            log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
        }
    }
    return result;
}
The logic is simple. It gets the CommitLog's max offset, and if that is greater than the last reported position, it sends it to the Master and updates currentReportedOffset.
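Combined with the 5-second check in run(), the Slave has two reasons to report. The sketch below models only that decision logic. SlaveOffsetReporter and its millisecond timestamps are hypothetical. "Sending" just records the offset in a list instead of writing to a SocketChannel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the Slave's two reporting triggers; nothing is written to a socket.
class SlaveOffsetReporter {
    private final long heartbeatIntervalMs;
    private long currentReportedOffset = 0;
    private long lastReportTimeMs;
    final List<Long> sent = new ArrayList<>(); // offsets that would have been written to the Master

    SlaveOffsetReporter(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastReportTimeMs = nowMs;
    }

    // Heartbeat path: resend the last reported offset once the interval has elapsed.
    void maybeHeartbeat(long nowMs) {
        if (nowMs - lastReportTimeMs > heartbeatIntervalMs) {
            send(currentReportedOffset, nowMs);
        }
    }

    // "Plus" path: report only when the local CommitLog max offset has grown.
    void reportIfAdvanced(long localMaxOffset, long nowMs) {
        if (localMaxOffset > currentReportedOffset) {
            currentReportedOffset = localMaxOffset;
            send(currentReportedOffset, nowMs);
        }
    }

    private void send(long offset, long nowMs) {
        sent.add(offset); // in HAClient this would be an 8-byte write on the SocketChannel
        lastReportTimeMs = nowMs;
    }
}
```

A consequence of this design is that an idle Slave still produces a report every interval. The Master's ReadSocketService relies on those reports for its housekeeping timeout.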
Master
The Master-side classes involved in synchronization are:
- HAService: the entry point that starts the synchronization services
- AcceptSocketService: accepts Slave connections and builds an HAConnection instance for each one
- GroupTransferService: when BrokerRole is SYNC_MASTER, acts as an intermediary service that sets a flag used to determine whether the Slave has finished synchronizing
- HAConnection: one instance per Slave, used to interact with that Slave
  - WriteSocketService: pushes messages to the Slave
  - ReadSocketService: reads the sync progress sent back by the Slave
AcceptSocketService
AcceptSocketService is implemented with plain NIO. The NIO boilerplate is skipped here, and only the core flow is covered. The run method is the core: it accepts Slave connections and builds an HAConnection object for each one.
public void run() {
    while (!this.isStoped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
            if (selected != null) {
                for (SelectionKey k : selected) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                        if (sc != null) {
                            try {
                                // Each connecting Slave gets its own HAConnection, which holds the corresponding Channel
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start(); // starts WriteSocketService and ReadSocketService
                                HAService.this.addConnection(conn); // saves the connection in connectionList
                            }
                            catch (Exception e) {
                                // ....
                            }
                        }
                    }
                    else { // ....
                    }
                }
                selected.clear();
            }
        }
        catch (Exception e) { // ....
        }
    }
}
ReadSocketService
ReadSocketService is mainly responsible for handling the progress reported by the Slave and related work. Its core is again the run method:
public void run() {
    while (!this.isStoped()) {
        try {
            this.selector.select(1000);
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }
            // Check the heartbeat interval; force a disconnect if it is exceeded
            long interval =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                    - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore()
                .getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                    + "] expired, " + interval);
                break;
            }
        }
        catch (Exception e) {
            break;
        }
    }
    // ....
}
The received data is processed in processReadEvent:
private boolean processReadEvent() {
    // .... other code omitted
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // Receive the offset reported by the Slave
                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;
                    // Update slaveAckOffset and slaveRequestOffset
                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                    }
                    // Notify the front-end (message-writing) threads
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            }
            else if (readSize == 0) {
                // .... other code omitted
            }
            else {
                return false;
            }
        }
        catch (IOException e) {
            return false;
        }
    }
    return true;
}
Two pieces of this code deserve a closer look:
1. (this.byteBufferRead.position() - this.processPostion) >= 8
2. int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
   long readOffset = this.byteBufferRead.getLong(pos - 8);
   this.processPostion = pos;
How should they be read? The 8 is the byte length of an offset (a long).
- processPostion is the position processed last time, so the first statement handles split packets. If fewer than 8 new bytes have arrived, nothing is processed and the code waits for the next read.
- The second part is easiest to follow with an example:
  - First read: this.byteBufferRead.position() - 0 is 3, so it is ignored.
  - Second read: this.byteBufferRead.position() is 10, so pos = 10 - 2 = 8 and readOffset = getLong(0). This reads exactly the first long, and processPostion is set to 8.
  - Third read: this.byteBufferRead.position() is 21, so this.byteBufferRead.position() - 8 = 13 >= 8. Then pos = 21 - 5 = 16 and readOffset = getLong(8), which reads exactly the second long.
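The same arithmetic can be replayed outside RocketMQ. This sketch uses a hypothetical class, AckOffsetParser, and leaves out the buffer reset that the real service performs when the buffer fills up. It feeds reads of 3, 7 and 11 bytes, so the buffer position goes 3, 10, 21, as in the example above.

```java
import java.nio.ByteBuffer;

// Hypothetical reproduction of ReadSocketService's offset parsing; the buffer reset when full is omitted.
class AckOffsetParser {
    private final ByteBuffer buf = ByteBuffer.allocate(1024);
    private int processPosition = 0;

    // Appends one socket read; returns the newest complete 8-byte offset, or -1 if there is none yet.
    long onRead(byte[] chunk) {
        buf.put(chunk);
        if (buf.position() - processPosition >= 8) {
            int pos = buf.position() - (buf.position() % 8); // round down to a multiple of 8
            long readOffset = buf.getLong(pos - 8);          // last complete long before pos
            processPosition = pos;
            return readOffset;
        }
        return -1;
    }

    int processPosition() {
        return processPosition;
    }
}
```

When several complete offsets arrive in one read, only the last one is used. Because the Slave's reported offsets only grow, the earlier ones carry no extra information.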
What slaveAckOffset and slaveRequestOffset are for:
- slaveAckOffset is the offset from the Slave's latest report
- slaveRequestOffset is the offset from the Slave's first report
The notifyTransferSome method is as follows:
public void notifyTransferSome(final long offset) {
    for (long value = this.push2SlaveMaxOffset.get(); offset > value;) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        }
        else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}
It raises push2SlaveMaxOffset to the offset the Slave has synchronized to, then sends a notification through GroupTransferService.
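The CAS loop is the standard lock-free way to keep a "maximum so far" in an AtomicLong. Below is a minimal standalone version. It uses a hypothetical Push2SlaveMax class, with a counter in place of the GroupTransferService wakeup.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for HAService.push2SlaveMaxOffset plus its notification hook.
class Push2SlaveMax {
    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    final AtomicInteger notifications = new AtomicInteger(0); // counts would-be GroupTransferService wakeups

    // Same CAS loop as notifyTransferSome: only ever raises the value, and notifies once per raise.
    boolean notifyTransferSome(long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; ) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                notifications.incrementAndGet();
                return true;
            }
            value = push2SlaveMaxOffset.get(); // lost the race: re-read and re-check
        }
        return false; // offset is not newer than what is already recorded
    }
}
```

A stale or duplicate ack (an offset not greater than the current value) leaves the value unchanged and triggers no wakeup. As a result, acks arriving concurrently from several Slave connections can never move the offset backwards.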
WriteSocketService
First, a few fields:
private final int HEADER_SIZE = 8 + 4; // header: 8-byte CommitLog offset + 4-byte body size
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(HEADER_SIZE);
private long nextTransferFromWhere = -1; // CommitLog offset from which the next batch is read
private SelectMapedBufferResult selectMapedBufferResult; // result of the last CommitLog read
private boolean lastWriteOver = true; // whether the last transfer was fully written
private long lastWriteTimestamp = System.currentTimeMillis();
Let's start with the run method:
public void run() {
    while (!this.isStoped()) {
        try {
            this.selector.select(1000);
            // The Slave has not reported its offset yet: wait
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }
            // Compute nextTransferFromWhere
            if (-1 == this.nextTransferFromWhere) {
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // Empty Slave: start from the beginning of the Master's latest CommitLog file
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                .getMapedFileSizeCommitLog());
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }
                    this.nextTransferFromWhere = masterOffset;
                } else {
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }
            }
            if (this.lastWriteOver) {
                long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {
                    // Build a heartbeat header (body size 0)
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(HEADER_SIZE);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else {
                // The previous transfer was not fully written: keep writing it
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // Pull messages from the CommitLog starting at the given offset, similar to a consumer pull
            SelectMapedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                // Transfer at most haTransferBatchSize (32K by default) per round
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }
                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;
                selectResult.getByteBuffer().limit(size); // limit the buffer to at most one batch (32K by default)
                this.selectMapedBufferResult = selectResult;
                // Build the header; see the Slave's HAClient for how it is parsed
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(HEADER_SIZE);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
                this.lastWriteOver = this.transferData(); // transfer the data
            } else {
                // No new data yet: wait up to 100 ms (wakeupAll in putMessage can wake it earlier)
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {
            break;
        }
    }
    // ....
}
Data is transferred in transferData:
private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write the header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        }
        else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        }
        else {
            throw new Exception("ha master write header error < 0");
        }
    }
    // No body pending (e.g. a heartbeat): done once the header is fully written
    if (null == this.selectMapedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }
    writeSizeZeroTimes = 0;
    // Write the body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            }
            else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }
    boolean result =
        !this.byteBufferHeader.hasRemaining()
            && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();
    // Body fully written: release the mapped buffer
    if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMapedBufferResult.release();
        this.selectMapedBufferResult = null;
    }
    return result;
}
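The processing is simple. It writes the header and then the body to the Slave through the Channel, and finally returns whether the buffers have been fully written.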
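The offset bookkeeping in WriteSocketService.run() can be separated from the I/O. The helpers below are hypothetical (HaTransferPlanner is not a RocketMQ class). They compute three things:
- the starting offset
- the 12-byte header
- the sequence of batches capped at haTransferBatchSize

One simplification: in RocketMQ each getCommitLogData read also appears to be confined to a single CommitLog file, which this sketch ignores.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers mirroring the arithmetic in WriteSocketService.run(); no I/O is performed.
class HaTransferPlanner {
    static final int HEADER_SIZE = 8 + 4;

    // First transfer position: the Slave's reported offset, or, for an empty Slave (0),
    // the start of the Master's latest CommitLog file.
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    // 12-byte header: CommitLog offset of the first body byte, then the body size; flipped for writing.
    static ByteBuffer header(long offset, int size) {
        ByteBuffer h = ByteBuffer.allocate(HEADER_SIZE);
        h.putLong(offset).putInt(size);
        h.flip();
        return h;
    }

    // Splits [from, masterMaxOffset) into {offset, size} batches of at most batchSize bytes.
    static List<long[]> plan(long from, long masterMaxOffset, int batchSize) {
        List<long[]> batches = new ArrayList<>();
        long next = from;
        while (next < masterMaxOffset) {
            long size = Math.min(batchSize, masterMaxOffset - next);
            batches.add(new long[] {next, size});
            next += size; // same role as nextTransferFromWhere += size
        }
        return batches;
    }
}
```

For example, with 1024-byte files and a Master max offset of 2600, an empty Slave starts at 2048. With a batch size of 256, it then receives batches at 2048, 2304 and 2560, and the last one is 40 bytes long.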
GroupTransferService
When ReadSocketService receives the offset sent back by the Slave, it calls GroupTransferService's notifyTransferSome method. Here is what that method does:
public void notifyTransferSome() {
    this.notifyTransferObject.wakeup();
}
public void wakeup() {
    synchronized (this) {
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();
        }
    }
}
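It only sets a flag and then calls notify. Anyone familiar with RocketMQ's flush strategies will recognize this: it is the same handling as asynchronous flushing, where a flag is set and a waiting thread is woken up.
Next, what exactly is being woken up? notifyTransferObject calls wakeup, so something must be waiting on it. Following that leads to the doWaitTransfer method: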
private void doWaitTransfer() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            // Retry up to 5 times; each time the condition fails, wait for the Slave to report its progress
            for (int i = 0; !transferOK && i < 5; i++) {
                this.notifyTransferObject.waitForRunning(1000);
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }
            if (!transferOK) {
                log.warn("transfer message to slave timeout, " + req.getNextOffset());
            }
            req.wakeupCustomer(transferOK); // mark the request as done
        }
        this.requestsRead.clear();
    }
}
public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}
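It first iterates over the request collection (requestsRead). What this collection is for is not clear yet; all we know is that each request corresponds to an offset. transferOK is whether push2SlaveMaxOffset (the position the Slave has synchronized to) is greater than or equal to that offset. If it is false, the thread keeps waiting, up to 5 times. Finally, the result is passed into the flushOK variable and the CountDownLatch (CDL) is counted down.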
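The handshake between the writing thread and GroupTransferService is a CountDownLatch plus a wait/notify loop. The sketch below reproduces that pattern with two hypothetical classes, SyncRequest and TransferWaiter. It differs from RocketMQ in two ways:
- It uses a plain Object monitor instead of RocketMQ's notifyTransferObject.
- Its waitForFlush returns only the recorded flag, unlike the original's `result || this.flushOK` expression (shown later).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for GroupCommitRequest: the writer blocks on it, the transfer thread releases it.
class SyncRequest {
    final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean transferOK = false;

    SyncRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    void wakeupCustomer(boolean ok) {
        transferOK = ok;   // publish the result first...
        latch.countDown(); // ...then release the waiting writer
    }

    // Returns the recorded flag (false if the wait timed out or was interrupted).
    boolean waitForFlush(long timeoutMs) {
        try {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            return transferOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical stand-in for the GroupTransferService side.
class TransferWaiter {
    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Object lock = new Object();

    // Called when a Slave ack arrives: raise the ack offset, then wake the waiting transfer thread.
    void onSlaveAck(long offset) {
        push2SlaveMaxOffset.accumulateAndGet(offset, Math::max);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    // Mirrors doWaitTransfer for one request: re-check after each of at most 5 waits.
    void doWaitTransfer(SyncRequest req, long waitMs) {
        boolean ok = push2SlaveMaxOffset.get() >= req.nextOffset;
        try {
            for (int i = 0; !ok && i < 5; i++) {
                synchronized (lock) {
                    if (push2SlaveMaxOffset.get() < req.nextOffset) {
                        lock.wait(waitMs); // checked under the lock, so a concurrent ack's notify is not missed
                    }
                }
                ok = push2SlaveMaxOffset.get() >= req.nextOffset;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        req.wakeupCustomer(ok);
    }
}
```

Checking the condition while holding the same monitor that the notifier uses is what prevents a lost wakeup. In RocketMQ, the hasNotified flag in wakeup plays a similar role.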
Now that doWaitTransfer is understood, two questions remain:
- What is GroupCommitRequest?
- Which thread is blocked by this CDL?
In GroupCommitRequest, the CDL is awaited in the waitForFlush method:
public boolean waitForFlush(long timeout) {
    try {
        boolean result = this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return result || this.flushOK;
    }
    catch (InterruptedException e) {
        e.printStackTrace();
        return false;
    }
}
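It blocks on the CDL and then returns based on flushOK. Note that, as quoted, it returns result || this.flushOK. So once the latch has been counted down, the method returns true even if flushOK is false. Searching for callers of this method leads to two call sites in CommitLog's putMessage.
The first call site: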
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
    HAService service = this.defaultMessageStore.getHaService();
    // If the message's WAIT property is true
    // (meaning: return only after the server has stored the message,
    // i.e. after the flush completes or after synchronous replication to another server)
    if (msg.isWaitStoreMsgOK()) {
        // isSlaveOK requires both of the following:
        // the Slave lags behind the Master by less than 256 MB
        // there is at least one Slave connection
        if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
            if (null == request) {
                // The offset in GroupCommitRequest is the CommitLog offset right after this write
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            }
            service.putRequest(request); // ends up in GroupTransferService.putRequest, i.e. the collection used by doWaitTransfer
            service.getWaitNotifyObject().wakeupAll();
            // This is the waiting described above: block until synchronization completes.
            // In code terms: until the Slave's synced offset >= result.getWroteOffset() + result.getWroteBytes()
            boolean flushOK =
                request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                    .getSyncFlushTimeout());
            if (!flushOK) { // synchronization timed out
                log.error("do sync transfer other node, wait return, but failed, topic: "
                    + msg.getTopic() + " tags: " + msg.getTags() + " client address: "
                    + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
        }
        else {
            // If SYNC_MASTER is configured but there is no Slave,
            // the second condition of isSlaveOK fails and every producer send reports this error
            putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
        }
    }
}
The second call site is synchronous flushing, with similar logic:
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (msg.isWaitStoreMsgOK()) {
        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        service.putRequest(request);
        boolean flushOK =
            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                .getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
                + msg.getTags() + " client address: " + msg.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    }
    else {
        service.wakeup();
    }
}
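Putting the SYNC_MASTER call site together with isSlaveOK, the decision for a single message can be condensed into a few lines. This is a hypothetical restatement, not RocketMQ code. SyncMasterDecision and its Status enum are illustrative names, and the 256 MB threshold is taken from the comment in the first call site.

```java
// Hypothetical condensation of the SYNC_MASTER branch of putMessage.
class SyncMasterDecision {
    static final long SLAVE_FALLBEHIND_MAX = 1024L * 1024 * 256; // 256 MB, per the isSlaveOK comment

    enum Status { PUT_OK, SLAVE_NOT_AVAILABLE, FLUSH_SLAVE_TIMEOUT }

    // Both isSlaveOK conditions: at least one Slave connected, and lag below the threshold.
    static boolean isSlaveOK(int connectionCount, long masterPutWhere, long push2SlaveMaxOffset) {
        return connectionCount > 0 && (masterPutWhere - push2SlaveMaxOffset) < SLAVE_FALLBEHIND_MAX;
    }

    // Outcome of one put on a SYNC_MASTER, given whether replication finished before the timeout.
    static Status outcome(boolean waitStoreMsgOK, boolean slaveOK, boolean replicatedInTime) {
        if (!waitStoreMsgOK) {
            return Status.PUT_OK;              // the sender did not ask to wait
        }
        if (!slaveOK) {
            return Status.SLAVE_NOT_AVAILABLE; // e.g. SYNC_MASTER configured without any Slave
        }
        return replicatedInTime ? Status.PUT_OK : Status.FLUSH_SLAVE_TIMEOUT;
    }
}
```

This makes the failure mode noted in the code comment easy to see. With no Slave connected, every waiting send ends in SLAVE_NOT_AVAILABLE, no matter how fresh the data is.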
With these two call sites, the whole flow is now fairly clear. Here is the completed version of the sequence diagram from the beginning:

That concludes the Master/Slave interaction. Next, let's look at how the Consumer and Producer behave with multiple Masters and multiple Slaves.
Producer
For the Producer, the main thing to look at is the send path of DefaultMQProducerImpl:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 1. get the topic's TopicPublishInfo
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    // ....
    // SYNC: 1 + retryTimesWhenSendFailed attempts; ASYNC and ONEWAY: 1 attempt
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 2. select a queue, avoiding the broker of the previous attempt
        if (tmpmq != null) {
            mq = tmpmq;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 3. send the message to the broker
                endTimestamp = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) { // send(Message) uses SYNC by default
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
            } catch (....) {
                // ....
            }
        } else {
            break;
        }
    } // end of for
    // ....
}
First it fetches the topic's TopicPublishInfo, mainly for its queue information, which looks like this (assuming 2 queues per broker):
broker_a-queue-0
broker_a-queue-1
broker_b-queue-0
broker_b-queue-1
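That is, all queues in the cluster.
Anyone familiar with the Producer's sending mechanism knows that, after getting the queues, it sends to them in round-robin order. If broker_a is down and a send fails, lastBrokerName is no longer null, and queue selection skips broker_a's queues: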
#TopicPublishInfo.selectOneMessageQueue(String)
//....
if (!mq.getBrokerName().equals(lastBrokerName)) {
    return mq;
}
//....
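In isolation, retry-aware selection looks roughly like the sketch below. MQ and QueueSelector are hypothetical stand-ins for MessageQueue and TopicPublishInfo, and the real method may differ in details. The point is only that a retry moves on to the next queue in round-robin order and skips every queue on lastBrokerName.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue handle (brokerName + queueId), standing in for MessageQueue.
final class MQ {
    final String brokerName;
    final int queueId;

    MQ(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Hypothetical round-robin selector that skips the broker used by the previous attempt.
class QueueSelector {
    private final List<MQ> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelector(List<MQ> queues) {
        this.queues = queues;
    }

    MQ select(String lastBrokerName) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            // floorMod keeps the index non-negative even after the counter overflows
            MQ mq = queues.get(Math.floorMod(index + i, queues.size()));
            if (lastBrokerName == null || !mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        return null; // every queue belongs to lastBrokerName
    }
}
```

With the four queues listed above, a first attempt lands on broker_a-queue-0. A retry after that attempt fails moves past broker_a-queue-1 and lands on broker_b-queue-0.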
Consumer
First, the place where the pull request is issued:
#PullAPIWrapper.pullKernelImpl(MessageQueue, String, long, long, int, int, long, long, long, CommunicationMode, PullCallback)
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
public FindBrokerResult findBrokerAddressInSubscribe(//
    final String brokerName, //
    final long brokerId, //
    final boolean onlyThisBroker) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);
        slave = (brokerId != MixAll.MASTER_ID);
        found = (brokerAddr != null);
        // Requested node not found: fall back to the first entry of the map (e.g. the Slave)
        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = (entry.getKey() != MixAll.MASTER_ID);
            found = true;
        }
    }
    if (found) {
        return new FindBrokerResult(brokerAddr, slave);
    }
    return null;
}
public long recalculatePullFromWhichNode(final MessageQueue mq) {
    if (this.isConnectBrokerByUser()) { // false by default
        return this.defaultBrokerId;
    }
    // Look up the brokerId suggested for this queue
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }
    // If there is no suggestion, return the Master's id, i.e. 0
    return MixAll.MASTER_ID;
}
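findBrokerAddressInSubscribe first gets a map from brokerAddrTable by brokerName. If the Master has a Slave, the map has two entries, whose keys are the Master's id (0) and the Slave's id. brokerId is 0 by default. When the Master is down, no brokerAddr is found for it, so the method falls back to the first entry of the map and uses the Slave's address.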
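This raises a few questions:
- When the Master is down, how does the Consumer notice it? At the code level: why does looking up id 0 in the map fail?
- How is the "suggested brokerId to pull from" (suggest) determined?
The first question:
The Master's entry is missing from the map, so we can guess that when the Master goes down, the client can no longer get that Broker's information from the NameServer and removes it from the map. To see how this is done, start with the place where entries are removed: the cleanOfflineBroker method, which runs in a scheduled task that the client starts at startup.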
#MQClientInstance.cleanOfflineBroker()
Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
while (itBrokerTable.hasNext()) {
    Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
    // The Master and Slaves under one brokerName
    String brokerName = entry.getKey();
    HashMap<Long, String> oneTable = entry.getValue();
    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
    cloneAddrTable.putAll(oneTable);
    Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> ee = it.next();
        String addr = ee.getValue();
        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) { // check topicRouteTable to see whether the address is still valid
            it.remove();
        }
    }
    // .... (writing cloneAddrTable back into brokerAddrTable is omitted)
}
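It iterates over brokerAddrTable and removes invalid addresses. topicRouteTable holds TopicRouteData, which contains a topic's queue and Broker information. It is also refreshed by a scheduled task, in which the client pulls the topic's Broker information from the NameServer. Brokers register with the NameServer periodically, much like a heartbeat, and a Broker that stops registering is eventually dropped by the NameServer. So if a Broker is down, its information can no longer be fetched.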
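The pruning and the fallback lookup from findBrokerAddressInSubscribe can be exercised together. In this sketch, a Set of live addresses stands in for the topicRouteTable check, and OfflineBrokerCleaner is a hypothetical class. It does three things:
- removes dead addresses
- drops a brokerName that has no surviving addresses
- shows that a lookup for brokerId 0 then lands on the Slave

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helpers: liveAddrs stands in for "address still present in topicRouteTable".
class OfflineBrokerCleaner {
    // Drops addresses that are no longer live; drops a brokerName entirely when nothing survives.
    static void cleanOfflineBroker(Map<String, HashMap<Long, String>> brokerAddrTable, Set<String> liveAddrs) {
        Iterator<Map.Entry<String, HashMap<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, HashMap<Long, String>> entry = it.next();
            HashMap<Long, String> clone = new HashMap<>(entry.getValue());
            clone.values().removeIf(addr -> !liveAddrs.contains(addr));
            if (clone.isEmpty()) {
                it.remove();
            } else {
                entry.setValue(clone); // write the pruned copy back
            }
        }
    }

    // Same fallback as findBrokerAddressInSubscribe with onlyThisBroker == false.
    static String findAddr(Map<String, HashMap<Long, String>> brokerAddrTable, String brokerName, long brokerId) {
        HashMap<Long, String> map = brokerAddrTable.get(brokerName);
        if (map == null || map.isEmpty()) {
            return null;
        }
        String addr = map.get(brokerId);
        return addr != null ? addr : map.values().iterator().next();
    }
}
```

Consider broker_a with a Master (id 0) and a Slave (id 1). Once only the Slave's address is still routable, a lookup for id 0 returns the Slave's address, which is exactly the failover read path described above.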
The second question:
Look at where pullFromWhichNodeTable is updated, that is, the caller of updatePullFromWhichNode:
#PullAPIWrapper.processPullResult(MessageQueue, PullResult, SubscriptionData)
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    //....
    return pullResult;
}
This is the callback invoked after the Consumer pulls messages, so the value in pullFromWhichNodeTable comes from the Broker. To see when the Broker sets it, start from PullMessageProcessor, the class that handles pull requests on the Broker, and find where the value is set:
// ....
// If getMessageResult suggests pulling from a Slave, set the Slave's id; otherwise set the Master's id, i.e. 0
// This happens when consumption is too slow
if (getMessageResult.isSuggestPullingFromSlave()) {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        break;
    case SLAVE:
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
    // consumption too slow
    if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    }
    // consume ok
    else {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
    }
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// ....
In short, pulling from the Slave is suggested only when consumption is too slow. How is "too slow" determined? Look at these lines in DefaultMessageStore's getMessage method:
final long maxOffsetPy = this.commitLog.getMaxOffset(); // current max write position of the CommitLog
// ....
int i = 0;
for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    maxPhyOffsetPulling = offsetPy; // physical position of the message being pulled
    // ....
}
// ....
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TotalPhysicalMemorySize
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// If the gap between the pulled position and the max offset exceeds 40% of physical memory
// (the accessMessageInMemoryMaxRatio setting), suggest pulling from the Slave
getResult.setSuggestPullingFromSlave(diff > memory);
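Here is the same check restated as a standalone function. SlowConsumeCheck is a hypothetical name, and the parameters mirror the variables above.

```java
// Hypothetical restatement of the "consuming too slowly" check in getMessage.
class SlowConsumeCheck {
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemory, int accessMessageInMemoryMaxRatio) {
        long diff = maxOffsetPy - maxPhyOffsetPulling; // how many CommitLog bytes the consumer is behind
        long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
        return diff > memory; // lag exceeds the configured share of physical memory
    }
}
```

For example, with 16 GB of physical memory and the ratio at 40, the threshold is about 6.4 GB. A consumer 8 GB behind would be pointed at the Slave, while one 4 GB behind would not.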
Configuration Synchronization
The Slave periodically pulls configuration and offsets from the Master:
# SlaveSynchronize.syncAll
public void syncAll() {
    this.syncTopicConfig(); // pull topic configuration from the Master
    this.syncConsumerOffset(); // pull consumer offsets from the Master
    this.syncDelayOffset(); // pull delay-queue offsets from the Master
    this.syncSubscriptionGroupConfig(); // pull subscription group configuration from the Master
}