引導
?前面介紹了RocketMQ的CommitLog文件相關(guān)的類分析CommitLog物理日志相關(guān)的CommitLog類。其中有介紹到消息刷盤時高可用對應的handleHA方法,handleHA方法中如果配置的服務(wù)器的角色為SYNC_MASTER(從master同步),就會等待主從之間消息同步的進度達到設(shè)定的值之后才正常返回,如果超時則返回同步超時
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//如果設(shè)置的主從之間是同步更新
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// 檢查slave同步的位置是否小于 最大容忍的同步落后偏移量參數(shù) haSlaveFallbehindMax,如果是的則進行主從同步刷盤
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
//countDownLatch.await 同步等待刷新,除非等待超時
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
//設(shè)置從服務(wù)不可用的狀態(tài)
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
?這段代碼的主要邏輯如下:
- 如果服務(wù)器的角色設(shè)置為
SYNC_MASTER,則進行下一步,否則直接跳過主從同步 - 獲取
HAService對象,檢查消息是否本地存儲完畢,如果沒有則結(jié)束,否則進入下一步 - 檢查slave同步的位置是否小于 最大容忍的同步落后偏移量參數(shù)
haSlaveFallbehindMax,如果是的則進行主從同步刷盤。如果沒有則返回slave不可用的狀態(tài) - 將消息落盤的最大物理偏移量也就是CommitLog上的偏移量作為參數(shù)構(gòu)建一個
GroupCommitRequest對象,然后提交到HAService - 最多等待
syncFlushTimeout長的時間,默認為5秒。在5秒內(nèi)獲取結(jié)果,然后根據(jù)結(jié)果判斷是否返回超時
同步流程
?上面那段代碼比較簡單,因為主從的邏輯全部交給了HAService和HAConnection兩個類處理了。這里先簡單介紹一下整個同步的流程(同步模式)
?這個題可能不好理解,等源碼邏輯分析完之后再看可能會清楚點。
高可用服務(wù)HAService
?HAService是在RocketMQ的Broker啟動的時候就會創(chuàng)建的,而創(chuàng)建的點在DefaultMessageStore這個消息存儲相關(guān)的綜合類中,在這個類的構(gòu)造器中會創(chuàng)建HAService無論當前的Broker是什么角色。這個類后續(xù)會有文章分析
?這里需要說明的是Broker中的Master和Slaver兩個角色,代碼都是一樣的,只不過是在實際執(zhí)行的時候,走的分支不一樣
內(nèi)部屬性
?在HAService中有幾個比較重要的屬性,這里需要簡單的介紹一下
| 參數(shù) | 說明 |
|---|---|
| connectionList | 連接到master的slave連接列表,用于管理連接 |
| acceptSocketService | 用于接收連接用的服務(wù),只監(jiān)聽OP_ACCEPT事件,監(jiān)聽到連接事件時候,創(chuàng)建HAConnection來處理讀寫請求事件 |
| waitNotifyObject | 一個消費等待模型類,用于處理高可用線程和CommitLog的刷盤線程交互 |
| push2SlaveMaxOffset | master同步到slave的偏移量 |
| groupTransferService | 主從同步的檢測服務(wù),用于檢查是否同步完成 |
| haClient | 高可用的服務(wù),slave用來跟master建立連接,像master匯報偏移量和拉取消息 |
/**
* 連接到本機的數(shù)量
*/
private final AtomicInteger connectionCount = new AtomicInteger(0);
/**
* 連接列表,用于管理連接
*/
private final List<HAConnection> connectionList = new LinkedList<>();
/**
* 接受和監(jiān)聽slave的連接服務(wù)
*/
private final AcceptSocketService acceptSocketService;
private final DefaultMessageStore defaultMessageStore;
/**
* 一個服務(wù)消費者線程模型的對象,用于跟CommitLog的刷盤線程交互
*/
private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
/**
* master跟slave 消息同步的位移量
*/
private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
/**
* 主從信息同步的服務(wù)
*/
private final GroupTransferService groupTransferService;
/**
* 操作主從的類
*/
private final HAClient haClient;
構(gòu)造函數(shù)
?HAService只有一個構(gòu)造器。邏輯也比較簡單,創(chuàng)建一個AcceptSocketService開放一個端口為 10912的端口用于slave來簡歷連接,同時啟動主從信息同步的任務(wù)groupTransferService用于接收CommitLog在高可用刷盤時提交任務(wù)
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
//創(chuàng)建,接受連接的服務(wù), 開放的端口號為10912
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
//創(chuàng)建主從信息同步的線程
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
內(nèi)部類分析
?HAService在創(chuàng)建之后,會在DefaultMessageStore中調(diào)用其start方法,這個方法會啟動其內(nèi)部的幾個內(nèi)部類,用來主從同步
public void start() throws Exception {
//接受連接的服務(wù),開啟端口,設(shè)置監(jiān)聽的事件
this.acceptSocketService.beginAccept();
//開啟服務(wù)不斷檢查是否有連接
this.acceptSocketService.start();
//開啟groupTransferService,接受CommitLog的主從同步請求
this.groupTransferService.start();
//開啟haClient,用于slave來建立與Master連接和同步
this.haClient.start();
}
?接下來對這幾個內(nèi)部類進行分析
用于接受Slave連接的AcceptSocketService
?AcceptSocketService這個類在Broker的Master和Slaver兩個角色啟動時都會創(chuàng)建,只不過區(qū)別是Slaver開啟端口之后,并不會有別的Broker與其建立連接。因為只有在Broker的角色是Slave的時候才會指定要連接的Master地址。這個邏輯,在Broker啟動的時候BrokerController類中運行的。
public void beginAccept() throws Exception {
//創(chuàng)建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
//創(chuàng)建selector
this.selector = RemotingUtil.openSelector();
//設(shè)置SO_REUSEADDR https://blog.csdn.net/u010144805/article/details/78579528
this.serverSocketChannel.socket().setReuseAddress(true);
//設(shè)置綁定的地址
this.serverSocketChannel.socket().bind(this.socketAddressListen);
//設(shè)置為非阻塞模式
this.serverSocketChannel.configureBlocking(false);
//注冊監(jiān)聽事件為 連接事件
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
?beginAccept方法就是開啟Socket,綁定10912端口,然后注冊selector和指定監(jiān)聽的事件為OP_ACCEPT也就是建立連接事件。對應的IO模式為NIO模式。主要看其run方法,這個方法是Master用來接受Slave連接的核心。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//設(shè)置阻塞等待時間
this.selector.select(1000);
//獲取selector 下的所有selectorKey ,后續(xù)迭代用
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
//檢測有連接事件的selectorKey
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
//獲取selectorKey的Channel
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
//創(chuàng)建HAConnection,建立連接
HAConnection conn = new HAConnection(HAService.this, sc);
//建立連接
conn.start();
//添加連接到連接列表中
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
//清空連接事件,未下一次做準備
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
?這里的邏輯比較簡單。就是每過一秒檢查一次是否有連接事件,如果有則建立連接,并把建立起來的連接加入到連接列表中進行保存。一直循環(huán)這個邏輯。
檢查同步進度和喚醒CommitLog刷盤線程的GroupTransferService
?GroupTransferService是CommitLog消息刷盤的類CommitLog與HAService打交道的一個中間類。在CommitLog中進行主從刷盤的時候,會創(chuàng)建一個CommitLog.GroupCommitRequest的內(nèi)部類,這個類包含了當前Broker最新的消息的物理偏移量信息。然后把這個類丟給GroupTransferService處理,然后喚醒GroupTransferService。起始這個邏輯跟CommitLog內(nèi)部的GroupCommitService邏輯一樣。只不過對于同步部分的邏輯不一樣,這里可以參考前面的文章存儲部分(3)CommitLog物理日志相關(guān)的CommitLog類。
?先看run方法
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
/**
* 這里進入等待,等待被喚醒,進入等待之前會調(diào)用onWaitEnd方法,然后調(diào)用swapRequests方法,
* 吧requestsWrite轉(zhuǎn)換為requestsRead
*/
this.waitForRunning(10);
/**
* 進行請求處理
*/
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
?在run方法中會將傳入的CommitLog.GroupCommitRequest從requestsWrite轉(zhuǎn)換到requestsRead中然后進行處理檢查對應的同步請求的進度。檢查的邏輯在doWaitTransfer中
private void doWaitTransfer() {
//對requestsRead請求加鎖
synchronized (this.requestsRead) {
//如果讀請求不為空
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
//如果push到slave的偏移量 大于等于 請求中的消息的最大偏移量 表示slave同步完成
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
//計算這次同步超時的時間點 同步的超時時間段為5s
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
//如果沒有同步完畢,并且還沒達到超時時間,則等待1秒之后檢查同步的進度,大概檢查5次
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
//如果超時了,檢查是不是同步完成了,
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
//超時或者同步成功的時候 喚醒主線程
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
}
}
}
?主要邏輯如下:
- 比較Master推送到Slave的 偏移量
push2SlaveMaxOffset是不是大于傳進來的CommitLog.GroupCommitRequest中的偏移量 - 計算本次同步超時的時間節(jié)點,時間為當前時間加上參數(shù)系統(tǒng)配置參數(shù)
syncFlushTimeout默認為5秒 - 如果第一步結(jié)果為true,則返回結(jié)果為
PUT_OK。如果第一步為false,則每過一秒檢查一次結(jié)果,如果超過5次了還沒同步完成,則表示超時了那么返回結(jié)果為FLUSH_SLAVE_TIMEOUT。同時會喚醒CommitLog的刷盤線程。
與Slave緊密相關(guān)的HAClient
?前面我們說到了只有是Salve角色的Broker才會真正的配置Master的地址,而HAClient是需要Master地址的,因此這個類真正在運行的時候只有Slave才會真正的使用到。
?先看看核心的參數(shù)信息
//Socket讀緩存區(qū)大小
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
//master地址
private final AtomicReference<String> masterAddress = new AtomicReference<>();
//Slave向Master發(fā)起主從同步的拉取偏移量,固定8個字節(jié)
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
private SocketChannel socketChannel;
private Selector selector;
//上次同步偏移量的時間戳
private long lastWriteTimestamp = System.currentTimeMillis();
//反饋Slave當前的復制進度,commitlog文件最大偏移量
private long currentReportedOffset = 0;
private int dispatchPosition = 0;
//讀緩沖大小
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
?基本上都是緩沖相關(guān)的配置。這里主要分析的是run方法中的邏輯
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//連接master,同時監(jiān)聽讀請求事件
if (this.connectMaster()) {
//是否需要匯報偏移量,間隔需要大于心跳的時間(5s)
if (this.isTimeToReportOffset()) {
//向master 匯報當前 salve 的CommitLog的最大偏移量,并記錄這次的同步時間
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
//如果匯報完了就關(guān)閉連接
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
//向master拉取的信息
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
//再次同步slave的偏移量如果,最新的偏移量大于已經(jīng)匯報的情況下
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
//檢查時間距離上次同步進度的時間間隔
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
//如果間隔大于心跳的時間,那么就關(guān)閉
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
//等待
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
?主要的邏輯如下:
- 連接master,如果當前的broker角色是master,那么對應的
masterAddress是空的,不會有后續(xù)邏輯。如果是slave,并且配置了master地址,則會進行連接進行后續(xù)邏輯處理 - 檢查是否需要向master匯報當前的同步進度,如果兩次同步的時間小于5s,則不進行同步。每次同步之間間隔在5s以上,這個5s是心跳連接的間隔參數(shù)為
haSendHeartbeatInterval - 向master 匯報當前 salve 的CommitLog的最大偏移量,并記錄這次的同步時間
- 從master拉取日志信息,主要就是進行消息的同步,同步出問題則關(guān)閉連接
- 再次同步slave的偏移量,如果最新的偏移量大于已經(jīng)匯報的情況下則從步驟1重頭開始
?這里分析完了run方法,然后就要分析主要的日志同步的邏輯了,這個邏輯在processReadEvent方法中
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
//如果讀取緩存還有沒讀取完,則一直讀取
while (this.byteBufferRead.hasRemaining()) {
try {
//從master讀取消息
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
//分發(fā)請求
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
//請求的頭信息
final int msgHeaderSize = 8 + 4; // phyoffset + size
//獲取請求長度
int readSocketPos = this.byteBufferRead.position();
while (true) {
//獲取分發(fā)的偏移差
int diff = this.byteBufferRead.position() - this.dispatchPosition;
//如果偏移差大于頭大小,說明存在請求體
if (diff >= msgHeaderSize) {
//獲取主master的最大偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
//獲取消息體
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
//獲取salve的最大偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
//如果偏移差大于 消息頭和 消息體大小。則讀取消息體
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
//吧消息同步到slave的 CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
//記錄分發(fā)的位置
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
?每一步的邏輯都是比較清楚的,這里不進行講解。
Master用來同步日志用的HAConnection
?前面說過,在HAService的AcceptSocketService內(nèi)部類中,Master會在建立連接的時候創(chuàng)建HAConnection用來處理讀寫事件。這里主要介紹構(gòu)造函數(shù)和內(nèi)部類就能了解原理了。
構(gòu)造函數(shù)
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
//指定所屬的 HAService
this.haService = haService;
//指定的NIO的socketChannel
this.socketChannel = socketChannel;
//客戶端的地址
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
//這是為非阻塞
this.socketChannel.configureBlocking(false);
/**
* 是否啟動SO_LINGER
* SO_LINGER作用
* 設(shè)置函數(shù)close()關(guān)閉TCP連接時的行為。缺省close()的行為是,如果有數(shù)據(jù)殘留在socket發(fā)送緩沖區(qū)中則系統(tǒng)將繼續(xù)發(fā)送這些數(shù)據(jù)給對方,等待被確認,然后返回。
*
* https://blog.csdn.net/u012635648/article/details/80279338
*/
this.socketChannel.socket().setSoLinger(false, -1);
/**
* 是否開啟TCP_NODELAY
* https://blog.csdn.net/lclwjl/article/details/80154565
*/
this.socketChannel.socket().setTcpNoDelay(true);
//接收緩沖的大小
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
//發(fā)送緩沖的大小
this.socketChannel.socket().setSendBufferSize(1024 * 64);
//端口寫服務(wù)
this.writeSocketService = new WriteSocketService(this.socketChannel);
//端口讀服務(wù)
this.readSocketService = new ReadSocketService(this.socketChannel);
//增加haService中的連接數(shù)字段
this.haService.getConnectionCount().incrementAndGet();
}
內(nèi)部類分析
監(jiān)聽slave日志同步進度和同步日志的WriteSocketService
WriteSocketService監(jiān)聽的是OP_WRITE事件,注冊的端口就是在HAService中開啟的端口。直接看對應的核心方法run方法,方法有點長這里只看看核心的部分
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
//如果slave的讀請求為 -1 表示沒有slave 發(fā)出寫請求,不需要處理
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
//nextTransferFromWhere 為-1 表示初始第一次同步,需要進行計算
if (-1 == this.nextTransferFromWhere) {
//如果slave 同步完成 則下次同步從CommitLog的最大偏移量開始同步
if (0 == HAConnection.this.slaveRequestOffset) {
//獲取master 上面的 CommitLog 最大偏移量
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
//設(shè)置下次開始同步的位置
this.nextTransferFromWhere = masterOffset;
} else {
//設(shè)置下次同步的位置,為 salve 讀請求的位置
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
//上次同步是否完成
if (this.lastWriteOver) {
//獲取兩次寫請求的周期時間
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//如果周期大于 心跳間隔 。需要先發(fā)送一次心跳 心跳間隔為 5000毫秒
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 創(chuàng)建請求頭,心跳請求大小為12 字節(jié)
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
//進行消息同步
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
//如果上次同步?jīng)]有完成,則繼續(xù)同步
this.lastWriteOver = this.transferData();
//如果還沒同步完成則繼續(xù)
if (!this.lastWriteOver)
continue;
}
//獲取開始同步位置之后的消息
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
//
if (selectResult != null) {
int size = selectResult.getSize();
//檢查要同步消息的長度,是不是大于單次同步的最大限制 默認為 32kb
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
//如果需要同步的為空,則在等待100毫秒
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
?主要的邏輯如下:
- 如果slave進行了日志偏移量的匯報,判斷是不是第一次的進行同步以及對應的同步進度。設(shè)置下一次的同步位置
- 檢查上次同步是不是已經(jīng)完成了,檢查兩次同步的周期是不是超過心跳間隔,如果是的則需要把心跳信息放到返回的頭里面,然后進行消息同步。如果上次同步還沒完成,則等待上次同步完成之后再繼續(xù)
- 從Master本地讀取CommitLog的最大偏移量,根據(jù)上次同步的位置開始從CommitLog獲取日志信息,然后放到緩存中
- 如果緩存的大小大于單次同步的最大大小
haTransferBatchSize默認是32kb,那么只同步32kb大小的日志。如果緩存為null,則等待100毫秒
?其中日志同步的邏輯在transferData方法中,這里就把代碼貼出來
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
//心跳的頭沒寫滿,先寫頭
while (this.byteBufferHeader.hasRemaining()) {
//把請求頭傳過去
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
//記錄上次寫的時間
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
//重試3次 則不再重試
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
//如果要同步的日志為null,則直接返回這次同步的結(jié)果是否同步完成
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// 填充請求體
if (!this.byteBufferHeader.hasRemaining()) {
//如果還沒有同步完成,則一直同步
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
//同步的大小
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
//重試3次
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
//釋放緩存
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
根據(jù)同步進度來喚醒刷盤CommitLog線程的ReadSocketService
ReadSocketService的作用主要是:根據(jù)Slave推送的日志同步進度,來喚醒HAService的GroupTransferService然后進一步喚醒CommitLog的日志刷盤線程。這里主要看run方法和processReadEvent方法。
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
//任務(wù)是否結(jié)束
while (!this.isStopped()) {
try {
//設(shè)置selector的阻塞時間
this.selector.select(1000);
//處理salver讀取消息的事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
//檢查此次處理時間是否超過心跳連接時間
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
......
}
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
//檢查 讀取請求緩沖是否已經(jīng)滿了,
if (!this.byteBufferRead.hasRemaining()) {
//讀請求緩沖轉(zhuǎn)變?yōu)樽x取模式。
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
//從byteBufferRead讀取
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
//讀取請求緩沖的位置 如果大于處理的8字節(jié) 表示有讀取的請求沒處理。為什么是8個字節(jié),因為salver向master發(fā)去拉取請求時,偏移量固定為8
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
//獲取消息開始的位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
//從開始位置讀取8個字節(jié),獲取 slave的讀請求偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
//設(shè)置處理的位置
this.processPosition = pos;
//設(shè)置 salver讀取的位置
HAConnection.this.slaveAckOffset = readOffset;
//如果slave的 讀請求 偏移量小于0 表示同步完成了
if (HAConnection.this.slaveRequestOffset < 0) {
//重新設(shè)置slave的 讀請求的 偏移量
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
//喚醒阻塞的線程, 在消息的主從同步選擇的模式是同步的時候,會喚醒被阻塞的消息寫入的線程
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
//如果數(shù)據(jù)為0超過3次,表示同步完成,直接結(jié)束
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
?整體的邏輯如下:
- 每1s執(zhí)行一次事件就緒選擇,然后調(diào)用processReadEvent方法處理讀請求,讀取從服務(wù)器的拉取請求
- 獲取slave已拉取偏移量,因為有新的從服務(wù)器反饋拉取進度,需要通知某些生產(chǎn)者以便返回,因為如果消息發(fā)送使用同步方式,需要等待將消息復制到從服務(wù)器,然后才返回,故這里需要喚醒相關(guān)線程去判斷自己關(guān)注的消息是否已經(jīng)傳輸完成。也就是
HAService的GroupTransferService - 如果讀取到的字節(jié)數(shù)等于0,則重復三次,否則結(jié)束本次讀請求處理;如果讀取到的字節(jié)數(shù)小于0,表示連接被斷開,返回false,后續(xù)會斷開該連接。
總結(jié)
?RocketMQ的主從同步之間的核心類就是HAService和HAConnection和其中的幾個子類。結(jié)合前面的那個圖可以簡單的理解一下。