rabbitmq如何保證消息可靠性不丟失

[TOC]

之前我們簡單介紹了rabbitmq的功能。他的作用就是方便我們的消息解耦。緊接著問題就會暴露出來。解耦就設計到雙方系統(tǒng)不穩(wěn)定問題。在mq中有生產(chǎn)者、mq、消費者三個角色。其中一個角色down機或者重啟后。就設計到消息的丟失問題。

因為MQ整個消息周期設計到上述的三個角色,所以我們從這個三個角色開始討論丟失數(shù)據(jù)的情況。并如何解決

生產(chǎn)者丟失消息

  • 在生產(chǎn)數(shù)據(jù)程序中,消息已經(jīng)處理好還未發(fā)送給MQ這個階段,生產(chǎn)者因為意外情況中斷了。這個時候生產(chǎn)者這條消息就會丟失。因為程序重啟好之后可能不會再次生產(chǎn)該消息。

實際案列1

  • 購物商城中已經(jīng)選購了商品提交到支付界面。在支付成功后我們的程序需要發(fā)送消息給商家。這個時候程序中斷了。待重啟后客戶界面訂單狀態(tài)是付款成功的。但是這個訂單就沒有及時通知給商家。這會造成商家延遲發(fā)貨。

實際案例2

  • 同樣是購物支付,A客戶先付款order1訂單,支付成功后發(fā)送MQ前直線異常但并未導致程序中斷。這個時候order1商家收不到通知,然后B客戶對order2訂單進行支付且整個過程正常。order2訂單就會通知到對應的商家。整個周期order1訂單就屬于丟失

總結(jié)

  • 兩種情況都是在發(fā)送消息是出現(xiàn)問題。第一種是程序中斷,第二種是訂單異常,第一種異常級別高會影響整個程序使用反而是好排查。第二種程序不異常。這種情況很難發(fā)現(xiàn),只會是個別情況。

解決方案

  • 針對上述情況mq也提供了兩種方法解決。
  • 1、開啟rabbitmq事務(同步)
  • 2、開啟confirm模式(異步)

代碼模擬


Map<String, Object> resultMap = new HashMap<String, Object>(){
    {
        put("code", 200);
    }
};
String msg = "";
Integer index = 0;
if (params.containsKey("msg")) {
    msg = params.get("msg").toString();
}
if (params.containsKey("index")) {
    index = Integer.valueOf(params.get("index").toString());
}
if (index != 0) {
    //這里開始模擬異常出現(xiàn)。消息將會丟失
    int i = 1 / 0;
}
rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", msg);
return resultMap;

  • 上述代碼http://localhost:8282/rabbitmq/sendTopic?msg=test&index=1就會發(fā)生異常,這個時候數(shù)據(jù)丟失
  • http://localhost:8282/rabbitmq/sendTopic?msg=test可以正常發(fā)送。讀者可以自行測試
  • 其實通過rabbitmq的事務并不能解決上面的丟失情況。但是加上事務會保證消息發(fā)送的可靠性。上面發(fā)送消息后出異常這時候我們就沒法回退消息了。但是事務可以幫我們實現(xiàn)

事務


String msg = "trantest";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(true);
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
    int i = 1 / 0;
} catch (IOException e) {
    channel.txRollback();
    e.printStackTrace();
}
channel.txCommit();
connection.close();

  • 最終測試效果是mq沒有收到消息的。

confirm模式確實


Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
} catch (IOException e) {
    e.printStackTrace();
}
boolean b = channel.waitForConfirms();
if (b) {
    System.out.println("mq接收消息成功");
    Thread.sleep(1000*5);
}
System.out.println("end1");
channel.confirmSelect();
channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功發(fā)送到交換機");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息發(fā)送到交換機失敗");
    }
});
System.out.println("end2");
channel.close();
connection.close();

  • 上面使用了兩種確認方式,前者是同步確認,后者是異步確認。因為在同一個方法里。msg都是能獲取到的。所以在ConfimListener中就沒有返回消息。

數(shù)據(jù)退回監(jiān)聽

  • 上面兩種一個增加安全可靠性。一個增加確認機制。還有一種情況是數(shù)據(jù)回退。當交換機沒有隊列綁定是這個時候發(fā)送數(shù)據(jù)后如果設置了回退屬性,那么消息會回退到監(jiān)聽器匯中的。channel中的mandatory表示是否檢測分發(fā)到隊列中。

String msg = "Hello World!";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
//return機制:監(jiān)控交換機是否將消息分發(fā)到隊列
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        //如果交換機分發(fā)消息到隊列失敗,則會執(zhí)行此方法(用來處理交換機分發(fā)消息到隊列失敗的情況)
        System.out.println("*****"+i);  //標識
        System.out.println("*****"+s);  //
        System.out.println("*****"+s1); //交換機名
        System.out.println("*****"+s2); //交換機對應的隊列的key
        System.out.println("*****"+new String(bytes));  //發(fā)送的消息
    }
});
//發(fā)送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish(RabbitConfig.DIRECTEXCHANGE, "c", true, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功發(fā)送到交換機");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息發(fā)送到交換機失敗");
    }
});

  • 上面ReturnListener就會被觸發(fā),這個時候confirm監(jiān)聽器也被觸發(fā)認為成功接收的只不過被退回。

MQ事務相關(guān)軟文推薦

mq事務開啟分析

MQ丟失信息

  • 在發(fā)送消息到MQ時我們可以設置消息屬性是否為可持久化。如果設置了直接就會存儲在磁盤上。在內(nèi)存可用時也會同步到內(nèi)存中提高效率。如果消息屬性中設置的是非持久化的話,就會直接存儲在內(nèi)存里,當內(nèi)存不足是會將數(shù)據(jù)備份至磁盤上。

消費者丟失信息

  • 消費端如果沒有單獨設置的話默認就是MQ不管理。換句話說MQ只負責發(fā)送消息。mq為我們提供了三種模式
    NONE,
    MANUAL,
    AUTO; 默認的

  • 我們需要手動將連接工廠設置MANUAL后再接收到消息后我們需要手動確認,mq才會刪除消息。否則會一直等待到消費端重啟才會進行重新分發(fā)數(shù)據(jù)

  • channel.basicAck(long,boolean); 確認收到消息,消息將被隊列移除,false只確認當前consumer一個消息收到,true確認所有consumer獲得的消息。

  • channel.basicNack(long,boolean,boolean); 確認否定消息,第一個boolean表示一個consumer還是所有,第二個boolean表示requeue是否重新回到隊列,true重新入隊。

  • channel.basicReject(long,boolean); 拒絕消息,requeue=false 表示不再重新入隊,如果配置了死信隊列則進入死信隊列。

當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會又接收到這條消息,如果想消息進入隊尾,須確認消息后再次發(fā)送消息。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容