1.線程的基本概念
1.1 線程
線程是應(yīng)用程序最小的執(zhí)行單元,線程與進(jìn)程類似,進(jìn)程可以看做程序的一次執(zhí)行,而線程就是這個(gè)程序的各個(gè)功能,比如打開修圖軟件,就是一個(gè)進(jìn)程,而修圖軟件的濾鏡、虛化等功能可以看做線程。一個(gè)進(jìn)程內(nèi)部可以有一個(gè)到多個(gè)線程。所有的線程運(yùn)行在一個(gè)進(jìn)程中,共享一個(gè)內(nèi)部環(huán)境,所以線程時(shí)間可以共享數(shù)據(jù)。
線程的狀態(tài)
線程有開始,順序執(zhí)行,結(jié)束三個(gè)部分。它有一個(gè)自己的指令指針,記錄自己運(yùn)行到什么地步。線程的運(yùn)行可能被搶占(中斷),或暫時(shí)被掛起(睡眠),讓其他的線程運(yùn)行,這叫讓步。 一個(gè)進(jìn)程中的各個(gè)線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進(jìn)程之間更方便地共享數(shù)據(jù)以及相互通訊。當(dāng)然,這樣的共享并不是完全沒有危險(xiǎn)的。如果多個(gè)線程共同訪問同一片數(shù)據(jù),則由于數(shù)據(jù)訪 問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題。這叫做競態(tài)條件(race condition)。
線程一般都是并發(fā)執(zhí)行的,不過在單 CPU 的系統(tǒng)中,真正的并發(fā)是不可能的,每個(gè)線程會(huì)被安排成每次只運(yùn)行一小會(huì),然后就把 CPU 讓出來,讓其它的線程去運(yùn)行。由于有的函數(shù)會(huì)在完成之前阻塞住,在沒有特別為多線程做修改的情 況下,這種“貪婪”的函數(shù)會(huì)讓 CPU 的時(shí)間分配有所傾斜。導(dǎo)致各個(gè)線程分配到的運(yùn)行時(shí)間可能不 盡相同,不盡公平。

注:圖片來自Python 多線程-伯樂在線 - 人世間,原圖來自內(nèi)心求法博客
1.2 GIL
GIL(Global Interpreter Lock)全局解釋器鎖,這個(gè)鎖能保證同一時(shí)間內(nèi)只有一個(gè)線程運(yùn)行。
在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:
- 設(shè)置GIL
- 切換到一個(gè)線程去執(zhí)行
- 運(yùn)行:
a.指定數(shù)量的字節(jié)碼指令
b.線程主動(dòng)讓出控制(可以調(diào)用time.sleep(0)) - 把線程設(shè)置完睡眠狀態(tài)
- 解鎖GIL
- 再次重復(fù)以上步驟
2. threading
Python提供多線程編程的模塊有thread和threading。thread模塊提供了基本的線程和鎖的支持,而threading模塊提供了更高級別,功能更強(qiáng)的線程管理的功能。不建議使用低級別的thread模塊,更高級別的threading更為先進(jìn),對線程的支持更為完善。而且thread對于你的進(jìn)程什么時(shí)候應(yīng)該結(jié)束完全沒有控制,當(dāng)主線程結(jié)束時(shí),所有的線程都會(huì)被強(qiáng)制結(jié)束掉,沒有警告也不會(huì)有正常的清除工作。
2.1 threading模塊中的函數(shù)和類
函數(shù)有下:
- active_count():返回當(dāng)前運(yùn)行的線程對象的數(shù)目
- current_thread():返回當(dāng)前Thread對象,對應(yīng)的調(diào)用者的線程控制
- enumerate():返回當(dāng)前運(yùn)行的線程對象的列表
- main_thread():返回主要線程,一般情況下,主要線程是從Python解釋器開始的線程
類:
- Thread:表示運(yùn)行在單獨(dú)線程控制中的一個(gè)活動(dòng),一個(gè)線程的執(zhí)行對象。
- Lock:鎖原語對象,實(shí)現(xiàn)原始鎖對象的類。一旦線程已經(jīng)獲得鎖定,則隨后嘗試獲取鎖定,直到它被釋放; 任何線程都可能會(huì)釋放它。
- RLock: 可重入鎖是同步原語,可以由同一個(gè)線程多次獲取。一旦線程獲得了可重入鎖,同一個(gè)線程可能會(huì)再次獲取鎖定; 每次線程必須釋放它一次。
- Condition: 該類實(shí)現(xiàn)條件變量對象。條件變量允許一個(gè)或多個(gè)線程等待,直到被另一個(gè)線程通知。
- Event: 這是線程之間通信的最簡單的機(jī)制之一,一個(gè)線程發(fā)出一個(gè)事件,其他線程等待它
- Semaphore:一個(gè)信號量管理一個(gè)內(nèi)部計(jì)數(shù)器,它由每個(gè)acquire()調(diào)用遞減,并由每個(gè)調(diào)用遞增release()。計(jì)數(shù)器永遠(yuǎn)不會(huì)低于零;當(dāng)acquire() 發(fā)現(xiàn)它為零時(shí),它阻塞,等待直到其他一些線程調(diào)用 release()。
- Timer:這個(gè)類表示一個(gè)動(dòng)作,只有經(jīng)過一定的時(shí)間后才能運(yùn)行
2.2 threading.Thread
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group:應(yīng)為None
target:被run()方法調(diào)用的可調(diào)用對象,可以傳入函數(shù)等可調(diào)用對象
name:線程名
args:傳入到target的參數(shù)元組
kwargs:傳入都target的參數(shù)字典
使用Thread兩種方法,一種是創(chuàng)建Thread實(shí)例,調(diào)用start()方法;另一種是繼承Thread類,在子類中重寫run()和init()方法。
import time
import threading
def hello_thread(name):
print('Starting {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))
time.sleep(3)
print('End {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))
if __name__ == '__main__':
print('Satring {}, Time: {}'.format(threading.current_thread().name, time.ctime()))
nums = ['One', 'Two', 'Three', 'Four', 'Five']
threads = []
for n in nums:
t = threading.Thread(target=hello_thread, args=(n,))
threads.append(t)
for th in threads:
th.start()
for th in threads:
th.join()
print('End {}, Time: {}'.format(threading.current_thread().name, time.ctime()))
Satring MainThread, Time: Sun Sep 3 11:50:30 2017
Starting Thread-4--->One, Time: Sun Sep 3 11:50:30 2017
Starting Thread-5--->Two, Time: Sun Sep 3 11:50:30 2017Starting Thread-6--->Three, Time: Sun Sep 3 11:50:30 2017
Starting Thread-7--->Four, Time: Sun Sep 3 11:50:30 2017
Starting Thread-8--->Five, Time: Sun Sep 3 11:50:30 2017
End Thread-8--->Five, Time: Sun Sep 3 11:50:33 2017End Thread-6--->Three, Time: Sun Sep 3 11:50:33 2017
End Thread-7--->Four, Time: Sun Sep 3 11:50:33 2017End Thread-4--->One, Time: Sun Sep 3 11:50:33 2017End Thread-5--->Two, Time: Sun Sep 3 11:50:33 2017End MainThread, Time: Sun Sep 3 11:50:33 2017
輸出結(jié)果混在了一起,因?yàn)闃?biāo)準(zhǔn)輸出是共享資源,造成混亂,所以需要加鎖。
import time
import threading
th_lock = threading.Lock()
def hello_thread(name):
# 獲取鎖
th_lock.acquire()
print('Starting {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))
time.sleep(3)
print('End {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))
# 釋放鎖
th_lock.release()
if __name__ == '__main__':
print('Satring {}, Time: {}'.format(threading.current_thread().name, time.ctime()))
nums = ['One', 'Two', 'Three', 'Four', 'Five']
threads = []
for n in nums:
t = threading.Thread(target=hello_thread, args=(n,))
threads.append(t)
for th in threads:
th.start()
for th in threads:
th.join()
print('End {}, Time: {}'.format(threading.current_thread().name, time.ctime()))
Satring MainThread, Time: Sun Sep 3 15:24:45 2017Starting Thread-4--->One, Time: Sun Sep 3 15:24:45 2017
End Thread-4--->One, Time: Sun Sep 3 15:24:48 2017Starting Thread-5--->Two, Time: Sun Sep 3 15:24:48 2017
End Thread-5--->Two, Time: Sun Sep 3 15:24:51 2017
Starting Thread-6--->Three, Time: Sun Sep 3 15:24:51 2017
End Thread-6--->Three, Time: Sun Sep 3 15:24:54 2017
Starting Thread-7--->Four, Time: Sun Sep 3 15:24:54 2017
End Thread-7--->Four, Time: Sun Sep 3 15:24:57 2017
Starting Thread-8--->Five, Time: Sun Sep 3 15:24:57 2017
End Thread-8--->Five, Time: Sun Sep 3 15:25:00 2017End MainThread, Time: Sun Sep 3 15:25:00 2017
一個(gè)線程結(jié)束后,馬上開始新的線程。
繼承Thread.Threading類
import threading
from time import time, sleep
class MyThreading(threading.Thread):
def __init__(self, thread_id, thread_name):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.thread_name = thread_name
def run(self):
print('Thread {} , Name {}, Start'.format(self.thread_name, self.thread_id))
sleep(1)
print('Thread End')
if __name__ == '__main__':
print('Begining')
t1 = MyThreading(1, 'Threading-1')
t2 = MyThreading(2, 'Threading-2')
t1.start()
t2.start()
t1.join()
t2.join()
print('All Done!')
Begining
Thread Threading-1 , Name 1, Start
Thread Threading-2 , Name 2, Start
Thread EndThread End
All Done!
外部傳入線程運(yùn)行的函數(shù)
import time
import threading
loops = ['one', 'two']
class MyThread(threading.Thread):
def __init__(self, target, args):
super(MyThread, self).__init__()
self.target = target
self.args = args
def run(self):
self.target(*self.args)
def output(nloop, nesc):
print('Start loop, "{}", at: {}'.format(nloop, time.ctime()))
time.sleep(nesc)
print('End loop, "{}", at: {}'.format(nloop, time.ctime()))
if __name__ == '__main__':
print('Main Threading')
nloop = range(len(loops))
threads = []
for i in nloop:
my_thread = MyThread(output, (loops[i], i))
threads.append(my_thread)
for th in threads:
th.start()
for th in threads:
th.join()
print('All Done')
Main ThreadingStart loop, "one", at: Sun Sep 3 16:54:43 2017
End loop, "one", at: Sun Sep 3 16:54:43 2017
Start loop, "two", at: Sun Sep 3 16:54:43 2017
End loop, "two", at: Sun Sep 3 16:54:44 2017
All Done
創(chuàng)建線程的時(shí)候傳入一個(gè)類,這樣可以使用類的強(qiáng)大功能,可以保存更多的信息,方法更靈活。
from threading import Thread
from time import sleep, ctime
loops = [4, 2]
class ThreadFunc(object):
def __init__(self, func, args, name=""):
self.name = name
self.func = func
self.args = args
def __call__(self):
# 創(chuàng)建新線程的時(shí)候,Thread 對象會(huì)調(diào)用我們的 ThreadFunc 對象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。
self.func(*self.args)
def loop(nloop, nsec):
print('start loop %s at: %s' % (nloop, ctime()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, ctime()))
def main():
print('starting at:', ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all DONE at:', ctime())
if __name__ == '__main__':
main()
starting at: Sun Sep 3 17:33:51 2017
start loop 0 at: Sun Sep 3 17:33:51 2017
start loop 1 at: Sun Sep 3 17:33:51 2017
loop 1 done at: Sun Sep 3 17:33:53 2017
loop 0 done at: Sun Sep 3 17:33:55 2017all DONE at:
Sun Sep 3 17:33:55 2017
總結(jié):threading.Thread()類創(chuàng)建線程,實(shí)際上就像老師給學(xué)生分配任務(wù)一樣,你做什么,他做什么,她做什么,我做什么。在Python中分配的任務(wù)以函數(shù)或者類的形式體現(xiàn),所以創(chuàng)建多線程會(huì)給threading.Thread指定一個(gè)函數(shù)或者類,相當(dāng)與指定任務(wù),傳入?yún)?shù)則相當(dāng)與老師給你一些材料,用這些材料完成任務(wù)。因此,可以看到創(chuàng)建多線程時(shí)指定函數(shù)、指定類,有的還會(huì)繼承threading.Thread,添加一些功能,再指定函數(shù)或者類。
start()方法用來啟動(dòng)線程,start()告訴run()函數(shù)運(yùn)行線程,所以繼承threading.Thread時(shí)需要重寫run()方法。join()方法用以阻塞當(dāng)前線程,就是告訴當(dāng)前線程,調(diào)用join()方法的線程不執(zhí)行完,你就不能執(zhí)行。
2.3 Lock
線程共享數(shù)據(jù),因此多個(gè)線程對同一數(shù)據(jù)修改可能會(huì)發(fā)生沖突,因此需要Lock。當(dāng)一個(gè)線程獲取Lock時(shí),相當(dāng)于告訴其他線程,數(shù)據(jù)我正在修改,你不能動(dòng),等我釋放之后,你才可以。
import time, threading
balance = 0
def change_it(n):
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
5
多次執(zhí)行后,會(huì)出現(xiàn)不為0的情況,因?yàn)樾薷腷alance需要多條語句,而執(zhí)行這幾條語句時(shí),線程可能中斷,從而導(dǎo)致多個(gè)線程把同一個(gè)對象的內(nèi)容改亂了。詳情見廖雪峰Python教程
import time, threading
balance = 0
lock = threading.Lock()
def change_it(n):
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
try:
# 獲取鎖
lock.acquire()
change_it(n)
finally:
# 釋放鎖
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
0
2.4 Condition
條件變量對象能讓一個(gè)線程停下來,等待其他線程滿足了某個(gè)條件。條件變量允許一個(gè)或多個(gè)線程等待,直到被另一個(gè)線程通知。線程首先acquire一個(gè)條件變量鎖。如果條件不足,則該線程wait,如果滿足就執(zhí)行線程,甚至可以notify其他線程。其他處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件。
- 當(dāng)一個(gè)線程獲取鎖后,發(fā)現(xiàn)沒有相應(yīng)的資源或狀態(tài),就會(huì)調(diào)用wait阻塞,釋放已經(jīng)獲得的鎖,直到期望的資源或者狀態(tài)發(fā)生改變。
- 當(dāng)一個(gè)線程獲得了鎖,改變了資源或者狀態(tài),就會(huì)調(diào)用notify()或者notifyall()去通知其他線程。
方法:
acquire():獲得鎖
release():釋放鎖
wait([timeout]):持續(xù)等待直到被notify()或者notifyAll()通知或者超時(shí)(必須先獲得鎖)
wait():所做操作, 先釋放獲得的鎖, 然后阻塞, 知道被notify或者notifyAll喚醒或者超時(shí), 一旦被喚醒或者超時(shí), 會(huì)重新獲取鎖(應(yīng)該說搶鎖), 然后返回
notify():喚醒一個(gè)wait()阻塞的線程
notify_all()或者notifyAll():喚醒所有阻塞的線程
from threading import Thread, current_thread, Condition
from time import sleep
con = Condition()
def th_con():
with con:
for i in range(5):
print('Name: {}, Times: {}'.format(current_thread().name, i))
sleep(0.3)
if i == 3:
print('Release Lock, Wait')
# 只有獲取鎖的線程才能調(diào)用 wait() 和 notify(),因此必須在鎖釋放前調(diào)用
con.wait()
def th_con2():
with con:
for i in range(5):
print('Name: {}, Times: {}'.format(current_thread().name, i))
sleep(0.3)
if i == 3:
con.notify()
print('Notify Thread')
if __name__ == '__main__':
Thread(target=th_con, name='Thread>>>One').start()
Thread(target=th_con2, name='Thread<<<Two').start()
Name: Thread>>>One, Times: 0
Name: Thread>>>One, Times: 1
Name: Thread>>>One, Times: 2
Name: Thread>>>One, Times: 3
Release Lock, Wait
Name: Thread<<<Two, Times: 0
Name: Thread<<<Two, Times: 1
Name: Thread<<<Two, Times: 2
Name: Thread<<<Two, Times: 3
Notify Thread
Name: Thread<<<Two, Times: 4
Name: Thread>>>One, Times: 4
2.5 Event
事件用于在線程間通信。一個(gè)線程發(fā)出一個(gè)信號,其他一個(gè)或多個(gè)線程等待,調(diào)用event對象的wait方法,線程則會(huì)阻塞等待,直到別的線程set之后,才會(huì)被喚醒。
import time
import threading
class MyThread(threading.Thread):
def __init__(self, event):
super(MyThread, self).__init__()
self.event = event
def run(self):
print('Thread {} is ready'.format(self.getName()))
self.event.wait()
print('Thread {} run'.format(self.getName()))
signal = threading.Event()
def main():
start = time.time()
for i in range(3):
t = MyThread(signal)
t.start()
time.sleep(3)
print('After {}s'.format(time.time() - start))
# 將內(nèi)部標(biāo)志設(shè)置為True,等待標(biāo)識的其他線程都會(huì)被喚醒
signal.set()
if __name__ == '__main__':
main()
Thread Thread-4 is ready
Thread Thread-5 is ready
Thread Thread-6 is ready
After 3.0065603256225586sThread Thread-4 run
Thread Thread-6 run
Thread Thread-5 run
3.queue
queue用于線程間通信,讓各個(gè)線程之間共享數(shù)據(jù)。Queue實(shí)現(xiàn)的三種隊(duì)列模型:
- FIFO(先進(jìn)先出)隊(duì)列,第一加入隊(duì)列的任務(wù), 被第一個(gè)取出
- LIFO(后進(jìn)先出)隊(duì)列,最后加入隊(duì)列的任務(wù), 被第一個(gè)取出
- PriorityQueue(優(yōu)先級)隊(duì)列, 保持隊(duì)列數(shù)據(jù)有序, 最小值被先取出
queue實(shí)現(xiàn)的類和異常:
qsize():返回隊(duì)列的大致大小
empty():如果隊(duì)列為空,則返回True
full():如果隊(duì)列滿,則返回True
put():往Queue加入元素
get():從Queue中刪除并返回一個(gè)項(xiàng)目
join():阻塞一直到Queue中的所有元素被獲取和處理
task_done():表明以前入隊(duì)的任務(wù)已經(jīng)完成。由隊(duì)列消費(fèi)者線程使用。對于每個(gè)get()用于獲取任務(wù)的后續(xù)調(diào)用, task_done()告知隊(duì)列對任務(wù)的處理完成。
生產(chǎn)者和消費(fèi)者模型
某些模塊負(fù)責(zé)生產(chǎn)數(shù)據(jù),這些數(shù)據(jù)由其他模塊來負(fù)責(zé)處理(此處的模塊可能是:函數(shù)、線程、進(jìn)程等)。產(chǎn)生數(shù)據(jù)的模塊稱為生產(chǎn)者,而處理數(shù)據(jù)的模塊稱為消費(fèi)者。在生產(chǎn)者與消費(fèi)者之間的緩沖區(qū)稱之為倉庫。生產(chǎn)者負(fù)責(zé)往倉庫運(yùn)輸商品,而消費(fèi)者負(fù)責(zé)從倉庫里取出商品,這就構(gòu)成了生產(chǎn)者消費(fèi)者模式。

圖片來自用Python多線程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
import time
import threading
import queue
import random
class Producer(threading.Thread):
def __init__(self, name, q):
threading.Thread.__init__(self, name=name)
self.data = q
def run(self):
for i in range(10):
elem = random.randrange(100)
self.data.put(elem)
print('{} a elem {}, Now the size is {}'.format(self.getName(), elem, self.data.qsize()))
time.sleep(random.random())
print('Thread {}, {} is finished!!!'.format(threading.current_thread().name, self.getName()))
class Consumer(threading.Thread):
def __init__(self, name, q):
threading.Thread.__init__(self, name=name)
self.data = q
def run(self):
for i in range(10):
elem = self.data.get()
self.data.task_done()
print('{} a elem {}, Now the size is {}'.format(self.getName(), elem, self.data.qsize()))
time.sleep(random.random())
print('Thread {}, {} is finished!!!'.format(threading.current_thread().name, self.getName()))
def main():
print('Start Pro')
q = queue.Queue()
producer = Producer('Producer', q)
consumer = Consumer('Consumer', q)
producer.start()
consumer.start()
producer.join()
consumer.join()
# threads_pro = []
# threads_con = []
# for i in range(3):
# producer = Producer('Producer', q)
# threads_pro.append(producer)
# for i in range(3):
# consumer = Consumer('Consumer', q)
# threads_con.append(consumer)
# for th in threads_pro:
# th.start()
# for th in threads_con:
# th.start()
# for th in threads_pro:
# th.join()
# for th in threads_con:
# th.join()
print('All Done!!!')
if __name__ == '__main__':
main()
Start Pro
Producer a elem 89, Now the size is 1
Consumer a elem 89, Now the size is 0
Producer a elem 26, Now the size is 1Consumer a elem 26, Now the size is 0
Producer a elem 51, Now the size is 1Consumer a elem 51, Now the size is 0
Producer a elem 41, Now the size is 1Consumer a elem 41, Now the size is 0
Producer a elem 29, Now the size is 1Consumer a elem 29, Now the size is 0
Producer a elem 63, Now the size is 1
Consumer a elem 63, Now the size is 0
Producer a elem 56, Now the size is 1Consumer a elem 56, Now the size is 0
Producer a elem 31, Now the size is 1
Consumer a elem 31, Now the size is 0
Producer a elem 21, Now the size is 1
Consumer a elem 21, Now the size is 0
Producer a elem 67, Now the size is 1
Consumer a elem 67, Now the size is 0
Thread Producer, Producer is finished!!!
Thread Consumer, Consumer is finished!!!
All Done!!!
4.ThreadLocal
一個(gè)ThreadLocal變量雖然是全局變量,但每個(gè)線程都只能讀寫自己線程的獨(dú)立副本,互不干擾。ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問題。它本身是一個(gè)全局變量,但是每個(gè)線程卻可以利用它來保存屬于自己的私有數(shù)據(jù),這些私有數(shù)據(jù)對其他線程也是不可見的。

圖片來自深入理解Python中的ThreadLocal變量(上)
import threading
# 創(chuàng)建全局ThreadLocal對象:
local_school = threading.local()
def process_student():
# 獲取當(dāng)前線程關(guān)聯(lián)的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)