45-queue模塊

queue

當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ、ZeroMQ,炙手可熱的Kafka,還有阿里巴巴自主開發(fā)的Notify、MetaQ、RocketMQ等。這些都是大型的重量級消息隊列,通常應(yīng)用于商業(yè)生產(chǎn)環(huán)境。

Python為我們內(nèi)置了一個微型輕量級的消息隊列模塊,queue!queue模塊主要用于多生產(chǎn)者和消費者模式下的隊列實現(xiàn),特別適合多線程時的消息交換。它實現(xiàn)了常見的鎖語法,臨時阻塞線程,防止競爭,這有賴于Python對線程的支持。

image.png

queue模塊實現(xiàn)了三種隊列

FIFO:先進先出隊列,類似管道。元素只能從隊頭方向一個一個的彈出,只能從隊尾一個一個的放入。

image.png

LIFO:后進先出隊列,也就是棧。元素永遠(yuǎn)只能在棧頂出入。

image.png

priority queue:優(yōu)先級隊列,每個元素都帶有一個優(yōu)先值,值越小 的越早出去。值相同的,先進入隊列的先出去。


image.png

queue模塊定義了下面幾個類和異常

class queue.Queue(maxsize=0):
FIFO隊列構(gòu)造器。maxsize是隊列里最多能同時存在的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,理論上可無窮個,但要小心,內(nèi)存不是無限大的,這樣可能會讓你的內(nèi)存溢出。

class queue.LifoQueue(maxsize=0)
LIFO隊列構(gòu)造器。maxsize是隊列里最多能同時放置的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,可無窮個。

class queue.PriorityQueue(maxsize=0)
優(yōu)先級隊列構(gòu)造器。maxsize是隊列里最多能同時放置的元素個數(shù)。如果隊列滿了,則會暫時阻塞隊列,直到有消費者取走元素。maxsize的值如果小于或等于零,表示隊列元素個數(shù)不設(shè)上限,可無窮個。通常在這類隊列中,元素的優(yōu)先順序是按sorted(list(entries))[0]的結(jié)果來定義的,而元素的結(jié)構(gòu)形式通常是(priority_number, data)類型的元組。

exception queue.Empty
從空的隊列里請求元素的時候,彈出該異常。

exception queue.Full
往滿的隊列里放入元素的時候,彈出該異常。

Queue對象

三種隊列類的對象都提供了以下通用的方法:

Queue.qsize()
返回當(dāng)前隊列內(nèi)的元素的個數(shù)。注意,qsize()大于零不等于下一個get()方法一定不會被阻塞,qsize()小于maxsize也不表示下一個put()方法一定不會被阻塞。

Queue.empty()
隊列為空則返回True,否則返回False。同樣地,返回True不表示下一個put()方法一定不會被阻塞。返回False不表示下一個get()一定不會被阻塞。

Queue.full()
與empty()方法正好相反。同樣不保證下一步的操作不被阻塞。

Queue.put(item, block=True, timeout=None)
item參數(shù)表示具體要放入隊列的元素。block和timeout兩個參數(shù)配合使用。其中,如果block=True,timeout=None,隊列阻塞,直到有空槽出現(xiàn);當(dāng)block=True,timeout=正整數(shù)N,如果在等待了N秒后,隊列還沒有空槽,則彈出Full異常;如果block=False,則timeout參數(shù)被忽略,隊列有空槽則立即放入,如果沒空槽,則彈出Full異常。

Queue.put_nowait(item)
等同于put(item, False)

Queue.get(block=True, timeout=None)
從隊列內(nèi)刪除并返回一個元素。如果block=True, timeout=None,隊列會阻塞,直到有可供彈出的元素。如果timeout指定為一個正整數(shù)N,則在N秒內(nèi)如果隊列內(nèi)沒有可供彈出的元素,則拋出Empty異常。如果block=False,timeout參數(shù)會被忽略,此時隊列內(nèi)如果有元素則直接彈出,無元素可彈,則拋出Empty異常。
Queue.get_nowait()
等同于get(False).
下面的兩個方法用于跟蹤排隊的任務(wù)是否被消費者守護線程完全處理。

Queue.task_done()
表明先前的隊列任務(wù)已完成。由消費者線程使用。

Queue.join()
阻塞隊列,直到隊列內(nèi)的所有元素被獲取和處理。
當(dāng)有元素進入隊列時未完成任務(wù)的計數(shù)將增加。每當(dāng)有消費者線程調(diào)用task_done()方法表示一個任務(wù)被完成時,未完成任務(wù)的計數(shù)將減少。當(dāng)該計數(shù)變成0的時候,join()方法將不再阻塞。

下面是一個等待排隊任務(wù)如何完成的例子:

import time
import queue
import threading

def worker(i):
    while True:
        item = q.get()
        if item is None:
            print("線程%s發(fā)現(xiàn)了一個None,可以休息了^-^" % i)
            break
        # do_work(item)做具體的工作
        time.sleep(0.5)
        print("線程%s將任務(wù)<%s>完成了!" % (i, item))
        # 做完后發(fā)出任務(wù)完成信號,然后繼續(xù)下一個任務(wù)
        q.task_done()

if __name__ == '__main__':
    num_of_threads = 5

    source = [i for i in range(1, 21)]  # 模擬20個任務(wù)

    # 創(chuàng)建一個FIFO隊列對象,不設(shè)置上限
    q = queue.Queue()
    # 創(chuàng)建一個線程池
    threads = []
    # 創(chuàng)建指定個數(shù)的工作線程,并講他們放到線程池threads中
    for i in range(1, num_of_threads+1):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    # 將任務(wù)源里的任務(wù)逐個放入隊列
    for item in source:
        time.sleep(0.5)     # 每隔0.5秒發(fā)布一個新任務(wù)
        q.put(item)

    # 阻塞隊列直到隊列里的任務(wù)都完成了
    q.join()
    print("-----工作都完成了-----")
    # 停止工作線程
    for i in range(num_of_threads):
        q.put(None)
    for t in threads:
        t.join()
    print(threads)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,656評論 1 32
  • 1.設(shè)計模式是什么? 你知道哪些設(shè)計模式,并簡要敘述?設(shè)計模式是一種編碼經(jīng)驗,就是用比較成熟的邏輯去處理某一種類型...
    龍飝閱讀 2,302評論 0 12
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進程,并將多個進程對硬件...
    drfung閱讀 3,757評論 0 5
  • 在一個方法內(nèi)部定義的變量都存儲在棧中,當(dāng)這個函數(shù)運行結(jié)束后,其對應(yīng)的棧就會被回收,此時,在其方法體中定義的變量將不...
    Y了個J閱讀 4,571評論 1 14
  • iOS多線程編程 基本知識 1. 進程(process) 進程是指在系統(tǒng)中正在運行的一個應(yīng)用程序,就是一段程序的執(zhí)...
    陵無山閱讀 6,345評論 1 14

友情鏈接更多精彩內(nèi)容