Spring Boot消息隊列敲門磚:Redis中的發(fā)布訂閱模式

前言

來啦老鐵!

筆者學(xué)習(xí)Spring Boot有一段時間了,截至目前已實踐、總結(jié)了25篇Spring Boot系列學(xué)習(xí)文章,感興趣的同學(xué)可以關(guān)注專題一起學(xué)習(xí)吧!

Spring Boot全家桶

在前2篇文章中,我們一起實踐實現(xiàn)了一個基本的個人云盤應(yīng)用,我個人會在后續(xù)繼續(xù)優(yōu)化功能,以達到能交付使用的程度,有關(guān)注的同學(xué),可以注意一下git倉庫內(nèi)的代碼變化!

而接下來我們來學(xué)點什么呢?

接下來我打算開始學(xué)習(xí)Message Queue,即消息隊列。

筆者之前接觸過SQS(Simple Queue Service),但僅有一點點經(jīng)驗,不足掛齒,而反觀消息隊列的重要性、在實際應(yīng)用場景的廣泛性等,覺得是時候推開未知的大門,看個究竟了!

筆者計劃基于Spring Boot從幾個方面入手學(xué)習(xí)消息隊列:

  1. Redis中的發(fā)布訂閱模式;
  2. Spring Boot中集成RocketMQ;
  3. Spring Boot中集成RabbitMQ;
  4. Spring Boot中集成Kafka;
  5. 嘗試對幾款消息隊列系統(tǒng)進行對比、評價!

生命有限,今天我們僅先學(xué)習(xí)Redis中的發(fā)布訂閱模式!

代碼基于之前學(xué)習(xí)Redis時使用的Git Hub倉庫演進,歡迎取閱:

整體步驟

  1. 編寫消息發(fā)布接口;
  2. 編寫消息訂閱監(jiān)聽實現(xiàn)類;
  3. 配置消息訂閱監(jiān)聽;
  4. Redis發(fā)布訂閱演示;

項目代碼改動如下:

項目代碼改動

1. 編寫消息發(fā)布接口;

通過調(diào)用接口的方式,將消息發(fā)布到channel中:

package com.github.dylanz666.listener;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @author : dylanz
 * @since : 12/06/2020
 */
@Component
public class MessageReceiver implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] c = message.getChannel();
        byte[] b = message.getBody();

        try {
            String channel = new String(c, StandardCharsets.UTF_8);
            String body = new String(b, StandardCharsets.UTF_8);

            //在此處做接收到消息后的業(yè)務(wù)處理
            System.out.println("channel: " + channel);
            System.out.println("body: " + body);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. 編寫消息訂閱監(jiān)聽實現(xiàn)類;

可以新建一個listener包,在包內(nèi)新建一個消息訂閱監(jiān)聽實現(xiàn)類MessageReceiver.java(名字不限),織入如下代碼:

package com.github.dylanz666.listener;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @author : dylanz
 * @since : 12/06/2020
 */
@Component
public class MessageReceiver implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] c = message.getChannel();
        byte[] b = message.getBody();

        try {
            String channel = new String(c, StandardCharsets.UTF_8);
            String body = new String(b, StandardCharsets.UTF_8);

            //在此處做接收到消息后的業(yè)務(wù)處理
            System.out.println("channel: " + channel);
            System.out.println("body: " + body);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 配置消息訂閱監(jiān)聽;

監(jiān)聽實現(xiàn)類還不夠,我們還要告訴Spring Boot,你要用我的這個監(jiān)聽啦,不然Spring Boot不知道呀!在RedisConfig類中添加配置代碼:

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);

        //此處添加消息監(jiān)聽(即訂閱),可根據(jù)channel的業(yè)務(wù)劃分情況,在此處統(tǒng)一加入多個消息監(jiān)聽
        container.addMessageListener(messageListenerAdapter, new PatternTopic("channel:demo"));
        return container;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(MessageReceiver myRedisMessageListener) {
        return new MessageListenerAdapter(myRedisMessageListener, "onMessage");
    }

當(dāng)然,我這里把發(fā)布者和訂閱者都放在同一個服務(wù)內(nèi),這在實際場景中基本是不可能出現(xiàn)的,正常二者是處于不同的服務(wù),通過消息隊列系統(tǒng)實現(xiàn)解耦。

本例只作演示、學(xué)習(xí)用,請忽略這個情況哈!

4. Redis發(fā)布訂閱演示;

啟動項目后,瀏覽器直接調(diào)用發(fā)布API,如:

訂閱
可見,當(dāng)我們通過Redis發(fā)布消息后,訂閱者自動地獲得了消息。

事實上,當(dāng)一個channel被多個訂閱者訂閱后,一有消息發(fā)布到該channel,則所有的訂閱者都能接收到該消息,如圖:

發(fā)布-訂閱(來自菜鳥教程)

我們可以把這個東西理解為群聊,當(dāng)群里有一個人發(fā)了一條消息(發(fā)布者),其他所有在群里的人(訂閱者)都能接收到消息!

整個打通的過程十分簡單,當(dāng)然也正是因為其簡單,也隱藏了一些問題,例如:
  1. 如何保證數(shù)據(jù)傳輸?shù)目煽啃裕驗楹苡锌赡苡嗛喺哌^程由于網(wǎng)絡(luò)等原因?qū)е掠嗛喌臄?shù)據(jù)丟失;

  2. 如何保證Redis本身在該發(fā)布訂閱模式下的穩(wěn)定性,因為Redis本身運行于內(nèi)存,受限于內(nèi)存,當(dāng)發(fā)布的消息不能被及時消費,消息將不斷積累,嚴(yán)重的話將最終導(dǎo)致Redis本身掛掉!

盡管如此,我們?nèi)钥梢詫⒅鳛閷W(xué)習(xí)消息隊列的敲門磚,您說呢?

如果本文對您有幫助,麻煩點贊、關(guān)注!

謝謝!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容