一、什么是MQ?
MQ,中文名字叫做消息中間件。既然是中間件,那么就說(shuō)明它左邊有東西,右邊也有東西。那么左邊是什么?右邊又是什么呢?MQ在中間能干嘛呢?看看下面的例子。
1、生活中的case:
老師講完了練習(xí),然后對(duì)同學(xué)們說(shuō)有問(wèn)題的現(xiàn)在就過(guò)來(lái)問(wèn)。然后張三李四王五趙六都有問(wèn)題要問(wèn)。那么他們就按順序排隊(duì)。張三需要5分鐘,然后是李四8分鐘,再然后才是王五10分鐘,最后是趙六。這就相當(dāng)于dubbo的RPC遠(yuǎn)程調(diào)用。也就是說(shuō),張三問(wèn)的時(shí)候老師這個(gè)系統(tǒng)只能響應(yīng)張三,后面的人都得等著。這樣就會(huì)導(dǎo)致學(xué)生和老師耦合度高,而且效率低,如果問(wèn)問(wèn)題的學(xué)生多,越后面的人等待的時(shí)間也越長(zhǎng),老師還會(huì)累死。怎么優(yōu)化呢?
2、優(yōu)化方案:
老師會(huì)叫同學(xué)們把需要問(wèn)的問(wèn)題按照約定的格式在紙上寫(xiě)好,然后交給班長(zhǎng)。等老師解答完當(dāng)前學(xué)生的問(wèn)題,就從班長(zhǎng)那里拿出一份問(wèn)題。這樣一來(lái),同學(xué)們也不用干等著,交了問(wèn)題后該干嘛就干嘛去,老師也可以選擇適當(dāng)?shù)臅r(shí)間再解答,不會(huì)被累死。
這個(gè)案例中的班長(zhǎng)就是一個(gè)中間件,它不處理真正的邏輯,只是一個(gè)中間人。學(xué)生不直接問(wèn)老師,而是通過(guò)班長(zhǎng),使得學(xué)生和老師解耦了;其次,學(xué)生上午交的問(wèn)題,可能下午才得到老師的解答,整個(gè)過(guò)程是異步的;即便有一大群學(xué)生來(lái)問(wèn)問(wèn)題,這些請(qǐng)求也會(huì)堆積在班長(zhǎng)那里,可以幫老師抵流量沖擊,而不會(huì)影響到老師。綜上:
MQ的作用:
- 異步;
- 解耦;
- 削峰
歡迎大家關(guān)注我的公眾號(hào) javawebkf,目前正在慢慢地將簡(jiǎn)書(shū)文章搬到公眾號(hào),以后簡(jiǎn)書(shū)和公眾號(hào)文章將同步更新,且簡(jiǎn)書(shū)上的付費(fèi)文章在公眾號(hào)上將免費(fèi)。
二、activeMQ的安裝
- 首先從官網(wǎng)下載activeMQ (linux版本);
- 然后解壓就行了(activeMQ是java編寫(xiě)的,所以需要安裝JDK)。
進(jìn)入到bin目錄,然后執(zhí)行如下命令:
- 啟動(dòng):./activemq start
- 指定xml配置文件啟動(dòng):./activemq start xbean:file:/文件路徑
- 關(guān)閉:./activemq stop
- 重啟:./activemq restart
activeMQ的后臺(tái)啟動(dòng)端口是 61616,要想查看是否啟動(dòng)成功,有如下幾種方式:
- ps -ef | grep activemq| grep -v grep
- netstat -anp | grep 61616
- lsof -i:61616
activemq還有一個(gè)圖形界面,端口是 8161。首先保證你的 Linux 虛擬機(jī)和 windows 的 ip 處于同一個(gè)網(wǎng)段,然后確保沒(méi)有被防火墻給屏蔽,在Linux 和 windows 上互 ping 一下。能 ping 通后,就在 瀏覽器訪問(wèn) 192.168.x.xx:8161, 默認(rèn)的用戶名和密碼都是 admin。訪問(wèn)后可以看到如下界面:

三、activeMQ怎么玩?
上面舉了生活中的例子來(lái)說(shuō)明MQ的作用,說(shuō)白了就是我們先把問(wèn)題發(fā)到MQ中,然后從MQ中取出消息。那么具體是發(fā)送到MQ中的什么位置呢?這個(gè)位置我們管它叫destination,即目的地。
目的地有以下兩種:
- 隊(duì)列queue(點(diǎn)對(duì)點(diǎn));
- 主題topic(發(fā)布與訂閱);
1、點(diǎn)對(duì)點(diǎn)傳輸:
所謂點(diǎn)對(duì)點(diǎn)傳輸,可以理解為發(fā)私信。你發(fā)了一條消息給你女朋友,只有你女朋友能收到。那接下來(lái)就看看怎么發(fā)消息和收消息。首先添加依賴(lài):
<!-- activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.8</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.12</version>
</dependency>
- 生產(chǎn)消息:
public class Productor {
private static final String URL = "tcp://192.168.0.103:61616";
private static final String QUEUE_NAME = "queue_test";
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建factory工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 創(chuàng)建connection連接
Connection connection = factory.createConnection();
connection.start();
// 3. 創(chuàng)建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 創(chuàng)建目的地queue
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
// 5. 生產(chǎn)消息
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("queue" + 1);
// 6. 將消息發(fā)送到MQ
producer.send(message);
}
// 7. 關(guān)閉資源(順著申請(qǐng),倒著關(guān)閉)
producer.close();
session.close();
connection.close();
System.out.println("發(fā)送到MQ完成!");
}
}
運(yùn)行后,就可以在8161端口看到如下信息了:

- 消費(fèi)消息:
public class Consumer {
private static final String URL = "tcp://192.168.0.103:61616";
private static final String QUEUE_NAME = "queue_test";
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建factory工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 創(chuàng)建connection連接
Connection connection = factory.createConnection();
connection.start();
// 3. 創(chuàng)建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 創(chuàng)建目的地queue
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
// 5. 消費(fèi)消息
while (true){
// receive里面的參數(shù)表示超時(shí)時(shí)間
TextMessage message = (TextMessage) consumer.receive(3000);
if (message != null)
System.out.println(message.getText());
else
break;
}
// 6. 關(guān)閉資源(順著申請(qǐng),倒著關(guān)閉)
consumer.close();
session.close();
connection.close();
System.out.println("3秒還沒(méi)消息來(lái),我溜了!");
}
}
運(yùn)行后,在8161端口就可以看到如下變化:

可以看到消息隊(duì)列為3,出列的也是3,說(shuō)明消費(fèi)完了。
-
異步監(jiān)聽(tīng)的方式消費(fèi)消息:
異步相對(duì)的就是同步,上面那種方式就是同步的。就是調(diào)用receive方法來(lái)接收消息,在沒(méi)接收到消息或超時(shí)之前,程序?qū)⒁恢弊枞?。在上面那段代碼中,receive方法設(shè)置了3秒的超時(shí)時(shí)間,假如MQ中此刻沒(méi)有消息供消費(fèi),那么程序?qū)⒁?秒后才能輸出 “3秒還沒(méi)消息,我溜了!” 這句話。異步就是不會(huì)阻塞,即使沒(méi)收到消息,程序還是該干嘛就干嘛。異步監(jiān)聽(tīng)方式寫(xiě)法如下:
TextMessage message = (TextMessage) consumer.receive();
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
// 6. 關(guān)閉資源(順著申請(qǐng),倒著關(guān)閉)
consumer.close();
session.close();
connection.close();
-
啟動(dòng)順序問(wèn)題:
-- 先啟動(dòng)生產(chǎn)者,再依次啟動(dòng)兩個(gè)消費(fèi)者:
------- 先啟動(dòng)的消費(fèi)者可以拿到消息,后啟動(dòng)的就不能消費(fèi)了。結(jié)論:消息不能被重復(fù)消費(fèi)。
-- 先啟動(dòng)兩個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者生產(chǎn)消息:
------- 結(jié)果就是兩個(gè)消費(fèi)者一人消費(fèi)一半。
- 小總結(jié):
從上面生產(chǎn)消息和消費(fèi)消息的demo中可以發(fā)現(xiàn),其步驟其實(shí)和JDBC操作數(shù)據(jù)庫(kù)差不多,都是先創(chuàng)建factory,然后通過(guò)factory創(chuàng)建connection連接,再創(chuàng)建session,最后執(zhí)行操作的是session。點(diǎn)對(duì)點(diǎn)傳輸還有如下特點(diǎn):
- 每條消息只能有一個(gè)消費(fèi)者,也就是上面說(shuō)的消息不能被重復(fù)消費(fèi);
- 消息生產(chǎn)者和消費(fèi)者沒(méi)有時(shí)間上的關(guān)聯(lián),生產(chǎn)消息時(shí)不用管是不是有人消費(fèi),消費(fèi)者也隨時(shí)可以提取消息;
- 消息被消費(fèi)后將不會(huì)再存儲(chǔ),用過(guò)就沒(méi)了。
2、發(fā)布與訂閱:
上面說(shuō)了點(diǎn)對(duì)點(diǎn),就是你跟你女朋友發(fā)微信。那么發(fā)布與訂閱就是你在微信公眾號(hào)發(fā)推文,凡是關(guān)注了你公眾號(hào)的人都能收到消息。點(diǎn)對(duì)點(diǎn)的目的地是queue,發(fā)布與訂閱的目的地是topic,每條消息可以有多個(gè)消費(fèi)者;生產(chǎn)者和消費(fèi)者有時(shí)間上的關(guān)聯(lián),訂閱了某個(gè)topic,只能消費(fèi)你訂閱之后的消息,說(shuō)簡(jiǎn)單就是,關(guān)注了你公眾號(hào)的人,他不能收到在他關(guān)注你之前的消息;假如無(wú)人訂閱就去生產(chǎn),那就是一條廢消息,沒(méi)有人關(guān)注你的公眾號(hào),那么你發(fā)的推文就沒(méi)有意思,就是一條廢消息,所以一般會(huì)先啟動(dòng)消費(fèi)者,再啟動(dòng)生產(chǎn)者。
關(guān)于發(fā)布與訂閱,相比點(diǎn)對(duì)點(diǎn),只需要把queue改成topic就可以了,這里就不再貼代碼了。
關(guān)于topic和queue的區(qū)別,如下表所示:
| ? | topic | queue |
|---|---|---|
| 工作模式 | 一對(duì)多 | 一對(duì)一 |
| 狀態(tài) | 無(wú)狀態(tài) | queue數(shù)據(jù)會(huì)在mq服務(wù)器上以文件形式保存,也可配置成DB存儲(chǔ) |
| 完整性 | 如果沒(méi)有訂閱者,消息將被丟棄 | 消息不會(huì)被丟棄 |
| 處理效率 | 隨著訂閱者的增加效率會(huì)降低 | 由于一條消息只發(fā)給一個(gè)消費(fèi)者,所以消費(fèi)者再多也不會(huì)明顯地影響性能 |
四、關(guān)于JMS
1、什么是JMS?
JMS中文名叫Java消息服務(wù),它是一種規(guī)范,是javaEE的13種核心規(guī)范之一。關(guān)于javaEE的13種核心規(guī)范,網(wǎng)上一搜一大堆,這里不再贅述。JMS就是天上飛的理念,而各種MQ就是這種理念的落地實(shí)現(xiàn)。比如activeMQ、rocketMQ等,都要遵循JMS這個(gè)規(guī)范。
2、JMS的結(jié)構(gòu)和特點(diǎn):
- JMS結(jié)構(gòu):
- JMS Provider:實(shí)現(xiàn)了JMS接口和規(guī)范的消息中間件,像activeMQ、rocketMQ等
- JMS Producer:消息生產(chǎn)者
- JMS consumer:消息消費(fèi)者
- JMS message:消息
-
消息頭
JMSDestination:目的地,queue和topic
JMSDeliveryMode:分為持久和非持久模式。持久模式意味著消息即使JMS提供者出現(xiàn)故障,該消息并不會(huì)丟失,會(huì)在服務(wù)器恢復(fù)后再次發(fā)送;反之,非持久模式就是服務(wù)器出現(xiàn)故障,該消息將永久丟失。
JMSExpiration:消息過(guò)期時(shí)間,如果為0,表示永不過(guò)期。
JMSPriority:優(yōu)先級(jí),0到4是普通消息,5到9是加急消息,默認(rèn)是4。
JMSMessageID:消息的唯一標(biāo)識(shí),由MQ生成。
-
消息體
封裝的具體消息數(shù)據(jù)就是消息體
消息體格式,有5種,常用的 TextMessage(String類(lèi)型) 和 MapMessage(key、value形式)
發(fā)送和接收的消息體類(lèi)型必須對(duì)應(yīng)一致
-
消息屬性
是什么:一個(gè)對(duì)象的屬性能干嘛?用來(lái)描述這個(gè)對(duì)象的特點(diǎn)嘛,消息屬性也一樣地理解就好了。
如果需要除消息頭字段以外的值,可以使用消息屬性
消息屬性可以用來(lái)做識(shí)別/去重/重點(diǎn)標(biāo)注等操作,設(shè)置消息屬性的方法如下:
-
TextMessage textMessage = new session.createTextMessage("這是一條TextMessage");
// TextMessage 類(lèi)型設(shè)置消息屬性
textMessage.setStringProperty("property", "VIP");
在消費(fèi)者中取出消息后:
textMessage.getStringProperty("property")
即可取出消息屬性。
注意上面JMS結(jié)構(gòu)的層級(jí)關(guān)系。
3、如何保證消息的可靠性?(面試重點(diǎn))
一般要從三個(gè)角度去回答(持久性、事務(wù)、簽收)。
- 持久性:持久,是MQ掛了,消息依然存在,非持久,就是MQ掛了,消息就沒(méi)了。
隊(duì)列生產(chǎn)者的持久性:
// 這個(gè)producer是隊(duì)列
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久
隊(duì)列設(shè)置為非持久,如果生產(chǎn)者將消息發(fā)送到MQ后,MQ掛了,那么這些消息就沒(méi)了,即使MQ恢復(fù)正常也沒(méi)了。隊(duì)列設(shè)置為持久,那么消息只要還沒(méi)消費(fèi)就還會(huì)有。activeMQ的隊(duì)列默認(rèn)設(shè)置了持久,可保證消息只被傳送一次和成功使用一次。
主題的持久性:
主題要設(shè)置持久,生產(chǎn)者和消費(fèi)者的編碼方式與之前都有點(diǎn)兒不一樣,代碼如下:
public class Consumer {
private static final String URL = "tcp://192.168.x.xxx:61616";
private static final String TOPIC_NAME = "topic_test";
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建factory工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 創(chuàng)建connection連接
Connection connection = factory.createConnection();
connection.setClientID("張三");
System.out.println("張三訂閱");
// 3. 創(chuàng)建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 訂閱topic
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subscriber = session.createDurableSubscriber(topic,"備注信息");
// 5. 啟動(dòng)
connection.start();
// 6. 消費(fèi)topic的消息
Message message = subscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage) message;
System.out.println("收到消息:" + textMessage.getText());
message = subscriber.receive(5000L);
}
// 6. 關(guān)閉資源(順著申請(qǐng),倒著關(guān)閉)
session.close();
connection.close();
System.out.println("5秒還沒(méi)消息來(lái),我溜了!");
}
}
public class Productor {
private static final String URL = "tcp://192.168.0.103:61616";
private static final String TOPIC_NAME = "topic_test";
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建factory工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 創(chuàng)建connection連接
Connection connection = factory.createConnection();
// 3. 創(chuàng)建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 創(chuàng)建目的地topic
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
// 設(shè)置持久性
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久
connection.start();
// 5. 生產(chǎn)消息
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("queue" + i);
// 6. 將消息發(fā)送到MQ
producer.send(message);
}
// 7. 關(guān)閉資源(順著申請(qǐng),倒著關(guān)閉)
producer.close();
session.close();
connection.close();
System.out.println("發(fā)送到MQ完成!");
}
}
主題設(shè)置了持久的話,一定要先運(yùn)行一次消費(fèi)者,等于向MQ注冊(cè),表示我訂閱了這個(gè)主題。然后再運(yùn)行生產(chǎn)者發(fā)送信息,此時(shí),不論消費(fèi)者是否還在線,都會(huì)接收到消息,不在線的話,下次連接的時(shí)候,會(huì)把沒(méi)有收過(guò)的消息都接收下來(lái)。
- 事務(wù):創(chuàng)建session的時(shí)候要傳兩個(gè)參數(shù),一個(gè)是事務(wù),一個(gè)是簽收。
生產(chǎn)者事務(wù):
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一個(gè)參數(shù)就是表示事務(wù),設(shè)置為false,表示只要執(zhí)行了send方法,消息就進(jìn)入到隊(duì)列中了;如果設(shè)置為true,需要send后再執(zhí)行commit,消息才會(huì)被提交到隊(duì)列中。所以在session提交前,需要調(diào)commit方法,如下:
try{
//沒(méi)問(wèn)題就提交事務(wù)
session.commit();
}catch(Exception e){
//有問(wèn)題就回滾
session.rollback();
}finally{
producer.close();
session.close();
}
生產(chǎn)者主事務(wù),不管簽收,因?yàn)橄M(fèi)者才需要簽收嘛。生產(chǎn)者設(shè)置了事務(wù),簽收機(jī)制就無(wú)所謂了,只是這個(gè)方法需要傳一個(gè)簽收機(jī)制,其實(shí)事務(wù)設(shè)置為true后,起作用的就是事務(wù)了。
消費(fèi)者事務(wù):
如果消費(fèi)者開(kāi)啟了事務(wù),進(jìn)行消費(fèi)時(shí)而沒(méi)有commit的話,MQ會(huì)認(rèn)為你還沒(méi)有成功消費(fèi)消息,就會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,所以消費(fèi)者一般不開(kāi)啟事務(wù),而是以簽收機(jī)制為主。
- 簽收:簽收機(jī)制有四種,用得較多的是自動(dòng)和手動(dòng)兩種方式。
消費(fèi)者非事務(wù)的手動(dòng)簽收:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
如果這個(gè)時(shí)候直接運(yùn)行消費(fèi)者,發(fā)現(xiàn)又可以重復(fù)消費(fèi)消息,因?yàn)镸Q不知道你已經(jīng)簽收消息了。所以在receive到消息后,應(yīng)該手動(dòng)簽收,才不會(huì)重復(fù)消費(fèi),如下:
while (null != message){
TextMessage textMessage = (TextMessage) message;
textMessage.acknowledge(); // 手動(dòng)簽收
System.out.println("收到消息:" + textMessage.getText());
message = subscriber.receive(5000L);
}
消費(fèi)者開(kāi)啟事務(wù)的情況下的簽收:
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
開(kāi)啟了事務(wù),就會(huì)自動(dòng)設(shè)置為自動(dòng)簽收,即使后面那個(gè)參數(shù)設(shè)置了手動(dòng)簽收,也不起作用了。所以,不需要調(diào)用acknowledge()方法進(jìn)行簽收。如果開(kāi)啟了事務(wù),設(shè)置了手動(dòng)簽收,調(diào)用了acknowledge()方法,但是沒(méi)有commit,還是會(huì)重復(fù)消費(fèi)。
總之,在事務(wù)會(huì)話中,當(dāng)一個(gè)事務(wù)被成功提交則消息被自動(dòng)簽收,如果事務(wù)回滾,則消息會(huì)被再次傳遞。非事務(wù)會(huì)話中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會(huì)話時(shí)的簽收模式。
小結(jié):不能容忍丟失消息,就用持久訂閱,可以容忍丟失消息,就用非持久訂閱。
五、activeMQ的broker
1、什么是broker?
broker就是嵌入式的activemq,也就是說(shuō),使用broker,只需要引入相關(guān)依賴(lài)就可以了,而不需要你本地安裝activemq,類(lèi)似于springboot那樣內(nèi)嵌tomcat。
2、怎么用?
除了之前引入的activemq-all,還需要引入如下依賴(lài):
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9.3</version>
</dependency>
然后編碼:
public static void main(String[] args) throws Exception{
BrokerService service = new BrokerService();
service.setUseJmx(true);
service.addConnector("tcp://localhost:61616");
service.start();
}
運(yùn)行后,就可以在控制臺(tái)看到這個(gè)嵌入式的activemq已經(jīng)啟動(dòng)了。
