面試官常問(wèn):如何手寫(xiě)一個(gè)“消息隊(duì)列”和“延遲消息隊(duì)列”?

背景

消息隊(duì)列的使用場(chǎng)景有很多,最常見(jiàn)的使用場(chǎng)景有以下幾個(gè)。

1.商品秒殺

比如,我們?cè)谧雒霘⒒顒?dòng)時(shí),會(huì)發(fā)生短時(shí)間內(nèi)出現(xiàn)爆發(fā)式的用戶請(qǐng)求,如果不采取相關(guān)的措施,會(huì)導(dǎo)致服務(wù)器忙不過(guò)來(lái),響應(yīng)超時(shí)的問(wèn)題,輕則會(huì)導(dǎo)致服務(wù)假死,重則會(huì)讓服務(wù)器直接宕機(jī),給用戶帶來(lái)的體驗(yàn)也非常不好。如果這個(gè)時(shí)候加上了消息隊(duì)列,服務(wù)器接收到用戶的所有請(qǐng)求后,先把這些請(qǐng)求全部寫(xiě)入到消息隊(duì)列中再排隊(duì)處理,這樣就不會(huì)導(dǎo)致同時(shí)處理多個(gè)請(qǐng)求的情況;如果消息隊(duì)列長(zhǎng)度超過(guò)可以承載的最大數(shù)量,那么我們可以拋棄當(dāng)前用戶的請(qǐng)求,通知前臺(tái)用戶“頁(yè)面出錯(cuò)啦,請(qǐng)重新刷新”等提示,這樣就會(huì)有更好的交互體驗(yàn)。

2.系統(tǒng)解耦

使用了消息隊(duì)列之后,我們可以把系統(tǒng)的業(yè)務(wù)功能模塊化,實(shí)現(xiàn)系統(tǒng)的解耦。例如,在沒(méi)有使用消息隊(duì)列之前,當(dāng)前臺(tái)用戶完善了個(gè)人信息之后,首先我們需要更新用戶的資料,再添加一條用戶信息修改日志。但突然有一天產(chǎn)品經(jīng)理提了一個(gè)需求,在前臺(tái)用戶信息更新之后,需要給此用戶的增加一定的積分獎(jiǎng)勵(lì),然后沒(méi)過(guò)幾天產(chǎn)品經(jīng)理又提了一個(gè)需求,在前臺(tái)用戶信息更新之后,不但要增加積分獎(jiǎng)勵(lì),還要增加用戶的經(jīng)驗(yàn)值,但沒(méi)過(guò)幾天產(chǎn)品經(jīng)理的需求又變了,他要求完善資料無(wú)需增加用戶的積分了,這樣反反復(fù)復(fù)、來(lái)來(lái)回回的折騰,我想研發(fā)的同學(xué)一定受不了,但這是互聯(lián)網(wǎng)公司的常態(tài),那我們有沒(méi)有一勞永逸的辦法呢?

沒(méi)錯(cuò),這個(gè)時(shí)候我們想到了使用消息隊(duì)列來(lái)實(shí)現(xiàn)系統(tǒng)的解耦,每個(gè)功能的實(shí)現(xiàn)獨(dú)立開(kāi),只需要一個(gè)訂閱或者取消訂閱的開(kāi)關(guān)就可以了,當(dāng)需要增加功能時(shí),只需要打開(kāi)訂閱“用戶信息完善”的隊(duì)列就行,如果過(guò)兩天不用了,再把訂閱的開(kāi)關(guān)關(guān)掉就行了,這樣我們就不用來(lái)來(lái)回回的改業(yè)務(wù)代碼了,也就輕松的實(shí)現(xiàn)了系統(tǒng)模塊間的解耦。

3.日志記錄

我們大部分的日志記錄行為其實(shí)是和前臺(tái)用戶操作的主業(yè)務(wù)沒(méi)有直接關(guān)系的,只是我們的運(yùn)營(yíng)人和經(jīng)營(yíng)人員需要拿到這部分用戶操作的日志信息,來(lái)進(jìn)行用戶行為分析或行為監(jiān)控。在我們沒(méi)有使用消息隊(duì)列之前,籠統(tǒng)的做法是當(dāng)有用戶請(qǐng)求時(shí),先處理用戶的請(qǐng)求再記錄日志,這兩個(gè)操作是放在一起的,而前臺(tái)用戶也需要等待日志添加完成之后才能拿到后臺(tái)的響應(yīng)信息,這樣其實(shí)浪費(fèi)了前臺(tái)用戶的部分時(shí)間。此時(shí)我們可以使用消息隊(duì)列,當(dāng)響應(yīng)完用戶請(qǐng)求之后,只需要把這個(gè)操作信息放入消息隊(duì)列之后,就可以直接返回結(jié)果給前臺(tái)用戶了,無(wú)序等待日志處理和日志添加完成,從而縮短了前臺(tái)用戶的等待時(shí)間。

我們可以通過(guò) JDK 提供的 Queue 來(lái)實(shí)現(xiàn)自定義消息隊(duì)列,使用 DelayQueue 實(shí)現(xiàn)延遲消息隊(duì)列。

重點(diǎn)分析

對(duì)于消息隊(duì)列的考察更側(cè)重于消息隊(duì)列的核心思想,因?yàn)橹挥欣斫饬耸裁词窍㈥?duì)列?以及什么情況下要用消息隊(duì)列?才能解決我們?nèi)粘9ぷ髦杏龅降膯?wèn)題,而消息隊(duì)列的具體實(shí)現(xiàn),只需要掌握一個(gè)消息中間件的使用即可,因?yàn)橄㈥?duì)列中間件的核心實(shí)現(xiàn)思路是一致的,不但如此,消息隊(duì)列中間件的使用也大致類似,只要掌握了一個(gè)就能觸類旁通的用好其他消息中間件。

以下這兩個(gè)問(wèn)題比較常見(jiàn):

  • 介紹一個(gè)你熟悉的消息中間件?
  • 如何手動(dòng)實(shí)現(xiàn)消息隊(duì)列?

知識(shí)擴(kuò)展

1.常用消息中間件 RabbitMQ

目前市面上比較常用的 MQ(Message Queue,消息隊(duì)列)中間件有 RabbitMQ、Kafka、RocketMQ,如果是輕量級(jí)的消息隊(duì)列可以使用 Redis 提供的消息隊(duì)列,本文我們先來(lái)介紹一下 RabbitMQ。

RabbitMQ 是一個(gè)老牌開(kāi)源的消息中間件,它實(shí)現(xiàn)了標(biāo)準(zhǔn)的 AMQP(Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議)消息中間件,使用 Erlang 語(yǔ)言開(kāi)發(fā),支持集群部署,和多種客戶端語(yǔ)言混合調(diào)用,它支持的主流開(kāi)發(fā)語(yǔ)言有以下這些:

  • Java and Spring
  • .NET
  • Ruby
  • Python
  • PHP
  • JavaScript and Node
  • Objective-C and Swift
  • Rust
  • Scala
  • Go

RabbitMQ 中有 3 個(gè)重要的概念:生產(chǎn)者、消費(fèi)者和代理。

  • 生產(chǎn)者:消息的創(chuàng)建者,負(fù)責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器。
  • 消費(fèi)者:消息的接收方,用于處理數(shù)據(jù)和確認(rèn)消息。
  • 代理:也就是 RabbitMQ

服務(wù)本身,它用于扮演“快遞”的角色,因?yàn)樗旧聿⒉簧a(chǎn)消息,只是扮演了“快遞”的角色,把消息進(jìn)行暫存和傳遞。

它們的運(yùn)行流程,如下圖所示:


image.png

RabbitMQ 具備以下幾個(gè)優(yōu)點(diǎn):

  • 支持持久化,RabbitMQ 支持磁盤持久化功能,保證了消息不會(huì)丟失;
  • 高并發(fā),RabbitMQ 使用了 Erlang 開(kāi)發(fā)語(yǔ)言,Erlang 是為電話交換機(jī)開(kāi)發(fā)的語(yǔ)言,天生自帶高并發(fā)光環(huán)和高可用特性;
  • 支持分布式集群,正是因?yàn)?Erlang 語(yǔ)言實(shí)現(xiàn)的,因此 RabbitMQ 集群部署也非常簡(jiǎn)單,只需要啟動(dòng)每個(gè)節(jié)點(diǎn)并使用 --link把節(jié)點(diǎn)加入到集群中即可,并且 RabbitMQ 支持自動(dòng)選主和自動(dòng)容災(zāi);
  • 支持多種語(yǔ)言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;
  • 支持消息確認(rèn),支持消息消費(fèi)確認(rèn)(ack)保證了每條消息可以被正常消費(fèi);
  • 它支持很多插件,比如網(wǎng)頁(yè)控制臺(tái)消息管理插件、消息延遲插件等,RabbitMQ 的插件很多并且使用都很方便。

RabbitMQ 的消息類型,分為以下四種:

  • direct(默認(rèn)類型)模式,此模式為一對(duì)一的發(fā)送方式,也就是一條消息只會(huì)發(fā)送給一個(gè)消費(fèi)者;
  • headers 模式,允許你匹配消息的 header 而非路由鍵(RoutingKey),除此之外 headers 和 direct
  • 的使用完全一致,但因?yàn)?headers 匹配的性能很差,幾乎不會(huì)被用到;
  • fanout 模式,為多播的方式,會(huì)把一個(gè)消息分發(fā)給所有的訂閱者;
  • topic模式,為主題訂閱模式,允許使用通配符(#、*)匹配一個(gè)或者多個(gè)消息,我可以使用“cn.mq.#”匹配到多個(gè)前綴是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。

2.自定義消息隊(duì)列

我們可使用 Queue 來(lái)實(shí)現(xiàn)消息隊(duì)列,Queue 大體可分為以下三類:

  • 雙端隊(duì)列(Deque)是 Queue 的子類也是 Queue 的補(bǔ)充類,頭部和尾部都支持元素插入和獲??;
  • 阻塞隊(duì)列指的是在元素操作時(shí)(添加或刪除),如果沒(méi)有成功,會(huì)阻塞等待執(zhí)行,比如當(dāng)添加元素時(shí),如果隊(duì)列元素已滿,隊(duì)列則會(huì)阻塞等待直到有空位時(shí)再插入;
  • 非阻塞隊(duì)列,和阻塞隊(duì)列相反,它會(huì)直接返回操作的結(jié)果,而非阻塞等待操作,雙端隊(duì)列也屬于非阻塞隊(duì)列。

自定義消息隊(duì)列的實(shí)現(xiàn)代碼如下:

import java.util.LinkedList;
import java.util.Queue;

public class CustomQueue {
    // 定義消息隊(duì)列
    private static Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) {
        producer(); // 調(diào)用生產(chǎn)者
        consumer(); // 調(diào)用消費(fèi)者
    }

    // 生產(chǎn)者
    public static void producer() {
        // 添加消息
        queue.add("first message.");
        queue.add("second message.");
        queue.add("third message.");
    }

    // 消費(fèi)者
    public static void consumer() {
        while (!queue.isEmpty()) {
            // 消費(fèi)消息
            System.out.println(queue.poll());
        }
    }
}

以上程序的執(zhí)行結(jié)果是:

first message.
second message.
third message.

可以看出消息是以先進(jìn)先出順序進(jìn)行消費(fèi)的。

實(shí)現(xiàn)自定義延遲隊(duì)列需要實(shí)現(xiàn) Delayed 接口,重寫(xiě) getDelay() 方法,延遲隊(duì)列完整實(shí)現(xiàn)代碼如下:

import lombok.Getter;
import lombok.Setter;

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 自定義延遲隊(duì)列
 */
public class CustomDelayQueue {
    // 延遲消息隊(duì)列
    private static DelayQueue delayQueue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        producer(); // 調(diào)用生產(chǎn)者
        consumer(); // 調(diào)用消費(fèi)者
    }

    // 生產(chǎn)者
    public static void producer() {
        // 添加消息
        delayQueue.put(new MyDelay(1000, "消息1"));
        delayQueue.put(new MyDelay(3000, "消息2"));
    }

    // 消費(fèi)者
    public static void consumer() throws InterruptedException {
        System.out.println("開(kāi)始執(zhí)行時(shí)間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
        System.out.println("結(jié)束執(zhí)行時(shí)間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
    }

    /**
     * 自定義延遲隊(duì)列
     */
    static class MyDelay implements Delayed {
        // 延遲截止時(shí)間(單位:毫秒)
        long delayTime = System.currentTimeMillis();

        // 借助 lombok 實(shí)現(xiàn)
        @Getter
        @Setter
        private String msg;

        /**
         * 初始化
         * @param delayTime 設(shè)置延遲執(zhí)行時(shí)間
         * @param msg       執(zhí)行的消息
         */
        public MyDelay(long delayTime, String msg) {
            this.delayTime = (this.delayTime + delayTime);
            this.msg = msg;
        }

        // 獲取剩余時(shí)間
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 隊(duì)列里元素的排序依據(jù)
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }

        @Override
        public String toString() {
            return this.msg;
        }
    }
}

以上程序的執(zhí)行結(jié)果是:

開(kāi)始執(zhí)行時(shí)間:2020-4-2 16:17:28
消息1
消息2
結(jié)束執(zhí)行時(shí)間:2020-4-2 16:17:31

可以看出,消息 1消息 2 都實(shí)現(xiàn)了延遲執(zhí)行的功能。

總結(jié)

本文講了消息隊(duì)列的使用場(chǎng)景:商品秒殺、系統(tǒng)解耦和日志記錄,我們還介紹了 RabbitMQ 以及它的消息類型和它的特點(diǎn)等內(nèi)容,同時(shí)還使用 Queue 的子類 LinkedList 實(shí)現(xiàn)了自定義消息隊(duì)列,使用 DelayQueue 實(shí)現(xiàn)了自定義延遲消息隊(duì)列。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容