前言
RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为Apache孵化项目。相信RocketMQ在未来会发挥越来越大的作用,将有更多的开发者因此受益。
本文仅对RocketMQ的简单使用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。
1、RocketMQ的Maven依赖,在pom.xml中引入jar包:
<!-- RocketMQ -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.5.8</version>
<type>pom</type>
</dependency>
2、Spring bean 配置(单例)
<!-- 监听实现 -->
<bean id="rocketMqListener" class="com.rocketMq.service.impl.RocketMqListenerImpl"></bean>
<!-- 监听配置 -->
<bean id="consumer" class="com.rocketMq.utils.Consumer" init-method="init">
<property name="consumerGroup" value="PushConsumer"/>
<property name="namesrvAddr" value="192.168.0.1:19876"/>
<property name="topic" value="node_topic"/>
<property name="subExpression" value="slu"/>
<property name="rocketMqListener" ref="rocketMqListener"/>
</bean>
3、Java源码
1)、RocketMqListener.java
package com.rocketMq.dao;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
 * Consumer-side listener contract. Business code implements this interface,
 * and the implementation is then configured on the Consumer.
 * Created by 麥克勞林
 */
public interface RocketMqListener {

    /**
     * Handles a single consumed message.
     *
     * @param ext     the message to process
     * @param Context the concurrent-consume context supplied alongside the message
     * @return the outcome of processing as a boolean; presumably {@code true}
     *         means the message was handled and {@code false} asks for it to be
     *         delivered again -- TODO confirm against the caller that maps this
     *         value to a consume status
     */
    boolean RocketMqMessage(MessageExt ext, ConsumeConcurrentlyContext Context);
}
2)、RocketMqWrapper.java
package com.rocketMq.dao;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
 * Listener wrapper: handles the shared (cross-cutting) processing and forwards
 * messages to the configured business listener.
 * Created by 麥克勞林
 */
public class RocketMqWrapper implements MessageListenerConcurrently {

    /** Only messages carrying this tag are forwarded to the business listener. */
    private static final String RFID_TAG = "RFID";

    /** Business listener that receives the forwarded messages; injected via the setter. */
    private RocketMqListener rocketMqListener;

    /**
     * Returns the business listener currently configured on this wrapper.
     *
     * @return the configured listener, or {@code null} if none has been set
     */
    public RocketMqListener getRocketMqMessageListener() {
        return rocketMqListener;
    }

    /**
     * Sets the business listener that messages are forwarded to.
     *
     * @param rocketMqListener the listener to forward messages to
     */
    public void setRocketMqListener(RocketMqListener rocketMqListener) {
        this.rocketMqListener = rocketMqListener;
    }

    /**
     * Forwards every message tagged {@code "RFID"} in the delivered batch to the
     * business listener and maps its boolean result to a consume status.
     *
     * <p>Messages with any other tag (or with no tag) are not forwarded and do
     * not cause the batch to be retried. As soon as the business listener
     * returns {@code false} for a message, the method returns
     * {@code RECONSUME_LATER} without processing the rest of the batch.
     *
     * @param messagesList               the batch of messages delivered to this listener
     * @param consumeConcurrentlyContext the context passed through to the business listener
     * @return {@code RECONSUME_LATER} if the business listener rejected a
     *         message, otherwise {@code CONSUME_SUCCESS}
     * @throws IllegalStateException if an {@code "RFID"} message arrives before a
     *                               business listener has been configured
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messagesList,
            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (messagesList == null || messagesList.isEmpty()) {
            // Nothing to process, so there is nothing to retry.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        for (MessageExt ext : messagesList) {
            // Constant-first equals: a message without tags must not raise a NullPointerException.
            if (ext == null || !RFID_TAG.equals(ext.getTags())) {
                continue;
            }
            if (rocketMqListener == null) {
                // Fail with a clear message instead of a bare NullPointerException.
                throw new IllegalStateException(
                        "rocketMqListener has not been configured on RocketMqWrapper");
            }
            if (!rocketMqListener.RocketMqMessage(ext, consumeConcurrentlyContext)) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}