Python基礎(chǔ)語法 - 5 內(nèi)存管理和多線程

引用計數(shù)和分代回收
多線程和鎖和線程池
多進程和通信和鎖和進程池
協(xié)程和異步io async await

內(nèi)存管理

1. 賦值語句分析

值傳遞,傳的是對象的地址,數(shù)字字符元組為不可變類型,賦值后會指向新的地址

def p(word, res=[]):
  print(res, id(res))
  res.append(word)
  print(res, id(res))
  return res
res1 = p(3)
res2 = p('12', [])
res3 = p(10)
# result
[] 4447330952
[3] 4447330952
[] 4447252360
['12'] 4447252360
[3] 4447330952
[3, 10] 4447330952

2. 垃圾回收機制

  • 以引用計數(shù)為主,分代收集為輔
  • 如果一個對象的引用數(shù)為0,python就會回收這個對象的內(nèi)存
  • 引用計數(shù)的缺陷是循環(huán)引用的問題,所以用分代回收機制來解決這個問題

3. 內(nèi)存管理機制

  • 引用計數(shù)
    sys.getrefcount() 獲得摸個對象的引用計數(shù),使用del關(guān)鍵字刪除某個引用
  • 垃圾回收
    python記錄分配對象和取消分配對象的次數(shù),當(dāng)兩者的差值高于某個閾值時,垃圾回收才啟動
    通過gc.get_threshold()來查看 0,1,2代的閾值
  • 分代回收
    新建對象為0代對象,某一代經(jīng)歷垃圾回收,依然存在,歸入下一代對象
    git.collect(generation=2)指定哪個代回收,默認是2代表所有都回收了
    del p 刪除某個對象
    objgraph模塊中的count()可以記錄當(dāng)前類產(chǎn)生的實例對象的個數(shù):objgraph.count('Cat') Cat類的對象個數(shù)
  • 官方指南
    https://docs.python.org/3/library/gc.html#gc.collect
Set the garbage collection thresholds (the collection frequency). 
Setting *threshold0* to zero disables collection.

The GC classifies objects into three generations depending on how many collection
sweeps they have survived. New objects are placed in the youngest generation
(generation `0`). If an object survives a collection it is moved into the next older
generation. Since generation `2` is the oldest generation, objects in that generation
remain there after a collection. In order to decide when to run, the collector keeps track of
the number object allocations and deallocations since the last collection. When the
number of allocations minus the number of deallocations exceeds *threshold0*, collection
starts. Initially only generation `0` is examined. If generation `0` has been examined more
than *threshold1* times since generation `1` has been examined, then generation `1` is
examined as well. Similarly, *threshold2* controls the number of collections of
generation `1` before collecting generation `2`.
  • 內(nèi)存池機制,預(yù)先在內(nèi)存中申請一定數(shù)量的,大小想到的內(nèi)存塊作備用,有新的內(nèi)存需求先從內(nèi)存池中分配; Pyhton3內(nèi)存管理機制 Pymalloc 針對小對象(<512bytes) ,pymalloc會在內(nèi)存池中申請內(nèi)存空間,當(dāng)>512bytes, 則會PyMem_RawMalloc()和PyMem_RawRealloc()來申請新的空間

4. 源碼剖析

https://github.com/Junnplus/blog/projects/1

多線程

1. 進程,線程和協(xié)程

2. 進程和線程的關(guān)系

進程的內(nèi)存空間等等不同
線程共享進程上下文,包括開始,執(zhí)行順序,結(jié)束;可以被搶占(中斷)和臨時掛起(睡眠)- 讓步
并行(parallelism)一個時間有多個任務(wù),并發(fā)(concurrency)多個任務(wù)同時運行

3. 多核

GIL - 全局解釋器鎖
GIL強制在任何時候只有一個線程可以執(zhí)行python代碼
I/O密集型應(yīng)用與CPU密集型應(yīng)用
GIL執(zhí)行順序:

  • 設(shè)置GIL
  • 切換一個線程運行
    • 執(zhí)行指定數(shù)量的字節(jié)碼指令
    • 線程主動讓出控制權(quán)(time.sleep(0))
  • 把線程設(shè)置為睡眠狀態(tài)(切換出線程)
  • 解鎖GIL
  • 重復(fù)上面的步驟

4. 實現(xiàn)一個線程

threading.Thread創(chuàng)建線程,start()啟動線程,join()掛起線程





兩種創(chuàng)建方法:載入運行的函數(shù),或者繼承Thread類

# 方法1
import threading
import time


def loop():
    """ 新的線程執(zhí)行的代碼 """
    n = 0
    while n < 5:
        print(n)
        now_thread = threading.current_thread()
        print('[loop]now  thread name : {0}'.format(now_thread.name))
        time.sleep(1)
        n += 1


def use_thread():
    """ 使用線程來實現(xiàn) """
    # 當(dāng)前正在執(zhí)行的線程名稱
    now_thread = threading.current_thread()
    print('now  thread name : {0}'.format(now_thread.name))
    # 設(shè)置線程
    t = threading.Thread(target=loop, name='loop_thread')
    # 啟動線程
    t.start()
    # 掛起線程
    t.join()


if __name__ == '__main__':
    use_thread()

# 方法2
import threading
import time


class LoopThread(threading.Thread):
    """ 自定義線程 """

    n = 0

    def run(self):
        while self.n < 5:
            print(self.n)
            now_thread = threading.current_thread()
            print('[loop]now  thread name : {0}'.format(now_thread.name))
            time.sleep(1)
            self.n += 1


if __name__ == '__main__':
    # 當(dāng)前正在執(zhí)行的線程名稱
    now_thread = threading.current_thread()
    print('now  thread name : {0}'.format(now_thread.name))
    t = LoopThread(name='loop_thread_oop')
    t.start()
    t.join()

5. 多線程并發(fā)問題和鎖

線程ThreadLocal
https://www.liaoxuefeng.com/wiki/1016959663602400/1017630786314240
Lock和RLock和Condition,RLock可以鎖多次(在同一個線程里),釋放的時候也要釋放多次
有兩種實現(xiàn)方式:try except finally; with

import threading
import time


# 獲得一把鎖
my_lock = threading.Lock()
your_lock = threading.RLock()

# 我的銀行賬戶
balance = 0


def change_it(n):
    """ 改變我的余額 """
    global balance

    # 方式一,使用with
    with your_lock:
        balance = balance + n
        time.sleep(2)
        balance = balance - n
        time.sleep(1)
        print('-N---> {0}; balance: {1}'.format(n, balance))

    # 方式二
    # try:
    #     print('start lock')
    #     # 添加鎖
    #     your_lock.acquire()
    #     print('locked one ')
    #     # 資源已經(jīng)被鎖住了,不能重復(fù)鎖定, 產(chǎn)生死鎖
    #     your_lock.acquire()
    #     print('locked two')
    #     balance = balance + n
    #     time.sleep(2)
    #     balance = balance - n
    #     time.sleep(1)
    #     print('-N---> {0}; balance: {1}'.format(n, balance))
    # finally:
    #     # 釋放掉鎖
    #     your_lock.release()
    #     your_lock.release()


class ChangeBalanceThread(threading.Thread):
    """
    改變銀行余額的線程
    """

    def __init__(self, num, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.num = num

    def run(self):
        for i in range(100):
            change_it(self.num)


if __name__ == '__main__':
    t1 = ChangeBalanceThread(5)
    t2 = ChangeBalanceThread(8)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('the last: {0}'.format(balance))

6. 線程的調(diào)度和優(yōu)化,線程池

使用線程池
兩種方法:
Pool(10) map()/submit() close() join()
with ThreadPoolExecutor(max_workers=10) as executor: map()

import time
import threading
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.dummy import Pool


def run(n):
    """ 線程要做的事情 """
    time.sleep(2)
    print(threading.current_thread().name, n)


def main():
    """ 使用傳統(tǒng)的方法來做任務(wù) """
    t1 = time.time()
    for n in range(100):
        run(n)
    print(time.time() - t1)


def main_use_thread():
    """ 使用線程優(yōu)化任務(wù) """
    # 資源有限,最多只能跑10個線程
    t1 = time.time()
    ls = []
    for count in range(10):
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            ls.append(t)
            t.start()

        for l in ls:
            l.join()
    print(time.time() - t1)


def main_use_pool():
    """ 使用線程池來優(yōu)化 """
    t1 = time.time()
    n_list = range(100)
    pool = Pool(10)
    pool.map(run, n_list)
    pool.close()
    pool.join()
    print(time.time() - t1)


def main_use_executor():
    """ 使用 ThreadPoolExecutor 來優(yōu)化"""
    t1 = time.time()
    n_list = range(100)
    with ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(run, n_list)
    print(time.time() - t1)


if __name__ == '__main__':
    # main()
    # main_use_thread()
    # main_use_pool()
    main_use_executor()

7. 進程實現(xiàn)

multiprocessing模塊: multiprocessing.Process; start() join(); os.getpid()
兩種方式去實現(xiàn):傳入函數(shù)或者面向?qū)ο?/p>

import os
import time
from multiprocessing import Process


def do_sth(name):
    """
    進程要做的事情
    :param name: str 進程的名稱
    """
    print('進程的名稱:{0}, pid: {1}'.format(name, os.getpid()))
    time.sleep(150)
    print('進程要做的事情')


class MyProcess(Process):

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_name = name

    def run(self):
        print('MyProcess進程的名稱:{0}, pid: {1}'.format(
            self.my_name, os.getpid()))
        time.sleep(150)
        print('MyProcess進程要做的事情')


if __name__ == '__main__':
    # p = Process(target=do_sth, args=('my process', ))
    p = MyProcess('my process class')
    # 啟動進程
    p.start()
    # 掛起進程
    p.join()

8. 多進程通信

Queue和Pipes
共享一個對象https://docs.python.org/3/library/multiprocessing.html#proxy-objects

Multiprocessing.Value()

from multiprocessing import Process, Queue, current_process

import random
import time


class WriteProcess(Process):
    """ 寫的進程 """
    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        """ 實現(xiàn)進程的業(yè)務(wù)邏輯 """
        # 要寫的內(nèi)容
        ls = [
            "第一行內(nèi)容",
            "第2行內(nèi)容",
            "第3行內(nèi)容",
            "第4行內(nèi)容",
        ]
        for line in ls:
            print('寫入內(nèi)容: {0} -{1}'.format(line, current_process().name))
            self.q.put(line)
            # 每寫入一次,休息1-5秒
            time.sleep(random.randint(1, 5))


class ReadProcess(Process):
    """ 讀取內(nèi)容進程 """
    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        while True:
            content = self.q.get()
            print('讀取到的內(nèi)容:{0} - {1}'.format(content, current_process().name))


if __name__ == '__main__':
    # 通過Queue共享數(shù)據(jù)
    q = Queue()
    # 寫入內(nèi)容的進程
    t_write = WriteProcess(q)
    t_write.start()
    # 讀取進程啟動
    t_read = ReadProcess(q)
    t_read.start()
    t_write.join()

    # 因為讀的進程是死循環(huán),無法等待其結(jié)束,只能強制終止
    t_read.terminate()

9. 多進程中的鎖

Lock(), Rlock(), Condition()
機制跟多線程鎖基本是一樣的,用try finally 或者with結(jié)構(gòu)

import random
from multiprocessing import Process, Lock, RLock

import time


class WriteProcess(Process):
    """ 寫入文件 """

    def __init__(self, file_name, num, lock, *args, **kwargs):
        # 文件的名稱
        self.file_name = file_name
        self.num = num
        # 鎖對象
        self.lock = lock
        super().__init__(*args, **kwargs)

    def run(self):
        """ 寫入文件的主要業(yè)務(wù)邏輯 """
        with self.lock:
        # try:
        #     # 添加鎖
        #     self.lock.acquire()
        #     print('locked')
        #     self.lock.acquire()
        #     print('relocked')
            for i in range(5):
                content = '現(xiàn)在是: {0} : {1} - {2} \n'.format(
                    self.name,
                    self.pid,
                    self.num
                )
                with open(self.file_name, 'a+', encoding='utf-8') as f:
                    f.write(content)
                    time.sleep(random.randint(1, 5))
                    print(content)
        # finally:
        #     # 釋放鎖
        #     self.lock.release()
        #     self.lock.release()


if __name__ == '__main__':
    file_name = 'test.txt'
    # 所的對象
    lock = RLock()
    for x in range(5):
        p = WriteProcess(file_name, x, lock)
        p.start()

10. 進程池

有同步和異步的方法;Pool() apply() apply_async() map() close()
https://docs.python.org/3/library/multiprocessing.html?highlight=pool#module-multiprocessing.pool

import random
from multiprocessing import current_process, Pool

import time


def run(file_name, num):
    """
    進程執(zhí)行的業(yè)務(wù)邏輯
    往文件中寫入數(shù)據(jù)
    :param file_name: str 文件名稱
    :param num: int 寫入的數(shù)字
    :return: str 寫入的結(jié)果
    """
    with open(file_name, 'a+', encoding='utf-8') as f:
        # 當(dāng)前的進程
        now_process = current_process()
        # 寫入的內(nèi)容
        conent = '{0} - {1}- {2}'.format(
            now_process.name,
            now_process.pid,
            num
        )
        f.write(conent)
        f.write('\n')
        # 寫完之后隨機休息1-5秒
        time.sleep(random.randint(1, 5))
        print(conent)
    return 'ok'


if __name__ == '__main__':
    file_name = 'test_pool.txt'
    # 進程池
    pool = Pool(2)
    rest_list = []
    for i in range(20):
        # 同步添加任務(wù)
        # rest = pool.apply(run, args=(file_name, i))
        rest = pool.apply_async(run, args=(file_name, i))
        rest_list.append(rest)
        print('{0}--- {1}'.format(i, rest))
    # 關(guān)閉池子
    pool.close()
    pool.join()
    # 查看異步執(zhí)行的結(jié)果
    print(rest_list[0].get())

11. 協(xié)程

協(xié)同多任務(wù),不需要鎖機制,用多進程+協(xié)程對多核CPU的利用
python 3..5之前使用 yield實現(xiàn)
https://docs.python.org/3/library/asyncio-task.html



def count_down(n):
    """ 倒計時效果 """
    while n > 0:
        yield n
        n -= 1


def yield_test():
    """ 實現(xiàn)協(xié)程函數(shù) """
    while True:
        n = (yield )
        print(n)


if __name__ == '__main__':
    # rest = count_down(5)
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    rest = yield_test()
    next(rest)
    rest.send('6666')
    rest.send('6666')

之后用async和await來實現(xiàn)
async函數(shù)被調(diào)用時,返回一個協(xié)程對象,不執(zhí)行這個函數(shù)
在事件循環(huán)調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行
await等待協(xié)程執(zhí)行完成,當(dāng)遇到阻塞調(diào)用的函數(shù)時,await方法將協(xié)程的控制權(quán)讓出,繼續(xù)執(zhí)行其他協(xié)程
asyncio模塊:get_event_loop() run_until_complete() iscoroutinefunction(do_sth)

import asyncio


async def do_sth(x):
    """ 定義協(xié)程函數(shù) """
    print('等待中: {0}'.format(x))
    await asyncio.sleep(x)

# 判斷是否為協(xié)程函數(shù)
print(asyncio.iscoroutinefunction(do_sth))

coroutine = do_sth(5)
# 事件的循環(huán)隊列
loop = asyncio.get_event_loop()
# 注冊任務(wù)
task = loop.create_task(coroutine)
print(task)
# 等待協(xié)程任務(wù)執(zhí)行結(jié)束
loop.run_until_complete(task)
print(task)

12. 協(xié)程通信之嵌套調(diào)用和隊列

  • 嵌套調(diào)用
import asyncio


async def compute(x, y):
    print('計算x +y => {0}+{1}'.format(x, y))
    await asyncio.sleep(3)
    return x + y


async def get_sum(x, y):
    rest = await compute(x, y)
    print('{0} + {1} = {2}'.format(x, y, rest))

# 拿到事件循環(huán)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_sum(1, 2))
loop.close()
  • 隊列
# 1. 定義一個隊列
# 2. 讓兩個協(xié)程來進行通信
# 3. 讓其中一個協(xié)程往隊列中寫入數(shù)據(jù)
# 4. 讓另一個協(xié)程從隊列中刪除數(shù)據(jù)

import asyncio
import random


async def add(store, name):
    """
    寫入數(shù)據(jù)到隊列
    :param store: 隊列的對象
    :return:
    """
    for i in range(5):
        # 往隊列中添加數(shù)字
        num = '{0} - {1}'.format(name, i)
        await asyncio.sleep(random.randint(1, 5))
        await store.put(i)
        print('{2} add one ... {0}, size: {1}'.format(
            num, store.qsize(), name))


async def reduce(store):
    """
    從隊列中刪除數(shù)據(jù)
    :param store:
    :return:
    """
    for i in range(10):
        rest = await store.get()
        print(' reduce one.. {0}, size: {1}'.format(rest, store.qsize()))


if __name__ == '__main__':
    # 準(zhǔn)備一個隊列
    store = asyncio.Queue(maxsize=5)
    a1 = add(store, 'a1')
    a2 = add(store, 'a2')
    r1 = reduce(store)

    # 添加到事件隊列
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(a1, a2, r1))
    loop.close()

使用async和await來改寫協(xié)程

import random
from queue import Queue

import asyncio


class Bread(object):
    """ 饅頭類 """

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name


async def consumer(name, basket, lock):
    """
    消費者
    :param name  協(xié)程的名稱
    :param basket  籃子,用于存放饅頭
    :return:
    """
    while True:
        with await lock:
            # 如果沒有饅頭了,則自己休息,喚醒生產(chǎn)者進行生產(chǎn)
            if basket.empty():
                print('{0}@@@消費完了@@@'.format(name))
                # 喚醒他人
                lock.notify_all()
                # 休息
                await lock.wait()
            else:
                # 取出饅頭
                bread = basket.get()
                print('>>{0} 消費饅頭 {1}, size: {2}'.format(
                    name, bread, basket.qsize()
                ))
                await asyncio.sleep(random.randint(1, 5))


async def producer(name, basket, lock):
    """
    生產(chǎn)者
    :param name  協(xié)程的名稱
    :param basket  籃子,用于存放饅頭
    :return:
    """
    print('{0} 開始生產(chǎn)'.format(name))
    while True:
        with await lock:
            # 饅頭生產(chǎn)滿了,休息生產(chǎn)者,喚醒消費者進行消費
            if basket.full():
                print('{0} 生產(chǎn)滿了'.format(name))
                # 喚醒他人
                lock.notify_all()
                # 自己休息
                await lock.wait()
            else:
                # 饅頭的名字
                bread_name = '{0}_{1}'.format(name, basket.counter)
                bread = Bread(bread_name)
                # 將饅頭放入籃子
                basket.put(bread)
                print('>>{0} 生產(chǎn)饅頭 {1}, size: {2}'.format(name, bread_name, basket.qsize()))
                # 計數(shù)+ 1
                basket.counter += 1
                await asyncio.sleep(random.randint(1, 2))


class Basket(Queue):
    """ 自定義的倉庫 """
    # 饅頭生產(chǎn)的計數(shù)器
    counter = 0


def main():
    lock = asyncio.Condition()
    # 籃子,用于放饅頭,協(xié)程通信使用
    basket = Basket(maxsize=5)
    p1 = producer('P1', basket, lock)
    p2 = producer('P2', basket, lock)
    p3 = producer('P3', basket, lock)
    c1 = consumer('C1', basket, lock)
    c2 = consumer('C2', basket, lock)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(p1, p2, p3, c1, c2))
    loop.close()


if __name__ == '__main__':
    main()

官方文檔
https://docs.python.org/3/library/asyncio-task.html#coroutine

https://www.imooc.com/article/266370

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進行...
    月亮是我踢彎得閱讀 6,170評論 3 28
  • 1. 內(nèi)存管理機制 1.1 介紹 概要賦值語句內(nèi)存分析垃圾回收機制內(nèi)存管理機制 目標(biāo)掌握賦值語句內(nèi)存分析方法掌握i...
    nimw閱讀 892評論 0 2
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進程,并將多個進程對硬件...
    drfung閱讀 3,767評論 0 5
  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進程 之前我們已經(jīng)了解了操作系統(tǒng)中進程的概念,程序并不能單獨運行,只有...
    go以恒閱讀 1,797評論 0 6
  • Mac OS X,UNIX,Linux,Windows等,都是多任務(wù)操作系統(tǒng)即操作系統(tǒng)可以同時運行多個任務(wù)。對于操...
    楓頔閱讀 588評論 0 1

友情鏈接更多精彩內(nèi)容