How to Publish and Subscribe with a Message Queue [Easily Extensible]

Redis message queue implementation

Define the message receiver annotation

@Component
@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisTopic {
    String value();
}
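
The annotation is only useful because its value can be read back at runtime (hence RetentionPolicy.RUNTIME). The following is a minimal sketch of that lookup, not the article's code: DemoTopic, DemoReceiver and TopicLookupDemo are hypothetical names, and Spring's @Component is left out so the snippet compiles with the JDK alone.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in for @RedisTopic, without Spring's @Component (assumption for this sketch).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoTopic {
    String value();
}

// Hypothetical receiver class used only for illustration.
@DemoTopic("houdamis_dbsync")
class DemoReceiver {
}

class TopicLookupDemo {
    /** Returns the topic declared on the class, or null when the annotation is absent. */
    static String topicOf(Class<?> type) {
        DemoTopic topic = type.getAnnotation(DemoTopic.class);
        return topic == null ? null : topic.value();
    }

    public static void main(String[] args) {
        System.out.println(topicOf(DemoReceiver.class)); // prints houdamis_dbsync
        System.out.println(topicOf(String.class));       // prints null
    }
}
```

Checking for null before calling value() matters in practice, because a class without the annotation would otherwise cause a NullPointerException.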

Define the message receiver interface

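public interface RedisTopicInterface<T> {
    /** Prefix shared by all topic names. */
    public final static String PREX = "houdamis_";
    /**
     * Get the channel (topic) this receiver subscribes to.
     * @return the channel name
     */
    public String getChannel();
    /**
     * Handle a received message.
     * @param message the message body
     * @param channel the channel the message arrived on
     * @return a status flag; RedisTopicUtils ignores it
     */
    public boolean receiveMsg(T message,String channel);
    /**
     * Check whether this receiver handles the given channel.
     * @param channel the channel name
     * @return true if this receiver handles the channel
     */
    public boolean isReceiver( String channel);

}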
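
The interface is generic, so a receiver can bind T to a concrete message type instead of handling Object. The sketch below is one possible way to do that, not the article's implementation: DemoTopicInterface is a trimmed copy of the interface above, and GreetingReceiver and its "greeting" topic are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed copy of the article's interface so the sketch compiles on its own.
interface DemoTopicInterface<T> {
    String PREX = "houdamis_";

    String getChannel();

    boolean receiveMsg(T message, String channel);

    boolean isReceiver(String channel);
}

// Hypothetical receiver that binds T to String, so receiveMsg needs no cast.
class GreetingReceiver implements DemoTopicInterface<String> {
    static final String TOPIC = DemoTopicInterface.PREX + "greeting";

    // Messages handled so far, kept only so the demo has something to show.
    final List<String> handled = new ArrayList<>();

    @Override
    public String getChannel() {
        return TOPIC;
    }

    @Override
    public boolean receiveMsg(String message, String channel) {
        if (!isReceiver(channel)) {
            return false; // not our topic, ignore it
        }
        handled.add(message.toUpperCase());
        return true;
    }

    @Override
    public boolean isReceiver(String channel) {
        return TOPIC.equals(channel);
    }
}

class TypedReceiverDemo {
    public static void main(String[] args) {
        GreetingReceiver receiver = new GreetingReceiver();
        System.out.println(receiver.receiveMsg("hello", GreetingReceiver.TOPIC)); // prints true
        System.out.println(receiver.receiveMsg("hello", "other_channel"));        // prints false
        System.out.println(receiver.handled);                                      // prints [HELLO]
    }
}
```

Note that the type parameter only helps if the deserialized message really has that type; the utility class in this article passes messages around as Object.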

Message receiver example

@RedisTopic(value = DataSourceReloadService.TOPIC)
public class DataSourceReloadService  implements RedisTopicInterface {
    /**
     * Required.
     * The message queue topic this receiver subscribes to.
     */
    public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";

    public final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * Handle a received message.
     *
     * @param message the message body
     * @param channel the channel the message arrived on; same as TOPIC
     * @return always false in this example
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
                // TODO: business logic goes here
            } catch (Exception e) {
                // Pass the exception itself so the stack trace reaches the log
                logger.error("{} failed to handle message", TOPIC, e);
            }
        }
        return false;
    }

    /**
     * Check whether this receiver handles the given channel.
     *
     * @param channel the channel name
     * @return true if the channel equals TOPIC
     */
    @Override
    public boolean isReceiver(String channel) {
        return StringUtils.isNotEmpty(channel) && TOPIC.equals(channel);
    }
}

Message queue utility class

public class RedisTopicUtils {
    private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
    private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();

    /**
     * Publish a message.
     * @param channel the channel to publish to
     * @param message the message body
     * @return always true; convertAndSend throws on failure
     */
    public static boolean sendMessage(String channel ,Object message){
        redisTemplate.convertAndSend(channel, message);
        return true;
    }

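    /**
     * Deliver a received message to every receiver registered for the channel.
     * @param channel the channel the message arrived on
     * @param message the message body
     */
    public static void receiveMessage(String channel, Object message) {
        Set<RedisTopicInterface> beans = getReceiver(channel);
        for (RedisTopicInterface v : beans) {
            try {
                v.receiveMsg(message,channel);
            } catch (Throwable e) {
                // Keep going so one failing receiver does not block the others
                e.printStackTrace();
            }
        }
    }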

    /**
     * Register a RedisTopicInterface under the topic named in its @RedisTopic annotation.
     * @param receiver the receiver to register
     */
    public static void regist(RedisTopicInterface receiver){
        String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
        Set<RedisTopicInterface>  beans = getReceiver(key);
        beans.add(receiver);
        topicBeans.put(key,beans);
    }

    public static Set<RedisTopicInterface> getReceiver(String key){
        Set<RedisTopicInterface>  beans = topicBeans.get(key);
        if(beans == null){
            beans = Sets.newHashSet();
        }
        return beans;
    }

    /**
     * Initialize: find all beans annotated with @RedisTopic and register them.
     */
    public static void init(){
        try {
            Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RedisTopic.class).value();
                    if(null != interfaceName){
                         regist( (RedisTopicInterface) serviceBean);
                        System.out.println(serviceBean.getClass().getName()+ " regist to RedisTopic[" + interfaceName+"]");
                    }else{
                        System.err.println(serviceBean.getClass().getName()+ " can't regist to RedisTopic[" + interfaceName+"]");
                    }
                }
            }
        } catch ( Throwable e) {
            e.printStackTrace();
        }
    }


}
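
The core of the utility class is a topic-to-receivers map plus a dispatch loop. The sketch below isolates that idea without Spring or Redis so it can be run directly. It is a variant under stated assumptions, not the article's code: DemoHandler, DemoTopicRegistry and RegistryDemo are hypothetical names, and the per-topic set is created with computeIfAbsent and CopyOnWriteArraySet (a swapped-in technique, requiring Java 8+) so that concurrent registrations cannot overwrite each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Minimal stand-in for a receiver; the article's type is RedisTopicInterface.
interface DemoHandler {
    boolean receiveMsg(Object message, String channel);
}

class DemoTopicRegistry {
    private static final Map<String, Set<DemoHandler>> HANDLERS = new ConcurrentHashMap<>();

    /** Registers a handler; computeIfAbsent creates the per-topic set atomically. */
    static void register(String topic, DemoHandler handler) {
        HANDLERS.computeIfAbsent(topic, k -> new CopyOnWriteArraySet<>()).add(handler);
    }

    /** Delivers a message to every handler of the topic and returns how many accepted it. */
    static int dispatch(String topic, Object message) {
        int accepted = 0;
        Set<DemoHandler> targets = HANDLERS.getOrDefault(topic, Collections.<DemoHandler>emptySet());
        for (DemoHandler handler : targets) {
            try {
                if (handler.receiveMsg(message, topic)) {
                    accepted++;
                }
            } catch (RuntimeException e) {
                // One failing handler must not stop delivery to the others.
                System.err.println("handler failed on " + topic + ": " + e.getMessage());
            }
        }
        return accepted;
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        List<Object> log = new ArrayList<>();
        DemoTopicRegistry.register("houdamis_dbsync", (msg, ch) -> log.add(msg));
        DemoTopicRegistry.register("houdamis_dbsync", (msg, ch) -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(DemoTopicRegistry.dispatch("houdamis_dbsync", "reload")); // prints 1
        System.out.println(DemoTopicRegistry.dispatch("unknown", "ignored"));        // prints 0
    }
}
```

Returning an empty set for unknown topics keeps dispatch free of null checks, which mirrors what getReceiver does above.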

Message queue: receiver registration

Registration runs at system startup.

@Service
public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent evt) {
        
        RedisTopicUtils.init();
    }
}

Redis XML configuration

<!-- Listener class -->
    <bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>
    <!-- Listener container -->
    <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
          destroy-method="destroy">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <!-- Task executor -->
        <property name="taskExecutor">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                <property name="poolSize" value="10"/>
            </bean>
        </property>
        <!-- Message listeners -->
        <property name="messageListeners">
            <map>
                <entry key-ref="redisMessageListener">
                    <list>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>

Message listening

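When RedisMessageListener receives a message, it hands the message to RedisTopicUtils. RedisTopicUtils looks up the Set<RedisTopicInterface> registered for the topic and then notifies each RedisTopicInterface in that set to handle the message.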


/**
 * Receives messages from Redis.
 */
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String msgChannel = (String) getRedisTemplate().getKeySerializer().deserialize(channel);
        RedisTopicUtils.receiveMessage(msgChannel,getRedisTemplate().getValueSerializer().deserialize(body));
    }
    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
}
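
The listener's job is to turn two byte arrays (channel and body) back into objects before dispatching. The sketch below mimics that decoding step with the JDK alone. It assumes a string-based key serializer (UTF-8) and JDK serialization for values; the article does not show how its RedisTemplate is configured, so both are assumptions, and DecodeDemo is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

class DecodeDemo {
    /** Decodes a channel name as UTF-8 text (assumed key format). */
    static String decodeChannel(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Serializes a value with plain Java serialization (assumed value format). */
    static byte[] encodeBody(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Reverses encodeBody. */
    static Object decodeBody(byte[] raw) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // What a publisher would put on the wire under the assumptions above.
        byte[] channel = "houdamis_dbsync".getBytes(StandardCharsets.UTF_8);
        byte[] body = encodeBody("reload datasource");

        // What the listener does on arrival: decode both parts, then dispatch.
        System.out.println(decodeChannel(channel) + " -> " + decodeBody(body));
        // prints houdamis_dbsync -> reload datasource
    }
}
```

Whatever serializers are actually configured, the publisher and the listener have to agree on them; using the same RedisTemplate on both sides, as the article does, takes care of that.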

Publishing a message

//        Publish a message
        RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, "This is message xx");

How to use the message queue to publish and subscribe

Subscribing to messages

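Add a new class that implements RedisTopicInterface, annotate it with @RedisTopic, and set TOPIC to the topic it should subscribe to. Three methods need implementing, and two of them can be copied straight from the example code.
The main work is in receiveMsg().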

@RedisTopic(value = AAAService.TOPIC)
public class AAAService  implements RedisTopicInterface {
    /** The message queue topic this receiver subscribes to */
    public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";

    public final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Handle a received message.
     * @param message the message body
     * @param channel the channel the message arrived on
     * @return always false in this example
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
            try {
                // TODO: implement the business logic here
            } catch (Exception e) {
                // Pass the exception itself so the stack trace reaches the log
                logger.error("{} failed to handle message", TOPIC, e);
            }
        }
        return false;
    }

    @Override
    public String getChannel( ) {
        return TOPIC;
    }

    /**
     * Check whether this receiver handles the given channel.
     *
     * @param channel the channel name
     * @return true if the channel equals TOPIC
     */
    @Override
    public boolean isReceiver(String channel) {
        return StringUtils.isNotEmpty(channel) && TOPIC.equals(channel);
    }
}

Publishing messages

//        Publish a message
       RedisTopicUtils.sendMessage(AAAService.TOPIC, "This is message xx");