1.什么是pub/sub
Pub/Sub功能(means Publish, Subscribe)即發(fā)布及訂閱功能。基于事件的系統(tǒng)中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規(guī)模系統(tǒng)所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發(fā)布者(如服務(wù)器)可將訂閱者感興趣的事件隨時通知相關(guān)訂閱者。熟悉設(shè)計模式的朋友應(yīng)該了解這與23種設(shè)計模式中的觀察者模式極為相似。
同樣,Redis的pub/sub是一種消息通信模式,主要的目的是解除消息發(fā)布者和消息訂閱者之間的耦合,Redis作為一個pub/sub的server,在訂閱者和發(fā)布者之間起到了消息路由的功能。
2.Redis pub/sub的實現(xiàn)
Redis通過publish和subscribe命令實現(xiàn)訂閱和發(fā)布的功能。訂閱者可以通過subscribe向redis server訂閱自己感興趣的消息類型。redis將信息類型稱為通道(channel)。當(dāng)發(fā)布者通過publish命令向redis server發(fā)送特定類型的信息時,訂閱該消息類型的全部訂閱者都會收到此消息。
客戶端1訂閱CCTV1:
127.0.0.1:6379> subscribe CCTV1
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "CCTV1"3) (integer) 1
127.0.0.1:6379> subscribe CCTV1
Reading messages... (press Ctrl-C to quit)
客戶端2訂閱CCTV1和CCTV2:
127.0.0.1:6379> subscribe CCTV1 CCTV2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1
1) "subscribe"
2) "CCTV2"
3) (integer) 2
————————————————
此時這兩個客戶端分別監(jiān)聽這指定的頻道。現(xiàn)在另一個客戶端向服務(wù)器推送了關(guān)于這兩個頻道的信息。
127.0.0.1:6379> publish CCTV1 "cctv1 is good"
(integer) 2
//返回2表示兩個客戶端接收了次消息。被接收到消息的客戶端如下所示。
1) "message"
2) "CCTV1"
3) "cctv1 is good"
1) "message"
2) "CCTV1"
3) "cctv1 is good"
如上的訂閱/發(fā)布也稱訂閱發(fā)布到頻道(使用publish與subscribe命令),此外還有訂閱發(fā)布到模式(使用psubscribe來訂閱一個模式)
訂閱CCTV的全部頻道
127.0.0.1:6379> psubscribe CCTV*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "CCTV*"
3) (integer) 1
當(dāng)依然先如上推送一個CCTV1的消息時,該客戶端正常接收。
Pub/Sub在java中的實現(xiàn):
導(dǎo)入Redis驅(qū)動:
dependencies {
? ? compile 'redis.clients:jedis:2.4.2'
}
Redis驅(qū)動包提供了一個抽象類:JedisPubSub…繼承這個類就完成了對客戶端對訂閱的監(jiān)聽。示例代碼:
public class TestPubSub extends JedisPubSub {
? ? @Override
? ? public void onMessage(String channel, String message) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(channel + "," + message);
? ? }
? ? @Override
? ? public void onPMessage(String pattern, String channel, String message) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(pattern + "," + channel + "," + message);
? ? }
? ? @Override
? ? public void onSubscribe(String channel, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println("onSubscribe: channel[" + channel + "]," + "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onUnsubscribe(String channel, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println(
? ? ? ? ? ? ? ? "onUnsubscribe: channel[" + channel + "], " + "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? System.out.println("onPUnsubscribe: pattern[" + pattern + "]," +
? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");
? ? }
? ? @Override
? ? public void onPSubscribe(String pattern, int subscribedChannels) {
? ? ? ? System.out.println("onPSubscribe: pattern[" + pattern + "], " +
? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");
? ? }
}
如上所示,抽象類中存在六個方法。分別表示
監(jiān)聽到訂閱模式接受到消息時的回調(diào) (onPMessage)
監(jiān)聽到訂閱頻道接受到消息時的回調(diào) (onMessage )
訂閱頻道時的回調(diào)( onSubscribe )
取消訂閱頻道時的回調(diào)( onUnsubscribe )
訂閱頻道模式時的回調(diào) ( onPSubscribe )
取消訂閱模式時的回調(diào)( onPUnsubscribe )
運行我們剛剛編寫的類:
@Test
? ? public void pubsubjava() {
? ? ? ? // TODO Auto-generated method stub
? ? ? ? Jedis jr = null;
? ? ? ? try {? ? ?
? ? ? ? jr = new Jedis("127.0.0.1", 6379, 0);// redis服務(wù)地址和端口號
? ? ? ? ? ? jr.auth("wx950709");
? ? ? ? ? ? TestPubSub sp = new TestPubSub();
? ? ? ? ? ? // jr客戶端配置監(jiān)聽兩個channel
? ? ? ? ? ? sp.subscribe(jr.getClient(), "news.share", "news.blog");
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } finally{
? ? ? ? ? ? if(jr!=null){
? ? ? ? ? ? ? ? jr.disconnect();
? ? ? ? ? ? }
? ? ? ? }
? ? }
從代碼中我們不難看出,我們聲明的一個redis鏈接在設(shè)置監(jiān)聽后就可以執(zhí)行一些操作,例如發(fā)布消息,訂閱消息等。。。
當(dāng)運行上述代碼后會在控制臺輸出:
onSubscribe: channel[news.share],subscribedChannels[1]
onSubscribe: channel[news.blog],subscribedChannels[2]
//onSubscribe方法成功運行
此時當(dāng)在有客戶端向new.share或者new.blog通道publish消息時,onMessage方法即可被相應(yīng)。(jedis.publish(channel, message))。
Pub/Sub在Spring中的實踐
導(dǎo)入依賴jar
dependencies {
? ? compile 'org.springframework.data:spring-data-redis:1.7.2.RELEASE'
? ? compile 'redis.clients:jedis:2.4.2'
}
Spring配置redis的鏈接:
@Configuration
public class AppConfig {
? ? @Bean
? ? JedisConnectionFactory jedisConnectionFactory() {
? ? ? ? JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
? ? ? ? jedisConnectionFactory.setPassword("xxxx");
? ? ? ? return jedisConnectionFactory;
? ? }
? ? @Bean
? ? RedisTemplate<String, Object> redisTemplate() {
? ? ? ? final RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
? ? ? ? template.setConnectionFactory(jedisConnectionFactory());
? ? ? ? template.setDefaultSerializer(new StringRedisSerializer());
? ? ? ? return template;
? ? }
? ? @Bean
? ? MessageListenerAdapter messageListener() {
? ? ? ? return new MessageListenerAdapter(new RedisMessageListener());
? ? }
? ? @Bean
? ? RedisMessageListenerContainer redisContainer() {
? ? ? ? final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
? ? ? ? container.setConnectionFactory(jedisConnectionFactory());
? ? ? ? container.addMessageListener(messageListener(), topic());
? ? ? ? return container;
? ? }
? ? @Bean
? ? RedisPublisherImpl redisPublisher() {
? ? ? ? return new RedisPublisherImpl(redisTemplate(), topic());
? ? }
? ? @Bean
? ? ChannelTopic topic() {
? ? ? ? return new ChannelTopic( "pubsub:queue" );
? ? }
}
如上的配置即配置了對Redis的鏈接。在RedisTemplate中沒有設(shè)置ip端口等信息則全部為默認的。在配置類中的將ChannelTopic加入IOC容器。則在Spring啟動時會在一個RedisTemplate(一個對Redis的鏈接)中設(shè)置的一個channel,即pubsub:queue。
在上述配置中,RedisMessageListener是我們生成的,這個類即為核心監(jiān)聽類,RedisTemplate接受到數(shù)據(jù)如何處理就是在該類中處理的。
public class RedisMessageListener implements MessageListener {
? ? ? ? @Override
? ? ? ? public void onMessage( final Message message, final byte[] pattern ) {
? ? ? ? ? ? System.out.println("Message received: " + message.toString() );
? ? ? ? }
}
現(xiàn)在我們在獲取RedisTemplate,并給pubsub:queue這個channel publish數(shù)據(jù)。
public class PubSubMain {
? ? RedisTemplate<String,Object> redisTemplate;
? ? public? void execute() {
? ? ? String channel = "pubsub:queue";
? ? ? redisTemplate.convertAndSend(channel, "from testData");
? ? }
? ? public static void main(String[] args) {
? ? ? ? ApplicationContext applicationContext? = new AnnotationConfigApplicationContext(AppConfig.class);
? ? ? ? PubSubMain pubSubMain = new PubSubMain();
? ? ? ? pubSubMain.redisTemplate = (RedisTemplate<String, Object>) applicationContext.getBean("redisTemplate");
? ? ? ? pubSubMain.execute();
? ? }
}
此時運行main 方法:
Message received: from app 12
//表明接受成功,當(dāng)在命令行中啟動一個客戶端并publish時依然可以在客戶端打印出message
后面的文章會多涉及一些關(guān)于Redis的使用場景。
原文鏈接:https://blog.csdn.net/canot/article/details/51938955