Java 如何使用redis實(shí)現(xiàn)發(fā)布訂閱

背景:

項(xiàng)目中采用多節(jié)點(diǎn)部署,保證http請(qǐng)求可以在多個(gè)節(jié)點(diǎn)執(zhí)行,提高業(yè)務(wù)運(yùn)行效率;由于不愿引入MQ消息隊(duì)列,所以利用redis的發(fā)布訂閱功能來(lái)實(shí)現(xiàn)類似消息隊(duì)列功能,話不多說(shuō)直接上代碼。


步驟一:

項(xiàng)目中引入redis依賴

<dependency>

? ? <groupId>org.springframework.boot</groupId>

? ? <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>


步驟二:

添加redis訂閱頻道:

package cn.yzw.ibuild.domain.redis.subscribe;

import cn.yzw.ibuild.domain.support.enums.RedisMessageChannelEnum;

import org.springframework.context.annotation.Bean;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.listener.ChannelTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component

public class RedisSubConfig {

@Bean

? ? public RedisMessageListenerContainercontainer(RedisConnectionFactory factory, RedisMessageListener listener) {

RedisMessageListenerContainer container =new RedisMessageListenerContainer();

? ? ? ? container.setConnectionFactory(factory);

? ? ? ? // 添加redis訂閱頻道, 頻道配置在 RedisMessageChannelEnum枚舉中

? ? ? ? Arrays.stream(RedisMessageChannelEnum.values()).forEach(item ->container.addMessageListener(listener, new ChannelTopic(item.code())));

? ? ? ? return container;

? ? }

}


步驟三:

添加redis發(fā)布端代碼

@Component

public class RedisMessagePublish {

@Resource

? ? private RedisTemplateredisTemplate;

? ? @Resource

? ? private RedissonClientredissonClient;

? ? public void publishWorkerReportMessage(String channel, RedisMessageUserVO redisMessageUserVO){

????????redisTemplate.convertAndSend(channel, redisMessageUserVO);

? ? }

}


步驟四:

編寫消費(fèi)者端代碼,

@Component

@Slf4j

public class RedisMessageListenerimplements MessageListener {

@Resource

? ? private RedisTemplateredisTemplate;

? ? @Resource

? ? private RedissonClientredissonClient;

? ? @Resource

? ? private IWorkerStatisticsworkerStatistics;

? ? @Resource

? ? private ObjectMapperobjectMapper;

? ? @Override

? ? public void onMessage(Message message, byte[] pattern) {

// 獲取消息

? ? ? ? byte[] messageBody = message.getBody();

? ? ? ? // 使用值序列化器轉(zhuǎn)換

? ? ? ? Object msg =redisTemplate.getValueSerializer().deserialize(messageBody);

? ? ? ? // 獲取監(jiān)聽(tīng)的頻道

? ? ? ? byte[] channelByte = message.getChannel();

? ? ? ? // 使用字符串序列化器轉(zhuǎn)換

? ? ? ? Object channel =redisTemplate.getStringSerializer().deserialize(channelByte);

? ? ? ? System.out.println("---頻道---: " + channel);

? ? ? ? System.out.println("---消息內(nèi)容---: " + msg);

? ? ? ? RedisMessageUserVO redisMessageUserVO =objectMapper.convertValue(msg, RedisMessageUserVO.class);

if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_PRD.code()) && Objects.equals(SpringUtils.getActiveProfile(), "prd")) {

workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());

? ? ? ? ? ? }else if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_STG.code())) {

workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());

? ? ? ? ? ? }

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容