Java消息服務(wù) —ActiveMQ實(shí)戰(zhàn)

1.JMS—Java消息服務(wù)

Java消息服務(wù)(Java Message Service,JMS)應(yīng)用程序接口是一個(gè)Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
JMS消息服務(wù)的規(guī)范包含兩種消息模式

  • 點(diǎn)對點(diǎn)
  • 發(fā)布者/訂閱者

常用的消息中間件

  • ActiveMQ
  • kafka
  • RabbitMQ
  • RocketMQ

JMS API

  • ConnectionFactory
  • Connection
  • Session
  • Destination
    目標(biāo)是一個(gè)包裝了消息目標(biāo)標(biāo)識符的被管對象,消息目標(biāo)是指消息發(fā)布和接收的地點(diǎn),或者是隊(duì)列,或者是主題
  • MessageConsumer
    由會話創(chuàng)建的對象,用于接收發(fā)送到目標(biāo)的消息
  • MessageProducer
    由會話創(chuàng)建的對象,用于發(fā)送消息到目標(biāo)
  • Message
    是在消費(fèi)者和生產(chǎn)者之間傳送的對象,也就是說從一個(gè)應(yīng)用程序創(chuàng)送到另一個(gè)應(yīng)用程序

消息的類型

  • StreamMessage
    Java原始數(shù)據(jù)流
  • MapMessage
    鍵值對
  • TextMessage
    字符串對象
  • ObjectMessage
    序列化對象
  • ByteMessage
    字節(jié)數(shù)據(jù)流

2.windows環(huán)境下載安裝ActiveMQ

? 下載 http://activemq.apache.org/download.html
? 解壓文件夾,雙擊 home/bin/winXX/wrapper.exe 進(jìn)行啟動(dòng)
? 瀏覽器中訪問 http://localhost:8161
? 管理員賬號和密碼為 admin / admin

下載界面

apache-activemq-5.15.2解壓后的文件

bin存放的是腳本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是說明文檔
examples存放的是簡單的實(shí)例
lib存放的是activemq所需jar包
webapps用于存放項(xiàng)目的目錄

ActiveMQ默認(rèn)啟動(dòng)時(shí),啟動(dòng)了內(nèi)置的jetty服務(wù)器,提供一個(gè)用于監(jiān)控ActiveMQ的admin應(yīng)用。 雙擊 home/bin/winXX/wrapper.exe 進(jìn)行啟動(dòng):


啟動(dòng)ActiveMQ

瀏覽器中訪問 http://localhost:8161

image.png

管理員賬號和密碼為 admin / admin進(jìn)行登錄:
image.png

到這里為止,ActiveMQ 服務(wù)端就啟動(dòng)完畢了。

3.創(chuàng)建JMS-ActiveMQ工程

添加maven依賴:


maven依賴

3.1點(diǎn)對點(diǎn)消息

在點(diǎn)對點(diǎn)或隊(duì)列模型下,一個(gè)生產(chǎn)者向一個(gè)特定的隊(duì)列發(fā)布消息,一個(gè)消費(fèi)者從該隊(duì)列中讀取消息。這里,生產(chǎn)者知道消費(fèi)者的隊(duì)列,并直接將消息發(fā)送到消費(fèi)者的隊(duì)列。這種模式被概括為:
? 只有一個(gè)消費(fèi)者將獲得消息
? 生產(chǎn)者不需要在接收者消費(fèi)該消息期間處于運(yùn)行狀態(tài),接收者也同樣不需要在消息發(fā)送時(shí)處于運(yùn)行狀態(tài)。
? 每一個(gè)成功處理的消息都由接收者簽收

點(diǎn)對點(diǎn)消息

①創(chuàng)建消息生產(chǎn)者

    public void producer() throws JMSException {
        //1.創(chuàng)建ConnectionFactory  連接工廠
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        //2.創(chuàng)建Connection 連接對象
        Connection connection = connectionFactory.createConnection();
        //開啟連接
        connection.start();
        //3.創(chuàng)建Session事務(wù)管理,通過參數(shù)設(shè)置事務(wù)級別
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        //4.創(chuàng)建Destination 目的地對象
        Destination destination = session.createQueue("text-Message");
        //5.創(chuàng)建消息的生產(chǎn)者
        MessageProducer messageProducer = session.createProducer(destination);
        //6.創(chuàng)建一條消息
        TextMessage textMessage = session.createTextMessage("測試-消息生產(chǎn)者");
        //7.發(fā)送消息
        messageProducer.send(textMessage);
        //提交事務(wù)
        session.commit();
        //8.釋放資源
        messageProducer.close();
        session.close();
        connection.close();
    }

②創(chuàng)建消息消費(fèi)者

    public void consumer() throws JMSException, IOException {
        //1.創(chuàng)建ConnectionFactory
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        //2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.創(chuàng)建Session
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        //4.創(chuàng)建Destination 目的地對象
        Destination destination = session.createQueue("text-Message");
        //5.創(chuàng)建消費(fèi)者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //6.消費(fèi)消息,監(jiān)聽隊(duì)列中的消息,若有新消息,會執(zhí)行onMessage方法
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //避免單元測試停止
        System.in.read();
        //7.釋放資源
        messageConsumer.close();
        session.close();
        connection.close();
    }

③運(yùn)行ActiveMQ項(xiàng)目
生產(chǎn)者運(yùn)行結(jié)果:

生產(chǎn)者

消費(fèi)者運(yùn)行結(jié)果:
消費(fèi)者

3.2發(fā)布/訂閱 消息

發(fā)布者/訂閱者模型支持向一個(gè)特定的消息主題發(fā)布消息。0或多個(gè)訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發(fā)布者和訂閱者彼此不知道對方。這種模式被概括為:
? 多個(gè)消費(fèi)者可以獲得消息
? 在發(fā)布者和訂閱者之間存在時(shí)間依賴性。發(fā)布者需要建立一個(gè)訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續(xù)的活動(dòng)狀態(tài)以接收消息。

發(fā)布/訂閱 消息

消息發(fā)布者:

//1.  創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2.  創(chuàng)建連接 并 開啟
Connection connection = connectionFactory.createConnection();
connection.start();
//3.  創(chuàng)建 Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.  創(chuàng)建 Topic 對象
Topic topic = session.createTopic("weixin-Topic");
//5.  創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(topic);
//6.  發(fā)送消息
TextMessage textMessage = session.createTextMessage("Hello,Topic MQ");
producer.send(textMessage);
//7.  釋放資源
producer.close();
session.close();
connection.close();

消息訂閱者:

        //1.  創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
        //2.  創(chuàng)建并啟動(dòng)連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.  創(chuàng)建 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.  創(chuàng)建目的地對象
        Topic topic = session.createTopic("weixin-Topic");
        //5.  創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(topic);
        //6.  獲取消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        //7.  釋放資源
        consumer.close();
        session.close();
        connection.close();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容