1.簡介
并發(fā):同一時間段有幾個程序都處于已經(jīng)啟動到運行完畢之間,并且這幾個程序都在同一個處理機上運行,并發(fā)的兩種關(guān)系是同步和互斥;
互斥:進(jìn)程之間訪問臨界資源時相互排斥的現(xiàn)象;
同步:進(jìn)程之間存在依賴關(guān)系,一個進(jìn)程結(jié)束的輸出作為另一個進(jìn)程的輸入。具有同步關(guān)系的一組并發(fā)進(jìn)程之間發(fā)送的信息稱為消息或者事件;
并行:單處理器中進(jìn)程被交替執(zhí)行,表現(xiàn)出一種并發(fā)的外部特征;在多處理器中,進(jìn)程可以交替執(zhí)行,還能重疊執(zhí)行,實現(xiàn)并行處理,并行就是同事發(fā)生的多個并發(fā)事件,具有并發(fā)的含義,但并發(fā)不一定是并行,也就是說事件之間不一定要同一時刻發(fā)生;
多線程:多線程是進(jìn)程中并發(fā)運行的一段代碼,能夠?qū)崿F(xiàn)線程之間的切換執(zhí)行;
異步:和同步相對,同步是順序執(zhí)行,而異步是彼此獨立,在等待某個事件的過程中繼續(xù)做自己的事,不要等待這一事件完成后再工作。線程是實現(xiàn)異步的一個方式,異步是讓調(diào)用方法的主線程不需要同步等待另一個線程的完成,從而讓主線程干其他事情。
異步和多線程:不是同等關(guān)系,異步是目的,多線程只是實現(xiàn)異步的一個手段,實現(xiàn)異步可以采用多線程技術(shù)或者交給其他進(jìn)程來處理。
2)同步和異步,阻塞和非阻塞
同步和異步關(guān)注的是結(jié)果消息的通信機制
- 同步:同步的意思就是調(diào)用方需要主動等待結(jié)果的返回
- 異步:異步的意思就是不需要主動等待結(jié)果的返回,而是通過其他手段比如,狀態(tài)通知,回調(diào)函數(shù)等。
阻塞和非阻塞主要關(guān)注的是等待結(jié)果返回調(diào)用方的狀態(tài)
阻塞:是指結(jié)果返回之前,當(dāng)前線程被掛起,不做任何事
非阻塞:是指結(jié)果在返回之前,線程可以做一些其他事,不會被掛起。
---------------------
異步非阻塞:異步非阻塞這也是現(xiàn)在高并發(fā)編程的一個核心
2.asyncio簡介
asyncio模塊提供了使用協(xié)程構(gòu)建并發(fā)應(yīng)用的工具。它使用一種單線程單進(jìn)程的的方式實現(xiàn)并發(fā),應(yīng)用的各個部分彼此合作, 可以顯示的切換任務(wù),一般會在程序阻塞I/O操作的時候發(fā)生上下文切換如等待讀寫文件,或者請求網(wǎng)絡(luò)。同時asyncio也支持調(diào)度代碼在將來的某個特定事件運行,從而支持一個協(xié)程等待另一個協(xié)程完成,以處理系統(tǒng)信號和識別其他一些事件。
1)異步并發(fā)的概念
對于其他的并發(fā)模型大多數(shù)采取的都是線性的方式編寫。并且依賴于語言運行時系統(tǒng)或操作系統(tǒng)的底層線程或進(jìn)程來適當(dāng)?shù)馗淖兩舷挛?,而基于asyncio的應(yīng)用要求應(yīng)用代碼顯示的處理上下文切換。asyncio提供的框架以事件循環(huán)(event loop)為中心,程序開啟一個無限的循環(huán),程序會把一些函數(shù)注冊到事件循環(huán)上。當(dāng)滿足事件發(fā)生的時候,調(diào)用相應(yīng)的協(xié)程函數(shù)。
事件循環(huán)
事件循環(huán)是一種處理多并發(fā)量的有效方式,在維基百科中它被描述為「一種等待程序分配事件或消息的編程架構(gòu)」,我們可以定義事件循環(huán)來簡化使用輪詢方法來監(jiān)控事件,通俗的說法就是「當(dāng)A發(fā)生時,執(zhí)行B」。事件循環(huán)利用poller對象,使得程序員不用控制任務(wù)的添加、刪除和事件的控制。事件循環(huán)使用回調(diào)方法來知道事件的發(fā)生。它是asyncio提供的「中央處理設(shè)備」,支持如下操作:
? ? (1) 注冊、執(zhí)行和取消延遲調(diào)用(超時)
? ? (2) 創(chuàng)建可用于多種類型的通信的服務(wù)端和客戶端的Transports
? ? (3) 啟動進(jìn)程以及相關(guān)的和外部通信程序的Transports
? ? (4) 將耗時函數(shù)調(diào)用委托給一個線程池
? ? (5) # 單線程(進(jìn)程)的架構(gòu)也避免的多線程(進(jìn)程)修改可變狀態(tài)的鎖的問題。
2) 與事件循環(huán)交互的應(yīng)用要顯示地注冊將運行的代碼,讓事件循環(huán)在資源可用時向應(yīng)用代碼發(fā)出必要的調(diào)用。
Future
future是一個數(shù)據(jù)結(jié)構(gòu),表示還未完成的工作結(jié)果。事件循環(huán)可以監(jiān)視Future對象是否完成。從而允許應(yīng)用的一部分等待另一部分完成一些工作。
Task
task是Future的一個子類,它知道如何包裝和管理一個協(xié)程的執(zhí)行。任務(wù)所需的資源可用時,事件循環(huán)會調(diào)度任務(wù)允許,并生成一個結(jié)果,從而可以由其他協(xié)程消費。
3)一個異步方法:
async def async_double(x):
????????return 2 * x
print(await async_double(x))
從外觀上看異步方法和標(biāo)準(zhǔn)方法沒什么區(qū)別只是前面多了個async。
"Async" 是"asynchronous"的簡寫,為了區(qū)別于異步函數(shù),我們稱標(biāo)準(zhǔn)函數(shù)為同步函數(shù),
從用戶角度異步函數(shù)和同步函數(shù)有以下區(qū)別:
要調(diào)用異步函數(shù),必須使用await關(guān)鍵字。 因此,不要寫regulardouble(3),而是寫await asyncdouble(3).
不能在同步函數(shù)里使用await,否則會出錯。
-------------------------------------------------------------------------------------------------------------------
4)啟動一個協(xié)程
一般異步方法被稱之為協(xié)程(Coroutine)。asyncio事件循環(huán)可以通過多種不同的方法啟動一個協(xié)程。一般對于入口函數(shù),最簡答的方法就是使用run_until_complete,并將協(xié)程直接傳入這個方法。
import asyncio
async def foo:
? ? print("這是一個協(xié)程")
? ? return 21
if __name__ == '__main__':
? ? loop = asyncio.get_event_loop()
? ? try:
? ? ? ? print("開始運行協(xié)程")
? ? ? ? ?coro = foo
? ? ? ? ?print("進(jìn)入事件循環(huán)")
? ? ? ? #?loop.run_until_complete(coro())
? ? ? ? result =? loop.run_until_complete(coro())
? ??????print(f"run_until_complete可以獲取協(xié)程的{result},默認(rèn)輸出None")
? ? ?finally:
? ? ? ? ?print("關(guān)閉事件循環(huán)")
? ? ? ? ?loop.close
第一步首先得到一個事件循環(huán)的應(yīng)用也就是定義的對象loop。可以使用默認(rèn)的事件循環(huán),也可以實例化一個特定的循環(huán)類(比如uvloop),這里使用了默認(rèn)循環(huán)run_unti_lcomplete(coro())方法用這個協(xié)程啟動循環(huán),協(xié)程返回時這個方法將停止循環(huán)。
rununtilcomplete的參數(shù)是一個futrue對象。當(dāng)傳入一個協(xié)程,其內(nèi)部會自動封裝成task,其中task是Future的子類。關(guān)于task和future后面會提到
#協(xié)程調(diào)用協(xié)程
import asyncio
async def result1():
????print("這是result1協(xié)程")
????return "result1"
async def result2(arg):
????print("這是result2協(xié)程")
????return f"result2接收了一個參數(shù),{arg}"
async def main():
????print("主協(xié)程")
????print("等待result1協(xié)程運行")
????res1 = await result1()
????print("等待result2協(xié)程運行")
????res2 = await result2(res1)
????return (res1,res2)
if __name__ == '__main__':
????loop = asyncio.get_event_loop()
????try:
????????result = loop.run_until_complete(main())
????????print(f"獲取返回值:{result}")
????finally:
????????print("關(guān)閉事件循環(huán)")
????????loop.close
5) 協(xié)程中調(diào)用普通函數(shù)
在協(xié)程中可以通過一些方法去調(diào)用普通的函數(shù)??梢允褂玫年P(guān)鍵字有callsoon,calllater,call_at。
(1)call_soon
可以通過字面意思理解調(diào)用立即返回(或者說,立即調(diào)用回調(diào)函數(shù))。
loop.call_soon(callback, *args, context=None)
在下一個迭代的時間循環(huán)中立刻調(diào)用回調(diào)函數(shù),大部分的回調(diào)函數(shù)支持位置參數(shù),而不支持"關(guān)鍵字參數(shù)",如果是想要使用關(guān)鍵字參數(shù),則推薦使用functools.aprtial對方法進(jìn)一步包裝.可選關(guān)鍵字context允許指定要運行的回調(diào)的自定義contextvars.Context。當(dāng)沒有提供上下文時使用當(dāng)前上下文。在Python 3.7中, asyncio協(xié)程加入了對上下文的支持。使用上下文就可以在一些場景下隱式地傳遞變量,比如數(shù)據(jù)庫連接session等,而不需要在所有方法調(diào)用顯示地傳遞這些變量。
import asyncio
import functools
def callback(args, kwargs="defalut"):
? ? # f支持字符串里加入變量,相當(dāng)于動態(tài)更新
? ? print(f"普通函數(shù)做為回調(diào)函數(shù),獲取參數(shù):{args},{kwargs}")
async def main(loop):
? ? print("注冊callback")
? ? loop.call_soon(callback, 1)
? ? wrapped= functools.partial(callback,kwargs="not defalut") # 將關(guān)鍵字參數(shù)先行封裝
? ? loop.call_soon(wrapped, 2)
? ? await asyncio.sleep(0.2)
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? print(loop,'\n',type(loop))
? ? ? ? loop.run_until_complete(main(loop))
? ? finally:
? ? ? ? loop.close()
(2)call_later
loop.call_later(delay, callback, *args, context=None)
首先簡單的說一下它的含義,就是事件循環(huán)在delay多長時間之后才執(zhí)行callback函數(shù)
import asyncio
def callback(n):
? ? print(f"callback {n} invoked")
async def main(loop):
? ? print("注冊callbacks")
? ? loop.call_later(0.2, callback, 1)
? ? loop.call_later(0.1, callback, 2)
? ? loop.call_soon(callback, 3)
? ? await asyncio.sleep(0.4)
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? loop.run_until_complete(main(loop))
? ? finally:
? ? ? ? loop.close()
1.callsoon會在calllater之前執(zhí)行,和它的位置在哪無關(guān)
2.call_later的第一個參數(shù)越小,越先執(zhí)行。
(3) call_at()
loop.call_at(when, callback, *args, context=None)
callat第一個參數(shù)的含義代表的是一個單調(diào)時間,它和我們平時說的系統(tǒng)時間有點差異,這里的時間指的是事件循環(huán)內(nèi)部時間,可以通過loop.time獲取,然后可以在此基礎(chǔ)上進(jìn)行操作.實際上calllater內(nèi)部就是調(diào)用的call_at。
import asyncio
def call_back(n, loop):
? ? print(f"callback {n} 運行時間點{loop.time()}")
async def main(loop):
? ? now= loop.time()
? ? print("當(dāng)前的內(nèi)部時間", now)
? ? print("循環(huán)時間", now)
? ? print("注冊callback")
? ? loop.call_at(now+ 0.1, call_back, 1, loop)
? ? loop.call_at(now+ 0.2, call_back, 2, loop)
? ? loop.call_soon(call_back, 3, loop)
? ? await asyncio.sleep(1)
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? print("進(jìn)入事件循環(huán)")
? ? ? ? loop.run_until_complete(main(loop))
? ? finally:
? ? ? ? print("關(guān)閉循環(huán)")
? ? ? ? loop.close()
6)future
future表示還沒有完成的工作結(jié)果。事件循環(huán)可以通過監(jiān)視一個future對象的狀態(tài)來指示它已經(jīng)完成。future對象有幾個狀態(tài):
import asyncio
def foo(future, result):
? ? print(f"此時future的狀態(tài):{future}")
? ? print(f"設(shè)置future的結(jié)果:{result}")
? ? future.set_result(result)
? ? print(f"此時future的狀態(tài):{future}")
? ? print(f"此時future的狀態(tài):{future.result()}") # future.result(),需要先設(shè)置
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? all_done= asyncio.Future()
? ? ? ? loop.call_soon(foo, all_done, "Future is done!")
? ? ? ? print("進(jìn)入事件循環(huán)")
? ? ? ? result= loop.run_until_complete(all_done)
? ? ? ? print("返回結(jié)果", result)
? ? finally:
? ? ? ? print("關(guān)閉事件循環(huán)")
? ? ? ? loop.close()
? ? ? ? print("獲取future的結(jié)果", all_done.result())
---------------------------------------------------------------------------------
future和協(xié)程一樣可以使用await關(guān)鍵字獲取其結(jié)果。
def foo(future, result):
? ? print("設(shè)置結(jié)果到future", result)
? ? future.set_result(result)
async def main(loop):
? ? all_done= asyncio.Future()
? ? print("調(diào)用函數(shù)獲取future對象")
? ? loop.call_soon(foo, all_done, "the result")
? ? result= await all_done# 獲取狀態(tài)
? ? # result = all_done.result() 沒有設(shè)置,不能用
? ? print("獲取future里的結(jié)果", result)
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? loop.run_until_complete(main(loop))
? ? finally:
? ? ? ? loop.close()
Future回調(diào)
Future 在完成的時候可以執(zhí)行一些回調(diào)函數(shù),回調(diào)函數(shù)按注冊時的順序進(jìn)行調(diào)用:
import asyncio
import functools,time
def callback(future, n):
? ? print(future)
? ? print('{}: future done: {}'.format(n, future.result()))
async def register_callbacks(all_done):
? ? print('注冊callback到future對象')
? ? # all_done是future對象,最為第一個參數(shù),填入,注冊完,并不會立即回掉
? ? all_done.add_done_callback(functools.partial(callback, n=1))
? ? all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
? ? await register_callbacks(all_done)
? ? print('設(shè)置future的結(jié)果,該協(xié)程結(jié)束后執(zhí)行回調(diào)')
? ? all_done.set_result('the result')
? ? time.sleep(2)
? ? print('===========')
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? all_done= asyncio.Future()
? ? ? ? loop.run_until_complete(main(all_done))
? ? finally:
? ? ? ? loop.close()
通過adddonecallback方法給futrue任務(wù)添加回調(diào)函數(shù),當(dāng)future執(zhí)行完成的時候,就會調(diào)用回調(diào)函數(shù)。
三、并發(fā)的執(zhí)行任務(wù)
?任務(wù)(Task)是與事件循環(huán)交互的主要途徑之一。任務(wù)可以包裝協(xié)程,可以跟蹤協(xié)程何時完成。任務(wù)是Future的子類,所以使用方法和future一樣。協(xié)程可以等待任務(wù),每個任務(wù)都有一個結(jié)果,在它完成之后可以獲取這個結(jié)果。因為協(xié)程是沒有狀態(tài)的,我們通過使用create_task方法可以將協(xié)程包裝成有狀態(tài)的任務(wù)。還可以在任務(wù)運行的過程中取消任務(wù)。
import asyncio
async def child():
? ? print("進(jìn)入子協(xié)程")
? ? return "the result"
async def main(loop):
? ? print("將協(xié)程child包裝成任務(wù)")
? ? task= loop.create_task(child())
? ? # print("通過cancel方法可以取消任務(wù)")
# task.cancel()
# print(9999)
? ? try:
? ? ? ? await task
# print(1000)
? ? except asyncio.CancelledError:
? ? ? ? print("取消任務(wù)拋出CancelledError異常")
? ? else:
? ? ? ? print("獲取任務(wù)的結(jié)果", task.result())
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? loop.run_until_complete(main(loop))
? ? finally:
? ? ? ? loop.close()
另外出了使用loop.create_task()將協(xié)程包裝為任務(wù)外還可以使用asyncio.ensurefuture(coroutine)建一個task。在python3.7中可以使用asyncio.create_task創(chuàng)建任務(wù)。
四、組合協(xié)程
一系列的協(xié)程可以通過await鏈?zhǔn)降恼{(diào)用,但是有的時候我們需要在一個協(xié)程里等待多個協(xié)程,比如我們在一個協(xié)程里等待1000個異步網(wǎng)絡(luò)請求,對于訪問次序有沒有要求的時候,就可以使用另外的關(guān)鍵字wait或gather來解決了。
1) wait可以暫停一個協(xié)程,直到后臺操作完成。
等待多個協(xié)程:
import asyncio
async def num(n):
? ? try:
? ? ? ? await asyncio.sleep(n*0.1)
? ? ? ? return n
except asyncio.CancelledError:
? ? ? ? print(f"數(shù)字{n}被取消")
? ? ? ? raise
async def main():
? ? tasks= [num(i) for iin range(10)]
? ? complete, pending= await asyncio.wait(tasks, timeout=0.5)
? ? for iin complete:
? ? ? ? print("當(dāng)前數(shù)字",i.result())
? ? if pending:
? ? ? ? print("取消未完成的任務(wù)")
? ? ? ? for pin pending:
? ? ? ? ? ? p.cancel()
2)?gather的使用
gather的作用和wait類似不同的是。
1.gather任務(wù)無法取消。
2.返回值是一個結(jié)果列表
3.可以按照傳入?yún)?shù)的順序,順序輸出。
complete= await asyncio.gather(*tasks)
for iin complete:
? ? print("當(dāng)前數(shù)字", i)
gather通常被用來階段性的一個操作,做完第一步才能做第二步:
"""
可以通過上面結(jié)果得到如下結(jié)論:1.step1和step2是并行運行的。
2.gather會等待最耗時的那個完成之后才返回結(jié)果,耗時總時間取決于其中任務(wù)最長時間的那個。"""
async def step1(n, start):
? ? await asyncio.sleep(n)
? ? print("第一階段完成")
? ? print("此時用時", time.time() - start)
? ? return n
async def step2(n, start):
? ? await asyncio.sleep(n)
? ? print("第二階段完成")
? ? print("此時用時", time.time() - start)
? ? return n
async def main():
? ? now= time.time()
? ? result= await asyncio.gather(step1(5, now), step2(2, now))
? ? for iin result:
? ? ? ? print(i)
? ? ? ? print("總用時", time.time() - now)
if __name__== '__main__':
? ? loop= asyncio.get_event_loop()
? ? try:
? ? ? ? loop.run_until_complete(main())
? ? finally:
? ? ? ? loop.close()
3)任務(wù)完成時進(jìn)行處理
"""
任務(wù)完成時進(jìn)行處理ascomplete是一個生成器,會管理指定的一個任務(wù)列表,并生成他們的結(jié)果。每個協(xié)程結(jié)束運行時一次生成一個結(jié)果。
與wait一樣,ascomplete不能保證順序,不過執(zhí)行其他動作之前沒有必要等待所以后臺操作完成。"""
import asyncio
import time
async def foo(n):
? ? # time.sleep(2) # 暫停上下文全局時間
? ? print('Waiting: ', n)
? ? await asyncio.sleep(n)
? ? return n
async def main():
? ? coroutine1= foo(1)
? ? coroutine2= foo(2)
? ? coroutine3= foo(4)
? ? tasks= [
? ? asyncio.ensure_future(coroutine1),
? ? asyncio.ensure_future(coroutine2),
? ? asyncio.ensure_future(coroutine3)
? ? ]
? ? for taskin asyncio.as_completed(tasks):
? ? ? ? result= await task
print('Task ret: {}'.format(result))
start= time.time()
loop= asyncio.get_event_loop()
done= loop.run_until_complete(main())
print(time.time() - start)
可以發(fā)現(xiàn)結(jié)果逐個輸出。