RocketMQ源碼解析——存儲部分(6)RocketMQ主從同步原理相關(guān)的HAService和HAConnection

引導

?前面介紹了RocketMQ的CommitLog文件相關(guān)的類分析CommitLog物理日志相關(guān)的CommitLog。其中有介紹到消息刷盤時高可用對應的handleHA方法,handleHA方法中如果配置的服務(wù)器的角色為SYNC_MASTER(從master同步),就會等待主從之間消息同步的進度達到設(shè)定的值之后才正常返回,如果超時則返回同步超時

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        //如果設(shè)置的主從之間是同步更新
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // 檢查slave同步的位置是否小于 最大容忍的同步落后偏移量參數(shù) haSlaveFallbehindMax,如果是的則進行主從同步刷盤
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    //countDownLatch.await 同步等待刷新,除非等待超時
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    //設(shè)置從服務(wù)不可用的狀態(tài)
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }

?這段代碼的主要邏輯如下:

  1. 如果服務(wù)器的角色設(shè)置為SYNC_MASTER,則進行下一步,否則直接跳過主從同步
  2. 獲取HAService對象,檢查消息是否本地存儲完畢,如果沒有則結(jié)束,否則進入下一步
  3. 檢查slave同步的位置是否小于 最大容忍的同步落后偏移量參數(shù)haSlaveFallbehindMax,如果是的則進行主從同步刷盤。如果沒有則返回slave不可用的狀態(tài)
  4. 將消息落盤的最大物理偏移量也就是CommitLog上的偏移量作為參數(shù)構(gòu)建一個GroupCommitRequest對象,然后提交到HAService
  5. 最多等待syncFlushTimeout長的時間,默認為5秒。在5秒內(nèi)獲取結(jié)果,然后根據(jù)結(jié)果判斷是否返回超時

同步流程

?上面那段代碼比較簡單,因為主從的邏輯全部交給了HAServiceHAConnection兩個類處理了。這里先簡單介紹一下整個同步的流程(同步模式)

在這里插入圖片描

?這個題可能不好理解,等源碼邏輯分析完之后再看可能會清楚點。

高可用服務(wù)HAService

?HAService是在RocketMQ的Broker啟動的時候就會創(chuàng)建的,而創(chuàng)建的點在DefaultMessageStore這個消息存儲相關(guān)的綜合類中,在這個類的構(gòu)造器中會創(chuàng)建HAService無論當前的Broker是什么角色。這個類后續(xù)會有文章分析
?這里需要說明的是Broker中的Master和Slaver兩個角色,代碼都是一樣的,只不過是在實際執(zhí)行的時候,走的分支不一樣

內(nèi)部屬性

?在HAService中有幾個比較重要的屬性,這里需要簡單的介紹一下

參數(shù) 說明
connectionList 連接到master的slave連接列表,用于管理連接
acceptSocketService 用于接收連接用的服務(wù),只監(jiān)聽OP_ACCEPT事件,監(jiān)聽到連接事件時候,創(chuàng)建HAConnection來處理讀寫請求事件
waitNotifyObject 一個消費等待模型類,用于處理高可用線程和CommitLog的刷盤線程交互
push2SlaveMaxOffset master同步到slave的偏移量
groupTransferService 主從同步的檢測服務(wù),用于檢查是否同步完成
haClient 高可用的服務(wù),slave用來跟master建立連接,像master匯報偏移量和拉取消息

    /**
     * 連接到本機的數(shù)量
     */
    private final AtomicInteger connectionCount = new AtomicInteger(0);
    /**
     * 連接列表,用于管理連接
     */
    private final List<HAConnection> connectionList = new LinkedList<>();
    /**
     * 接受和監(jiān)聽slave的連接服務(wù)
     */
    private final AcceptSocketService acceptSocketService;

    private final DefaultMessageStore defaultMessageStore;
    /**
     * 一個服務(wù)消費者線程模型的對象,用于跟CommitLog的刷盤線程交互
     */
    private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
    /**
     * master跟slave 消息同步的位移量
     */
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    /**
     * 主從信息同步的服務(wù)
     */
    private final GroupTransferService groupTransferService;
    /**
     * 操作主從的類
     */
    private final HAClient haClient;
構(gòu)造函數(shù)

?HAService只有一個構(gòu)造器。邏輯也比較簡單,創(chuàng)建一個AcceptSocketService開放一個端口為 10912的端口用于slave來簡歷連接,同時啟動主從信息同步的任務(wù)groupTransferService用于接收CommitLog在高可用刷盤時提交任務(wù)

public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        //創(chuàng)建,接受連接的服務(wù), 開放的端口號為10912
        this.acceptSocketService =
            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        //創(chuàng)建主從信息同步的線程
        this.groupTransferService = new GroupTransferService();
        this.haClient = new HAClient();
    }
內(nèi)部類分析

?HAService在創(chuàng)建之后,會在DefaultMessageStore中調(diào)用其start方法,這個方法會啟動其內(nèi)部的幾個內(nèi)部類,用來主從同步

 public void start() throws Exception {
        //接受連接的服務(wù),開啟端口,設(shè)置監(jiān)聽的事件
        this.acceptSocketService.beginAccept();
        //開啟服務(wù)不斷檢查是否有連接
        this.acceptSocketService.start();
        //開啟groupTransferService,接受CommitLog的主從同步請求
        this.groupTransferService.start();
        //開啟haClient,用于slave來建立與Master連接和同步
        this.haClient.start();
    }

?接下來對這幾個內(nèi)部類進行分析

用于接受Slave連接的AcceptSocketService

?AcceptSocketService這個類在Broker的Master和Slaver兩個角色啟動時都會創(chuàng)建,只不過區(qū)別是Slaver開啟端口之后,并不會有別的Broker與其建立連接。因為只有在Broker的角色是Slave的時候才會指定要連接的Master地址。這個邏輯,在Broker啟動的時候BrokerController類中運行的。

    public void beginAccept() throws Exception {
            //創(chuàng)建ServerSocketChannel
            this.serverSocketChannel = ServerSocketChannel.open();
            //創(chuàng)建selector
            this.selector = RemotingUtil.openSelector();
            //設(shè)置SO_REUSEADDR   https://blog.csdn.net/u010144805/article/details/78579528
            this.serverSocketChannel.socket().setReuseAddress(true);
            //設(shè)置綁定的地址
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            //設(shè)置為非阻塞模式
            this.serverSocketChannel.configureBlocking(false);
            //注冊監(jiān)聽事件為 連接事件
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }

?beginAccept方法就是開啟Socket,綁定10912端口,然后注冊selector和指定監(jiān)聽的事件為OP_ACCEPT也就是建立連接事件。對應的IO模式為NIO模式。主要看其run方法,這個方法是Master用來接受Slave連接的核心。

public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    //設(shè)置阻塞等待時間
                    this.selector.select(1000);
                    //獲取selector 下的所有selectorKey ,后續(xù)迭代用
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            //檢測有連接事件的selectorKey
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                //獲取selectorKey的Channel
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        //創(chuàng)建HAConnection,建立連接
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        //建立連接
                                        conn.start();
                                        //添加連接到連接列表中
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }
                        //清空連接事件,未下一次做準備
                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

?這里的邏輯比較簡單。就是每過一秒檢查一次是否有連接事件,如果有則建立連接,并把建立起來的連接加入到連接列表中進行保存。一直循環(huán)這個邏輯。

檢查同步進度和喚醒CommitLog刷盤線程的GroupTransferService

?GroupTransferService是CommitLog消息刷盤的類CommitLogHAService打交道的一個中間類。在CommitLog中進行主從刷盤的時候,會創(chuàng)建一個CommitLog.GroupCommitRequest的內(nèi)部類,這個類包含了當前Broker最新的消息的物理偏移量信息。然后把這個類丟給GroupTransferService處理,然后喚醒GroupTransferService。起始這個邏輯跟CommitLog內(nèi)部的GroupCommitService邏輯一樣。只不過對于同步部分的邏輯不一樣,這里可以參考前面的文章存儲部分(3)CommitLog物理日志相關(guān)的CommitLog。
?先看run方法

public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    /**
                     * 這里進入等待,等待被喚醒,進入等待之前會調(diào)用onWaitEnd方法,然后調(diào)用swapRequests方法,
                     * 吧requestsWrite轉(zhuǎn)換為requestsRead
                     */
                    this.waitForRunning(10);
                    /**
                     * 進行請求處理
                     */
                    this.doWaitTransfer();
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

?在run方法中會將傳入的CommitLog.GroupCommitRequestrequestsWrite轉(zhuǎn)換到requestsRead中然后進行處理檢查對應的同步請求的進度。檢查的邏輯在doWaitTransfer

        private void doWaitTransfer() {
            //對requestsRead請求加鎖
            synchronized (this.requestsRead) {
                //如果讀請求不為空
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        //如果push到slave的偏移量 大于等于 請求中的消息的最大偏移量 表示slave同步完成
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        //計算這次同步超時的時間點  同步的超時時間段為5s
                        long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                            + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                        //如果沒有同步完畢,并且還沒達到超時時間,則等待1秒之后檢查同步的進度,大概檢查5次
                        while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }
                        //如果超時了,檢查是不是同步完成了,
                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }
                        //超時或者同步成功的時候 喚醒主線程
                        req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }

                    this.requestsRead.clear();
                }
            }
        }

?主要邏輯如下:

  1. 比較Master推送到Slave的 偏移量push2SlaveMaxOffset是不是大于傳進來的CommitLog.GroupCommitRequest中的偏移量
  2. 計算本次同步超時的時間節(jié)點,時間為當前時間加上參數(shù)系統(tǒng)配置參數(shù)syncFlushTimeout默認為5秒
  3. 如果第一步結(jié)果為true,則返回結(jié)果為PUT_OK。如果第一步為false,則每過一秒檢查一次結(jié)果,如果超過5次了還沒同步完成,則表示超時了那么返回結(jié)果為FLUSH_SLAVE_TIMEOUT。同時會喚醒CommitLog的刷盤線程。
與Slave緊密相關(guān)的HAClient

?前面我們說到了只有是Salve角色的Broker才會真正的配置Master的地址,而HAClient是需要Master地址的,因此這個類真正在運行的時候只有Slave才會真正的使用到。
?先看看核心的參數(shù)信息

 //Socket讀緩存區(qū)大小
        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
        //master地址
        private final AtomicReference<String> masterAddress = new AtomicReference<>();
        //Slave向Master發(fā)起主從同步的拉取偏移量,固定8個字節(jié)
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        //上次同步偏移量的時間戳
        private long lastWriteTimestamp = System.currentTimeMillis();
        //反饋Slave當前的復制進度,commitlog文件最大偏移量
        private long currentReportedOffset = 0;
        private int dispatchPosition = 0;
        //讀緩沖大小
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

?基本上都是緩沖相關(guān)的配置。這里主要分析的是run方法中的邏輯

 public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    //連接master,同時監(jiān)聽讀請求事件
                    if (this.connectMaster()) {
                        //是否需要匯報偏移量,間隔需要大于心跳的時間(5s)
                        if (this.isTimeToReportOffset()) {
                            //向master 匯報當前 salve 的CommitLog的最大偏移量,并記錄這次的同步時間
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            //如果匯報完了就關(guān)閉連接
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        
                        this.selector.select(1000);
                        //向master拉取的信息
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        //再次同步slave的偏移量如果,最新的偏移量大于已經(jīng)匯報的情況下
                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }
                        //檢查時間距離上次同步進度的時間間隔
                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        //如果間隔大于心跳的時間,那么就關(guān)閉
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        //等待
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

?主要的邏輯如下:

  1. 連接master,如果當前的broker角色是master,那么對應的masterAddress是空的,不會有后續(xù)邏輯。如果是slave,并且配置了master地址,則會進行連接進行后續(xù)邏輯處理
  2. 檢查是否需要向master匯報當前的同步進度,如果兩次同步的時間小于5s,則不進行同步。每次同步之間間隔在5s以上,這個5s是心跳連接的間隔參數(shù)為haSendHeartbeatInterval
  3. 向master 匯報當前 salve 的CommitLog的最大偏移量,并記錄這次的同步時間
  4. 從master拉取日志信息,主要就是進行消息的同步,同步出問題則關(guān)閉連接
  5. 再次同步slave的偏移量,如果最新的偏移量大于已經(jīng)匯報的情況下則從步驟1重頭開始

?這里分析完了run方法,然后就要分析主要的日志同步的邏輯了,這個邏輯在processReadEvent方法中

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            //如果讀取緩存還有沒讀取完,則一直讀取
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    //從master讀取消息
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        //分發(fā)請求
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

        private boolean dispatchReadRequest() {
            //請求的頭信息
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            //獲取請求長度
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                //獲取分發(fā)的偏移差
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                //如果偏移差大于頭大小,說明存在請求體
                if (diff >= msgHeaderSize) {
                    //獲取主master的最大偏移量
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    //獲取消息體
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                    //獲取salve的最大偏移量
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                    //如果偏移差大于 消息頭和 消息體大小。則讀取消息體
                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);
                        //吧消息同步到slave的 CommitLog
                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        //記錄分發(fā)的位置
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

?每一步的邏輯都是比較清楚的,這里不進行講解。

Master用來同步日志用的HAConnection

?前面說過,在HAServiceAcceptSocketService內(nèi)部類中,Master會在建立連接的時候創(chuàng)建HAConnection用來處理讀寫事件。這里主要介紹構(gòu)造函數(shù)和內(nèi)部類就能了解原理了。

構(gòu)造函數(shù)
    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        //指定所屬的 HAService
        this.haService = haService;
        //指定的NIO的socketChannel
        this.socketChannel = socketChannel;
        //客戶端的地址
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        //這是為非阻塞
        this.socketChannel.configureBlocking(false);
        /**
         * 是否啟動SO_LINGER
         * SO_LINGER作用
         * 設(shè)置函數(shù)close()關(guān)閉TCP連接時的行為。缺省close()的行為是,如果有數(shù)據(jù)殘留在socket發(fā)送緩沖區(qū)中則系統(tǒng)將繼續(xù)發(fā)送這些數(shù)據(jù)給對方,等待被確認,然后返回。
         *
         * https://blog.csdn.net/u012635648/article/details/80279338
         */
        this.socketChannel.socket().setSoLinger(false, -1);
        /**
         * 是否開啟TCP_NODELAY
         * https://blog.csdn.net/lclwjl/article/details/80154565
         */
        this.socketChannel.socket().setTcpNoDelay(true);
        //接收緩沖的大小
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        //發(fā)送緩沖的大小
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        //端口寫服務(wù)
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        //端口讀服務(wù)
        this.readSocketService = new ReadSocketService(this.socketChannel);
        //增加haService中的連接數(shù)字段
        this.haService.getConnectionCount().incrementAndGet();
    }
內(nèi)部類分析
監(jiān)聽slave日志同步進度和同步日志的WriteSocketService

WriteSocketService監(jiān)聽的是OP_WRITE事件,注冊的端口就是在HAService中開啟的端口。直接看對應的核心方法run方法,方法有點長這里只看看核心的部分

public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    //如果slave的讀請求為 -1 表示沒有slave 發(fā)出寫請求,不需要處理
                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        Thread.sleep(10);
                        continue;
                    }
                    //nextTransferFromWhere 為-1 表示初始第一次同步,需要進行計算
                    if (-1 == this.nextTransferFromWhere) {
                        //如果slave 同步完成 則下次同步從CommitLog的最大偏移量開始同步
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            //獲取master 上面的 CommitLog 最大偏移量
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMappedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }
                            //設(shè)置下次開始同步的位置
                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            //設(shè)置下次同步的位置,為 salve 讀請求的位置
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }
                    //上次同步是否完成
                    if (this.lastWriteOver) {
                        //獲取兩次寫請求的周期時間
                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        //如果周期大于 心跳間隔 。需要先發(fā)送一次心跳 心跳間隔為 5000毫秒
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                            // 創(chuàng)建請求頭,心跳請求大小為12 字節(jié)
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);
                            this.byteBufferHeader.flip();
                            //進行消息同步
                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        //如果上次同步?jīng)]有完成,則繼續(xù)同步
                        this.lastWriteOver = this.transferData();
                        //如果還沒同步完成則繼續(xù)
                        if (!this.lastWriteOver)
                            continue;
                    }
                    //獲取開始同步位置之后的消息
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    //
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        //檢查要同步消息的長度,是不是大于單次同步的最大限制 默認為 32kb
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;

                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(thisOffset);
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                    } else {
                        //如果需要同步的為空,則在等待100毫秒
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {

                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
        

?主要的邏輯如下:

  1. 如果slave進行了日志偏移量的匯報,判斷是不是第一次的進行同步以及對應的同步進度。設(shè)置下一次的同步位置
  2. 檢查上次同步是不是已經(jīng)完成了,檢查兩次同步的周期是不是超過心跳間隔,如果是的則需要把心跳信息放到返回的頭里面,然后進行消息同步。如果上次同步還沒完成,則等待上次同步完成之后再繼續(xù)
  3. 從Master本地讀取CommitLog的最大偏移量,根據(jù)上次同步的位置開始從CommitLog獲取日志信息,然后放到緩存中
  4. 如果緩存的大小大于單次同步的最大大小haTransferBatchSize默認是32kb,那么只同步32kb大小的日志。如果緩存為null,則等待100毫秒

?其中日志同步的邏輯在transferData方法中,這里就把代碼貼出來

private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            //心跳的頭沒寫滿,先寫頭
            while (this.byteBufferHeader.hasRemaining()) {
                //把請求頭傳過去
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    //記錄上次寫的時間
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    //重試3次 則不再重試
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }
            //如果要同步的日志為null,則直接返回這次同步的結(jié)果是否同步完成
            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // 填充請求體
            if (!this.byteBufferHeader.hasRemaining()) {
                //如果還沒有同步完成,則一直同步
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    //同步的大小
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        //重試3次
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
            //釋放緩存
            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }

            return result;
        }
根據(jù)同步進度來喚醒刷盤CommitLog線程的ReadSocketService

ReadSocketService的作用主要是:根據(jù)Slave推送的日志同步進度,來喚醒HAServiceGroupTransferService然后進一步喚醒CommitLog的日志刷盤線程。這里主要看run方法和processReadEvent方法。

 public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            //任務(wù)是否結(jié)束
            while (!this.isStopped()) {
                try {
                    //設(shè)置selector的阻塞時間
                    this.selector.select(1000);
                    //處理salver讀取消息的事件
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }
                    //檢查此次處理時間是否超過心跳連接時間
                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                        break;
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
    ......
    }
    
    private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            //檢查 讀取請求緩沖是否已經(jīng)滿了,
            if (!this.byteBufferRead.hasRemaining()) {
                //讀請求緩沖轉(zhuǎn)變?yōu)樽x取模式。
                this.byteBufferRead.flip();
                this.processPosition = 0;
            }

            while (this.byteBufferRead.hasRemaining()) {
                try {
                    //從byteBufferRead讀取
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        //讀取請求緩沖的位置 如果大于處理的8字節(jié) 表示有讀取的請求沒處理。為什么是8個字節(jié),因為salver向master發(fā)去拉取請求時,偏移量固定為8
                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                            //獲取消息開始的位置
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            //從開始位置讀取8個字節(jié),獲取 slave的讀請求偏移量
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            //設(shè)置處理的位置
                            this.processPosition = pos;
                            //設(shè)置 salver讀取的位置
                            HAConnection.this.slaveAckOffset = readOffset;
                            //如果slave的 讀請求 偏移量小于0 表示同步完成了
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                //重新設(shè)置slave的 讀請求的 偏移量
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }
                            //喚醒阻塞的線程, 在消息的主從同步選擇的模式是同步的時候,會喚醒被阻塞的消息寫入的線程
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        //如果數(shù)據(jù)為0超過3次,表示同步完成,直接結(jié)束
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }

?整體的邏輯如下:

  1. 每1s執(zhí)行一次事件就緒選擇,然后調(diào)用processReadEvent方法處理讀請求,讀取從服務(wù)器的拉取請求
  2. 獲取slave已拉取偏移量,因為有新的從服務(wù)器反饋拉取進度,需要通知某些生產(chǎn)者以便返回,因為如果消息發(fā)送使用同步方式,需要等待將消息復制到從服務(wù)器,然后才返回,故這里需要喚醒相關(guān)線程去判斷自己關(guān)注的消息是否已經(jīng)傳輸完成。也就是HAServiceGroupTransferService
  3. 如果讀取到的字節(jié)數(shù)等于0,則重復三次,否則結(jié)束本次讀請求處理;如果讀取到的字節(jié)數(shù)小于0,表示連接被斷開,返回false,后續(xù)會斷開該連接。

總結(jié)

?RocketMQ的主從同步之間的核心類就是HAServiceHAConnection和其中的幾個子類。結(jié)合前面的那個圖可以簡單的理解一下。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容