springboot中 玩轉(zhuǎn)redis的發(fā)布訂閱

一般我們使用redis最多的場景還是作為緩存中間件使用,redis也能做為消息隊列使用,但這不是Redis的強項,不過如果需要的話還是可以使用的。

redis的發(fā)布訂閱

集成到springboot中

  1. 引入redis starter
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>
  1. 新建一個RedisMessageListenerConfig

創(chuàng)建 RedisMessageListenerConfig之間要先自定義定義一個接口RedisPubSub接口,這個接口用于處理收到的信息,如果實現(xiàn)發(fā)布訂閱只需實現(xiàn)這個接口即可。

public interface RedisPubSub {

    /**
     * 接收消息
     * @param message
     */
    void receiveMessage(String message);

    /**
     * 發(fā)布訂閱監(jiān)聽的topic key
     * @return
     */
    CacheKeyEnum getCacheKeyEnum();
}
public enum CacheKeyEnum {

    /**
     * 消息訂閱
     */
    PUBSUB_QUEUE("pubsub:queue"  , 0L),
    ;

    // 緩存鍵名
    private String key;
    
    /**
     * 過期時間,單位秒
     * 0 表示不過期
     */
    private Long expireTime;
    //省略getter、setter
}

RedisMessageListenerConfig.java

@Component
public class RedisMessageListenerConfig {
    // 如果項目中沒有RedisPubSub實現(xiàn)類,啟動會報錯,所以設(shè)置required = false
    @Autowired(required = false)
    private Set<RedisPubSub> redisPubSubs;

    /**
     * 創(chuàng)建連接工廠
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , Map<? extends MessageListener, Collection<? extends Topic>> listenerAdapters){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.setMessageListeners(listenerAdapters);
        return container;
    }

    @Bean
    public Map<MessageListener, Collection<Topic>> listenerAdapters(){
        if (!CollectionUtils.isEmpty(redisPubSubs)) {
            Map<MessageListener, Collection<Topic>> map = new HashMap<>(redisPubSubs.size());
            for (RedisPubSub redisPubSub : redisPubSubs) {
                final CacheKeyEnum cacheKeyEnum = redisPubSub.getCacheKeyEnum();
                // redis會利用反射調(diào)用receiveMessage方法
                final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisPubSub, "receiveMessage");
                messageListenerAdapter.afterPropertiesSet();
                map.put(messageListenerAdapter , Collections.singletonList(new PatternTopic(cacheKeyEnum.getKey())));
            }
            return map;
        }
        return Collections.emptyMap();
    }
}

準(zhǔn)備redis工具類:

@Slf4j
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 消息發(fā)布
     * @param cacheKeyEnum
     * @param message
     */
    public void publish(CacheKeyEnum cacheKeyEnum , Object message){
        redisTemplate.convertAndSend(cacheKeyEnum.getKey() , message);
    }

    /**
     * 反序列化redis數(shù)據(jù)
     * @param value
     * @param <T>
     * @return
     */
    public <T> T deserialize(String value){
        final RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
        final Object deserialize = valueSerializer.deserialize(value.getBytes());
        return deserialize == null ? null : (T) deserialize;
    }
}

集成測試

  1. 實現(xiàn)RedisPubSub
@Component
public class PubsubQueue implements RedisPubSub {

    @Autowired
    private RedisUtil redisUtil;

    /**
     * 接收消息
     *
     * @param message
     */
    @Override
    public void receiveMessage(String message) {
        System.out.println(message);
        final UserEntity deserialize = redisUtil.deserialize(message);
        System.out.println("getId="+deserialize.getId());

    }

    /**
     * 發(fā)布訂閱監(jiān)聽的topic key
     *
     * @return
     */
    @Override
    public CacheKeyEnum getCacheKeyEnum() {
        return CacheKeyEnum.PUBSUB_QUEUE;
    }
}
  1. 單元測試
@Test
    public void testRedisQueue(){
        UserEntity userEntity = new UserEntity();
        userEntity.setId(123456L);
        redisUtil.publish(CacheKeyEnum.PUBSUB_QUEUE , userEntity);
    }

控制臺輸出:


在這里插入圖片描述

使用redis發(fā)布訂閱的注意點:
RedisPubSub 中receiveMessage接收的參數(shù)是String類型,redis在發(fā)布訂閱中接收到的對象是字節(jié)數(shù)組,控制臺打印是一個json格式的,如果redis用的是默認(rèn)的JdkSerializationRedisSerializer序列化類,直接想通過將String轉(zhuǎn)成JSON是不行的,會報錯的,而且如果用的是jdk的序列化類,要發(fā)布的對象必須實現(xiàn)Serializable接口,否則也會報錯。
所以可以通過上面的redisUtil中的反序列化方法來進(jìn)行對象的轉(zhuǎn)化,這樣不管是不是用的是什么序列化類都不會報錯。

Redis發(fā)布訂閱的缺點:

消息不持久化,一旦訂閱者沒收到消息或者重啟,消息將丟失。
相對于專業(yè)的消息中間件來說,Redis的發(fā)布訂閱相對簡單,慎用即可。

能力一般,水平有限,如有錯誤,請多指出。
如果對你有用點個關(guān)注給個贊唄

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容