使用Spring Data Redis 發(fā)布訂閱消息

使用 Spring Data Redis 發(fā)布訂閱消息

1. 概述

在 Redis 中,發(fā)布者并沒(méi)有將消息發(fā)送給特定的訂閱者。是將發(fā)布的消息被劃分為通道,并不知道會(huì)有哪些訂閱者(如果有的話)。

類似地,訂閱者表示對(duì)一個(gè)或多個(gè)主題感興趣,并且只接收感興趣的消息,而不知道有哪些發(fā)布者(如果有的話)。

發(fā)布者和訂閱者的這種解耦可以實(shí)現(xiàn)更大的可伸縮性和更動(dòng)態(tài)的網(wǎng)絡(luò)拓?fù)洹?/p>

2. Redis 配置

讓我們開(kāi)始添加消息隊(duì)列所需的配置。

首先,我們將定義一個(gè) MessageListenerAdapter,其中包含名為 RedisMessageSubscriberMessageListener 接口的自定義實(shí)現(xiàn)。這個(gè) bean 充當(dāng)發(fā)布-訂閱消息模型中的訂閱者:

@Bean
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

RedisMessageListenerContainer 是 Spring Data Redis 提供的一個(gè)類。這是內(nèi)部調(diào)用的,根據(jù) Spring Data Redis 文檔 的說(shuō)法 —— “處理監(jiān)聽(tīng)、轉(zhuǎn)換和消息調(diào)度的底層細(xì)節(jié)。”

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container 
      = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(jedisConnectionFactory()); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
}

我們還將使用定制的 MessagePublisher 接口和 RedisMessagePublisher 實(shí)現(xiàn)創(chuàng)建 bean。這樣,我們可以有一個(gè)通用的消息發(fā)布 API,并讓 Redis 實(shí)現(xiàn)采用 redisTemplatetopic 作為構(gòu)造函數(shù)參數(shù):

@Bean
MessagePublisher redisPublisher() { 
    return new RedisMessagePublisher(redisTemplate(), topic());
}

最后,我們將設(shè)置一個(gè)主題,發(fā)布者將向其發(fā)送消息,訂閱者將接收消息:

@Bean
ChannelTopic topic() {
    return new ChannelTopic("messageQueue");
}

3. 發(fā)布消息

3.1. 定義 MessagePublisher 接口

Spring Data Redis 沒(méi)有提供用于消息分發(fā)的 MessagePublisher 接口。:我們可以定義一個(gè)自定義接口,它將在實(shí)現(xiàn)中使用 redisTemplate:

public interface MessagePublisher {
    void publish(String message);
}

3.2. RedisMessagePublisher 實(shí)現(xiàn)[

我們接下來(lái)提供 MessagePublisher 接口的實(shí)現(xiàn),添加消息發(fā)布的細(xì)節(jié)并使用 redisTemplate 中的函數(shù)。

該模板包含了一組非常豐富的函數(shù),用于廣泛的操作—— 其中 convertAndSend 能夠通過(guò)主題向隊(duì)列發(fā)送消息:

public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(
      RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
      this.redisTemplate = redisTemplate;
      this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

如您所見(jiàn),發(fā)布者實(shí)現(xiàn)非常簡(jiǎn)單。它使用 redisTemplateconvertAndSend() 方法格式化給定的消息并將其發(fā)布到配置的主題。

主題實(shí)現(xiàn)了發(fā)布和訂閱語(yǔ)義:當(dāng)消息發(fā)布時(shí),它將發(fā)送給所有注冊(cè)偵聽(tīng)該主題的訂閱者。

4. 訂閱消息

RedisMessageSubscriber 實(shí)現(xiàn)了 Spring Data Redis 提供的 MessageListener 接口:

@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<String>();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + message.toString());
    }
}

注意,還有第二個(gè)參數(shù) pattern,在本例中我們沒(méi)有使用它。Spring Data Redis 文檔指出,該參數(shù)表示“匹配通道的模式(如果指定)”,但它可以為 null。

5. 發(fā)送與接收消息

現(xiàn)在我們把它們結(jié)合起來(lái)。我們創(chuàng)建一個(gè)消息,然后使用 RedisMessagePublisher 發(fā)布它:

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

當(dāng)我們調(diào)用 publish(message) 時(shí),內(nèi)容被發(fā)送到 Redis,在那里它被路由到我們的發(fā)布者中定義的消息隊(duì)列主題。然后將它分發(fā)給該主題的訂閱者。

您可能已經(jīng)注意到 RedisMessageSubscriber 是一個(gè)偵聽(tīng)器,它將自己注冊(cè)到隊(duì)列以檢索消息。

消息到達(dá)時(shí),訂閱者定義的 onMessage() 方法被觸發(fā)。

在我們的例子中,我們可以通過(guò)檢查 RedisMessageSubscriber 中的 messageList 來(lái)驗(yàn)證我們已經(jīng)收到了已經(jīng)發(fā)布的消息:

RedisMessageSubscriber.messageList.get(0).contains(message)

6. 結(jié)論

在本文中,我們研究了使用Spring Data Redis 實(shí)現(xiàn)的發(fā)布/訂閱消息隊(duì)列。

上述示例的實(shí)現(xiàn)可以在 GitHub project 項(xiàng)目中找到。


【注】本文譯自:PubSub Messaging with Spring Data Redis | Baeldung


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容