Redis也可以做輕量級的消息隊(duì)列:基于List的隊(duì)列模式、PubSub多播的發(fā)布訂閱模式、以及5.0之后提供的Stream。
特別是Stream,可以消息持久化、高可用、消息可以指定offset進(jìn)行反復(fù)消費(fèi)、支持發(fā)布訂閱按消費(fèi)組多播、使用消費(fèi)者的pending_ids機(jī)制保證消息傳遞的可靠性等等,基本已經(jīng)具有了真正的消息隊(duì)列中間件的功能了。而PubSub模式在一些對消息傳遞可靠性和可追溯性不嚴(yán)格的場景(比如內(nèi)網(wǎng)非一致性的消息通知)也有一定的使用價(jià)值。
一、快速入門
1、用list類型模擬隊(duì)列
典型的隊(duì)列模式,rpush右邊放入,lpop左邊取出。舉例:
127.0.0.1:7001> rpush queue A
-> Redirected to slot [13011] located at 122.51.112.187:7003
(integer) 1
122.51.112.187:7003> rpush queue B
(integer) 2
122.51.112.187:7003> rpush queue C
(integer) 3
122.51.112.187:7003> lpop queue
"A"
122.51.112.187:7003> lpop queue
"B"
122.51.112.187:7003> lpop queue
"C"
122.51.112.187:7003> lpop queue
(nil)
2、PUB|SUB
PubSub解決了list做隊(duì)列1個(gè)消息只能被單個(gè)消費(fèi)者消費(fèi)的問題,可以1個(gè)消息被多個(gè)消費(fèi)組的消費(fèi)組收到,即所謂的發(fā)布訂閱模式。
用redis-cli先來用一下:
127.0.0.1:7001> subscribe testTopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testTopic"
3) (integer) 1
返回subscript testTopic 1意思是訂閱testTopic成功,然后阻塞等待消息,再開一個(gè)終端連接進(jìn)行publish:
127.0.0.1:7001> publish testTopic msg1
(integer) 1
然后訂閱者側(cè)連接收到消息:
1) "message"
2) "testTopic"
3) "msg1"
message testTopic msg1,意思是收到了message,主題是testTopic,內(nèi)容是msg1
可以同時(shí)訂閱多個(gè)主題,
subscribe testTopic1 testTopic2 testTopic3
也可以按照模式匹配的方式來訂閱主題psubscribe,比如訂閱所有以test開頭的主題
psubscribe test*
PubSub的消息沒有持久化,發(fā)布消息之后如果沒有消費(fèi)者、那么消息會直接丟棄,如果消費(fèi)者剛好此時(shí)宕機(jī)、重啟后也不會收到宕機(jī)時(shí)發(fā)布的消息,所以PubSub來做消息隊(duì)列的場景十分有限。
Redis5.0之后提供了更強(qiáng)大的Redis Stream數(shù)據(jù)結(jié)構(gòu),解決了上述問題。
3、Redis Stream
發(fā)布訂閱模式,一個(gè)消息發(fā)布到stream里,多個(gè)消費(fèi)組訂閱了這個(gè)stream的話都可以收到這個(gè)消息,且消息是持久化的。消費(fèi)組在stream里用last_delivered_id來定位偏移量、也就是消費(fèi)到哪個(gè)消息了。從設(shè)計(jì)上看確實(shí)跟kafka非常像,Redis作者也說借鑒了kafka。
從數(shù)據(jù)結(jié)構(gòu)上來說,stream是個(gè)鏈表,節(jié)點(diǎn)是消息,消息有ID,消息內(nèi)容是一系列的k-v對。stream也有消費(fèi)組和消費(fèi)者的概念,每個(gè)消費(fèi)者組用last_delivered_id游標(biāo)指向鏈表中的節(jié)點(diǎn)、來表示消費(fèi)到哪個(gè)ID的消息了,類似kafka里的offset偏移量,每個(gè)消費(fèi)者組里可以有多個(gè)消費(fèi)者,一個(gè)消息只會被投遞給消費(fèi)者組里的一個(gè)消費(fèi)者,可以類比RocketMQ里的集群消息。另外,每個(gè)消費(fèi)者內(nèi)部還有一個(gè) pending_ids數(shù)組,它記錄著這個(gè)消費(fèi)者已經(jīng)被客戶端讀取了的、但客戶端沒有回復(fù)ACK的消息,由此確保消息至少被消費(fèi)1次。
添加消息和直接讀取消息命令:
xadd testTopic * name douchuzi #向testTopic這個(gè)stream添加"name douchuzi"這個(gè)消息,*表示消息ID由Redis自動(dòng)生成
消息ID可以由Redis自動(dòng)生成,生成的ID類似"1641816045243-0",表示1641816045243這個(gè)時(shí)間戳這1ms生成的第0個(gè)消息。當(dāng)然消息ID也可以由客戶端自己生成。
xlen testTopic #查看有多少消息
xrange testTopic - + #從頭到尾返回所有消息
xread count 1 streams testTopic 0 #從ID大于0處開始讀取1個(gè)消息
xread count 1 block 0 streams testTopic $ #阻塞的從ID大于$處開始讀取1個(gè)消息,$表示最后一個(gè)消息的ID
消費(fèi)組命令:
xgroup create testTopic group1 $ #創(chuàng)建消費(fèi)組group1,其last_delivered_id是$
xgroup的完整玩法:
xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
#讓消費(fèi)者consumer1-1從group1消費(fèi)者組的testTopic stream中拿最新的、并且沒有被發(fā)送給其他消費(fèi)者處理的entry
xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >
發(fā)送ACK,剛才阻塞xreadgroup之后從別的客戶端xadd了1條消息,然后xreadgroup阻塞結(jié)束,消息收到。這時(shí)候我們看下consumer1-1的pending_ids:
127.0.0.1:7001> xpending testTopic group1 - + 1000 consumer1-1
1) 1) "1641819182032-0"
2) "consumer1-1"
3) (integer) 695470
4) (integer) 1
xpending查詢消費(fèi)組group1中消費(fèi)者consumer1-1的1000條未收到客戶端回復(fù)的消息。
接著我們xack回復(fù)一下,再看看pending_ids:
127.0.0.1:7001> xack testTopic group1 1641819182032-0
(integer) 1
127.0.0.1:7001> xpending testTopic group1 - + 1000 consumer1-1
(empty array)
可見,xack之后,consumer1-1的pending_ids為空了,Redis Stream用這個(gè)辦法來確保消息一定被投遞。
二、實(shí)戰(zhàn)開發(fā):基于SpringBoot開發(fā)Redis消息隊(duì)列
下面具體實(shí)戰(zhàn)一下,用SpringBoot來做Redis的消息隊(duì)列開發(fā),筆者使用的SpringBoot版本是2.3.7.RELEASE,其默認(rèn)的客戶端是lettuce 5.3.5.RELEASE,測試所用的Redis為6節(jié)點(diǎn)Cluster,可以參照筆者的文章Redis分布式緩存搭建 - 簡書 (jianshu.com)進(jìn)行搭建。
1、PubSub
發(fā)布消息,比較簡單,直接用RedisTemplate:
@Service
public class RedisDao {
@Autowired
RedisTemplate<String, Object> redisTemplate;
/**
* PubSub發(fā)布消息
* */
public void publishMessage(ChannelTopic channelTopic, String message) {
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
}
訂閱消息,使用Spring提供的消息容器RedisMessageListenerContainer以及消息到達(dá)監(jiān)聽接口MessageListener:
//配置PubSub消息容器
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory());
/**這里可以使用自定義注解來發(fā)現(xiàn)所有的MessageHandler,
* 然后循環(huán)container.addMessageListener來達(dá)到自動(dòng)配置消息訂閱者的目的
* 這樣開發(fā)只需要編寫MessageHandler的實(shí)現(xiàn)類就可以了
*/
MessageHandler handler = new MessageHandlerImpl();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
handler.handleMsg(message);
}
}, handler.getChannelTopic());
return container;
}
MessageHandler接口及其實(shí)現(xiàn)是MessageHandlerImpl業(yè)務(wù)層代碼:
/**
* PubSub訂閱消息
*
* */
public interface MessageHandler {
//訂閱消息到達(dá)后的邏輯處理
public void handleMsg(Message msg);
//消息的Topic
public ChannelTopic getChannelTopic();
}
@Slf4j
public class MessageHandlerImpl implements MessageHandler{
@Override
public void handleMsg(Message msg) {
try {
String msgChannel = new String(msg.getChannel(), "utf-8");
String msgBody = new String(msg.getBody(), "utf-8");
log.info("收到消息:");
log.info("Message channel : {}" , msgChannel);
log.info("Message body : {}" , msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public ChannelTopic getChannelTopic() {
return new ChannelTopic("testPubSub");
}
}
另外需要注意,Redis Cluster模式下如果客戶端用的是Lettuce,需要配置客戶端自適應(yīng)刷新,在集群主備故障切換的時(shí)候、客戶端能夠自動(dòng)切換到故障主節(jié)點(diǎn)對應(yīng)的從節(jié)點(diǎn)去。詳見筆者的文章Redis分布式緩存搭建 - 簡書 (jianshu.com)。
好了,測試一下:
@Slf4j
@SpringBootApplication
public class RedismqApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
RedisDao redisDao = context.getBean(RedisDao.class);
redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子愛豆畜子");
}
}
收到消息:
Message channel : testPubSub
Message body : 肥兔子愛豆畜子
2、Redis Stream
跟PubSub類似,也是需要消息容器MessageContainer、Listener這倆東西。
/**
* 發(fā)送消息到指定stream
* */
public void publishStreamMessage(String stream, Object message) {
ObjectRecord<String, String> record = StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
log.info("消息已發(fā)送,消息ID:{}" , recordId.getValue());
}
消息監(jiān)聽,實(shí)現(xiàn)StreamListener接口:
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
private StringRedisTemplate redisTemplate;
public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
redisTemplate = stringRedisTemplate;
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
RecordId id = message.getId();
String topic = message.getStream();
String msgBody = message.getValue();
log.info("收到主題{}消息ID={}, 消息內(nèi)容{}", topic, id.getValue(), msgBody);
String group = "some-service"; //消費(fèi)組,使用服務(wù)名
redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
}
}
配置消息容器,將StreamListener的實(shí)現(xiàn)注冊到消息容器StreamMessageListenerContainer:
@Configuration
public class RedisStreamListenerContainerConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Bean
public StreamMessageListenerContainer redisStreamListenerContainer() {
StreamMessageListenerContainerOptions options =
StreamMessageListenerContainerOptions.builder()
.batchSize(100)
.pollTimeout(Duration.ZERO)
.targetType(String.class)
.build();
StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
String GroupName = "some-service"; //消費(fèi)組命名一般用服務(wù)名
String consumerName = "127.0.0.1:8080"; //消費(fèi)者命名一般用服務(wù)集群下每個(gè)節(jié)點(diǎn)的ip:port,可以區(qū)分是哪個(gè)節(jié)點(diǎn)消費(fèi)
String stream = "testTopic"; //stream名稱,即topic
container.receive(Consumer.from(GroupName, consumerName),
StreamOffset.create(stream, ReadOffset.lastConsumed()),
new StreamMessageListener(stringRedisTemplate)); //將Listener添加到監(jiān)聽容器
container.start(); //啟動(dòng)消息容器
return container;
}
}
測試:
消息已發(fā)送,消息ID:1641872849111-0
收到主題testTopic消息ID=1641872849111-0, 消息內(nèi)容{"name":"stream-肥兔子愛豆畜子"}
總結(jié)說明:
1、為了方便消息的格式筆者統(tǒng)一用了String類型,用fastjson做序列化以后進(jìn)行傳輸。
2、StreamListener.onMessage收到消息進(jìn)行處理以后,手工調(diào)用ack進(jìn)行回復(fù),不然服務(wù)端給當(dāng)前消費(fèi)者緩存的pending_ids會越來越大、占用內(nèi)存。
3、消費(fèi)組筆者一般用服務(wù)名來區(qū)分,服務(wù)下掛多個(gè)節(jié)點(diǎn),那么每個(gè)節(jié)點(diǎn)可以用ip:port作為唯一標(biāo)識,所以用ip:port作為消費(fèi)組下的消費(fèi)組名稱。
參考:《Redis深度歷險(xiǎn):核心原理與應(yīng)用實(shí)踐》
[Redis 的發(fā)布訂閱功能在 SpringBoot 中的應(yīng)用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)
[Stream消息隊(duì)列在SpringBoot中的實(shí)踐與踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)
[redis — 基于Spring Boot實(shí)現(xiàn)Redis stream實(shí)時(shí)流事件處理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)