Redis做輕量級消息隊(duì)列的3種玩法

Redis也可以做輕量級的消息隊(duì)列:基于List的隊(duì)列模式、PubSub多播的發(fā)布訂閱模式、以及5.0之后提供的Stream。
特別是Stream,可以消息持久化、高可用、消息可以指定offset進(jìn)行反復(fù)消費(fèi)、支持發(fā)布訂閱按消費(fèi)組多播、使用消費(fèi)者的pending_ids機(jī)制保證消息傳遞的可靠性等等,基本已經(jīng)具有了真正的消息隊(duì)列中間件的功能了。而PubSub模式在一些對消息傳遞可靠性和可追溯性不嚴(yán)格的場景(比如內(nèi)網(wǎng)非一致性的消息通知)也有一定的使用價(jià)值。

一、快速入門

1、用list類型模擬隊(duì)列

典型的隊(duì)列模式,rpush右邊放入,lpop左邊取出。舉例:

127.0.0.1:7001> rpush queue A
-> Redirected to slot [13011] located at 122.51.112.187:7003
(integer) 1
122.51.112.187:7003> rpush queue B
(integer) 2
122.51.112.187:7003> rpush queue C
(integer) 3
122.51.112.187:7003> lpop queue
"A"
122.51.112.187:7003> lpop queue
"B"
122.51.112.187:7003> lpop queue
"C"
122.51.112.187:7003> lpop queue
(nil)
2、PUB|SUB

PubSub解決了list做隊(duì)列1個(gè)消息只能被單個(gè)消費(fèi)者消費(fèi)的問題,可以1個(gè)消息被多個(gè)消費(fèi)組的消費(fèi)組收到,即所謂的發(fā)布訂閱模式。

用redis-cli先來用一下:

127.0.0.1:7001> subscribe testTopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testTopic"
3) (integer) 1

返回subscript testTopic 1意思是訂閱testTopic成功,然后阻塞等待消息,再開一個(gè)終端連接進(jìn)行publish:

127.0.0.1:7001> publish testTopic msg1
(integer) 1

然后訂閱者側(cè)連接收到消息:

1) "message"
2) "testTopic"
3) "msg1"

message testTopic msg1,意思是收到了message,主題是testTopic,內(nèi)容是msg1

可以同時(shí)訂閱多個(gè)主題,

subscribe testTopic1 testTopic2 testTopic3

也可以按照模式匹配的方式來訂閱主題psubscribe,比如訂閱所有以test開頭的主題

psubscribe test*

PubSub的消息沒有持久化,發(fā)布消息之后如果沒有消費(fèi)者、那么消息會直接丟棄,如果消費(fèi)者剛好此時(shí)宕機(jī)、重啟后也不會收到宕機(jī)時(shí)發(fā)布的消息,所以PubSub來做消息隊(duì)列的場景十分有限。

Redis5.0之后提供了更強(qiáng)大的Redis Stream數(shù)據(jù)結(jié)構(gòu),解決了上述問題。

3、Redis Stream

發(fā)布訂閱模式,一個(gè)消息發(fā)布到stream里,多個(gè)消費(fèi)組訂閱了這個(gè)stream的話都可以收到這個(gè)消息,且消息是持久化的。消費(fèi)組在stream里用last_delivered_id來定位偏移量、也就是消費(fèi)到哪個(gè)消息了。從設(shè)計(jì)上看確實(shí)跟kafka非常像,Redis作者也說借鑒了kafka。

從數(shù)據(jù)結(jié)構(gòu)上來說,stream是個(gè)鏈表,節(jié)點(diǎn)是消息,消息有ID,消息內(nèi)容是一系列的k-v對。stream也有消費(fèi)組和消費(fèi)者的概念,每個(gè)消費(fèi)者組用last_delivered_id游標(biāo)指向鏈表中的節(jié)點(diǎn)、來表示消費(fèi)到哪個(gè)ID的消息了,類似kafka里的offset偏移量,每個(gè)消費(fèi)者組里可以有多個(gè)消費(fèi)者,一個(gè)消息只會被投遞給消費(fèi)者組里的一個(gè)消費(fèi)者,可以類比RocketMQ里的集群消息。另外,每個(gè)消費(fèi)者內(nèi)部還有一個(gè) pending_ids數(shù)組,它記錄著這個(gè)消費(fèi)者已經(jīng)被客戶端讀取了的、但客戶端沒有回復(fù)ACK的消息,由此確保消息至少被消費(fèi)1次。

添加消息和直接讀取消息命令:

xadd testTopic * name douchuzi  #向testTopic這個(gè)stream添加"name douchuzi"這個(gè)消息,*表示消息ID由Redis自動(dòng)生成

消息ID可以由Redis自動(dòng)生成,生成的ID類似"1641816045243-0",表示1641816045243這個(gè)時(shí)間戳這1ms生成的第0個(gè)消息。當(dāng)然消息ID也可以由客戶端自己生成。

xlen testTopic #查看有多少消息
xrange testTopic - + #從頭到尾返回所有消息
xread count 1 streams testTopic 0 #從ID大于0處開始讀取1個(gè)消息
xread count 1 block 0 streams testTopic $ #阻塞的從ID大于$處開始讀取1個(gè)消息,$表示最后一個(gè)消息的ID

消費(fèi)組命令:

xgroup create testTopic group1 $ #創(chuàng)建消費(fèi)組group1,其last_delivered_id是$

xgroup的完整玩法:

xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
#讓消費(fèi)者consumer1-1從group1消費(fèi)者組的testTopic stream中拿最新的、并且沒有被發(fā)送給其他消費(fèi)者處理的entry
xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >

發(fā)送ACK,剛才阻塞xreadgroup之后從別的客戶端xadd了1條消息,然后xreadgroup阻塞結(jié)束,消息收到。這時(shí)候我們看下consumer1-1的pending_ids:

127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
1) 1) "1641819182032-0"
   2) "consumer1-1"
   3) (integer) 695470
   4) (integer) 1

xpending查詢消費(fèi)組group1中消費(fèi)者consumer1-1的1000條未收到客戶端回復(fù)的消息。

接著我們xack回復(fù)一下,再看看pending_ids:

127.0.0.1:7001> xack testTopic group1 1641819182032-0
(integer) 1
127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
(empty array)

可見,xack之后,consumer1-1的pending_ids為空了,Redis Stream用這個(gè)辦法來確保消息一定被投遞。

二、實(shí)戰(zhàn)開發(fā):基于SpringBoot開發(fā)Redis消息隊(duì)列

下面具體實(shí)戰(zhàn)一下,用SpringBoot來做Redis的消息隊(duì)列開發(fā),筆者使用的SpringBoot版本是2.3.7.RELEASE,其默認(rèn)的客戶端是lettuce 5.3.5.RELEASE,測試所用的Redis為6節(jié)點(diǎn)Cluster,可以參照筆者的文章Redis分布式緩存搭建 - 簡書 (jianshu.com)進(jìn)行搭建。

1、PubSub

發(fā)布消息,比較簡單,直接用RedisTemplate:

@Service
public class RedisDao {
    
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
    
    /**
     * PubSub發(fā)布消息
     * */
    public void publishMessage(ChannelTopic channelTopic, String message) {
        redisTemplate.convertAndSend(channelTopic.getTopic(), message);
    }
}

訂閱消息,使用Spring提供的消息容器RedisMessageListenerContainer以及消息到達(dá)監(jiān)聽接口MessageListener

//配置PubSub消息容器
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory());
    
    /**這里可以使用自定義注解來發(fā)現(xiàn)所有的MessageHandler,
     * 然后循環(huán)container.addMessageListener來達(dá)到自動(dòng)配置消息訂閱者的目的
     * 這樣開發(fā)只需要編寫MessageHandler的實(shí)現(xiàn)類就可以了
     */
    MessageHandler handler = new MessageHandlerImpl();
    container.addMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message, byte[] pattern) {
            handler.handleMsg(message);
        }
        
    }, handler.getChannelTopic());
    
    return container;
}

MessageHandler接口及其實(shí)現(xiàn)是MessageHandlerImpl業(yè)務(wù)層代碼:

/**
 * PubSub訂閱消息
 * 
 * */
public interface MessageHandler {
    
    //訂閱消息到達(dá)后的邏輯處理
    public void handleMsg(Message msg);
    
    //消息的Topic
    public ChannelTopic getChannelTopic();
}

@Slf4j
public class MessageHandlerImpl implements MessageHandler{

    @Override
    public void handleMsg(Message msg) {
        
        try {   
            String msgChannel = new String(msg.getChannel(), "utf-8");
            String msgBody = new String(msg.getBody(), "utf-8");
            log.info("收到消息:");
            log.info("Message channel : {}" , msgChannel);
            log.info("Message body : {}" , msgBody);
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public ChannelTopic getChannelTopic() {
        return new ChannelTopic("testPubSub");
    }

}

另外需要注意,Redis Cluster模式下如果客戶端用的是Lettuce,需要配置客戶端自適應(yīng)刷新,在集群主備故障切換的時(shí)候、客戶端能夠自動(dòng)切換到故障主節(jié)點(diǎn)對應(yīng)的從節(jié)點(diǎn)去。詳見筆者的文章Redis分布式緩存搭建 - 簡書 (jianshu.com)。

好了,測試一下:

@Slf4j
@SpringBootApplication
public class RedismqApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
        RedisDao redisDao = context.getBean(RedisDao.class);
        redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子愛豆畜子");
    }

}

收到消息:
Message channel : testPubSub
Message body : 肥兔子愛豆畜子

2、Redis Stream

跟PubSub類似,也是需要消息容器MessageContainer、Listener這倆東西。

/**
 * 發(fā)送消息到指定stream
 * */
public void publishStreamMessage(String stream, Object message) {
    ObjectRecord<String, String> record =                 StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
    RecordId recordId = stringRedisTemplate.opsForStream().add(record);
    log.info("消息已發(fā)送,消息ID:{}" , recordId.getValue());
}

消息監(jiān)聽,實(shí)現(xiàn)StreamListener接口:

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
    
    private StringRedisTemplate redisTemplate;

    public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
        redisTemplate = stringRedisTemplate;
    }
    
    @Override
    public void onMessage(ObjectRecord<String, String> message) {

        RecordId id = message.getId();
        String topic = message.getStream();
        String msgBody = message.getValue();
        
        log.info("收到主題{}消息ID={}, 消息內(nèi)容{}", topic, id.getValue(), msgBody);
        
        String group = "some-service"; //消費(fèi)組,使用服務(wù)名
        redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
    }

}

配置消息容器,將StreamListener的實(shí)現(xiàn)注冊到消息容器StreamMessageListenerContainer

@Configuration
public class RedisStreamListenerContainerConfig {
    
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Bean
    public StreamMessageListenerContainer redisStreamListenerContainer() {
        StreamMessageListenerContainerOptions options = 
                StreamMessageListenerContainerOptions.builder()
                                                    .batchSize(100)
                                                    .pollTimeout(Duration.ZERO)
                                                    .targetType(String.class)
                                                    .build();
        StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        String GroupName = "some-service";  //消費(fèi)組命名一般用服務(wù)名
        String consumerName = "127.0.0.1:8080"; //消費(fèi)者命名一般用服務(wù)集群下每個(gè)節(jié)點(diǎn)的ip:port,可以區(qū)分是哪個(gè)節(jié)點(diǎn)消費(fèi)
        String stream = "testTopic"; //stream名稱,即topic
        
        container.receive(Consumer.from(GroupName, consumerName), 
                            StreamOffset.create(stream, ReadOffset.lastConsumed()),
                            new StreamMessageListener(stringRedisTemplate)); //將Listener添加到監(jiān)聽容器
        
        container.start(); //啟動(dòng)消息容器
        
        return container;
    }
}

測試:

消息已發(fā)送,消息ID:1641872849111-0

收到主題testTopic消息ID=1641872849111-0, 消息內(nèi)容{"name":"stream-肥兔子愛豆畜子"}

總結(jié)說明:

1、為了方便消息的格式筆者統(tǒng)一用了String類型,用fastjson做序列化以后進(jìn)行傳輸。

2、StreamListener.onMessage收到消息進(jìn)行處理以后,手工調(diào)用ack進(jìn)行回復(fù),不然服務(wù)端給當(dāng)前消費(fèi)者緩存的pending_ids會越來越大、占用內(nèi)存。

3、消費(fèi)組筆者一般用服務(wù)名來區(qū)分,服務(wù)下掛多個(gè)節(jié)點(diǎn),那么每個(gè)節(jié)點(diǎn)可以用ip:port作為唯一標(biāo)識,所以用ip:port作為消費(fèi)組下的消費(fèi)組名稱。

參考:《Redis深度歷險(xiǎn):核心原理與應(yīng)用實(shí)踐》

       [Redis 的發(fā)布訂閱功能在 SpringBoot 中的應(yīng)用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)

       [Stream消息隊(duì)列在SpringBoot中的實(shí)踐與踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)

      [redis — 基于Spring Boot實(shí)現(xiàn)Redis stream實(shí)時(shí)流事件處理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容