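ActiveMQ is a powerful open-source message broker, and its simple design makes it easy to use in a project. The example below uses only the XML configuration needed to exchange messages.
Both the sending side and the receiving side need an ActiveMQ connection factory, so we start by defining a factory that both sides share.
Shared configuration, spring-common.xml: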
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- ActiveMQ connection factory -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- Spring caching connection factory that wraps the ActiveMQ factory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
</bean>
</beans>
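To make the `brokerURL` value above easier to read, here is a minimal sketch that splits it into its parts with the JDK's `java.net.URI`. The class `BrokerUrlSketch` and its `describe` method are hypothetical names used only for illustration; ActiveMQ parses the URL itself and also accepts forms this sketch does not handle.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not part of ActiveMQ or Spring.
final class BrokerUrlSketch {

    // Splits a simple broker URL into transport scheme, host and port.
    static String describe(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        return uri.getScheme() + " transport, host " + uri.getHost()
                + ", port " + uri.getPort();
    }

    public static void main(String[] args) {
        // The value configured in spring-common.xml.
        System.out.println(describe("tcp://localhost:61616"));
    }
}
```

Port 61616 is the default port of ActiveMQ's TCP (OpenWire) transport, so this URL points at a broker running locally with default settings.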
Sender, spring-sender.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="spring-common.xml"/>
<!-- Default destination queue -->
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="queue-test"/>
</bean>
<!-- JmsTemplate definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="activeMQQueue"/>
</bean>
<!-- Message sender definition -->
<bean id="messageSender" class="com.caobug.demo.springjmsactivemq.MessageSender">
<constructor-arg index="0" ref="jmsTemplate"/>
</bean>
</beans>
We need a class that sends messages:
package com.caobug.demo.springjmsactivemq;
import org.springframework.jms.core.JmsTemplate;
import java.util.Map;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageSender {
private final JmsTemplate jmsTemplate;
public MessageSender(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sender(final Map<String, Object> map) {
jmsTemplate.convertAndSend(map);
}
}
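`convertAndSend` hands the payload to the template's message converter, and with the default `SimpleMessageConverter` a `Map` payload becomes a JMS `MapMessage`, which is why the receiver checks for that type. The sketch below mirrors that documented type dispatch in plain Java. `PayloadTypeSketch` and `jmsTypeFor` are hypothetical names, and the sketch throws `IllegalArgumentException` where Spring would raise its own conversion exception.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring: names the JMS message type that
// SimpleMessageConverter is documented to create for a given payload.
final class PayloadTypeSketch {

    static String jmsTypeFor(Object payload) {
        if (payload instanceof String) {
            return "TextMessage";
        }
        if (payload instanceof byte[]) {
            return "BytesMessage";
        }
        // Checked before Serializable, because HashMap is Serializable too.
        if (payload instanceof Map) {
            return "MapMessage";
        }
        if (payload instanceof Serializable) {
            return "ObjectMessage";
        }
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }

    public static void main(String[] args) {
        Map<String, Object> content = new HashMap<>();
        content.put("name", "caobug");
        System.out.println(jmsTypeFor(content)); // MapMessage
        System.out.println(jmsTypeFor("hello")); // TextMessage
    }
}
```

If the sender passed a `String` instead of a `Map`, the receiver's `instanceof MapMessage` branch would be skipped, so the payload type on both sides has to match.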
Receiver, spring-receiver.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="spring-common.xml"/>
<!-- Message receiver definition -->
<bean id="messageReceiver" class="com.caobug.demo.springjmsactivemq.MessageReceiver"/>
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="queue-test"/>
<property name="messageListener" ref="messageReceiver"/>
</bean>
</beans>
We also need a class that receives messages:
package com.caobug.demo.springjmsactivemq;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Enumeration;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiver implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
final MapMessage mapMessage = (MapMessage) message;
try {
Enumeration mapNames = mapMessage.getMapNames();
while (mapNames.hasMoreElements()) {
String name = (String) mapNames.nextElement();
Object value = mapMessage.getObject(name);
System.out.println("value = " + value);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
System.out.println("Message received");
}
}
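That completes the code. To test the sender and the consumers, run the JUnit test methods of the three classes below, in any order. Note that the default configuration is point-to-point (a queue), so each message is delivered to only one consumer: receivers A and B never both receive the same message.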
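The point-to-point behavior can be illustrated without a broker. The following is an in-memory analogy only, assuming a JDK `BlockingQueue` as a stand-in for the ActiveMQ queue; `PointToPointSketch` and `deliver` are hypothetical names. Several consumers take from one queue, and every message ends up with exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy of queue semantics; it does not talk to ActiveMQ.
final class PointToPointSketch {

    // Sentinel value telling a consumer to stop; one is enqueued per consumer.
    private static final int STOP = -1;

    // Sends messageCount messages to consumerCount competing consumers and
    // returns, per consumer, the list of messages it received.
    static List<List<Integer>> deliver(int messageCount, int consumerCount) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<List<Integer>> received = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            final List<Integer> mine = new ArrayList<>();
            received.add(mine);
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            int msg = queue.take();
                            if (msg == STOP) {
                                return;
                            }
                            mine.add(msg);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        for (int c = 0; c < consumerCount; c++) {
            queue.add(STOP);
        }
        try {
            // join() makes each consumer's list safely visible to the caller.
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = deliver(148, 2);
        System.out.println("A received " + result.get(0).size() + " messages");
        System.out.println("B received " + result.get(1).size() + " messages");
    }
}
```

How the messages are split between the two consumers varies from run to run, but the two counts always add up to the number of messages sent. A publish/subscribe topic would behave differently: every subscriber would get its own copy.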
Test: sender
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageSenderTest {
@Test
public void sender() throws Exception {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-sender.xml");
MessageSender messageSender = ctx.getBean("messageSender", MessageSender.class);
for (int i = 0; i < 148; i++) {
Map<String, Object> content = new HashMap<>();
content.put("name", "caobug");
content.put("age", i);
content.put("will", "say hello");
messageSender.sender(content);
TimeUnit.SECONDS.sleep(1);
}
}
}
Test: receiver A
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiverATest {
@Test
public void receiverA() throws Exception {
new ClassPathXmlApplicationContext("spring-receiver.xml");
TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
}
}
Test: receiver B
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiverBTest {
@Test
public void receiverB() throws Exception {
new ClassPathXmlApplicationContext("spring-receiver.xml");
TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
}
}
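Both receiver tests sleep indefinitely so that the listener container stays alive, which means they have to be stopped by hand. As a possible alternative, a test could wait on a `CountDownLatch` that the listener counts down once per message, and give up after a timeout. The sketch below shows only that waiting pattern in plain Java; `LatchWaitSketch` and `CountingListener` are hypothetical names and stand in for a real JMS `MessageListener`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration of waiting for N messages instead of sleeping forever.
final class LatchWaitSketch {

    // Stand-in for a message listener: counts down once per handled message.
    static final class CountingListener {
        private final CountDownLatch latch;

        CountingListener(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        void onMessage(String body) {
            latch.countDown();
        }

        // Returns true if all expected messages arrived before the timeout.
        boolean awaitAll(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        final CountingListener listener = new CountingListener(3);
        // Simulates a container thread delivering three messages.
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 3; i++) {
                    listener.onMessage("message " + i);
                }
            }
        }).start();
        System.out.println("all received: " + listener.awaitAll(5, TimeUnit.SECONDS));
    }
}
```

With this pattern a receiver test finishes as soon as the expected messages have arrived, at the cost of having to know in advance how many to expect.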
** Done! The example above was compiled with JDK 7; the JAR dependencies (Gradle config) are as follows: **
dependencies {
compile group: 'org.springframework', name: 'spring-jms', version: '4.2.5.RELEASE'
compile group: 'org.apache.activemq', name: 'activemq-core', version: '5.7.0'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
** The complete project structure looks like this: **

[Figure: screenshot of the project structure]