Python內(nèi)存管理與多線程

1. 內(nèi)存管理機制

1.1 賦值語句的內(nèi)存分析

  1. 賦值語句都是“引用”,可以這樣理解,但是,這種“引用”是可以改變指向的
  2. 非賦值語句時
    1. 對于可變類型的數(shù)據(jù), 創(chuàng)建一塊新內(nèi)存(Set、Dictionary、List)
    2. 對于不可變數(shù)據(jù)類型
      • 簡單數(shù)據(jù),存在使用“引用”,不存在創(chuàng)建新內(nèi)存

      • 復(fù)雜大數(shù)據(jù),創(chuàng)建新內(nèi)存

1.2 垃圾回收機制

1.2.1 垃圾自動回收

  1. 以引用計數(shù)為主,分代收集為輔

    # 引用計數(shù)
    # 1.每個對象都有存有指向該對象的引用總數(shù)
    # 2.查看某個對象的引用計數(shù) --> sys.getrefcount(obj)
    # 3.可以使用del關(guān)鍵字刪除某個引用
    
    # 分代回收
    # python將所有的對象分為0,1,2三代
    # 所有的新建對象都是0代對象
    # 當(dāng)某一代對象經(jīng)歷垃圾回收,依然存活,那么它就被歸入下一代對象
    
  2. 如果一個對象的引用數(shù)為0,python虛擬機就會回收這個對象的內(nèi)存

  3. 引用計數(shù)的缺陷是循環(huán)引用問題

  4. 垃圾自動回收總結(jié)

    # 垃圾自動回收
    # 滿足特定條件,自動啟動垃圾回收
    # 當(dāng)python運行時,會記錄其中分配對象(object allocation)和取消分配對象(object deallocation)的次數(shù)
    # 當(dāng)兩者的差值高于某個閥值時,垃圾回收才會啟動
    # 查看閥值gc.get_threshold()
    

1.2.2 垃圾手動回收

# 垃圾手動回收
# gc.collect()手動回收
# objgraph模塊中的count(obj)記錄當(dāng)前類產(chǎn)生的實例對象的個數(shù)

1.3 內(nèi)存管理機制

1.3.1 內(nèi)存池機制

當(dāng)創(chuàng)建大量消耗小內(nèi)存的對象時,頻繁調(diào)用new/malloc會導(dǎo)致大量的內(nèi)存碎片,致使效率降低。內(nèi)存池的概念就是在內(nèi)存中申請一定數(shù)量的,大小相等的內(nèi)存塊留作備用,當(dāng)有新的內(nèi)存需求時,就先從內(nèi)存池中分配內(nèi)存給這個需求,不夠了之后再申請新的內(nèi)存。這樣做最顯著的優(yōu)勢就是能夠減少內(nèi)存碎片,提升效率

1.3.2 Python3內(nèi)存管理機制——Pymalloc

  1. 針對小對象(<=512bytes),pymalloc會在內(nèi)存池中申請內(nèi)存空間
  2. 當(dāng)>512bytes,則會PyMem_RawMalloc和PyMem_RawRealloc()來申請新的內(nèi)存空間

2. Python多線程

2.1 進程、線程、協(xié)程介紹

2.1.1 關(guān)系介紹

  1. 都運行在操作系統(tǒng)
  2. 一個應(yīng)用中至少一個進程
  3. 一個進程中至少一個線程
  4. 協(xié)程在一個進程或者一個線程中執(zhí)行

2.1.2 進程介紹

  1. 進程是一個執(zhí)行中的程序
  2. 每個進程都擁有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)
  3. 操作系統(tǒng)管理其上所有進程的執(zhí)行,并為這些進程合理地分配時間
  4. 進程也可以通過派生(fork或spawn)新的進程來執(zhí)行其他任務(wù)

2.1.3 線程介紹

  1. 在同一個進程下執(zhí)行,并共享相同的上下文

  2. 一個進程中的各個線程與主線程共享同一片數(shù)據(jù)空間

  3. 線程包括開始、執(zhí)行順序和結(jié)束三部分

  4. 它可以被搶占(中斷)和臨時掛起(睡眠)--讓步

  5. 線程一般是以并發(fā)方式執(zhí)行

    # 并發(fā)
    # 并發(fā)是一種屬性,是程序、算法或問題的屬性
    # 并行只是并發(fā)問題的可能方法之一
    # 如果兩個事件互不影響,則兩個事件是并發(fā)的
    

2.1.4 協(xié)程介紹

  1. 協(xié)程就是協(xié)同多任務(wù)
  2. 協(xié)程在一個進程或者一個線程中執(zhí)行
  3. 不需要鎖的機制(自己調(diào)動,在一個進程或者一個線程中執(zhí)行)
  4. 對多核CPU的利用——多進程+協(xié)程

2.2 對多核利用及GIL概念

2.2.1 對多核的利用

  1. 單核CPU系統(tǒng)中,不存在真正的并發(fā)
  2. GIL——全局解釋器鎖
  3. GIL只是強制在任何時候只有一個線程可以執(zhí)行python代碼
  4. I/O密集型應(yīng)用與CPU密集型應(yīng)用

2.2.2 GIL執(zhí)行順序

  1. 設(shè)置GIL
  2. 切換進一個線程去運行
  3. 執(zhí)行下面操作之一
    • 指定數(shù)量的字節(jié)碼zhiling
    • 線程主動讓出控制權(quán)(可以調(diào)用time.sleep(0)來完成)
  4. 把線程設(shè)置回睡眠狀態(tài)(切換出線程)
  5. 解鎖GIL
  6. 重復(fù)上述步驟

2.3 線程

2.3.1 線程的threading模塊(對象、屬性、方法)

threading模塊可以代替thread模塊

對象 描述
Thread 表示一個執(zhí)行線程的對象
Lock 鎖對象
RLock 可重入鎖對象,使單一線程可以(再次)獲得已持有的鎖(遞歸鎖)
Condition 條件變量對象,使得一個線程等待另一個線程滿足特定的“條件”,比如改變狀態(tài)或某個數(shù)據(jù)值
Event 條件變量的通用版本,任意數(shù)量的線程等待某個事件的發(fā)生,在該事件發(fā)生后所有線程將被激活
Semaphore 為線程間共享的有限資源提供了一個“計時器”,如果沒有可用資源時會被阻塞
BoundedSemap 與Semaphore相似,不過它不允許超過初始值
Timer 與Thread相似,不過它要在運行前等待一段時間
Barrier 創(chuàng)建一個“障礙”,必須達到指定數(shù)量的線程后才可以繼續(xù)
屬性 描述
name 線程名
ident 線程的標識符
daemon 布爾標志,表示這個線程是否是守護線程
方法 描述
__init__() 實例化一個線程對象,需要有一個可調(diào)用的target,以及其參數(shù)args或kwargs
start() 開始執(zhí)行該線程
run() 定義線程功能的方法(通常在子類中被重寫)
join(timeout=None) 直至啟動的線程終止之前一直掛起;除非給出了timeout(秒),否則會一直阻塞
getName() 返回線程名
setName(name) 設(shè)定線程名
isAlivel()/is_alive() 布爾標識,表示這個線程是否還存活
isDaemon() 如果是守護線程,則返回True;否則,返回False
setDaemon() 把線程的守護標志設(shè)置為布爾值daemonic(必須在線程start()之前調(diào)用)

2.3.2 不面向?qū)ο髮崿F(xiàn)線程

  1. threading.Thead創(chuàng)建線程

  2. start()啟動線程

  3. join()掛起線程

    import threading
    def run():
        for i in range(1,5):
            print(threading.currentThread().getName()+"---"+str(i))
    def main():
        print(threading.currentThread().getName())
        thread=threading.Thread(target=run,name="thread-loop")
        thread.start()
        thread.join()
    if __name__ == '__main__':
        main()
        
    Result:
     MainThread
        thread-loop---1
        thread-loop---2
        thread-loop---3
        thread-loop---4
    

2.3.3 面向?qū)ο髮崿F(xiàn)線程

import threading,time
class MyThread(threading.Thread):
    def run(self):
        for i in range(0,5):
            print(threading.currentThread().getName()+"---"+str(i))
            time.sleep(1)
if __name__ == '__main__':
    print(threading.currentThread().getName())
    thread=MyThread(name="loopthread")
    thread.start()
    thread.join()
    
Result:
    MainThread
    loopthread---0
    loopthread---1
    loopthread---2
    loopthread---3
    loopthread---4

2.3.4 多線程并發(fā)問題

  • 一個線程沒操作完,另外線程就開始操作

    import threading,time
    balance=0
    def resource(n):
        global balance
        balance+=n
        time.sleep(1)
        balance-=n
        print(threading.currentThread().getName()+"  n="+str(n)+"  balance="+str(balance))
    class MyThread(threading.Thread):
        def __init__(self,n,*args,**kwargs):
            super().__init__(*args,**kwargs)
            self.n=n
    
        def run(self):
            for i in range(0,5):
                resource(self.n)
    if __name__ == '__main__':
        thread1=MyThread(n=5,name="thread1")
        thread2 = MyThread(n=8,name="thread2")
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        
    Result:
        thread2  n=8  balance=5thread1  n=5  balance=0
    
        thread1  n=5  balance=8
        thread2  n=8  balance=5
        thread1  n=5  balance=8
        thread2  n=8  balance=5
        thread2  n=8  balance=5thread1  n=5  balance=0
        
    

2.3.5 多線程中的鎖

  1. lock threading.Lock() 獲取鎖 lock threading.RLock()(對于Lock()只能鎖一次來說,RLock()可以多次鎖定)

  2. void lock.acquire() 獲取鎖

  3. void lock.release()釋放鎖

  4. 鎖定時最好利用 try...catch...finally

  5. 可以使用 with lock:自動解鎖

    import threading,time
    balance=0
    lock=threading.Lock()
    def resource(n):
        global balance
        with lock:
            balance+=n
            time.sleep(1)
            balance-=n
            print(threading.currentThread().getName()+"  n="+str(n)+"  balance="+str(balance))
    class MyThread(threading.Thread):
        def __init__(self,n,*args,**kwargs):
            super().__init__(*args,**kwargs)
            self.n=n
    
        def run(self):
            for i in range(0,5):
                resource(self.n)
    if __name__ == '__main__':
        thread1=MyThread(n=5,name="thread1")
        thread2 = MyThread(n=8,name="thread2")
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        
    Result:
     thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
    

2.3.6 線程的調(diào)度和優(yōu)化(利用線程池優(yōu)化)

  1. 建立線程池,不用每次再建立線程,可以對之前的線程直接拿來使用
# 不使用多線程
import time,threading

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def low():
    global n
    begin_time=time.time()
    for i in range(0,10):
        run(i)
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    low()
    
Result:
    MainThread   0
    MainThread   1
    MainThread   2
    MainThread   3
    MainThread   4
    MainThread   5
    MainThread   6
    MainThread   7
    MainThread   8
    MainThread   9
    time:10.00597333908081
# 使用多線程,不使用線程池
import time,threading
count=0
def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def normal():
    global count
    begin_time=time.time()
    allthread=[]
    for i in range(0,3):
        if i!=2:
            for m in range(0,4):  # 在計算機中一次只能開4個線程,這個是不利用線程池,不重復(fù)使用線程
                    thread=threading.Thread(target=run,args=(count,))
                    count+=1
                    thread.start()
                    allthread.append(thread)
            for thread in allthread:
                thread.join()
        else:
            for m in range(0,2):  # 最后一輪只用開2個線程
                    thread=threading.Thread(target=run,args=(count,))
                    count+=1
                    thread.start()
                    allthread.append(thread)
            for thread in allthread:
                thread.join()
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    normal()
    
Result:
    Thread-1   0
    Thread-2   1
    Thread-4   3Thread-3   2

    Thread-5   4
    Thread-6   5
    Thread-7   6
    Thread-8   7
    Thread-9   8
    Thread-10   9
    time:3.0081448554992676
# 第一種線程池 multiprocessing.dummy.Pool
import time,threading
from multiprocessing.dummy import Pool

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def pool():
    begin_time=time.time()
    n_list=range(10)
    pool=Pool(4)
    pool.map(run,n_list)
    pool.close()
    pool.join()
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    pool()
    
Result:
    Thread-1   0
    Thread-4   3
    Thread-2   1Thread-3   2

    Thread-2   7Thread-1   4Thread-3   6

    Thread-4   5

    Thread-3   8Thread-2   9

    time:3.029418706893921
# 第二種線程池(不用start()、join())concurrent.futures.thread.ThreadPoolExecutor
import time,threading
from concurrent.futures.thread import ThreadPoolExecutor

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def pool():
    begin_time=time.time()
    n_list=range(10)
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.map(run,n_list)
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    pool()
    
Result:
    ThreadPoolExecutor-0_0   0
    ThreadPoolExecutor-0_2   2
    ThreadPoolExecutor-0_3   3
    ThreadPoolExecutor-0_1   1
    ThreadPoolExecutor-0_0   4
    ThreadPoolExecutor-0_2   5ThreadPoolExecutor-0_3   6

    ThreadPoolExecutor-0_1   7
    ThreadPoolExecutor-0_0   8
    ThreadPoolExecutor-0_3   9
    time:3.0045437812805176

==數(shù)據(jù)太少,無法準確判斷線程池處理大量數(shù)據(jù)的明顯優(yōu)勢;在大量數(shù)據(jù)的處理時,線程池效率高,第二種線程池比第一種線程池效率高==

2.4 進程

2.4.1 進程的multiprocessing模塊

  1. 創(chuàng)建進程 Process multiprocessing.Process()
  2. 啟動進程 void Process.start()
  3. 掛起進程 void Process.join()
  4. 獲取進程的ID int os.getpid()

2.4.1 不面向?qū)ο髮崿F(xiàn)進程

import multiprocessing
import os
import time


def somthing(process_name):
    print("process_name:{}".format(process_name))
    time.sleep(2)
    print("process_id:{}".format(os.getpid()))
if __name__ == '__main__':
    process=multiprocessing.Process(target=somthing,args=("my_progress",))
    process.start()
    process.join()
    
Result:
    process_name:my_progress
    process_id:12188

2.4.2 面向?qū)ο髮崿F(xiàn)進程

import os
import time
from multiprocessing.context import Process

class MyProgress(Process):
    def __init__(self,my_progress_name,*args,**kwargs):
        self.my_progress_name=my_progress_name
        self.name="i will be covered"
        print("super__init__:{}".format(self.name))
        # slef.name會被父類的構(gòu)造方法所覆蓋
        super().__init__(*args,**kwargs)
        print("after_super__init__:{}".format(self.name))

    def run(self):
        print("----------run------------")
        print("my_process_name:{}".format(self.my_progress_name))
        print("process_name:{}".format(self.name))
        time.sleep(2)
        print("process_id:{}".format(os.getpid()))
        print("-----------run-----------")
if __name__ == '__main__':
    progress=MyProgress("myProgress")
    progress.start()
    progress.join()
    
Result:
    super__init__:i will be covered
    after_super__init__:MyProgress-1
    ----------run------------
    my_process_name:myProgress
    process_name:MyProgress-1
    process_id:10096
    -----------run-----------

2.4.3 進程之間的通信

  • 通過Queue、Pipes等實現(xiàn)進程之間的通信
import time
from multiprocessing import Process,Queue,current_process
class WriteProgress(Process):
    def __init__(self,queue,*args,**kwargs):
        self.queue=queue
        super().__init__(*args,**kwargs)
    def run(self):
        msg=["This is first msg",
             "This is second msg",
             "This is third msg",
             "This is fourth msg",
             "This is fifth msg"]
        for msg_item in msg:
            self.queue.put(msg_item)
            print("{}--send--msg:{}".format(current_process(),msg_item))
            time.sleep(2)

class ReadProgress(Process):
    def __init__(self,queue,*args,**kwargs):
        self.queue=queue
        super().__init__(*args,**kwargs)
    def run(self):
        while True:
            msg=self.queue.get()
            print("{}--get--msg:{}".format(current_process(),msg))
if __name__ == '__main__':
    queue=Queue()
    write=WriteProgress(queue)
    read=ReadProgress(queue)
    write.start()
    read.start()
    write.join()
    write.join()
    read.terminate()
    
Result:
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is first msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is first msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is second msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is second msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is third msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is third msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is fourth msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is fourth msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is fifth msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is fifth msg

2.4.4 多進程的鎖

  • Lock() RLock() Condition()
  • RLock()可以多次鎖定
  • Lock()只能鎖一次,否則造成死鎖
# 不使用鎖,會讓進程”隨意“執(zhí)行,一會這個進程執(zhí)行,一會另一個程執(zhí)行
from multiprocessing import Process,current_process
import time

class WriteProgress(Process):
    def __init__(self,my_name,*args,**kwargs):
        self.my_name=my_name
        super().__init__(*args,**kwargs)
    def run(self):
        for i in range(0,3):
            print("progress:{}--progress_number{}".format(self.my_name,i))
            time.sleep(2)
if __name__ == '__main__':
    write1=WriteProgress("Progress1")
    write2=WriteProgress("Progress2")
    write3=WriteProgress("Progress3")
    write1.start()
    write2.start()
    write3.start()
    write1.join()
    write2.join()
    write3.join()
    
Result:
    progress:Progress1--progress_number0
    progress:Progress2--progress_number0
    progress:Progress3--progress_number0
    progress:Progress1--progress_number1
    progress:Progress2--progress_number1
    progress:Progress3--progress_number1
    progress:Progress1--progress_number2
    progress:Progress2--progress_number2
    progress:Progress3--progress_number2
# 使用Lock(),會讓一個進程執(zhí)行好后其它進程再執(zhí)行
from multiprocessing import Process, Lock
import time

class WriteProgress(Process):
    def __init__(self,lock,my_name,*args,**kwargs):
        self.my_name=my_name
        self.lock=lock
        super().__init__(*args,**kwargs)
    def run(self):
        with self.lock:
            for i in range(0,3):
                print("progress:{}--progress_number:{}".format(self.my_name,i))
                time.sleep(2)
if __name__ == '__main__':
    lock=Lock()
    write1=WriteProgress(lock,"Progress1")
    write2=WriteProgress(lock,"Progress2")
    write3=WriteProgress(lock,"Progress3")
    write1.start()
    write2.start()
    write3.start()
    write1.join()
    write2.join()
    write3.join()
    
Result:
    progress:Progress1--progress_number:0
    progress:Progress1--progress_number:1
    progress:Progress1--progress_number:2
    progress:Progress2--progress_number:0
    progress:Progress2--progress_number:1
    progress:Progress2--progress_number:2
    progress:Progress3--progress_number:0
    progress:Progress3--progress_number:1
    progress:Progress3--progress_number:2

2.4.5 進程池

# 利用進程池,同步任務(wù)
# 同步任務(wù),每一個任務(wù)執(zhí)行完后才能拿到返回值,而且拿到的就是返回的值,而不是對象
from multiprocessing import Pool,current_process
import time


def run(i):
    time.sleep(1)
    return "Progress:{}--assignment_number:{}".format(current_process(), i)

if __name__ == '__main__':
    pool=Pool(2)
    for i in range(0,10):
        result=pool.apply(run,args=(i,))
        print(result)
    pool.close()
    pool.join()
    
Result:
        Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
# 利用進程池,異步任務(wù)
# 異步任務(wù),可以先那到返回值對象,對象拿到的快
# 對象拿完之后,再對任務(wù)進行執(zhí)行,將數(shù)據(jù)放入對象之中
from multiprocessing import Pool,current_process
import time
def run(i):
    # print("Progress:{}--assignment_number:{}".format(current_process(), i))
    time.sleep(1)
    return "Progress:{}--assignment_number:{}".format(current_process(), i)

if __name__ == '__main__':
    pool=Pool(2)
    list=[]
    for i in range(0,10):
        result=pool.apply_async(run,args=(i,))
        print(result) # 拿到返回值對象對象很快
        list.append(result)
    pool.close()
    pool.join()
    for i in range(0,10):
        print(list[i].get())
        
Result:
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FAC8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FBA8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FC50>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FCF8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FDA0>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FE48>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FFD0>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B90B8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B9160>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B9208>
    ## 幾乎是肉眼可見的一下子打印,但是在之前有較長一段時間
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9

2.5 協(xié)程

2.5.1 Python3.5之前協(xié)程的實現(xiàn)

  • yield 生成器來實現(xiàn)
def yield_test(): # 協(xié)程
    while True:
        n=(yield)
        print(n)
if __name__ == '__main__':
    result=yield_test()
    next(result)
    result.send("send=ok")
    result.send("hello")
    
Result:
    send=ok
    hello

2.5.2 Python3.5之后協(xié)程的實現(xiàn)

  1. 使用 asyncawait 關(guān)鍵字來實現(xiàn)
  2. async 關(guān)鍵字
  • 定義特殊的函數(shù)
  • 被調(diào)用時,不執(zhí)行里面的代碼,而是返回一個協(xié)程對象
  • 在時間循環(huán)中調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行任何操作
  1. await 關(guān)鍵字
  • 等待協(xié)程執(zhí)行完成
  • 當(dāng)遇到阻塞調(diào)用的函數(shù)的時候,使用 await 方法將協(xié)程的控制權(quán)讓出,以便loop調(diào)用其它的協(xié)程
  1. asyncio 模塊
    • get_event_loop() 獲取事件循環(huán)隊列
    • run_until_complete() 注冊任務(wù)到隊列
    • 在事件循環(huán)中調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行任何操作
    • asyncio 模塊用于事件循環(huán)
    • Bool asyncio.iscoroutinefunction(function) 判斷是否為協(xié)程函數(shù)
import asyncio
async def do():
    print("we are doing it......")
    await asyncio.sleep(2)
obj=do()
# 得到事件循環(huán)隊列
loop=asyncio.get_event_loop()
# 注冊任務(wù)
task=loop.create_task(obj)
print(task)
# 等待協(xié)程任務(wù)執(zhí)行完成
loop.run_until_complete(task)
print(task)

Result:
    <Task pending coro=<do() running at D:/study/python/MyTest/test.py:150>>
    we are doing it......
    ## wait 2 second
    <Task finished coro=<do() done, defined at D:/study/python/MyTest/test.py:150> result=None>

2.5.3 協(xié)程通信之嵌套調(diào)用

# 一個協(xié)程函數(shù)使用另一個協(xié)程函數(shù)的返回值
import asyncio
async def add(x,y):
    print("We are adding.......")
    await asyncio.sleep(1)
    print("We added completely")
    return x+y

async def get_add(x,y):
    result=await add(x,y)
    print("result={}".format(result))

loop=asyncio.get_event_loop()
loop.run_until_complete(get_add(10,50))
loop.close()

Result:
    We are adding.......
    We added completely
    result=60

2.5.4 協(xié)程通信之隊列

import asyncio
async def add(queue):
    for i in range(0,5):
        await asyncio.sleep(1)
        await queue.put(i) # put()得到一個協(xié)程對象
        print("add number-->{},size-->{}".format(i,queue.qsize()))
async def reduce(queue):
    for i in range(0,10):
        result=await queue.get()  # get()得到一個協(xié)程對象
        print("reduce number-->{},size:{}".format(result,queue.qsize()))
if __name__ == '__main__':
    queue=asyncio.Queue(maxsize=5)
    add1=add(queue)
    add2=add(queue)
    reduce=reduce(queue)
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(add1,add2,reduce))
    loop.close()
    
Result:
    add number-->0,size-->1
    add number-->0,size-->2
    reduce number-->0,size:1
    reduce number-->0,size:0
    add number-->1,size-->1
    add number-->1,size-->2
    reduce number-->1,size:1
    reduce number-->1,size:0
    add number-->2,size-->1
    add number-->2,size-->2
    reduce number-->2,size:1
    reduce number-->2,size:0
    add number-->3,size-->1
    add number-->3,size-->2
    reduce number-->3,size:1
    reduce number-->3,size:0
    add number-->4,size-->1
    add number-->4,size-->2
    reduce number-->4,size:1
    reduce number-->4,size:0  
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容