springboot整合redis消息隊列

前言

消息隊列作為一種常用的異步通信解決方案,而redis是一款高性能的nosql產(chǎn)品,今天就給大家介紹一下,如何使用redis實現(xiàn)消息隊列,并整合到springboot。

兩個消息模型

  1. 隊列模型
    隊列模型如圖所示,它具有以下幾個特點,就像我們用微信和好友(群聊除外)聊天一樣,微信就是這個隊列,我們可以和很多個好友聊天,但是每條消息只能發(fā)給一個好友。
  • 只有一個消費者將獲得消息
  • 生產(chǎn)者不需要在接收者消費該消息期間處于運行狀態(tài),接收者也同樣不需要在消息發(fā)送時處于運行狀態(tài)。
  • 每一個成功處理的消息都由接收者簽收
隊列模型
  1. 發(fā)布/訂閱模型
    發(fā)布/訂閱模型如圖所示,不用說,和訂閱公眾號是一樣的。
  • 多個消費者可以獲得消息
  • 在發(fā)布者和訂閱者之間存在時間依賴性。發(fā)布者需要建立一個topic,以便客戶能夠購訂閱。訂閱者必須保持持續(xù)的活動狀態(tài)以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發(fā)布的消息將在訂閱者重新連接時重新發(fā)布。
發(fā)布/訂閱模型

redis如何實現(xiàn)

  1. 對于隊列模型,我們可以使用redis的list數(shù)據(jù)結(jié)構(gòu),通過LPUSH和RPOP來實現(xiàn)一個隊列。
  2. 發(fā)布/訂閱模型就更簡單了,redis官方就支持,而且還可以使用PSUBSCRIBE支持模式匹配,使用如下命令,即可訂閱所有f開頭的訂閱,具體可查看文檔
PSUBSCRIBE f*
  1. keyspace notifications(鍵空間通知)
    該功能是在redis2.8之后引入的,即客戶端可以通過pub/sub機制,接收key的變更的消息。換句話說,就是redis官方提供了一些topic,幫助我們?nèi)ケO(jiān)聽redis數(shù)據(jù)庫中的key,我曾經(jīng)就使用其中的'keyevent@0:expired'實現(xiàn)了定時任務(wù)。

和spring boot整合

首先得介紹一下spring-data-redis中的兩種template的默認serializer,當然spring還提供其他的序列化器,具體可查看文檔,也可以自己實現(xiàn)RedisSerializer接口,構(gòu)建自己的序列化器。

template default serializer serialization
RedisTemplate JdkSerializationRedisSerializer 序列化String類型的key和value
StringRedisTemplate StringRedisSerializer 使用Java序列化

終于到了寫代碼的時候了,先從發(fā)布/訂閱說起吧,因為spring官方給了示例。但是呢,示例里面的消息是String類型,對于我們的業(yè)務(wù)來說,可能更需要一個POJO,所以還需要改造一下,走起。

  1. 先學(xué)習(xí)下org.springframework.data.redis.listener.adapter.MessageListenerAdapter源碼如下,可以看到,如果使用StringRedisTemplate的話,默認都是使用StringRedisSerializer來反序列化,而如果想主動接收消息,則需要實現(xiàn)MessageListener接口。
    /**
     * Standard Redis {@link MessageListener} entry point.
     * <p>
     * Delegates the message to the target listener method, with appropriate conversion of the message argument. In case
     * of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
     * 
     * @param message the incoming Redis message
     * @see #handleListenerException
     */
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Check whether the delegate is a MessageListener impl itself.
            // In that case, the adapter will simply act as a pass-through.
            if (delegate != this) {
                if (delegate instanceof MessageListener) {
                    ((MessageListener) delegate).onMessage(message, pattern);
                    return;
                }
            }

            // Regular case: find a handler method reflectively.
            Object convertedMessage = extractMessage(message);
            String convertedChannel = stringSerializer.deserialize(pattern);
            // Invoke the handler method with appropriate arguments.
            Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

            invokeListenerMethod(invoker.getMethodName(), listenerArguments);
        } catch (Throwable th) {
            handleListenerException(th);
        }
    }

    /**
     * Extract the message body from the given Redis message.
     * 
     * @param message the Redis <code>Message</code>
     * @return the content of the message, to be passed into the listener method as argument
     */
    protected Object extractMessage(Message message) {
        if (serializer != null) {
            return serializer.deserialize(message.getBody());
        }
        return message.getBody();
    }

    /**
     * Initialize the default implementations for the adapter's strategies.
     * 
     * @see #setSerializer(RedisSerializer)
     * @see JdkSerializationRedisSerializer
     */
    protected void initDefaultStrategies() {
        RedisSerializer<String> serializer = new StringRedisSerializer();
        setSerializer(serializer);
        setStringSerializer(serializer);
    }
  1. 將StringRedisTemplate替換為RedisTemplate
    @Bean(name = "redisTemplate")
    RedisTemplate<?, ?> template(RedisConnectionFactory connectionFactory) {
        RedisTemplate<?, ?> template = new RedisTemplate<>();
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new JdkSerializationRedisSerializer());
        template.setConnectionFactory(connectionFactory);
        return template;
    }
  1. 改造Receiver
public class Receiver implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer serializer = new JdkSerializationRedisSerializer();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //接收的topic
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        //消息的POJO
        Object o = serializer.deserialize(message.getBody());
    }
}

至此,發(fā)布/訂閱的整合改造就完成了。

接下來就是消息隊列了,這個就需要自己造輪子了,在spring中使用redisTemlate操作數(shù)據(jù)庫,而對于不同的數(shù)據(jù)類型則需要不同的操作方式,如下表格所示,具體還是請看官方文檔
實現(xiàn)隊列選擇list數(shù)據(jù)結(jié)構(gòu),redisTemplate.opsForList()使用起來非常簡單,和redis命令基本一致。

數(shù)據(jù)類型 操作方式
string redisTemplate.opsForValue()
hash redisTemplate.opsForHash()
list redisTemplate.opsForList()
set redisTemplate.opsForSet()
  1. 先定義一個消息的POJO
import java.io.Serializable;

public class Message implements Serializable{

    private String id;

    private String content;

    public Message() {
    }

    public Message(String id, String content) {
        this.id = id;
        this.content = content;
    }


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id='" + id + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}
  1. 消息的生產(chǎn)者,這個類提供一個發(fā)送消息的方法。
@Service
public class MessageProducerService {

    @Autowired
    RedisTemplate<String, Message> redisTemplate;

    public Long sendMessage(Message message) {
        System.out.println("發(fā)送了"+ message);
        return redisTemplate.opsForList().leftPush("queue", message);
    }

}
  1. 消息的消費者,消費者需要不斷輪詢隊列,有消息便取出來,實現(xiàn)方式如下:
@Service
public class MessageConsumerService extends Thread {

    @Autowired
    RedisTemplate<String, Message> redisTemplate;


    @Override
    public void run() {
        while (true){
            Message message = redisTemplate.opsForList().rightPop("queue", 1000L, TimeUnit.SECONDS);
            System.out.println("接收到了" + message);
        }
    }
}

至此,消息隊列的方式也整合完成了。
雖然redisTemplate是線程安全的,但是如果一個隊列有多個接收者的話,可能也還需要考慮一下并發(fā)的問題。
最后有什么問題,歡迎大家給我留言,謝謝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,537評論 19 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,262評論 6 342
  • 今天2月4日寒假的最后一天,開學(xué)前一天,一大早媽媽把我叫起來嘮嘮叨叨沒完。讓我刷牙,洗臉,剪指甲,洗頭,整理...
    小太陽教室張耿嘉閱讀 447評論 0 0
  • 忘了多拍幾張了ㄟ( ▔, ▔ )ㄏ,就發(fā)個成品吧
    桅笑閱讀 308評論 0 1
  • 這里簡單介紹下 Linux 下 Phalcon 的安裝及配置,更多內(nèi)容請查看官網(wǎng)安裝介紹 Installation...
    野塵lxw閱讀 1,052評論 0 0

友情鏈接更多精彩內(nèi)容