1.概述
1.1 隊列的使用場景
削峰:并發(fā)情況下,數(shù)據(jù)庫的壓力非常大,此時可以將一部分非必需即可返回結(jié)果的操作轉(zhuǎn)為隊列來消費,達到對系統(tǒng)的緩沖效果。
解耦:A功能需要觸發(fā)B,C,D功能的一些行為,如果直接在A模塊的代碼調(diào)用B,C,D模塊的方法,代碼就會慢慢變得混亂不堪,此時是用隊列分發(fā)來觸發(fā)各自的行為,達到解耦。
異步:比如公眾號開發(fā)是,注冊的時候需要給用戶推送通知,如果用同步調(diào)用微信接口的方式,可能會因為網(wǎng)絡(luò)問題或者微信接口方面的問題導(dǎo)致流程阻塞,這時候改成異步的,可以極高的提高響應(yīng)速度。
1.2 可用于隊列設(shè)計的有以下各類中間件:
- Redis
- Kafka
- MQ類:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ
2.分別闡述各種中間件的特點
2.1 Redis
redis的本身定位是緩存數(shù)據(jù)庫,并非用于隊列。但它提供的數(shù)據(jù)類型 list 和發(fā)布/訂閱模式,可以用于隊列的設(shè)計。
lpush/brpop : 基于list的隊列設(shè)計有個缺陷就是,所有隊列相關(guān)的失敗重試,一致性等問題都由業(yè)務(wù)方來保證,無法從應(yīng)用層面來提供解決方案,redis宕機沒用持久化的話,會數(shù)據(jù)丟失,當然集群和持久化設(shè)置可避免。
發(fā)布/訂閱:此方式是用于廣播的,即一次發(fā)布,關(guān)注該topic的多個訂閱者都收到,如若用于隊列,即只能有一個訂閱者,多個也沒用,只是重復(fù)消費,另外此方式并沒保存發(fā)布數(shù)據(jù),如若網(wǎng)絡(luò)抖動或者redis宕機會產(chǎn)生數(shù)據(jù)丟失,不可忍受。
缺點:綜上,可靠性沒保證,且只提高數(shù)據(jù)結(jié)構(gòu),邏輯需要自己實現(xiàn)。
redis有另外一個產(chǎn)品——disque,專門作為隊列使用的。
2.2 Kafka
kafka的最大的優(yōu)勢就是快,即使普通的服務(wù)器,kafka也能輕松支持每秒百萬級的寫入效率。這么快的原因是:
1)寫入方式:以順序IO代替隨機IO, 當然只是這個是不夠的,很多軟件都已經(jīng)懂得這么做了,還有一個重要的原因是:它不是實時寫入硬盤,充分利用了分頁存儲來利用內(nèi)存提高效率。這里采用的是mmap,即內(nèi)存映射文件,完成映射之后你對物理內(nèi)存的操作,操作系統(tǒng)都會在適當?shù)臅r候同步到硬盤上。當然這里也有一個缺陷——不可靠,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會在程序主動調(diào)用flush的時候才把數(shù)據(jù)真正的寫到硬盤。
2)讀取方式:基于sendfile實現(xiàn)Zero Copy。以前需要在內(nèi)核緩存區(qū)和用戶緩沖區(qū)的來來回回4次copy,被優(yōu)化到只需要2次,即只需要將數(shù)據(jù)從磁盤讀入內(nèi)核緩沖區(qū),在調(diào)用sendfile直接將數(shù)據(jù)copy到網(wǎng)卡緩存這兩步。 還有一個原因,kafka實現(xiàn)了端對端的批量壓縮,目前Kafka支持GZIP和Snappy壓縮協(xié)議。
kafka除了速度快,還支持分組,可以多個生產(chǎn)者,多個消費者,支持負載均衡,支持事務(wù)。
缺點:
- Kafka單機超過64個隊列/分區(qū),Load會發(fā)生明顯的飆高現(xiàn)象,隊列越多,load越高,發(fā)送消息響應(yīng)時間變長
- 使用短輪詢方式,實時性取決于輪詢間隔時間;
- 消費失敗不支持重試;
- 支持消息順序,但是一臺代理宕機后,就會產(chǎn)生消息亂序;
- 社區(qū)更新較慢;
- 運維難度大,它不僅僅需要關(guān)注整個集群之內(nèi)像broker、controller類似的角色,還需要關(guān)注其所依賴的一些產(chǎn)品像ZooKeeper等。你需要考慮一些集群產(chǎn)生的問題,比如“腦裂”,ZooKeeper不可用等問題。然一旦涉及集群,這個問題就存在,并非kafka特有。還有磁盤的問題,kafka的消息不是消費完就沒了,而是根據(jù)的你的設(shè)置多久過期,這樣就很可能產(chǎn)生磁盤數(shù)據(jù)量堆積問題。當
2.3 MQ類:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ
MQ類本身就是為隊列而生的。隊列該有的,這些MQ類產(chǎn)品大多數(shù)都有,只是有一些功能上的區(qū)別和性能的差異。
這里主要介紹一下RocketMQ 。
根據(jù)GitHub上Apache對它的介紹,有以下功能:
- 發(fā)布/訂閱消息傳遞模型
- 財務(wù)級交易消息
- 各種跨語言客戶端,例如Java,C / C ++,Python,Go
- 可插拔的傳輸協(xié)議,例如TCP,SSL,AIO
- 內(nèi)置的消息跟蹤功能,還支持開放式跟蹤
- 多功能的大數(shù)據(jù)和流生態(tài)系統(tǒng)集成
- 按時間或偏移量追溯消息
- 可靠的FIFO和嚴格的有序消息傳遞在同一隊列中
- 高效的推拉消費模型
- 單個隊列中的百萬級消息累積容量
- 多種消息傳遞協(xié)議,例如JMS和OpenMessaging
- 靈活的分布式橫向擴展部署架構(gòu)
- 快如閃電的批量消息交換系統(tǒng)
- 各種消息過濾器機制,例如SQL和Tag
- 用于隔離測試和云隔離群集的Docker映像
- 功能豐富的管理儀表板,用于配置,指標和監(jiān)視
- 認證與授權(quán)
RocketMQ 主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分。
RocketMQ什么都是集群部署的,這是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave異步復(fù)制模式、多 master多slave同步雙寫模式。
支持負載均衡,支持集群,支持訂閱形式和消息分發(fā),支持順序消息,支持消息確認,支持指定時間點的回溯,支持消息重試。
性能方面也僅次于kafka,zeroMQ。
本身就是為隊列定制的,缺點就是對輕量級項目來說太復(fù)雜了。最好就是有公司統(tǒng)一的一套隊列實現(xiàn)方案,提供公司內(nèi)的各個團隊使用,并支持各種語言棧的調(diào)用。
3.隊列設(shè)計
考慮到成本、運維復(fù)雜度、設(shè)計的靈活性/自由度、原有項目語言(php)的限制,采用redis來實現(xiàn)。
3.1 常用隊列的類型

3.2 隊列的消費模型
3.2.1 采用php常駐進程模式
通常隊列消費,我們是每個隊列起一個或多個消費進程,網(wǎng)上有看到這樣的一個延時隊列的設(shè)計模型:

這里是用到了php的一個pcntl 進程控制的擴展,由dq-mster: 主進程,負責(zé)管理子進程的創(chuàng)建,銷毀,回收以及信號通知。
對于我們來說,這種方案有以下缺點:
- 所有的隊列會在一個服務(wù)里面消費,沒有區(qū)分隊列的類型,如果某個隊列阻塞,只是單純的加子進程,沒有針對性,當然這個問題是有辦法解決的;
- 常駐進程會由內(nèi)存的回收和數(shù)據(jù)庫連接斷連的問題(長時間沒有數(shù)據(jù)消費的話,數(shù)據(jù)庫連接會超時失效,之前的遇到過),斷連的問題也不難解決;
- 修改到某些代碼需要手動取重啟進程,常駐進程的代碼已經(jīng)是加載到內(nèi)存里面的了,修改代碼需要重啟才生效。
3.2.2 采用yii2-queue
yii2-queue 支持了不少驅(qū)動,F(xiàn)ile, Mysql, Redis, RabbitMq等等。
這里只分析redis驅(qū)動。
基于redis的yii2-queue只實現(xiàn)了延時隊列和FIFO隊列,延時隊列采用有序集合結(jié)合列表來實現(xiàn)。
話不多說,擼源碼:
- 先看入列:
protected function pushMessage($message, $ttr, $delay, $priority)
{
// Redis驅(qū)動不支持作業(yè)優(yōu)先級 如果有傳值則拋出錯誤
if ($priority !== null) {
throw new NotSupportedException('Job priority is not supported in the driver.');
}
// 取到上條 message_id + 1 ,作為本條任務(wù)的ID
$id = $this->redis->incr("$this->channel.message_id");
// 以新ID將本條任務(wù)格存儲到hash表 messages 中
$this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
if (!$delay) {
// 如果不需要等待執(zhí)行 則將任務(wù)ID推到hash表 waiting 中
$this->redis->lpush("$this->channel.waiting", $id);
} else {
// 如果需要等待執(zhí)行 則推送到有序集合 delayed 中
$this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
}
// 返回任務(wù)ID
return $id;
}
- 再看出列:
/**
* @param int $wait timeout
* @return array|null payload
*/
protected function reserve($wait)
{
// 將延遲和保留的作業(yè)移動到等待列表中 并鎖定一秒
if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
$this->moveExpired("$this->channel.delayed");
$this->moveExpired("$this->channel.reserved");
}
// Find a new waiting message
$id = null;
if (!$wait) {
// 從等待列表取1條任務(wù)ID待執(zhí)行
$id = $this->redis->rpop("$this->channel.waiting");
} elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
$id = $result[1];
}
if (!$id) {
return null;
}
// 根據(jù)任務(wù)ID取出任務(wù)
$payload = $this->redis->hget("$this->channel.messages", $id);
list($ttr, $message) = explode(';', $payload, 2);
// 加入到作業(yè)列表
$this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
$attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
return [$id, $message, $ttr, $attempt];
}
- 再看執(zhí)行進程
yii2-queue提供了兩種方式,一種是crontab機制:
/**
* Runs all jobs from redis-queue.
* It can be used as cron job.
*
* @return null|int exit code.
*/
public function actionRun()
{
return $this->queue->run(false);
}
一種是常駐進程機制:
/**
* Listens redis-queue and runs new jobs.
* It can be used as daemon process.
*
* @param int $timeout number of seconds to wait a job.
* @throws Exception when params are invalid.
* @return null|int exit code.
*/
public function actionListen($timeout = 3)
{
if (!is_numeric($timeout)) {
throw new Exception('Timeout must be numeric.');
}
if ($timeout < 1) {
throw new Exception('Timeout must be greater than zero.');
}
return $this->queue->run(true, $timeout);
}
本質(zhì)上都是調(diào)用run方法,只是參數(shù)不同:
/**
* Listens queue and runs each job.
*
* @param bool $repeat whether to continue listening when queue is empty.
* @param int $timeout number of seconds to wait for next message.
* @return null|int exit code.
* @internal for worker command only.
* @since 2.0.2
*/
public function run($repeat, $timeout = 0)
{
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
if (($payload = $this->reserve($timeout)) !== null) {
list($id, $message, $ttr, $attempt) = $payload;
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
$this->delete($id);
}
} elseif (!$repeat) {
break;
}
}
});
}
yii2-queue的缺點在于:
- 多個隊列分組需要修改配置文件,公司的配置文件修改比較麻煩,需要運維手動去每臺機器去改;
- 使用常駐進程依然有前述的問題;
3.2.3 采用php+crontab實現(xiàn)
采用crontab配置+php 進程實現(xiàn):

crontab定時10分鐘執(zhí)行一次,php進程只給10分鐘生命周期,即php進程只運行十分鐘便自行退出。我們有crontab的配置工具,所以用來實現(xiàn)這個模型也比較方便,如果某些進程的隊列阻塞,只需要在crontab配置多一些進程即可,例如上圖的queueName1。
這個方案可以有效解決上面常駐進程的問題,針對隊列名稱消費可以有效針對隊列增加進程,自動退出不存在內(nèi)存沒回收的問題,10分鐘不存在數(shù)據(jù)庫斷連的問題,代碼更新也只是10分鐘內(nèi)還是走舊代碼。
參考yii2-queue,隊列Job的狀態(tài)轉(zhuǎn)換如下:

- delay 延時態(tài),即還沒到設(shè)置的時間執(zhí)行;
- waiting 等待態(tài),等待被消費者取出;
- reserved 就緒態(tài),已取出等待執(zhí)行;
- done 完成態(tài),已經(jīng)被執(zhí)行,實際上這個狀態(tài)基本不存在,執(zhí)行成功就會被刪掉。