跟我一起從零開始學(xué)python(三)多線程/多進(jìn)程/協(xié)程

前言

回顧之前講了python語(yǔ)法編程 ,關(guān)于從零入門python的第一遍,編程語(yǔ)法必修內(nèi)容,比如python3基礎(chǔ)入門,列表與元組,字符串,字典,條件丶循環(huán)和其他語(yǔ)句丶函數(shù)丶面向?qū)ο筘籍惓:臀募幚砗途W(wǎng)絡(luò)編程篇

1.跟我一起從零開始學(xué)python(一)編程語(yǔ)法必修
2.跟我一起從零開始學(xué)python(二)網(wǎng)絡(luò)編程

本篇講:python并發(fā)編程:多線程/多進(jìn)程/協(xié)程

本系列文根據(jù)以下學(xué)習(xí)路線展開講述,由于內(nèi)容較多:

從零開始學(xué)python到高級(jí)進(jìn)階路線圖

第一章:多線程

1.線程和進(jìn)程

線程和進(jìn)程是操作系統(tǒng)中的兩個(gè)重要概念,它們都是并發(fā)編程的基礎(chǔ)。線程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,而進(jìn)程則是操作系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。

線程和進(jìn)程的區(qū)別:

  • 線程是進(jìn)程的一部分,一個(gè)進(jìn)程可以包含多個(gè)線程,而一個(gè)線程只能屬于一個(gè)進(jìn)程。
  • 進(jìn)程擁有獨(dú)立的內(nèi)存空間,而線程共享進(jìn)程的內(nèi)存空間。
  • 進(jìn)程之間的通信需要使用IPC(Inter-Process Communication)機(jī)制,而線程之間可以直接共享數(shù)據(jù)。
  • 進(jìn)程的創(chuàng)建和銷毀比線程慢,因?yàn)檫M(jìn)程需要分配和釋放獨(dú)立的內(nèi)存空間,而線程只需要分配和釋放一些寄存器和??臻g。

在Python中,可以使用threading模塊來(lái)創(chuàng)建和管理線程。下面是一個(gè)簡(jiǎn)單的線程示例:

import threading

def worker():
    print('Worker thread started')
    # do some work here
    print('Worker thread finished')

# create a new thread
t = threading.Thread(target=worker)
# start the thread
t.start()
# wait for the thread to finish
t.join()

在這個(gè)示例中,我們創(chuàng)建了一個(gè)名為worker的函數(shù),它將在一個(gè)新的線程中運(yùn)行。我們使用threading.Thread類創(chuàng)建了一個(gè)新的線程對(duì)象,并將worker函數(shù)作為目標(biāo)傳遞給它。然后,我們使用start()方法啟動(dòng)線程,并使用join()方法等待線程完成。

2.使用線程

在Python中,使用線程可以通過(guò)threading模塊來(lái)實(shí)現(xiàn)。下面是一個(gè)簡(jiǎn)單的例子,展示了如何使用線程:

import threading

def worker():
    """線程執(zhí)行的任務(wù)"""
    print("Worker thread started")
    # 執(zhí)行一些任務(wù)
    print("Worker thread finished")

# 創(chuàng)建線程
t = threading.Thread(target=worker)
# 啟動(dòng)線程
t.start()

# 主線程繼續(xù)執(zhí)行其他任務(wù)
print("Main thread finished")

在上面的例子中,我們首先定義了一個(gè)worker函數(shù),它將在一個(gè)單獨(dú)的線程中執(zhí)行。然后,我們使用threading.Thread類創(chuàng)建了一個(gè)新的線程,并將worker函數(shù)作為參數(shù)傳遞給它。最后,我們調(diào)用start方法來(lái)啟動(dòng)線程。

注意,線程是異步執(zhí)行的,因此主線程不會(huì)等待線程完成。在上面的例子中,主線程會(huì)立即繼續(xù)執(zhí)行,輸出Main thread finished。如果我們希望等待線程完成后再繼續(xù)執(zhí)行主線程,可以使用join方法:

# 等待線程完成
t.join()


# 主線程繼續(xù)執(zhí)行其他任務(wù)
print("Main thread finished")

在上面的代碼中,我們?cè)趩?dòng)線程后調(diào)用了t.join()方法,這將阻塞主線程,直到線程完成。然后,主線程才會(huì)繼續(xù)執(zhí)行。

3.多線程全局變量

在多線程編程中,多個(gè)線程可以共享全局變量。但是需要注意的是,多個(gè)線程同時(shí)對(duì)同一個(gè)全局變量進(jìn)行讀寫操作時(shí),可能會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)(Data Race)的問(wèn)題,導(dǎo)致程序出現(xiàn)不可預(yù)期的結(jié)果。

為了避免數(shù)據(jù)競(jìng)爭(zhēng),可以使用線程鎖(Thread Lock)來(lái)保證同一時(shí)刻只有一個(gè)線程可以訪問(wèn)共享變量。Python中提供了Lock、RLock、Semaphore等多種鎖機(jī)制,可以根據(jù)實(shí)際需求選擇合適的鎖。

下面是一個(gè)使用Lock來(lái)保證多線程共享全局變量安全的示例代碼:

import threading

# 定義全局變量
count = 0

# 定義線程鎖
lock = threading.Lock()

# 定義線程函數(shù)
def add():
    global count
    for i in range(100000):
        # 獲取鎖
        lock.acquire()
        count += 1
        # 釋放鎖
        lock.release()

# 創(chuàng)建兩個(gè)線程
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)

# 啟動(dòng)線程
t1.start()
t2.start()

# 等待線程執(zhí)行完畢
t1.join()
t2.join()

# 輸出結(jié)果
print(count)

在上面的代碼中,我們定義了一個(gè)全局變量count,并使用Lock來(lái)保證多個(gè)線程對(duì)count進(jìn)行讀寫操作時(shí)的安全性。在每個(gè)線程中,我們首先獲取鎖,然后對(duì)count進(jìn)行加1操作,最后釋放鎖。這樣就可以保證同一時(shí)刻只有一個(gè)線程可以訪問(wèn)count,避免了數(shù)據(jù)競(jìng)爭(zhēng)的問(wèn)題。

需要注意的是,使用鎖會(huì)帶來(lái)一定的性能損失,因?yàn)槊看潍@取鎖和釋放鎖都需要一定的時(shí)間。因此,在實(shí)際應(yīng)用中,需要根據(jù)實(shí)際情況來(lái)選擇合適的鎖機(jī)制,避免過(guò)度使用鎖導(dǎo)致程序性能下降。

4.共享全局變量所帶來(lái)的問(wèn)題

在多線程編程中,多個(gè)線程可以共享全局變量。但是,共享全局變量也會(huì)帶來(lái)一些問(wèn)題:

  • 競(jìng)爭(zhēng)條件:當(dāng)多個(gè)線程同時(shí)訪問(wèn)和修改同一個(gè)全局變量時(shí),可能會(huì)出現(xiàn)競(jìng)爭(zhēng)條件,導(dǎo)致程序出現(xiàn)不可預(yù)測(cè)的結(jié)果。

  • 數(shù)據(jù)不一致:當(dāng)多個(gè)線程同時(shí)修改同一個(gè)全局變量時(shí),可能會(huì)導(dǎo)致數(shù)據(jù)不一致的問(wèn)題,即某些線程看到的變量值與其他線程看到的不同。

  • 死鎖:當(dāng)多個(gè)線程同時(shí)等待對(duì)方釋放某個(gè)資源時(shí),可能會(huì)出現(xiàn)死鎖的情況,導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行。

因此,在多線程編程中,需要注意對(duì)共享全局變量的訪問(wèn)和修改,避免出現(xiàn)上述問(wèn)題??梢允褂面i、條件變量等機(jī)制來(lái)保證線程之間的同步和互斥。

5.解決線程同時(shí)修改全局變量的方式

在多線程編程中,共享全局變量可能會(huì)帶來(lái)一些問(wèn)題,例如:

  • 競(jìng)爭(zhēng)條件:多個(gè)線程同時(shí)修改同一個(gè)全局變量,可能會(huì)導(dǎo)致數(shù)據(jù)不一致或者出現(xiàn)意料之外的結(jié)果。

  • 死鎖:多個(gè)線程同時(shí)等待對(duì)方釋放資源,導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行。

為了解決這些問(wèn)題,可以采用以下方式:

  • 使用鎖:在訪問(wèn)共享變量時(shí),使用鎖來(lái)保證同一時(shí)刻只有一個(gè)線程可以修改變量。Python中提供了threading模塊中的Lock類來(lái)實(shí)現(xiàn)鎖。

  • 使用線程安全的數(shù)據(jù)結(jié)構(gòu):Python中提供了一些線程安全的數(shù)據(jù)結(jié)構(gòu),例如Queue、deque等,可以在多線程環(huán)境下安全地訪問(wèn)和修改數(shù)據(jù)。

  • 使用局部變量:將全局變量作為參數(shù)傳遞給線程函數(shù),讓線程函數(shù)在局部變量上進(jìn)行操作,避免多個(gè)線程同時(shí)修改同一個(gè)全局變量。

  • 使用線程本地存儲(chǔ):Python中提供了threading模塊中的local類,可以在每個(gè)線程中創(chuàng)建一個(gè)獨(dú)立的變量,避免多個(gè)線程之間共享變量。

6.互斥鎖

在多線程編程中,互斥鎖是一種常用的同步機(jī)制,用于保護(hù)共享資源,防止多個(gè)線程同時(shí)修改同一個(gè)變量導(dǎo)致數(shù)據(jù)不一致的問(wèn)題。

互斥鎖的基本思想是,在訪問(wèn)共享資源之前,先獲取鎖,如果鎖已經(jīng)被其他線程獲取,則當(dāng)前線程會(huì)被阻塞,直到鎖被釋放為止。在訪問(wèn)完共享資源之后,釋放鎖,讓其他線程可以獲取鎖并訪問(wèn)共享資源。

在Python中,可以使用threading模塊中的Lock類來(lái)實(shí)現(xiàn)互斥鎖。Lock類有兩個(gè)基本方法:

  • acquire([blocking]):獲取鎖,如果鎖已經(jīng)被其他線程獲取,則當(dāng)前線程會(huì)被阻塞。如果blockingFalse,則獲取鎖失敗時(shí)會(huì)立即返回False,而不是阻塞等待。
  • release():釋放鎖,讓其他線程可以獲取鎖并訪問(wèn)共享資源。
    下面是一個(gè)使用互斥鎖的例子:
import threading

# 共享變量
count = 0

# 創(chuàng)建互斥鎖
lock = threading.Lock()

# 線程函數(shù)
def worker():
    global count
    for i in range(100000):
        # 獲取鎖
        lock.acquire()
        try:
            count += 1
        finally:
            # 釋放鎖
            lock.release()

# 創(chuàng)建多個(gè)線程
threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    threads.append(t)

# 啟動(dòng)線程
for t in threads:
    t.start()

# 等待所有線程執(zhí)行完畢
for t in threads:
    t.join()

# 輸出結(jié)果
print(count)

在上面的例子中,我們創(chuàng)建了一個(gè)共享變量count,并使用互斥鎖來(lái)保護(hù)它。在每個(gè)線程中,我們先獲取鎖,然后修改count的值,最后釋放鎖。這樣就可以保證多個(gè)線程不會(huì)同時(shí)修改count的值,從而避免了數(shù)據(jù)不一致的問(wèn)題。最后輸出count的值,可以看到它的值為1000000,符合預(yù)期。

7.死鎖

死鎖是指兩個(gè)或多個(gè)線程在執(zhí)行過(guò)程中,因爭(zhēng)奪資源而造成的一種互相等待的現(xiàn)象,若無(wú)外力干涉那它們都將無(wú)法繼續(xù)執(zhí)行下去。在多線程編程中,死鎖是一種常見的問(wèn)題,需要特別注意。

死鎖的產(chǎn)生通常需要滿足以下四個(gè)條件

  • 互斥條件:一個(gè)資源每次只能被一個(gè)線程使用。
  • 請(qǐng)求與保持條件:一個(gè)線程因請(qǐng)求資源而阻塞時(shí),對(duì)已獲得的資源保持不放。
  • 不剝奪條件:線程已獲得的資源,在未使用完之前,不能被其他線程強(qiáng)行剝奪,只能由該線程自己釋放。
  • 循環(huán)等待條件:若干線程之間形成一種頭尾相接的循環(huán)等待資源的關(guān)系。

為了避免死鎖的產(chǎn)生,可以采用以下幾種方式

  • 避免使用多個(gè)鎖,盡量使用一個(gè)鎖來(lái)控制多個(gè)資源的訪問(wèn)。
  • 避免持有鎖的時(shí)間過(guò)長(zhǎng),盡量縮短鎖的持有時(shí)間。
  • 避免循環(huán)等待,盡量按照固定的順序獲取鎖。
  • 使用超時(shí)機(jī)制,當(dāng)?shù)却龝r(shí)間超過(guò)一定時(shí)間后,自動(dòng)釋放鎖,避免長(zhǎng)時(shí)間等待造成的死鎖。

8.線程池

線程池是一種線程管理技術(shù),它可以在程序啟動(dòng)時(shí)創(chuàng)建一定數(shù)量的線程,放入一個(gè)池中,當(dāng)需要使用線程時(shí),就從池中取出一個(gè)線程執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后,線程并不會(huì)被銷毀,而是放回池中等待下一次任務(wù)的到來(lái)。

使用線程池可以避免頻繁創(chuàng)建和銷毀線程的開銷,提高程序的性能和效率。在Python中,可以使用標(biāo)準(zhǔn)庫(kù)中的concurrent.futures模塊來(lái)實(shí)現(xiàn)線程池。

下面是一個(gè)簡(jiǎn)單的線程池示例:

import concurrent.futures
import time

def worker(num):
    print(f"Thread-{num} started")
    time.sleep(1)
    print(f"Thread-{num} finished")

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(5):
            executor.submit(worker, i)

在這個(gè)示例中,我們創(chuàng)建了一個(gè)包含3個(gè)線程的線程池,然后提交了5個(gè)任務(wù)給線程池執(zhí)行。由于線程池中只有3個(gè)線程,因此只有3個(gè)任務(wù)會(huì)同時(shí)執(zhí)行,其余的任務(wù)會(huì)等待空閑線程的出現(xiàn)。

輸出結(jié)果如下:

Thread-0 started
Thread-1 started
Thread-2 started
Thread-0 finished
Thread-3 started
Thread-1 finished
Thread-4 started
Thread-2 finished
Thread-3 finished
Thread-4 finished

可以看到,線程池中的3個(gè)線程依次執(zhí)行了5個(gè)任務(wù),任務(wù)的執(zhí)行順序與提交的順序無(wú)關(guān)。

第二章:多進(jìn)程

1.進(jìn)程的狀態(tài)

在操作系統(tǒng)中,進(jìn)程有以下幾種狀態(tài)

  • 就緒狀態(tài)(Ready):進(jìn)程已經(jīng)準(zhǔn)備好運(yùn)行,等待分配CPU時(shí)間片。

  • 運(yùn)行狀態(tài)(Running):進(jìn)程正在運(yùn)行,占用CPU時(shí)間片。

  • 阻塞狀態(tài)(Blocked):進(jìn)程因?yàn)槟承┰驘o(wú)法繼續(xù)執(zhí)行,例如等待I/O操作完成或等待某個(gè)資源的釋放。

  • 掛起狀態(tài)(Suspended):進(jìn)程被暫停,不再占用CPU時(shí)間片,但是它的狀態(tài)信息仍然保存在內(nèi)存中。

  • 終止?fàn)顟B(tài)(Terminated):進(jìn)程已經(jīng)完成執(zhí)行或被強(qiáng)制終止。

進(jìn)程的狀態(tài)轉(zhuǎn)換通常是由操作系統(tǒng)內(nèi)核進(jìn)行控制的,例如當(dāng)一個(gè)進(jìn)程等待I/O操作完成時(shí),操作系統(tǒng)會(huì)將該進(jìn)程的狀態(tài)從就緒狀態(tài)轉(zhuǎn)換為阻塞狀態(tài),當(dāng)I/O操作完成后,操作系統(tǒng)會(huì)將該進(jìn)程的狀態(tài)從阻塞狀態(tài)轉(zhuǎn)換為就緒狀態(tài),等待分配CPU時(shí)間片。

2.線程的創(chuàng)建-multiprocessing

在 Python 中,可以使用 multiprocessing 模塊來(lái)創(chuàng)建多進(jìn)程。multiprocessing 模塊提供了一個(gè) Process 類,可以用來(lái)創(chuàng)建進(jìn)程。下面是一個(gè)簡(jiǎn)單的例子:

import multiprocessing

def worker():
    """子進(jìn)程要執(zhí)行的任務(wù)"""
    print('Worker')

if __name__ == '__main__':
    # 創(chuàng)建子進(jìn)程
    p = multiprocessing.Process(target=worker)
    # 啟動(dòng)子進(jìn)程
    p.start()
    # 等待子進(jìn)程結(jié)束
    p.join()

在上面的例子中,我們首先定義了一個(gè) worker 函數(shù),它是子進(jìn)程要執(zhí)行的任務(wù)。然后,我們使用 multiprocessing.Process 類創(chuàng)建了一個(gè)子進(jìn)程,并將 worker 函數(shù)作為參數(shù)傳遞給了 Process 類的構(gòu)造函數(shù)。接著,我們調(diào)用 start 方法啟動(dòng)子進(jìn)程,最后調(diào)用 join 方法等待子進(jìn)程結(jié)束。

需要注意的是,在 Windows 系統(tǒng)中,由于 multiprocessing 模塊使用了 fork 系統(tǒng)調(diào)用,而 Windows 不支持 fork,因此需要在 if __name__ == '__main__': 語(yǔ)句中調(diào)用子進(jìn)程的代碼。這是因?yàn)樵?Windows 中,每個(gè)進(jìn)程都會(huì)執(zhí)行一遍程序的所有代碼,而 if __name__ == '__main__': 語(yǔ)句可以保證子進(jìn)程只會(huì)執(zhí)行指定的代碼。

3.進(jìn)程丶線程對(duì)比

進(jìn)程和線程都是實(shí)現(xiàn)并發(fā)編程的方式,但是它們有以下不同點(diǎn):

  • 資源占用:進(jìn)程擁有獨(dú)立的內(nèi)存空間,而線程共享進(jìn)程的內(nèi)存空間。因此,創(chuàng)建進(jìn)程的開銷比創(chuàng)建線程大,同時(shí)進(jìn)程間的通信也比線程間的通信復(fù)雜。

  • 并發(fā)性:由于線程共享進(jìn)程的內(nèi)存空間,因此線程間的通信和數(shù)據(jù)共享比進(jìn)程間的通信和數(shù)據(jù)共享更容易。同時(shí),線程的切換比進(jìn)程的切換更快,因此線程的并發(fā)性比進(jìn)程高。

  • 安全性:由于線程共享進(jìn)程的內(nèi)存空間,因此多個(gè)線程同時(shí)訪問(wèn)同一塊內(nèi)存時(shí),可能會(huì)出現(xiàn)競(jìng)爭(zhēng)條件,導(dǎo)致數(shù)據(jù)不一致或者程序崩潰。而進(jìn)程之間的內(nèi)存空間是獨(dú)立的,因此進(jìn)程間的數(shù)據(jù)不會(huì)相互影響。

  • 編程復(fù)雜度:由于進(jìn)程間的通信和數(shù)據(jù)共享比較復(fù)雜,因此編寫多進(jìn)程程序的復(fù)雜度比編寫多線程程序的復(fù)雜度高。

總的來(lái)說(shuō),進(jìn)程適合于CPU密集型任務(wù),而線程適合于IO密集型任務(wù)。在實(shí)際應(yīng)用中,需要根據(jù)具體的場(chǎng)景選擇合適的并發(fā)編程方式。

4.進(jìn)程間的通信-Queue

在多進(jìn)程編程中,不同進(jìn)程之間的數(shù)據(jù)是無(wú)法直接共享的,因?yàn)槊總€(gè)進(jìn)程都有自己獨(dú)立的內(nèi)存空間。因此,為了實(shí)現(xiàn)進(jìn)程間的通信,我們需要使用一些特殊的機(jī)制。

其中,最常用的進(jìn)程間通信方式是使用隊(duì)列(Queue)。隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),可以用來(lái)在多個(gè)進(jìn)程之間傳遞數(shù)據(jù)。

在Python中,我們可以使用multiprocessing模塊中的Queue類來(lái)實(shí)現(xiàn)進(jìn)程間通信。Queue類提供了put()get()方法,用于向隊(duì)列中添加數(shù)據(jù)和從隊(duì)列中取出數(shù)據(jù)。

下面是一個(gè)簡(jiǎn)單的例子,演示了如何使用Queue實(shí)現(xiàn)進(jìn)程間通信:

from multiprocessing import Process, Queue

def worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()

    for i in range(10):
        q.put(i)

    q.put(None)
    p.join()

在這個(gè)例子中,我們創(chuàng)建了一個(gè)進(jìn)程p,它的工作是從隊(duì)列q中取出數(shù)據(jù)并打印出來(lái)。主進(jìn)程向隊(duì)列中添加了10個(gè)數(shù)據(jù),然后再添加一個(gè)None,表示數(shù)據(jù)已經(jīng)全部添加完畢。最后,主進(jìn)程等待進(jìn)程p執(zhí)行完畢。

需要注意的是,當(dāng)我們向隊(duì)列中添加數(shù)據(jù)時(shí),如果隊(duì)列已滿,put()方法會(huì)阻塞,直到隊(duì)列中有空閑位置。同樣地,當(dāng)我們從隊(duì)列中取出數(shù)據(jù)時(shí),如果隊(duì)列為空,get()方法也會(huì)阻塞,直到隊(duì)列中有數(shù)據(jù)可取。

除了Queue之外,Python中還提供了一些其他的進(jìn)程間通信方式,比如Pipe、ValueArray等。這些方式各有特點(diǎn),可以根據(jù)具體的需求選擇合適的方式

5.進(jìn)程池的創(chuàng)建-pool

在Python中,我們可以使用multiprocessing模塊中的Pool類來(lái)創(chuàng)建進(jìn)程池。進(jìn)程池是一組可重用的進(jìn)程,可以在需要時(shí)分配給任務(wù)。這樣可以避免頻繁地創(chuàng)建和銷毀進(jìn)程,從而提高程序的效率。

下面是一個(gè)使用進(jìn)程池的例子:

import multiprocessing

def worker(num):
    """進(jìn)程池中的任務(wù)"""
    print('Worker %d is running' % num)

if __name__ == '__main__':
    # 創(chuàng)建進(jìn)程池,池中有3個(gè)進(jìn)程
    pool = multiprocessing.Pool(processes=3)
    # 向進(jìn)程池中添加任務(wù)
    for i in range(5):
        pool.apply_async(worker, args=(i,))
    # 關(guān)閉進(jìn)程池,不再接受新的任務(wù)
    pool.close()
    # 等待所有任務(wù)完成
    pool.join()
    print('All workers done.')

在這個(gè)例子中,我們首先創(chuàng)建了一個(gè)進(jìn)程池,池中有3個(gè)進(jìn)程。然后向進(jìn)程池中添加了5個(gè)任務(wù),每個(gè)任務(wù)都是調(diào)用worker函數(shù)。最后,我們關(guān)閉了進(jìn)程池,并等待所有任務(wù)完成。

需要注意的是,進(jìn)程池中的任務(wù)必須是可序列化的,因?yàn)檫M(jìn)程池會(huì)將任務(wù)發(fā)送給子進(jìn)程執(zhí)行。如果任務(wù)中包含不可序列化的對(duì)象,會(huì)導(dǎo)致進(jìn)程池?zé)o法正常工作。

第三章:協(xié)程

協(xié)程是一種輕量級(jí)的線程,也稱為微線程或者用戶級(jí)線程。協(xié)程的特點(diǎn)是在一個(gè)線程中,可以有多個(gè)協(xié)程,協(xié)程之間可以相互切換,從而實(shí)現(xiàn)并發(fā)執(zhí)行。

在Python中,協(xié)程是通過(guò)生成器實(shí)現(xiàn)的。通過(guò)yield關(guān)鍵字,可以將一個(gè)函數(shù)變成一個(gè)生成器,從而實(shí)現(xiàn)協(xié)程的功能。在協(xié)程中,可以使用yield關(guān)鍵字來(lái)暫停函數(shù)的執(zhí)行,并返回一個(gè)值給調(diào)用者。當(dāng)協(xié)程再次被調(diào)用時(shí),可以從上一次暫停的位置繼續(xù)執(zhí)行。

Python中的協(xié)程有兩種實(shí)現(xiàn)方式:使用生成器實(shí)現(xiàn)的協(xié)程和使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程。

使用生成器實(shí)現(xiàn)的協(xié)程

def coroutine():
    while True:
        value = yield
        print('Received value:', value)

c = coroutine()
next(c)  # 啟動(dòng)協(xié)程
c.send(10)  # 發(fā)送值給協(xié)程

使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程

import asyncio

async def coroutine():
    while True:
        value = await asyncio.sleep(1)
        print('Received value:', value)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())

在使用async/await關(guān)鍵字實(shí)現(xiàn)的協(xié)程中,需要使用asyncio模塊提供的事件循環(huán)來(lái)運(yùn)行協(xié)程。在協(xié)程中,可以使用await關(guān)鍵字來(lái)暫停函數(shù)的執(zhí)行,并等待一個(gè)異步操作完成。當(dāng)異步操作完成后,協(xié)程會(huì)從await語(yǔ)句處繼續(xù)執(zhí)行。

協(xié)程的優(yōu)點(diǎn)是可以避免線程切換的開銷,從而提高程序的性能。同時(shí),協(xié)程也可以避免線程之間的競(jìng)爭(zhēng)條件和死鎖問(wèn)題。但是,協(xié)程也有一些缺點(diǎn),例如不能利用多核CPU的優(yōu)勢(shì),以及不能進(jìn)行阻塞式IO操作。

1.協(xié)程的意義

協(xié)程是一種輕量級(jí)的線程,可以在單個(gè)線程中實(shí)現(xiàn)并發(fā)。與線程相比,協(xié)程的切換開銷更小,可以更高效地利用CPU資源。協(xié)程的意義在于:

  • 提高程序的并發(fā)性能:協(xié)程可以在單個(gè)線程中實(shí)現(xiàn)并發(fā),避免了線程切換的開銷,提高了程序的并發(fā)性能。

  • 簡(jiǎn)化編程模型:協(xié)程可以使用同步的編程模型,避免了復(fù)雜的線程同步問(wèn)題,使得編程更加簡(jiǎn)單。

  • 支持高并發(fā):協(xié)程可以支持大量的并發(fā)任務(wù),可以用于高并發(fā)的網(wǎng)絡(luò)編程、爬蟲等場(chǎng)景。

  • 提高代碼可讀性:協(xié)程可以使用同步的編程模型,代碼可讀性更高,易于維護(hù)。

總之,協(xié)程是一種高效、簡(jiǎn)單、可讀性強(qiáng)的并發(fā)編程模型,可以提高程序的并發(fā)性能,支持高并發(fā),簡(jiǎn)化編程模型。

2.asyncio事件循環(huán)

在Python中,協(xié)程是一種輕量級(jí)的并發(fā)編程方式,它可以在單線程中實(shí)現(xiàn)并發(fā)執(zhí)行。協(xié)程的意義在于可以提高程序的并發(fā)性能,減少線程切換的開銷,同時(shí)也可以簡(jiǎn)化編程模型,使得代碼更加易于理解和維護(hù)。

在Python 3.4及以上版本中,標(biāo)準(zhǔn)庫(kù)中提供了asyncio模塊,它是Python中實(shí)現(xiàn)協(xié)程的主要方式之一。asyncio模塊提供了一個(gè)事件循環(huán)(Event Loop),它可以在單線程中實(shí)現(xiàn)多個(gè)協(xié)程的并發(fā)執(zhí)行。事件循環(huán)會(huì)不斷地從協(xié)程隊(duì)列中取出協(xié)程并執(zhí)行,當(dāng)協(xié)程遇到IO操作時(shí),會(huì)自動(dòng)掛起并切換到其他協(xié)程執(zhí)行,等待IO操作完成后再恢復(fù)執(zhí)行。

asyncio事件循環(huán)的使用方式如下:

1.創(chuàng)建一個(gè)事件循環(huán)對(duì)象

import asyncio

loop = asyncio.get_event_loop()

2.將協(xié)程對(duì)象加入事件循環(huán)中

async def coroutine():
    # 協(xié)程代碼

loop.run_until_complete(coroutine())

3.啟動(dòng)事件循環(huán)

loop.run_forever()

在事件循環(huán)中,可以使用async/await關(guān)鍵字定義協(xié)程函數(shù),使用asyncio模塊提供的各種方法實(shí)現(xiàn)協(xié)程之間的通信和協(xié)作。例如,可以使用asyncio.sleep()方法實(shí)現(xiàn)協(xié)程的延時(shí)操作,使用asyncio.wait()方法等待多個(gè)協(xié)程的完成等。

3.await關(guān)鍵字

在Python中,await是一個(gè)關(guān)鍵字,用于等待一個(gè)協(xié)程完成。當(dāng)一個(gè)協(xié)程調(diào)用另一個(gè)協(xié)程時(shí),它可以使用await關(guān)鍵字來(lái)等待另一個(gè)協(xié)程完成并返回結(jié)果。在等待期間,當(dāng)前協(xié)程會(huì)被掛起,直到被等待的協(xié)程完成。

例如,假設(shè)有兩個(gè)協(xié)程A和B,A需要等待B完成后才能繼續(xù)執(zhí)行。在協(xié)程A中,可以使用await關(guān)鍵字來(lái)等待協(xié)程B完成:

async def coroutine_b():
    # 協(xié)程B的代碼

async def coroutine_a():
    # 協(xié)程A的代碼
    result = await coroutine_b()
    # 繼續(xù)執(zhí)行協(xié)程A的代碼

在這個(gè)例子中,當(dāng)協(xié)程A調(diào)用await coroutine_b()時(shí),它會(huì)等待協(xié)程B完成并返回結(jié)果。在等待期間,協(xié)程A會(huì)被掛起,直到協(xié)程B完成。一旦協(xié)程B完成并返回結(jié)果,協(xié)程A會(huì)繼續(xù)執(zhí)行。

使用await關(guān)鍵字可以使協(xié)程之間的調(diào)用更加簡(jiǎn)潔和直觀,同時(shí)也可以避免使用回調(diào)函數(shù)等復(fù)雜的異步編程模式。

4.concurrent和future對(duì)象

在Python中,asyncio模塊提供了一種基于協(xié)程的異步編程方式。在協(xié)程中,我們可以使用async/await關(guān)鍵字來(lái)定義異步函數(shù),使用asyncio模塊提供的事件循環(huán)來(lái)調(diào)度協(xié)程的執(zhí)行。

除了協(xié)程之外,asyncio還提供了一些其他的并發(fā)編程工具,包括concurrentfuture對(duì)象。

  • concurrent對(duì)象

concurrent對(duì)象是asyncio中的一個(gè)重要概念,它表示一個(gè)協(xié)程的執(zhí)行狀態(tài)。在asyncio中,我們可以使用asyncio.create_task()函數(shù)來(lái)創(chuàng)建一個(gè)concurrent對(duì)象,該函數(shù)接受一個(gè)協(xié)程對(duì)象作為參數(shù),并返回一個(gè)concurrent對(duì)象。

例如,下面的代碼創(chuàng)建了一個(gè)協(xié)程對(duì)象,并使用create_task()函數(shù)將其轉(zhuǎn)換為concurrent對(duì)象:

import asyncio

async def my_coroutine():
    print('Coroutine started')
    await asyncio.sleep(1)
    print('Coroutine ended')

async def main():
    task = asyncio.create_task(my_coroutine())
    await task

asyncio.run(main())

在上面的代碼中,我們使用create_task()函數(shù)將my_coroutine()函數(shù)轉(zhuǎn)換為concurrent對(duì)象,并將其賦值給task變量。然后,我們使用await關(guān)鍵字等待task對(duì)象的完成。

  • future對(duì)象

future對(duì)象是asyncio中的另一個(gè)重要概念,它表示一個(gè)異步操作的結(jié)果。在asyncio中,我們可以使用asyncio.Future()函數(shù)來(lái)創(chuàng)建一個(gè)future對(duì)象,該函數(shù)返回一個(gè)未完成的future對(duì)象。

例如,下面的代碼創(chuàng)建了一個(gè)未完成的future對(duì)象:

import asyncio

async def my_coroutine():
    print('Coroutine started')
    await asyncio.sleep(1)
    print('Coroutine ended')
    return 'Result'

async def main():
    future = asyncio.Future()
    await asyncio.sleep(1)
    future.set_result(await my_coroutine())
    print(future.result())

asyncio.run(main())

在上面的代碼中,我們使用asyncio.Future()函數(shù)創(chuàng)建了一個(gè)未完成的future對(duì)象,并將其賦值給future變量。然后,我們使用await關(guān)鍵字等待1秒鐘,然后調(diào)用my_coroutine()函數(shù),并將其結(jié)果設(shè)置為future對(duì)象的結(jié)果。最后,我們打印future對(duì)象的結(jié)果。

5.asyncio異步迭代器和上下文管理

除了concurrent和future對(duì)象之外,asyncio還提供了一些其他的并發(fā)編程工具,包括異步迭代器和上下文管理。

異步迭代器是一種特殊的迭代器,它可以在異步環(huán)境中使用。在asyncio中,我們可以使用async for循環(huán)來(lái)遍歷異步迭代器。

例如,下面的代碼使用async for循環(huán)遍歷一個(gè)異步迭代器:

import asyncio

async def my_coroutine():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

async def main():
    async for i in my_coroutine():
        print(i)

asyncio.run(main())

在上面的代碼中,我們定義了一個(gè)異步生成器函數(shù)my_coroutine(),它使用yield語(yǔ)句返回一個(gè)值,并在每次返回值之間暫停1秒鐘。然后,我們使用async for循環(huán)遍歷my_coroutine()函數(shù)返回的異步迭代器,并打印每個(gè)返回值。

上下文管理是一種在異步環(huán)境中管理資源的方式。在asyncio中,我們可以使用async with語(yǔ)句來(lái)管理異步上下文。

例如,下面的代碼使用async with語(yǔ)句管理一個(gè)異步上下文:

import asyncio

class MyContext:
    async def __aenter__(self):
        print('Entering context')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print('Exiting context')
        await asyncio.sleep(1)

async def main():
    async with MyContext() as context:
        print('Inside context')

asyncio.run(main())

在上面的代碼中,我們定義了一個(gè)MyContext類,它實(shí)現(xiàn)了__aenter__()__aexit__()方法。aenter()方法在進(jìn)入上下文時(shí)被調(diào)用,aexit()方法在退出上下文時(shí)被調(diào)用。在main()函數(shù)中,我們使用async with語(yǔ)句管理MyContext對(duì)象,并在上下文中打印一條消息。當(dāng)我們進(jìn)入和退出上下文時(shí),aenter()__aexit__()方法會(huì)被調(diào)用,并暫停1秒鐘。

6.異步操作MySQL

在Python中,我們可以使用異步IO庫(kù)asyncio來(lái)實(shí)現(xiàn)異步操作MySQL數(shù)據(jù)庫(kù)。下面是一個(gè)簡(jiǎn)單的示例:

import asyncio
import aiomysql

async def test_mysql():
    # 連接MySQL數(shù)據(jù)庫(kù)
    conn = await aiomysql.connect(host='localhost', port=3306,
                                  user='root', password='password',
                                  db='test', charset='utf8mb4')
    # 創(chuàng)建游標(biāo)
    cur = await conn.cursor()
    # 執(zhí)行SQL語(yǔ)句
    await cur.execute("SELECT * FROM users")
    # 獲取查詢結(jié)果
    result = await cur.fetchall()
    # 輸出查詢結(jié)果
    print(result)
    # 關(guān)閉游標(biāo)和連接
    await cur.close()
    conn.close()

# 運(yùn)行異步函數(shù)
loop = asyncio.get_event_loop()
loop.run_until_complete(test_mysql())

在上面的示例中,我們使用了aiomysql庫(kù)來(lái)連接MySQL數(shù)據(jù)庫(kù),并使用async/await語(yǔ)法來(lái)執(zhí)行異步操作。首先,我們使用aiomysql.connect()方法來(lái)連接MySQL數(shù)據(jù)庫(kù),然后使用await conn.cursor()方法創(chuàng)建游標(biāo),使用await cur.execute()方法執(zhí)行SQL語(yǔ)句,使用await cur.fetchall()方法獲取查詢結(jié)果,最后使用await cur.close()方法關(guān)閉游標(biāo),使用conn.close()方法關(guān)閉連接。

需要注意的是,在使用aiomysql庫(kù)時(shí),我們需要在連接MySQL數(shù)據(jù)庫(kù)時(shí)指定charset='utf8mb4',以支持中文字符集。

7.異步爬蟲

異步爬蟲是指使用協(xié)程來(lái)實(shí)現(xiàn)爬蟲程序,通過(guò)異步非阻塞的方式來(lái)提高爬取效率。在Python中,可以使用asyncio庫(kù)來(lái)實(shí)現(xiàn)異步爬蟲。

下面是一個(gè)簡(jiǎn)單的異步爬蟲示例:

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def get_links(session, url):
    html = await fetch(session, url)
    soup = BeautifulSoup(html, 'html.parser')
    links = []
    for link in soup.find_all('a'):
        href = link.get('href')
        if href and href.startswith('http'):
            links.append(href)
    return links

async def main():
    async with aiohttp.ClientSession() as session:
        links = await get_links(session, 'https://www.baidu.com')
        for link in links:
            print(link)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

在這個(gè)示例中,我們使用了aiohttp庫(kù)來(lái)發(fā)送異步HTTP請(qǐng)求,使用BeautifulSoup庫(kù)來(lái)解析HTML頁(yè)面,使用asyncio庫(kù)來(lái)實(shí)現(xiàn)協(xié)程。

首先定義了一個(gè)fetch函數(shù),用于發(fā)送HTTP請(qǐng)求并返回響應(yīng)內(nèi)容。然后定義了一個(gè)get_links函數(shù),用于獲取頁(yè)面中的所有鏈接。最后,在main函數(shù)中使用aiohttp庫(kù)創(chuàng)建一個(gè)異步HTTP客戶端會(huì)話,調(diào)用get_links函數(shù)獲取鏈接,并打印出來(lái)。

需要注意的是,在使用aiohttp庫(kù)時(shí),需要使用async with語(yǔ)句來(lái)創(chuàng)建一個(gè)異步HTTP客戶端會(huì)話,以確保會(huì)話在使用完畢后能夠正確關(guān)閉。

下章講:python數(shù)據(jù)庫(kù)編程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容