Redis-發(fā)布訂閱

簡介

Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。可以用于消息的傳輸,Redis的發(fā)布訂閱機(jī)制包括三個(gè)部分,發(fā)布者,訂閱者和Channel。適宜做在線聊天、消息推送等。

發(fā)布者和訂閱者都是Redis客戶端,Channel則為Redis服務(wù)器端,發(fā)布者將消息發(fā)送到某個(gè)的頻道,訂閱了這個(gè)頻道的訂閱者就能接收到這條消息,Redis 客戶端可以訂閱任意數(shù)量的頻道。

屏幕快照 2021-01-24 下午9.21.38.png

上圖展示了頻道 channel1 ,以及訂閱這個(gè)頻道的3個(gè)客戶端 - client2/client5/lient1 之間的關(guān)系
屏幕快照 2021-01-24 下午9.24.13.png

當(dāng)有新消息通過 PUBLISH 命令發(fā)送給頻道 channel1 時(shí), 這個(gè)消息就會(huì)被發(fā)送給訂閱它的3個(gè)客戶端(client2/client5/lient1)

發(fā)布訂閱

  1. 發(fā)布消息
    Redis采用PUBLISH命令發(fā)送消息,其返回值為接收到該消息的訂閱者的數(shù)量。
C:\Users\Administrator>redis-cli
127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
127.0.0.1:6379> PUBLISH redisChat "Learn redis"
(integer) 1
127.0.0.1:6379>
  1. 訂閱某個(gè)頻道
    Redis采用SUBSCRIBE命令訂閱某個(gè)頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數(shù)量,以及接收到的消息,其中subscribe表示已經(jīng)成功訂閱了某個(gè)頻道。
C:\Users\Administrator>redis-cli
127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1
屏幕快照 2021-01-24 下午9.33.07.png

常用命令

  1. Redis Psubscribe 命令
    Redis Psubscribe 命令訂閱一個(gè)或多個(gè)符合給定模式的頻道。
    每個(gè)模式以 * 作為匹配符,比如 it* 匹配所有以 it 開頭的頻道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 開頭的頻道( news.it 、 news.global.today 等等),諸如此類。

redis Psubscribe 命令基本語法如下:

redis 127.0.0.1:6379> PSUBSCRIBE pattern [pattern ...]
  1. Redis Pubsub 命令
    Redis Pubsub 命令用于查看訂閱與發(fā)布系統(tǒng)狀態(tài),它由數(shù)個(gè)不同格式的子命令組成。

redis Pubsub 命令基本語法如下:

redis 127.0.0.1:6379> PUBSUB <subcommand> [argument [argument ...]]

返回值
由活躍頻道組成的列表。
例:

127.0.0.1:6379> PUBSUB CHANNELS
1) "redisChat"
  1. Redis Publish 命令
    Redis Publish 命令用于將信息發(fā)送到指定的頻道。

redis Publish 命令基本語法如下:

redis 127.0.0.1:6379> PUBLISH channel message

返回值
接收到信息的訂閱者數(shù)量。

127.0.0.1:6379> PUBLISH mychannel "hello, i m here"
(integer) 0
  1. Redis Punsubscribe 命令
    Redis Punsubscribe 命令用于退訂所有給定模式的頻道。

redis Punsubscribe 命令基本語法如下:

redis 127.0.0.1:6379> PUNSUBSCRIBE [pattern [pattern ...]]

返回值
這個(gè)命令在不同的客戶端中有不同的表現(xiàn)。

127.0.0.1:6379> PUNSUBSCRIBE redisChat
1) "punsubscribe"
2) "redisChat"
3) (integer) 0
  1. Redis Subscribe 命令
    Redis Subscribe 命令用于訂閱給定的一個(gè)或多個(gè)頻道的信息。。

redis Subscribe 命令基本語法如下:

redis 127.0.0.1:6379> SUBSCRIBE channel [channel ...]
  1. Redis Unsubscribe 命令
    Redis Unsubscribe 命令用于退訂給定的一個(gè)或多個(gè)頻道的信息。
    指示客戶端退訂給定的頻道。
    如果沒有頻道被指定,也即是,一個(gè)無參數(shù)的 <tt class="docutils literal" style="background-color: transparent; font-size: 1.1em; font-family: monospace;">UNSUBSCRIBE</tt> 調(diào)用被執(zhí)行,那么客戶端使用 SUBSCRIBE 命令訂閱的所有頻道都會(huì)被退訂。在這種情況下,命令會(huì)返回一個(gè)信息,告知客戶端所有被退訂的頻道。

redis Unsubscribe 命令基本語法如下:

redis 127.0.0.1:6379> UNSUBSCRIBE channel [channel ...]

127.0.0.1:6379> UNSUBSCRIBE redisChat
1) "unsubscribe"
2) "redisChat"
3) (integer) 0

返回值
這個(gè)命令在不同的客戶端中有不同的表現(xiàn)。

代碼示例

可以通過spring-redis中的redisTemplate工具輔助實(shí)現(xiàn):

  1. 發(fā)布消息
/**
 * redis發(fā)布消息
 *
 * @param channel
 * @param message
 */
public void sendMessage(String channel, String message) {
    redisTemplate.convertAndSend(channel, message);
}
  1. 監(jiān)聽消息
    監(jiān)聽消息需要兩步,消息監(jiān)聽類并在xml中注冊(cè)這個(gè)類
    監(jiān)聽類有兩種實(shí)現(xiàn)方式一種是實(shí)現(xiàn)org.springframework.data.redis.connection.MessageListener接口,實(shí)現(xiàn)onMessage方法示例代碼如下:
@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static Logger logger = Logger.getLogger(RedisMessageListener.class);
    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody(); // 請(qǐng)使用valueSerializer
        byte[] channel = message.getChannel();
        /*
           請(qǐng)參考配置文件,本例中key,value的序列化方式均為string。
           其中key必須為stringSerializer。和redisTemplate.convertAndSend對(duì)應(yīng)
         */
        String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);
        String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
        logger.info("redis--topic:" + topic + "  body:" + msgContent);
    }
}

可以使用自己定義的類,方法名稱自己定義,示例如下:

@Component
public class EventListener {
    private static Logger logger = Logger.getLogger(EventListener.class);
    public void getMessage(String message, String channel) {
        logger.info(message);
    }
} 

這兩中方式實(shí)現(xiàn)的不同在于注冊(cè)監(jiān)聽器時(shí)的配置略有不同

  1. redis配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:redis="http://www.springframework.org/schema/redis"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd                           http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.maxIdle}" />
    <!-- <property name="maxActive" value="${redis.maxActive}" /> -->
    <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>
<!--注意使用訂閱發(fā)布時(shí),此bean必須命名為redisConnectionFactory,否則需要在listener中指明連接工廠-->
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
      p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"
      p:pool-config-ref="poolConfig"/>

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

<bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<!--此處注冊(cè)監(jiān)聽器,需要指定通道名稱(topic)(可以使用正則表達(dá)式*_等等),第一種為實(shí)現(xiàn)MessageListener接口的監(jiān)聽器的注冊(cè),第二種為自己定義的類的注冊(cè)需要制定處理方法名稱(不制定的默認(rèn)方法為handleMessage,如果你的方法是這個(gè)名稱可以不指定)與序列化的方式,推薦使用第一種方式-->
<redis:listener-container>
    <redis:listener ref="redisMessageListener" topic="talk"/>
    <redis:listener ref="eventListener" topic="talk*" method="getMessage"
                    serializer="stringRedisSerializer"></redis:listener>
</redis:listener-container>
</beans>

實(shí)際案例分析

場景是這樣的:有個(gè)發(fā)布消息的終端,同時(shí)訂閱該通道的有多個(gè)終端,由于Redis發(fā)布訂閱模式不能實(shí)現(xiàn)查看歷史消息(3d內(nèi)消息)且后來訂閱該通道的終端也看不到歷史消息。
顧可以以下草圖解決方案:

屏幕快照 2021-01-24 下午10.07.30.png

客戶端A為訂閱端: 歷史消息分為兩種,一種更老的數(shù)據(jù)放入數(shù)據(jù)庫,另外一種是3d內(nèi)的消息,放入sorted_set中(使用該結(jié)構(gòu)的特性:無重復(fù)且可以排序,以時(shí)間為滑動(dòng)窗口來實(shí)現(xiàn));實(shí)時(shí)性的消息從通道中獲取即可;
B為發(fā)布消息端: 直接發(fā)布消息到通道,同時(shí)往sorted_set結(jié)構(gòu)中發(fā)一份,和kafka中放一份。但仔細(xì)一想該方案有些不妥,如果B往通道發(fā)送完之后,自己掛了呢?kafka和sorted_set中都沒發(fā)送數(shù)據(jù),顧歷史消息訂閱者無法看到。
請(qǐng)看下圖如何解決上述問題的?是不是一目了然了,不用我多說了吧,呵呵!
屏幕快照 2021-01-24 下午10.33.55.png

————————————————————
坐標(biāo)帝都,白天上班族,晚上是知識(shí)的分享者
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

  • redis-訂閱與發(fā)布 Redis 通過 PUBLISH 、 SUBSCRIBE 等命令實(shí)現(xiàn)了訂閱與發(fā)布模式, 這...
    全能程序猿閱讀 5,906評(píng)論 0 4
  • 一、發(fā)布訂閱簡介 Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(su...
    無劍_君閱讀 923評(píng)論 0 1
  • [TOC] 1. 介紹 在軟件架構(gòu)中,發(fā)布-訂閱是一種消息范式,消息的發(fā)送者(稱為發(fā)布者)不會(huì)將消息直接發(fā)送給特定...
    為愛瘋狂_3850閱讀 359評(píng)論 0 0
  • Redis的發(fā)布與訂閱 Redis的發(fā)布與訂閱功能由PUBLISH、SUBSCRIBE、PSUBSCRIBE等命令...
    HRADPX閱讀 529評(píng)論 0 0
  • 久違的晴天,家長會(huì)。 家長大會(huì)開好到教室時(shí),離放學(xué)已經(jīng)沒多少時(shí)間了。班主任說已經(jīng)安排了三個(gè)家長分享經(jīng)驗(yàn)。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,819評(píng)論 16 22

友情鏈接更多精彩內(nèi)容