rabbitmq可靠發(fā)送的自動重試機(jī)制

http://www.itdecent.cn/p/4112d78a8753

接這篇

在上文中,主要實現(xiàn)了可靠模式的consumer。而可靠模式的sender實現(xiàn)的相對簡略,主要通過rabbitTemplate來完成。
本以為這樣的實現(xiàn)基本是沒有問題的。但是前段時間做了一個性能壓力測試,但是發(fā)現(xiàn)在使用rabbitTemplate時,會有一定的丟數(shù)據(jù)問題。

當(dāng)時的場景是用30個線程,無間隔的向rabbitmq發(fā)送數(shù)據(jù),但是當(dāng)運行一段時間后發(fā)現(xiàn),會出現(xiàn)一些connection closed錯誤,rabbitTemplate雖然進(jìn)行了自動重連,但是在重連的過程中,丟失了一部分?jǐn)?shù)據(jù)。當(dāng)時發(fā)送了300萬條數(shù)據(jù),丟失在2000條左右。
這種丟失率,對于一些對一致性要求很高的應(yīng)用(比如扣款,轉(zhuǎn)賬)來說,是不可接受的。

在google了很久之后,在stackoverflow上找到rabbitTemplate作者對于這種問題的解決方案,他給的方案很簡單,單純的增加connection數(shù):

connectionFactory.setChannelCacheSize(100);

修改之后,確實不再出現(xiàn)connection closed這種錯誤了,在發(fā)送了3000萬條數(shù)據(jù)后,一條都沒有丟失。
似乎問題已經(jīng)完美的解決了,但是我又想到一個問題:當(dāng)我們的網(wǎng)絡(luò)在發(fā)生抖動時,這種方式還是不是安全的?
換句話說,如果我強(qiáng)制切斷客戶端和rabbitmq服務(wù)端的連接,數(shù)據(jù)還會丟失嗎?

為了驗證這種場景,我重新發(fā)送300萬條數(shù)據(jù),在發(fā)送過程中,在rabbitmq的管理界面上點擊強(qiáng)制關(guān)閉連接:


然后發(fā)現(xiàn),仍然存在丟失數(shù)據(jù)的問題。

看來這個問題,沒有想象中的那么簡單了。

在閱讀了部分rabbitTemplate的代碼之后發(fā)現(xiàn):
1 rabbitTemplate的ack確認(rèn)機(jī)制是異步的
2 這種確認(rèn)機(jī)制是一種事后發(fā)現(xiàn)機(jī)制,并不能同步的發(fā)現(xiàn)問題
也就是說,即便打開了

connectionFactory.setPublisherConfirms(true);
rabbitTemplate.setMandatory(true);

并且實現(xiàn)了:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.info("send message failed: " + cause + correlationData.toString());
            } 
        });

依舊是不安全的。
rabbitTemplate的發(fā)送流程是這樣的:
1 發(fā)送數(shù)據(jù)并返回(不確認(rèn)rabbitmq服務(wù)器已成功接收)
2 異步的接收從rabbitmq返回的ack確認(rèn)信息
3 收到ack后調(diào)用confirmCallback函數(shù)
注意:在confirmCallback中是沒有原message的,所以無法在這個函數(shù)中調(diào)用重發(fā),confirmCallback只有一個通知的作用

在這種情況下,如果在2,3步中任何時候切斷連接,我們都無法確認(rèn)數(shù)據(jù)是否真的已經(jīng)成功發(fā)送出去,從而造成數(shù)據(jù)丟失的問題。

最完美的解決方案只有1種:
使用rabbitmq的事務(wù)機(jī)制。
但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。
第二種解決方式,使用同步的發(fā)送機(jī)制,也就是說,客戶端發(fā)送數(shù)據(jù),rabbitmq收到后返回ack,再收到ack后,send函數(shù)才返回。代碼類似這樣:

創(chuàng)建channel
send message
wait for ack(or 超時)
close channel
返回成功or失敗

同樣的,由于每次發(fā)送message都要重新建立連接,效率很低。

基于上面的分析,我們使用一種新的方式來做到數(shù)據(jù)的不丟失。
在rabbitTemplate異步確認(rèn)的基礎(chǔ)上
1 在本地緩存已發(fā)送的message
2 通過confirmCallback或者被確認(rèn)的ack,將被確認(rèn)的message從本地刪除
3 定時掃描本地的message,如果大于一定時間未被確認(rèn),則重發(fā)

當(dāng)然了,這種解決方式也有一定的問題
想象這種場景,rabbitmq接收到了消息,在發(fā)送ack確認(rèn)時,網(wǎng)絡(luò)斷了,造成客戶端沒有收到ack,重發(fā)消息。(相比于丟失消息,重發(fā)消息要好解決的多,我們可以在consumer端做到冪等)。
自動重試的代碼如下:

public class RetryCache {
    private MessageSender sender;
    private boolean stop = false;
    private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
    private AtomicLong id = new AtomicLong();

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    private static class MessageWithTime {
        long time;
        Object message;
    }

    public void setSender(MessageSender sender) {
        this.sender = sender;
        startRetry();
    }

    public String generateId() {
        return "" + id.incrementAndGet();
    }

    public void add(String id, Object message) {
        map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
    }

    public void del(String id) {
        map.remove(id);
    }

    private void startRetry() {
        new Thread(() ->{
            while (!stop) {
                try {
                    Thread.sleep(Constants.RETRY_TIME_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                long now = System.currentTimeMillis();

                for (String key : map.keySet()) {
                    MessageWithTime messageWithTime = map.get(key);

                    if (null != messageWithTime) {
                        if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
                            log.info("send message failed after 3 min " + messageWithTime);
                            del(key);
                        } else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
                            DetailRes detailRes = sender.send(messageWithTime.getMessage());

                            if (detailRes.isSuccess()) {
                                del(key);
                            }
                        }
                    }
                }
            }
        }).start();
    }
}

在client端發(fā)送之前,先在本地緩存message,代碼如下:

@Override
public DetailRes send(Object message) {
    try {
        String id = retryCache.generateId();
        retryCache.add(id, message);
        rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
    } catch (Exception e) {
        return new DetailRes(false, "");
    }

    return new DetailRes(true, "");
}

在收到ack時刪除本地緩存,代碼如下:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        log.info("send message failed: " + cause + correlationData.toString());
    } else {
        retryCache.del(correlationData.getId());
    }
});

再次驗證剛才的場景,發(fā)送300w條數(shù)據(jù),在發(fā)送的過程中過一段時間close一次connection,發(fā)送結(jié)束后,實際發(fā)送數(shù)據(jù)301.2w條,有一些重復(fù),但是沒有丟失數(shù)據(jù)。
同時需要驗證本地緩存的內(nèi)存泄露問題,程序連續(xù)發(fā)送1.5億條數(shù)據(jù),內(nèi)存占用穩(wěn)定在900M,并沒有明顯的波動。

最后貼一下rabbitmq的性能測試數(shù)據(jù):
1 300w條1k的數(shù)據(jù),單機(jī)部署rabbitmq(8核,32G)
在ack確認(rèn)模式下平均發(fā)送效率為1.1w條/秒
非ack確認(rèn)模式下平均發(fā)送效率為1.6w條/秒

2 300w條1k的數(shù)據(jù),cluster模式部署3臺(8核*3, 32G*3)
在ack確認(rèn)模式下平均發(fā)送效率為1.3w條/秒
非ack確認(rèn)模型下平均發(fā)送效率為1.7w條/秒

3 300w條1k的數(shù)據(jù),單機(jī)部署rabbitmq(8核,32G)
在ack確認(rèn)模式下平均消費效率為9000條/秒

4 300w條1k的數(shù)據(jù),cluster模式部署3臺(8核*3, 32G*3)
在ack確認(rèn)模式下平均消費效率為1w條/秒


代碼地址:

https://github.com/littlersmall/rabbitmq-access

幫忙點個星星,謝謝-_-

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容