為什么需要消息隊(duì)列
系統(tǒng)中引入消息隊(duì)列機(jī)制是對(duì)系統(tǒng)一個(gè)非常大的改善。例如一個(gè)web系統(tǒng)中,用戶做了某項(xiàng)操作后需要發(fā)送郵件通知到用戶郵箱中。你可以使用同步方式讓用戶等待郵件發(fā)送完成后反饋給用戶,但是這樣可能會(huì)因?yàn)榫W(wǎng)絡(luò)的不確定性造成用戶長時(shí)間的等待從而影響用戶體驗(yàn)。
有些場景下是不可能使用同步方式等待完成的,那些需要后臺(tái)花費(fèi)大量時(shí)間的操作。例如極端例子,一個(gè)在線編譯系統(tǒng)任務(wù),后臺(tái)編譯完成需要30分鐘。這種場景的設(shè)計(jì)不可能同步等待后在回饋,必須是先反饋用戶隨后異步處理完成,再等待處理完成后根據(jù)情況再此反饋用戶與否。
另外適用消息隊(duì)列的情況是那些系統(tǒng)處理能力有限的情況下,先使用隊(duì)列機(jī)制把任務(wù)暫時(shí)存放起來,系統(tǒng)再一個(gè)個(gè)輪流處理掉排隊(duì)的任務(wù)。這樣在系統(tǒng)吞吐量不足的情況下也能穩(wěn)定的處理掉高并發(fā)的任務(wù)。
消息隊(duì)列可以用來做排隊(duì)機(jī)制,只要系統(tǒng)需要用到排隊(duì)機(jī)制的地方就可以使用消息隊(duì)列來作。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import time
import random
import os
import redis
import logging
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
WORKER_QUEUE = 'worker_queue'
def do_task(queue_key, x):
x = int(x)
delay = random.randint(3, 7)
print '開始打包序號(hào): %s 任務(wù)優(yōu)先級(jí)別:%s' % (x, queue_key)
time.sleep(delay)
print '打包結(jié)束,耗時(shí) %s 秒 結(jié)果是:%s 優(yōu)先級(jí)別 %s' % (delay, x*x, queue_key)
redis_client.lpush(WORKER_QUEUE, 1)
return x*x
if __name__ == '__main__':
pros = 2
pool = Pool(processes=pros)
HIGH_QUEUE_KEY = 'high_task_queue'
NOMAL_QUEUE_KEY = 'task_queue'
LOW_QUEUE_KEY = 'low_task_queue'
"""
再創(chuàng)建一個(gè)隊(duì)列,用來檢測(cè)是否還有空余進(jìn)程, 初始化隊(duì)列是pros個(gè)值
取一個(gè),子進(jìn)程工作一個(gè)任務(wù)
配合這個(gè) redis_client.blpop(WORKER_QUEUE)
一個(gè)子進(jìn)程工作完就, 就lpush到這個(gè)WORKER_QUEUE隊(duì)列
所有子進(jìn)程都在工作,意味這個(gè)隊(duì)列是空的,那么主進(jìn)程不循環(huán)。等待空閑子進(jìn)程
"""
redis_client.delete(WORKER_QUEUE)
for i in range(0, pros):
redis_client.lpush(WORKER_QUEUE, 1)
"""
父進(jìn)程就是執(zhí)行的這個(gè)python
子進(jìn)程就是spawn出來由4個(gè)進(jìn)程組成的進(jìn)程池
下面的pool.apply_async 只是交給進(jìn)程池里面的進(jìn)程處理
父進(jìn)程不會(huì)執(zhí)行任務(wù),他只是作為一個(gè)無限循環(huán)
"""
while 1:
redis_client.blpop(WORKER_QUEUE)
queue_key, params = redis_client.blpop([HIGH_QUEUE_KEY, NOMAL_QUEUE_KEY, LOW_QUEUE_KEY])
print queue_key, params
"""
pool.apply()執(zhí)行任務(wù)是同步執(zhí)行,意思是執(zhí)行完一個(gè)任務(wù),才到下一個(gè)任務(wù),
而每次使用的進(jìn)程都是RR輪詢依次使用進(jìn)程
所以所有任務(wù)都是 9 8 7 6 5 4 3 依次使用不用進(jìn)程池里的進(jìn)程執(zhí)行
"""
# res = pool.apply(do_task, (params, ))
"""
pool.apply_async() 任務(wù)時(shí)異步執(zhí)行, 執(zhí)行完apply_async(), 無論任務(wù)執(zhí)行多久
都不會(huì)阻塞,立馬返回一個(gè)對(duì)象, 然后就交給這個(gè)進(jìn)程處理
主進(jìn)程繼續(xù)往下循環(huán)
但是這樣異步一個(gè)任務(wù)后, 主進(jìn)程繼續(xù)循環(huán),繼續(xù)塞任務(wù)到進(jìn)程池,所以某個(gè)子進(jìn)程
可能再未處理完上一個(gè)任務(wù)情況下,繼續(xù)塞了下個(gè)任務(wù)進(jìn)來。這樣就只能排隊(duì)執(zhí)行了
"""
res = pool.apply_async(do_task, (queue_key, params))
思考
問:
進(jìn)程一只阻塞再redis_client.blpop(key) 這里,如果隊(duì)列一致沒有任務(wù),進(jìn)程一直卡在這里,對(duì)CPU 內(nèi)存的影響是怎樣的?
答:
計(jì)算機(jī)硬件上使用DMA來訪問磁盤等IO,也就是請(qǐng)求發(fā)出后,CPU就不再管了,直到DMA處理器完成任務(wù),再通過中斷告訴CPU完成了。所以,單獨(dú)的一個(gè)IO時(shí)間,對(duì)CPU的占用是很少的,阻塞了就更不會(huì)占用CPU了,因?yàn)槌绦蚨疾焕^續(xù)運(yùn)行了,CPU時(shí)間交給其它線程和進(jìn)程了。雖然IO不會(huì)占用大量的CPU時(shí)間,但是非常頻繁的IO還是會(huì)非常浪費(fèi)CPU時(shí)間的,所以面對(duì)大量IO的任務(wù),有時(shí)候是需要算法來合并IO,或者通過cache來緩解IO壓力的。
blpop本質(zhì)上還是recv_from 那一套, 等待IO時(shí)交出CPU
參考
http://www.cnblogs.com/laozhbook/p/redis_queue.html
https://www.zhihu.com/question/27734728