背景:
項(xiàng)目中采用多節(jié)點(diǎn)部署,保證http請(qǐng)求可以在多個(gè)節(jié)點(diǎn)執(zhí)行,提高業(yè)務(wù)運(yùn)行效率;由于不愿引入MQ消息隊(duì)列,所以利用redis的發(fā)布訂閱功能來(lái)實(shí)現(xiàn)類似消息隊(duì)列功能,話不多說(shuō)直接上代碼。
步驟一:
項(xiàng)目中引入redis依賴
<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
步驟二:
添加redis訂閱頻道:
package cn.yzw.ibuild.domain.redis.subscribe;
import cn.yzw.ibuild.domain.support.enums.RedisMessageChannelEnum;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class RedisSubConfig {
@Bean
? ? public RedisMessageListenerContainercontainer(RedisConnectionFactory factory, RedisMessageListener listener) {
RedisMessageListenerContainer container =new RedisMessageListenerContainer();
? ? ? ? container.setConnectionFactory(factory);
? ? ? ? // 添加redis訂閱頻道, 頻道配置在 RedisMessageChannelEnum枚舉中
? ? ? ? Arrays.stream(RedisMessageChannelEnum.values()).forEach(item ->container.addMessageListener(listener, new ChannelTopic(item.code())));
? ? ? ? return container;
? ? }
}
步驟三:
添加redis發(fā)布端代碼
@Component
public class RedisMessagePublish {
@Resource
? ? private RedisTemplateredisTemplate;
? ? @Resource
? ? private RedissonClientredissonClient;
? ? public void publishWorkerReportMessage(String channel, RedisMessageUserVO redisMessageUserVO){
????????redisTemplate.convertAndSend(channel, redisMessageUserVO);
? ? }
}
步驟四:
編寫消費(fèi)者端代碼,
@Component
@Slf4j
public class RedisMessageListenerimplements MessageListener {
@Resource
? ? private RedisTemplateredisTemplate;
? ? @Resource
? ? private RedissonClientredissonClient;
? ? @Resource
? ? private IWorkerStatisticsworkerStatistics;
? ? @Resource
? ? private ObjectMapperobjectMapper;
? ? @Override
? ? public void onMessage(Message message, byte[] pattern) {
// 獲取消息
? ? ? ? byte[] messageBody = message.getBody();
? ? ? ? // 使用值序列化器轉(zhuǎn)換
? ? ? ? Object msg =redisTemplate.getValueSerializer().deserialize(messageBody);
? ? ? ? // 獲取監(jiān)聽(tīng)的頻道
? ? ? ? byte[] channelByte = message.getChannel();
? ? ? ? // 使用字符串序列化器轉(zhuǎn)換
? ? ? ? Object channel =redisTemplate.getStringSerializer().deserialize(channelByte);
? ? ? ? System.out.println("---頻道---: " + channel);
? ? ? ? System.out.println("---消息內(nèi)容---: " + msg);
? ? ? ? RedisMessageUserVO redisMessageUserVO =objectMapper.convertValue(msg, RedisMessageUserVO.class);
if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_PRD.code()) && Objects.equals(SpringUtils.getActiveProfile(), "prd")) {
workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());
? ? ? ? ? ? }else if (Objects.equals(channel, RedisMessageChannelEnum.REPORT_WORKER_STG.code())) {
workerStatistics.workerStatisticsTask(redisMessageUserVO.getUserVOList());
? ? ? ? ? ? }
}
}