接這篇
在上文中,主要實現(xiàn)了可靠模式的consumer。而可靠模式的sender實現(xiàn)的相對簡略,主要通過rabbitTemplate來完成。
本以為這樣的實現(xiàn)基本是沒有問題的。但是前段時間做了一個性能壓力測試,但是發(fā)現(xiàn)在使用rabbitTemplate時,會有一定的丟數(shù)據(jù)問題。
當(dāng)時的場景是用30個線程,無間隔的向rabbitmq發(fā)送數(shù)據(jù),但是當(dāng)運行一段時間后發(fā)現(xiàn),會出現(xiàn)一些connection closed錯誤,rabbitTemplate雖然進(jìn)行了自動重連,但是在重連的過程中,丟失了一部分?jǐn)?shù)據(jù)。當(dāng)時發(fā)送了300萬條數(shù)據(jù),丟失在2000條左右。
這種丟失率,對于一些對一致性要求很高的應(yīng)用(比如扣款,轉(zhuǎn)賬)來說,是不可接受的。
在google了很久之后,在stackoverflow上找到rabbitTemplate作者對于這種問題的解決方案,他給的方案很簡單,單純的增加connection數(shù):
connectionFactory.setChannelCacheSize(100);
修改之后,確實不再出現(xiàn)connection closed這種錯誤了,在發(fā)送了3000萬條數(shù)據(jù)后,一條都沒有丟失。
似乎問題已經(jīng)完美的解決了,但是我又想到一個問題:當(dāng)我們的網(wǎng)絡(luò)在發(fā)生抖動時,這種方式還是不是安全的?
換句話說,如果我強(qiáng)制切斷客戶端和rabbitmq服務(wù)端的連接,數(shù)據(jù)還會丟失嗎?
為了驗證這種場景,我重新發(fā)送300萬條數(shù)據(jù),在發(fā)送過程中,在rabbitmq的管理界面上點擊強(qiáng)制關(guān)閉連接:

然后發(fā)現(xiàn),仍然存在丟失數(shù)據(jù)的問題。
看來這個問題,沒有想象中的那么簡單了。
在閱讀了部分rabbitTemplate的代碼之后發(fā)現(xiàn):
1 rabbitTemplate的ack確認(rèn)機(jī)制是異步的
2 這種確認(rèn)機(jī)制是一種事后發(fā)現(xiàn)機(jī)制,并不能同步的發(fā)現(xiàn)問題
也就是說,即便打開了
connectionFactory.setPublisherConfirms(true);
rabbitTemplate.setMandatory(true);
并且實現(xiàn)了:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
}
});
依舊是不安全的。
rabbitTemplate的發(fā)送流程是這樣的:
1 發(fā)送數(shù)據(jù)并返回(不確認(rèn)rabbitmq服務(wù)器已成功接收)
2 異步的接收從rabbitmq返回的ack確認(rèn)信息
3 收到ack后調(diào)用confirmCallback函數(shù)
注意:在confirmCallback中是沒有原message的,所以無法在這個函數(shù)中調(diào)用重發(fā),confirmCallback只有一個通知的作用
在這種情況下,如果在2,3步中任何時候切斷連接,我們都無法確認(rèn)數(shù)據(jù)是否真的已經(jīng)成功發(fā)送出去,從而造成數(shù)據(jù)丟失的問題。
最完美的解決方案只有1種:
使用rabbitmq的事務(wù)機(jī)制。
但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。
第二種解決方式,使用同步的發(fā)送機(jī)制,也就是說,客戶端發(fā)送數(shù)據(jù),rabbitmq收到后返回ack,再收到ack后,send函數(shù)才返回。代碼類似這樣:
創(chuàng)建channel
send message
wait for ack(or 超時)
close channel
返回成功or失敗
同樣的,由于每次發(fā)送message都要重新建立連接,效率很低。
基于上面的分析,我們使用一種新的方式來做到數(shù)據(jù)的不丟失。
在rabbitTemplate異步確認(rèn)的基礎(chǔ)上
1 在本地緩存已發(fā)送的message
2 通過confirmCallback或者被確認(rèn)的ack,將被確認(rèn)的message從本地刪除
3 定時掃描本地的message,如果大于一定時間未被確認(rèn),則重發(fā)
當(dāng)然了,這種解決方式也有一定的問題:
想象這種場景,rabbitmq接收到了消息,在發(fā)送ack確認(rèn)時,網(wǎng)絡(luò)斷了,造成客戶端沒有收到ack,重發(fā)消息。(相比于丟失消息,重發(fā)消息要好解決的多,我們可以在consumer端做到冪等)。
自動重試的代碼如下:
public class RetryCache {
private MessageSender sender;
private boolean stop = false;
private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
private AtomicLong id = new AtomicLong();
@NoArgsConstructor
@AllArgsConstructor
@Data
private static class MessageWithTime {
long time;
Object message;
}
public void setSender(MessageSender sender) {
this.sender = sender;
startRetry();
}
public String generateId() {
return "" + id.incrementAndGet();
}
public void add(String id, Object message) {
map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
}
public void del(String id) {
map.remove(id);
}
private void startRetry() {
new Thread(() ->{
while (!stop) {
try {
Thread.sleep(Constants.RETRY_TIME_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
long now = System.currentTimeMillis();
for (String key : map.keySet()) {
MessageWithTime messageWithTime = map.get(key);
if (null != messageWithTime) {
if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
log.info("send message failed after 3 min " + messageWithTime);
del(key);
} else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
DetailRes detailRes = sender.send(messageWithTime.getMessage());
if (detailRes.isSuccess()) {
del(key);
}
}
}
}
}
}).start();
}
}
在client端發(fā)送之前,先在本地緩存message,代碼如下:
@Override
public DetailRes send(Object message) {
try {
String id = retryCache.generateId();
retryCache.add(id, message);
rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
} catch (Exception e) {
return new DetailRes(false, "");
}
return new DetailRes(true, "");
}
在收到ack時刪除本地緩存,代碼如下:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
} else {
retryCache.del(correlationData.getId());
}
});
再次驗證剛才的場景,發(fā)送300w條數(shù)據(jù),在發(fā)送的過程中過一段時間close一次connection,發(fā)送結(jié)束后,實際發(fā)送數(shù)據(jù)301.2w條,有一些重復(fù),但是沒有丟失數(shù)據(jù)。
同時需要驗證本地緩存的內(nèi)存泄露問題,程序連續(xù)發(fā)送1.5億條數(shù)據(jù),內(nèi)存占用穩(wěn)定在900M,并沒有明顯的波動。
最后貼一下rabbitmq的性能測試數(shù)據(jù):
1 300w條1k的數(shù)據(jù),單機(jī)部署rabbitmq(8核,32G)
在ack確認(rèn)模式下平均發(fā)送效率為1.1w條/秒
非ack確認(rèn)模式下平均發(fā)送效率為1.6w條/秒
2 300w條1k的數(shù)據(jù),cluster模式部署3臺(8核*3, 32G*3)
在ack確認(rèn)模式下平均發(fā)送效率為1.3w條/秒
非ack確認(rèn)模型下平均發(fā)送效率為1.7w條/秒
3 300w條1k的數(shù)據(jù),單機(jī)部署rabbitmq(8核,32G)
在ack確認(rèn)模式下平均消費效率為9000條/秒
4 300w條1k的數(shù)據(jù),cluster模式部署3臺(8核*3, 32G*3)
在ack確認(rèn)模式下平均消費效率為1w條/秒
代碼地址:
幫忙點個星星,謝謝-_-