RocketMQ Source Code: HA (High Availability)

RocketMQ's HA shows up mainly in the server-side configuration of Namesrv and the broker. HA here is achieved through clustering, so let's look at each component in turn.

Namesrv High Availability

A Namesrv cluster achieves high availability by running multiple Namesrv instances. The relationship between the Namesrv nodes is somewhat unusual:

The Namesrv nodes have no relationship with one another. They neither communicate nor exchange data, and each exists purely as a load-sharing node. When one node goes down, the others are unaffected and keep serving. The Namesrv cluster becomes unavailable only when every machine has failed.

1. How the broker registers with Namesrv

public RegisterBrokerResult registerBrokerAll(/* parameters omitted */) {
        RegisterBrokerResult registerBrokerResult = null;
        // Step 1
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    // Step 2
                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                    if (result != null) {
                        registerBrokerResult = result;
                    }

                    log.info("register broker to name server {} OK", namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                }
            }
        }

        return registerBrokerResult;
}

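1. Get the full list of Namesrv nodes, nameServerAddressList.
2. Loop over the Namesrv nodes. The broker registers itself with every one of them, which is what keeps the data on all Namesrv nodes consistent. A failure on one node is logged and does not stop registration with the rest.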
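
The registration loop above can be sketched in isolation. This is a minimal illustration rather than RocketMQ's own code: `RegisterAllSketch`, `registerAll`, and the `registerOne` callback are hypothetical names, and the callback merely stands in for the real remote call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: register one broker with every Namesrv, tolerating per-node failures. */
class RegisterAllSketch {

    /**
     * Calls registerOne for each address and returns the addresses that accepted the registration.
     * registerOne is an assumed stand-in for the real RPC and may throw.
     */
    static List<String> registerAll(List<String> namesrvAddrs, Function<String, Boolean> registerOne) {
        List<String> succeeded = new ArrayList<>();
        if (namesrvAddrs == null) {
            return succeeded;
        }
        for (String addr : namesrvAddrs) {
            try {
                if (Boolean.TRUE.equals(registerOne.apply(addr))) {
                    succeeded.add(addr);
                }
            } catch (RuntimeException e) {
                // One unreachable Namesrv must not stop registration with the others.
            }
        }
        return succeeded;
    }
}
```

Because every node is contacted independently, a Namesrv that was down during one round simply receives the broker's data on a later registration round.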

2. How the Producer reaches Namesrv

private Channel getAndCreateNameserverChannel() throws InterruptedException {
        ···
        ···
        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

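Producers and consumers both pick an available node from the Namesrv list and connect to it. An existing healthy channel is reused; otherwise the list is walked in rotation until a channel can be created.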
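
The rotation idea can be shown with a small stand-alone sketch. `NamesrvPickerSketch` and the `reachable` predicate are hypothetical names introduced for illustration; the predicate replaces the real channel creation. The sketch uses `Math.floorMod` instead of `Math.abs(...) % size`, since `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical sketch: choose a Namesrv address in rotation, skipping unreachable ones. */
class NamesrvPickerSketch {
    private final AtomicInteger index = new AtomicInteger(0);

    /** Tries each address at most once per call; returns null if none is reachable. */
    String pick(List<String> addrs, Predicate<String> reachable) {
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        for (int i = 0; i < addrs.size(); i++) {
            // floorMod keeps the position non-negative even after the counter overflows.
            int pos = Math.floorMod(index.incrementAndGet(), addrs.size());
            String candidate = addrs.get(pos);
            if (reachable.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }
}
```

Since the counter is shared across calls, successive picks spread over the list rather than always starting from the first address.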

Broker High Availability

A Broker master-slave group consists of one Master node plus N Slave nodes.
The Master node serves both reads and writes; Slave nodes serve reads only.

Within each master-slave group, the Master node continuously sends new CommitLog data to the Slave nodes, and each Slave node continuously reports to the Master the position up to which its local CommitLog has been synced.

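Within a cluster, a Master node is one of two types, synchronous or asynchronous (set through the `brokerRole` configuration as `SYNC_MASTER` or `ASYNC_MASTER`). When a Producer sends a message, a synchronous Master waits until the Slave has stored it before returning the send result, whereas an asynchronous Master returns without waiting.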
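
The difference between the two Master types comes down to whether the write path blocks on the Slave's reported offset. The following is a minimal sketch under that assumption; `ReplicationWaitSketch`, `onSlaveAck`, and `awaitReplication` are hypothetical names and do not mirror RocketMQ's internal classes.

```java
/** Hypothetical sketch: a synchronous master waits for the slave's ack, an asynchronous one does not. */
class ReplicationWaitSketch {
    private long slaveAckOffset = 0L;

    /** Called when the slave reports how far it has replicated. */
    synchronized void onSlaveAck(long offset) {
        if (offset > slaveAckOffset) {
            slaveAckOffset = offset;
            notifyAll();
        }
    }

    /**
     * Returns true once the write may be reported as successful.
     * An asynchronous master returns at once; a synchronous master waits up to
     * timeoutMillis for the slave to acknowledge at least requiredOffset.
     */
    synchronized boolean awaitReplication(boolean syncMaster, long requiredOffset, long timeoutMillis) {
        if (!syncMaster) {
            return true;
        }
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (slaveAckOffset < requiredOffset) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // the slave did not catch up in time
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The trade-off is the usual one: the synchronous path costs latency on every send, and in exchange an acknowledged message already exists on a Slave if the Master fails.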

1. Master-side logic

public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // Step 1
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }
                    // Step 2
                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                        break;
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
          ···
          ···
          ···
        }

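1. this.processReadEvent() works out the CommitLog position from which the Slave is asking to sync.
2. Check whether the connection has timed out: if nothing has been read for longer than haHousekeepingInterval, the loop exits.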

2. WriteSocketService transfers new CommitLog data to the Slave

[Figure: 01.png]

3. Slave loop
The Slave loop receives CommitLog data from the Master and reports back to the Master the physical offset up to which its local CommitLog has been synced.

public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    // Step 1
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);
                        // Step 2
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }
                        // Step 3
                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

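1. Once connected to the Master, check whether the reporting interval has elapsed and, if so, report the Slave's offset. The returned result says whether the report succeeded, so this doubles as a heartbeat.
2. processReadEvent() is the counterpart of the Master's method of the same name: it handles the data received from the Master and advances the local CommitLog offset.
3. If the Master has not returned any data for too long, close the connection.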
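
The timing decisions in steps 1 and 3 can be condensed into one small function. This is a simplified sketch: the quoted loop performs these checks at separate points, whereas here a single idle timer drives both. `SlaveLoopSketch`, `Action`, and `next` are hypothetical names, and the interval values in the usage note are illustrative only.

```java
/** Hypothetical sketch: decide what the slave loop should do based on how long the link has been idle. */
class SlaveLoopSketch {
    enum Action { REPORT_OFFSET, CLOSE_CONNECTION, KEEP_WAITING }

    /**
     * now and lastWriteTimestamp are in milliseconds.
     * The housekeeping interval is assumed to be longer than the heartbeat interval.
     */
    static Action next(long now, long lastWriteTimestamp,
                       long heartbeatIntervalMillis, long housekeepingIntervalMillis) {
        long idle = now - lastWriteTimestamp;
        if (idle > housekeepingIntervalMillis) {
            return Action.CLOSE_CONNECTION; // master silent for too long
        }
        if (idle > heartbeatIntervalMillis) {
            return Action.REPORT_OFFSET;    // acts as a heartbeat
        }
        return Action.KEEP_WAITING;
    }
}
```

For example, with a 5000 ms heartbeat interval and a 20000 ms housekeeping interval, an idle time of 6000 ms yields `REPORT_OFFSET` and an idle time of 25000 ms yields `CLOSE_CONNECTION`.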

4. How the Producer sends messages

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Check whether the latency fault-tolerance switch is on
        if (this.sendLatencyFaultEnable) {
            try {
                // Look for an available queue whose brokerName equals lastBrokerName
                // (any available queue qualifies when lastBrokerName is null)
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // Otherwise pick a relatively good broker, without requiring it to be available
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Fall back to the next queue in rotation
            return tpInfo.selectOneMessageQueue();
        }
        // Switch off: pick the next queue in rotation, preferring a broker other than
        // lastBrokerName; the queue's availability is not checked
        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

Let's walk through the broker selection logic when the switch is on:

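1. First, pick an available queue whose broker equals lastBrokerName, or any available queue when lastBrokerName is null. "Available" means the broker has not been put into the latency fault-tolerance object latencyFaultTolerance because of excessive latency.
2. If step 1 finds no suitable queue, drop the broker == lastBrokerName condition and pick a relatively good broker to send to.
3. If that also fails, fall back to the next queue in rotation.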
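
The core of this selection, rotation that skips brokers currently marked as faulty, can be sketched as follows. This is a reduced illustration that leaves out the lastBrokerName condition and the "relatively good broker" step; `QueueSelectorSketch`, `Mq`, and the `faultyBrokers` set are hypothetical names standing in for the real types.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: rotate over queues, skipping brokers that are currently marked faulty. */
class QueueSelectorSketch {

    /** Minimal stand-in for a message queue: a broker name plus a queue id. */
    static final class Mq {
        final String brokerName;
        final int queueId;

        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the next queue on a healthy broker, or plain rotation if every broker is faulty. */
    Mq select(List<Mq> queues, Set<String> faultyBrokers) {
        if (queues == null || queues.isEmpty()) {
            return null;
        }
        int size = queues.size();
        int start = counter.getAndIncrement();
        for (int i = 0; i < size; i++) {
            Mq candidate = queues.get(Math.floorMod(start + i, size));
            if (!faultyBrokers.contains(candidate.brokerName)) {
                return candidate;
            }
        }
        // Every broker is marked faulty: still return something so the send can be attempted.
        return queues.get(Math.floorMod(start, size));
    }
}
```

Falling back to plain rotation when every broker is marked faulty keeps the producer sending, which matches the spirit of step 3 above.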

Summary

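1. RocketMQ achieves Broker high availability by running multiple Broker master-slave groups that together form a cluster.
2. Broker master-slave groups are independent of one another: they do not communicate or synchronize data with each other.
3. Namesrv nodes relate to each other much as Broker master-slave groups do: they are mutually independent, share the load, and do not communicate or synchronize data.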
