Python的并發(fā)編程(三)-線程間通信

在寫多線程程序的時(shí)候,可能會(huì)有需求需要我們在線程之間交換數(shù)據(jù)

我們?nèi)绾卧诰€程之間實(shí)現(xiàn)安全的通信或者交換數(shù)據(jù)呢?

Queue隊(duì)列

也許將數(shù)據(jù)從一個(gè)線程發(fā)往另一個(gè)線程最安全的做法就是使用queue模塊中的Queue(隊(duì)列)了。

簡單的流程:

  1. 創(chuàng)建Queue實(shí)例,Queue實(shí)例會(huì)被所有的線程共享。
  2. put()添加元素
  3. get()獲取元素
Queue.jpg
import time
from queue import Queue
from threading import Thread


def producer(out_q):
    for i in range(10):
        out_q.put(i)
        time.sleep(2)


def consumer(in_q):
    while True:
        data = in_q.get()
        print(data)

q = Queue()
t1 = Thread(target=producer, args=(q, ))
t2 = Thread(target=consumer, args=(q, ))
t1.start()
t2.start()

Queue實(shí)例已經(jīng)擁有了所有需要的鎖,所以他們可以安全的在任意多線程之間共享。

如何對(duì)生產(chǎn)者和消費(fèi)這的關(guān)閉過程進(jìn)行同步協(xié)調(diào)?

我們可以用一個(gè)特殊的終止值,當(dāng)我們把它放入隊(duì)列,就使消費(fèi)者退出。

import time
from queue import Queue
from threading import Thread

_sentinel = object()

def producer(out_q):
    for i in range(10):
        out_q.put(i)
        time.sleep(2)
    out_q.put(_sentinel)


def consumer(in_q):
    while True:
        data = in_q.get()
        if data is _sentinel:
            in_q.put(_sentinel)
            break
        print(data)

q = Queue()
t1 = Thread(target=producer, args=(q, ))
t2 = Thread(target=consumer, args=(q, ))
t1.start()
t2.start()

在示例中,,當(dāng)一個(gè)消費(fèi)者收到這個(gè)退出信號(hào)之后,會(huì)退出。將終止值放回隊(duì)列是因?yàn)槿绻卸鄠€(gè)消費(fèi)者,這樣可以使其他監(jiān)聽這個(gè)隊(duì)列的其他消費(fèi)者線程也能夠接收到這個(gè)終止值。

盡管隊(duì)列是線程之間通信 的最常見的機(jī)制,但是只要添加了所需要的鎖和同步功能,就可以構(gòu)建自己的線程安全結(jié)構(gòu),最常見的做法就是將你的數(shù)據(jù)結(jié)構(gòu)和條件變量打包在一起。

下面我們構(gòu)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列。

import heapq
import threading

class PriorityQueue(object):
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    
    def put(self, item, priority):
        while self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()
    
    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

通過隊(duì)列實(shí)現(xiàn)的線程之間通信是一種單向的且不確定的過程。一般來說,我們無法得知接收線程(消費(fèi)者)何時(shí)會(huì)實(shí)際接收到消息并開始工作。但是,Queue對(duì)象提供了一些基本的事件完成功能。

q.join()會(huì)等待隊(duì)列中所有的信息被消費(fèi)。

Queue對(duì)象的put()get()都支持非阻塞和超時(shí)機(jī)制。

import queue

q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    pass

try:
    q.put("item", block=False)
except queue.Full:
    pass

try:
    data = q.get(timeout=1)
except queue.Empty:
    pass

可以避免特定的隊(duì)列操作上無限期的阻塞下去。用法比較靈活多變。

最后還有一些別的實(shí)用方法, 比如q.qsize()、q.full()、q.empty(),他們能夠告訴你隊(duì)列的當(dāng)前大小和狀態(tài)。但是,這些方法在多線程環(huán)境中是不可靠的。例如:對(duì)q.empty()的調(diào)用可能會(huì)告訴我們隊(duì)列是空的,但是在完成這個(gè)調(diào)用的同時(shí),另外的線程可能已經(jīng)往隊(duì)列中添加了一個(gè)元素。

所以,在編寫代碼的時(shí)候不要太過于依賴這些函數(shù)。

本文最先發(fā)布于:SavingUnhappy

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容