Scenario
In development, when we deal with time-consuming tasks, we usually consider processing them asynchronously.
Approaches
There are generally three ways to implement asynchronous execution:
- Multiple processes
- Multiple threads
- Asynchronous I/O / coroutines
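How the three approaches compare
1. Multiprocessing can use multiple CPU cores, but its memory overhead is large.
2. At the operating-system level, threads can also run on multiple cores, but shared state needs locks, which makes the code tedious to write and hard to follow. In CPython, the GIL (Global Interpreter Lock) additionally lets only one thread execute Python bytecode at a time, so CPU-bound threads effectively use a single core. The GIL is released while a thread blocks on I/O or sleeps, which is why threads still help with I/O-bound work. Memory overhead is smaller than with processes.
3. asyncio, added in Python 3.4, is for asynchronous I/O. Before 3.5, coroutines were written with the @asyncio.coroutine decorator plus yield from (the decorator was deprecated in 3.8 and removed in 3.11); since 3.5 they use the async and await keywords. asyncio's event loop acts like a scheduler implemented inside Python and runs in a single thread (here the main thread): when one coroutine waits on I/O, the loop lets another coroutine (task) run. Memory overhead is even smaller.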
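Hands-on implementation (Python 3.8.x)
First we write three time-consuming tasks: cooking fish, making soup and cooking rice. With only one pot, each dish has to wait until the previous one is done, which is synchronous execution. Buying three pots so that all the dishes cook at the same time corresponds to asynchronous execution.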
```python
import datetime
import os
import threading
from time import sleep


class SyncTest:
    # Cook the fish
    def do_fish(self):
        print("do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate a time-consuming task
        sleep(3)
        print("do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Make the soup
    def do_soap(self):
        print("do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(2)
        print("do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Cook the rice
    def do_rice(self):
        print("do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(5)
        print("do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # One pot: each dish starts only after the previous one is done
        self.do_fish()
        self.do_soap()
        self.do_rice()


if __name__ == "__main__":
    print("main start...")
    # Synchronous execution
    SyncTest().start()
    print("main end...")
```
First, the synchronous result:
```text
start do 2020-01-03 11:05:24
do_fish start 16440 MainThread
do_fish finished 2020-01-03 11:05:27
do_soap start 16440 MainThread
do_soap finished 2020-01-03 11:05:29
do_rice start 16440 MainThread
do_rice finished 2020-01-03 11:05:34
main end...
```
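The synchronous run takes 34 - 24 = 10 s, i.e. 3 + 2 + 5, and everything runs in the same process and thread. Now suppose we buy two more pots and cook all three dishes at once. Let's try this with asynchronous tasks, implemented as follows.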
Cooking with multiple processes
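Caveat when using the multiprocessing module on Windows
Windows has no fork (the mechanism Linux uses to create processes), so when a child process is created it starts a fresh interpreter and imports the file that launched it, which executes the whole file again. If the Process(...) call sits at the top level of the file, every child would try to create more children recursively, and Python raises an error. The code that creates child processes must therefore be guarded with `if __name__ == '__main__':`, so it does not run again when the file is imported.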
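```python
import datetime
import multiprocessing
import os
import threading
from time import sleep

# Workers are module-level functions so that spawned child processes can import them
def do_fish():
    print("process do_fish start", os.getpid(), threading.current_thread().name)
    sleep(3)  # simulate a time-consuming task
    print("process do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def do_soap():
    print("process do_soap start", os.getpid(), threading.current_thread().name)
    sleep(2)  # simulate I/O latency
    print("process do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def do_rice():
    print("process do_rice start", os.getpid(), threading.current_thread().name)
    sleep(5)  # simulate I/O latency
    print("process do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == "__main__":
    print("main start...")
    print("process start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    for task in (do_fish, do_soap, do_rice):  # one child process per dish
        multiprocessing.Process(target=task).start()  # returns without waiting
    print("main end...")
```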
Result:
```text
main start...
process start do 2020-01-03 11:11:12
main end...
process do_soap start 25692 MainThread
process do_rice start 216 MainThread
process do_fish start 1428 MainThread
process do_soap finished 2020-01-03 11:11:14
process do_fish finished 2020-01-03 11:11:15
process do_rice finished 2020-01-03 11:11:17
```
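Total time is 17 - 12 = 5 s, the duration of the longest task. Each task runs in its own process (different PIDs), on that process's MainThread, and the tasks run concurrently. "main end..." is printed immediately because the main process does not block on its children; since they are not daemon processes, Python still waits for them before the parent process actually exits.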
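If the main process should instead block until every dish is done, each child can be joined. The sketch below replaces the three functions with a hypothetical module-level worker `cook(name, seconds)` and a hypothetical helper `cook_all`; `Process.start()` and `Process.join()` are the standard multiprocessing API.

```python
import multiprocessing
import time


# Hypothetical worker standing in for do_fish/do_soap/do_rice; it must be a
# module-level function so that spawned child processes can import it
def cook(name, seconds):
    time.sleep(seconds)  # simulate the dish's cooking time
    print(name, "finished")


def cook_all(jobs):
    """Start one process per (name, seconds) job, wait for all, return elapsed seconds."""
    start = time.perf_counter()
    procs = [multiprocessing.Process(target=cook, args=job) for job in jobs]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # block until this child process has exited
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = cook_all([("do_fish", 3), ("do_soap", 2), ("do_rice", 5)])
    print("main end... after %.1f s" % elapsed)
```

Because the children still run in parallel, the elapsed time stays close to the longest job rather than the sum; only the position of "main end..." changes.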
Cooking with multiple threads
```python
import datetime
import os
import threading
from time import sleep


class ThreadTest:
    # Cook the fish
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate a time-consuming task
        sleep(3)
        print("thread do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Make the soup
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(2)
        print("thread do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Cook the rice
    def do_rice(self):
        print("thread do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(5)
        print("thread do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # One thread per dish; start() returns immediately
        for task in (self.do_fish, self.do_soap, self.do_rice):
            threading.Thread(target=task).start()


if __name__ == "__main__":
    print("main start...")
    ThreadTest().start()
    print("main end...")
```
Result:
```text
main start...
thread start do 2020-01-03 11:15:19
thread do_fish start 20144 Thread-1
thread do_soap start 20144 Thread-2
thread do_rice start 20144 Thread-3
main end...
thread do_soap finished 2020-01-03 11:15:21
thread do_fish finished 2020-01-03 11:15:22
thread do_rice finished 2020-01-03 11:15:24
```
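Total time is 24 - 19 = 5 s. The tasks run in the same process (same PID) on different threads, concurrently. Again "main end..." is printed before the tasks finish: the main thread does not block on the workers, although the interpreter waits for these non-daemon threads before the process exits.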
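The standard library's `concurrent.futures.ThreadPoolExecutor` wraps the same kind of threads and makes waiting for them and collecting their results straightforward. A minimal sketch, assuming a hypothetical task `cook(name, seconds)` and helper `cook_all`: leaving the `with` block waits for all submitted work, and `Future.result()` re-raises any exception from the worker in the calling thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical task standing in for do_fish/do_soap/do_rice
def cook(name, seconds):
    time.sleep(seconds)  # time.sleep releases the GIL, so the threads overlap
    return name + " finished"


def cook_all(jobs):
    """Run (name, seconds) jobs on a thread pool and return their results in job order."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(cook, name, seconds) for name, seconds in jobs]
        # result() blocks until that task is done and re-raises its exception, if any
        return [f.result() for f in futures]


if __name__ == "__main__":
    start = time.perf_counter()
    print(cook_all([("do_fish", 3), ("do_soap", 2), ("do_rice", 5)]))
    print("main end... after %.1f s" % (time.perf_counter() - start))
```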
Cooking with asynchronous I/O (coroutines)
```python
import asyncio
import datetime
import os
import threading


class AsyncIoTest:
    async def do_fish(self):
        print("asyncio do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency; await hands control back to the event loop
        await asyncio.sleep(3)
        print("asyncio do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    async def do_soap(self):
        print("asyncio do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        await asyncio.sleep(2)
        print("asyncio do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    async def do_rice(self):
        print("asyncio do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        await asyncio.sleep(5)
        print("asyncio do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    async def run_all(self):
        # gather() wraps the coroutines in tasks and waits until all of them are done
        await asyncio.gather(self.do_fish(), self.do_soap(), self.do_rice())

    def start(self):
        print("asyncio start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # asyncio.run() creates an event loop, runs run_all() to completion, then closes the loop
        asyncio.run(self.run_all())


if __name__ == "__main__":
    print("main start...")
    # Coroutines
    AsyncIoTest().start()
    print("main end...")
```
Result:
```text
main start...
asyncio start do 2020-01-03 11:17:29
asyncio do_fish start 16304 MainThread
asyncio do_soap start 16304 MainThread
asyncio do_rice start 16304 MainThread
asyncio do_soap finished 2020-01-03 11:17:31
asyncio do_fish finished 2020-01-03 11:17:32
asyncio do_rice finished 2020-01-03 11:17:34
main end...
```
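Total time is 34 - 29 = 5 s. Everything runs in the same process and the same thread, yet the tasks overlap because each `await asyncio.sleep()` hands control back to the event loop. This time "main end..." comes last: running the event loop blocks the main thread until all the coroutines have completed.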
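Because all coroutines share one thread, they only overlap while they are suspended at an `await`. A small experiment, with hypothetical names `cook_async`, `cook_blocking` and `run_all`, compares `await asyncio.sleep()` with a blocking `time.sleep()` inside a coroutine; the blocking version should take roughly the sum of the durations instead of the longest one.

```python
import asyncio
import time


async def cook_async(seconds):
    await asyncio.sleep(seconds)  # suspends this coroutine; the loop runs the others


async def cook_blocking(seconds):
    time.sleep(seconds)  # blocks the only thread; no other coroutine can run meanwhile


async def run_all(make_coro, durations):
    """Run one coroutine per duration concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(make_coro(s) for s in durations))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("await asyncio.sleep:", round(asyncio.run(run_all(cook_async, [0.3, 0.2, 0.5])), 1))
    print("time.sleep:", round(asyncio.run(run_all(cook_blocking, [0.3, 0.2, 0.5])), 1))
```

The same applies to any blocking call (a synchronous HTTP client, a slow computation): inside a coroutine it stalls every other task on the loop.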
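With these implementations we have a basic understanding of the asynchronous approaches. Now suppose we need to run a batch of tasks, and as soon as one of them fails, the others must be terminated whether or not they are still running. We again use asynchronous execution, starting with threads.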
Batch tasks with threads
```python
import datetime
import os
import threading
from time import sleep


class ThreadTest:
    # Cook the fish
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate a time-consuming task
        sleep(3)
        print("thread do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Make the soup
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(2)
        print("thread do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # Simulate a failure
        raise Exception("do_soap raise an exception.")

    # Cook the rice
    def do_rice(self):
        print("thread do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(5)
        print("thread do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # One (non-daemon) thread per dish
        for task in (self.do_fish, self.do_soap, self.do_rice):
            threading.Thread(target=task).start()


if __name__ == "__main__":
    print("main start...")
    ThreadTest().start()
    print("main end...")
```
This is the same code as before, except that do_soap now raises an exception after its 2 s of simulated work, while do_fish and do_rice are still running. We would expect the "finished" lines of those two tasks not to be printed. Let's look at the log:
```text
main start...
thread start do 2020-01-03 13:09:30
thread do_fish start 22720 Thread-1
thread do_soap start 22720 Thread-2
thread do_rice start 22720 Thread-3
main end...
thread do_soap finished 2020-01-03 13:09:32
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/e-song.li/PycharmProjects/untitled/bk_test.py", line 79, in do_soap
    raise Exception("do_soap raise an exception.")
Exception: do_soap raise an exception.
thread do_fish finished 2020-01-03 13:09:33
thread do_rice finished 2020-01-03 13:09:35
Process finished with exit code 0
```
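The traceback above is printed by `threading.excepthook` (Python 3.8+), and the exception never reaches the main thread. As a sketch, assuming a hypothetical helper `run_and_collect` and hypothetical stand-in tasks, a custom hook can record worker exceptions so that the main thread can react to them after `join()`. The hook is process-wide, so the sketch restores the previous one afterwards.

```python
import threading


def run_and_collect(targets):
    """Run each callable in its own thread, wait for all, return (thread name, exception) pairs."""
    errors = []
    lock = threading.Lock()
    original_hook = threading.excepthook

    def record(args):
        # args carries exc_type, exc_value, exc_traceback and thread (Python 3.8+)
        with lock:
            errors.append((args.thread.name, args.exc_value))

    threading.excepthook = record  # process-wide: applies to every thread
    try:
        threads = [threading.Thread(target=t) for t in targets]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        threading.excepthook = original_hook  # restore the previous hook
    return errors


# Hypothetical stand-ins for the tasks above
def do_fish():
    pass  # succeeds


def do_soap():
    raise Exception("do_soap raise an exception.")


if __name__ == "__main__":
    for name, exc in run_and_collect([do_fish, do_soap]):
        print("main thread saw an error in", name, "->", exc)
```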
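Although do_soap raised an exception, the other two tasks were not affected: do_fish and do_rice kept running until each of them finished, which is not what we want. Let's modify the code and make the threads daemon threads with daemon=True: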
```python
import datetime
import os
import threading
from time import sleep


class ThreadTest:
    # Cook the fish
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate a time-consuming task
        sleep(3)
        print("thread do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Make the soup
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(2)
        print("thread do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        raise Exception("do_soap raise an exception.")

    # Cook the rice
    def do_rice(self):
        print("thread do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(5)
        print("thread do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # daemon=True: the interpreter will not wait for these threads when it exits
        for task in (self.do_fish, self.do_soap, self.do_rice):
            threading.Thread(target=task, daemon=True).start()


if __name__ == "__main__":
    print("main start...")
    ThreadTest().start()
    print("main end...")
```
The output:
```text
main start...
thread start do 2020-01-03 13:16:46
thread do_fish start 20384 Thread-1
thread do_soap start 20384 Thread-2
thread do_rice start
main end...
```
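Looking closely, something is off: the line "thread do_soap finished" was never printed, even though the exception is raised only after it, and the exception is gone too. This is not because daemon threads have a lower priority than the main thread. The interpreter simply does not wait for daemon threads: once the main thread finishes and no non-daemon threads remain, the process exits and the daemon threads are stopped abruptly, so do_soap never reached its print or its raise.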
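This behavior is easy to reproduce in isolation. The sketch below (the `SCRIPT` text and the `run_script` helper are hypothetical) runs a tiny program in a fresh interpreter via `subprocess.run`, once with a daemon worker and once with a normal one, and returns what it printed.

```python
import subprocess
import sys

# A tiny program: one worker that needs 1 s, while the main thread ends immediately
SCRIPT = """
import threading, time

def worker():
    time.sleep(1)
    print("worker finished")

threading.Thread(target=worker, daemon=True).start()
print("main end...")
"""


def run_script(source):
    """Run source in a child Python interpreter and return its standard output."""
    result = subprocess.run([sys.executable, "-c", source],
                            capture_output=True, text=True, timeout=30)
    return result.stdout


if __name__ == "__main__":
    # Daemon worker: killed when the main thread ends, so "worker finished" never appears
    print(run_script(SCRIPT))
    # Non-daemon worker: the interpreter waits for it before exiting
    print(run_script(SCRIPT.replace("daemon=True", "daemon=False")))
```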
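So here is the dilemma: we want the tasks to run asynchronously, but the main thread must still wait until all workers finish, or until one of them fails, before running its remaining logic. Without daemon threads we cannot stop the other workers promptly; with daemon threads, the main thread no longer waits for them.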
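For this we can use threading.Event, which lets threads communicate and coordinate their execution order. The main thread waits on the event, and when a worker finishes or hits an exception, it sets the event to tell the main thread to proceed (in the code below, only do_soap sets it, right before raising).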
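An Event can also carry the signal in the opposite direction, so that the remaining workers really stop instead of merely being abandoned. A sketch under these assumptions: the hypothetical task `cook` replaces `sleep(seconds)` with `stop.wait(seconds)`, which waits just as long but returns `True` early once the event is set, and a hypothetical `run_batch` wrapper sets the event when any task raises and then joins every thread.

```python
import threading


# Hypothetical task: stop.wait(seconds) replaces sleep(seconds) and returns True
# as soon as another thread calls stop.set()
def cook(name, seconds, stop, finished, fail=False):
    if stop.wait(seconds):
        print(name, "cancelled")
        return
    if fail:
        raise RuntimeError(name + " raise an exception.")
    finished.append(name)  # list.append is atomic in CPython
    print(name, "finished")


def run_batch(jobs):
    """jobs: list of (name, seconds, fail). Returns the names that finished."""
    stop = threading.Event()
    finished = []

    def runner(name, seconds, fail):
        try:
            cook(name, seconds, stop, finished, fail)
        except Exception as exc:
            print("error:", exc)
            stop.set()  # ask every other worker to give up

    threads = [threading.Thread(target=runner, args=job) for job in jobs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the main thread waits, and no worker is left running afterwards
    return finished


if __name__ == "__main__":
    print(run_batch([("do_fish", 3, False), ("do_soap", 2, True), ("do_rice", 5, False)]))
```

With these durations do_soap fails after 2 s, do_fish and do_rice report "cancelled", and `run_batch` returns an empty list. The trade-off is that cancellation is cooperative: a task only notices the event at the points where it checks it.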
```python
import datetime
import os
import threading
from time import sleep


class ThreadTest:
    # Shared event: set() wakes up whoever is blocked in wait()
    wake_event = threading.Event()

    # Cook the fish
    def do_fish(self):
        print("thread do_fish start", os.getpid(), threading.current_thread().name)
        # Simulate a time-consuming task
        sleep(3)
        print("thread do_fish finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # Make the soup
    def do_soap(self):
        print("thread do_soap start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(2)
        print("thread do_soap finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # Notify the main thread that it can continue
        self.wake_event.set()
        raise Exception("do_soap raise an exception.")

    # Cook the rice
    def do_rice(self):
        print("thread do_rice start", os.getpid(), threading.current_thread().name)
        # Simulate I/O latency
        sleep(5)
        print("thread do_rice finished", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start(self):
        print("thread start do", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for task in (self.do_fish, self.do_soap, self.do_rice):
            threading.Thread(target=task, daemon=True).start()


if __name__ == "__main__":
    print("main start...")
    # Multiple threads
    thread_test = ThreadTest()
    thread_test.start()
    # Block until a worker calls wake_event.set(); only do_soap sets it here,
    # so the main thread resumes after do_soap, not after all three tasks
    thread_test.wake_event.wait()
    # Reset the event's internal flag
    thread_test.wake_event.clear()
    print("main end...")
```
The output:
```text
main start...
thread start do 2020-01-03 13:33:22
thread do_fish start 23708 Thread-1
thread do_soap start 23708 Thread-2
thread do_rice start 23708 Thread-3
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Users\e-song.li\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
thread do_soap finished 2020-01-03 13:33:24
main end...
```
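"main end..." is printed last, so the main thread did wait for the worker, and after the exception no "finished" line appears for do_fish or do_rice. In that sense, one failing worker ends the other tasks, as required. Note, however, that the other two threads stop only because they are daemons and the process exits right after "main end..."; Python has no API for forcibly killing a thread, so if the main thread went on with more work, do_fish and do_rice would keep running to completion. Stopping them for real requires the workers to check a shared flag cooperatively. The other two approaches are left for you to try.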
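For the coroutine approach, asyncio supports this requirement directly: `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` returns as soon as one task raises, and the still-pending tasks can then be cancelled. A sketch with hypothetical `cook` and `run_batch` coroutines:

```python
import asyncio


# Hypothetical stand-in for the three dishes
async def cook(name, seconds, finished, fail=False):
    await asyncio.sleep(seconds)  # simulate I/O latency
    if fail:
        raise RuntimeError(name + " raise an exception.")
    finished.append(name)
    print(name, "finished")


async def run_batch(jobs):
    """jobs: list of (name, seconds, fail). Returns the names that finished."""
    finished = []
    tasks = [asyncio.create_task(cook(name, seconds, finished, fail))
             for name, seconds, fail in jobs]
    # Returns as soon as any task raises, or when all tasks are done
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()  # CancelledError is raised inside the task at its current await
    # Wait for the cancellations to finish; return_exceptions=True keeps them as values
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        if task.exception() is not None:
            print("error:", task.exception())
    return finished


if __name__ == "__main__":
    jobs = [("do_fish", 3, False), ("do_soap", 2, True), ("do_rice", 5, False)]
    print(asyncio.run(run_batch(jobs)))
```

Unlike threads, cancelled tasks are interrupted at their current `await`, so no extra flag is needed; code that blocks without awaiting still cannot be interrupted this way.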
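Summary:
- Asynchronous tasks can be implemented not only with threads but also with multiple processes or coroutines.
- Both threads and processes can be made daemons. The main program does not wait for daemons to finish, and when it exits they are terminated with it.
- Coroutines are scheduled by asyncio's event loop rather than by the operating system; all tasks run in a single process and a single thread (the thread running the loop, here the main thread).
- To run shell commands, p = subprocess.Popen(...) is recommended because you can read the command's progress in real time. With stdout=subprocess.PIPE, p.stdout.readline() blocks until the next line of output, p.stdout.readlines() blocks until the command closes its output, and p.wait() blocks until the command exits.
- If subprocess.Popen runs inside a thread, stopping the task means stopping both the thread and the process: make the thread a daemon and end the process with p.kill(). Only doing both ends the running task.
- With shell=True, Popen may start two processes, the shell and the command it launches, so p.kill() only kills the shell while the command keeps running. Use shell=False (passing the command as an argument list) instead.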
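The last three points can be sketched as follows, assuming a hypothetical long-running command (`CMD`, a child Python process that prints one line every 50 ms) and a hypothetical helper `run_until`: the output is streamed line by line, and the process is killed once a given line appears. Because the command is passed as a list with the default `shell=False`, `p.pid` is the command itself, so `p.kill()` stops it.

```python
import subprocess
import sys

# Hypothetical long-running command: -u makes the child's prints unbuffered
CMD = [sys.executable, "-u", "-c",
       "import time\nfor i in range(100):\n    print('step', i)\n    time.sleep(0.05)"]


def run_until(cmd, stop_line):
    """Stream cmd's stdout line by line, kill it once stop_line appears, return the lines read."""
    lines = []
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)  # shell=False by default
    for line in p.stdout:  # each iteration blocks until the next line arrives
        lines.append(line.rstrip("\n"))
        print(line, end="")  # real-time progress
        if lines[-1] == stop_line:
            p.kill()  # kills the command itself, since no shell sits in between
            break
    p.stdout.close()
    p.wait()  # reap the process so it does not linger
    return lines


if __name__ == "__main__":
    run_until(CMD, "step 5")
```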