activeMQ中的Virtual Topics詳解及使用

一、好言

太遠(yuǎn)容易生疏,太近容易情盡。

二、背景

最近接手項目,公司的MQ做了一層封裝,挺好用的,會有一片文章記載,然后在其中我們使用了<a >虛擬話題</a>的概念,這個我沒有使用過,之前一直都是使用單純的隊列或者topic,所以就查詢資料,自己配制寫測試案列看看實(shí)際效果。

三、直接先上測試代碼

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Mahone Wu on 2017/4/12.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext-v.xml")
public class TopicTest {

    private Logger logger = LoggerFactory.getLogger(SpringJmsTopicTest.class);

    ActiveMQConnectionFactory factoryA;

    Session session;


    @Before
    public void init(){
        try{
            factoryA = getAMQConnectionFactory();
            ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    @Test
    public void testNormalTopic(){
        try {
            ActiveMQTopic queue = new ActiveMQTopic(getNormalTopicName());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            final AtomicInteger count = new AtomicInteger(0);
            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {

                    try{
                        int index =count.incrementAndGet();
                        logger.info("index={}---------->receive from {},消息={}",index,getNormalTopicName(),((TextMessage)message).getText());
                        Thread.sleep(10L);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);

            MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalTopicName()));
            int index = 0;
            while (index++ < 10) {
             //   logger.info("{},{}",index < 100,index + " message.");
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
                Thread.sleep(5L);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

        try {
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
        }

    }



    @Test
    public void testNormalVirtualTopic(){
        try{
            Queue queue = new ActiveMQQueue(getVirtualTopicConsumerName());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            final AtomicInteger count = new AtomicInteger(0);

            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = count.getAndIncrement();
                        logger.info("index={}---------->receive from {},消息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalVTopicName()));
            int index = 0;
            while (index++ < 10) {
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    @Test
    public void testVirtualTopic(){
        try {
            Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            MessageConsumer consumer3 = session.createConsumer(new ActiveMQQueue(getVirtualTopicConsumerNameB()));

            final AtomicInteger countA = new AtomicInteger(0);
            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = countA.getAndIncrement();
                        logger.info("A index={}---------->receive from {},消息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        logger.error(""+e);
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            final AtomicInteger countB = new AtomicInteger(0);
            MessageListener listenerB = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = countB.getAndIncrement();
                        logger.info("B index={}---------->receive from {},消息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        e.printStackTrace();
                        logger.error(""+e);
                    }
                }
            };
            consumer3.setMessageListener(listenerB);
            MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
            int index = 0;
            while (index++ < 10) {
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private ActiveMQConnectionFactory getAMQConnectionFactory(){
        return new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    }

    private static String getNormalTopicName(){
        return "normal.TEST";
    }

    private static String getNormalVTopicName(){
        return "VirtualTopic.NORMAL";
    }

    private static String getVirtualTopicName(){
        return "VirtualTopic.TEST";
    }

    private static String getVirtualTopicConsumerName(){
        return "Consumer.normal.VirtualTopic.NORMAL";
    }

    private static String getVirtualTopicConsumerNameA(){
        return "Consumer.A.VirtualTopic.TEST";
    }

    private static String getVirtualTopicConsumerNameB(){
        return "Consumer.B.VirtualTopic.TEST";
    }
}

applicationContext-v.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:p="http://www.springframework.org/schema/p" xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd">
</beans>

testNormalTopic打印結(jié)果:

testNormalTopic.png

結(jié)論:
從這個里面我們可以看出,consumer1,consumer2都監(jiān)聽了normal.TEST,所以結(jié)果是打印了20條數(shù)據(jù)

testNormalVirtualTopic打印結(jié)果:

testNormalVirtualTopic.png

結(jié)論:
虛擬的方式,consumer1,consumer2,都訂閱了VirtualTopic.NORMAL,所以按照配制消費(fèi)端Consumer.normal.VirtualTopic.NORMAL的方式,結(jié)果只打印了10條數(shù)據(jù),所以此時consumer1,consumer2只有一個接收成功。

testVirtualTopic打印結(jié)果


testVirtualTopic.png

結(jié)論:
這個跟上面這個類似,對比更佳明顯的說明了VirtualTopic中,A,B是兩個不同的應(yīng)用,所以打印了20條,如果是同一個應(yīng)用,則只會又一個接收成功。

Consumer.A.VirtualTopic.Test
1、Consumer ,VirtualTopic 為系統(tǒng)配置,勿做修改。**
2、A為消費(fèi)方的系統(tǒng)名稱。
3、Test 為根據(jù)業(yè)務(wù)定義的消息地址
<a >官方文檔</a>有做相關(guān)說明。

四、虛擬主題用處

摘自網(wǎng)上:

  1. 同一應(yīng)用內(nèi)consumer端負(fù)載均衡的問題:同一個應(yīng)用上的一個持久訂閱不能使用多個consumer來共同承擔(dān)消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker端又不能將消息發(fā)送到多個應(yīng)用端。所以,既要發(fā)布訂閱,又要讓消費(fèi)者分組,這個功能jms規(guī)范本身是沒有的。
  1. 同一應(yīng)用內(nèi)consumer端failover的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應(yīng)用就無法處理消息了,系統(tǒng)的健壯性不高。
    對于上述的表述個人覺得理解起來好糾結(jié),因為這里又涉及到持久化問題,對于持久訂閱的意義可以看這篇<a >文章</a>

所以我個人覺得這里解決的就是對于如果單純的使用topic方式,那么如果消費(fèi)端部署的是集群方式,那么每一個都訂閱了,在發(fā)送消息的時候,集群中的每一個訂閱者都有可能收到,那么這不是我們想要的效果;可能上面說的有這么一個方面還有一個就是涉及到持久定于的健壯性問題。
所以virtualtopic我們可以理解為是queue和topic的結(jié)合,即是,使用topic的一對多的廣播功能,又需要在集群的時候,只有一個收到,也就是隊列的一對一的特效。

五:代碼

代碼放github了,主要在單元測試?yán)锩婵梢钥纯?br> 地址:https://github.com/MahoneWu/mq

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容