前言
回顧之前講了python語(yǔ)法編程 ,關(guān)于從零入門python的第一遍,編程語(yǔ)法必修內(nèi)容,比如python3基礎(chǔ)入門,列表與元組,字符串,字典,條件丶循環(huán)和其他語(yǔ)句丶函數(shù)丶面向?qū)ο筘籍惓:臀募幚砗途W(wǎng)絡(luò)編程篇
1.跟我一起從零開始學(xué)python(一)編程語(yǔ)法必修
2.跟我一起從零開始學(xué)python(二)網(wǎng)絡(luò)編程
本篇講:python并發(fā)編程:多線程/多進(jìn)程/協(xié)程
本系列文根據(jù)以下學(xué)習(xí)路線展開講述,由于內(nèi)容較多:
從零開始學(xué)python到高級(jí)進(jìn)階路線圖

第一章:多線程
1.線程和進(jìn)程
線程和進(jìn)程是操作系統(tǒng)中的兩個(gè)重要概念,它們都是并發(fā)編程的基礎(chǔ)。線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,而進(jìn)程則是操作系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。
線程和進(jìn)程的區(qū)別:
- 線程是進(jìn)程的一部分,一個(gè)進(jìn)程可以包含多個(gè)線程,而一個(gè)線程只能屬于一個(gè)進(jìn)程。
- 進(jìn)程擁有獨(dú)立的內(nèi)存空間,而線程共享進(jìn)程的內(nèi)存空間。
- 進(jìn)程之間的通信需要使用IPC(Inter-Process Communication)機(jī)制,而線程之間可以直接共享數(shù)據(jù)。
- 進(jìn)程的創(chuàng)建和銷毀比線程慢,因?yàn)檫M(jìn)程需要分配和釋放獨(dú)立的內(nèi)存空間,而線程只需要分配和釋放一些寄存器和??臻g。
在Python中,可以使用threading模塊來(lái)創(chuàng)建和管理線程。下面是一個(gè)簡(jiǎn)單的線程示例:
import threading
def worker():
print('Worker thread started')
# do some work here
print('Worker thread finished')
# create a new thread
t = threading.Thread(target=worker)
# start the thread
t.start()
# wait for the thread to finish
t.join()
在這個(gè)示例中,我們創(chuàng)建了一個(gè)名為worker的函數(shù),它將在一個(gè)新的線程中運(yùn)行。我們使用threading.Thread類創(chuàng)建了一個(gè)新的線程對(duì)象,并將worker函數(shù)作為目標(biāo)傳遞給它。然后,我們使用start()方法啟動(dòng)線程,并使用join()方法等待線程完成。
2.使用線程
在Python中,使用線程可以通過(guò)threading模塊來(lái)實(shí)現(xiàn)。下面是一個(gè)簡(jiǎn)單的例子,展示了如何使用線程:
import threading
def worker():
"""線程執(zhí)行的任務(wù)"""
print("Worker thread started")
# 執(zhí)行一些任務(wù)
print("Worker thread finished")
# 創(chuàng)建線程
t = threading.Thread(target=worker)
# 啟動(dòng)線程
t.start()
# 主線程繼續(xù)執(zhí)行其他任務(wù)
print("Main thread finished")
在上面的例子中,我們首先定義了一個(gè)worker函數(shù),它將在一個(gè)單獨(dú)的線程中執(zhí)行。然后,我們使用threading.Thread類創(chuàng)建了一個(gè)新的線程,并將worker函數(shù)作為參數(shù)傳遞給它。最后,我們調(diào)用start方法來(lái)啟動(dòng)線程。
注意,線程是異步執(zhí)行的,因此主線程不會(huì)等待線程完成。在上面的例子中,主線程會(huì)立即繼續(xù)執(zhí)行,輸出Main thread finished。如果我們希望等待線程完成后再繼續(xù)執(zhí)行主線程,可以使用join方法:
# 等待線程完成
t.join()
# 主線程繼續(xù)執(zhí)行其他任務(wù)
print("Main thread finished")
在上面的代碼中,我們?cè)趩?dòng)線程后調(diào)用了t.join()方法,這將阻塞主線程,直到線程完成。然后,主線程才會(huì)繼續(xù)執(zhí)行。
3.多線程全局變量
在多線程編程中,多個(gè)線程可以共享全局變量。但是需要注意的是,多個(gè)線程同時(shí)對(duì)同一個(gè)全局變量進(jìn)行讀寫操作時(shí),可能會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)(Data Race)的問(wèn)題,導(dǎo)致程序出現(xiàn)不可預(yù)期的結(jié)果。
為了避免數(shù)據(jù)競(jìng)爭(zhēng),可以使用線程鎖(Thread Lock)來(lái)保證同一時(shí)刻只有一個(gè)線程可以訪問(wèn)共享變量。Python中提供了Lock、RLock、Semaphore等多種鎖機(jī)制,可以根據(jù)實(shí)際需求選擇合適的鎖。
下面是一個(gè)使用Lock來(lái)保證多線程共享全局變量安全的示例代碼:
import threading
# 定義全局變量
count = 0
# 定義線程鎖
lock = threading.Lock()
# 定義線程函數(shù)
def add():
global count
for i in range(100000):
# 獲取鎖
lock.acquire()
count += 1
# 釋放鎖
lock.release()
# 創(chuàng)建兩個(gè)線程
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
# 啟動(dòng)線程
t1.start()
t2.start()
# 等待線程執(zhí)行完畢
t1.join()
t2.join()
# 輸出結(jié)果
print(count)
在上面的代碼中,我們定義了一個(gè)全局變量count,并使用Lock來(lái)保證多個(gè)線程對(duì)count進(jìn)行讀寫操作時(shí)的安全性。在每個(gè)線程中,我們首先獲取鎖,然后對(duì)count進(jìn)行加1操作,最后釋放鎖。這樣就可以保證同一時(shí)刻只有一個(gè)線程可以訪問(wèn)count,避免了數(shù)據(jù)競(jìng)爭(zhēng)的問(wèn)題。
需要注意的是,使用鎖會(huì)帶來(lái)一定的性能損失,因?yàn)槊看潍@取鎖和釋放鎖都需要一定的時(shí)間。因此,在實(shí)際應(yīng)用中,需要根據(jù)實(shí)際情況來(lái)選擇合適的鎖機(jī)制,避免過(guò)度使用鎖導(dǎo)致程序性能下降。
4.共享全局變量所帶來(lái)的問(wèn)題
在多線程編程中,多個(gè)線程可以共享全局變量。但是,共享全局變量也會(huì)帶來(lái)一些問(wèn)題:
競(jìng)爭(zhēng)條件:當(dāng)多個(gè)線程同時(shí)訪問(wèn)和修改同一個(gè)全局變量時(shí),可能會(huì)出現(xiàn)競(jìng)爭(zhēng)條件,導(dǎo)致程序出現(xiàn)不可預(yù)測(cè)的結(jié)果。
數(shù)據(jù)不一致:當(dāng)多個(gè)線程同時(shí)修改同一個(gè)全局變量時(shí),可能會(huì)導(dǎo)致數(shù)據(jù)不一致的問(wèn)題,即某些線程看到的變量值與其他線程看到的不同。
死鎖:當(dāng)多個(gè)線程同時(shí)等待對(duì)方釋放某個(gè)資源時(shí),可能會(huì)出現(xiàn)死鎖的情況,導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行。
因此,在多線程編程中,需要注意對(duì)共享全局變量的訪問(wèn)和修改,避免出現(xiàn)上述問(wèn)題??梢允褂面i、條件變量等機(jī)制來(lái)保證線程之間的同步和互斥。
5.解決線程同時(shí)修改全局變量的方式
在多線程編程中,共享全局變量可能會(huì)帶來(lái)一些問(wèn)題,例如:
競(jìng)爭(zhēng)條件:多個(gè)線程同時(shí)修改同一個(gè)全局變量,可能會(huì)導(dǎo)致數(shù)據(jù)不一致或者出現(xiàn)意料之外的結(jié)果。
死鎖:多個(gè)線程同時(shí)等待對(duì)方釋放資源,導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行。
為了解決這些問(wèn)題,可以采用以下方式:
使用鎖:在訪問(wèn)共享變量時(shí),使用鎖來(lái)保證同一時(shí)刻只有一個(gè)線程可以修改變量。Python中提供了threading模塊中的Lock類來(lái)實(shí)現(xiàn)鎖。
使用線程安全的數(shù)據(jù)結(jié)構(gòu):Python中提供了一些線程安全的數(shù)據(jù)結(jié)構(gòu),例如Queue、deque等,可以在多線程環(huán)境下安全地訪問(wèn)和修改數(shù)據(jù)。
使用局部變量:將全局變量作為參數(shù)傳遞給線程函數(shù),讓線程函數(shù)在局部變量上進(jìn)行操作,避免多個(gè)線程同時(shí)修改同一個(gè)全局變量。
使用線程本地存儲(chǔ):Python中提供了threading模塊中的local類,可以在每個(gè)線程中創(chuàng)建一個(gè)獨(dú)立的變量,避免多個(gè)線程之間共享變量。
6.互斥鎖
在多線程編程中,互斥鎖是一種常用的同步機(jī)制,用于保護(hù)共享資源,防止多個(gè)線程同時(shí)修改同一個(gè)變量導(dǎo)致數(shù)據(jù)不一致的問(wèn)題。
互斥鎖的基本思想是,在訪問(wèn)共享資源之前,先獲取鎖,如果鎖已經(jīng)被其他線程獲取,則當(dāng)前線程會(huì)被阻塞,直到鎖被釋放為止。在訪問(wèn)完共享資源之后,釋放鎖,讓其他線程可以獲取鎖并訪問(wèn)共享資源。
在Python中,可以使用threading模塊中的Lock類來(lái)實(shí)現(xiàn)互斥鎖。Lock類有兩個(gè)基本方法:
-
acquire([blocking]):獲取鎖,如果鎖已經(jīng)被其他線程獲取,則當(dāng)前線程會(huì)被阻塞。如果blocking為False,則獲取鎖失敗時(shí)會(huì)立即返回False,而不是阻塞等待。 -
release():釋放鎖,讓其他線程可以獲取鎖并訪問(wèn)共享資源。
下面是一個(gè)使用互斥鎖的例子:
import threading
# 共享變量
count = 0
# 創(chuàng)建互斥鎖
lock = threading.Lock()
# 線程函數(shù)
def worker():
global count
for i in range(100000):
# 獲取鎖
lock.acquire()
try:
count += 1
finally:
# 釋放鎖
lock.release()
# 創(chuàng)建多個(gè)線程
threads = []
for i in range(10):
t = threading.Thread(target=worker)
threads.append(t)
# 啟動(dòng)線程
for t in threads:
t.start()
# 等待所有線程執(zhí)行完畢
for t in threads:
t.join()
# 輸出結(jié)果
print(count)
在上面的例子中,我們創(chuàng)建了一個(gè)共享變量count,并使用互斥鎖來(lái)保護(hù)它。在每個(gè)線程中,我們先獲取鎖,然后修改count的值,最后釋放鎖。這樣就可以保證多個(gè)線程不會(huì)同時(shí)修改count的值,從而避免了數(shù)據(jù)不一致的問(wèn)題。最后輸出count的值,可以看到它的值為1000000,符合預(yù)期。
7.死鎖
死鎖是指兩個(gè)或多個(gè)線程在執(zhí)行過(guò)程中,因爭(zhēng)奪資源而造成的一種互相等待的現(xiàn)象,若無(wú)外力干涉那它們都將無(wú)法繼續(xù)執(zhí)行下去。在多線程編程中,死鎖是一種常見的問(wèn)題,需要特別注意。
死鎖的產(chǎn)生通常需要滿足以下四個(gè)條件:
- 互斥條件:一個(gè)資源每次只能被一個(gè)線程使用。
- 請(qǐng)求與保持條件:一個(gè)線程因請(qǐng)求資源而阻塞時(shí),對(duì)已獲得的資源保持不放。
- 不剝奪條件:線程已獲得的資源,在未使用完之前,不能被其他線程強(qiáng)行剝奪,只能由該線程自己釋放。
- 循環(huán)等待條件:若干線程之間形成一種頭尾相接的循環(huán)等待資源的關(guān)系。
為了避免死鎖的產(chǎn)生,可以采用以下幾種方式:
- 避免使用多個(gè)鎖,盡量使用一個(gè)鎖來(lái)控制多個(gè)資源的訪問(wèn)。
- 避免持有鎖的時(shí)間過(guò)長(zhǎng),盡量縮短鎖的持有時(shí)間。
- 避免循環(huán)等待,盡量按照固定的順序獲取鎖。
- 使用超時(shí)機(jī)制,當(dāng)?shù)却龝r(shí)間超過(guò)一定時(shí)間后,自動(dòng)釋放鎖,避免長(zhǎng)時(shí)間等待造成的死鎖。
8.線程池
線程池是一種線程管理技術(shù),它可以在程序啟動(dòng)時(shí)創(chuàng)建一定數(shù)量的線程,放入一個(gè)池中,當(dāng)需要使用線程時(shí),就從池中取出一個(gè)線程執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后,線程并不會(huì)被銷毀,而是放回池中等待下一次任務(wù)的到來(lái)。
使用線程池可以避免頻繁創(chuàng)建和銷毀線程的開銷,提高程序的性能和效率。在Python中,可以使用標(biāo)準(zhǔn)庫(kù)中的concurrent.futures模塊來(lái)實(shí)現(xiàn)線程池。
下面是一個(gè)簡(jiǎn)單的線程池示例:
import concurrent.futures
import time
def worker(num):
print(f"Thread-{num} started")
time.sleep(1)
print(f"Thread-{num} finished")
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for i in range(5):
executor.submit(worker, i)
在這個(gè)示例中,我們創(chuàng)建了一個(gè)包含3個(gè)線程的線程池,然后提交了5個(gè)任務(wù)給線程池執(zhí)行。由于線程池中只有3個(gè)線程,因此只有3個(gè)任務(wù)會(huì)同時(shí)執(zhí)行,其余的任務(wù)會(huì)等待空閑線程的出現(xiàn)。
輸出結(jié)果如下:
Thread-0 started
Thread-1 started
Thread-2 started
Thread-0 finished
Thread-3 started
Thread-1 finished
Thread-4 started
Thread-2 finished
Thread-3 finished
Thread-4 finished
可以看到,線程池中的3個(gè)線程依次執(zhí)行了5個(gè)任務(wù),任務(wù)的執(zhí)行順序與提交的順序無(wú)關(guān)。
第二章:多進(jìn)程
1.進(jìn)程的狀態(tài)
在操作系統(tǒng)中,進(jìn)程有以下幾種狀態(tài):
就緒狀態(tài)(Ready):進(jìn)程已經(jīng)準(zhǔn)備好運(yùn)行,等待分配CPU時(shí)間片。
運(yùn)行狀態(tài)(Running):進(jìn)程正在運(yùn)行,占用CPU時(shí)間片。
阻塞狀態(tài)(Blocked):進(jìn)程因?yàn)槟承┰驘o(wú)法繼續(xù)執(zhí)行,例如等待I/O操作完成或等待某個(gè)資源的釋放。
掛起狀態(tài)(Suspended):進(jìn)程被暫停,不再占用CPU時(shí)間片,但是它的狀態(tài)信息仍然保存在內(nèi)存中。
終止?fàn)顟B(tài)(Terminated):進(jìn)程已經(jīng)完成執(zhí)行或被強(qiáng)制終止。
進(jìn)程的狀態(tài)轉(zhuǎn)換通常是由操作系統(tǒng)內(nèi)核進(jìn)行控制的,例如當(dāng)一個(gè)進(jìn)程等待I/O操作完成時(shí),操作系統(tǒng)會(huì)將該進(jìn)程的狀態(tài)從就緒狀態(tài)轉(zhuǎn)換為阻塞狀態(tài),當(dāng)I/O操作完成后,操作系統(tǒng)會(huì)將該進(jìn)程的狀態(tài)從阻塞狀態(tài)轉(zhuǎn)換為就緒狀態(tài),等待分配CPU時(shí)間片。
2.線程的創(chuàng)建-multiprocessing
在 Python 中,可以使用 multiprocessing 模塊來(lái)創(chuàng)建多進(jìn)程。multiprocessing 模塊提供了一個(gè) Process 類,可以用來(lái)創(chuàng)建進(jìn)程。下面是一個(gè)簡(jiǎn)單的例子:
import multiprocessing
def worker():
"""子進(jìn)程要執(zhí)行的任務(wù)"""
print('Worker')
if __name__ == '__main__':
# 創(chuàng)建子進(jìn)程
p = multiprocessing.Process(target=worker)
# 啟動(dòng)子進(jìn)程
p.start()
# 等待子進(jìn)程結(jié)束
p.join()
在上面的例子中,我們首先定義了一個(gè) worker 函數(shù),它是子進(jìn)程要執(zhí)行的任務(wù)。然后,我們使用 multiprocessing.Process 類創(chuàng)建了一個(gè)子進(jìn)程,并將 worker 函數(shù)作為參數(shù)傳遞給了 Process 類的構(gòu)造函數(shù)。接著,我們調(diào)用 start 方法啟動(dòng)子進(jìn)程,最后調(diào)用 join 方法等待子進(jìn)程結(jié)束。
需要注意的是,在 Windows 系統(tǒng)中,由于 multiprocessing 模塊使用了 fork 系統(tǒng)調(diào)用,而 Windows 不支持 fork,因此需要在 if __name__ == '__main__': 語(yǔ)句中調(diào)用子進(jìn)程的代碼。這是因?yàn)樵?Windows 中,每個(gè)進(jìn)程都會(huì)執(zhí)行一遍程序的所有代碼,而 if __name__ == '__main__': 語(yǔ)句可以保證子進(jìn)程只會(huì)執(zhí)行指定的代碼。
3.進(jìn)程丶線程對(duì)比
進(jìn)程和線程都是實(shí)現(xiàn)并發(fā)編程的方式,但是它們有以下不同點(diǎn):
資源占用:進(jìn)程擁有獨(dú)立的內(nèi)存空間,而線程共享進(jìn)程的內(nèi)存空間。因此,創(chuàng)建進(jìn)程的開銷比創(chuàng)建線程大,同時(shí)進(jìn)程間的通信也比線程間的通信復(fù)雜。
并發(fā)性:由于線程共享進(jìn)程的內(nèi)存空間,因此線程間的通信和數(shù)據(jù)共享比進(jìn)程間的通信和數(shù)據(jù)共享更容易。同時(shí),線程的切換比進(jìn)程的切換更快,因此線程的并發(fā)性比進(jìn)程高。
安全性:由于線程共享進(jìn)程的內(nèi)存空間,因此多個(gè)線程同時(shí)訪問(wèn)同一塊內(nèi)存時(shí),可能會(huì)出現(xiàn)競(jìng)爭(zhēng)條件,導(dǎo)致數(shù)據(jù)不一致或者程序崩潰。而進(jìn)程之間的內(nèi)存空間是獨(dú)立的,因此進(jìn)程間的數(shù)據(jù)不會(huì)相互影響。
編程復(fù)雜度:由于進(jìn)程間的通信和數(shù)據(jù)共享比較復(fù)雜,因此編寫多進(jìn)程程序的復(fù)雜度比編寫多線程程序的復(fù)雜度高。
總的來(lái)說(shuō),進(jìn)程適合于CPU密集型任務(wù),而線程適合于IO密集型任務(wù)。在實(shí)際應(yīng)用中,需要根據(jù)具體的場(chǎng)景選擇合適的并發(fā)編程方式。
4.進(jìn)程間的通信-Queue
在多進(jìn)程編程中,不同進(jìn)程之間的數(shù)據(jù)是無(wú)法直接共享的,因?yàn)槊總€(gè)進(jìn)程都有自己獨(dú)立的內(nèi)存空間。因此,為了實(shí)現(xiàn)進(jìn)程間的通信,我們需要使用一些特殊的機(jī)制。
其中,最常用的進(jìn)程間通信方式是使用隊(duì)列(Queue)。隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),可以用來(lái)在多個(gè)進(jìn)程之間傳遞數(shù)據(jù)。
在Python中,我們可以使用multiprocessing模塊中的Queue類來(lái)實(shí)現(xiàn)進(jìn)程間通信。Queue類提供了put()和get()方法,用于向隊(duì)列中添加數(shù)據(jù)和從隊(duì)列中取出數(shù)據(jù)。
下面是一個(gè)簡(jiǎn)單的例子,演示了如何使用Queue實(shí)現(xiàn)進(jìn)程間通信:
from multiprocessing import Process, Queue
def worker(q):
while True:
item = q.get()
if item is None:
break
print(item)
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
for i in range(10):
q.put(i)
q.put(None)
p.join()
在這個(gè)例子中,我們創(chuàng)建了一個(gè)進(jìn)程p,它的工作是從隊(duì)列q中取出數(shù)據(jù)并打印出來(lái)。主進(jìn)程向隊(duì)列中添加了10個(gè)數(shù)據(jù),然后再添加一個(gè)None,表示數(shù)據(jù)已經(jīng)全部添加完畢。最后,主進(jìn)程等待進(jìn)程p執(zhí)行完畢。
需要注意的是,當(dāng)我們向隊(duì)列中添加數(shù)據(jù)時(shí),如果隊(duì)列已滿,put()方法會(huì)阻塞,直到隊(duì)列中有空閑位置。同樣地,當(dāng)我們從隊(duì)列中取出數(shù)據(jù)時(shí),如果隊(duì)列為空,get()方法也會(huì)阻塞,直到隊(duì)列中有數(shù)據(jù)可取。
除了Queue之外,Python中還提供了一些其他的進(jìn)程間通信方式,比如Pipe、Value和Array等。這些方式各有特點(diǎn),可以根據(jù)具體的需求選擇合適的方式
5.進(jìn)程池的創(chuàng)建-pool
在Python中,我們可以使用multiprocessing模塊中的Pool類來(lái)創(chuàng)建進(jìn)程池。進(jìn)程池是一組可重用的進(jìn)程,可以在需要時(shí)分配給任務(wù)。這樣可以避免頻繁地創(chuàng)建和銷毀進(jìn)程,從而提高程序的效率。
下面是一個(gè)使用進(jìn)程池的例子:
import multiprocessing
def worker(num):
"""進(jìn)程池中的任務(wù)"""
print('Worker %d is running' % num)
if __name__ == '__main__':
# 創(chuàng)建進(jìn)程池,池中有3個(gè)進(jìn)程
pool = multiprocessing.Pool(processes=3)
# 向進(jìn)程池中添加任務(wù)
for i in range(5):
pool.apply_async(worker, args=(i,))
# 關(guān)閉進(jìn)程池,不再接受新的任務(wù)
pool.close()
# 等待所有任務(wù)完成
pool.join()
print('All workers done.')
在這個(gè)例子中,我們首先創(chuàng)建了一個(gè)進(jìn)程池,池中有3個(gè)進(jìn)程。然后向進(jìn)程池中添加了5個(gè)任務(wù),每個(gè)任務(wù)都是調(diào)用worker函數(shù)。最后,我們關(guān)閉了進(jìn)程池,并等待所有任務(wù)完成。
需要注意的是,進(jìn)程池中的任務(wù)必須是可序列化的,因?yàn)檫M(jìn)程池會(huì)將任務(wù)發(fā)送給子進(jìn)程執(zhí)行。如果任務(wù)中包含不可序列化的對(duì)象,會(huì)導(dǎo)致進(jìn)程池?zé)o法正常工作。
第三章:協(xié)程
協(xié)程是一種輕量級(jí)的線程,也稱為微線程或者用戶級(jí)線程。協(xié)程的特點(diǎn)是在一個(gè)線程中,可以有多個(gè)協(xié)程,協(xié)程之間可以相互切換,從而實(shí)現(xiàn)并發(fā)執(zhí)行。
在Python中,協(xié)程是通過(guò)生成器實(shí)現(xiàn)的。通過(guò)yield關(guān)鍵字,可以將一個(gè)函數(shù)變成一個(gè)生成器,從而實(shí)現(xiàn)協(xié)程的功能。在協(xié)程中,可以使用yield關(guān)鍵字來(lái)暫停函數(shù)的執(zhí)行,并返回一個(gè)值給調(diào)用者。當(dāng)協(xié)程再次被調(diào)用時(shí),可以從上一次暫停的位置繼續(xù)執(zhí)行。
Python中的協(xié)程有兩種實(shí)現(xiàn)方式:使用生成器實(shí)現(xiàn)的協(xié)程和使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程。
使用生成器實(shí)現(xiàn)的協(xié)程:
def coroutine():
while True:
value = yield
print('Received value:', value)
c = coroutine()
next(c) # 啟動(dòng)協(xié)程
c.send(10) # 發(fā)送值給協(xié)程
使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程:
import asyncio
async def coroutine():
while True:
value = await asyncio.sleep(1)
print('Received value:', value)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
在使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程中,需要使用asyncio模塊提供的事件循環(huán)來(lái)運(yùn)行協(xié)程。在協(xié)程中,可以使用await關(guān)鍵字來(lái)暫停函數(shù)的執(zhí)行,并等待一個(gè)異步操作完成。當(dāng)異步操作完成后,協(xié)程會(huì)從await語(yǔ)句處繼續(xù)執(zhí)行。
協(xié)程的優(yōu)點(diǎn)是可以避免線程切換的開銷,從而提高程序的性能。同時(shí),協(xié)程也可以避免線程之間的競(jìng)爭(zhēng)條件和死鎖問(wèn)題。但是,協(xié)程也有一些缺點(diǎn),例如不能利用多核CPU的優(yōu)勢(shì),以及不能進(jìn)行阻塞式IO操作。
1.協(xié)程的意義
協(xié)程是一種輕量級(jí)的線程,可以在單個(gè)線程中實(shí)現(xiàn)并發(fā)。與線程相比,協(xié)程的切換開銷更小,可以更高效地利用CPU資源。協(xié)程的意義在于:
提高程序的并發(fā)性能:協(xié)程可以在單個(gè)線程中實(shí)現(xiàn)并發(fā),避免了線程切換的開銷,提高了程序的并發(fā)性能。
簡(jiǎn)化編程模型:協(xié)程可以使用同步的編程模型,避免了復(fù)雜的線程同步問(wèn)題,使得編程更加簡(jiǎn)單。
支持高并發(fā):協(xié)程可以支持大量的并發(fā)任務(wù),可以用于高并發(fā)的網(wǎng)絡(luò)編程、爬蟲等場(chǎng)景。
提高代碼可讀性:協(xié)程可以使用同步的編程模型,代碼可讀性更高,易于維護(hù)。
總之,協(xié)程是一種高效、簡(jiǎn)單、可讀性強(qiáng)的并發(fā)編程模型,可以提高程序的并發(fā)性能,支持高并發(fā),簡(jiǎn)化編程模型。
2.asyncio事件循環(huán)
在Python中,協(xié)程是一種輕量級(jí)的并發(fā)編程方式,它可以在單線程中實(shí)現(xiàn)并發(fā)執(zhí)行。協(xié)程的意義在于可以提高程序的并發(fā)性能,減少線程切換的開銷,同時(shí)也可以簡(jiǎn)化編程模型,使得代碼更加易于理解和維護(hù)。
在Python 3.4及以上版本中,標(biāo)準(zhǔn)庫(kù)中提供了asyncio模塊,它是Python中實(shí)現(xiàn)協(xié)程的主要方式之一。asyncio模塊提供了一個(gè)事件循環(huán)(Event Loop),它可以在單線程中實(shí)現(xiàn)多個(gè)協(xié)程的并發(fā)執(zhí)行。事件循環(huán)會(huì)不斷地從協(xié)程隊(duì)列中取出協(xié)程并執(zhí)行,當(dāng)協(xié)程遇到IO操作時(shí),會(huì)自動(dòng)掛起并切換到其他協(xié)程執(zhí)行,等待IO操作完成后再恢復(fù)執(zhí)行。
asyncio事件循環(huán)的使用方式如下:
1.創(chuàng)建一個(gè)事件循環(huán)對(duì)象:
import asyncio
loop = asyncio.get_event_loop()
2.將協(xié)程對(duì)象加入事件循環(huán)中:
async def coroutine():
# 協(xié)程代碼
loop.run_until_complete(coroutine())
3.啟動(dòng)事件循環(huán):
loop.run_forever()
在事件循環(huán)中,可以使用async/await關(guān)鍵字定義協(xié)程函數(shù),使用asyncio模塊提供的各種方法實(shí)現(xiàn)協(xié)程之間的通信和協(xié)作。例如,可以使用asyncio.sleep()方法實(shí)現(xiàn)協(xié)程的延時(shí)操作,使用asyncio.wait()方法等待多個(gè)協(xié)程的完成等。
3.await關(guān)鍵字
在Python中,await是一個(gè)關(guān)鍵字,用于等待一個(gè)協(xié)程完成。當(dāng)一個(gè)協(xié)程調(diào)用另一個(gè)協(xié)程時(shí),它可以使用await關(guān)鍵字來(lái)等待另一個(gè)協(xié)程完成并返回結(jié)果。在等待期間,當(dāng)前協(xié)程會(huì)被掛起,直到被等待的協(xié)程完成。
例如,假設(shè)有兩個(gè)協(xié)程A和B,A需要等待B完成后才能繼續(xù)執(zhí)行。在協(xié)程A中,可以使用await關(guān)鍵字來(lái)等待協(xié)程B完成:
async def coroutine_b():
# 協(xié)程B的代碼
async def coroutine_a():
# 協(xié)程A的代碼
result = await coroutine_b()
# 繼續(xù)執(zhí)行協(xié)程A的代碼
在這個(gè)例子中,當(dāng)協(xié)程A調(diào)用await coroutine_b()時(shí),它會(huì)等待協(xié)程B完成并返回結(jié)果。在等待期間,協(xié)程A會(huì)被掛起,直到協(xié)程B完成。一旦協(xié)程B完成并返回結(jié)果,協(xié)程A會(huì)繼續(xù)執(zhí)行。
使用await關(guān)鍵字可以使協(xié)程之間的調(diào)用更加簡(jiǎn)潔和直觀,同時(shí)也可以避免使用回調(diào)函數(shù)等復(fù)雜的異步編程模式。
4.concurrent和future對(duì)象
在Python中,asyncio模塊提供了一種基于協(xié)程的異步編程方式。在協(xié)程中,我們可以使用async/await關(guān)鍵字來(lái)定義異步函數(shù),使用asyncio模塊提供的事件循環(huán)來(lái)調(diào)度協(xié)程的執(zhí)行。
除了協(xié)程之外,asyncio還提供了一些其他的并發(fā)編程工具,包括concurrent和future對(duì)象。
- concurrent對(duì)象
concurrent對(duì)象是asyncio中的一個(gè)重要概念,它表示一個(gè)協(xié)程的執(zhí)行狀態(tài)。在asyncio中,我們可以使用asyncio.create_task()函數(shù)來(lái)創(chuàng)建一個(gè)concurrent對(duì)象,該函數(shù)接受一個(gè)協(xié)程對(duì)象作為參數(shù),并返回一個(gè)concurrent對(duì)象。
例如,下面的代碼創(chuàng)建了一個(gè)協(xié)程對(duì)象,并使用create_task()函數(shù)將其轉(zhuǎn)換為concurrent對(duì)象:
import asyncio
async def my_coroutine():
print('Coroutine started')
await asyncio.sleep(1)
print('Coroutine ended')
async def main():
task = asyncio.create_task(my_coroutine())
await task
asyncio.run(main())
在上面的代碼中,我們使用create_task()函數(shù)將my_coroutine()函數(shù)轉(zhuǎn)換為concurrent對(duì)象,并將其賦值給task變量。然后,我們使用await關(guān)鍵字等待task對(duì)象的完成。
- future對(duì)象
future對(duì)象是asyncio中的另一個(gè)重要概念,它表示一個(gè)異步操作的結(jié)果。在asyncio中,我們可以使用asyncio.Future()函數(shù)來(lái)創(chuàng)建一個(gè)future對(duì)象,該函數(shù)返回一個(gè)未完成的future對(duì)象。
例如,下面的代碼創(chuàng)建了一個(gè)未完成的future對(duì)象:
import asyncio
async def my_coroutine():
print('Coroutine started')
await asyncio.sleep(1)
print('Coroutine ended')
return 'Result'
async def main():
future = asyncio.Future()
await asyncio.sleep(1)
future.set_result(await my_coroutine())
print(future.result())
asyncio.run(main())
在上面的代碼中,我們使用asyncio.Future()函數(shù)創(chuàng)建了一個(gè)未完成的future對(duì)象,并將其賦值給future變量。然后,我們使用await關(guān)鍵字等待1秒鐘,然后調(diào)用my_coroutine()函數(shù),并將其結(jié)果設(shè)置為future對(duì)象的結(jié)果。最后,我們打印future對(duì)象的結(jié)果。
5.asyncio異步迭代器和上下文管理
除了concurrent和future對(duì)象之外,asyncio還提供了一些其他的并發(fā)編程工具,包括異步迭代器和上下文管理。
異步迭代器是一種特殊的迭代器,它可以在異步環(huán)境中使用。在asyncio中,我們可以使用async for循環(huán)來(lái)遍歷異步迭代器。
例如,下面的代碼使用async for循環(huán)遍歷一個(gè)異步迭代器:
import asyncio
async def my_coroutine():
for i in range(5):
await asyncio.sleep(1)
yield i
async def main():
async for i in my_coroutine():
print(i)
asyncio.run(main())
在上面的代碼中,我們定義了一個(gè)異步生成器函數(shù)my_coroutine(),它使用yield語(yǔ)句返回一個(gè)值,并在每次返回值之間暫停1秒鐘。然后,我們使用async for循環(huán)遍歷my_coroutine()函數(shù)返回的異步迭代器,并打印每個(gè)返回值。
上下文管理是一種在異步環(huán)境中管理資源的方式。在asyncio中,我們可以使用async with語(yǔ)句來(lái)管理異步上下文。
例如,下面的代碼使用async with語(yǔ)句管理一個(gè)異步上下文:
import asyncio
class MyContext:
async def __aenter__(self):
print('Entering context')
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print('Exiting context')
await asyncio.sleep(1)
async def main():
async with MyContext() as context:
print('Inside context')
asyncio.run(main())
在上面的代碼中,我們定義了一個(gè)MyContext類,它實(shí)現(xiàn)了__aenter__()和__aexit__()方法。aenter()方法在進(jìn)入上下文時(shí)被調(diào)用,aexit()方法在退出上下文時(shí)被調(diào)用。在main()函數(shù)中,我們使用async with語(yǔ)句管理MyContext對(duì)象,并在上下文中打印一條消息。當(dāng)我們進(jìn)入和退出上下文時(shí),aenter()和__aexit__()方法會(huì)被調(diào)用,并暫停1秒鐘。
6.異步操作MySQL
在Python中,我們可以使用異步IO庫(kù)asyncio來(lái)實(shí)現(xiàn)異步操作MySQL數(shù)據(jù)庫(kù)。下面是一個(gè)簡(jiǎn)單的示例:
import asyncio
import aiomysql
async def test_mysql():
# 連接MySQL數(shù)據(jù)庫(kù)
conn = await aiomysql.connect(host='localhost', port=3306,
user='root', password='password',
db='test', charset='utf8mb4')
# 創(chuàng)建游標(biāo)
cur = await conn.cursor()
# 執(zhí)行SQL語(yǔ)句
await cur.execute("SELECT * FROM users")
# 獲取查詢結(jié)果
result = await cur.fetchall()
# 輸出查詢結(jié)果
print(result)
# 關(guān)閉游標(biāo)和連接
await cur.close()
conn.close()
# 運(yùn)行異步函數(shù)
loop = asyncio.get_event_loop()
loop.run_until_complete(test_mysql())
在上面的示例中,我們使用了aiomysql庫(kù)來(lái)連接MySQL數(shù)據(jù)庫(kù),并使用async/await語(yǔ)法來(lái)執(zhí)行異步操作。首先,我們使用aiomysql.connect()方法來(lái)連接MySQL數(shù)據(jù)庫(kù),然后使用await conn.cursor()方法創(chuàng)建游標(biāo),使用await cur.execute()方法執(zhí)行SQL語(yǔ)句,使用await cur.fetchall()方法獲取查詢結(jié)果,最后使用await cur.close()方法關(guān)閉游標(biāo),使用conn.close()方法關(guān)閉連接。
需要注意的是,在使用aiomysql庫(kù)時(shí),我們需要在連接MySQL數(shù)據(jù)庫(kù)時(shí)指定charset='utf8mb4',以支持中文字符集。
7.異步爬蟲
異步爬蟲是指使用協(xié)程來(lái)實(shí)現(xiàn)爬蟲程序,通過(guò)異步非阻塞的方式來(lái)提高爬取效率。在Python中,可以使用asyncio庫(kù)來(lái)實(shí)現(xiàn)異步爬蟲。
下面是一個(gè)簡(jiǎn)單的異步爬蟲示例:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def get_links(session, url):
html = await fetch(session, url)
soup = BeautifulSoup(html, 'html.parser')
links = []
for link in soup.find_all('a'):
href = link.get('href')
if href and href.startswith('http'):
links.append(href)
return links
async def main():
async with aiohttp.ClientSession() as session:
links = await get_links(session, 'https://www.baidu.com')
for link in links:
print(link)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在這個(gè)示例中,我們使用了aiohttp庫(kù)來(lái)發(fā)送異步HTTP請(qǐng)求,使用BeautifulSoup庫(kù)來(lái)解析HTML頁(yè)面,使用asyncio庫(kù)來(lái)實(shí)現(xiàn)協(xié)程。
首先定義了一個(gè)fetch函數(shù),用于發(fā)送HTTP請(qǐng)求并返回響應(yīng)內(nèi)容。然后定義了一個(gè)get_links函數(shù),用于獲取頁(yè)面中的所有鏈接。最后,在main函數(shù)中使用aiohttp庫(kù)創(chuàng)建一個(gè)異步HTTP客戶端會(huì)話,調(diào)用get_links函數(shù)獲取鏈接,并打印出來(lái)。
需要注意的是,在使用aiohttp庫(kù)時(shí),需要使用async with語(yǔ)句來(lái)創(chuàng)建一個(gè)異步HTTP客戶端會(huì)話,以確保會(huì)話在使用完畢后能夠正確關(guān)閉。
下章講:python數(shù)據(jù)庫(kù)編程