前言
當前筆記中的內容針對的是 thinkphp-queue 的 v2.0 版本, 下文中提到的幾個Bug在最新的master分支上均已修復。 筆記中的部分內容還未更新。
傳統(tǒng)的程序執(zhí)行流程一般是 即時|同步|串行的,在某些場景下,會存在并發(fā)低,吞吐量低,響應時間長等問題。在大型系統(tǒng)中,一般會引入消息隊列的組件,將流程中部分任務抽離出來放入消息隊列,并由專門的消費者作針對性的處理,從而降低系統(tǒng)耦合度,提高系統(tǒng)性能和可用性。
一般來說,可以抽離的任務具有以下的特點:
-
允許延后|異步|并行處理 (相對于傳統(tǒng)的 即時|同步|串行 的執(zhí)行方式)
-
允許延后:
搶購活動時,先快速緩沖有限的參與人數到消息隊列,后續(xù)再排隊處理實際的搶購業(yè)務;
-
允許異步:
業(yè)務處理過程中的郵件,短信等通知
-
允許并行:
用戶支付成功之后,郵件通知,微信通知,短信通知可以由多個不同的消費者并行執(zhí)行,通知到達的時間不要求先后順序。
-
-
允許失敗和重試
- 強一致性的業(yè)務放入核心流程處理
- 無一致性要求或最終一致即可的業(yè)務放入隊列處理
thinkphp-queue 是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:
- 消息的發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時控制等
- 隊列的多隊列, 內存限制 ,啟動,停止,守護等
- 消息隊列可降級為同步執(zhí)行
thinkphp-queue 內置了 Redis,Database,Topthink ,Sync這四種驅動。本文主要介紹 thinkphp-queue 結合其內置的 redis 驅動的使用方式和基本原理。
注1:如無特殊說明,下文中的 ‘消息’ 和 ‘任務’兩個詞指代的是同一個概念,即隊列中的一個成員。該成員對消息隊列而言是其內部保存的消息; 對業(yè)務應用而言是一個待執(zhí)行的任務。請根據語境區(qū)分。
注2:本文編寫時(2017-02-15)使用的 thinkphp-queue 的版本號是 v1.1.2 。該版本中部分功能并未全部完成,如 subscribe 模式,以及存在幾個bug(稍后會提及)。如有變更,請以官方最新版為準。
一 代碼示例
先通過一段代碼,了解一下 thinkphp-queue 的基本使用流程。
目標:
在業(yè)務控制器中推送一個新消息到一個名為 ‘helloJobQueue’ 的隊列中,該消息中包含我們自定義的業(yè)務數據,然后,編寫一個名為 Hello 的消費者類,并通過命令行去調用該消費者類獲取這個消息,拿到定義的數據。
1.1 安裝 thinkphp-queue
composer install thinkphp-queue
1.2 搭建消息隊列的存儲環(huán)境
-
使用 Redis [推薦]
安裝并啟動 Redis 服務 -
使用數據庫 [不推薦]
CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1.3 配置消息隊列的驅動
根據選擇的存儲方式,在 \application\extra\queue.php 這個配置文件中,添加消息隊列對應的驅動配置
return [
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
'default' => 'default', // 默認的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 0, // 使用哪一個 db,默認為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接
// 'connector' => 'Database', // 數據庫驅動
// 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null
// 'default' => 'default', // 默認的隊列名稱
// 'table' => 'jobs', // 存儲消息的表名,不帶前綴
// 'dsn' => [],
// 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不作介紹
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default',
// 'connector' => 'Sync', // Sync 驅動,該驅動的實際作用是取消消息隊列,還原為同步執(zhí)行
];
1.3.1 配置文件中的 expire 參數說明
expire 參數指的是任務的過期時間。 過期的任務,其準確的定義是
- 任務的狀態(tài)為執(zhí)行中
- 任務的開始執(zhí)行的時刻 + expire > 當前時刻
expire 不為null 時 ,thinkphp-queue 會在每次獲取下一個任務之前檢查并重發(fā)過期(執(zhí)行超時)的任務。
expire 為null 時,thinkphp-queue 不會檢查過期的任務,性能相對較高一點。但是需要注意:
- 這些執(zhí)行超時的任務會一直留在消息隊列中,需要開發(fā)者另行處理(刪除或者重發(fā))!
對expire 參數理解或者使用不當時,很容易產生一些bug,后面會舉例提到。
1.4 消息的創(chuàng)建與推送
我們在業(yè)務控制器中創(chuàng)建一個新的消息,并推送到 helloJobQueue 隊列
新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法
<?php
/**
* 文件路徑: \application\index\controller\JobTest.php
* 該控制器的業(yè)務代碼中借助了thinkphp-queue 庫,將一個消息推送到消息隊列
*/
namespace application\index\controller;
use think\Exception;
use think\Queue;
class JobTest {
/**
* 一個使用了隊列的 action
*/
public function actionWithHelloJob(){
// 1.當前任務將由哪個類來負責處理。
// 當輪到該任務時,系統(tǒng)將生成一個該類的實例,并調用其 fire 方法
$jobHandlerClassName = 'application\index\job\Hello';
// 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建
$jobQueueName = "helloJobQueue";
// 3.當前任務所需的業(yè)務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串
// ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)
$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
// 4.將該任務推送到消息隊列,等待對應的消費者去執(zhí)行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
注意: 在這個例子當中,我們是手動指定的 $jobHandlerClassName ,更合理的做法是先定義好消息名稱與消費者類名的映射關系,然后由某個可以獲取該映射關系的類來推送這個消息。這樣,生產者只需要知道消息的名稱,而無需指定哪個消費者類來處理。
除了
Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式之外,還可以直接傳入Queue::push( $jobHandlerObject ,null , $jobQueueName );這時,需要在 $jobHandlerObject 中定義一個handle()方法,消息隊列在執(zhí)行到該任務時會自動反序列化該對象,并調用其handle()方法。 該方式的缺點是無法傳入自定義數據。
1.5 消息的消費與刪除
編寫 Hello 消費者類,用于處理 helloJobQueue 隊列中的任務
新增 \application\index\job\Hello.php 消費者類,并編寫其 fire() 方法
<?php
/**
* 文件路徑: \application\index\job\Hello.php
* 這是一個消費者類,用于處理 helloJobQueue 隊列中的任務
*/
namespace application\index\job;
use think\queue\Job;
class Hello {
/**
* fire方法是消息隊列默認調用的方法
* @param Job $job 當前的任務對象
* @param array|mixed $data 發(fā)布任務時自定義的數據
*/
public function fire(Job $job,$data){
// 如有必要,可以根據業(yè)務需求和數據庫中的最新數據,判斷該任務是否仍有必要執(zhí)行.
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!isJobStillNeedToBeDone){
$job->delete();
return;
}
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
//如果任務執(zhí)行成功, 記得刪除任務
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務已經重試了幾次了
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也可以重新發(fā)布這個任務
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執(zhí)行
}
}
}
/**
* 有些消息在到達消費者時,可能已經不再需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務時自定義的數據
* @return boolean 任務執(zhí)行的結果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}
/**
* 根據消息中的數據進行實際的業(yè)務處理
* @param array|mixed $data 發(fā)布任務時自定義的數據
* @return boolean 任務執(zhí)行的結果
*/
private function doHelloJob($data) {
// 根據消息中的數據進行實際的業(yè)務處理...
print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
print("<info>Hello Job is Done!"."</info> \n");
return true;
}
}
至此,所有的代碼都已準備完畢,在運行消息隊列之前,我們先看一下現(xiàn)在的目錄結構:
[圖片上傳失敗...(image-b435a1-1540961256653)]
1.6 發(fā)布任務
在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。
[圖片上傳失敗...(image-a1af6f-1540961256653)]
1.7 處理任務
切換當前終端窗口的目錄到項目根目錄下,執(zhí)行
php think queue:work --queue helloJobQueue
可以看到執(zhí)行的結果類似如下:
[圖片上傳失敗...(image-cf70e7-1540961256653)]
?
至此,我們成功地經歷了一個消息的 創(chuàng)建 -> 推送 -> 消費 -> 刪除 的基本流程
下文,將介紹 thinkphp-queue 的詳細使用方法。如配置介紹,基本原理,各種特殊情況的處理等
二 詳細介紹
2.1 命令模式
queue:subscribe 命令 [截至2017-02-15,作者暫未實現(xiàn)該模式,略過]
-
queue:work 命令
work 命令: 該命令將啟動一個 work 進程來處理消息隊列。
php think queue:work --queue helloJobQueue -
queue:listen 命令
listen 命令: 該命令將會創(chuàng)建一個 listen 父進程 ,然后由父進程通過
proc_open(‘php think queue:work’)的方式來創(chuàng)建一個work 子 進程來處理消息隊列,且限制該work進程的執(zhí)行時間。php think queue:listen --queue helloJobQueue
2.2 命令行參數
-
Work 模式
php think queue:work \ --daemon //是否循環(huán)執(zhí)行,如果不加該參數,則該命令處理完下一個消息就退出 --queue helloJobQueue //要處理的隊列的名稱 --delay 0 \ //如果本次任務執(zhí)行拋出異常且任務未被刪除時,設置其下次執(zhí)行前延遲多少秒,默認為0 --force \ //系統(tǒng)處于維護狀態(tài)時是否仍然處理任務,并未找到相關說明 --memory 128 \ //該進程允許使用的內存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任務已經超過嘗試次數上限,則觸發(fā)‘任務嘗試次數超限’事件,默認為0 -
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //監(jiān)聽的隊列的名稱 --delay 0 \ //如果本次任務執(zhí)行拋出異常且任務未被刪除時,設置其下次執(zhí)行前延遲多少秒,默認為0 --memory 128 \ //該進程允許使用的內存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效 --tries 0 \ //如果任務已經超過重發(fā)次數上限,則進入失敗處理邏輯,默認為0 --timeout 60 //創(chuàng)建的work子進程的允許執(zhí)行的最長時間,以秒為單位可以看到 listen 模式下,不包含
--deamon參數,原因下面會說明
2.3 work 模式和 listen 模式的區(qū)別
兩者都可以用于處理消息隊列中的任務
區(qū)別在于:
-
2.3.1 執(zhí)行原理不同
-
work 命令是單進程的處理模式。
按照是否設置了
--daemon參數,work命令又可分為單次執(zhí)行和循環(huán)執(zhí)行兩種模式。- 單次執(zhí)行:不添加
--daemon參數,該模式下,work進程在處理完下一個消息后直接結束當前進程。當不存在新消息時,會sleep一段時間然后退出。 - 循環(huán)執(zhí)行:添加了
--daemon參數,該模式下,work進程會循環(huán)地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環(huán)中sleep一段時間。
- 單次執(zhí)行:不添加
-
listen 命令是 父進程 + 子進程 的處理模式。
listen命令所在的父進程會創(chuàng)建一個單次執(zhí)行模式的work子進程,并通過該work子進程來處理隊列中的下一個消息,當這個work子進程退出之后,listen命令所在的父進程會監(jiān)聽到該子進程的退出信號,并重新創(chuàng)建一個新的單次執(zhí)行的work子進程
-
-
2.3.2 退出時機不同
- work 命令的退出時機在上面的執(zhí)行原理部分已敘述,此處不再重復
- listen 命令中,listen所在的父進程正常情況會一直運行,除非遇到下面兩種情況:
- 創(chuàng)建的某個work子進程的執(zhí)行時間超過了 listen命令行中的
--timeout參數配置,此時work子進程會被強制結束,listen所在的父進程也會拋出一個ProcessTimeoutException異常并退出。開發(fā)者可以選擇捕獲該異常,讓父進程繼續(xù)執(zhí)行,也可以選擇通過 supervisor 等監(jiān)控軟件重啟一個新的listen命令。 - listen 命令所在的父進程因某種原因存在內存泄露,則當父進程本身占用的內存超過了命令行中的
--memory參數配置時,父子進程均會退出。正常情況下,listen進程本身占用的內存是穩(wěn)定不變的。
- 創(chuàng)建的某個work子進程的執(zhí)行時間超過了 listen命令行中的
-
2.3.3 性能不同
work 命令是在腳本內部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;
-
而listen模式則是處理完一個任務之后新開一個work進程,此時會重新加載框架腳本。
因此: work 模式的性能會比listen模式高。
注意:當代碼有更新時,work 模式下需要手動去執(zhí)行
php think queue:restart命令重啟隊列來使改動生效;而listen 模式會自動生效,無需其他操作。
-
2.3.4 超時控制能力
-
work 模式本質上既不能控制進程自身的運行時間,也無法限制執(zhí)行中的任務的執(zhí)行時間。
舉例來說,假如你在某次上線之后,在上文中的
\application\index\job\Hello.php消費者的fire方法中添加了一段死循環(huán) :public function fire(){ while(true){ //死循環(huán) $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n"); sleep(1); } }那么這個循環(huán)將永遠不能停止,直到任務所在的進程超過內存限制或者由管理員手動結束。這個過程不會有任何的告警。更嚴重的是,如果你配置了expire ,那么這個死循環(huán)的任務可能會污染到同樣處理
helloJobQueue隊列的其他work進程,最后好幾個work進程將被卡死在這段死循環(huán)中。詳情后文會說明。work 模式下的超時控制能力,實際上應該理解為 多個work 進程配合下的過期任務重發(fā)能力。
-
而 listen命令可以限制其創(chuàng)建的work子進程的超時時間。
listen 命令可通過
--timeout參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束; -
這里有必要補充一下 expire 和 timeout 之間的區(qū)別:
- expire 在配置文件中設置,timeout 在 listen命令 的命令行參數中設置,而且,expire 和 timeout 是兩個不同層次上的概念:
-
- expire 是指任務的過期時間。這個時間是全局的,影響到所有的work進程。(不管是獨立的work命令還是 listen 模式下創(chuàng)建的的work子進程) 。expire 針對的對象是 **任務**。
- timeout 是指work子進程的超時時間。這個時間只對當前執(zhí)行的listen 命令有效。timeout 針對的對象是 **work子進程**。
-
2.3.5 使用場景不同
根據上面的介紹,可以看到,
work 命令的適用場景是:
- 任務數量較多
- 性能要求較高
- 任務的執(zhí)行時間較短
- 消費者類中不存在死循環(huán),sleep() ,exit() ,die() 等容易導致bug的邏輯
listen命令的適用場景是:
- 任務數量較少
- 任務的執(zhí)行時間較長(如生成大型的excel報表等),
- 任務的執(zhí)行時間需要有嚴格限制
2.4 消息隊列的開始,停止與重啟
-
開始一個消息隊列:
php think queue:work
-
停止所有的消息隊列:
php think queue:restart -
重啟所有的消息隊列:
php think queue:restart php think queue:work
2.5 多模塊,多任務的處理
-
多模塊
單模塊項目推薦使用
app\job作為任務類的命名空間多模塊項目可用使用
app\module\job作為任務類的命名空間 也可以放在任意可以自動加載到的地方 -
多任務
如果一個任務類里有多個小任務的話,在發(fā)布任務時,需要用
任務的類名@方法名如app\lib\job\Job2@task1、app\lib\job\Job2@task2注意:命令行中的 --queue 參數不支持@解析
多任務例子:
- 在
\application\index\controller\JobTest.php控制器中,添加actionWithMultiTask()方法:
public function actionWithMultiTask(){ $taskType = $_GET['taskType']; switch ($whichTask) { case 'taskA': $jobHandlerClassName = 'application\index\job\MultiTask@taskA'; $jobDataArr = ['a' => '1']; $jobQueueName = "multiTaskJobQueue"; break; case 'taskB': $jobHandlerClassName = 'application\index\job\MultiTask@taskB'; $jobDataArr = ['b' => '2']; $jobQueueName = "multiTaskJobQueue"; break; default: break; } $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); if ($isPushed !== false) { echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>"); }else{ throw new Exception("push a new $taskType of MultiTask Job Failed!"); } }- 新增
\application\index\job\MultiTask.php消費者類,并編寫其taskA()和taskB()方法
<?php /** * 文件路徑: \application\index\job\MultiTask.php * 這是一個消費者類,用于處理 multiTaskJobQueue 隊列中的任務 */ namespace application\index\job; use think\queue\Job; class MultiTask { public function taskA(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskA of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 3) { $job->delete(); } } } public function taskB(Job $job,$data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); print("Info: TaskB of Job MultiTask has been done and deleted"."\n"); }else{ if ($job->attempts() > 2) { $job->release(); } } } private function _doTaskA($data) { print("Info: doing TaskA of Job MultiTask "."\n"); return true; } private function _doTaskB($data) { print("Info: doing TaskB of Job MultiTask "."\n"); return true; } - 在
2.6 消息的延遲執(zhí)行與定時執(zhí)行
延遲執(zhí)行,相對于即時執(zhí)行,是用來限制某個任務的最早可執(zhí)行時刻。在到達該時刻之前,該任務會被跳過。
可以利用該功能實現(xiàn)定時任務。
使用方式:
- 在生產者業(yè)務代碼中:
// 即時執(zhí)行
$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延遲 2 秒執(zhí)行
$isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延遲到 2017-02-18 01:01:01 時刻執(zhí)行
$time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
$isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
- 在消費者類中:
// 重發(fā),即時執(zhí)行
$job->release();
// 重發(fā),延遲 2 秒執(zhí)行
$job->release(2);
// 延遲到 2017-02-18 01:01:01 時刻執(zhí)行
$time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
$job->release($time2wait);
- 在命令行中:
//如果消費者類的fire()方法拋出了異常且任務未被刪除時,將自動重發(fā)該任務,重發(fā)時,會設置其下次執(zhí)行前延遲多少秒,默認為0
php think queue:work --delay 3
2.7 消息的重發(fā)
thinkphp-queue 中,消息的重發(fā)時機有3種:
- 2.7.1 在消費者類中手動重發(fā):
if( $isJobDone === false){
$job->release();
}
- 2.7.2 work進程自動重發(fā),需同時滿足以下兩個條件
- 消費者類的 fire() 方法拋出了異常
- 任務未被刪除
- 2.7.3 當配置了 expire 不為
null時,work 進程內部每次查詢可用任務之前,會先自動重發(fā)已過期的任務。
補充:
在database 模式下,2.7.1 和 2.7.2 中的重發(fā)邏輯是先刪除原來的任務,然后插入一個新的任務。2.7.3 中的重發(fā)時機是直接更新原任務。
而在redis 模式下,3種重發(fā)都是先刪除再插入。
不管是哪種重發(fā)方式,重發(fā)之后,任務的已嘗試次數會在原來的基礎上 +1 。
此外,消費者類中需要注意,如果 fire() 方法中可能拋出異常,那么
- 如果不需要自動重發(fā)的話, 請在拋出異常之前將任務刪除
$job->delete(),以免產生bug。 - 如果需要自動重發(fā)的話,請直接拋出異常,不要在
fire()方法中又手動使用$job->release(), 這樣會導致該任務被重發(fā)兩次,產生兩個一樣的新任務。
2.8 任務的失敗回調及告警
當同時滿足以下條件時,將觸發(fā)任務失敗回調:
- 命令行的
--tries參數的值大于0 - 任務的已嘗試次數大于 命令行的
--tries參數 - 開發(fā)者添加了
queue_failed事件標簽及其對應的回調代碼 - 消費者類中定義了
failed()方法,用于接收任務失敗的通知
注意, queue_failed 標簽需要在安裝了 thinkphp-queue 之后 手動 去 \application\tags.php 文件中添加。
注意:該版本有bug,若想實現(xiàn)失敗任務回調功能,需要先修改位于 think-queue\src\queue\Worker.php 中的 logFailedJob方法 , 修改方式如下:
/**
* Log a failed job into storage.
* @param \Think\Queue\Job $job
* @return array
*/
protected function logFailedJob(Job $job)
{
// 將原來的 queue.failed' 修改為 'queue_failed' 才可以觸發(fā)任務失敗回調
if (Hook::listen('queue.failed', $job, null, true)) {
$job->delete();
$job->failed();
}
return ['job' => $job, 'failed' => true];
}
首先,我們添加 queue_failed 事件標簽, 及其對應的回調方法
// 文件路徑: \application\tags.php
// 應用行為擴展定義文件
return [
// 應用初始化
'app_init' => [],
// 應用開始
'app_begin' => [],
// 模塊初始化
'module_init' => [],
// 操作開始執(zhí)行
'action_begin' => [],
// 視圖內容過濾
'view_filter' => [],
// 日志寫入
'log_write' => [],
// 應用結束
'app_end' => [],
// 任務失敗統(tǒng)一回調,有四種定義方式
'queue_failed'=> [
// 數組形式,[ 'ClassName' , 'methodName']
['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
// 字符串(靜態(tài)方法),'StaicClassName::methodName'
// 'MyQueueFailedLogger::logAllFailedQueues'
// 字符串(對象方法),'ClassName',此時需在對應的ClassName類中添加一個名為 queueFailed 的方法
// 'application\\behavior\\MyQueueFailedLogger'
// 閉包形式
/*
function( &$jobObject , $extra){
// var_dump($jobObject);
return true;
}
*/
]
];
這里,我們選擇數組形式的回調方式,新增 \application\behavior\MyQueueFailedLogger 類,添加一個 logAllFailedQueues() 方法
<?php
/**
* 文件路徑: \application\behavior\MyQueueFailedLogger.php
* 這是一個行為類,用于處理所有的消息隊列中的任務失敗回調
*/
namespace application\behavior;
class MyQueueFailedLogger {
const should_run_hook_callback = true;
/**
* @param $jobObject \think\queue\Job //任務對象,保存了該任務的執(zhí)行情況和業(yè)務數據
* @return bool true //是否需要刪除任務并觸發(fā)其failed() 方法
*/
public function logAllFailedQueues(&$jobObject){
$failedJobLog = [
'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
'queueName' => $jobObject->getQueue(), // 'helloJobQueue'
'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
'attempts' => $jobObject->attempts(), // 3
];
var_export(json_encode($failedJobLog,true));
// $jobObject->release(); //重發(fā)任務
//$jobObject->delete(); //刪除任務
//$jobObject->failed(); //通知消費者類任務執(zhí)行失敗
return self::should_run_hook_callback;
}
}
需要注意該回調方法的返回值:
- 返回 true 時,系統(tǒng)會自動刪除該任務,并且自動調用消費者類中的
failed()方法 - 返回 false 時,系統(tǒng)不會自動刪除該任務,也不會自動調用消費者類中的
failed()方法,需要開發(fā)者另行處理失敗任務的刪除和通知。
最后,在消費者類中,添加 failed() 方法
/**
* 文件路徑: \application\index\job\HelloJob.php
*/
/**
* 該方法用于接收任務執(zhí)行失敗的通知,你可以發(fā)送郵件給相應的負責人員
* @param $jobData string|array|... //發(fā)布任務時傳遞的 jobData 數據
*/
public function failed($jobData){
send_mail_to_somebody() ;
print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n";
}
這樣,就可以做到任務失敗的記錄與告警
2.9 處理過期的任務
過期這個概念用文字比較難描述清楚,建議先看一下 深入理解 中 3.4 消息處理的詳細流程圖
三 深入理解
3.1 thinkphp-queue 中消息與隊列的保存方式
-
Redis
在 Redis 中,每一個 隊列 都三個key 與之對應 ,以 helloJobQueue 隊列舉例,其在redis 中的保存方式為:
key名 類型 說明 queues:helloJobQueue List , 列表 待執(zhí)行的任務列表 queues:helloJobQueue:delayed Sorted Set,有序集合 延遲執(zhí)行和定時執(zhí)行的任務集合 queues:helloJobQueue:reserved Sorted Set,有序集合 執(zhí)行中的任務集合 使用的
:分隔符, 只是用來表示相關key的關聯(lián)性。本身沒有特殊含義。使用分隔符是一種常見的組織key的方式。其中,在
queues:helloJobQueue列表中,每個元素的形式如下:[圖片上傳失敗...(image-7e44df-1540961256653)]
在
queues:helloJobQueue:delayed和queues:helloJobQueue:delayed有序集合中,每個元素的形式如下:[圖片上傳失敗...(image-65278a-1540961256653)]
可以看到,在有序集合中,每個元素代表一個任務,該元素的 Score 為該任務的入隊時間戳,任務的 value 為json 格式,保存了任務的執(zhí)行情況和業(yè)務數據。將value decode 為數組后形式如下:
[ 'job' => 'application\\index\\job\\Hello' , // jobHandlerClassName,消費者類的類名 'data' => [ // 生產者傳入的業(yè)務數據 'time' => '2017-02-18 16:20:10', 'data' => 'I have 648 apples' ], 'id' => '77IasdasadIasdadadadKL8t', // 一個隨機的32位字符串 'attempts' => 2 // 任務的已嘗試次數 ]redis驅動下,為了實現(xiàn)任務的延遲執(zhí)行和過期重發(fā),任務將在這三個key中來回轉移,詳情可見 3.5
-
Database
在 Database 中,每個任務對應到表中的一行,queue 字段用來區(qū)分不同的隊列。
表的字段結構如下:
[圖片上傳失敗...(image-2ed0e-1540961256653)]
其中,payLoad 字段保存了消息的執(zhí)行者和業(yè)務數據,payLoad 字段采用 json 格式的字符串來保存消息,將其 decode 為數組后形式如下:
[ 'job' => 'application\\index\\job\\Hello', // jobHandlerClassName,消費者類的類名 'data' => string|array|integer|object // 生產者傳入的業(yè)務數據 ]
3.2 thinkphp-queue 的目錄結構和類關系圖
[圖片上傳失敗...(image-8178aa-1540961256653)]
這些類構成了消息隊列中的幾個角色:
| 角色 | 類名 | 說明 |
|---|---|---|
| 命令行 | Command + Worker | 負責解析命令行參數,控制隊列的啟動,重啟 |
| 驅動 | Queue + Connector | 負責隊列的創(chuàng)建,以及消息的入隊,出隊等操作 |
| 任務 | Job | 用于將消息轉化為一個任務對象,供消費者使用 |
| 生產者 | 業(yè)務代碼 | 負責消息的創(chuàng)建與發(fā)布 |
| 消費者 | 業(yè)務代碼 | 負責任務的接收與執(zhí)行 |
各個類之間的關系圖如下:
[圖片上傳失敗...(image-7cae52-1540961256653)]
3.3 Deamon模式的執(zhí)行流程
[圖片上傳失敗...(image-863db9-1540961256653)]
3.4 Database模式下消息處理的詳細流程
下圖中,展示了database 模式下消息處理的詳細流程,redis 驅動下大體類似
[圖片上傳失敗...(image-fb9dee-1540961256653)]
3.5 redis 驅動下的任務重發(fā)細節(jié)
在redis驅動下,為了實現(xiàn)任務的延遲執(zhí)行和過期重發(fā),任務將在這三個key中來回轉移。
在3.4 Database模式下消息處理的消息流程中,我們知道,如果配置的expire 不是null ,那么 thinkphp-queue的work進程每次在獲取下一個可執(zhí)行任務之前,會先嘗試重發(fā)所有過期的任務。而在redis驅動下,這個步驟則做了更多的事情,詳情如下:
- 從
queue:xxx:delayed的key中查詢出有哪些任務在當前時刻已經可以開始執(zhí)行,然后將這些任務轉移到queue:xxx的key的尾部。 - 從
queue:xxx:reserved的key中查詢出有哪些任務在當前時刻已經過期,然后將這些任務轉移到queue:xxx的key的尾部。 - 嘗試從
queue:xxx的key的頭部取出一個任務,如果取出成功,那么,將這個任務轉移到queue:xxx:reserved的key 的頭部,同時將這個任務實例化成任務對象,交給消費者去執(zhí)行。
用圖來表示這個步驟的具體過程如下:
redis隊列中的過期任務重發(fā)步驟--執(zhí)行前:
[圖片上傳失敗...(image-186d8d-1540961256653)]
redis隊列中的過期任務重發(fā)步驟--執(zhí)行后:
[圖片上傳失敗...(image-bdec70-1540961256653)]
3.6 thinkphp-queue的性能
-
測試環(huán)境 :
虛擬機 Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 雙核 I5 6400,3G 內存
-
測試方式 :
使用 Redis 驅動,在一個控制器中循環(huán)推送 40000 條消息到消息隊列;
使用
php think queue:work --daemon去消費這些消息,計算推送和消費各自所耗的時間。
-
測試結果 :
在最簡單的邏輯下,平均每秒中可推送8000個消息,平均每秒可消費200個消息。
注意:由于在測試時,Host 機本身的cpu和內存長期100%,并且虛擬機中的各項服務并未專門調優(yōu),因此該測試結果并不具備參考性。
3.7 thinkphp-queue 的N種錯誤使用姿勢
-
3.7.1 在 消費者類的
fire()方法中,忘記使用$job->delete()去刪除消息,這種情況下,會產生一系列的bug:配置的 expire 為
null, 則該任務被執(zhí)行一次后會永遠留在消息隊列中,占用消息隊列的空間 , 除非開發(fā)者另行處理。-
配置的 expire
不為 null,該任務在 expire 秒后被認為是過期任務,并被消息隊列還原為待執(zhí)行狀態(tài),在消息隊列的后面的循環(huán)中繼續(xù)被獲取,這時,如果- 命令行中的
--tries參數為0 或者未設置,那么每隔 一段時間該任務就會被執(zhí)行一次。 - 命令行中的
--tries參數 n 大于0 , 那么當這個任務被誤執(zhí)行的次數超過n 時,會由消息隊列嘗試去觸發(fā)失敗回調事件:- 如果開發(fā)者沒有編寫失敗處理的回調事件:那么該任務仍然不會被刪除,每隔一段時間就會被執(zhí)行一次。[這個可能屬于框架的Bug] ,
- 如果編寫了失敗回調事件
- 回調事件中刪除了任務,則這個任務被誤執(zhí)行了 n 次。
- 回調事件中未刪除任務,這時,如果:
- 回調事件返回值是 false,那么該任務仍然不會被刪除,每隔一段時間就會被執(zhí)行一次
- 回調事件返回值是 true, 那么該任務會先被刪除,然后觸發(fā)消費者類的 failed() 方法,如果在 failed() 方法中設置了告警,那么這個告警就是一次誤報。
- 命令行中的
因此,在 使用 thinkphp-queue 時,請記得:
- 任務完成后, 使用
$job->delete()刪除任務 - 在消費者類的
fire()方法中,使用$job->attempt()檢查任務已執(zhí)行次數,對于次數異常的,作相應的處理。 - 在消費者類的
fire()方法中,根據業(yè)務數據來判斷該任務是否已經執(zhí)行過,以避免該任務被重復執(zhí)行。 - 編寫失敗回調事件,將事件中失敗的任務及時通知給開發(fā)人員。
3.7.2 使用了
queue:work --daemon,但更新代碼后沒有使用queue:restart重啟 work 進程, 使得 work 進程中的代碼與最新的代碼不同,出現(xiàn)各種問題。3.7.3 使用了
queue:work --daemon,但是消費者類的 fire() 方法中存在死循環(huán),或sleep(n)等邏輯,導致消息隊列被堵塞;或者使用了exit(),die()這樣的邏輯,導致work進程直接終止 。3.7.4 配置的 expire 為
null,這時如果采用的是 Redis 驅動且使用了延遲功能,如later(n),release(n)方法或者--delay參數不為0 , 那么將導致被延遲的任務永遠無法處理。(這個可能屬于框架的Bug)3.7.5 配置的 expire 為
null,但并沒有自行處理過期的任務,導致過期的任務得不到處理,且一直占用消息隊列的空間。3.7.6 配置的 expire
不為null,但配置的 expire 時間太短,以至于 expire 時間 < 消費者的fire()方法所需時間 + 刪除該任務所需的時間 ,那么任務將被誤認為執(zhí)行超時,從而被消息隊列還原為待執(zhí)行狀態(tài)。3.7.7 使用
Queue::push($jobHandlerClassName , $jobData, $jobQueueName );推送任務時,$jobData中包含未序列化的對象。這時,在消費者端拿到的$jobData中拿到的是該對象的public 屬性的鍵值對數組。因此,需要在推送前手動序列化對象,在消費者端再手動反序列化還原為對象。
四 拓展
4.1 隊列的穩(wěn)定性和拓展性
- 穩(wěn)定性:不管是 listen 模式還是 work 模式,都建議使用 supervisor 或者 自定義的cron 腳本,去定時檢查 work 進程是否正常
- 拓展性: 當某個隊列的消費者不足時,再給這個隊列添加 work進程即可。
4.2 消息隊列的可視化管理工具
- 隊列管理,隊列的列表,隊列的 work 進程數量控制,隊列的任務數量變化趨勢 //TBD
- 任務管理,任務的列表,添加/撤回/查詢任務,修改任務的 執(zhí)行者/執(zhí)行時間/優(yōu)先級/數據 等 //TBD
4.2 編寫自定義的 thinkphp-queue 驅動
//TBD
4.3 編寫消息隊列的單元測試
//TBD
4.4 與其他PHP消息隊列庫的對比
TP5的消息隊列與Laravel的消息隊列比較相似,下面是與laravel 中的消息隊列的一些對比:
| thinkphp-queue (v1.1.2) | laravel-queue (v5.3) | |
|---|---|---|
| 內置的驅動 | Database,Redis,Sync,TopThink | Database,Redis, Sync(在laravel中稱為 null)。 |
| Redis驅動要求 | 安裝redis的C擴展 | 安裝 predis 包 + LUA腳本 |
| 推送任務 | 允許推送 消費者類名,消費者對象 | 允許推送消費者類名,消費者對象,閉包 |
| 失敗任務處理 | 觸發(fā)失敗回調事件 (有Bug) | 觸發(fā)失敗回調事件 + 移動任務到 failed_jobs表? |
| 消息訂閱 | subscribe 命令+ Topthink驅動(注:未實現(xiàn)/未提供) | subscribe 命令 + 安裝IronMQ 驅動 |
| 刪除任務 | 消費者類中手動刪除 | 任務完成后自動刪除 |
| 推送到多個隊列 | 需自己實現(xiàn) | 原生支持 |
| 延遲執(zhí)行 | 支持 (有Bug) | 支持 |
| 消息重發(fā) | 支持 | 支持 |
| 檢查已執(zhí)行次數 | 原生支持 | 需在消費者類中顯式 use 相關的 trait |
| 執(zhí)行方式 | work 模式 + listen 模式 | work 模式 + listen 模式 |
| 進程命令 | 開啟,停止,重啟 | 開啟,停止,重啟 |
| 任務命令 | 無 | 展示失敗任務列表,重試某個失敗任務,刪除某個失敗任務 |
| 支持的事件 | 失敗回調事件 | 失敗回調事件,支持消費前事件,消費后事件 |
五 待討論的問題
5.1 thinkphp-queue 中消息名稱與消費者類名的綁定怎么實現(xiàn)?
5.1 消息中的業(yè)務數據的格式如何約定?
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md