4.JMS的規(guī)范(二)

5. 消息的持久化

什么是持久化消息?

保證消息只被傳送一次和成功使用一次。在持久性消息傳送至目標(biāo)時(shí),消息服務(wù)將其放入持久性數(shù)據(jù)存儲(chǔ)。如果消息服務(wù)由于某種原因?qū)е率。梢曰謴?fù)此消息并將此消息傳送至相應(yīng)的消費(fèi)者。雖然這樣增加了消息傳送的開(kāi)銷(xiāo),但卻增加了可靠性。

我的理解:在消息生產(chǎn)者將消息成功發(fā)送給MQ消息中間件之后。無(wú)論是出現(xiàn)任何問(wèn)題,如:MQ服務(wù)器宕機(jī)、消費(fèi)者掉線(xiàn)等。都保證(topic要之前注冊(cè)過(guò),queue不用)消息消費(fèi)者,能夠成功消費(fèi)消息。如果消息生產(chǎn)者發(fā)送消息就失敗了,那么消費(fèi)者也不會(huì)消費(fèi)到該消息。

5.1 queue 消息非持久和持久化

queue非持久,當(dāng)服務(wù)器宕機(jī),消息不存在(消息丟失了)。即便是非持久,消費(fèi)者在不在線(xiàn)的話(huà),消息也不會(huì)丟失,等待消費(fèi)者在線(xiàn),還是能夠收到消息的

queue持久化,當(dāng)服務(wù)器宕機(jī),消息依然存在。queue消息默認(rèn)是持久化的。

持久化消息,保證這些消息只被傳送一次和成功使用一次。對(duì)于這些消息,可靠性是優(yōu)先考慮的因素。

可靠性的另一個(gè)重要方面是確保持久性消息傳送至目標(biāo)后,消息服務(wù)在向消費(fèi)者傳送它們之前不會(huì)丟失這些消息。

image.png
/**
  *非持久化的消費(fèi)者,和之前的代碼一樣。下面演示非持久化的生產(chǎn)者。下面除灰色背景代碼  *外,其他都和之前一樣。
  *運(yùn)行結(jié)果證明:當(dāng)生產(chǎn)者成功發(fā)布消息之后,MQ服務(wù)端宕機(jī)重啟,消息生產(chǎn)者就收不到該消息了
  */
package com.at.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        // 非持久化
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息發(fā)送到MQ完成 ****");
    }
}

持久化的消費(fèi)者,和之前的代碼一樣。下面演示持久化的生產(chǎn)者。下面除灰色背景代碼外,其他都和之前一樣。
運(yùn)行結(jié)果證明:當(dāng)生產(chǎn)者成功發(fā)布消息之后,MQ服務(wù)端宕機(jī)重啟,消息生產(chǎn)者仍然能夠收到該消息

package com.at.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        //持久化
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息發(fā)送到MQ完成 ****");
    }
}

5.2 topic 消息持久化

topic默認(rèn)就是非持久化的,因?yàn)樯a(chǎn)者生產(chǎn)消息時(shí),消費(fèi)者也要在線(xiàn),這樣消費(fèi)者才能消費(fèi)到消息。

topic消息持久化,只要消費(fèi)者向MQ服務(wù)器注冊(cè)過(guò),所有生產(chǎn)者發(fā)布成功的消息,該消費(fèi)者都能收到,不管是MQ服務(wù)器宕機(jī)還是消費(fèi)者不在線(xiàn)。

注意

  1. 一定要先運(yùn)行一次消費(fèi)者,等于向MQ注冊(cè),類(lèi)似我訂閱了這個(gè)主題。

  2. 然后再運(yùn)行生產(chǎn)者發(fā)送消息。

  3. 之后無(wú)論消費(fèi)者是否在線(xiàn),都會(huì)收到消息。如果不在線(xiàn)的話(huà),下次連接的時(shí)候,會(huì)把沒(méi)有收過(guò)的消息都接收過(guò)來(lái)

持久化topic生產(chǎn)者代碼。下面除灰色背景代碼外,其他都和之前一樣。

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

// 持久化topic 的消息生產(chǎn)者
public class JmsProduce_persistence {

    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        // 設(shè)置持久化topic 
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 設(shè)置持久化topic之后再,啟動(dòng)連接
        connection.start();
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息發(fā)送到MQ完成 ****");
    }
}

持久化topic消費(fèi)者代碼。下面除灰色背景代碼外,其他都和之前一樣。

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

// 持久化topic 的消息消費(fèi)者
public class JmsConsummer_persistence {
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
// 設(shè)置客戶(hù)端ID。向MQ服務(wù)器注冊(cè)自己的名稱(chēng)
        connection.setClientID("marrry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
// 創(chuàng)建一個(gè)topic訂閱者對(duì)象。一參是topic,二參是訂閱者名稱(chēng)
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
         // 之后再開(kāi)啟連接
        connection.start();
        Message message = topicSubscriber.receive();
         while (null != message){
             TextMessage textMessage = (TextMessage)message;
             System.out.println(" 收到的持久化 topic :"+textMessage.getText());
             message = topicSubscriber.receive();
         }
        session.close();
        connection.close();
    }
}

控制臺(tái)介紹:

topic頁(yè)面還是和之前的一樣。另外在subscribers頁(yè)面也會(huì)顯示。如下:

image.png

6.消息的事務(wù)性

image.png

(1) 生產(chǎn)者開(kāi)啟事務(wù)后,執(zhí)行commit方法,這批消息才真正的被提交。不執(zhí)行commit方法,這批消息不會(huì)提交。執(zhí)行rollback方法,之前的消息會(huì)回滾掉。生產(chǎn)者的事務(wù)機(jī)制,要高于簽收機(jī)制,當(dāng)生產(chǎn)者開(kāi)啟事務(wù),簽收機(jī)制不再重要。

(2) 消費(fèi)者開(kāi)啟事務(wù)后,執(zhí)行commit方法,這批消息才算真正的被消費(fèi)。不執(zhí)行commit方法,這些消息不會(huì)標(biāo)記已消費(fèi),下次還會(huì)被消費(fèi)。執(zhí)行rollback方法,是不能回滾之前執(zhí)行過(guò)的業(yè)務(wù)邏輯,但是能夠回滾之前的消息,回滾后的消息,下次還會(huì)被消費(fèi)。消費(fèi)者利用commit和rollback方法,甚至能夠違反一個(gè)消費(fèi)者只能消費(fèi)一次消息的原理。

(3) 問(wèn):消費(fèi)者和生產(chǎn)者需要同時(shí)操作事務(wù)才行嗎?

答:消費(fèi)者和生產(chǎn)者的事務(wù),完全沒(méi)有關(guān)聯(lián),各自是各自的事務(wù)。

生產(chǎn)者代碼

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //1.創(chuàng)建會(huì)話(huà)session,兩個(gè)參數(shù)transacted=事務(wù),acknowledgeMode=確認(rèn)模式(簽收)
        //設(shè)置為開(kāi)啟事務(wù)
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
              producer.send(textMessage);
if(i == 2){
                    throw new RuntimeException("GG.....");
                }
            }
            // 2. 開(kāi)啟事務(wù)后,使用commit提交事務(wù),這樣這批消息才能真正的被提交。
            session.commit();
            System.out.println("消息發(fā)送完成");
        } catch (Exception e) {
            System.out.println("出現(xiàn)異常,消息回滾");
            // 3. 工作中一般,當(dāng)代碼出錯(cuò),我們?cè)赾atch代碼塊中回滾。這樣這批發(fā)送的消息就能回滾。
            session.rollback();
        } finally {
            //4. 關(guān)閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }
}


消費(fèi)者

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 創(chuàng)建會(huì)話(huà)session,兩個(gè)參數(shù)transacted=事務(wù),acknowledgeMode=確認(rèn)模式(簽收)
        // 消費(fèi)者開(kāi)啟了事務(wù)就必須手動(dòng)提交,不然會(huì)重復(fù)消費(fèi)消息
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            int a = 0;
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消費(fèi)者接收到的消息:   " + textMessage.getText());
                        if(a == 0){
                            System.out.println("commit");
                            session.commit();
                        }
                        if (a == 2) {
                            System.out.println("rollback");
                            session.rollback();
                        }
                        a++;
                    } catch (Exception e) {
                        System.out.println("出現(xiàn)異常,消費(fèi)失敗,放棄消費(fèi)");
                        try {
                            session.rollback();
                        } catch (JMSException ex) {
                            ex.printStackTrace();
                        }
                    }
                }
            }
        });
        //關(guān)閉資源
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

結(jié)果

消費(fèi)者的控制臺(tái)輸出信息??梢钥闯鯿ommit和rollback方法的作用。
***消費(fèi)者接收到的消息: tx msg--0
commit
***消費(fèi)者接收到的消息: tx msg--1
***消費(fèi)者接收到的消息: tx msg--2
rollback
***消費(fèi)者接收到的消息: tx msg--1
***消費(fèi)者接收到的消息: tx msg--2

7.消息的簽收機(jī)制

7.1 簽收的幾種方式

① 自動(dòng)簽收(Session.AUTO_ACKNOWLEDGE):該方式是默認(rèn)的。該種方式,無(wú)需我們程序做任何操作,框架會(huì)幫我們自動(dòng)簽收收到的消息。

② 手動(dòng)簽收(Session.CLIENT_ACKNOWLEDGE):手動(dòng)簽收。該種方式,需要我們手動(dòng)調(diào)用Message.acknowledge(),來(lái)簽收消息。如果不簽收消息,該消息會(huì)被我們反復(fù)消費(fèi),只到被簽收。

③ 允許重復(fù)消息(Session.DUPS_OK_ACKNOWLEDGE):多線(xiàn)程或多個(gè)消費(fèi)者同時(shí)消費(fèi)到一個(gè)消息,因?yàn)榫€(xiàn)程不安全,可能會(huì)重復(fù)消費(fèi)。該種方式很少使用到。

④ 事務(wù)下的簽收(Session.SESSION_TRANSACTED):開(kāi)始事務(wù)的情況下,可以使用該方式。該種方式很少使用到。

7.2 事務(wù)和簽收的關(guān)系

① 在事務(wù)性會(huì)話(huà)中,當(dāng)一個(gè)事務(wù)被成功提交則消息被自動(dòng)簽收。如果事務(wù)回滾,則消息會(huì)被再次傳送。事務(wù)優(yōu)先于簽收,開(kāi)始事務(wù)后,簽收機(jī)制不再起任何作用。

② 非事務(wù)性會(huì)話(huà)中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會(huì)話(huà)時(shí)的應(yīng)答模式。

③ 生產(chǎn)者事務(wù)開(kāi)啟,只有commit后才能將全部消息變?yōu)橐严M(fèi)。

④ 事務(wù)偏向生產(chǎn)者,簽收偏向消費(fèi)者。也就是說(shuō),生產(chǎn)者使用事務(wù)更好點(diǎn),消費(fèi)者使用簽收機(jī)制更好點(diǎn)。

7.3 代碼演示

下面我們演示,非事務(wù)下的消費(fèi)者如何使用手動(dòng)簽收的方式
非事務(wù)下的生產(chǎn)者。跟之前的代碼一樣

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Jms_TX_Producer {

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            System.out.println("消息發(fā)送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
            connection.close();
        }
    }
}

非事務(wù)下的消費(fèi)者如何手動(dòng)簽收。

package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消費(fèi)者接收到的消息:   " + textMessage.getText());
                        /* 設(shè)置為Session.CLIENT_ACKNOWLEDGE后,要調(diào)用該方法,標(biāo)志著該消息已被簽收(消費(fèi))。
                            如果不調(diào)用該方法,該消息的標(biāo)志還是未消費(fèi),下次啟動(dòng)消費(fèi)者或其他消費(fèi)者還會(huì)收到改消息。
                         */
                        textMessage.acknowledge();
                    } catch (Exception e) {
                        System.out.println("出現(xiàn)異常,消費(fèi)失敗,放棄消費(fèi)");
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

9. JMS的發(fā)布訂閱總結(jié)

(1) JMS的發(fā)布訂閱總結(jié)

JMS Pub/Sub 模型定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這些節(jié)點(diǎn)被稱(chēng)作topic。

主題可以被認(rèn)為是消息的傳輸中介,發(fā)布者(publisher)發(fā)布消息到主題,訂閱者(subscribe)從主題訂閱消息。

主題使得消息訂閱者和消息發(fā)布者保持互相獨(dú)立不需要解除即可保證消息的傳送

(2) 非持久訂閱

非持久訂閱只有當(dāng)客戶(hù)端處于激活狀態(tài),也就是和MQ保持連接狀態(tài)才能收發(fā)到某個(gè)主題的消息。

如果消費(fèi)者處于離線(xiàn)狀態(tài),生產(chǎn)者發(fā)送的主題消息將會(huì)丟失作廢,消費(fèi)者永遠(yuǎn)不會(huì)收到。

一句話(huà):先訂閱注冊(cè)才能接受到發(fā)布,只給訂閱者發(fā)布消息。

(3) 持久訂閱

客戶(hù)端首先向MQ注冊(cè)一個(gè)自己的身份ID識(shí)別號(hào),當(dāng)這個(gè)客戶(hù)端處于離線(xiàn)時(shí),生產(chǎn)者會(huì)為這個(gè)ID保存所有發(fā)送到主題的消息,當(dāng)客戶(hù)再次連接到MQ的時(shí)候,會(huì)根據(jù)消費(fèi)者的ID得到所有當(dāng)自己處于離線(xiàn)時(shí)發(fā)送到主題的消息

當(dāng)持久訂閱狀態(tài)下,不能恢復(fù)或重新派送一個(gè)未簽收的消息。

持久訂閱才能恢復(fù)或重新派送一個(gè)未簽收的消息。

(4) 非持久和持久化訂閱如何選擇

當(dāng)所有的消息必須被接收,則用持久化訂閱。當(dāng)消息丟失能夠被容忍,則用非持久訂閱。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 5. JMS規(guī)范 5.1 JMS是什么 什么是JavaEE? 是一套使用Java進(jìn)行企業(yè)級(jí)應(yīng)用開(kāi)發(fā)的大家一致遵循的...
    笨比喬治閱讀 540評(píng)論 0 1
  • JMS是什么 JMS是JavaEE其中的一個(gè)模塊 JavaEE是一套使用Java進(jìn)行企業(yè)級(jí)應(yīng)用開(kāi)發(fā),大家一致遵循的...
    金石_832e閱讀 309評(píng)論 0 1
  • JMS Java消息服務(wù) Java消息服務(wù)(Java Message Service,JMS)應(yīng)用程序接口是一個(gè)J...
    wanggs閱讀 279評(píng)論 0 0
  • ActiveMQ 簡(jiǎn)介 ActiveMQ 是完全基于 JMS 規(guī)范實(shí)現(xiàn)的一個(gè)消息中間件產(chǎn)品。是 Apache 開(kāi)源...
    匠丶閱讀 1,670評(píng)論 0 5
  • 1.書(shū)寫(xiě)生產(chǎn)者消息生產(chǎn)的代碼: mq效果: 第一框是發(fā)送的消息個(gè)數(shù),第二個(gè)框是等待的消息個(gè)數(shù)2.書(shū)寫(xiě)消費(fèi)者接收消息...
    李霖神谷閱讀 465評(píng)論 0 0

友情鏈接更多精彩內(nèi)容