RocketMQ Source Code Analysis: HA (1)

Overview

Building a RocketMQ cluster comes down to deploying Masters and Slaves. Depending on how many of each there are, there are several setups:

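  1. Single Master, no Slave: the plain standalone mode. Once the Master goes down, the messaging service is unavailable.
  2. Multiple Masters, no Slave: if one Master goes down the cluster stays usable, but because there is no Slave, any messages on the failed Master that have not been consumed yet cannot be consumed until that Master recovers.
  3. Multiple Masters, multiple Slaves: the most desirable setup. Even if a Master goes down, sending is unaffected (messages can go to another machine) and consuming is unaffected too (ignoring extreme cases), because Master and Slave synchronize messages and configuration.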

Note: I read version 3.5.8 and have not found any Master/Slave failover feature in it so far.

The overall architecture is shown below:


image.png
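  1. Producer:
    • Connects to the NameServer: fetches configuration such as Broker information
    • Connects to the Master: the Producer sends messages only to the Master, never to a Slave
  2. Master:
    • Accepts connections from Producers: Producers send messages only to the Master
    • Connects to the NameServer: registers itself and exchanges configuration (this is how a Slave learns the Master's information)
    • Accepts connections from Consumers: Consumers pull messages from the Master
    • Accepts connections from Slaves: message synchronization
    • Does not connect to other Masters: Masters are independent of each other
  3. Slave:
    • Connects to the Master: message synchronization
    • Connects to the NameServer: fetches configuration, e.g. the Slave obtains the Master's information
    • Accepts connections from Consumers: Consumers pull messages from the Slave
    • Does not connect to other Slaves: same as for Masters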

Overall Flow

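1. By default the Master uses port+1 as the HA port
2. The Master pushes messages to the Slave; the Slave periodically processes them and writes them to its CommitLog
3. The Slave sends the offset it has processed back to the Master every 5s (and, as the code below shows, whenever its local CommitLog has advanced)
4. The Master records the offset it receives and sets a flag; in SYNC_MASTER mode this flag is used to decide whether a message has been synchronized to the Slave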

The whole flow, simplified:


image.png

Source Code Analysis

The analysis below follows the third setup, since it covers the handling of the other two.

Message Synchronization

Slave

Let's start with the Slave. Synchronizing messages from the Master is mainly the job of the HAClient class. Its core fields are:

    // Master address, IP:PORT
    private final AtomicReference<String> masterAddress = new AtomicReference<String>();
    // Buffer used to report the Slave's max offset to the Master
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
    // The offset the Slave has most recently reported to the Master
    private long currentReportedOffset = 0;
    // Used for sticky/split packet handling: the position dispatched so far
    // (spelled dispatchPostion, as in the methods shown later)
    private int dispatchPostion = 0;
    // Buffer for data received from the Master
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(ReadMaxBufferSize);

What each field does is explained later. First, masterAddress: it holds the Master's address. A Master also initializes an HAClient, but its masterAddress is empty, so it performs none of the operations below.

Now the core run method:

    public void run() {
        while (!this.isStoped()) {
            try {
                if (this.connectMaster()) {// if masterAddress is set, connect and obtain a SocketChannel; a Master has no masterAddress, so this branch is skipped
                    // Report the max physical offset first, or as a periodic heartbeat
                    if (this.isTimeToReportOffset()) {// every 5s by default
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);// send the sync progress back to the Master
                        if (!result) {
                            this.closeMaster();
                        }
                    }

                    // Wait for a response
                    this.selector.select(1000);

                    // Receive data
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }

                    // Report the max physical offset whenever the local store has advanced
                    if (!reportSlaveMaxOffsetPlus()) {
                        continue;
                    }

                    // Check the Master's reverse heartbeat
                    long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                    - this.lastWriteTimestamp;
                    if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                        this.closeMaster();
                    }
                }
                else {
                    this.waitForRunning(1000 * 5);
                }
            }
            catch (Exception e) {
                this.waitForRunning(1000 * 5);
            }
        }
    }

processReadEvent is where an ordinary NIO program handles incoming data:

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);// read what the Master sent
                    if (readSize > 0) {
                        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            return false;
                        }
                    }
                    else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    }
                    else {
                        return false;
                    }
                }
                catch (IOException e) {
                    return false;
                }
            }
            return true;
        }

It reads data from the Channel into the buffer and calls dispatchReadRequest to process what was read:

    private boolean dispatchReadRequest() {
        final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
        int readSocketPos = this.byteBufferRead.position();

        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPostion;// dispatchPostion is the position processed so far; diff is the number of bytes read but not yet processed
            if (diff >= MSG_HEADER_SIZE) {// at least a full header is available
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);

                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                // Validate the offsets between Master and Slave
                if (slavePhyOffset != 0) {
                    if (slavePhyOffset != masterPhyOffset) {
                        return false;
                    }
                }
                
                // Sticky/split packet handling
                // If diff is smaller than MSG_HEADER_SIZE + bodySize, the packet was split; wait for the next read
                if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
                    this.byteBufferRead.get(bodyData);
                    // Write the data sent by the Master to the CommitLog
                    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPostion += MSG_HEADER_SIZE + bodySize;

                    if (!reportSlaveMaxOffsetPlus()) {// send the sync progress back to the Master
                        return false;
                    }
                    continue;
                }
            }

            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }
            break;
        }
        return true;
    }

Two main points:

  1. Receive the messages sent by the Master
  2. Write them to the CommitLog and tell the Master the position synchronized to
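
The framing logic in dispatchReadRequest can be tried in isolation. The following is a minimal sketch, not the RocketMQ code: HaFrameDecoder, feed and frame are hypothetical names, the offset check and the CommitLog write are left out, and the buffer is assumed to be large enough so that no reallocation is needed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder for [8-byte offset][4-byte size][body] frames. */
class HaFrameDecoder {
    static final int HEADER_SIZE = 8 + 4; // phyOffset + bodySize

    private final ByteBuffer readBuffer;
    private int dispatchPosition = 0; // first byte not yet consumed as part of a whole frame

    HaFrameDecoder(int capacity) {
        this.readBuffer = ByteBuffer.allocate(capacity);
    }

    /** Appends raw bytes (as if read from the socket) and returns the bodies of all complete frames. */
    List<byte[]> feed(byte[] chunk) {
        readBuffer.put(chunk);
        List<byte[]> bodies = new ArrayList<byte[]>();
        while (true) {
            int available = readBuffer.position() - dispatchPosition;
            if (available < HEADER_SIZE) {
                break; // header split across reads: wait for more data
            }
            // Absolute read: does not move the buffer's write position
            int bodySize = readBuffer.getInt(dispatchPosition + 8);
            if (available < HEADER_SIZE + bodySize) {
                break; // body split across reads: wait for more data
            }
            byte[] body = new byte[bodySize];
            int writePosition = readBuffer.position();
            readBuffer.position(dispatchPosition + HEADER_SIZE);
            readBuffer.get(body);
            readBuffer.position(writePosition); // restore the write position
            dispatchPosition += HEADER_SIZE + bodySize;
            bodies.add(body);
        }
        return bodies;
    }

    /** Builds one frame, the way the sending side would. */
    static byte[] frame(long offset, byte[] body) {
        return ByteBuffer.allocate(HEADER_SIZE + body.length)
                .putLong(offset).putInt(body.length).put(body).array();
    }
}
```

Feeding the first 10 bytes of two concatenated frames yields nothing, because the header is incomplete; feeding the rest yields both bodies in a single call, which is the sticky-packet case.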

Next, the implementation of reportSlaveMaxOffsetPlus:

        private boolean reportSlaveMaxOffsetPlus() {
            boolean result = true;
            // Report the max physical offset whenever the local store has advanced
            long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (currentPhyOffset > this.currentReportedOffset) {
                this.currentReportedOffset = currentPhyOffset;
                result = this.reportSlaveMaxOffset(this.currentReportedOffset);// write it back to the Master through the Channel
                if (!result) {
                    this.closeMaster();
                    log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
                }
            }

            return result;
        }

The logic is simple: get the CommitLog's max offset; if it is greater than the position reported last time, update currentReportedOffset and send it to the Master.

Master

The classes involved in synchronization on the Master side are:

  1. HAService: the entry point that starts the synchronization services
    1. AcceptSocketService: accepts Slave connections and builds HAConnection instances
    2. GroupTransferService: when BrokerRole is SYNC_MASTER, acts as an intermediate service that sets a flag used to determine whether the Slave has finished synchronizing
  2. HAConnection: one instance per Slave, used to interact with that Slave
    1. WriteSocketService: pushes messages to the Slave
    2. ReadSocketService: reads the sync progress sent back by the Slave

AcceptSocketService

AcceptSocketService is implemented with raw NIO. The NIO details are omitted here; only the core flow is covered. The run method is the core: it accepts Slave connections and builds HAConnection objects.

        public void run() {
            while (!this.isStoped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                if (sc != null) {
                                    try {
                                        // For every Slave that connects, build an HAConnection that holds the corresponding Channel
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();// starts both WriteSocketService and ReadSocketService
                                        HAService.this.addConnection(conn);// keep the connection in connectionList
                                    }
                                    catch (Exception e) {
                                        //....
                                    }
                                }
                            }
                            else {//....
                            }
                        }
                        selected.clear();
                    }
                }
                catch (Exception e) {//....
                }
            }
        }

ReadSocketService

ReadSocketService handles the progress reported by the Slave and related operations. Again the core is the run method:

    public void run() {
        while (!this.isStoped()) {
            try {
                this.selector.select(1000);
                boolean ok = this.processReadEvent();
                if (!ok) {
                    HAConnection.log.error("processReadEvent error");
                    break;
                }

                // Check the heartbeat interval; force a disconnect once it is exceeded
                long interval =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                                - this.lastReadTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore()
                    .getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                            + "] expired, " + interval);
                    break;
                }
            }
            catch (Exception e) {
                break;
            }
        }
        // ....
    }

Incoming data is handled in processReadEvent:

    private boolean processReadEvent() {
        // .... other code omitted
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    this.lastReadTimestamp =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    // Receive the offset reported by the Slave
                    if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        this.processPostion = pos;

                        // Update slaveAckOffset and slaveRequestOffset
                        HAConnection.this.slaveAckOffset = readOffset;
                        if (HAConnection.this.slaveRequestOffset < 0) {
                            HAConnection.this.slaveRequestOffset = readOffset;
                        }

                        // Notify the waiting front-end thread
                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                    }
                }
                else if (readSize == 0) {
                // .... other code omitted
                }
                else {
                    return false;
                }
            }
            catch (IOException e) {
                return false;
            }
        }
        return true;
    }

Two pieces of code deserve attention:

1. (this.byteBufferRead.position() - this.processPostion) >= 8
2. int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;

How should these be read? The 8 is the byte length of an offset.

  1. processPostion is the position processed so far, so the first line deals with split packets: if fewer than 8 new bytes have arrived, nothing is processed until the next read
  2. An example makes the second part easier to follow:
    1. After the first read, this.byteBufferRead.position() - 0 is 3, so it is ignored
    2. After the second read, this.byteBufferRead.position() is 10, so pos = 10 - 2 = 8 and readOffset = getLong(0), exactly the first long; processPostion is set to 8
    3. After the third read, this.byteBufferRead.position() is 21 and 21 - 8 = 13 >= 8, so pos = 21 - 5 = 16 and readOffset = getLong(8), exactly the second long
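
The worked example can be replayed with a small stand-alone class. This is only a sketch of the alignment arithmetic: AckOffsetReader and feed are hypothetical names, and a fixed 1024-byte buffer is assumed.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader that mimics how the Master picks the latest complete 8-byte offset. */
class AckOffsetReader {
    private final ByteBuffer buffer = ByteBuffer.allocate(1024);
    private int processPosition = 0; // end of the last complete offset that was consumed
    private long lastAck = -1;       // -1 means nothing has been received yet

    /** Appends bytes as if read from the socket; returns the latest complete offset seen so far. */
    long feed(byte[] chunk) {
        buffer.put(chunk);
        if (buffer.position() - processPosition >= 8) {
            // Round the write position down to a multiple of 8, then read the long just before it
            int pos = buffer.position() - (buffer.position() % 8);
            lastAck = buffer.getLong(pos - 8);
            processPosition = pos;
        }
        return lastAck;
    }
}
```

With three offsets 100, 200 and 300 sent as 24 bytes and delivered in chunks of 3, 7, 11 and 3 bytes, the write positions are 3, 10, 21 and 24, matching the example above, and the successive return values are -1, 100, 200 and 300.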

The roles of slaveAckOffset and slaveRequestOffset:
slaveAckOffset is the offset most recently reported by the Slave
slaveRequestOffset is the first offset the Slave reported

The notifyTransferSome method:

    public void notifyTransferSome(final long offset) {
        for (long value = this.push2SlaveMaxOffset.get(); offset > value;) {
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                this.groupTransferService.notifyTransferSome();
                break;
            }
            else {
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }

It raises push2SlaveMaxOffset to the offset the Slave has synchronized to (the value only ever increases) and notifies through GroupTransferService.
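
The compare-and-set loop is a common way to keep a "maximum seen so far" without locking. A minimal sketch of just that part, with the hypothetical names MaxOffsetTracker and advanceTo and with the wake-up call left out:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for a monotonically increasing offset. */
class MaxOffsetTracker {
    private final AtomicLong maxOffset = new AtomicLong(0);

    /** Raises the stored maximum to offset if it is larger; returns true if this call changed it. */
    boolean advanceTo(long offset) {
        long current = maxOffset.get();
        while (offset > current) {
            if (maxOffset.compareAndSet(current, offset)) {
                return true; // this is the point where the real code wakes up the waiting thread
            }
            current = maxOffset.get(); // lost the race: re-read and check again
        }
        return false; // someone already stored an equal or larger value
    }

    long get() {
        return maxOffset.get();
    }
}
```

A stale or duplicate report (for example 50 after 100) leaves the value untouched, so several connections can report concurrently without moving the maximum backwards.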

WriteSocketService

First, a few fields:

        private final int HEADER_SIZE = 8 + 4;// phyOffset + size
        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(HEADER_SIZE);
        private long nextTransferFromWhere = -1;// the CommitLog offset to fetch data from next
        private SelectMapedBufferResult selectMapedBufferResult;// the data fetched from the CommitLog
        private boolean lastWriteOver = true;// whether the previous transfer was written completely
        private long lastWriteTimestamp = System.currentTimeMillis();

Starting from the run method:

    public void run() {
        while (!this.isStoped()) {
            try {
                this.selector.select(1000);

                if (-1 == HAConnection.this.slaveRequestOffset) {
                    Thread.sleep(10);
                    continue;
                }
                
                // Compute nextTransferFromWhere
                if (-1 == this.nextTransferFromWhere) {
                    if (0 == HAConnection.this.slaveRequestOffset) {
                        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                        masterOffset =
                                masterOffset
                                        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                        .getMapedFileSizeCommitLog());

                        if (masterOffset < 0) {
                            masterOffset = 0;
                        }

                        this.nextTransferFromWhere = masterOffset;
                    } else {
                        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                    }
                }

                if (this.lastWriteOver) {
                    long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(HEADER_SIZE);
                        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                        this.byteBufferHeader.putInt(0);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }
                } else {
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
                
                // Given an offset, fetch data from the CommitLog, much like a consumer pulling messages
                SelectMapedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                if (selectResult != null) {
                    int size = selectResult.getSize();
                    // Transfer at most haTransferBatchSize (32K) per batch
                    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                    }

                    long thisOffset = this.nextTransferFromWhere;
                    this.nextTransferFromWhere += size;

                    selectResult.getByteBuffer().limit(size);// cap the buffer at the batch size (32K)
                    this.selectMapedBufferResult = selectResult;

                    // Build the header; see the Slave's HAClient for how it is parsed
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(HEADER_SIZE);
                    this.byteBufferHeader.putLong(thisOffset);
                    this.byteBufferHeader.putInt(size);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();// transfer the data
                } else {
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                }
            } catch (Exception e) {
                break;
            }
        }
        // ....
    }
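
Two small calculations in run are easy to miss: where the transfer starts when the Slave reports offset 0, and how much is sent per batch. A minimal sketch of both, using the hypothetical names TransferPlanner, initialTransferOffset and batchSize; the sizes in the usage note are example values, not values read from any configuration.

```java
/** Hypothetical helper isolating the offset arithmetic of the write loop. */
class TransferPlanner {
    /**
     * A Slave reporting 0 has no data yet: start at the beginning of the CommitLog
     * file that contains the Master's max offset. Otherwise continue from the
     * offset the Slave asked for.
     */
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    /** Never send more than maxBatch bytes in one frame. */
    static int batchSize(int available, int maxBatch) {
        return Math.min(available, maxBatch);
    }
}
```

For example, with a file size of 1 GiB and a Master max offset of 2 GiB + 12345, an empty Slave starts at exactly 2 GiB, so it does not replay the older files; with 100000 bytes available and a 32768-byte cap, one batch carries 32768 bytes.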

Data transfer happens in transferData:

    private boolean transferData() throws Exception {
        int writeSizeZeroTimes = 0;
        // Write the header
        while (this.byteBufferHeader.hasRemaining()) {
            int writeSize = this.socketChannel.write(this.byteBufferHeader);
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            }
            else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                throw new Exception("ha master write header error < 0");
            }
        }

        if (null == this.selectMapedBufferResult) {
            return !this.byteBufferHeader.hasRemaining();
        }

        writeSizeZeroTimes = 0;

        // Write the body
        if (!this.byteBufferHeader.hasRemaining()) {
            while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
                int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                }
                else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                }
                else {
                    throw new Exception("ha master write body error < 0");
                }
            }
        }

        boolean result =
                !this.byteBufferHeader.hasRemaining()
                        && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();

        if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
            this.selectMapedBufferResult.release();
            this.selectMapedBufferResult = null;
        }

        return result;
    }

The handling is simple: write the header and the body to the Slave through the Channel, and return whether both buffers were written completely.
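
Both loops in transferData follow the same non-blocking write pattern: keep writing while there is data, reset a counter on progress, and give up after three consecutive zero-byte writes so that the caller can retry later. A minimal sketch of that pattern on its own, with the hypothetical names PartialWriteSketch and writeFully; unlike the original, it wraps IOException in an unchecked exception.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper showing the "stop after 3 empty writes" loop. */
class PartialWriteSketch {
    /** Returns true if the buffer was drained, false if the channel stopped accepting data. */
    static boolean writeFully(WritableByteChannel channel, ByteBuffer buffer) {
        int zeroWrites = 0;
        try {
            while (buffer.hasRemaining()) {
                int written = channel.write(buffer);
                if (written > 0) {
                    zeroWrites = 0; // progress was made, reset the counter
                } else if (written == 0) {
                    if (++zeroWrites >= 3) {
                        break; // send buffer is probably full: let the caller retry later
                    }
                } else {
                    throw new IOException("channel.write returned " + written);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return !buffer.hasRemaining();
    }
}
```

A false return corresponds to lastWriteOver being false in run: the buffer keeps its position, and the next loop iteration resumes the same write instead of fetching new data.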

GroupTransferService

When ReadSocketService receives an offset from the Slave, it calls GroupTransferService's notifyTransferSome method. Here is what that method does:

    public void notifyTransferSome() {
        this.notifyTransferObject.wakeup();
    }
    public void wakeup() {
        synchronized (this) {
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

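It only sets a flag and calls notify. Anyone familiar with RocketMQ's flush strategy will recognize this: it is the same pattern used for asynchronous flushing, set a flag and wake up the waiter.
Next we need to see what is being woken up. Since notifyTransferObject uses wakeup, something must be waiting on it, which leads to the doWaitTransfer method: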

        private void doWaitTransfer() {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    for (int i = 0; !transferOK && i < 5; i++) {// retry up to 5 times, each time waiting for the Slave to report its progress
                        this.notifyTransferObject.waitForRunning(1000);
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }

                    if (!transferOK) {
                        log.warn("transfer message to slave timeout, " + req.getNextOffset());
                    }
                    req.wakeupCustomer(transferOK);// record the result and release the waiting thread
                }

                this.requestsRead.clear();
            }
        }
        
        // GroupCommitRequest
        public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

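It first iterates over the request collection. What this collection is for is not clear yet; all we know is that each request corresponds to an offset. transferOK is true when push2SlaveMaxOffset (the position the Slave has synchronized to) is greater than or equal to that offset; while it is false, the loop keeps waiting. In the end the result is stored in flushOK and the CountDownLatch (CDL) is counted down.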
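
The hand-off between the thread that waits for the Slave and the thread that is blocked on the latch can be reduced to a few lines. This is a simplified sketch under stated assumptions: SyncReplicationSketch, CommitRequest and waitTransfer are hypothetical names, Thread.sleep stands in for the wait/notify object, and await returns true only when the latch was released and the transfer succeeded, which is stricter than the waitForFlush method in the original.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, reduced model of "block the writer until the Slave has caught up". */
class SyncReplicationSketch {
    static final class CommitRequest {
        final long nextOffset; // the Slave must reach at least this offset
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean transferOk = false;

        CommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        /** Called by the transfer thread: store the result and release the waiter. */
        void complete(boolean ok) {
            this.transferOk = ok;
            this.latch.countDown();
        }

        /** Called by the writing thread: block until completed or timed out. */
        boolean await(long timeoutMillis) {
            try {
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && transferOk;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Highest offset acknowledged by the Slave. */
    final AtomicLong slaveAckOffset = new AtomicLong(0);

    /** Checks the acknowledged offset up to retries more times, then completes the request. */
    void waitTransfer(CommitRequest request, int retries, long pauseMillis) {
        boolean ok = slaveAckOffset.get() >= request.nextOffset;
        for (int i = 0; !ok && i < retries; i++) {
            try {
                Thread.sleep(pauseMillis); // stand-in for waiting on the notify object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            ok = slaveAckOffset.get() >= request.nextOffset;
        }
        request.complete(ok);
    }
}
```

In this model a request for offset 80 succeeds once the Slave has acknowledged 100, while a request for offset 200 is completed with false after the retries run out.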

Now that doWaitTransfer is understood, two questions remain:

  • What is GroupCommitRequest?
  • Which thread is blocked on this CDL?

In GroupCommitRequest, the CDL is awaited in the waitForFlush method:

    public boolean waitForFlush(long timeout) {
        try {
            boolean result = this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return result || this.flushOK;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }

It blocks on the CDL and returns true if the latch was released in time or flushOK is set. Following this method leads to two call sites in CommitLog's putMessage.
The first call site:

        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            // If the message property WAIT is true,
            // meaning: wait until the server has stored the message before returning (either flushed to disk or replicated synchronously to another server)
            if (msg.isWaitStoreMsgOK()) {
                // isSlaveOK requires both of the following:
                // the Slave is less than 256M behind the Master
                // the number of Slave connections is greater than 0
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    if (null == request) {
                        // The offset in GroupCommitRequest is the CommitLog offset right after this write
                        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    }
                    service.putRequest(request);// ends up in GroupTransferService.putRequest, i.e. the collection that doWaitTransfer iterates over

                    service.getWaitNotifyObject().wakeupAll();
                    
                    // This is the wait discussed above: block until synchronization completes.
                    // In code terms, until the Slave's offset >= result.getWroteOffset() + result.getWroteBytes()
                    boolean flushOK =
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                                .getSyncFlushTimeout());
                    if (!flushOK) {// synchronization timed out
                        log.error("do sync transfer other node, wait return, but failed, topic: "
                                + msg.getTopic() + " tags: " + msg.getTags() + " client address: "
                                + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                else {
                    // If SYNC_MASTER is configured but there is no Slave,
                    // the second isSlaveOK condition fails, and the producer keeps getting this error on every send
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

The second call site is synchronous flushing, with similar logic:

    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (msg.isWaitStoreMsgOK()) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                        .getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
                        + msg.getTags() + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        }
        else {
            service.wakeup();
        }
    }

With these two call sites in view, the whole flow is roughly clear. Here is the sequence diagram from the beginning, filled in:


Interaction diagram.png

That concludes the interaction between Master and Slave. Next come the details of how Consumer and Producer behave in a multi-Master, multi-Slave setup.

Producer

For the Producer, look at the send path in DefaultMQProducerImpl:

        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 1. look up the topic's TopicPublishInfo
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // ....
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;// a single attempt unless the mode is SYNC
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);// 3. send the message to the broker
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {// the plain send(msg) call passes SYNC here
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch ....{
                        // ....
                    }
                } else {
                    break;
                }
            } // end of for
            // ....
        }

First it fetches the topic's TopicPublishInfo, mainly for its queue information, which looks like this (assuming 2 queues per broker):
broker_a-queue-0
broker_a-queue-1
broker_b-queue-0
broker_b-queue-1
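That is, all the queues in the cluster.
Anyone familiar with the Producer's send mechanism knows that once the queues are obtained, it round-robins over them when sending. Suppose broker_a is down and a send fails: lastBrokerName is then non-null, and the queue selection skips broker_a's queues.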

    #TopicPublishInfo.selectOneMessageQueue(String)
    //....
    if (!mq.getBrokerName().equals(lastBrokerName)) {
        return mq;
    }
    //....
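
The selection rule can be sketched as a round-robin that skips the broker of the previous failed attempt. This is a simplified illustration, not the TopicPublishInfo code: QueueSelector, Mq and selectOne are hypothetical names, and the fallback when every queue belongs to the failed broker is an assumption of this sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin selector that avoids the broker that just failed. */
class QueueSelector {
    static final class Mq {
        final String brokerName;
        final int queueId;

        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Mq> queues;
    private final AtomicInteger index = new AtomicInteger(0);

    QueueSelector(List<Mq> queues) {
        this.queues = queues;
    }

    /** lastBrokerName is null on the first attempt and the failed broker's name on a retry. */
    Mq selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Mq mq = queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
            if (lastBrokerName == null || !mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        // Every queue is on the failed broker: fall back to plain round-robin
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }
}
```

With the four queues listed above, a first call with null picks broker_a queue 0; retries with "broker_a" then pick broker_b queue 0, broker_b queue 1, and broker_b queue 0 again.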

Consumer

First, where the pull request is issued:

#PullAPIWrapper.pullKernelImpl(MessageQueue, String, long, long, int, int, long, long, long, CommunicationMode, PullCallback)
    FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                        this.recalculatePullFromWhichNode(mq), false);

        public FindBrokerResult findBrokerAddressInSubscribe(//
                                                         final String brokerName, //
                                                         final long brokerId, //
                                                         final boolean onlyThisBroker) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = (brokerId != MixAll.MASTER_ID);
            found = (brokerAddr != null);

            if (!found && !onlyThisBroker) {
                Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = (entry.getKey() != MixAll.MASTER_ID);
                found = true;
            }
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave);
        }

        return null;
    }

    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {// false by default
            return this.defaultBrokerId;  
        }
        // Look up the suggested brokerId for this queue
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }
        // If there is no suggestion, return the Master's id, i.e. 0
        return MixAll.MASTER_ID;
    }

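findBrokerAddressInSubscribe first looks up a map in brokerAddrTable by brokerName. If the Master has a Slave, the map has two entries whose keys are the Master's id (0) and the Slave's id. brokerId is 0 by default, so when the Master is down and no brokerAddr is found for it, the method falls back to the first entry in the map, which yields the Slave's address.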
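
The fallback is easy to check with a plain map. A minimal sketch, with the hypothetical names BrokerLookup, Result and find, and with addresses that are made-up placeholders:

```java
import java.util.Map;

/** Hypothetical stand-alone version of the "preferred id, else any entry" lookup. */
class BrokerLookup {
    static final long MASTER_ID = 0L;

    static final class Result {
        final String addr;
        final boolean slave;

        Result(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    /** addrs maps brokerId to address for a single brokerName. */
    static Result find(Map<Long, String> addrs, long brokerId, boolean onlyThisBroker) {
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        String addr = addrs.get(brokerId);
        if (addr != null) {
            return new Result(addr, brokerId != MASTER_ID);
        }
        if (onlyThisBroker) {
            return null; // the caller insisted on this exact broker
        }
        // Preferred broker is gone: use whichever entry is left
        Map.Entry<Long, String> entry = addrs.entrySet().iterator().next();
        return new Result(entry.getValue(), entry.getKey() != MASTER_ID);
    }
}
```

While the Master's entry is present, a lookup for id 0 returns the Master; once that entry has been removed, the same lookup returns the Slave's address with the slave flag set.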

This raises two questions:

  1. How does the Consumer notice that the Master is down (in code terms: once it is down, why does looking up 0 in the map return no address)?
  2. How is the "suggested brokerId to pull from" (suggest) determined?

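The first question:
Since the Master's entry is missing from the map, one can guess that when the Master goes down the client can no longer get that Broker's information from the NameServer and then removes it from the map. To see how this is implemented, start with where the removal happens, the cleanOfflineBroker method (it runs in a scheduled task that is started when the client starts):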

#MQClientInstance.cleanOfflineBroker()

    Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
    while (itBrokerTable.hasNext()) {
        Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
        // the master and slave addresses registered under one brokerName
        String brokerName = entry.getKey();
        HashMap<Long, String> oneTable = entry.getValue();

        HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
        cloneAddrTable.putAll(oneTable);

        Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> ee = it.next();
            String addr = ee.getValue();
            if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {// check against topicRouteTable whether the address is still valid
                it.remove();
            }
        }
    }

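This iterates over brokerAddrTable and removes the addresses that are no longer valid. The values in topicRouteTable are TopicRouteData objects, each holding the queue and broker information for one topic. That table is also refreshed by a scheduled task, in which the Client pulls the broker information for each topic from NameServer. A Broker registers with NameServer periodically, much like sending a heartbeat, so once a Broker goes down its information can naturally no longer be pulled.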
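
To make the pruning step concrete, here is a minimal sketch under stated assumptions. `OfflineBrokerCleanerSketch` and `liveAddrs` are hypothetical: the set stands in for "the address still appears in topicRouteTable". Writing the pruned copy back into the outer table is my assumption about what has to follow the excerpt above.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the cleanup; not the RocketMQ implementation.
class OfflineBrokerCleanerSketch {
    /**
     * Drops every address that is absent from liveAddrs, and removes a
     * brokerName entirely once none of its addresses survive.
     */
    static void clean(Map<String, Map<Long, String>> brokerAddrTable, Set<String> liveAddrs) {
        Iterator<Map.Entry<String, Map<Long, String>>> outer = brokerAddrTable.entrySet().iterator();
        while (outer.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = outer.next();
            // Work on a copy so the original inner map is never mutated.
            Map<Long, String> pruned = new HashMap<>(entry.getValue());
            pruned.values().removeIf(addr -> !liveAddrs.contains(addr));
            if (pruned.isEmpty()) {
                outer.remove();          // no live address left under this brokerName
            } else {
                entry.setValue(pruned);  // publish the pruned copy
            }
        }
    }
}
```

After the Master's address disappears from the route data, its key 0 disappears from the inner map too, which is why the lookup with brokerId 0 comes back empty.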

Question 2:
Look at where pullFromWhichNodeTable is updated and find the caller of the update method:

#PullAPIWrapper.processPullResult(MessageQueue, PullResult, SubscriptionData)

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
                                        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        //....

        return pullResult;
    }

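This is the callback invoked when the Consumer pulls messages, so the value stored in pullFromWhichNodeTable is evidently returned by the Broker. To see when the Broker sets it, start from PullMessageProcessor, the class in which the Broker handles pull requests, and find where the value is assigned: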
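
The client-side bookkeeping between processPullResult and recalculatePullFromWhichNode can be sketched roughly as follows. This is an illustration only: `PullNodeSuggestionSketch` is a hypothetical class, and a plain String key stands in for MessageQueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the per-queue "which broker to pull from next" table.
class PullNodeSuggestionSketch {
    static final long MASTER_ID = 0L; // mirrors MixAll.MASTER_ID

    // queue key -> brokerId suggested by the most recent pull response
    private final ConcurrentHashMap<String, AtomicLong> pullFromWhichNodeTable = new ConcurrentHashMap<>();

    /** Records the brokerId the Broker suggested in its pull response. */
    void update(String queueKey, long suggestedBrokerId) {
        AtomicLong existing = pullFromWhichNodeTable.putIfAbsent(queueKey, new AtomicLong(suggestedBrokerId));
        if (existing != null) {
            existing.set(suggestedBrokerId);
        }
    }

    /** Returns the recorded suggestion, or the master's id when none exists yet. */
    long recalculate(String queueKey) {
        AtomicLong suggest = pullFromWhichNodeTable.get(queueKey);
        return suggest != null ? suggest.get() : MASTER_ID;
    }
}
```

Each pull response overwrites the previous suggestion, so the next pull for the same queue goes to whichever brokerId the Broker named last.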

    // ....
            // if getMessageResult suggests pulling from the Slave, set the slave's id; otherwise set the Master's id, i.e. 0
            // this is the slow-consumption case
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consuming slowly
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
    // ....

In short, the Broker suggests pulling from the Slave only when consumption is too slow. How "too slow" is determined can be seen in a few lines of DefaultMessageStore's getMessage method:

    final long maxOffsetPy = this.commitLog.getMaxOffset();// the largest offset written so far
    // ....
    int i = 0;
    for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();

        maxPhyOffsetPulling = offsetPy;// the position of the message currently being pulled
        // ....
    }
    // ....
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TotalPhysicalMemorySize
            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    // i.e. when the gap between the offset being pulled and the largest offset exceeds 40% of memory, suggest pulling from the Slave
    getResult.setSuggestPullingFromSlave(diff > memory);
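
As a worked example of that threshold, the check can be isolated into a small function. This is a sketch with hypothetical names (`SlowConsumeCheckSketch`, `suggestPullingFromSlave`); the memory size and the ratio are passed in as plain parameters instead of being read from StoreUtil and the store config.

```java
// Hypothetical, standalone version of the "consumer is lagging too far" check.
class SlowConsumeCheckSketch {
    /**
     * @param maxOffsetPy                   largest physical offset written to the commit log
     * @param maxPhyOffsetPulling           physical offset of the last message being pulled
     * @param totalPhysicalMemoryBytes      total physical memory of the machine, in bytes
     * @param accessMessageInMemoryMaxRatio percentage of memory the backlog may occupy (40 in the text above)
     */
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemoryBytes, int accessMessageInMemoryMaxRatio) {
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        long memory = (long) (totalPhysicalMemoryBytes * (accessMessageInMemoryMaxRatio / 100.0));
        return diff > memory;
    }
}
```

On a machine with 16 GiB of memory and a ratio of 40, the threshold is about 6.4 GiB: a consumer that is 7 GiB behind gets the suggestion to pull from the Slave, while one that is 6 GiB behind does not.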

Config synchronization

The Slave periodically pulls configuration and offsets from the Master Broker

#SlaveSynchronize.syncAll()

    public void syncAll() {
        this.syncTopicConfig();// pull the topic config from the Master
        this.syncConsumerOffset();// pull the consumer offsets from the Master
        this.syncDelayOffset();// pull the delay offsets from the Master
        this.syncSubscriptionGroupConfig();// pull the subscription group config from the Master
    }
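
How such a periodic pull might be wired up is sketched below. The class name `SlaveSyncSchedulerSketch`, the delay, and the period are all assumptions chosen for illustration; the text above only says that the sync happens on a timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler that runs a syncAll-style task at a fixed rate.
class SlaveSyncSchedulerSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Runs syncAll repeatedly; delay and period are caller-chosen (assumed values). */
    void start(Runnable syncAll, long initialDelayMillis, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                syncAll.run();
            } catch (RuntimeException e) {
                // Swallow the failure: an uncaught exception would cancel all later runs.
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Catching the exception inside the task matters here, because scheduleAtFixedRate suppresses every subsequent execution once one run throws.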