1. 內(nèi)存管理機制
1.1 賦值語句的內(nèi)存分析
- 賦值語句都是“引用”,可以這樣理解,但是,這種“引用”是可以改變指向的
- 非賦值語句時
- 對于可變類型的數(shù)據(jù), 創(chuàng)建一塊新內(nèi)存(Set、Dictionary、List)
- 對于不可變數(shù)據(jù)類型
簡單數(shù)據(jù),存在使用“引用”,不存在創(chuàng)建新內(nèi)存
復(fù)雜大數(shù)據(jù),創(chuàng)建新內(nèi)存
1.2 垃圾回收機制
1.2.1 垃圾自動回收
-
以引用計數(shù)為主,分代收集為輔
# 引用計數(shù) # 1.每個對象都有存有指向該對象的引用總數(shù) # 2.查看某個對象的引用計數(shù) --> sys.getrefcount(obj) # 3.可以使用del關(guān)鍵字刪除某個引用# 分代回收 # python將所有的對象分為0,1,2三代 # 所有的新建對象都是0代對象 # 當(dāng)某一代對象經(jīng)歷垃圾回收,依然存活,那么它就被歸入下一代對象 如果一個對象的引用數(shù)為0,python虛擬機就會回收這個對象的內(nèi)存
引用計數(shù)的缺陷是循環(huán)引用問題
-
垃圾自動回收總結(jié)
# 垃圾自動回收 # 滿足特定條件,自動啟動垃圾回收 # 當(dāng)python運行時,會記錄其中分配對象(object allocation)和取消分配對象(object deallocation)的次數(shù) # 當(dāng)兩者的差值高于某個閥值時,垃圾回收才會啟動 # 查看閥值gc.get_threshold()
1.2.2 垃圾手動回收
# 垃圾手動回收
# gc.collect()手動回收
# objgraph模塊中的count(obj)記錄當(dāng)前類產(chǎn)生的實例對象的個數(shù)
1.3 內(nèi)存管理機制
1.3.1 內(nèi)存池機制
當(dāng)創(chuàng)建大量消耗小內(nèi)存的對象時,頻繁調(diào)用new/malloc會導(dǎo)致大量的內(nèi)存碎片,致使效率降低。內(nèi)存池的概念就是在內(nèi)存中申請一定數(shù)量的,大小相等的內(nèi)存塊留作備用,當(dāng)有新的內(nèi)存需求時,就先從內(nèi)存池中分配內(nèi)存給這個需求,不夠了之后再申請新的內(nèi)存。這樣做最顯著的優(yōu)勢就是能夠減少內(nèi)存碎片,提升效率
1.3.2 Python3內(nèi)存管理機制——Pymalloc
- 針對小對象(<=512bytes),pymalloc會在內(nèi)存池中申請內(nèi)存空間
- 當(dāng)>512bytes,則會PyMem_RawMalloc和PyMem_RawRealloc()來申請新的內(nèi)存空間
2. Python多線程
2.1 進程、線程、協(xié)程介紹
2.1.1 關(guān)系介紹
- 都運行在操作系統(tǒng)
- 一個應(yīng)用中至少一個進程
- 一個進程中至少一個線程
- 協(xié)程在一個進程或者一個線程中執(zhí)行
2.1.2 進程介紹
- 進程是一個執(zhí)行中的程序
- 每個進程都擁有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)
- 操作系統(tǒng)管理其上所有進程的執(zhí)行,并為這些進程合理地分配時間
- 進程也可以通過派生(fork或spawn)新的進程來執(zhí)行其他任務(wù)
2.1.3 線程介紹
在同一個進程下執(zhí)行,并共享相同的上下文
一個進程中的各個線程與主線程共享同一片數(shù)據(jù)空間
線程包括開始、執(zhí)行順序和結(jié)束三部分
它可以被搶占(中斷)和臨時掛起(睡眠)--讓步
-
線程一般是以并發(fā)方式執(zhí)行
# 并發(fā) # 并發(fā)是一種屬性,是程序、算法或問題的屬性 # 并行只是并發(fā)問題的可能方法之一 # 如果兩個事件互不影響,則兩個事件是并發(fā)的
2.1.4 協(xié)程介紹
- 協(xié)程就是協(xié)同多任務(wù)
- 協(xié)程在一個進程或者一個線程中執(zhí)行
- 不需要鎖的機制(自己調(diào)動,在一個進程或者一個線程中執(zhí)行)
- 對多核CPU的利用——多進程+協(xié)程
2.2 對多核利用及GIL概念
2.2.1 對多核的利用
- 單核CPU系統(tǒng)中,不存在真正的并發(fā)
- GIL——全局解釋器鎖
- GIL只是強制在任何時候只有一個線程可以執(zhí)行python代碼
- I/O密集型應(yīng)用與CPU密集型應(yīng)用
2.2.2 GIL執(zhí)行順序
- 設(shè)置GIL
- 切換進一個線程去運行
- 執(zhí)行下面操作之一
- 指定數(shù)量的字節(jié)碼zhiling
- 線程主動讓出控制權(quán)(可以調(diào)用time.sleep(0)來完成)
- 把線程設(shè)置回睡眠狀態(tài)(切換出線程)
- 解鎖GIL
- 重復(fù)上述步驟
2.3 線程
2.3.1 線程的threading模塊(對象、屬性、方法)
threading模塊可以代替thread模塊
| 對象 | 描述 |
|---|---|
| Thread | 表示一個執(zhí)行線程的對象 |
| Lock | 鎖對象 |
| RLock | 可重入鎖對象,使單一線程可以(再次)獲得已持有的鎖(遞歸鎖) |
| Condition | 條件變量對象,使得一個線程等待另一個線程滿足特定的“條件”,比如改變狀態(tài)或某個數(shù)據(jù)值 |
| Event | 條件變量的通用版本,任意數(shù)量的線程等待某個事件的發(fā)生,在該事件發(fā)生后所有線程將被激活 |
| Semaphore | 為線程間共享的有限資源提供了一個“計時器”,如果沒有可用資源時會被阻塞 |
| BoundedSemap | 與Semaphore相似,不過它不允許超過初始值 |
| Timer | 與Thread相似,不過它要在運行前等待一段時間 |
| Barrier | 創(chuàng)建一個“障礙”,必須達到指定數(shù)量的線程后才可以繼續(xù) |
| 屬性 | 描述 |
|---|---|
| name | 線程名 |
| ident | 線程的標識符 |
| daemon | 布爾標志,表示這個線程是否是守護線程 |
| 方法 | 描述 |
|---|---|
| __init__() | 實例化一個線程對象,需要有一個可調(diào)用的target,以及其參數(shù)args或kwargs |
| start() | 開始執(zhí)行該線程 |
| run() | 定義線程功能的方法(通常在子類中被重寫) |
| join(timeout=None) | 直至啟動的線程終止之前一直掛起;除非給出了timeout(秒),否則會一直阻塞 |
| getName() | 返回線程名 |
| setName(name) | 設(shè)定線程名 |
| isAlivel()/is_alive() | 布爾標識,表示這個線程是否還存活 |
| isDaemon() | 如果是守護線程,則返回True;否則,返回False |
| setDaemon() | 把線程的守護標志設(shè)置為布爾值daemonic(必須在線程start()之前調(diào)用) |
2.3.2 不面向?qū)ο髮崿F(xiàn)線程
threading.Thead創(chuàng)建線程start()啟動線程-
join()掛起線程import threading def run(): for i in range(1,5): print(threading.currentThread().getName()+"---"+str(i)) def main(): print(threading.currentThread().getName()) thread=threading.Thread(target=run,name="thread-loop") thread.start() thread.join() if __name__ == '__main__': main() Result: MainThread thread-loop---1 thread-loop---2 thread-loop---3 thread-loop---4
2.3.3 面向?qū)ο髮崿F(xiàn)線程
import threading,time
class MyThread(threading.Thread):
def run(self):
for i in range(0,5):
print(threading.currentThread().getName()+"---"+str(i))
time.sleep(1)
if __name__ == '__main__':
print(threading.currentThread().getName())
thread=MyThread(name="loopthread")
thread.start()
thread.join()
Result:
MainThread
loopthread---0
loopthread---1
loopthread---2
loopthread---3
loopthread---4
2.3.4 多線程并發(fā)問題
-
一個線程沒操作完,另外線程就開始操作
import threading,time balance=0 def resource(n): global balance balance+=n time.sleep(1) balance-=n print(threading.currentThread().getName()+" n="+str(n)+" balance="+str(balance)) class MyThread(threading.Thread): def __init__(self,n,*args,**kwargs): super().__init__(*args,**kwargs) self.n=n def run(self): for i in range(0,5): resource(self.n) if __name__ == '__main__': thread1=MyThread(n=5,name="thread1") thread2 = MyThread(n=8,name="thread2") thread1.start() thread2.start() thread1.join() thread2.join() Result: thread2 n=8 balance=5thread1 n=5 balance=0 thread1 n=5 balance=8 thread2 n=8 balance=5 thread1 n=5 balance=8 thread2 n=8 balance=5 thread2 n=8 balance=5thread1 n=5 balance=0
2.3.5 多線程中的鎖
lock threading.Lock()獲取鎖lock threading.RLock()(對于Lock()只能鎖一次來說,RLock()可以多次鎖定)void lock.acquire()獲取鎖void lock.release()釋放鎖鎖定時最好利用
try...catch...finally-
可以使用
with lock:自動解鎖import threading,time balance=0 lock=threading.Lock() def resource(n): global balance with lock: balance+=n time.sleep(1) balance-=n print(threading.currentThread().getName()+" n="+str(n)+" balance="+str(balance)) class MyThread(threading.Thread): def __init__(self,n,*args,**kwargs): super().__init__(*args,**kwargs) self.n=n def run(self): for i in range(0,5): resource(self.n) if __name__ == '__main__': thread1=MyThread(n=5,name="thread1") thread2 = MyThread(n=8,name="thread2") thread1.start() thread2.start() thread1.join() thread2.join() Result: thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0
2.3.6 線程的調(diào)度和優(yōu)化(利用線程池優(yōu)化)
- 建立線程池,不用每次再建立線程,可以對之前的線程直接拿來使用
# 不使用多線程
import time,threading
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def low():
global n
begin_time=time.time()
for i in range(0,10):
run(i)
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
low()
Result:
MainThread 0
MainThread 1
MainThread 2
MainThread 3
MainThread 4
MainThread 5
MainThread 6
MainThread 7
MainThread 8
MainThread 9
time:10.00597333908081
# 使用多線程,不使用線程池
import time,threading
count=0
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def normal():
global count
begin_time=time.time()
allthread=[]
for i in range(0,3):
if i!=2:
for m in range(0,4): # 在計算機中一次只能開4個線程,這個是不利用線程池,不重復(fù)使用線程
thread=threading.Thread(target=run,args=(count,))
count+=1
thread.start()
allthread.append(thread)
for thread in allthread:
thread.join()
else:
for m in range(0,2): # 最后一輪只用開2個線程
thread=threading.Thread(target=run,args=(count,))
count+=1
thread.start()
allthread.append(thread)
for thread in allthread:
thread.join()
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
normal()
Result:
Thread-1 0
Thread-2 1
Thread-4 3Thread-3 2
Thread-5 4
Thread-6 5
Thread-7 6
Thread-8 7
Thread-9 8
Thread-10 9
time:3.0081448554992676
# 第一種線程池 multiprocessing.dummy.Pool
import time,threading
from multiprocessing.dummy import Pool
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def pool():
begin_time=time.time()
n_list=range(10)
pool=Pool(4)
pool.map(run,n_list)
pool.close()
pool.join()
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
pool()
Result:
Thread-1 0
Thread-4 3
Thread-2 1Thread-3 2
Thread-2 7Thread-1 4Thread-3 6
Thread-4 5
Thread-3 8Thread-2 9
time:3.029418706893921
# 第二種線程池(不用start()、join())concurrent.futures.thread.ThreadPoolExecutor
import time,threading
from concurrent.futures.thread import ThreadPoolExecutor
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def pool():
begin_time=time.time()
n_list=range(10)
with ThreadPoolExecutor(max_workers=4) as pool:
pool.map(run,n_list)
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
pool()
Result:
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_2 2
ThreadPoolExecutor-0_3 3
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_2 5ThreadPoolExecutor-0_3 6
ThreadPoolExecutor-0_1 7
ThreadPoolExecutor-0_0 8
ThreadPoolExecutor-0_3 9
time:3.0045437812805176
==數(shù)據(jù)太少,無法準確判斷線程池處理大量數(shù)據(jù)的明顯優(yōu)勢;在大量數(shù)據(jù)的處理時,線程池效率高,第二種線程池比第一種線程池效率高==
2.4 進程
2.4.1 進程的multiprocessing模塊
- 創(chuàng)建進程
Process multiprocessing.Process() - 啟動進程
void Process.start() - 掛起進程
void Process.join() - 獲取進程的ID
int os.getpid()
2.4.1 不面向?qū)ο髮崿F(xiàn)進程
import multiprocessing
import os
import time
def somthing(process_name):
print("process_name:{}".format(process_name))
time.sleep(2)
print("process_id:{}".format(os.getpid()))
if __name__ == '__main__':
process=multiprocessing.Process(target=somthing,args=("my_progress",))
process.start()
process.join()
Result:
process_name:my_progress
process_id:12188
2.4.2 面向?qū)ο髮崿F(xiàn)進程
import os
import time
from multiprocessing.context import Process
class MyProgress(Process):
def __init__(self,my_progress_name,*args,**kwargs):
self.my_progress_name=my_progress_name
self.name="i will be covered"
print("super__init__:{}".format(self.name))
# slef.name會被父類的構(gòu)造方法所覆蓋
super().__init__(*args,**kwargs)
print("after_super__init__:{}".format(self.name))
def run(self):
print("----------run------------")
print("my_process_name:{}".format(self.my_progress_name))
print("process_name:{}".format(self.name))
time.sleep(2)
print("process_id:{}".format(os.getpid()))
print("-----------run-----------")
if __name__ == '__main__':
progress=MyProgress("myProgress")
progress.start()
progress.join()
Result:
super__init__:i will be covered
after_super__init__:MyProgress-1
----------run------------
my_process_name:myProgress
process_name:MyProgress-1
process_id:10096
-----------run-----------
2.4.3 進程之間的通信
- 通過Queue、Pipes等實現(xiàn)進程之間的通信
import time
from multiprocessing import Process,Queue,current_process
class WriteProgress(Process):
def __init__(self,queue,*args,**kwargs):
self.queue=queue
super().__init__(*args,**kwargs)
def run(self):
msg=["This is first msg",
"This is second msg",
"This is third msg",
"This is fourth msg",
"This is fifth msg"]
for msg_item in msg:
self.queue.put(msg_item)
print("{}--send--msg:{}".format(current_process(),msg_item))
time.sleep(2)
class ReadProgress(Process):
def __init__(self,queue,*args,**kwargs):
self.queue=queue
super().__init__(*args,**kwargs)
def run(self):
while True:
msg=self.queue.get()
print("{}--get--msg:{}".format(current_process(),msg))
if __name__ == '__main__':
queue=Queue()
write=WriteProgress(queue)
read=ReadProgress(queue)
write.start()
read.start()
write.join()
write.join()
read.terminate()
Result:
<WriteProgress(WriteProgress-1, started)>--send--msg:This is first msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is first msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is second msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is second msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is third msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is third msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is fourth msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is fourth msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is fifth msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is fifth msg
2.4.4 多進程的鎖
- 鎖
Lock()RLock()Condition() -
RLock()可以多次鎖定 -
Lock()只能鎖一次,否則造成死鎖
# 不使用鎖,會讓進程”隨意“執(zhí)行,一會這個進程執(zhí)行,一會另一個程執(zhí)行
from multiprocessing import Process,current_process
import time
class WriteProgress(Process):
def __init__(self,my_name,*args,**kwargs):
self.my_name=my_name
super().__init__(*args,**kwargs)
def run(self):
for i in range(0,3):
print("progress:{}--progress_number{}".format(self.my_name,i))
time.sleep(2)
if __name__ == '__main__':
write1=WriteProgress("Progress1")
write2=WriteProgress("Progress2")
write3=WriteProgress("Progress3")
write1.start()
write2.start()
write3.start()
write1.join()
write2.join()
write3.join()
Result:
progress:Progress1--progress_number0
progress:Progress2--progress_number0
progress:Progress3--progress_number0
progress:Progress1--progress_number1
progress:Progress2--progress_number1
progress:Progress3--progress_number1
progress:Progress1--progress_number2
progress:Progress2--progress_number2
progress:Progress3--progress_number2
# 使用Lock(),會讓一個進程執(zhí)行好后其它進程再執(zhí)行
from multiprocessing import Process, Lock
import time
class WriteProgress(Process):
def __init__(self,lock,my_name,*args,**kwargs):
self.my_name=my_name
self.lock=lock
super().__init__(*args,**kwargs)
def run(self):
with self.lock:
for i in range(0,3):
print("progress:{}--progress_number:{}".format(self.my_name,i))
time.sleep(2)
if __name__ == '__main__':
lock=Lock()
write1=WriteProgress(lock,"Progress1")
write2=WriteProgress(lock,"Progress2")
write3=WriteProgress(lock,"Progress3")
write1.start()
write2.start()
write3.start()
write1.join()
write2.join()
write3.join()
Result:
progress:Progress1--progress_number:0
progress:Progress1--progress_number:1
progress:Progress1--progress_number:2
progress:Progress2--progress_number:0
progress:Progress2--progress_number:1
progress:Progress2--progress_number:2
progress:Progress3--progress_number:0
progress:Progress3--progress_number:1
progress:Progress3--progress_number:2
2.4.5 進程池
# 利用進程池,同步任務(wù)
# 同步任務(wù),每一個任務(wù)執(zhí)行完后才能拿到返回值,而且拿到的就是返回的值,而不是對象
from multiprocessing import Pool,current_process
import time
def run(i):
time.sleep(1)
return "Progress:{}--assignment_number:{}".format(current_process(), i)
if __name__ == '__main__':
pool=Pool(2)
for i in range(0,10):
result=pool.apply(run,args=(i,))
print(result)
pool.close()
pool.join()
Result:
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
# 利用進程池,異步任務(wù)
# 異步任務(wù),可以先那到返回值對象,對象拿到的快
# 對象拿完之后,再對任務(wù)進行執(zhí)行,將數(shù)據(jù)放入對象之中
from multiprocessing import Pool,current_process
import time
def run(i):
# print("Progress:{}--assignment_number:{}".format(current_process(), i))
time.sleep(1)
return "Progress:{}--assignment_number:{}".format(current_process(), i)
if __name__ == '__main__':
pool=Pool(2)
list=[]
for i in range(0,10):
result=pool.apply_async(run,args=(i,))
print(result) # 拿到返回值對象對象很快
list.append(result)
pool.close()
pool.join()
for i in range(0,10):
print(list[i].get())
Result:
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FAC8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FBA8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FC50>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FCF8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FDA0>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FE48>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FFD0>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B90B8>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B9160>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B9208>
## 幾乎是肉眼可見的一下子打印,但是在之前有較長一段時間
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
2.5 協(xié)程
2.5.1 Python3.5之前協(xié)程的實現(xiàn)
-
yield生成器來實現(xiàn)
def yield_test(): # 協(xié)程
while True:
n=(yield)
print(n)
if __name__ == '__main__':
result=yield_test()
next(result)
result.send("send=ok")
result.send("hello")
Result:
send=ok
hello
2.5.2 Python3.5之后協(xié)程的實現(xiàn)
- 使用
async和await關(guān)鍵字來實現(xiàn) -
async關(guān)鍵字
- 定義特殊的函數(shù)
- 被調(diào)用時,不執(zhí)行里面的代碼,而是返回一個協(xié)程對象
- 在時間循環(huán)中調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行任何操作
-
await關(guān)鍵字
- 等待協(xié)程執(zhí)行完成
- 當(dāng)遇到阻塞調(diào)用的函數(shù)的時候,使用
await方法將協(xié)程的控制權(quán)讓出,以便loop調(diào)用其它的協(xié)程
-
asyncio模塊-
get_event_loop()獲取事件循環(huán)隊列 -
run_until_complete()注冊任務(wù)到隊列 - 在事件循環(huán)中調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行任何操作
-
asyncio模塊用于事件循環(huán) -
Bool asyncio.iscoroutinefunction(function)判斷是否為協(xié)程函數(shù)
-
import asyncio
async def do():
print("we are doing it......")
await asyncio.sleep(2)
obj=do()
# 得到事件循環(huán)隊列
loop=asyncio.get_event_loop()
# 注冊任務(wù)
task=loop.create_task(obj)
print(task)
# 等待協(xié)程任務(wù)執(zhí)行完成
loop.run_until_complete(task)
print(task)
Result:
<Task pending coro=<do() running at D:/study/python/MyTest/test.py:150>>
we are doing it......
## wait 2 second
<Task finished coro=<do() done, defined at D:/study/python/MyTest/test.py:150> result=None>
2.5.3 協(xié)程通信之嵌套調(diào)用
# 一個協(xié)程函數(shù)使用另一個協(xié)程函數(shù)的返回值
import asyncio
async def add(x,y):
print("We are adding.......")
await asyncio.sleep(1)
print("We added completely")
return x+y
async def get_add(x,y):
result=await add(x,y)
print("result={}".format(result))
loop=asyncio.get_event_loop()
loop.run_until_complete(get_add(10,50))
loop.close()
Result:
We are adding.......
We added completely
result=60
2.5.4 協(xié)程通信之隊列
import asyncio
async def add(queue):
for i in range(0,5):
await asyncio.sleep(1)
await queue.put(i) # put()得到一個協(xié)程對象
print("add number-->{},size-->{}".format(i,queue.qsize()))
async def reduce(queue):
for i in range(0,10):
result=await queue.get() # get()得到一個協(xié)程對象
print("reduce number-->{},size:{}".format(result,queue.qsize()))
if __name__ == '__main__':
queue=asyncio.Queue(maxsize=5)
add1=add(queue)
add2=add(queue)
reduce=reduce(queue)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(add1,add2,reduce))
loop.close()
Result:
add number-->0,size-->1
add number-->0,size-->2
reduce number-->0,size:1
reduce number-->0,size:0
add number-->1,size-->1
add number-->1,size-->2
reduce number-->1,size:1
reduce number-->1,size:0
add number-->2,size-->1
add number-->2,size-->2
reduce number-->2,size:1
reduce number-->2,size:0
add number-->3,size-->1
add number-->3,size-->2
reduce number-->3,size:1
reduce number-->3,size:0
add number-->4,size-->1
add number-->4,size-->2
reduce number-->4,size:1
reduce number-->4,size:0