5. 消息的持久化
什么是持久化消息?
保證消息只被傳送一次和成功使用一次。在持久性消息傳送至目標(biāo)時(shí),消息服務(wù)將其放入持久性數(shù)據(jù)存儲(chǔ)。如果消息服務(wù)由于某種原因?qū)е率。梢曰謴?fù)此消息并將此消息傳送至相應(yīng)的消費(fèi)者。雖然這樣增加了消息傳送的開(kāi)銷(xiāo),但卻增加了可靠性。
我的理解:在消息生產(chǎn)者將消息成功發(fā)送給MQ消息中間件之后。無(wú)論是出現(xiàn)任何問(wèn)題,如:MQ服務(wù)器宕機(jī)、消費(fèi)者掉線(xiàn)等。都保證(topic要之前注冊(cè)過(guò),queue不用)消息消費(fèi)者,能夠成功消費(fèi)消息。如果消息生產(chǎn)者發(fā)送消息就失敗了,那么消費(fèi)者也不會(huì)消費(fèi)到該消息。
5.1 queue 消息非持久和持久化
queue非持久,當(dāng)服務(wù)器宕機(jī),消息不存在(消息丟失了)。即便是非持久,消費(fèi)者在不在線(xiàn)的話(huà),消息也不會(huì)丟失,等待消費(fèi)者在線(xiàn),還是能夠收到消息的。
queue持久化,當(dāng)服務(wù)器宕機(jī),消息依然存在。queue消息默認(rèn)是持久化的。
持久化消息,保證這些消息只被傳送一次和成功使用一次。對(duì)于這些消息,可靠性是優(yōu)先考慮的因素。
可靠性的另一個(gè)重要方面是確保持久性消息傳送至目標(biāo)后,消息服務(wù)在向消費(fèi)者傳送它們之前不會(huì)丟失這些消息。

/**
*非持久化的消費(fèi)者,和之前的代碼一樣。下面演示非持久化的生產(chǎn)者。下面除灰色背景代碼 *外,其他都和之前一樣。
*運(yùn)行結(jié)果證明:當(dāng)生產(chǎn)者成功發(fā)布消息之后,MQ服務(wù)端宕機(jī)重啟,消息生產(chǎn)者就收不到該消息了
*/
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
// 非持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息發(fā)送到MQ完成 ****");
}
}
持久化的消費(fèi)者,和之前的代碼一樣。下面演示持久化的生產(chǎn)者。下面除灰色背景代碼外,其他都和之前一樣。
運(yùn)行結(jié)果證明:當(dāng)生產(chǎn)者成功發(fā)布消息之后,MQ服務(wù)端宕機(jī)重啟,消息生產(chǎn)者仍然能夠收到該消息
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
//持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息發(fā)送到MQ完成 ****");
}
}
5.2 topic 消息持久化
topic默認(rèn)就是非持久化的,因?yàn)樯a(chǎn)者生產(chǎn)消息時(shí),消費(fèi)者也要在線(xiàn),這樣消費(fèi)者才能消費(fèi)到消息。
topic消息持久化,只要消費(fèi)者向MQ服務(wù)器注冊(cè)過(guò),所有生產(chǎn)者發(fā)布成功的消息,該消費(fèi)者都能收到,不管是MQ服務(wù)器宕機(jī)還是消費(fèi)者不在線(xiàn)。
注意:
一定要先運(yùn)行一次消費(fèi)者,等于向MQ注冊(cè),類(lèi)似我訂閱了這個(gè)主題。
然后再運(yùn)行生產(chǎn)者發(fā)送消息。
之后無(wú)論消費(fèi)者是否在線(xiàn),都會(huì)收到消息。如果不在線(xiàn)的話(huà),下次連接的時(shí)候,會(huì)把沒(méi)有收過(guò)的消息都接收過(guò)來(lái)
持久化topic生產(chǎn)者代碼。下面除灰色背景代碼外,其他都和之前一樣。
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 持久化topic 的消息生產(chǎn)者
public class JmsProduce_persistence {
public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// 設(shè)置持久化topic
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 設(shè)置持久化topic之后再,啟動(dòng)連接
connection.start();
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發(fā)送到MQ完成 ****");
}
}
持久化topic消費(fèi)者代碼。下面除灰色背景代碼外,其他都和之前一樣。
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 持久化topic 的消息消費(fèi)者
public class JmsConsummer_persistence {
public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
// 設(shè)置客戶(hù)端ID。向MQ服務(wù)器注冊(cè)自己的名稱(chēng)
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 創(chuàng)建一個(gè)topic訂閱者對(duì)象。一參是topic,二參是訂閱者名稱(chēng)
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 之后再開(kāi)啟連接
connection.start();
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
控制臺(tái)介紹:
topic頁(yè)面還是和之前的一樣。另外在subscribers頁(yè)面也會(huì)顯示。如下:

6.消息的事務(wù)性

(1) 生產(chǎn)者開(kāi)啟事務(wù)后,執(zhí)行commit方法,這批消息才真正的被提交。不執(zhí)行commit方法,這批消息不會(huì)提交。執(zhí)行rollback方法,之前的消息會(huì)回滾掉。生產(chǎn)者的事務(wù)機(jī)制,要高于簽收機(jī)制,當(dāng)生產(chǎn)者開(kāi)啟事務(wù),簽收機(jī)制不再重要。
(2) 消費(fèi)者開(kāi)啟事務(wù)后,執(zhí)行commit方法,這批消息才算真正的被消費(fèi)。不執(zhí)行commit方法,這些消息不會(huì)標(biāo)記已消費(fèi),下次還會(huì)被消費(fèi)。執(zhí)行rollback方法,是不能回滾之前執(zhí)行過(guò)的業(yè)務(wù)邏輯,但是能夠回滾之前的消息,回滾后的消息,下次還會(huì)被消費(fèi)。消費(fèi)者利用commit和rollback方法,甚至能夠違反一個(gè)消費(fèi)者只能消費(fèi)一次消息的原理。
(3) 問(wèn):消費(fèi)者和生產(chǎn)者需要同時(shí)操作事務(wù)才行嗎?
答:消費(fèi)者和生產(chǎn)者的事務(wù),完全沒(méi)有關(guān)聯(lián),各自是各自的事務(wù)。
生產(chǎn)者代碼
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//1.創(chuàng)建會(huì)話(huà)session,兩個(gè)參數(shù)transacted=事務(wù),acknowledgeMode=確認(rèn)模式(簽收)
//設(shè)置為開(kāi)啟事務(wù)
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
if(i == 2){
throw new RuntimeException("GG.....");
}
}
// 2. 開(kāi)啟事務(wù)后,使用commit提交事務(wù),這樣這批消息才能真正的被提交。
session.commit();
System.out.println("消息發(fā)送完成");
} catch (Exception e) {
System.out.println("出現(xiàn)異常,消息回滾");
// 3. 工作中一般,當(dāng)代碼出錯(cuò),我們?cè)赾atch代碼塊中回滾。這樣這批發(fā)送的消息就能回滾。
session.rollback();
} finally {
//4. 關(guān)閉資源
producer.close();
session.close();
connection.close();
}
}
}
消費(fèi)者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 創(chuàng)建會(huì)話(huà)session,兩個(gè)參數(shù)transacted=事務(wù),acknowledgeMode=確認(rèn)模式(簽收)
// 消費(fèi)者開(kāi)啟了事務(wù)就必須手動(dòng)提交,不然會(huì)重復(fù)消費(fèi)消息
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
int a = 0;
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費(fèi)者接收到的消息: " + textMessage.getText());
if(a == 0){
System.out.println("commit");
session.commit();
}
if (a == 2) {
System.out.println("rollback");
session.rollback();
}
a++;
} catch (Exception e) {
System.out.println("出現(xiàn)異常,消費(fèi)失敗,放棄消費(fèi)");
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
});
//關(guān)閉資源
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
結(jié)果
消費(fèi)者的控制臺(tái)輸出信息??梢钥闯鯿ommit和rollback方法的作用。
***消費(fèi)者接收到的消息: tx msg--0
commit
***消費(fèi)者接收到的消息: tx msg--1
***消費(fèi)者接收到的消息: tx msg--2
rollback
***消費(fèi)者接收到的消息: tx msg--1
***消費(fèi)者接收到的消息: tx msg--2
7.消息的簽收機(jī)制
7.1 簽收的幾種方式
① 自動(dòng)簽收(Session.AUTO_ACKNOWLEDGE):該方式是默認(rèn)的。該種方式,無(wú)需我們程序做任何操作,框架會(huì)幫我們自動(dòng)簽收收到的消息。
② 手動(dòng)簽收(Session.CLIENT_ACKNOWLEDGE):手動(dòng)簽收。該種方式,需要我們手動(dòng)調(diào)用Message.acknowledge(),來(lái)簽收消息。如果不簽收消息,該消息會(huì)被我們反復(fù)消費(fèi),只到被簽收。
③ 允許重復(fù)消息(Session.DUPS_OK_ACKNOWLEDGE):多線(xiàn)程或多個(gè)消費(fèi)者同時(shí)消費(fèi)到一個(gè)消息,因?yàn)榫€(xiàn)程不安全,可能會(huì)重復(fù)消費(fèi)。該種方式很少使用到。
④ 事務(wù)下的簽收(Session.SESSION_TRANSACTED):開(kāi)始事務(wù)的情況下,可以使用該方式。該種方式很少使用到。
7.2 事務(wù)和簽收的關(guān)系
① 在事務(wù)性會(huì)話(huà)中,當(dāng)一個(gè)事務(wù)被成功提交則消息被自動(dòng)簽收。如果事務(wù)回滾,則消息會(huì)被再次傳送。事務(wù)優(yōu)先于簽收,開(kāi)始事務(wù)后,簽收機(jī)制不再起任何作用。
② 非事務(wù)性會(huì)話(huà)中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會(huì)話(huà)時(shí)的應(yīng)答模式。
③ 生產(chǎn)者事務(wù)開(kāi)啟,只有commit后才能將全部消息變?yōu)橐严M(fèi)。
④ 事務(wù)偏向生產(chǎn)者,簽收偏向消費(fèi)者。也就是說(shuō),生產(chǎn)者使用事務(wù)更好點(diǎn),消費(fèi)者使用簽收機(jī)制更好點(diǎn)。
7.3 代碼演示
下面我們演示,非事務(wù)下的消費(fèi)者如何使用手動(dòng)簽收的方式
非事務(wù)下的生產(chǎn)者。跟之前的代碼一樣
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息發(fā)送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
非事務(wù)下的消費(fèi)者如何手動(dòng)簽收。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費(fèi)者接收到的消息: " + textMessage.getText());
/* 設(shè)置為Session.CLIENT_ACKNOWLEDGE后,要調(diào)用該方法,標(biāo)志著該消息已被簽收(消費(fèi))。
如果不調(diào)用該方法,該消息的標(biāo)志還是未消費(fèi),下次啟動(dòng)消費(fèi)者或其他消費(fèi)者還會(huì)收到改消息。
*/
textMessage.acknowledge();
} catch (Exception e) {
System.out.println("出現(xiàn)異常,消費(fèi)失敗,放棄消費(fèi)");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
9. JMS的發(fā)布訂閱總結(jié)
(1) JMS的發(fā)布訂閱總結(jié)
JMS Pub/Sub 模型定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這些節(jié)點(diǎn)被稱(chēng)作topic。
主題可以被認(rèn)為是消息的傳輸中介,發(fā)布者(publisher)發(fā)布消息到主題,訂閱者(subscribe)從主題訂閱消息。
主題使得消息訂閱者和消息發(fā)布者保持互相獨(dú)立不需要解除即可保證消息的傳送
(2) 非持久訂閱
非持久訂閱只有當(dāng)客戶(hù)端處于激活狀態(tài),也就是和MQ保持連接狀態(tài)才能收發(fā)到某個(gè)主題的消息。
如果消費(fèi)者處于離線(xiàn)狀態(tài),生產(chǎn)者發(fā)送的主題消息將會(huì)丟失作廢,消費(fèi)者永遠(yuǎn)不會(huì)收到。
一句話(huà):先訂閱注冊(cè)才能接受到發(fā)布,只給訂閱者發(fā)布消息。
(3) 持久訂閱
客戶(hù)端首先向MQ注冊(cè)一個(gè)自己的身份ID識(shí)別號(hào),當(dāng)這個(gè)客戶(hù)端處于離線(xiàn)時(shí),生產(chǎn)者會(huì)為這個(gè)ID保存所有發(fā)送到主題的消息,當(dāng)客戶(hù)再次連接到MQ的時(shí)候,會(huì)根據(jù)消費(fèi)者的ID得到所有當(dāng)自己處于離線(xiàn)時(shí)發(fā)送到主題的消息
當(dāng)持久訂閱狀態(tài)下,不能恢復(fù)或重新派送一個(gè)未簽收的消息。
持久訂閱才能恢復(fù)或重新派送一個(gè)未簽收的消息。
(4) 非持久和持久化訂閱如何選擇
當(dāng)所有的消息必須被接收,則用持久化訂閱。當(dāng)消息丟失能夠被容忍,則用非持久訂閱。