在寫多線程程序的時(shí)候,可能會(huì)有需求需要我們在線程之間交換數(shù)據(jù)
我們?nèi)绾卧诰€程之間實(shí)現(xiàn)安全的通信或者交換數(shù)據(jù)呢?
Queue隊(duì)列
也許將數(shù)據(jù)從一個(gè)線程發(fā)往另一個(gè)線程最安全的做法就是使用queue模塊中的Queue(隊(duì)列)了。
簡單的流程:
- 創(chuàng)建Queue實(shí)例,Queue實(shí)例會(huì)被所有的線程共享。
- put()添加元素
- get()獲取元素

import time
from queue import Queue
from threading import Thread
def producer(out_q):
for i in range(10):
out_q.put(i)
time.sleep(2)
def consumer(in_q):
while True:
data = in_q.get()
print(data)
q = Queue()
t1 = Thread(target=producer, args=(q, ))
t2 = Thread(target=consumer, args=(q, ))
t1.start()
t2.start()
Queue實(shí)例已經(jīng)擁有了所有需要的鎖,所以他們可以安全的在任意多線程之間共享。
如何對(duì)生產(chǎn)者和消費(fèi)這的關(guān)閉過程進(jìn)行同步協(xié)調(diào)?
我們可以用一個(gè)特殊的終止值,當(dāng)我們把它放入隊(duì)列,就使消費(fèi)者退出。
import time
from queue import Queue
from threading import Thread
_sentinel = object()
def producer(out_q):
for i in range(10):
out_q.put(i)
time.sleep(2)
out_q.put(_sentinel)
def consumer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
in_q.put(_sentinel)
break
print(data)
q = Queue()
t1 = Thread(target=producer, args=(q, ))
t2 = Thread(target=consumer, args=(q, ))
t1.start()
t2.start()
在示例中,,當(dāng)一個(gè)消費(fèi)者收到這個(gè)退出信號(hào)之后,會(huì)退出。將終止值放回隊(duì)列是因?yàn)槿绻卸鄠€(gè)消費(fèi)者,這樣可以使其他監(jiān)聽這個(gè)隊(duì)列的其他消費(fèi)者線程也能夠接收到這個(gè)終止值。
盡管隊(duì)列是線程之間通信 的最常見的機(jī)制,但是只要添加了所需要的鎖和同步功能,就可以構(gòu)建自己的線程安全結(jié)構(gòu),最常見的做法就是將你的數(shù)據(jù)結(jié)構(gòu)和條件變量打包在一起。
下面我們構(gòu)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列。
import heapq
import threading
class PriorityQueue(object):
def __init__(self):
self._queue = []
self._count = 0
self._cv = threading.Condition()
def put(self, item, priority):
while self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()
def get(self):
with self._cv:
while len(self._queue) == 0:
self._cv.wait()
return heapq.heappop(self._queue)[-1]
通過隊(duì)列實(shí)現(xiàn)的線程之間通信是一種單向的且不確定的過程。一般來說,我們無法得知接收線程(消費(fèi)者)何時(shí)會(huì)實(shí)際接收到消息并開始工作。但是,Queue對(duì)象提供了一些基本的事件完成功能。
q.join()會(huì)等待隊(duì)列中所有的信息被消費(fèi)。
Queue對(duì)象的put()和get()都支持非阻塞和超時(shí)機(jī)制。
import queue
q = queue.Queue()
try:
data = q.get(block=False)
except queue.Empty:
pass
try:
q.put("item", block=False)
except queue.Full:
pass
try:
data = q.get(timeout=1)
except queue.Empty:
pass
可以避免特定的隊(duì)列操作上無限期的阻塞下去。用法比較靈活多變。
最后還有一些別的實(shí)用方法, 比如q.qsize()、q.full()、q.empty(),他們能夠告訴你隊(duì)列的當(dāng)前大小和狀態(tài)。但是,這些方法在多線程環(huán)境中是不可靠的。例如:對(duì)q.empty()的調(diào)用可能會(huì)告訴我們隊(duì)列是空的,但是在完成這個(gè)調(diào)用的同時(shí),另外的線程可能已經(jīng)往隊(duì)列中添加了一個(gè)元素。
所以,在編寫代碼的時(shí)候不要太過于依賴這些函數(shù)。
本文最先發(fā)布于:SavingUnhappy