ActiveMQ配置和使用

1.XML配置

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

<!-- 配置messageQueue連接工廠 -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
       <property name="brokerURL" value="${mq.brokerURL}"></property>
</bean>
<!-- 配置jms消息發(fā)送模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!-- 配置消息隊列 -->
<bean id="commonMsgQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 構(gòu)造函數(shù)參數(shù)  隊列的名稱 -->
        <constructor-arg value="${queue.name.commonMsgQueue}"></constructor-arg>
</bean>
<!-- 自定義監(jiān)聽類 -->
<bean id="commonMsgListener" class="listener.CommonMsgListener">
</bean>
<!-- 配置監(jiān)聽器容器 -->
<bean id="commonMsgListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <!-- MQ連接工廠 -->
    <property name="connectionFactory" ref="connectionFactory"></property>
    <!-- 消息隊列 -->
    <property name="destination" ref="jpushQueue" ></property>
    <!-- 消息監(jiān)聽器 -->
    <property name="messageListener" ref="commonMsgListener"></property>
</bean>

brokerURL= failover:(tcp://ip1:61616)?randomize=false
ip為ActiveMQ安裝的服務(wù)器的地址
randomize=false 在使用多個地址時,按照先后順序進(jìn)行連接,如 failover:(tcp://ip1:61616,tcp://ip2:61616)?randomize=false ,這時會先嘗試與ip1連接,如果連接不上再連接ip2。此參數(shù)默認(rèn)為true

2.加入消息隊列 && 監(jiān)聽器獲取消息

Put

public class PutMessage {


//加載消息發(fā)送模板
@Resource    //根據(jù)名字檢索加載類,找不到再根據(jù)類型檢索加載
private JmsTemplate jmsTemplate;
//加載公共消息隊列
@Resource   
private ActiveMQQueue commonMsgQueue;

@Transactional
public void put(final Object message){
    try{
        jmsTemplate.send(commonMsgQueue, new MessageCreator(){
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("messageJson",JSON.toJSONString());
            }
        });
        LOGGER.success("加入隊列commonMsgQueue成功,消息內(nèi)容:"+JSON.toJSONString(message));
    }catch(JMSException e){
         LOGGER.warn("加入隊列commonMsgQueue失敗,消息內(nèi)容:"+JSON.toJSONString(message));
    }
}

Get

public Class CommonMsgListener implements messageListener{
 
    public void onMessage(Message message){            
          if(message instanceof ObjectMessage){
              //實現(xiàn)messageListener即可在實現(xiàn)方法中獲取到message對象
          }    
    }    
}

3.在服務(wù)器上查看隊列生產(chǎn)消費(fèi)情況

菜單 說明
Number of Pending Messages 等待被消費(fèi)的消息,還未出隊列的消息數(shù)量
Number of Consumers 消費(fèi)者的數(shù)量,正在監(jiān)聽等待這個隊列消息的消費(fèi)者的數(shù)量
Messages Enqueued 進(jìn)入隊列的消息總量
Message Dequeued 隊列被消費(fèi)的消息總量

訪問地址 ip:8161/admin
ip為安裝ActiveMQ服務(wù)的地址,與配置中的brokerURL相同

管理頁面頭部.png

菜單 說明
Number of Pending Messages 等待被消費(fèi)的消息,還未出隊列的消息數(shù)量
Number of Consumers 消費(fèi)者的數(shù)量,正在監(jiān)聽等待這個隊列消息的消費(fèi)者的數(shù)量
Messages Enqueued 進(jìn)入隊列的消息總量
Message Dequeued 隊列被消費(fèi)的消息總量
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容