消息中間件ActiveMQ

消息中間件概述

中間件介紹

什么是中間件?

非底層操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用的,不能直接給用戶帶來價值的軟件統(tǒng)稱中間件。

什么是消息中間件

關(guān)注于數(shù)據(jù)的發(fā)送與接收,利用高效可靠的異步消息傳遞機(jī)制集成分布式系統(tǒng)。

消息中間件圖示


應(yīng)用A通過應(yīng)用程序接口向消息中間件發(fā)送消息,應(yīng)用B通過應(yīng)用程序接口向消息中間件接收消息。

什么是JMS

Java消息服務(wù)(Java Message Service)即JMS,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送/接收消息,進(jìn)行異步通信。

什么是AMQ

AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)協(xié)議,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。

JMS和AMQP對比

常見消息中間件對比

ActiveMQ

  • 多種語言和協(xié)議編寫客戶端,語言:Java、C語言、C++、C#、Ruby、Perl、Python、PHP。應(yīng)用協(xié)議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP。
  • 完全支持JMS1.1和J2EE1.4規(guī)范(持久化、XA消息、事務(wù))
  • 虛擬主題、組合目的、鏡像隊列

RabbitMQ

  • 支持多種客戶端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
  • AMQP的完整實(shí)現(xiàn)
  • 事務(wù)支持/發(fā)布確認(rèn)
  • 消息持久化

Kafka

Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),是一個分布式的、分區(qū)的、可靠的分布式日志存儲服務(wù)。它通過一種獨(dú)一無二的設(shè)計提供了消息系統(tǒng)的功能。

特性
  • 消息的持久化
  • 高吞吐量
  • Partition、Consumer Group
綜合對比

JMS規(guī)范

Java消息服務(wù)定義

Java消息服務(wù)(Java Message Service)即JMS,是一個Java平臺中關(guān)于面向消息中間件的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送/接收消息,進(jìn)行異步通信。

JMS概念

  • 提供者:實(shí)現(xiàn)JMS規(guī)范的消息中間件服務(wù)器
  • 客戶端:發(fā)送或接收消息額應(yīng)用程序
  • 生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端
  • 消費(fèi)者/訂閱者:接收并處理消息的客戶端
  • 消息: 應(yīng)用程序間傳遞的數(shù)據(jù)內(nèi)容
  • 消息模式:在客戶端間傳遞消息的方式,JMS中定義了主題和隊列兩種模式

消息模式

隊列模型

  • 客戶端包括生產(chǎn)者和消費(fèi)者
  • 隊列中的消息只能被一個消費(fèi)者消費(fèi)
  • 消費(fèi)者可以隨時消費(fèi)隊列中的消息
隊列模型示意圖

主題模型

  • 客戶端包括生產(chǎn)者和消費(fèi)者
  • 主題中的消息被所有訂閱者消費(fèi)
  • 消費(fèi)者不能消費(fèi)訂閱之前就發(fā)送到主題中的消息
隊列模型示意圖

JMS編碼規(guī)范

  • ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠
  • Connection:“鏈接”,代表了應(yīng)用程序和消息服務(wù)器之間的通信鏈路
  • Destination:“目的地”,指消息發(fā)布和接收的地點(diǎn),包括隊列或主題
  • Session:“會話”,表示一個單線程的上下文,用于發(fā)送和接收消息
  • MessageConsumer:“消費(fèi)者”,一種可以向JMS提供獲取消息的客戶端類型
  • MessageProducer:“生產(chǎn)者”,消費(fèi)者和生產(chǎn)者間傳送的對象,消息頭,一組消息屬性,一個消息體

JMS編碼接口之間的關(guān)系

windows下安裝ActiveMQ

Linux下安裝ActiveMQ

隊列模式的消息演示

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>

ActiveMqProducer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生產(chǎn)者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();

        //3.啟動連接
        connection.start();

        //4.創(chuàng)建會話
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(queueName);

        //6.創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.創(chuàng)建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.發(fā)布消息
            producer.send(textMessage);

            System.out.println("發(fā)送消息:"+textMessage.getText());
        }

        //9.關(guān)閉連接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消費(fèi)者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();

        //3.啟動連接
        connection.start();

        //4.創(chuàng)建會話
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createQueue(queueName);

        //6.創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.創(chuàng)建一個監(jiān)聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

主題模式的消息演示

ActiveMqProducer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生產(chǎn)者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();

        //3.啟動連接
        connection.start();

        //4.創(chuàng)建會話
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createTopic(topicName);

        //6.創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.創(chuàng)建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.發(fā)布消息
            producer.send(textMessage);

            System.out.println("發(fā)送消息:"+textMessage.getText());
        }

        //9.關(guān)閉連接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消費(fèi)者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.創(chuàng)建Connection
        Connection connection = connectionFactory.createConnection();

        //3.啟動連接
        connection.start();

        //4.創(chuàng)建會話
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.創(chuàng)建一個目標(biāo)
        Destination destination = session.createTopic(topicName);

        //6.創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.創(chuàng)建一個監(jiān)聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

activemq模式區(qū)分

隊列模式:生產(chǎn)者發(fā)送消息,所有消費(fèi)者對消息進(jìn)行平分,已消費(fèi)的消息不能重新消費(fèi)
主題模式:生產(chǎn)者發(fā)送消息,所有已訂閱主題的消費(fèi)者都能收到消息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容