轉(zhuǎn)載 thinkphp-queue 淺析

thinkphp-queue 淺析
傳統(tǒng)的程序執(zhí)行流程一般是 即時|同步|串行的,在某些場景下,會存在并發(fā)低,吞吐量低,響應(yīng)時間長等問題。在大型系統(tǒng)中,一般會引入消息隊列的組件,將流程中部分任務(wù)抽離出來放入消息隊列,并由專門的消費者做針對性的處理,從而降低系統(tǒng)耦合度,提高系統(tǒng)性能和可用性。

一般來說,可以抽離的任務(wù)具有以下的特點:

  • 允許延后|異步|并行處理 (相對于傳統(tǒng)的 即時|同步|串行 的執(zhí)行方式)
    • 允許延后
      搶購活動時,先快速緩沖有限的參與人數(shù)到消息隊列,后續(xù)再排隊處理實際的搶購業(yè)務(wù);
    • 允許異步
      業(yè)務(wù)處理過程中的郵件,短信等通知
    • 允許并行
      用戶支付成功之后,郵件通知,微信通知,短信通知可以由多個不同的消費者并行執(zhí)行,通知到達的時間不要求先后順序。
  • 允許失敗和重試
    • 強一致性的業(yè)務(wù)放入核心流程處理
    • 無一致性要求或最終一致即可的業(yè)務(wù)放入隊列處理

thinkphp-queue 是thinkphp 官方提供的一個消息隊列服務(wù),它支持消息隊列的一些基本特性:

  • 消息的發(fā)布,獲取執(zhí)行,刪除,重發(fā)失敗處理,延遲執(zhí)行,超時控制
  • 隊列的多隊列, 內(nèi)存限制 ,啟動,停止,守護
  • 消息隊列可降級為同步執(zhí)行

thinkphp-queue 內(nèi)置了 Redis,Database,Topthink ,Sync這四種驅(qū)動。本文主要介紹 thinkphp-queue 結(jié)合其內(nèi)置的 redis 驅(qū)動的使用方式和基本原理。

一、簡單使用示例
1.1 安裝 thinkphp-queue

composer install thinkphp-queue

1.2 搭建消息隊列的存儲環(huán)境
1.3 配置消息隊列的驅(qū)動

根據(jù)選擇的存儲方式,在\application\extra\queue.php這個配置文件中,添加消息隊列對應(yīng)的驅(qū)動配置

return [
       'connector'  => 'Redis',         // Redis 驅(qū)動
       'expire'     => 60,              // 任務(wù)的過期時間,默認為60秒; 若要禁用,則設(shè)置為 null 
       'default'    => 'default',       // 默認的隊列名稱
       'host'       => '127.0.0.1',     // redis 主機ip
       'port'       => 6379,            // redis 端口
       'password'   => '',              // redis 密碼
       'select'     => 1,               // 使用哪一個 db,默認為 db0
       'timeout'    => 0,               // redis連接的超時時間
       'persistent' => false,           // 是否是長連接

   //    'connector' => 'Database',   // 數(shù)據(jù)庫驅(qū)動
   //    'expire'    => 60,           // 任務(wù)的過期時間,默認為60秒; 若要禁用,則設(shè)置為 null
   //    'default'   => 'default',    // 默認的隊列名稱
   //    'table'     => 'jobs',       // 存儲消息的表名,不帶前綴
   //    'dsn'       => [],

   //    'connector'   => 'Topthink',   // ThinkPHP內(nèi)部的隊列通知服務(wù)平臺 ,本文不作介紹
   //    'token'       => '',
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',

   //    'connector'   => 'Sync',       // Sync 驅(qū)動,該驅(qū)動的實際作用是取消消息隊列,還原為同步執(zhí)行
   ];

1.4 消息的創(chuàng)建與推送

我們在業(yè)務(wù)控制器中創(chuàng)建一個新的消息,并推送到 helloJobQueue 隊列
新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法

<?php
namespace app\index\controller;
use think\Queue;
class JobTest
{
    /*
     * 測試隊列action
     * */
    public function actionWithHelloJob(){
        // 1.當(dāng)前任務(wù)將由哪個類來負責(zé)處理。
        // 當(dāng)輪到該任務(wù)時,系統(tǒng)將生成一個該類的實例,并調(diào)用其 fire 方法
        $jobHandlerClassName  = 'app\index\job\Hello@fire';
        // 2.當(dāng)前任務(wù)歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建
        $jobQueueName     = "helloJobQueue";
        // 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
        // ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)
        $jobData          = [ 'name' => 'test'.rand(), 'password'=>rand()] ;
        // 4.將該任務(wù)推送到消息隊列,等待對應(yīng)的消費者去執(zhí)行
        $time2wait = strtotime('2018-09-08 11:15:00') - strtotime('now');  // 定時執(zhí)行
        $isPushed = Queue::later($time2wait, $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驅(qū)動時,返回值為 1|false  ;   redis 驅(qū)動時,返回值為 隨機字符串|false
        if( $isPushed !== false ){
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
        }else{
            echo 'Oops, something went wrong.';
        }
    }
}

  • 除了 Queue::push( jobHandlerClassName , jobData , jobQueueName );這種方式之外,還可以直接傳入 Queue::push( jobHandlerObject ,null , jobQueueName ); 這時,需要在 jobHandlerObject 中定義一個 handle() 方法,消息隊列在執(zhí)行到該任務(wù)時會自動反序列化該對象,并調(diào)用其 handle()方法。 該方式的缺點是無法傳入自定義數(shù)據(jù)。
1.5 消息的消費與刪除

編寫 Hello 消費者類,用于處理 helloJobQueue 隊列中的任務(wù)
新增 \application\index\job\Hello.php 消費者類,并編寫其 fire() 方法

<?php
/**
 * 文件路徑: \application\index\job\Hello.php
 * 這是一個消費者類,用于處理 helloJobQueue 隊列中的任務(wù)
 */
namespace app\index\job;
use think\queue\Job;
use think\Db;

class Hello {
    /**
     * fire方法是消息隊列默認調(diào)用的方法
     * @param Job            $job      當(dāng)前的任務(wù)對象
     * @param array|mixed    $data     發(fā)布任務(wù)時自定義的數(shù)據(jù)
     */
    public function fire(Job $job,$data){
        // 如有必要,可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)庫中的最新數(shù)據(jù),判斷該任務(wù)是否仍有必要執(zhí)行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通過這個方法可以檢查這個任務(wù)已經(jīng)重試了幾次了
                $job->delete();
                // 也可以重新發(fā)布這個任務(wù)
                //$job->release(2); //$delay為延遲時間,表示該任務(wù)延遲2秒后再執(zhí)行
            }
        }
    }

    /**
     * 有些消息在到達消費者時,可能已經(jīng)不再需要執(zhí)行了
     * @param array|mixed    $data     發(fā)布任務(wù)時自定義的數(shù)據(jù)
     * @return boolean                 任務(wù)執(zhí)行的結(jié)果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
     * 根據(jù)消息中的數(shù)據(jù)進行實際的業(yè)務(wù)處理
     * @param array|mixed    $data     發(fā)布任務(wù)時自定義的數(shù)據(jù)
     * @return boolean                 任務(wù)執(zhí)行的結(jié)果
     */
    private function doHelloJob($data) {
        // 根據(jù)消息中的數(shù)據(jù)進行實際的業(yè)務(wù)處理...
        // test
        Db::name('admin')->insert([
            'name'=>$data['name'],
            'password'=>$data['password']
        ]);
        return true;
    }
}

1.6 發(fā)布任務(wù)

在瀏覽器中訪問 http://localhost/test/public/index.php/index/job_test/actionWithHelloJob 可以看到消息推送成功。

image
1.7 處理任務(wù)

切換當(dāng)前終端窗口的目錄到項目根目錄下,執(zhí)行
php think queue:work --queue helloJobQueue
可以看到執(zhí)行的結(jié)果類似如下:

image

同時打開 redis 可視化工具 Redis Desktop Manager (ps:附上百度網(wǎng)盤下載鏈接:鏈接:https://pan.baidu.com/s/1-k79ZEGD77EtUQFolJosXA 密碼:xcfm)

image
  • 可見由于我設(shè)置了定時執(zhí)行,所以隊列處于待執(zhí)行狀態(tài)。當(dāng)執(zhí)行時間到了,隊列會自動執(zhí)行。
至此,我們成功地經(jīng)歷了一個消息的 創(chuàng)建 -> 推送 -> 消費 -> 刪除 的基本流程
二、詳細介紹

介紹 thinkphp-queue 的詳細使用方法。如配置介紹,基本原理,各種特殊情況的處理等
注:在此篇文章不做詳細說明,如需了解請參鑒
https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容