Redis的pub/Sub(訂閱與發(fā)布)java 代碼實現(xiàn)

1.什么是pub/sub

Pub/Sub功能(means Publish, Subscribe)即發(fā)布及訂閱功能。基于事件的系統(tǒng)中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規(guī)模系統(tǒng)所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發(fā)布者(如服務(wù)器)可將訂閱者感興趣的事件隨時通知相關(guān)訂閱者。熟悉設(shè)計模式的朋友應(yīng)該了解這與23種設(shè)計模式中的觀察者模式極為相似。

同樣,Redis的pub/sub是一種消息通信模式,主要的目的是解除消息發(fā)布者和消息訂閱者之間的耦合,Redis作為一個pub/sub的server,在訂閱者和發(fā)布者之間起到了消息路由的功能。

2.Redis pub/sub的實現(xiàn)

Redis通過publish和subscribe命令實現(xiàn)訂閱和發(fā)布的功能。訂閱者可以通過subscribe向redis server訂閱自己感興趣的消息類型。redis將信息類型稱為通道(channel)。當(dāng)發(fā)布者通過publish命令向redis server發(fā)送特定類型的信息時,訂閱該消息類型的全部訂閱者都會收到此消息。

客戶端1訂閱CCTV1:

127.0.0.1:6379> subscribe CCTV1

Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "CCTV1"3) (integer) 1

127.0.0.1:6379> subscribe CCTV1

Reading messages... (press Ctrl-C to quit)


客戶端2訂閱CCTV1和CCTV2:

127.0.0.1:6379> subscribe CCTV1 CCTV2

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "CCTV1"

3) (integer) 1

1) "subscribe"

2) "CCTV2"

3) (integer) 2

————————————————


此時這兩個客戶端分別監(jiān)聽這指定的頻道。現(xiàn)在另一個客戶端向服務(wù)器推送了關(guān)于這兩個頻道的信息。

127.0.0.1:6379> publish CCTV1 "cctv1 is good"

(integer) 2

//返回2表示兩個客戶端接收了次消息。被接收到消息的客戶端如下所示。

1) "message"

2) "CCTV1"

3) "cctv1 is good"

1) "message"

2) "CCTV1"

3) "cctv1 is good"

如上的訂閱/發(fā)布也稱訂閱發(fā)布到頻道(使用publish與subscribe命令),此外還有訂閱發(fā)布到模式(使用psubscribe來訂閱一個模式)

訂閱CCTV的全部頻道

127.0.0.1:6379> psubscribe CCTV*

Reading messages... (press Ctrl-C to quit)

1) "psubscribe"

2) "CCTV*"

3) (integer) 1


當(dāng)依然先如上推送一個CCTV1的消息時,該客戶端正常接收。

Pub/Sub在java中的實現(xiàn):

導(dǎo)入Redis驅(qū)動:

dependencies {

? ? compile 'redis.clients:jedis:2.4.2'

}

Redis驅(qū)動包提供了一個抽象類:JedisPubSub…繼承這個類就完成了對客戶端對訂閱的監(jiān)聽。示例代碼:

public class TestPubSub extends JedisPubSub {

? ? @Override

? ? public void onMessage(String channel, String message) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(channel + "," + message);

? ? }

? ? @Override

? ? public void onPMessage(String pattern, String channel, String message) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(pattern + "," + channel + "," + message);

? ? }

? ? @Override

? ? public void onSubscribe(String channel, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println("onSubscribe: channel[" + channel + "]," + "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onUnsubscribe(String channel, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println(

? ? ? ? ? ? ? ? "onUnsubscribe: channel[" + channel + "], " + "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? System.out.println("onPUnsubscribe: pattern[" + pattern + "]," +

? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");

? ? }

? ? @Override

? ? public void onPSubscribe(String pattern, int subscribedChannels) {

? ? ? ? System.out.println("onPSubscribe: pattern[" + pattern + "], " +

? ? ? ? ? ? ? ? "subscribedChannels[" + subscribedChannels + "]");

? ? }

}


如上所示,抽象類中存在六個方法。分別表示

監(jiān)聽到訂閱模式接受到消息時的回調(diào) (onPMessage)

監(jiān)聽到訂閱頻道接受到消息時的回調(diào) (onMessage )

訂閱頻道時的回調(diào)( onSubscribe )

取消訂閱頻道時的回調(diào)( onUnsubscribe )

訂閱頻道模式時的回調(diào) ( onPSubscribe )

取消訂閱模式時的回調(diào)( onPUnsubscribe )

運行我們剛剛編寫的類:

@Test

? ? public void pubsubjava() {

? ? ? ? // TODO Auto-generated method stub

? ? ? ? Jedis jr = null;

? ? ? ? try {? ? ?

? ? ? ? jr = new Jedis("127.0.0.1", 6379, 0);// redis服務(wù)地址和端口號

? ? ? ? ? ? jr.auth("wx950709");

? ? ? ? ? ? TestPubSub sp = new TestPubSub();

? ? ? ? ? ? // jr客戶端配置監(jiān)聽兩個channel

? ? ? ? ? ? sp.subscribe(jr.getClient(), "news.share", "news.blog");

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } finally{

? ? ? ? ? ? if(jr!=null){

? ? ? ? ? ? ? ? jr.disconnect();

? ? ? ? ? ? }

? ? ? ? }

? ? }

從代碼中我們不難看出,我們聲明的一個redis鏈接在設(shè)置監(jiān)聽后就可以執(zhí)行一些操作,例如發(fā)布消息,訂閱消息等。。。

當(dāng)運行上述代碼后會在控制臺輸出:

onSubscribe: channel[news.share],subscribedChannels[1]

onSubscribe: channel[news.blog],subscribedChannels[2]

//onSubscribe方法成功運行

此時當(dāng)在有客戶端向new.share或者new.blog通道publish消息時,onMessage方法即可被相應(yīng)。(jedis.publish(channel, message))。

Pub/Sub在Spring中的實踐

導(dǎo)入依賴jar

dependencies {

? ? compile 'org.springframework.data:spring-data-redis:1.7.2.RELEASE'

? ? compile 'redis.clients:jedis:2.4.2'

}

Spring配置redis的鏈接:

@Configuration

public class AppConfig {

? ? @Bean

? ? JedisConnectionFactory jedisConnectionFactory() {

? ? ? ? JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();

? ? ? ? jedisConnectionFactory.setPassword("xxxx");

? ? ? ? return jedisConnectionFactory;

? ? }

? ? @Bean

? ? RedisTemplate<String, Object> redisTemplate() {

? ? ? ? final RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();

? ? ? ? template.setConnectionFactory(jedisConnectionFactory());

? ? ? ? template.setDefaultSerializer(new StringRedisSerializer());

? ? ? ? return template;

? ? }

? ? @Bean

? ? MessageListenerAdapter messageListener() {

? ? ? ? return new MessageListenerAdapter(new RedisMessageListener());

? ? }

? ? @Bean

? ? RedisMessageListenerContainer redisContainer() {

? ? ? ? final RedisMessageListenerContainer container = new RedisMessageListenerContainer();

? ? ? ? container.setConnectionFactory(jedisConnectionFactory());

? ? ? ? container.addMessageListener(messageListener(), topic());

? ? ? ? return container;

? ? }

? ? @Bean

? ? RedisPublisherImpl redisPublisher() {

? ? ? ? return new RedisPublisherImpl(redisTemplate(), topic());

? ? }

? ? @Bean

? ? ChannelTopic topic() {

? ? ? ? return new ChannelTopic( "pubsub:queue" );

? ? }

}


如上的配置即配置了對Redis的鏈接。在RedisTemplate中沒有設(shè)置ip端口等信息則全部為默認的。在配置類中的將ChannelTopic加入IOC容器。則在Spring啟動時會在一個RedisTemplate(一個對Redis的鏈接)中設(shè)置的一個channel,即pubsub:queue。

在上述配置中,RedisMessageListener是我們生成的,這個類即為核心監(jiān)聽類,RedisTemplate接受到數(shù)據(jù)如何處理就是在該類中處理的。

public class RedisMessageListener implements MessageListener {

? ? ? ? @Override

? ? ? ? public void onMessage( final Message message, final byte[] pattern ) {

? ? ? ? ? ? System.out.println("Message received: " + message.toString() );

? ? ? ? }

}

現(xiàn)在我們在獲取RedisTemplate,并給pubsub:queue這個channel publish數(shù)據(jù)。

public class PubSubMain {

? ? RedisTemplate<String,Object> redisTemplate;

? ? public? void execute() {

? ? ? String channel = "pubsub:queue";

? ? ? redisTemplate.convertAndSend(channel, "from testData");

? ? }

? ? public static void main(String[] args) {

? ? ? ? ApplicationContext applicationContext? = new AnnotationConfigApplicationContext(AppConfig.class);

? ? ? ? PubSubMain pubSubMain = new PubSubMain();

? ? ? ? pubSubMain.redisTemplate = (RedisTemplate<String, Object>) applicationContext.getBean("redisTemplate");

? ? ? ? pubSubMain.execute();

? ? }

}


此時運行main 方法:

Message received: from app 12

//表明接受成功,當(dāng)在命令行中啟動一個客戶端并publish時依然可以在客戶端打印出message

后面的文章會多涉及一些關(guān)于Redis的使用場景。

原文鏈接:https://blog.csdn.net/canot/article/details/51938955

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容