Python多進(jìn)程執(zhí)行Redis優(yōu)先級(jí)隊(duì)列任務(wù)

為什么需要消息隊(duì)列

系統(tǒng)中引入消息隊(duì)列機(jī)制是對(duì)系統(tǒng)一個(gè)非常大的改善。例如一個(gè)web系統(tǒng)中,用戶做了某項(xiàng)操作后需要發(fā)送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發(fā)送完成后反饋給用戶,但是這樣可能會(huì)因?yàn)榫W(wǎng)絡(luò)的不確定性造成用戶長時(shí)間的等待從而影響用戶體驗(yàn)。

有些場景下是不可能使用同步方式等待完成的,那些需要后臺(tái)花費(fèi)大量時(shí)間的操作。例如極端例子,一個(gè)在線編譯系統(tǒng)任務(wù),后臺(tái)編譯完成需要30分鐘。這種場景的設(shè)計(jì)不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據(jù)情況再此反饋用戶與否。

另外適用消息隊(duì)列的情況是那些系統(tǒng)處理能力有限的情況下,先使用隊(duì)列機(jī)制把任務(wù)暫時(shí)存放起來,系統(tǒng)再一個(gè)個(gè)輪流處理掉排隊(duì)的任務(wù)。這樣在系統(tǒng)吞吐量不足的情況下也能穩(wěn)定的處理掉高并發(fā)的任務(wù)。

消息隊(duì)列可以用來做排隊(duì)機(jī)制,只要系統(tǒng)需要用到排隊(duì)機(jī)制的地方就可以使用消息隊(duì)列來作。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Pool
import time
import random
import os
import redis
import logging

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

WORKER_QUEUE = 'worker_queue'

def do_task(queue_key, x):
    x = int(x)
    delay = random.randint(3, 7)
    print '開始打包序號(hào):  %s 任務(wù)優(yōu)先級(jí)別:%s' % (x, queue_key)
    time.sleep(delay)
    print '打包結(jié)束,耗時(shí) %s 秒 結(jié)果是:%s 優(yōu)先級(jí)別 %s' % (delay, x*x, queue_key)
    redis_client.lpush(WORKER_QUEUE, 1)
    return x*x

if __name__ == '__main__':
    pros = 2
    pool = Pool(processes=pros)
    HIGH_QUEUE_KEY = 'high_task_queue'
    NOMAL_QUEUE_KEY = 'task_queue'
    LOW_QUEUE_KEY = 'low_task_queue'

    """
    再創(chuàng)建一個(gè)隊(duì)列,用來檢測(cè)是否還有空余進(jìn)程, 初始化隊(duì)列是pros個(gè)值
    取一個(gè),子進(jìn)程工作一個(gè)任務(wù)
    配合這個(gè) redis_client.blpop(WORKER_QUEUE)
    一個(gè)子進(jìn)程工作完就, 就lpush到這個(gè)WORKER_QUEUE隊(duì)列
    所有子進(jìn)程都在工作,意味這個(gè)隊(duì)列是空的,那么主進(jìn)程不循環(huán)。等待空閑子進(jìn)程
    """

    redis_client.delete(WORKER_QUEUE)
    for i in range(0, pros):
        redis_client.lpush(WORKER_QUEUE, 1)
    """
        父進(jìn)程就是執(zhí)行的這個(gè)python
        子進(jìn)程就是spawn出來由4個(gè)進(jìn)程組成的進(jìn)程池
        下面的pool.apply_async 只是交給進(jìn)程池里面的進(jìn)程處理
        父進(jìn)程不會(huì)執(zhí)行任務(wù),他只是作為一個(gè)無限循環(huán)
    """
    while 1:
        redis_client.blpop(WORKER_QUEUE)
        queue_key, params = redis_client.blpop([HIGH_QUEUE_KEY, NOMAL_QUEUE_KEY, LOW_QUEUE_KEY])
        print queue_key, params
        """
        pool.apply()執(zhí)行任務(wù)是同步執(zhí)行,意思是執(zhí)行完一個(gè)任務(wù),才到下一個(gè)任務(wù),
        而每次使用的進(jìn)程都是RR輪詢依次使用進(jìn)程
        所以所有任務(wù)都是 9 8 7 6 5 4 3 依次使用不用進(jìn)程池里的進(jìn)程執(zhí)行
        """
        # res = pool.apply(do_task, (params, ))

        """
        pool.apply_async() 任務(wù)時(shí)異步執(zhí)行, 執(zhí)行完apply_async(), 無論任務(wù)執(zhí)行多久
        都不會(huì)阻塞,立馬返回一個(gè)對(duì)象, 然后就交給這個(gè)進(jìn)程處理
        主進(jìn)程繼續(xù)往下循環(huán)
        但是這樣異步一個(gè)任務(wù)后, 主進(jìn)程繼續(xù)循環(huán),繼續(xù)塞任務(wù)到進(jìn)程池,所以某個(gè)子進(jìn)程
        可能再未處理完上一個(gè)任務(wù)情況下,繼續(xù)塞了下個(gè)任務(wù)進(jìn)來。這樣就只能排隊(duì)執(zhí)行了
        """
        res = pool.apply_async(do_task, (queue_key, params))

思考

問:
進(jìn)程一只阻塞再redis_client.blpop(key) 這里,如果隊(duì)列一致沒有任務(wù),進(jìn)程一直卡在這里,對(duì)CPU 內(nèi)存的影響是怎樣的?
答:
計(jì)算機(jī)硬件上使用DMA來訪問磁盤等IO,也就是請(qǐng)求發(fā)出后,CPU就不再管了,直到DMA處理器完成任務(wù),再通過中斷告訴CPU完成了。所以,單獨(dú)的一個(gè)IO時(shí)間,對(duì)CPU的占用是很少的,阻塞了就更不會(huì)占用CPU了,因?yàn)槌绦蚨疾焕^續(xù)運(yùn)行了,CPU時(shí)間交給其它線程和進(jìn)程了。雖然IO不會(huì)占用大量的CPU時(shí)間,但是非常頻繁的IO還是會(huì)非常浪費(fèi)CPU時(shí)間的,所以面對(duì)大量IO的任務(wù),有時(shí)候是需要算法來合并IO,或者通過cache來緩解IO壓力的。

blpop本質(zhì)上還是recv_from 那一套, 等待IO時(shí)交出CPU

參考

http://www.cnblogs.com/laozhbook/p/redis_queue.html
https://www.zhihu.com/question/27734728

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • word直接復(fù)制來了,格式就不改了。至于這門課怎么復(fù)習(xí),只要平時(shí)實(shí)驗(yàn)都認(rèn)真完成、報(bào)告認(rèn)真寫,平時(shí)分都很高;考試的話...
    Jozhn閱讀 4,906評(píng)論 0 8
  • 又來到了一個(gè)老生常談的問題,應(yīng)用層軟件開發(fā)的程序員要不要了解和深入學(xué)習(xí)操作系統(tǒng)呢? 今天就這個(gè)問題開始,來談?wù)劜?..
    tangsl閱讀 4,320評(píng)論 0 23
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • ——最美好的事情就是認(rèn)識(shí)了你,并且想和你一直像這樣美好的走下去。 記得我們認(rèn)識(shí)是在高一的時(shí)候,不知道那是...
    筱筱的星辰閱讀 607評(píng)論 0 2
  • 我喜歡拍照
    欍人_閱讀 284評(píng)論 0 0

友情鏈接更多精彩內(nèi)容