Spring 整合 ActiveMQ 入門示例

ActiveMQ 是一個(gè)非常強(qiáng)大的開(kāi)源消息總線,并且極其簡(jiǎn)單的設(shè)計(jì)可以輕松應(yīng)用于項(xiàng)目。以下示例僅用了必要的XML配置,以進(jìn)行消息交換。

在消息發(fā)送/接收端,ActiveMQ 連接工廠須被創(chuàng)建,我們先創(chuàng)建雙方公用的工廠。

公共配置 spring-common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- ActiveMQ 連接工廠 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- 連接工廠定義 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
    </bean>
</beans>

發(fā)送方 spring-sender.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="spring-common.xml"/>

    <!-- 默認(rèn)的目的地隊(duì)列定義 -->
    <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="queue-test"/>
    </bean>

    <!-- JmsTemplate 定義 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="activeMQQueue"/>
    </bean>

    <!-- 消息發(fā)送者定義 -->
    <bean id="messageSender" class="com.caobug.demo.springjmsactivemq.MessageSender">
        <constructor-arg index="0" ref="jmsTemplate"/>
    </bean>
</beans>

我們需要?jiǎng)?chuàng)建一個(gè)類用于消息發(fā)送:

package com.caobug.demo.springjmsactivemq;

import org.springframework.jms.core.JmsTemplate;

import java.util.Map;

/**
 * Created by caobug on 16/6/23.
 *
 * @author caobug
 */
public class MessageSender {

    private final JmsTemplate jmsTemplate;

    public MessageSender(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sender(final Map<String, Object> map) {
        jmsTemplate.convertAndSend(map);
    }
}

接收方 spring-receiver.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="spring-common.xml"/>

    <!-- 消息接收者定義 -->
    <bean id="messageReceiver" class="com.caobug.demo.springjmsactivemq.MessageReceiver"/>

    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="queue-test"/>
        <property name="messageListener" ref="messageReceiver"/>
    </bean>
</beans>

我們還需要?jiǎng)?chuàng)建類用來(lái)接收消息:

package com.caobug.demo.springjmsactivemq;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Enumeration;

/**
 * Created by caobug on 16/6/23.
 *
 * @author caobug
 */
public class MessageReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            final MapMessage mapMessage = (MapMessage) message;
            try {
                Enumeration mapNames = mapMessage.getMapNames();
                while (mapNames.hasMoreElements()) {
                    String name = (String) mapNames.nextElement();
                    Object value = mapMessage.getObject(name);
                    System.out.println("value = " + value);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        System.out.println("已接收消息");
    }
}

到此為止我們的代碼寫完了。以下為測(cè)試發(fā)送方和消費(fèi)方,我們需要執(zhí)行以下三個(gè)類的JUNIT測(cè)試方法,順序不分先后。值得注意的是,由于默認(rèn)配置為點(diǎn)對(duì)點(diǎn),因此消費(fèi)者不可能會(huì)同時(shí)接受同一個(gè)消息。

測(cè)試:發(fā)送方

package com.caobug.demo.springjmsactivemq;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Created by caobug on 16/6/23.
 *
 * @author caobug
 */
public class MessageSenderTest {

    @Test
    public void sender() throws Exception {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-sender.xml");
        MessageSender messageSender = ctx.getBean("messageSender", MessageSender.class);
        for (int i = 0; i < 148; i++) {
            Map<String, Object> content = new HashMap<>();
            content.put("name", "caobug");
            content.put("age", i);
            content.put("will", "say hello");
            messageSender.sender(content);
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

測(cè)試:接收方 A

package com.caobug.demo.springjmsactivemq;

import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.concurrent.TimeUnit;

/**
 * Created by caobug on 16/6/23.
 *
 * @author caobug
 */
public class MessageReceiverATest {

    @Test
    public void receiverA() throws Exception {
        new ClassPathXmlApplicationContext("spring-receiver.xml");

        TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
    }
}

測(cè)試:接收方 B

package com.caobug.demo.springjmsactivemq;

import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.concurrent.TimeUnit;

/**
 * Created by caobug on 16/6/23.
 *
 * @author caobug
 */
public class MessageReceiverBTest {

    @Test
    public void receiverA() throws Exception {
        new ClassPathXmlApplicationContext("spring-receiver.xml");

        TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
    }
}

** 搞定!以上示例編譯環(huán)境為JDK7,JAR依賴(gradle config)如下 **

dependencies {
    compile group: 'org.springframework', name: 'spring-jms', version: '4.2.5.RELEASE'
    compile group: 'org.apache.activemq', name: 'activemq-core', version: '5.7.0'

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

** 完整項(xiàng)目結(jié)構(gòu)看起來(lái)是這樣:**

Paste_Image.png

示例源碼:https://github.com/caobug/spring-jms-activemq

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,261評(píng)論 6 342
  • ActiveMQ 即時(shí)通訊服務(wù) 淺析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk閱讀 1,583評(píng)論 0 11
  • ActiveMQ入門教程 本博客內(nèi)容皆為網(wǎng)絡(luò)搜集而來(lái),不保證任何版權(quán)問(wèn)題,不保證長(zhǎng)期有效性(即具有時(shí)效性),如有侵...
    龍圣賢閱讀 34,114評(píng)論 6 48
  • 2014.03.03 周日照例帶喵去爺爺奶奶家玩,小家伙美得不亦樂(lè)乎,叫爺爺給魚換水、拿噴壺澆花、要么就是吹泡泡,...
    摹喵居士閱讀 414評(píng)論 0 0

友情鏈接更多精彩內(nèi)容