thinkphp6 redis消息隊(duì)列簡單教程 --- 2021-06-09

thinkphp6 redis消息隊(duì)列簡單教程

  1. 安裝 thinkphp-queue
composer require topthink/think-queue

也可指定版本:composer require topthink/think-queue:2.*

  1. 搭建消息隊(duì)列的存儲(chǔ)環(huán)境
    使用 Redis
    安裝并啟動(dòng) Redis 服務(wù) 看你個(gè)人使用的windows下還是linux下的redis服務(wù)(或者其他);可自行搭建(這里不做介紹)

  2. 配置消息隊(duì)列的驅(qū)動(dòng)
    根據(jù)選擇的存儲(chǔ)方式,在\app\config\queue.php這個(gè)配置文件中,添加消息隊(duì)列對應(yīng)的驅(qū)動(dòng)配置

<?php

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'  => 'database',
            'queue' => 'default',
            'table' => 'jobs',
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

?>
  1. 消息的創(chuàng)建與推送
    我們在業(yè)務(wù)控制器中創(chuàng)建一個(gè)新的消息,并推送到 helloJobQueue 隊(duì)列
    新增 \app\controller\Test.php 控制器,在該控制器中添加 actionWithHelloJob 方法
<?php
namespace app\controller;
use app\BaseController;
use think\facade\Queue;
use think\facade\Db;
class Test extends BaseController
{
    public function index()
    {
        echo 222;
    }
    /*
    * 測試隊(duì)列action
    * */
    public function actionWithHelloJob(){
        // 1.當(dāng)前任務(wù)將由哪個(gè)類來負(fù)責(zé)處理。
        // 當(dāng)輪到該任務(wù)時(shí),系統(tǒng)將生成一個(gè)該類的實(shí)例,并調(diào)用其 fire 方法
        $jobHandlerClassName  = 'app\index\job\Hello@fire';
        // 2.當(dāng)前任務(wù)歸屬的隊(duì)列名稱,如果為新隊(duì)列,會(huì)自動(dòng)創(chuàng)建
        $jobQueueName     = "helloJobQueue";
        // 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
        // ( jobData 為對象時(shí),需要在先在此處手動(dòng)序列化,否則只存儲(chǔ)其public屬性的鍵值對)
        $jobData          = [ 'name' => 'test'.rand(), 'password'=>rand()] ;
        // 4.將該任務(wù)推送到消息隊(duì)列,等待對應(yīng)的消費(fèi)者去執(zhí)行
        $time2wait = strtotime('2021-06-09 11:51:00') - strtotime('now');  // 定時(shí)執(zhí)行
        $isPushed = Queue::later($time2wait, $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驅(qū)動(dòng)時(shí),返回值為 1|false  ;   redis 驅(qū)動(dòng)時(shí),返回值為 隨機(jī)字符串|false
        if( $isPushed !== false ){
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
        }else{
            echo 'Oops, something went wrong.';
        }
    }

}

除了 Queue::push( jobHandlerClassName , jobData , jobQueueName );這種方式之外,還可以直接傳入 Queue::push( jobHandlerObject ,null , jobQueueName ); 這時(shí),需要在 jobHandlerObject 中定義一個(gè) handle() 方法,消息隊(duì)列在執(zhí)行到該任務(wù)時(shí)會(huì)自動(dòng)反序列化該對象,并調(diào)用其 handle()方法。 該方式的缺點(diǎn)是無法傳入自定義數(shù)據(jù)。

  1. 消息的消費(fèi)與刪除
    編寫 Hello 消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)
    新增 \app\job\Hello.php 消費(fèi)者類(app目錄下新建job目錄,然后創(chuàng)建Hello.php文件),并編寫其 fire() 方法
<?php
/**
 * 文件路徑: \application\index\job\Hello.php
 * 這是一個(gè)消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)
 */
namespace app\job;
use think\queue\job;
use think\facade\Db;

class Hello {
    /**
     * fire方法是消息隊(duì)列默認(rèn)調(diào)用的方法
     * @param Job            $job      當(dāng)前的任務(wù)對象
     * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
     */
    public function fire(Job $job,$data){
        // 如有必要,可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)庫中的最新數(shù)據(jù),判斷該任務(wù)是否仍有必要執(zhí)行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通過這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
                $job->delete();
                // 也可以重新發(fā)布這個(gè)任務(wù)
                //$job->release(2); //$delay為延遲時(shí)間,表示該任務(wù)延遲2秒后再執(zhí)行
            }
        }
    }

    /**
     * 有些消息在到達(dá)消費(fèi)者時(shí),可能已經(jīng)不再需要執(zhí)行了
     * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
     * @return boolean                 任務(wù)執(zhí)行的結(jié)果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
     * 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
     * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
     * @return boolean                 任務(wù)執(zhí)行的結(jié)果
     */
    private function doHelloJob($data) {
        // 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理...
        // test
        Db::name('admin')->insert([
            'name'=>$data['name'],
            'password'=>$data['password']
        ]);
        return true;
    }

}

  1. 發(fā)布任務(wù)
    在瀏覽器中訪問 http://localhost/index.php/test/actionWithHelloJob 可以看到消息推送成功。

  2. 處理任務(wù)
    切換當(dāng)前終端窗口的目錄到項(xiàng)目根目錄下,執(zhí)行
    php think queue:work --queue helloJobQueue
    可以看到執(zhí)行的結(jié)果類似如下:


    image.png

    同時(shí)打開 redis 可視化工具 Redis Desktop Manager (沒有的話 可下載去安裝,然后連接上你的redis,就可以查看到工具了)


    image.png

以上是我使用thinkphp6 做的操作!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容