消息中間件概述
中間件介紹
什么是中間件?
非底層操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用的,不能直接給用戶帶來價值的軟件統(tǒng)稱中間件。
什么是消息中間件
關(guān)注于數(shù)據(jù)的發(fā)送與接收,利用高效可靠的異步消息傳遞機(jī)制集成分布式系統(tǒng)。
消息中間件圖示

應(yīng)用A通過應(yīng)用程序接口向消息中間件發(fā)送消息,應(yīng)用B通過應(yīng)用程序接口向消息中間件接收消息。
什么是JMS
Java消息服務(wù)(Java Message Service)即JMS,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送/接收消息,進(jìn)行異步通信。
什么是AMQ
AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)協(xié)議,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
JMS和AMQP對比

常見消息中間件對比
ActiveMQ
- 多種語言和協(xié)議編寫客戶端,語言:Java、C語言、C++、C#、Ruby、Perl、Python、PHP。應(yīng)用協(xié)議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP。
- 完全支持JMS1.1和J2EE1.4規(guī)范(持久化、XA消息、事務(wù))
- 虛擬主題、組合目的、鏡像隊列
RabbitMQ
- 支持多種客戶端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
- AMQP的完整實(shí)現(xiàn)
- 事務(wù)支持/發(fā)布確認(rèn)
- 消息持久化
Kafka
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),是一個分布式的、分區(qū)的、可靠的分布式日志存儲服務(wù)。它通過一種獨(dú)一無二的設(shè)計提供了消息系統(tǒng)的功能。
特性
- 消息的持久化
- 高吞吐量
- Partition、Consumer Group

JMS規(guī)范
Java消息服務(wù)定義
Java消息服務(wù)(Java Message Service)即JMS,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送/接收消息,進(jìn)行異步通信。
JMS概念
- 提供者:實(shí)現(xiàn)JMS規(guī)范的消息中間件服務(wù)器
- 客戶端:發(fā)送或接收消息額應(yīng)用程序
- 生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端
- 消費(fèi)者/訂閱者:接收并處理消息的客戶端
- 消息: 應(yīng)用程序間傳遞的數(shù)據(jù)內(nèi)容
- 消息模式:在客戶端間傳遞消息的方式,JMS中定義了主題和隊列兩種模式
消息模式
隊列模型
- 客戶端包括生產(chǎn)者和消費(fèi)者
- 隊列中的消息只能被一個消費(fèi)者消費(fèi)
- 消費(fèi)者可以隨時消費(fèi)隊列中的消息
隊列模型示意圖

主題模型
- 客戶端包括生產(chǎn)者和消費(fèi)者
- 主題中的消息被所有訂閱者消費(fèi)
- 消費(fèi)者不能消費(fèi)訂閱之前就發(fā)送到主題中的消息
隊列模型示意圖

JMS編碼規(guī)范
- ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠
- Connection:“鏈接”,代表了應(yīng)用程序和消息服務(wù)器之間的通信鏈路
- Destination:“目的地”,指消息發(fā)布和接收的地點(diǎn),包括隊列或主題
- Session:“會話”,表示一個單線程的上下文,用于發(fā)送和接收消息
- MessageConsumer:“消費(fèi)者”,一種可以向JMS提供獲取消息的客戶端類型
- MessageProducer:“生產(chǎn)者”,消費(fèi)者和生產(chǎn)者間傳送的對象,消息頭,一組消息屬性,一個消息體
JMS編碼接口之間的關(guān)系

windows下安裝ActiveMQ
Linux下安裝ActiveMQ
隊列模式的消息演示
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
ActiveMqProducer.java
package com.cxy.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Auther: cxy
* @Date: 2019/7/4
* @Description: 生產(chǎn)者
*/
public class ActiveMqProducer {
private static final String url = "tcp://192:168.31.10:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.創(chuàng)建Connection
Connection connection = connectionFactory.createConnection();
//3.啟動連接
connection.start();
//4.創(chuàng)建會話
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建一個目標(biāo)
Destination destination = session.createQueue(queueName);
//6.創(chuàng)建一個生產(chǎn)者
MessageProducer producer = session.createProducer(destination);
for(int i=0;i<100;i++){
//7.創(chuàng)建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.發(fā)布消息
producer.send(textMessage);
System.out.println("發(fā)送消息:"+textMessage.getText());
}
//9.關(guān)閉連接
connection.close();
}
}
ActiveMqConsumer.java
package com.cxy.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Auther: cxy
* @Date: 2019/7/4
* @Description: 消費(fèi)者
*/
public class ActiveMqConsumer {
private static final String url = "tcp://192:168.31.10:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.創(chuàng)建Connection
Connection connection = connectionFactory.createConnection();
//3.啟動連接
connection.start();
//4.創(chuàng)建會話
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建一個目標(biāo)
Destination destination = session.createQueue(queueName);
//6.創(chuàng)建消費(fèi)者
MessageConsumer consumer = session.createConsumer(destination);
//7.創(chuàng)建一個監(jiān)聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息:"+textMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
}
}
主題模式的消息演示
ActiveMqProducer.java
package com.cxy.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Auther: cxy
* @Date: 2019/7/4
* @Description: 生產(chǎn)者
*/
public class ActiveMqProducer {
private static final String url = "tcp://192:168.31.10:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.創(chuàng)建Connection
Connection connection = connectionFactory.createConnection();
//3.啟動連接
connection.start();
//4.創(chuàng)建會話
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建一個目標(biāo)
Destination destination = session.createTopic(topicName);
//6.創(chuàng)建一個生產(chǎn)者
MessageProducer producer = session.createProducer(destination);
for(int i=0;i<100;i++){
//7.創(chuàng)建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.發(fā)布消息
producer.send(textMessage);
System.out.println("發(fā)送消息:"+textMessage.getText());
}
//9.關(guān)閉連接
connection.close();
}
}
ActiveMqConsumer.java
package com.cxy.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Auther: cxy
* @Date: 2019/7/4
* @Description: 消費(fèi)者
*/
public class ActiveMqConsumer {
private static final String url = "tcp://192:168.31.10:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.創(chuàng)建Connection
Connection connection = connectionFactory.createConnection();
//3.啟動連接
connection.start();
//4.創(chuàng)建會話
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建一個目標(biāo)
Destination destination = session.createTopic(topicName);
//6.創(chuàng)建消費(fèi)者
MessageConsumer consumer = session.createConsumer(destination);
//7.創(chuàng)建一個監(jiān)聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息:"+textMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
}
}
activemq模式區(qū)分
隊列模式:生產(chǎn)者發(fā)送消息,所有消費(fèi)者對消息進(jìn)行平分,已消費(fèi)的消息不能重新消費(fèi)
主題模式:生產(chǎn)者發(fā)送消息,所有已訂閱主題的消費(fèi)者都能收到消息。