隊列相關(guān)知識整理

1.概述

1.1 隊列的使用場景

  • 削峰:并發(fā)情況下,數(shù)據(jù)庫的壓力非常大,此時可以將一部分非必需即可返回結(jié)果的操作轉(zhuǎn)為隊列來消費,達到對系統(tǒng)的緩沖效果。

  • 解耦:A功能需要觸發(fā)B,C,D功能的一些行為,如果直接在A模塊的代碼調(diào)用B,C,D模塊的方法,代碼就會慢慢變得混亂不堪,此時是用隊列分發(fā)來觸發(fā)各自的行為,達到解耦。

  • 異步:比如公眾號開發(fā)是,注冊的時候需要給用戶推送通知,如果用同步調(diào)用微信接口的方式,可能會因為網(wǎng)絡(luò)問題或者微信接口方面的問題導(dǎo)致流程阻塞,這時候改成異步的,可以極高的提高響應(yīng)速度。

1.2 可用于隊列設(shè)計的有以下各類中間件:

  • Redis
  • Kafka
  • MQ類:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ

2.分別闡述各種中間件的特點

2.1 Redis

redis的本身定位是緩存數(shù)據(jù)庫,并非用于隊列。但它提供的數(shù)據(jù)類型 list 和發(fā)布/訂閱模式,可以用于隊列的設(shè)計。

  • lpush/brpop : 基于list的隊列設(shè)計有個缺陷就是,所有隊列相關(guān)的失敗重試,一致性等問題都由業(yè)務(wù)方來保證,無法從應(yīng)用層面來提供解決方案,redis宕機沒用持久化的話,會數(shù)據(jù)丟失,當然集群和持久化設(shè)置可避免。

  • 發(fā)布/訂閱:此方式是用于廣播的,即一次發(fā)布,關(guān)注該topic的多個訂閱者都收到,如若用于隊列,即只能有一個訂閱者,多個也沒用,只是重復(fù)消費,另外此方式并沒保存發(fā)布數(shù)據(jù),如若網(wǎng)絡(luò)抖動或者redis宕機會產(chǎn)生數(shù)據(jù)丟失,不可忍受。

缺點:綜上,可靠性沒保證,且只提高數(shù)據(jù)結(jié)構(gòu),邏輯需要自己實現(xiàn)。

redis有另外一個產(chǎn)品——disque,專門作為隊列使用的。

2.2 Kafka

kafka的最大的優(yōu)勢就是快,即使普通的服務(wù)器,kafka也能輕松支持每秒百萬級的寫入效率。這么快的原因是:
1)寫入方式:以順序IO代替隨機IO, 當然只是這個是不夠的,很多軟件都已經(jīng)懂得這么做了,還有一個重要的原因是:它不是實時寫入硬盤,充分利用了分頁存儲來利用內(nèi)存提高效率。這里采用的是mmap,即內(nèi)存映射文件,完成映射之后你對物理內(nèi)存的操作,操作系統(tǒng)都會在適當?shù)臅r候同步到硬盤上。當然這里也有一個缺陷——不可靠,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會在程序主動調(diào)用flush的時候才把數(shù)據(jù)真正的寫到硬盤。
2)讀取方式:基于sendfile實現(xiàn)Zero Copy。以前需要在內(nèi)核緩存區(qū)和用戶緩沖區(qū)的來來回回4次copy,被優(yōu)化到只需要2次,即只需要將數(shù)據(jù)從磁盤讀入內(nèi)核緩沖區(qū),在調(diào)用sendfile直接將數(shù)據(jù)copy到網(wǎng)卡緩存這兩步。 還有一個原因,kafka實現(xiàn)了端對端的批量壓縮,目前Kafka支持GZIP和Snappy壓縮協(xié)議。

kafka除了速度快,還支持分組,可以多個生產(chǎn)者,多個消費者,支持負載均衡,支持事務(wù)。

缺點:

  • Kafka單機超過64個隊列/分區(qū),Load會發(fā)生明顯的飆高現(xiàn)象,隊列越多,load越高,發(fā)送消息響應(yīng)時間變長
  • 使用短輪詢方式,實時性取決于輪詢間隔時間;
  • 消費失敗不支持重試;
  • 支持消息順序,但是一臺代理宕機后,就會產(chǎn)生消息亂序;
  • 社區(qū)更新較慢;
  • 運維難度大,它不僅僅需要關(guān)注整個集群之內(nèi)像broker、controller類似的角色,還需要關(guān)注其所依賴的一些產(chǎn)品像ZooKeeper等。你需要考慮一些集群產(chǎn)生的問題,比如“腦裂”,ZooKeeper不可用等問題。然一旦涉及集群,這個問題就存在,并非kafka特有。還有磁盤的問題,kafka的消息不是消費完就沒了,而是根據(jù)的你的設(shè)置多久過期,這樣就很可能產(chǎn)生磁盤數(shù)據(jù)量堆積問題。當

2.3 MQ類:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ

MQ類本身就是為隊列而生的。隊列該有的,這些MQ類產(chǎn)品大多數(shù)都有,只是有一些功能上的區(qū)別和性能的差異。

這里主要介紹一下RocketMQ 。
根據(jù)GitHub上Apache對它的介紹,有以下功能:

  • 發(fā)布/訂閱消息傳遞模型
  • 財務(wù)級交易消息
  • 各種跨語言客戶端,例如Java,C / C ++,Python,Go
  • 可插拔的傳輸協(xié)議,例如TCP,SSL,AIO
  • 內(nèi)置的消息跟蹤功能,還支持開放式跟蹤
  • 多功能的大數(shù)據(jù)和流生態(tài)系統(tǒng)集成
  • 按時間或偏移量追溯消息
  • 可靠的FIFO和嚴格的有序消息傳遞在同一隊列中
  • 高效的推拉消費模型
  • 單個隊列中的百萬級消息累積容量
  • 多種消息傳遞協(xié)議,例如JMS和OpenMessaging
  • 靈活的分布式橫向擴展部署架構(gòu)
  • 快如閃電的批量消息交換系統(tǒng)
  • 各種消息過濾器機制,例如SQL和Tag
  • 用于隔離測試和云隔離群集的Docker映像
  • 功能豐富的管理儀表板,用于配置,指標和監(jiān)視
  • 認證與授權(quán)

RocketMQ 主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分。

RocketMQ什么都是集群部署的,這是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave異步復(fù)制模式、多 master多slave同步雙寫模式。

支持負載均衡,支持集群,支持訂閱形式和消息分發(fā),支持順序消息,支持消息確認,支持指定時間點的回溯,支持消息重試。

性能方面也僅次于kafka,zeroMQ。

本身就是為隊列定制的,缺點就是對輕量級項目來說太復(fù)雜了。最好就是有公司統(tǒng)一的一套隊列實現(xiàn)方案,提供公司內(nèi)的各個團隊使用,并支持各種語言棧的調(diào)用。

3.隊列設(shè)計

考慮到成本、運維復(fù)雜度、設(shè)計的靈活性/自由度、原有項目語言(php)的限制,采用redis來實現(xiàn)。

3.1 常用隊列的類型

image.png

3.2 隊列的消費模型

3.2.1 采用php常駐進程模式

通常隊列消費,我們是每個隊列起一個或多個消費進程,網(wǎng)上有看到這樣的一個延時隊列的設(shè)計模型:


image.png

這里是用到了php的一個pcntl 進程控制的擴展,由dq-mster: 主進程,負責(zé)管理子進程的創(chuàng)建,銷毀,回收以及信號通知。
對于我們來說,這種方案有以下缺點:

  • 所有的隊列會在一個服務(wù)里面消費,沒有區(qū)分隊列的類型,如果某個隊列阻塞,只是單純的加子進程,沒有針對性,當然這個問題是有辦法解決的;
  • 常駐進程會由內(nèi)存的回收和數(shù)據(jù)庫連接斷連的問題(長時間沒有數(shù)據(jù)消費的話,數(shù)據(jù)庫連接會超時失效,之前的遇到過),斷連的問題也不難解決;
  • 修改到某些代碼需要手動取重啟進程,常駐進程的代碼已經(jīng)是加載到內(nèi)存里面的了,修改代碼需要重啟才生效。

3.2.2 采用yii2-queue

yii2-queue 支持了不少驅(qū)動,F(xiàn)ile, Mysql, Redis, RabbitMq等等。
這里只分析redis驅(qū)動。
基于redis的yii2-queue只實現(xiàn)了延時隊列和FIFO隊列,延時隊列采用有序集合結(jié)合列表來實現(xiàn)。
話不多說,擼源碼:

  • 先看入列:
 protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Redis驅(qū)動不支持作業(yè)優(yōu)先級 如果有傳值則拋出錯誤
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }
        // 取到上條 message_id + 1 ,作為本條任務(wù)的ID
        $id = $this->redis->incr("$this->channel.message_id");
        // 以新ID將本條任務(wù)格存儲到hash表 messages 中
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            // 如果不需要等待執(zhí)行 則將任務(wù)ID推到hash表 waiting 中
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            // 如果需要等待執(zhí)行 則推送到有序集合 delayed 中
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }
        // 返回任務(wù)ID
        return $id;
    }
  • 再看出列:
 /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // 將延遲和保留的作業(yè)移動到等待列表中 并鎖定一秒
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }
        // Find a new waiting message
        $id = null;
        if (!$wait) {
            // 從等待列表取1條任務(wù)ID待執(zhí)行
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }
        // 根據(jù)任務(wù)ID取出任務(wù)
        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        // 加入到作業(yè)列表
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
        return [$id, $message, $ttr, $attempt];
    }
  • 再看執(zhí)行進程
    yii2-queue提供了兩種方式,一種是crontab機制:
 /**
     * Runs all jobs from redis-queue.
     * It can be used as cron job.
     *
     * @return null|int exit code.
     */
    public function actionRun()
    {
        return $this->queue->run(false);
    }

一種是常駐進程機制:

 /**
     * Listens redis-queue and runs new jobs.
     * It can be used as daemon process.
     *
     * @param int $timeout number of seconds to wait a job.
     * @throws Exception when params are invalid.
     * @return null|int exit code.
     */
    public function actionListen($timeout = 3)
    {
        if (!is_numeric($timeout)) {
            throw new Exception('Timeout must be numeric.');
        }
        if ($timeout < 1) {
            throw new Exception('Timeout must be greater than zero.');
        }

        return $this->queue->run(true, $timeout);
    }

本質(zhì)上都是調(diào)用run方法,只是參數(shù)不同:

  /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to wait for next message.
     * @return null|int exit code.
     * @internal for worker command only.
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                if (($payload = $this->reserve($timeout)) !== null) {
                    list($id, $message, $ttr, $attempt) = $payload;
                    if ($this->handleMessage($id, $message, $ttr, $attempt)) {
                        $this->delete($id);
                    }
                } elseif (!$repeat) {
                    break;
                }
            }
        });
    }

yii2-queue的缺點在于:

  • 多個隊列分組需要修改配置文件,公司的配置文件修改比較麻煩,需要運維手動去每臺機器去改;
  • 使用常駐進程依然有前述的問題;

3.2.3 采用php+crontab實現(xiàn)

采用crontab配置+php 進程實現(xiàn):


image.png

crontab定時10分鐘執(zhí)行一次,php進程只給10分鐘生命周期,即php進程只運行十分鐘便自行退出。我們有crontab的配置工具,所以用來實現(xiàn)這個模型也比較方便,如果某些進程的隊列阻塞,只需要在crontab配置多一些進程即可,例如上圖的queueName1。

這個方案可以有效解決上面常駐進程的問題,針對隊列名稱消費可以有效針對隊列增加進程,自動退出不存在內(nèi)存沒回收的問題,10分鐘不存在數(shù)據(jù)庫斷連的問題,代碼更新也只是10分鐘內(nèi)還是走舊代碼。

參考yii2-queue,隊列Job的狀態(tài)轉(zhuǎn)換如下:


image.png
  • delay 延時態(tài),即還沒到設(shè)置的時間執(zhí)行;
  • waiting 等待態(tài),等待被消費者取出;
  • reserved 就緒態(tài),已取出等待執(zhí)行;
  • done 完成態(tài),已經(jīng)被執(zhí)行,實際上這個狀態(tài)基本不存在,執(zhí)行成功就會被刪掉。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容