Python異步方式淺析-中斷子線程及任務

情景

??在開發(fā)中,我們處理耗時任務時,通常考慮使用異步處理

實現(xiàn)方式

??一般我們實現(xiàn)異步的方式有三種,分別如下:

  1. 多進程
  2. 多線程
  3. 異步IO/協(xié)程

三種方式異同點

1,多進程能利用多核 CPU,但內存開銷大
2,多線程在操作系統(tǒng)層面也可以使用多核CPU,但是由于鎖的問題寫法比較繁瑣且不易理解,雖然加了個GIL(Global Interpreter Lock),但是加了后又只能同時執(zhí)行一個任務,也就是只能使用一個CPU。內存開銷比進程小
3,py3新加的 asyncio 是用來做異步 io 的,3.5前使用@asyncio.coroutine注解加yield&from搭配來使用,3.5后使用關鍵字async&await使用。asyncio/協(xié)程相當于在python 中實現(xiàn)一個內核調度系統(tǒng),且在一個線程執(zhí)行(主線程),協(xié)程在進行 io 阻塞時,安排別的協(xié)程(任務)繼續(xù)運行;內存開銷更小

實戰(zhàn)編碼實現(xiàn)(基于py 3.8.x)

??首先我們預先編寫三個耗時任務,分別是做魚,做湯,煮飯,如果只有一個鍋的話,我們就只能等待上一個做好才能繼續(xù)下一個,也就是同步執(zhí)行?,F(xiàn)在我們需要買三個鍋,就是異步執(zhí)行

import os
from time import sleep

class SyncTest:
    # 做魚
    def do_fish(self):
        print("do_fish start")
        # 模擬任務耗時
        sleep(3)
        print('do_fish finished',
              datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))  # 洗完的時候, 洗衣機會響一下, 告訴我們洗完了

    # 煲湯
    def do_soap(self):
        print("do_soap start")
        # 模擬io耗時
        sleep(2)
        print('do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煮米飯
    def do_rice(self):
        print("do_rice start")
        # 模擬io耗時
        sleep(5)
        print('do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        self.do_fish()
        self.do_soap()
        self.do_rice()

if __name__ == "__main__":
    print("main start...")
    # 同步
    SyncTest().start()
    print("main end...")

先看下同步結果:

start do 2020-01-03 11:05:24
do_fish start 16440 MainThread
do_fish finished 2020-01-03 11:05:27
do_soap start 16440 MainThread
do_soap finished 2020-01-03 11:05:29
do_rice start 16440 MainThread
do_rice finished 2020-01-03 11:05:34
main end...

可以看到同步任務就是需要34-24 =10s,也就是3+2+5,且都是在同一個進程和線程執(zhí)行的;那么我們現(xiàn)在又多買了兩口鍋,三口鍋一起做;我們采用異步任務來實驗,實現(xiàn)如下

  • 多進程做飯

在Windows中使用process模塊的注意事項
??在Windows操作系統(tǒng)中由于沒有fork(Linux操作系統(tǒng)中創(chuàng)建進程的機制),在創(chuàng)建 子進程的是時候會自動import啟動它的這個文件,而在import的時候又執(zhí)行了整個文件。因此如果process()直接寫在文件中就會無限遞歸創(chuàng)建子進程報錯。所以必須把創(chuàng)建子進程的部分使用if name =='main'判斷保護起來,import的時候,就不會遞歸運行了

import multiprocessing

if __name__ == "__main__":
    print("main start...")
    # 多進程
    for i in range(1, 4):
        if i == 1:
            print("process start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            multiprocessing.Process(target=do_fish, args=(i,)).start()
        elif i == 2:
            multiprocessing.Process(target=do_soap).start()
        elif i == 3:
            multiprocessing.Process(target=do_rice).start()
    print("main end...")

看下執(zhí)行結果:

main start...
process start do 2020-01-03 11:11:12
main end...
process do_soap start 25692 MainThread
process do_rice start 216 MainThread
process do_fish start 1428 MainThread
process do_soap finished 2020-01-03 11:11:14
process do_fish finished 2020-01-03 11:11:15
process do_rice finished 2020-01-03 11:11:17

可以看到,總耗時17-12=5s,不同進程相同線程,異步執(zhí)行任務,主線程不會等待子線程執(zhí)行完畢

  • 多線程做飯
import threading

class ThreadTest:
    # 做魚
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().getName())
        # 模擬任務耗時
        sleep(3)
        print('thread do_fish finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煲湯
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(2)
        print('thread do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煮米飯
    def do_rice(self):
        print("thread do_rice start\n", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(5)
        print('thread do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 4):
            if i == 1:
                threading.Thread(target=self.do_fish).start()
            elif i == 2:
                threading.Thread(target=self.do_soap).start()
            elif i == 3:
                threading.Thread(target=self.do_rice).start()
                
if __name__ == "__main__":
    print("main start...")
    ThreadTest().start()
    print("main end...")

看下執(zhí)行結果:

main start...
thread start do 2020-01-03 11:15:19
thread do_fish start 20144 Thread-1
thread do_soap start 20144 Thread-2
thread do_rice start 20144 Thread-3
main end... 
thread do_soap finished 2020-01-03 11:15:21
thread do_fish finished 2020-01-03 11:15:22
thread do_rice finished 2020-01-03 11:15:24

總耗時24-19=5s,同進程不同線程,異步執(zhí)行任務,主線程不會等待子線程執(zhí)行完畢

  • 異步io/協(xié)程做飯
import asyncio


class AsyncIoTest:
    async def do_fish(self):
        print("asyncio do_fish start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        await asyncio.sleep(3)
        print('asyncio do_fish finished',
              datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))  # 洗完的時候, 洗衣機會響一下, 告訴我們洗完了

    async def do_soap(self):
        print("asyncio do_soap start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        await asyncio.sleep(2)
        print('asyncio do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # raise Exception("aaaa")

    async def do_rice(self):
        print("asyncio do_rice start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        await asyncio.sleep(5)
        print('asyncio do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("asyncio start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait([self.do_fish(), self.do_soap(), self.do_rice()]))
        loop.close()

if __name__ == "__main__":
    print("main start...")
    # 協(xié)程
    AsyncIoTest().start()
    print("main end...")
    

查看結果

main start...
asyncio start do 2020-01-03 11:17:29
asyncio do_fish start 16304 MainThread
asyncio do_soap start 16304 MainThread
asyncio do_rice start 16304 MainThread
asyncio do_soap finished 2020-01-03 11:17:31
asyncio do_fish finished 2020-01-03 11:17:32
asyncio do_rice finished 2020-01-03 11:17:34
main end...

總耗時34-29=5s,同進程同線程,異步執(zhí)行任務,主線程會等待協(xié)程執(zhí)行完畢

??通過上述實現(xiàn)我們對異步方式有了初步的了解,那么現(xiàn)在我們需要執(zhí)行批量任務,而且需要在其中一個任務失敗后終止其他的任務,不管是否在運行,我們考慮使用異步,先看看使用線程如何實現(xiàn)
  • 線程批量任務

class ThreadTest:
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().getName())
        # 模擬任務耗時
        sleep(3)
        print('thread do_fish finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煲湯
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(2)
        print('thread do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        raise Exception("do_soap raise an exception.")

    # 煮米飯
    def do_rice(self):
        print("thread do_rice start\n", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(5)
        print('thread do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 4):
            if i == 1:
                threading.Thread(target=self.do_fish).start()
            elif i == 2:
                threading.Thread(target=self.do_soap).start()
            elif i == 3:
                threading.Thread(target=self.do_rice).start()

??還是剛才的代碼,現(xiàn)在在do_soap中模擬2s耗時后拋出異常,這時候其實do_fish和do_rice都還沒執(zhí)行完畢;我們期望應該是后面兩個任務的finish不會打印,讓我們看看日志

main start...
thread start do 2020-01-03 13:09:30
thread do_fish start 22720 Thread-1
thread do_soap start 22720 Thread-2
thread do_rice start 22720 Thread-3
main end... 
thread do_soap finished 2020-01-03 13:09:32
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/e-song.li/PycharmProjects/untitled/bk_test.py", line 79, in do_soap
    raise Exception("do_soap raise an exception.")
Exception: do_soap raise an exception.
thread do_fish finished 2020-01-03 13:09:33
thread do_rice finished 2020-01-03 13:09:35

Process finished with exit code 0

??雖然我們拋出了異常,但是并未影響到后面的兩個任務,do_fish和do_rice還是繼續(xù)運直到各自任務結束,這并不是我們希望的,我們對代碼進行修改,將線程設置成守護線程,daemon=True


class ThreadTest:
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().getName())
        # 模擬任務耗時
        sleep(3)
        print('thread do_fish finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煲湯
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(2)
        print('thread do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        raise Exception("do_soap raise an exception.")

    # 煮米飯
    def do_rice(self):
        print("thread do_rice start\n", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(5)
        print('thread do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 4):
            if i == 1:
                threading.Thread(target=self.do_fish, daemon=True).start()
            elif i == 2:
                threading.Thread(target=self.do_soap, daemon=True).start()
            elif i == 3:
                threading.Thread(target=self.do_rice, daemon=True).start()

運行結果如下

main start...
thread start do 2020-01-03 13:16:46
thread do_fish start 20384 Thread-1
thread do_soap start 20384 Thread-2
thread do_rice start
main end...

??仔細看下好像不太對,thread do_soap finished,這行好像并未打印,這條執(zhí)行后才拋出異常呢,怎么都沒有了;原來是守護線程執(zhí)行優(yōu)先級低于主線程,主線程結束了,子守護線程就結束了,主線程并不會等待守護子線程執(zhí)行完畢
??那么問題來了,我們雖然想異步執(zhí)行,但是也需要主線程等待所有子線程執(zhí)行完畢或者某個任務遇到異常后,在主線程繼續(xù)執(zhí)行剩余邏輯,不加守護進程又無法及時結束其他子線程任務,加了守護線程主線程又不能等待子線程任務執(zhí)行結束
??這里,我們可以使用線程的事件Event,可用于線程間的通信,控制線程間執(zhí)行順序;我們加入后,在主線程中等待子線程,如果子線程執(zhí)行結束或遇到異常,我們直接通知主線程進行下一步處理即可

class ThreadTest:
    wake_event = threading.Event()

    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().getName())
        # 模擬任務耗時
        sleep(3)
        print('thread do_fish finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 煲湯
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(2)
        print('thread do_soap finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # 通知主線程可以執(zhí)行了
        self.wake_event.set()
        raise Exception("do_soap raise an exception.")

    # 煮米飯
    def do_rice(self):
        print("thread do_rice start\n", os.getpid(), threading.current_thread().getName())
        # 模擬io耗時
        sleep(5)
        print('thread do_rice finished', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 4):
            if i == 1:
                threading.Thread(target=self.do_fish, daemon=True).start()
            elif i == 2:
                threading.Thread(target=self.do_soap, daemon=True).start()
            elif i == 3:
                threading.Thread(target=self.do_rice, daemon=True).start()

if __name__ == "__main__":
    print("main start...")
    # 多線程
    thread_test = ThreadTest()
    thread_test.start()
    # 等待event的通知才繼續(xù)執(zhí)行
    thread_test.wake_event.wait()
    # 重置event的狀態(tài)標識
    thread_test.wake_event.clear()
    print("main end...")
    

執(zhí)行結果如下

main start...
thread start do 2020-01-03 13:33:22
thread do_fish start 23708 Thread-1
thread do_soap start 23708 Thread-2
thread do_rice start 23708 Thread-3
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
thread do_soap finished 2020-01-03 13:33:24
main end...

??main end...最后打印,可見主線程等待了子線程,而且拋出異常后后面再無打印do_fish和do_rice任務的結束日志,這點來看,滿足了我們一個子線程任務異常,結束其他任務的需求;至于另外兩種方式,大家可以自行嘗試

總結:

  • 異步任務實現(xiàn)方式不只是使用thread,還可以使用多進程,協(xié)程的方式
  • 線程和進程都可以設置為守護的,設置后主線程不會等待他們運行結束,主線程結束他們也就結束了
  • 協(xié)程其實是系統(tǒng)內部調度實現(xiàn)的,所有的任務都是運行在一個進程和線程(主線程)中的
  • 執(zhí)行shell命令時,推薦使用p=subprocess.Popen,可以實時獲取執(zhí)行進度,通過readline或者readlines或者p.wait函數(shù)皆可以阻塞shell命令
  • 如果將subprocess.Popen放在thread中,不僅需要停止線程,還需要停止進程,即線程設置為守護,進程調用p.kill結束,兩個都調用才能結束當前執(zhí)行的任務
  • 如果p=subprocess.Popen中設置了shell=True,則會開啟兩個進程,雖然調用p.kill但是無法停止,需要將shell=False,即可
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容