RocketMQ主從同步源碼分析

微信公眾號「后端進(jìn)階」,專注后端技術(shù)分享:Java、Golang、WEB框架、分布式中間件、服務(wù)治理等等。

之前寫了一篇關(guān)于 RocketMQ 隊列與 Kafka 分區(qū)副本的區(qū)別文章,里面提到了 RocketMQ 的消息冗余主要是通過主備同步機(jī)制實(shí)現(xiàn)的,這跟 Kafka 分區(qū)副本的 Leader-Follower 模型不同,HA(High Available) 指的是高可用性,而 RocketMQ 的HA機(jī)制是通過主備同步實(shí)現(xiàn)消息的高可用。

HA 核心類

HA 的實(shí)現(xiàn)邏輯放在了 store 存儲模塊的ha目錄中,其核心實(shí)現(xiàn)類如下:

image
  1. HAService:主從同步的核心實(shí)現(xiàn)類
  2. HAService$AcceptSocketService:主服務(wù)器監(jiān)聽從服務(wù)器連接實(shí)現(xiàn)類
  3. HAService$GroupTransferService:主從同步通知類,實(shí)現(xiàn)同步復(fù)制和異步復(fù)制的功能
  4. HAService$HAClient:從服務(wù)器連接主服務(wù)實(shí)現(xiàn)類
  5. HAConnection:主服務(wù)端 HA 連接對象的封裝,當(dāng)主服務(wù)器接收到從服務(wù)器發(fā)過來的消息后,會封裝成一個 HAConnection 對象,其中里面又封裝了讀 Socket 連接實(shí)現(xiàn)與 寫 Socket 連接實(shí)現(xiàn):
  • HAConnection$ReadSocketService:主服務(wù)器讀實(shí)現(xiàn)類
  • HAConnection$WriteSocketService:主服務(wù)器寫實(shí)現(xiàn)類

RocketMQ 主從同步的整體工作機(jī)制大致是:

  1. 從服務(wù)器主動建立 TCP 連接主服務(wù)器,然后每隔 5s 向主服務(wù)器發(fā)送 commitLog 文件最大偏移量拉取還未同步的消息;
  2. 主服務(wù)器開啟監(jiān)聽端口,監(jiān)聽從服務(wù)器發(fā)送過來的信息,主服務(wù)器收到從服務(wù)器發(fā)過來的偏移量進(jìn)行解析,并返回查找出未同步的消息給從服務(wù)器;
  3. 客戶端收到主服務(wù)器的消息后,將這批消息寫入 commitLog 文件中,然后更新 commitLog 拉取偏移量,接著繼續(xù)向主服務(wù)拉取未同步的消息。

Slave -> Master 過程

從 HA 實(shí)現(xiàn)邏輯可看出,可大致分為兩個過程,分別是從服務(wù)器上報偏移量,以及主服務(wù)器發(fā)送未同步消息到從服務(wù)器。

從上面的實(shí)現(xiàn)類可知,從服務(wù)器向主服務(wù)器上報偏移量的邏輯在 HAClient 類中,HAClient 類是一個繼承了 ServiceThread 類,即它是一個線程服務(wù)類,在 Broker 啟動后,Broker 啟動開一條線程定時執(zhí)行從服務(wù)器上報偏移量到主服務(wù)器的任務(wù)。

org.apache.rocketmq.store.ha.HAService.HAClient#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      // 主動連接主服務(wù)器,獲取socketChannel對象
      if (this.connectMaster()) {
        if (this.isTimeToReportOffset()) {
          // 執(zhí)行上報偏移量到主服務(wù)器
          boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
          if (!result) {
            this.closeMaster();
          }
        }
                // 每隔一秒鐘輪詢一遍
        this.selector.select(1000);

        // 處理主服務(wù)器發(fā)送過來的消息
        boolean ok = this.processReadEvent();
        if (!ok) {
          this.closeMaster();
        }
        
        // ......
        
      } else {
        this.waitForRunning(1000 * 5);
      }
    } catch (Exception e) {
      log.warn(this.getServiceName() + " service has exception. ", e);
      this.waitForRunning(1000 * 5);
    }
  }

  log.info(this.getServiceName() + " service end");
}

以上是 HAClient 線程 run 方法邏輯,主要是做了主動連接主服務(wù)器,并上報偏移量到主服務(wù)器,以及處理主服務(wù)器發(fā)送過來的消息,并不斷循環(huán)執(zhí)行以上邏輯。

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

private boolean connectMaster() throws ClosedChannelException {
  if (null == socketChannel) {
    String addr = this.masterAddress.get();
    if (addr != null) {
      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
      if (socketAddress != null) {
        this.socketChannel = RemotingUtil.connect(socketAddress);
        if (this.socketChannel != null) {
          this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
      }
    }
    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
  }
  return this.socketChannel != null;
}

該方法是從服務(wù)器連接主服務(wù)器的邏輯,拿到主服務(wù)器地址并且連接上以后,會獲取一個 socketChannel 對象,接著還會記錄當(dāng)前時間戳為上次寫入的時間戳,lastWriteTimestamp 的作用時用來計算主從同步時間間隔,這里需要注意一點(diǎn),如果沒有配置主服務(wù)器地址,該方法會返回 false,即不會執(zhí)行主從復(fù)制。

該方法還會調(diào)用 DefaultMessageStore 的 getMaxPhyOffset() 方法獲取 commitLog 文件最大偏移量,作為本次上報的偏移量。

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }
  return !this.reportOffset.hasRemaining();
}

該方法向主服務(wù)器上報已拉取偏移量,具體做法是將 ByteBuffer 讀取位置 position 值為 0,其實(shí)跳用 flip() 方法也可以,然后調(diào)用 putLong() 方法將 maxOffset 寫入 ByteBuffer,將 limit 設(shè)置為 8,跟寫入 ByteBuffer 中的 maxOffset(long 型)大小一樣,最后采取 for 循環(huán)將 maxOffset 寫入網(wǎng)絡(luò)通道中,并調(diào)用 hasRemaining() 方法,該方法的邏輯為判斷 position 是否小于 limit,即判斷 ByteBuffer 中的字節(jié)流是否全部寫入到通道中。

Master -> Slave 過程

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);
      Set<SelectionKey> selected = this.selector.selectedKeys();

      if (selected != null) {
        for (SelectionKey k : selected) {
          if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

            if (sc != null) {
              HAService.log.info("HAService receive new connection, "
                                 + sc.socket().getRemoteSocketAddress());

              try {
                HAConnection conn = new HAConnection(HAService.this, sc);
                conn.start();
                HAService.this.addConnection(conn);
              } catch (Exception e) {
                log.error("new HAConnection exception", e);
                sc.close();
              }
            }
          } else {
            log.warn("Unexpected ops in select " + k.readyOps());
          }
        }

        selected.clear();
      }
    } catch (Exception e) {
      log.error(this.getServiceName() + " service has exception.", e);
    }
  }

  log.info(this.getServiceName() + " service end");
}

主服務(wù)器收到從服務(wù)器的拉取偏移量后,會封裝成一個 HAConnection 對象,前面也說過 HAConnection 封裝主服務(wù)端 HA 連接對象的封裝,其中有讀實(shí)現(xiàn)類和寫實(shí)現(xiàn)類,start() 方法即開啟了讀寫線程:

org.apache.rocketmq.store.ha.HAConnection#start:

public void start() {
  this.readSocketService.start();
  this.writeSocketService.start();
}

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;

  if (!this.byteBufferRead.hasRemaining()) {
    this.byteBufferRead.flip();
    this.processPostion = 0;
  }

  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        readSizeZeroTimes = 0;
        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
          int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
          // 從網(wǎng)絡(luò)通道中讀取從服務(wù)器上報的偏移量
          long readOffset = this.byteBufferRead.getLong(pos - 8);
          this.processPostion = pos;

          // 同步從服務(wù)器偏移量
          HAConnection.this.slaveAckOffset = readOffset;
          if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
          }

          // 這里主要是同步后需要喚醒相關(guān)消息發(fā)送線程,實(shí)現(xiàn)主從同步是異步還是同步的功能
          HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
        return false;
      }
    } catch (IOException e) {
      log.error("processReadEvent exception", e);
      return false;
    }
  }

  return true;
}

從以上源碼可看出,主服務(wù)器接收到從服務(wù)器上報的偏移量后,主要作了兩件事:

  1. 獲取從服務(wù)器上報的偏移量;
  2. 喚醒主從同步消費(fèi)者發(fā)送消息同步返回的線程,該方法實(shí)現(xiàn)了主從同步-同步復(fù)制的功能。

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

public void run() {
  HAConnection.log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      this.selector.select(1000);

      // 如果slaveRequestOffset=-1,說明讀線程還沒有獲取從服務(wù)器的偏移量,繼續(xù)循環(huán)等待
      if (-1 == HAConnection.this.slaveRequestOffset) {
        Thread.sleep(10);
        continue;
      }

      // 如果nextTransferFromWhere=-1,說明線程剛開始執(zhí)行數(shù)據(jù)傳輸
      if (-1 == this.nextTransferFromWhere) {
        // 如果slaveRequestOffset=0,說明從服務(wù)器是第一次上報偏移量
        if (0 == HAConnection.this.slaveRequestOffset) {
          // 獲取最后一個 commitLog 文件且還未讀取消費(fèi)的偏移量
          long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
          // 求出最后一個commitLog偏移量的初始偏移量
          masterOffset =
            masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

          if (masterOffset < 0) {
            masterOffset = 0;
          }

          // 更新 nextTransferFromWhere
          this.nextTransferFromWhere = masterOffset;
        } else {
          // 如果slaveRequestOffset!=0,則將該值賦值給nextTransferFromWhere
          this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
        }

        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                 + "], and slave request " + HAConnection.this.slaveRequestOffset);
      }

      // 判斷上次寫事件是否已全部寫完成
      if (this.lastWriteOver) {

        // 計算是否已到發(fā)送心跳包時間
        long interval =
          HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
        // 發(fā)送心跳包,以保持長連接
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaSendHeartbeatInterval()) {
          // Build Header
          this.byteBufferHeader.position(0);
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(this.nextTransferFromWhere);
          this.byteBufferHeader.putInt(0);
          this.byteBufferHeader.flip();
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
            continue;
        }
      } else {
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver)
          continue;
      }

      // 獲取同步消息數(shù)據(jù)
      SelectMappedBufferResult selectResult =      HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
      if (selectResult != null) {
        int size = selectResult.getSize();
        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
          size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
        }

        long thisOffset = this.nextTransferFromWhere;
        this.nextTransferFromWhere += size;

        selectResult.getByteBuffer().limit(size);
        this.selectMappedBufferResult = selectResult;

        // Build Header
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(thisOffset);
        this.byteBufferHeader.putInt(size);
        this.byteBufferHeader.flip();

        // 傳輸消息到從服務(wù)器
        this.lastWriteOver = this.transferData();
      } else {

        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
      }
    } catch (Exception e) {

      HAConnection.log.error(this.getServiceName() + " service has exception.", e);
      break;
    }
  }

  if (this.selectMappedBufferResult != null) {
    this.selectMappedBufferResult.release();
  }

  this.makeStop();

  readSocketService.makeStop();

  haService.removeConnection(HAConnection.this);

  SelectionKey sk = this.socketChannel.keyFor(this.selector);
  if (sk != null) {
    sk.cancel();
  }

  try {
    this.selector.close();
    this.socketChannel.close();
  } catch (IOException e) {
    HAConnection.log.error("", e);
  }

  HAConnection.log.info(this.getServiceName() + " service end");
}

讀實(shí)現(xiàn)類實(shí)現(xiàn)邏輯比較長,但主要做了以下幾件事情:

  1. 計算需要拉取的偏移量,如果從服務(wù)器第一次拉取,則從最后一個 commitLog 文件的初始偏移量開始同步;
  2. 傳輸消息到從服務(wù)器;
  3. 發(fā)送心跳包到從服務(wù)器,保持長連接。

關(guān)于第一步,我還需要詳細(xì)講解一下,因?yàn)橹坝邢氲揭粋€問題:

把 brokerA 的從服務(wù)器去掉,再啟動一臺新的從服務(wù)器指向brokerA 主服務(wù)器,這時的主服務(wù)器的消息是否會全量同步到從服務(wù)?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

// 求出最后一個commitLog偏移量的初始偏移量
masterOffset =
  masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
               .getMapedFileSizeCommitLog());

從以上邏輯可找到答案,如果有新的從服務(wù)器同步主服務(wù)器消息,則從最后一個 commitLog 文件的初始偏移量開始同步。

回到最開始開啟 HAClient 線程上報偏移量的方法,我們發(fā)現(xiàn)里面還做了一件事:

// 處理主服務(wù)器發(fā)送過來的消息
boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        readSizeZeroTimes = 0;
        // 讀取消息并寫入commitLog文件中
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        // TODO ERROR
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

該方法用于處理主服務(wù)器發(fā)送回來的消息數(shù)據(jù),這里用了 while 循環(huán)的處理,不斷地從 byteBuffer 讀取數(shù)據(jù)到緩沖區(qū)中,最后調(diào)用 dispatchReadRequest 方法將消息數(shù)據(jù)寫入 commitLog 文件中,完成主從復(fù)制最后一個步驟。

最后貼上《RocketMQ 技術(shù)內(nèi)幕》這本書的一張 RocketMQ HA 交互圖:

image
公眾號「后端進(jìn)階」,專注后端技術(shù)分享!
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容