queue
當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ、ZeroMQ,炙手可熱的Kafka,還有阿里巴巴自主開發(fā)的Notify、MetaQ、RocketMQ等。這些都是大型的重量級消息隊列,通常應(yīng)用于商業(yè)生產(chǎn)環(huán)境。
Python為我們內(nèi)置了一個微型輕量級的消息隊列模塊,queue!queue模塊主要用于多生產(chǎn)者和消費者模式下的隊列實現(xiàn),特別適合多線程時的消息交換。它實現(xiàn)了常見的鎖語法,臨時阻塞線程,防止競爭,這有賴于Python對線程的支持。

image.png
queue模塊實現(xiàn)了三種隊列
FIFO:先進先出隊列,類似管道。元素只能從隊頭方向一個一個的彈出,只能從隊尾一個一個的放入。

image.png
LIFO:后進先出隊列,也就是棧。元素永遠(yuǎn)只能在棧頂出入。

image.png
priority queue:優(yōu)先級隊列,每個元素都帶有一個優(yōu)先值,值越小 的越早出去。值相同的,先進入隊列的先出去。

image.png
queue模塊定義了下面幾個類和異常
class queue.Queue(maxsize=0):
FIFO隊列構(gòu)造器。maxsize是隊列里最多能同時存在的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,理論上可無窮個,但要小心,內(nèi)存不是無限大的,這樣可能會讓你的內(nèi)存溢出。
class queue.LifoQueue(maxsize=0)
LIFO隊列構(gòu)造器。maxsize是隊列里最多能同時放置的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,可無窮個。
class queue.PriorityQueue(maxsize=0)
優(yōu)先級隊列構(gòu)造器。maxsize是隊列里最多能同時放置的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,可無窮個。通常在這類隊列中,元素的優(yōu)先順序是按sorted(list(entries))[0]的結(jié)果來定義的,而元素的結(jié)構(gòu)形式通常是(priority_number, data)類型的元組。
exception queue.Empty
從空的隊列里請求元素的時候,彈出該異常。
exception queue.Full
往滿的隊列里放入元素的時候,彈出該異常。
Queue對象
三種隊列類的對象都提供了以下通用的方法:
Queue.qsize()
返回當(dāng)前隊列內(nèi)的元素的個數(shù)。注意,qsize()大于零不等于下一個get()方法一定不會被阻塞,qsize()小于maxsize也不表示下一個put()方法一定不會被阻塞。
Queue.empty()
隊列為空則返回True,否則返回False。同樣地,返回True不表示下一個put()方法一定不會被阻塞。返回False不表示下一個get()一定不會被阻塞。
Queue.full()
與empty()方法正好相反。同樣不保證下一步的操作不被阻塞。
Queue.put(item, block=True, timeout=None)
item參數(shù)表示具體要放入隊列的元素。block和timeout兩個參數(shù)配合使用。其中,如果block=True,timeout=None,隊列阻塞,直到有空槽出現(xiàn);當(dāng)block=True,timeout=正整數(shù)N,如果在等待了N秒后,隊列還沒有空槽,則彈出Full異常;如果block=False,則timeout參數(shù)被忽略,隊列有空槽則立即放入,如果沒空槽,則彈出Full異常。
Queue.put_nowait(item)
等同于put(item, False)
Queue.get(block=True, timeout=None)
從隊列內(nèi)刪除并返回一個元素。如果block=True, timeout=None,隊列會阻塞,直到有可供彈出的元素。如果timeout指定為一個正整數(shù)N,則在N秒內(nèi)如果隊列內(nèi)沒有可供彈出的元素,則拋出Empty異常。如果block=False,timeout參數(shù)會被忽略,此時隊列內(nèi)如果有元素則直接彈出,無元素可彈,則拋出Empty異常。
Queue.get_nowait()
等同于get(False).
下面的兩個方法用于跟蹤排隊的任務(wù)是否被消費者守護線程完全處理。
Queue.task_done()
表明先前的隊列任務(wù)已完成。由消費者線程使用。
Queue.join()
阻塞隊列,直到隊列內(nèi)的所有元素被獲取和處理。
當(dāng)有元素進入隊列時未完成任務(wù)的計數(shù)將增加。每當(dāng)有消費者線程調(diào)用task_done()方法表示一個任務(wù)被完成時,未完成任務(wù)的計數(shù)將減少。當(dāng)該計數(shù)變成0的時候,join()方法將不再阻塞。
下面是一個等待排隊任務(wù)如何完成的例子:
import time
import queue
import threading
def worker(i):
while True:
item = q.get()
if item is None:
print("線程%s發(fā)現(xiàn)了一個None,可以休息了^-^" % i)
break
# do_work(item)做具體的工作
time.sleep(0.5)
print("線程%s將任務(wù)<%s>完成了!" % (i, item))
# 做完后發(fā)出任務(wù)完成信號,然后繼續(xù)下一個任務(wù)
q.task_done()
if __name__ == '__main__':
num_of_threads = 5
source = [i for i in range(1, 21)] # 模擬20個任務(wù)
# 創(chuàng)建一個FIFO隊列對象,不設(shè)置上限
q = queue.Queue()
# 創(chuàng)建一個線程池
threads = []
# 創(chuàng)建指定個數(shù)的工作線程,并講他們放到線程池threads中
for i in range(1, num_of_threads+1):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 將任務(wù)源里的任務(wù)逐個放入隊列
for item in source:
time.sleep(0.5) # 每隔0.5秒發(fā)布一個新任務(wù)
q.put(item)
# 阻塞隊列直到隊列里的任務(wù)都完成了
q.join()
print("-----工作都完成了-----")
# 停止工作線程
for i in range(num_of_threads):
q.put(None)
for t in threads:
t.join()
print(threads)