thinkphp6 redis消息隊(duì)列簡單教程
- 安裝 thinkphp-queue
composer require topthink/think-queue
也可指定版本:composer require topthink/think-queue:2.*
搭建消息隊(duì)列的存儲(chǔ)環(huán)境
使用 Redis
安裝并啟動(dòng) Redis 服務(wù) 看你個(gè)人使用的windows下還是linux下的redis服務(wù)(或者其他);可自行搭建(這里不做介紹)配置消息隊(duì)列的驅(qū)動(dòng)
根據(jù)選擇的存儲(chǔ)方式,在\app\config\queue.php這個(gè)配置文件中,添加消息隊(duì)列對應(yīng)的驅(qū)動(dòng)配置
<?php
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
?>
- 消息的創(chuàng)建與推送
我們在業(yè)務(wù)控制器中創(chuàng)建一個(gè)新的消息,并推送到 helloJobQueue 隊(duì)列
新增 \app\controller\Test.php 控制器,在該控制器中添加 actionWithHelloJob 方法
<?php
namespace app\controller;
use app\BaseController;
use think\facade\Queue;
use think\facade\Db;
class Test extends BaseController
{
public function index()
{
echo 222;
}
/*
* 測試隊(duì)列action
* */
public function actionWithHelloJob(){
// 1.當(dāng)前任務(wù)將由哪個(gè)類來負(fù)責(zé)處理。
// 當(dāng)輪到該任務(wù)時(shí),系統(tǒng)將生成一個(gè)該類的實(shí)例,并調(diào)用其 fire 方法
$jobHandlerClassName = 'app\index\job\Hello@fire';
// 2.當(dāng)前任務(wù)歸屬的隊(duì)列名稱,如果為新隊(duì)列,會(huì)自動(dòng)創(chuàng)建
$jobQueueName = "helloJobQueue";
// 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
// ( jobData 為對象時(shí),需要在先在此處手動(dòng)序列化,否則只存儲(chǔ)其public屬性的鍵值對)
$jobData = [ 'name' => 'test'.rand(), 'password'=>rand()] ;
// 4.將該任務(wù)推送到消息隊(duì)列,等待對應(yīng)的消費(fèi)者去執(zhí)行
$time2wait = strtotime('2021-06-09 11:51:00') - strtotime('now'); // 定時(shí)執(zhí)行
$isPushed = Queue::later($time2wait, $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅(qū)動(dòng)時(shí),返回值為 1|false ; redis 驅(qū)動(dòng)時(shí),返回值為 隨機(jī)字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
除了 Queue::push( jobHandlerClassName , jobData , jobQueueName );這種方式之外,還可以直接傳入 Queue::push( jobHandlerObject ,null , jobQueueName ); 這時(shí),需要在 jobHandlerObject 中定義一個(gè) handle() 方法,消息隊(duì)列在執(zhí)行到該任務(wù)時(shí)會(huì)自動(dòng)反序列化該對象,并調(diào)用其 handle()方法。 該方式的缺點(diǎn)是無法傳入自定義數(shù)據(jù)。
- 消息的消費(fèi)與刪除
編寫 Hello 消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)
新增 \app\job\Hello.php 消費(fèi)者類(app目錄下新建job目錄,然后創(chuàng)建Hello.php文件),并編寫其 fire() 方法
<?php
/**
* 文件路徑: \application\index\job\Hello.php
* 這是一個(gè)消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)
*/
namespace app\job;
use think\queue\job;
use think\facade\Db;
class Hello {
/**
* fire方法是消息隊(duì)列默認(rèn)調(diào)用的方法
* @param Job $job 當(dāng)前的任務(wù)對象
* @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
*/
public function fire(Job $job,$data){
// 如有必要,可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)庫中的最新數(shù)據(jù),判斷該任務(wù)是否仍有必要執(zhí)行.
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
//如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
$job->delete();
}else{
if ($job->attempts() > 3) {
//通過這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
$job->delete();
// 也可以重新發(fā)布這個(gè)任務(wù)
//$job->release(2); //$delay為延遲時(shí)間,表示該任務(wù)延遲2秒后再執(zhí)行
}
}
}
/**
* 有些消息在到達(dá)消費(fèi)者時(shí),可能已經(jīng)不再需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}
/**
* 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
* @param array|mixed $data 發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function doHelloJob($data) {
// 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理...
// test
Db::name('admin')->insert([
'name'=>$data['name'],
'password'=>$data['password']
]);
return true;
}
}
發(fā)布任務(wù)
在瀏覽器中訪問 http://localhost/index.php/test/actionWithHelloJob 可以看到消息推送成功。-
處理任務(wù)
切換當(dāng)前終端窗口的目錄到項(xiàng)目根目錄下,執(zhí)行
php think queue:work --queue helloJobQueue
可以看到執(zhí)行的結(jié)果類似如下:
image.png
同時(shí)打開 redis 可視化工具 Redis Desktop Manager (沒有的話 可下載去安裝,然后連接上你的redis,就可以查看到工具了)
image.png
以上是我使用thinkphp6 做的操作!

