一般我們使用redis最多的場景還是作為緩存中間件使用,redis也能做為消息隊列使用,但這不是Redis的強項,不過如果需要的話還是可以使用的。
redis的發(fā)布訂閱
集成到springboot中
- 引入redis starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 新建一個RedisMessageListenerConfig
創(chuàng)建 RedisMessageListenerConfig之間要先自定義定義一個接口RedisPubSub接口,這個接口用于處理收到的信息,如果實現(xiàn)發(fā)布訂閱只需實現(xiàn)這個接口即可。
public interface RedisPubSub {
/**
* 接收消息
* @param message
*/
void receiveMessage(String message);
/**
* 發(fā)布訂閱監(jiān)聽的topic key
* @return
*/
CacheKeyEnum getCacheKeyEnum();
}
public enum CacheKeyEnum {
/**
* 消息訂閱
*/
PUBSUB_QUEUE("pubsub:queue" , 0L),
;
// 緩存鍵名
private String key;
/**
* 過期時間,單位秒
* 0 表示不過期
*/
private Long expireTime;
//省略getter、setter
}
RedisMessageListenerConfig.java
@Component
public class RedisMessageListenerConfig {
// 如果項目中沒有RedisPubSub實現(xiàn)類,啟動會報錯,所以設(shè)置required = false
@Autowired(required = false)
private Set<RedisPubSub> redisPubSubs;
/**
* 創(chuàng)建連接工廠
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , Map<? extends MessageListener, Collection<? extends Topic>> listenerAdapters){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListeners(listenerAdapters);
return container;
}
@Bean
public Map<MessageListener, Collection<Topic>> listenerAdapters(){
if (!CollectionUtils.isEmpty(redisPubSubs)) {
Map<MessageListener, Collection<Topic>> map = new HashMap<>(redisPubSubs.size());
for (RedisPubSub redisPubSub : redisPubSubs) {
final CacheKeyEnum cacheKeyEnum = redisPubSub.getCacheKeyEnum();
// redis會利用反射調(diào)用receiveMessage方法
final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisPubSub, "receiveMessage");
messageListenerAdapter.afterPropertiesSet();
map.put(messageListenerAdapter , Collections.singletonList(new PatternTopic(cacheKeyEnum.getKey())));
}
return map;
}
return Collections.emptyMap();
}
}
準(zhǔn)備redis工具類:
@Slf4j
@Component
public class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 消息發(fā)布
* @param cacheKeyEnum
* @param message
*/
public void publish(CacheKeyEnum cacheKeyEnum , Object message){
redisTemplate.convertAndSend(cacheKeyEnum.getKey() , message);
}
/**
* 反序列化redis數(shù)據(jù)
* @param value
* @param <T>
* @return
*/
public <T> T deserialize(String value){
final RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
final Object deserialize = valueSerializer.deserialize(value.getBytes());
return deserialize == null ? null : (T) deserialize;
}
}
集成測試
- 實現(xiàn)RedisPubSub
@Component
public class PubsubQueue implements RedisPubSub {
@Autowired
private RedisUtil redisUtil;
/**
* 接收消息
*
* @param message
*/
@Override
public void receiveMessage(String message) {
System.out.println(message);
final UserEntity deserialize = redisUtil.deserialize(message);
System.out.println("getId="+deserialize.getId());
}
/**
* 發(fā)布訂閱監(jiān)聽的topic key
*
* @return
*/
@Override
public CacheKeyEnum getCacheKeyEnum() {
return CacheKeyEnum.PUBSUB_QUEUE;
}
}
- 單元測試
@Test
public void testRedisQueue(){
UserEntity userEntity = new UserEntity();
userEntity.setId(123456L);
redisUtil.publish(CacheKeyEnum.PUBSUB_QUEUE , userEntity);
}
控制臺輸出:

在這里插入圖片描述
使用redis發(fā)布訂閱的注意點:
RedisPubSub 中receiveMessage接收的參數(shù)是String類型,redis在發(fā)布訂閱中接收到的對象是字節(jié)數(shù)組,控制臺打印是一個json格式的,如果redis用的是默認(rèn)的JdkSerializationRedisSerializer序列化類,直接想通過將String轉(zhuǎn)成JSON是不行的,會報錯的,而且如果用的是jdk的序列化類,要發(fā)布的對象必須實現(xiàn)Serializable接口,否則也會報錯。
所以可以通過上面的redisUtil中的反序列化方法來進(jìn)行對象的轉(zhuǎn)化,這樣不管是不是用的是什么序列化類都不會報錯。
Redis發(fā)布訂閱的缺點:
消息不持久化,一旦訂閱者沒收到消息或者重啟,消息將丟失。
相對于專業(yè)的消息中間件來說,Redis的發(fā)布訂閱相對簡單,慎用即可。
能力一般,水平有限,如有錯誤,請多指出。
如果對你有用點個關(guān)注給個贊唄