簡介
Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。可以用于消息的傳輸,Redis的發(fā)布訂閱機(jī)制包括三個(gè)部分,發(fā)布者,訂閱者和Channel。適宜做在線聊天、消息推送等。
發(fā)布者和訂閱者都是Redis客戶端,Channel則為Redis服務(wù)器端,發(fā)布者將消息發(fā)送到某個(gè)的頻道,訂閱了這個(gè)頻道的訂閱者就能接收到這條消息,Redis 客戶端可以訂閱任意數(shù)量的頻道。

上圖展示了頻道 channel1 ,以及訂閱這個(gè)頻道的3個(gè)客戶端 - client2/client5/lient1 之間的關(guān)系

當(dāng)有新消息通過 PUBLISH 命令發(fā)送給頻道 channel1 時(shí), 這個(gè)消息就會(huì)被發(fā)送給訂閱它的3個(gè)客戶端(client2/client5/lient1)
發(fā)布訂閱
- 發(fā)布消息
Redis采用PUBLISH命令發(fā)送消息,其返回值為接收到該消息的訂閱者的數(shù)量。
C:\Users\Administrator>redis-cli
127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
127.0.0.1:6379> PUBLISH redisChat "Learn redis"
(integer) 1
127.0.0.1:6379>
- 訂閱某個(gè)頻道
Redis采用SUBSCRIBE命令訂閱某個(gè)頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數(shù)量,以及接收到的消息,其中subscribe表示已經(jīng)成功訂閱了某個(gè)頻道。
C:\Users\Administrator>redis-cli
127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1

常用命令
- Redis Psubscribe 命令
Redis Psubscribe 命令訂閱一個(gè)或多個(gè)符合給定模式的頻道。
每個(gè)模式以 * 作為匹配符,比如 it* 匹配所有以 it 開頭的頻道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 開頭的頻道( news.it 、 news.global.today 等等),諸如此類。
redis Psubscribe 命令基本語法如下:
redis 127.0.0.1:6379> PSUBSCRIBE pattern [pattern ...]
- Redis Pubsub 命令
Redis Pubsub 命令用于查看訂閱與發(fā)布系統(tǒng)狀態(tài),它由數(shù)個(gè)不同格式的子命令組成。
redis Pubsub 命令基本語法如下:
redis 127.0.0.1:6379> PUBSUB <subcommand> [argument [argument ...]]
返回值
由活躍頻道組成的列表。
例:
127.0.0.1:6379> PUBSUB CHANNELS
1) "redisChat"
- Redis Publish 命令
Redis Publish 命令用于將信息發(fā)送到指定的頻道。
redis Publish 命令基本語法如下:
redis 127.0.0.1:6379> PUBLISH channel message
返回值
接收到信息的訂閱者數(shù)量。
127.0.0.1:6379> PUBLISH mychannel "hello, i m here"
(integer) 0
- Redis Punsubscribe 命令
Redis Punsubscribe 命令用于退訂所有給定模式的頻道。
redis Punsubscribe 命令基本語法如下:
redis 127.0.0.1:6379> PUNSUBSCRIBE [pattern [pattern ...]]
返回值
這個(gè)命令在不同的客戶端中有不同的表現(xiàn)。
127.0.0.1:6379> PUNSUBSCRIBE redisChat
1) "punsubscribe"
2) "redisChat"
3) (integer) 0
- Redis Subscribe 命令
Redis Subscribe 命令用于訂閱給定的一個(gè)或多個(gè)頻道的信息。。
redis Subscribe 命令基本語法如下:
redis 127.0.0.1:6379> SUBSCRIBE channel [channel ...]
- Redis Unsubscribe 命令
Redis Unsubscribe 命令用于退訂給定的一個(gè)或多個(gè)頻道的信息。
指示客戶端退訂給定的頻道。
如果沒有頻道被指定,也即是,一個(gè)無參數(shù)的 <tt class="docutils literal" style="background-color: transparent; font-size: 1.1em; font-family: monospace;">UNSUBSCRIBE</tt> 調(diào)用被執(zhí)行,那么客戶端使用 SUBSCRIBE 命令訂閱的所有頻道都會(huì)被退訂。在這種情況下,命令會(huì)返回一個(gè)信息,告知客戶端所有被退訂的頻道。
redis Unsubscribe 命令基本語法如下:
redis 127.0.0.1:6379> UNSUBSCRIBE channel [channel ...]
127.0.0.1:6379> UNSUBSCRIBE redisChat
1) "unsubscribe"
2) "redisChat"
3) (integer) 0
返回值
這個(gè)命令在不同的客戶端中有不同的表現(xiàn)。
代碼示例
可以通過spring-redis中的redisTemplate工具輔助實(shí)現(xiàn):
- 發(fā)布消息
/**
* redis發(fā)布消息
*
* @param channel
* @param message
*/
public void sendMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
- 監(jiān)聽消息
監(jiān)聽消息需要兩步,消息監(jiān)聽類并在xml中注冊(cè)這個(gè)類
監(jiān)聽類有兩種實(shí)現(xiàn)方式一種是實(shí)現(xiàn)org.springframework.data.redis.connection.MessageListener接口,實(shí)現(xiàn)onMessage方法示例代碼如下:
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static Logger logger = Logger.getLogger(RedisMessageListener.class);
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody(); // 請(qǐng)使用valueSerializer
byte[] channel = message.getChannel();
/*
請(qǐng)參考配置文件,本例中key,value的序列化方式均為string。
其中key必須為stringSerializer。和redisTemplate.convertAndSend對(duì)應(yīng)
*/
String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);
String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
logger.info("redis--topic:" + topic + " body:" + msgContent);
}
}
可以使用自己定義的類,方法名稱自己定義,示例如下:
@Component
public class EventListener {
private static Logger logger = Logger.getLogger(EventListener.class);
public void getMessage(String message, String channel) {
logger.info(message);
}
}
這兩中方式實(shí)現(xiàn)的不同在于注冊(cè)監(jiān)聽器時(shí)的配置略有不同
- redis配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p" xmlns:redis="http://www.springframework.org/schema/redis"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.maxIdle}" />
<!-- <property name="maxActive" value="${redis.maxActive}" /> -->
<property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
<property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>
<!--注意使用訂閱發(fā)布時(shí),此bean必須命名為redisConnectionFactory,否則需要在listener中指明連接工廠-->
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"
p:pool-config-ref="poolConfig"/>
<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
<bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<!--此處注冊(cè)監(jiān)聽器,需要指定通道名稱(topic)(可以使用正則表達(dá)式*_等等),第一種為實(shí)現(xiàn)MessageListener接口的監(jiān)聽器的注冊(cè),第二種為自己定義的類的注冊(cè)需要制定處理方法名稱(不制定的默認(rèn)方法為handleMessage,如果你的方法是這個(gè)名稱可以不指定)與序列化的方式,推薦使用第一種方式-->
<redis:listener-container>
<redis:listener ref="redisMessageListener" topic="talk"/>
<redis:listener ref="eventListener" topic="talk*" method="getMessage"
serializer="stringRedisSerializer"></redis:listener>
</redis:listener-container>
</beans>
實(shí)際案例分析
場景是這樣的:有個(gè)發(fā)布消息的終端,同時(shí)訂閱該通道的有多個(gè)終端,由于Redis發(fā)布訂閱模式不能實(shí)現(xiàn)查看歷史消息(3d內(nèi)消息)且后來訂閱該通道的終端也看不到歷史消息。
顧可以以下草圖解決方案:

客戶端A為訂閱端: 歷史消息分為兩種,一種更老的數(shù)據(jù)放入數(shù)據(jù)庫,另外一種是3d內(nèi)的消息,放入sorted_set中(使用該結(jié)構(gòu)的特性:無重復(fù)且可以排序,以時(shí)間為滑動(dòng)窗口來實(shí)現(xiàn));實(shí)時(shí)性的消息從通道中獲取即可;
B為發(fā)布消息端: 直接發(fā)布消息到通道,同時(shí)往sorted_set結(jié)構(gòu)中發(fā)一份,和kafka中放一份。但仔細(xì)一想該方案有些不妥,如果B往通道發(fā)送完之后,自己掛了呢?kafka和sorted_set中都沒發(fā)送數(shù)據(jù),顧歷史消息訂閱者無法看到。
請(qǐng)看下圖如何解決上述問題的?是不是一目了然了,不用我多說了吧,呵呵!

————————————————————
坐標(biāo)帝都,白天上班族,晚上是知識(shí)的分享者
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注