1.JMS—Java消息服務(wù)
Java消息服務(wù)(Java Message Service,JMS)應(yīng)用程序接口是一個(gè)Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
JMS消息服務(wù)的規(guī)范包含兩種消息模式
- 點(diǎn)對點(diǎn)
- 發(fā)布者/訂閱者
常用的消息中間件
- ActiveMQ
- kafka
- RabbitMQ
- RocketMQ
JMS API
- ConnectionFactory
- Connection
- Session
- Destination
目標(biāo)是一個(gè)包裝了消息目標(biāo)標(biāo)識符的被管對象,消息目標(biāo)是指消息發(fā)布和接收的地點(diǎn),或者是隊(duì)列,或者是主題 - MessageConsumer
由會話創(chuàng)建的對象,用于接收發(fā)送到目標(biāo)的消息 - MessageProducer
由會話創(chuàng)建的對象,用于發(fā)送消息到目標(biāo) - Message
是在消費(fèi)者和生產(chǎn)者之間傳送的對象,也就是說從一個(gè)應(yīng)用程序創(chuàng)送到另一個(gè)應(yīng)用程序
消息的類型
- StreamMessage
Java原始數(shù)據(jù)流 - MapMessage
鍵值對 - TextMessage
字符串對象 - ObjectMessage
序列化對象 - ByteMessage
字節(jié)數(shù)據(jù)流
2.windows環(huán)境下載安裝ActiveMQ
? 下載 http://activemq.apache.org/download.html
? 解壓文件夾,雙擊 home/bin/winXX/wrapper.exe 進(jìn)行啟動(dòng)
? 瀏覽器中訪問 http://localhost:8161
? 管理員賬號和密碼為 admin / admin


bin存放的是腳本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是說明文檔
examples存放的是簡單的實(shí)例
lib存放的是activemq所需jar包
webapps用于存放項(xiàng)目的目錄
ActiveMQ默認(rèn)啟動(dòng)時(shí),啟動(dòng)了內(nèi)置的jetty服務(wù)器,提供一個(gè)用于監(jiān)控ActiveMQ的admin應(yīng)用。 雙擊 home/bin/winXX/wrapper.exe 進(jìn)行啟動(dòng):

瀏覽器中訪問 http://localhost:8161:

管理員賬號和密碼為 admin / admin進(jìn)行登錄:

到這里為止,ActiveMQ 服務(wù)端就啟動(dòng)完畢了。
3.創(chuàng)建JMS-ActiveMQ工程
添加maven依賴:

3.1點(diǎn)對點(diǎn)消息
在點(diǎn)對點(diǎn)或隊(duì)列模型下,一個(gè)生產(chǎn)者向一個(gè)特定的隊(duì)列發(fā)布消息,一個(gè)消費(fèi)者從該隊(duì)列中讀取消息。這里,生產(chǎn)者知道消費(fèi)者的隊(duì)列,并直接將消息發(fā)送到消費(fèi)者的隊(duì)列。這種模式被概括為:
? 只有一個(gè)消費(fèi)者將獲得消息
? 生產(chǎn)者不需要在接收者消費(fèi)該消息期間處于運(yùn)行狀態(tài),接收者也同樣不需要在消息發(fā)送時(shí)處于運(yùn)行狀態(tài)。
? 每一個(gè)成功處理的消息都由接收者簽收

①創(chuàng)建消息生產(chǎn)者
public void producer() throws JMSException {
//1.創(chuàng)建ConnectionFactory 連接工廠
String brokerUrl = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.創(chuàng)建Connection 連接對象
Connection connection = connectionFactory.createConnection();
//開啟連接
connection.start();
//3.創(chuàng)建Session事務(wù)管理,通過參數(shù)設(shè)置事務(wù)級別
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建Destination 目的地對象
Destination destination = session.createQueue("text-Message");
//5.創(chuàng)建消息的生產(chǎn)者
MessageProducer messageProducer = session.createProducer(destination);
//6.創(chuàng)建一條消息
TextMessage textMessage = session.createTextMessage("測試-消息生產(chǎn)者");
//7.發(fā)送消息
messageProducer.send(textMessage);
//提交事務(wù)
session.commit();
//8.釋放資源
messageProducer.close();
session.close();
connection.close();
}
②創(chuàng)建消息消費(fèi)者
public void consumer() throws JMSException, IOException {
//1.創(chuàng)建ConnectionFactory
String brokerUrl = "tcp://localhost:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.創(chuàng)建Connection
Connection connection = connectionFactory.createConnection();
connection.start();
//3.創(chuàng)建Session
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建Destination 目的地對象
Destination destination = session.createQueue("text-Message");
//5.創(chuàng)建消費(fèi)者
MessageConsumer messageConsumer = session.createConsumer(destination);
//6.消費(fèi)消息,監(jiān)聽隊(duì)列中的消息,若有新消息,會執(zhí)行onMessage方法
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//避免單元測試停止
System.in.read();
//7.釋放資源
messageConsumer.close();
session.close();
connection.close();
}
③運(yùn)行ActiveMQ項(xiàng)目
生產(chǎn)者運(yùn)行結(jié)果:

消費(fèi)者運(yùn)行結(jié)果:

3.2發(fā)布/訂閱 消息
發(fā)布者/訂閱者模型支持向一個(gè)特定的消息主題發(fā)布消息。0或多個(gè)訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發(fā)布者和訂閱者彼此不知道對方。這種模式被概括為:
? 多個(gè)消費(fèi)者可以獲得消息
? 在發(fā)布者和訂閱者之間存在時(shí)間依賴性。發(fā)布者需要建立一個(gè)訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續(xù)的活動(dòng)狀態(tài)以接收消息。

消息發(fā)布者:
//1. 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2. 創(chuàng)建連接 并 開啟
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 創(chuàng)建 Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 創(chuàng)建 Topic 對象
Topic topic = session.createTopic("weixin-Topic");
//5. 創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(topic);
//6. 發(fā)送消息
TextMessage textMessage = session.createTextMessage("Hello,Topic MQ");
producer.send(textMessage);
//7. 釋放資源
producer.close();
session.close();
connection.close();
消息訂閱者:
//1. 創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
//2. 創(chuàng)建并啟動(dòng)連接
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 創(chuàng)建 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 創(chuàng)建目的地對象
Topic topic = session.createTopic("weixin-Topic");
//5. 創(chuàng)建消費(fèi)者
MessageConsumer consumer = session.createConsumer(topic);
//6. 獲取消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//7. 釋放資源
consumer.close();
session.close();
connection.close();