背景
消息隊(duì)列的使用場(chǎng)景有很多,最常見(jiàn)的使用場(chǎng)景有以下幾個(gè)。
1.商品秒殺
比如,我們?cè)谧雒霘⒒顒?dòng)時(shí),會(huì)發(fā)生短時(shí)間內(nèi)出現(xiàn)爆發(fā)式的用戶請(qǐng)求,如果不采取相關(guān)的措施,會(huì)導(dǎo)致服務(wù)器忙不過(guò)來(lái),響應(yīng)超時(shí)的問(wèn)題,輕則會(huì)導(dǎo)致服務(wù)假死,重則會(huì)讓服務(wù)器直接宕機(jī),給用戶帶來(lái)的體驗(yàn)也非常不好。如果這個(gè)時(shí)候加上了消息隊(duì)列,服務(wù)器接收到用戶的所有請(qǐng)求后,先把這些請(qǐng)求全部寫(xiě)入到消息隊(duì)列中再排隊(duì)處理,這樣就不會(huì)導(dǎo)致同時(shí)處理多個(gè)請(qǐng)求的情況;如果消息隊(duì)列長(zhǎng)度超過(guò)可以承載的最大數(shù)量,那么我們可以拋棄當(dāng)前用戶的請(qǐng)求,通知前臺(tái)用戶“頁(yè)面出錯(cuò)啦,請(qǐng)重新刷新”等提示,這樣就會(huì)有更好的交互體驗(yàn)。
2.系統(tǒng)解耦
使用了消息隊(duì)列之后,我們可以把系統(tǒng)的業(yè)務(wù)功能模塊化,實(shí)現(xiàn)系統(tǒng)的解耦。例如,在沒(méi)有使用消息隊(duì)列之前,當(dāng)前臺(tái)用戶完善了個(gè)人信息之后,首先我們需要更新用戶的資料,再添加一條用戶信息修改日志。但突然有一天產(chǎn)品經(jīng)理提了一個(gè)需求,在前臺(tái)用戶信息更新之后,需要給此用戶的增加一定的積分獎(jiǎng)勵(lì),然后沒(méi)過(guò)幾天產(chǎn)品經(jīng)理又提了一個(gè)需求,在前臺(tái)用戶信息更新之后,不但要增加積分獎(jiǎng)勵(lì),還要增加用戶的經(jīng)驗(yàn)值,但沒(méi)過(guò)幾天產(chǎn)品經(jīng)理的需求又變了,他要求完善資料無(wú)需增加用戶的積分了,這樣反反復(fù)復(fù)、來(lái)來(lái)回回的折騰,我想研發(fā)的同學(xué)一定受不了,但這是互聯(lián)網(wǎng)公司的常態(tài),那我們有沒(méi)有一勞永逸的辦法呢?
沒(méi)錯(cuò),這個(gè)時(shí)候我們想到了使用消息隊(duì)列來(lái)實(shí)現(xiàn)系統(tǒng)的解耦,每個(gè)功能的實(shí)現(xiàn)獨(dú)立開(kāi),只需要一個(gè)訂閱或者取消訂閱的開(kāi)關(guān)就可以了,當(dāng)需要增加功能時(shí),只需要打開(kāi)訂閱“用戶信息完善”的隊(duì)列就行,如果過(guò)兩天不用了,再把訂閱的開(kāi)關(guān)關(guān)掉就行了,這樣我們就不用來(lái)來(lái)回回的改業(yè)務(wù)代碼了,也就輕松的實(shí)現(xiàn)了系統(tǒng)模塊間的解耦。
3.日志記錄
我們大部分的日志記錄行為其實(shí)是和前臺(tái)用戶操作的主業(yè)務(wù)沒(méi)有直接關(guān)系的,只是我們的運(yùn)營(yíng)人和經(jīng)營(yíng)人員需要拿到這部分用戶操作的日志信息,來(lái)進(jìn)行用戶行為分析或行為監(jiān)控。在我們沒(méi)有使用消息隊(duì)列之前,籠統(tǒng)的做法是當(dāng)有用戶請(qǐng)求時(shí),先處理用戶的請(qǐng)求再記錄日志,這兩個(gè)操作是放在一起的,而前臺(tái)用戶也需要等待日志添加完成之后才能拿到后臺(tái)的響應(yīng)信息,這樣其實(shí)浪費(fèi)了前臺(tái)用戶的部分時(shí)間。此時(shí)我們可以使用消息隊(duì)列,當(dāng)響應(yīng)完用戶請(qǐng)求之后,只需要把這個(gè)操作信息放入消息隊(duì)列之后,就可以直接返回結(jié)果給前臺(tái)用戶了,無(wú)序等待日志處理和日志添加完成,從而縮短了前臺(tái)用戶的等待時(shí)間。
我們可以通過(guò) JDK 提供的 Queue 來(lái)實(shí)現(xiàn)自定義消息隊(duì)列,使用 DelayQueue 實(shí)現(xiàn)延遲消息隊(duì)列。
重點(diǎn)分析
對(duì)于消息隊(duì)列的考察更側(cè)重于消息隊(duì)列的核心思想,因?yàn)橹挥欣斫饬耸裁词窍㈥?duì)列?以及什么情況下要用消息隊(duì)列?才能解決我們?nèi)粘9ぷ髦杏龅降膯?wèn)題,而消息隊(duì)列的具體實(shí)現(xiàn),只需要掌握一個(gè)消息中間件的使用即可,因?yàn)橄㈥?duì)列中間件的核心實(shí)現(xiàn)思路是一致的,不但如此,消息隊(duì)列中間件的使用也大致類似,只要掌握了一個(gè)就能觸類旁通的用好其他消息中間件。
以下這兩個(gè)問(wèn)題比較常見(jiàn):
- 介紹一個(gè)你熟悉的消息中間件?
- 如何手動(dòng)實(shí)現(xiàn)消息隊(duì)列?
知識(shí)擴(kuò)展
1.常用消息中間件 RabbitMQ
目前市面上比較常用的 MQ(Message Queue,消息隊(duì)列)中間件有 RabbitMQ、Kafka、RocketMQ,如果是輕量級(jí)的消息隊(duì)列可以使用 Redis 提供的消息隊(duì)列,本文我們先來(lái)介紹一下 RabbitMQ。
RabbitMQ 是一個(gè)老牌開(kāi)源的消息中間件,它實(shí)現(xiàn)了標(biāo)準(zhǔn)的 AMQP(Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議)消息中間件,使用 Erlang 語(yǔ)言開(kāi)發(fā),支持集群部署,和多種客戶端語(yǔ)言混合調(diào)用,它支持的主流開(kāi)發(fā)語(yǔ)言有以下這些:
- Java and Spring
- .NET
- Ruby
- Python
- PHP
- JavaScript and Node
- Objective-C and Swift
- Rust
- Scala
- Go
RabbitMQ 中有 3 個(gè)重要的概念:生產(chǎn)者、消費(fèi)者和代理。
- 生產(chǎn)者:消息的創(chuàng)建者,負(fù)責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器。
- 消費(fèi)者:消息的接收方,用于處理數(shù)據(jù)和確認(rèn)消息。
- 代理:也就是 RabbitMQ
服務(wù)本身,它用于扮演“快遞”的角色,因?yàn)樗旧聿⒉簧a(chǎn)消息,只是扮演了“快遞”的角色,把消息進(jìn)行暫存和傳遞。
它們的運(yùn)行流程,如下圖所示:

RabbitMQ 具備以下幾個(gè)優(yōu)點(diǎn):
- 支持持久化,RabbitMQ 支持磁盤持久化功能,保證了消息不會(huì)丟失;
- 高并發(fā),RabbitMQ 使用了 Erlang 開(kāi)發(fā)語(yǔ)言,Erlang 是為電話交換機(jī)開(kāi)發(fā)的語(yǔ)言,天生自帶高并發(fā)光環(huán)和高可用特性;
- 支持分布式集群,正是因?yàn)?Erlang 語(yǔ)言實(shí)現(xiàn)的,因此 RabbitMQ 集群部署也非常簡(jiǎn)單,只需要啟動(dòng)每個(gè)節(jié)點(diǎn)并使用 --link把節(jié)點(diǎn)加入到集群中即可,并且 RabbitMQ 支持自動(dòng)選主和自動(dòng)容災(zāi);
- 支持多種語(yǔ)言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;
- 支持消息確認(rèn),支持消息消費(fèi)確認(rèn)(ack)保證了每條消息可以被正常消費(fèi);
- 它支持很多插件,比如網(wǎng)頁(yè)控制臺(tái)消息管理插件、消息延遲插件等,RabbitMQ 的插件很多并且使用都很方便。
RabbitMQ 的消息類型,分為以下四種:
- direct(默認(rèn)類型)模式,此模式為一對(duì)一的發(fā)送方式,也就是一條消息只會(huì)發(fā)送給一個(gè)消費(fèi)者;
- headers 模式,允許你匹配消息的 header 而非路由鍵(RoutingKey),除此之外 headers 和 direct
- 的使用完全一致,但因?yàn)?headers 匹配的性能很差,幾乎不會(huì)被用到;
- fanout 模式,為多播的方式,會(huì)把一個(gè)消息分發(fā)給所有的訂閱者;
- topic模式,為主題訂閱模式,允許使用通配符(#、*)匹配一個(gè)或者多個(gè)消息,我可以使用“cn.mq.#”匹配到多個(gè)前綴是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。
2.自定義消息隊(duì)列
我們可使用 Queue 來(lái)實(shí)現(xiàn)消息隊(duì)列,Queue 大體可分為以下三類:
- 雙端隊(duì)列(Deque)是 Queue 的子類也是 Queue 的補(bǔ)充類,頭部和尾部都支持元素插入和獲??;
- 阻塞隊(duì)列指的是在元素操作時(shí)(添加或刪除),如果沒(méi)有成功,會(huì)阻塞等待執(zhí)行,比如當(dāng)添加元素時(shí),如果隊(duì)列元素已滿,隊(duì)列則會(huì)阻塞等待直到有空位時(shí)再插入;
- 非阻塞隊(duì)列,和阻塞隊(duì)列相反,它會(huì)直接返回操作的結(jié)果,而非阻塞等待操作,雙端隊(duì)列也屬于非阻塞隊(duì)列。
自定義消息隊(duì)列的實(shí)現(xiàn)代碼如下:
import java.util.LinkedList;
import java.util.Queue;
public class CustomQueue {
// 定義消息隊(duì)列
private static Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
producer(); // 調(diào)用生產(chǎn)者
consumer(); // 調(diào)用消費(fèi)者
}
// 生產(chǎn)者
public static void producer() {
// 添加消息
queue.add("first message.");
queue.add("second message.");
queue.add("third message.");
}
// 消費(fèi)者
public static void consumer() {
while (!queue.isEmpty()) {
// 消費(fèi)消息
System.out.println(queue.poll());
}
}
}
以上程序的執(zhí)行結(jié)果是:
first message.
second message.
third message.
可以看出消息是以先進(jìn)先出順序進(jìn)行消費(fèi)的。
實(shí)現(xiàn)自定義延遲隊(duì)列需要實(shí)現(xiàn) Delayed 接口,重寫(xiě) getDelay() 方法,延遲隊(duì)列完整實(shí)現(xiàn)代碼如下:
import lombok.Getter;
import lombok.Setter;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 自定義延遲隊(duì)列
*/
public class CustomDelayQueue {
// 延遲消息隊(duì)列
private static DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
producer(); // 調(diào)用生產(chǎn)者
consumer(); // 調(diào)用消費(fèi)者
}
// 生產(chǎn)者
public static void producer() {
// 添加消息
delayQueue.put(new MyDelay(1000, "消息1"));
delayQueue.put(new MyDelay(3000, "消息2"));
}
// 消費(fèi)者
public static void consumer() throws InterruptedException {
System.out.println("開(kāi)始執(zhí)行時(shí)間:" +
DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()) {
System.out.println(delayQueue.take());
}
System.out.println("結(jié)束執(zhí)行時(shí)間:" +
DateFormat.getDateTimeInstance().format(new Date()));
}
/**
* 自定義延遲隊(duì)列
*/
static class MyDelay implements Delayed {
// 延遲截止時(shí)間(單位:毫秒)
long delayTime = System.currentTimeMillis();
// 借助 lombok 實(shí)現(xiàn)
@Getter
@Setter
private String msg;
/**
* 初始化
* @param delayTime 設(shè)置延遲執(zhí)行時(shí)間
* @param msg 執(zhí)行的消息
*/
public MyDelay(long delayTime, String msg) {
this.delayTime = (this.delayTime + delayTime);
this.msg = msg;
}
// 獲取剩余時(shí)間
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 隊(duì)列里元素的排序依據(jù)
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return this.msg;
}
}
}
以上程序的執(zhí)行結(jié)果是:
開(kāi)始執(zhí)行時(shí)間:2020-4-2 16:17:28
消息1
消息2
結(jié)束執(zhí)行時(shí)間:2020-4-2 16:17:31
可以看出,消息 1消息 2 都實(shí)現(xiàn)了延遲執(zhí)行的功能。
總結(jié)
本文講了消息隊(duì)列的使用場(chǎng)景:商品秒殺、系統(tǒng)解耦和日志記錄,我們還介紹了 RabbitMQ 以及它的消息類型和它的特點(diǎn)等內(nèi)容,同時(shí)還使用 Queue 的子類 LinkedList 實(shí)現(xiàn)了自定義消息隊(duì)列,使用 DelayQueue 實(shí)現(xiàn)了自定義延遲消息隊(duì)列。