引用計數(shù)和分代回收
多線程和鎖和線程池
多進程和通信和鎖和進程池
協(xié)程和異步io async await
內(nèi)存管理
1. 賦值語句分析
值傳遞,傳的是對象的地址,數(shù)字字符元組為不可變類型,賦值后會指向新的地址
def p(word, res=[]):
print(res, id(res))
res.append(word)
print(res, id(res))
return res
res1 = p(3)
res2 = p('12', [])
res3 = p(10)
# result
[] 4447330952
[3] 4447330952
[] 4447252360
['12'] 4447252360
[3] 4447330952
[3, 10] 4447330952
2. 垃圾回收機制
- 以引用計數(shù)為主,分代收集為輔
- 如果一個對象的引用數(shù)為0,python就會回收這個對象的內(nèi)存
- 引用計數(shù)的缺陷是循環(huán)引用的問題,所以用分代回收機制來解決這個問題
3. 內(nèi)存管理機制
- 引用計數(shù)
sys.getrefcount() 獲得摸個對象的引用計數(shù),使用del關(guān)鍵字刪除某個引用 - 垃圾回收
python記錄分配對象和取消分配對象的次數(shù),當(dāng)兩者的差值高于某個閾值時,垃圾回收才啟動
通過gc.get_threshold()來查看 0,1,2代的閾值 - 分代回收
新建對象為0代對象,某一代經(jīng)歷垃圾回收,依然存在,歸入下一代對象
git.collect(generation=2)指定哪個代回收,默認是2代表所有都回收了
del p 刪除某個對象
objgraph模塊中的count()可以記錄當(dāng)前類產(chǎn)生的實例對象的個數(shù):objgraph.count('Cat') Cat類的對象個數(shù) - 官方指南
https://docs.python.org/3/library/gc.html#gc.collect
Set the garbage collection thresholds (the collection frequency).
Setting *threshold0* to zero disables collection.
The GC classifies objects into three generations depending on how many collection
sweeps they have survived. New objects are placed in the youngest generation
(generation `0`). If an object survives a collection it is moved into the next older
generation. Since generation `2` is the oldest generation, objects in that generation
remain there after a collection. In order to decide when to run, the collector keeps track of
the number object allocations and deallocations since the last collection. When the
number of allocations minus the number of deallocations exceeds *threshold0*, collection
starts. Initially only generation `0` is examined. If generation `0` has been examined more
than *threshold1* times since generation `1` has been examined, then generation `1` is
examined as well. Similarly, *threshold2* controls the number of collections of
generation `1` before collecting generation `2`.
- 內(nèi)存池機制,預(yù)先在內(nèi)存中申請一定數(shù)量的,大小想到的內(nèi)存塊作備用,有新的內(nèi)存需求先從內(nèi)存池中分配; Pyhton3內(nèi)存管理機制 Pymalloc 針對小對象(<512bytes) ,pymalloc會在內(nèi)存池中申請內(nèi)存空間,當(dāng)>512bytes, 則會PyMem_RawMalloc()和PyMem_RawRealloc()來申請新的空間
4. 源碼剖析
https://github.com/Junnplus/blog/projects/1
多線程
1. 進程,線程和協(xié)程

2. 進程和線程的關(guān)系
進程的內(nèi)存空間等等不同
線程共享進程上下文,包括開始,執(zhí)行順序,結(jié)束;可以被搶占(中斷)和臨時掛起(睡眠)- 讓步
并行(parallelism)一個時間有多個任務(wù),并發(fā)(concurrency)多個任務(wù)同時運行
3. 多核
GIL - 全局解釋器鎖
GIL強制在任何時候只有一個線程可以執(zhí)行python代碼
I/O密集型應(yīng)用與CPU密集型應(yīng)用
GIL執(zhí)行順序:
- 設(shè)置GIL
- 切換一個線程運行
- 執(zhí)行指定數(shù)量的字節(jié)碼指令
- 線程主動讓出控制權(quán)(time.sleep(0))
- 把線程設(shè)置為睡眠狀態(tài)(切換出線程)
- 解鎖GIL
- 重復(fù)上面的步驟
4. 實現(xiàn)一個線程
threading.Thread創(chuàng)建線程,start()啟動線程,join()掛起線程



兩種創(chuàng)建方法:載入運行的函數(shù),或者繼承Thread類
# 方法1
import threading
import time
def loop():
""" 新的線程執(zhí)行的代碼 """
n = 0
while n < 5:
print(n)
now_thread = threading.current_thread()
print('[loop]now thread name : {0}'.format(now_thread.name))
time.sleep(1)
n += 1
def use_thread():
""" 使用線程來實現(xiàn) """
# 當(dāng)前正在執(zhí)行的線程名稱
now_thread = threading.current_thread()
print('now thread name : {0}'.format(now_thread.name))
# 設(shè)置線程
t = threading.Thread(target=loop, name='loop_thread')
# 啟動線程
t.start()
# 掛起線程
t.join()
if __name__ == '__main__':
use_thread()
# 方法2
import threading
import time
class LoopThread(threading.Thread):
""" 自定義線程 """
n = 0
def run(self):
while self.n < 5:
print(self.n)
now_thread = threading.current_thread()
print('[loop]now thread name : {0}'.format(now_thread.name))
time.sleep(1)
self.n += 1
if __name__ == '__main__':
# 當(dāng)前正在執(zhí)行的線程名稱
now_thread = threading.current_thread()
print('now thread name : {0}'.format(now_thread.name))
t = LoopThread(name='loop_thread_oop')
t.start()
t.join()
5. 多線程并發(fā)問題和鎖
線程ThreadLocal
https://www.liaoxuefeng.com/wiki/1016959663602400/1017630786314240
Lock和RLock和Condition,RLock可以鎖多次(在同一個線程里),釋放的時候也要釋放多次
有兩種實現(xiàn)方式:try except finally; with
import threading
import time
# 獲得一把鎖
my_lock = threading.Lock()
your_lock = threading.RLock()
# 我的銀行賬戶
balance = 0
def change_it(n):
""" 改變我的余額 """
global balance
# 方式一,使用with
with your_lock:
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance: {1}'.format(n, balance))
# 方式二
# try:
# print('start lock')
# # 添加鎖
# your_lock.acquire()
# print('locked one ')
# # 資源已經(jīng)被鎖住了,不能重復(fù)鎖定, 產(chǎn)生死鎖
# your_lock.acquire()
# print('locked two')
# balance = balance + n
# time.sleep(2)
# balance = balance - n
# time.sleep(1)
# print('-N---> {0}; balance: {1}'.format(n, balance))
# finally:
# # 釋放掉鎖
# your_lock.release()
# your_lock.release()
class ChangeBalanceThread(threading.Thread):
"""
改變銀行余額的線程
"""
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100):
change_it(self.num)
if __name__ == '__main__':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))
6. 線程的調(diào)度和優(yōu)化,線程池
使用線程池
兩種方法:
Pool(10) map()/submit() close() join()
with ThreadPoolExecutor(max_workers=10) as executor: map()
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.dummy import Pool
def run(n):
""" 線程要做的事情 """
time.sleep(2)
print(threading.current_thread().name, n)
def main():
""" 使用傳統(tǒng)的方法來做任務(wù) """
t1 = time.time()
for n in range(100):
run(n)
print(time.time() - t1)
def main_use_thread():
""" 使用線程優(yōu)化任務(wù) """
# 資源有限,最多只能跑10個線程
t1 = time.time()
ls = []
for count in range(10):
for i in range(10):
t = threading.Thread(target=run, args=(i,))
ls.append(t)
t.start()
for l in ls:
l.join()
print(time.time() - t1)
def main_use_pool():
""" 使用線程池來優(yōu)化 """
t1 = time.time()
n_list = range(100)
pool = Pool(10)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t1)
def main_use_executor():
""" 使用 ThreadPoolExecutor 來優(yōu)化"""
t1 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t1)
if __name__ == '__main__':
# main()
# main_use_thread()
# main_use_pool()
main_use_executor()
7. 進程實現(xiàn)
multiprocessing模塊: multiprocessing.Process; start() join(); os.getpid()
兩種方式去實現(xiàn):傳入函數(shù)或者面向?qū)ο?/p>
import os
import time
from multiprocessing import Process
def do_sth(name):
"""
進程要做的事情
:param name: str 進程的名稱
"""
print('進程的名稱:{0}, pid: {1}'.format(name, os.getpid()))
time.sleep(150)
print('進程要做的事情')
class MyProcess(Process):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_name = name
def run(self):
print('MyProcess進程的名稱:{0}, pid: {1}'.format(
self.my_name, os.getpid()))
time.sleep(150)
print('MyProcess進程要做的事情')
if __name__ == '__main__':
# p = Process(target=do_sth, args=('my process', ))
p = MyProcess('my process class')
# 啟動進程
p.start()
# 掛起進程
p.join()
8. 多進程通信
Queue和Pipes
共享一個對象https://docs.python.org/3/library/multiprocessing.html#proxy-objects
Multiprocessing.Value()
from multiprocessing import Process, Queue, current_process
import random
import time
class WriteProcess(Process):
""" 寫的進程 """
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
""" 實現(xiàn)進程的業(yè)務(wù)邏輯 """
# 要寫的內(nèi)容
ls = [
"第一行內(nèi)容",
"第2行內(nèi)容",
"第3行內(nèi)容",
"第4行內(nèi)容",
]
for line in ls:
print('寫入內(nèi)容: {0} -{1}'.format(line, current_process().name))
self.q.put(line)
# 每寫入一次,休息1-5秒
time.sleep(random.randint(1, 5))
class ReadProcess(Process):
""" 讀取內(nèi)容進程 """
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
while True:
content = self.q.get()
print('讀取到的內(nèi)容:{0} - {1}'.format(content, current_process().name))
if __name__ == '__main__':
# 通過Queue共享數(shù)據(jù)
q = Queue()
# 寫入內(nèi)容的進程
t_write = WriteProcess(q)
t_write.start()
# 讀取進程啟動
t_read = ReadProcess(q)
t_read.start()
t_write.join()
# 因為讀的進程是死循環(huán),無法等待其結(jié)束,只能強制終止
t_read.terminate()
9. 多進程中的鎖
Lock(), Rlock(), Condition()
機制跟多線程鎖基本是一樣的,用try finally 或者with結(jié)構(gòu)
import random
from multiprocessing import Process, Lock, RLock
import time
class WriteProcess(Process):
""" 寫入文件 """
def __init__(self, file_name, num, lock, *args, **kwargs):
# 文件的名稱
self.file_name = file_name
self.num = num
# 鎖對象
self.lock = lock
super().__init__(*args, **kwargs)
def run(self):
""" 寫入文件的主要業(yè)務(wù)邏輯 """
with self.lock:
# try:
# # 添加鎖
# self.lock.acquire()
# print('locked')
# self.lock.acquire()
# print('relocked')
for i in range(5):
content = '現(xiàn)在是: {0} : {1} - {2} \n'.format(
self.name,
self.pid,
self.num
)
with open(self.file_name, 'a+', encoding='utf-8') as f:
f.write(content)
time.sleep(random.randint(1, 5))
print(content)
# finally:
# # 釋放鎖
# self.lock.release()
# self.lock.release()
if __name__ == '__main__':
file_name = 'test.txt'
# 所的對象
lock = RLock()
for x in range(5):
p = WriteProcess(file_name, x, lock)
p.start()
10. 進程池
有同步和異步的方法;Pool() apply() apply_async() map() close()
https://docs.python.org/3/library/multiprocessing.html?highlight=pool#module-multiprocessing.pool
import random
from multiprocessing import current_process, Pool
import time
def run(file_name, num):
"""
進程執(zhí)行的業(yè)務(wù)邏輯
往文件中寫入數(shù)據(jù)
:param file_name: str 文件名稱
:param num: int 寫入的數(shù)字
:return: str 寫入的結(jié)果
"""
with open(file_name, 'a+', encoding='utf-8') as f:
# 當(dāng)前的進程
now_process = current_process()
# 寫入的內(nèi)容
conent = '{0} - {1}- {2}'.format(
now_process.name,
now_process.pid,
num
)
f.write(conent)
f.write('\n')
# 寫完之后隨機休息1-5秒
time.sleep(random.randint(1, 5))
print(conent)
return 'ok'
if __name__ == '__main__':
file_name = 'test_pool.txt'
# 進程池
pool = Pool(2)
rest_list = []
for i in range(20):
# 同步添加任務(wù)
# rest = pool.apply(run, args=(file_name, i))
rest = pool.apply_async(run, args=(file_name, i))
rest_list.append(rest)
print('{0}--- {1}'.format(i, rest))
# 關(guān)閉池子
pool.close()
pool.join()
# 查看異步執(zhí)行的結(jié)果
print(rest_list[0].get())
11. 協(xié)程
協(xié)同多任務(wù),不需要鎖機制,用多進程+協(xié)程對多核CPU的利用
python 3..5之前使用 yield實現(xiàn)
https://docs.python.org/3/library/asyncio-task.html
def count_down(n):
""" 倒計時效果 """
while n > 0:
yield n
n -= 1
def yield_test():
""" 實現(xiàn)協(xié)程函數(shù) """
while True:
n = (yield )
print(n)
if __name__ == '__main__':
# rest = count_down(5)
# print(next(rest))
# print(next(rest))
# print(next(rest))
# print(next(rest))
# print(next(rest))
rest = yield_test()
next(rest)
rest.send('6666')
rest.send('6666')
之后用async和await來實現(xiàn)
async函數(shù)被調(diào)用時,返回一個協(xié)程對象,不執(zhí)行這個函數(shù)
在事件循環(huán)調(diào)度其執(zhí)行前,協(xié)程對象不執(zhí)行
await等待協(xié)程執(zhí)行完成,當(dāng)遇到阻塞調(diào)用的函數(shù)時,await方法將協(xié)程的控制權(quán)讓出,繼續(xù)執(zhí)行其他協(xié)程
asyncio模塊:get_event_loop() run_until_complete() iscoroutinefunction(do_sth)
import asyncio
async def do_sth(x):
""" 定義協(xié)程函數(shù) """
print('等待中: {0}'.format(x))
await asyncio.sleep(x)
# 判斷是否為協(xié)程函數(shù)
print(asyncio.iscoroutinefunction(do_sth))
coroutine = do_sth(5)
# 事件的循環(huán)隊列
loop = asyncio.get_event_loop()
# 注冊任務(wù)
task = loop.create_task(coroutine)
print(task)
# 等待協(xié)程任務(wù)執(zhí)行結(jié)束
loop.run_until_complete(task)
print(task)
12. 協(xié)程通信之嵌套調(diào)用和隊列
- 嵌套調(diào)用
import asyncio
async def compute(x, y):
print('計算x +y => {0}+{1}'.format(x, y))
await asyncio.sleep(3)
return x + y
async def get_sum(x, y):
rest = await compute(x, y)
print('{0} + {1} = {2}'.format(x, y, rest))
# 拿到事件循環(huán)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_sum(1, 2))
loop.close()
- 隊列
# 1. 定義一個隊列
# 2. 讓兩個協(xié)程來進行通信
# 3. 讓其中一個協(xié)程往隊列中寫入數(shù)據(jù)
# 4. 讓另一個協(xié)程從隊列中刪除數(shù)據(jù)
import asyncio
import random
async def add(store, name):
"""
寫入數(shù)據(jù)到隊列
:param store: 隊列的對象
:return:
"""
for i in range(5):
# 往隊列中添加數(shù)字
num = '{0} - {1}'.format(name, i)
await asyncio.sleep(random.randint(1, 5))
await store.put(i)
print('{2} add one ... {0}, size: {1}'.format(
num, store.qsize(), name))
async def reduce(store):
"""
從隊列中刪除數(shù)據(jù)
:param store:
:return:
"""
for i in range(10):
rest = await store.get()
print(' reduce one.. {0}, size: {1}'.format(rest, store.qsize()))
if __name__ == '__main__':
# 準(zhǔn)備一個隊列
store = asyncio.Queue(maxsize=5)
a1 = add(store, 'a1')
a2 = add(store, 'a2')
r1 = reduce(store)
# 添加到事件隊列
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(a1, a2, r1))
loop.close()
使用async和await來改寫協(xié)程
import random
from queue import Queue
import asyncio
class Bread(object):
""" 饅頭類 """
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
async def consumer(name, basket, lock):
"""
消費者
:param name 協(xié)程的名稱
:param basket 籃子,用于存放饅頭
:return:
"""
while True:
with await lock:
# 如果沒有饅頭了,則自己休息,喚醒生產(chǎn)者進行生產(chǎn)
if basket.empty():
print('{0}@@@消費完了@@@'.format(name))
# 喚醒他人
lock.notify_all()
# 休息
await lock.wait()
else:
# 取出饅頭
bread = basket.get()
print('>>{0} 消費饅頭 {1}, size: {2}'.format(
name, bread, basket.qsize()
))
await asyncio.sleep(random.randint(1, 5))
async def producer(name, basket, lock):
"""
生產(chǎn)者
:param name 協(xié)程的名稱
:param basket 籃子,用于存放饅頭
:return:
"""
print('{0} 開始生產(chǎn)'.format(name))
while True:
with await lock:
# 饅頭生產(chǎn)滿了,休息生產(chǎn)者,喚醒消費者進行消費
if basket.full():
print('{0} 生產(chǎn)滿了'.format(name))
# 喚醒他人
lock.notify_all()
# 自己休息
await lock.wait()
else:
# 饅頭的名字
bread_name = '{0}_{1}'.format(name, basket.counter)
bread = Bread(bread_name)
# 將饅頭放入籃子
basket.put(bread)
print('>>{0} 生產(chǎn)饅頭 {1}, size: {2}'.format(name, bread_name, basket.qsize()))
# 計數(shù)+ 1
basket.counter += 1
await asyncio.sleep(random.randint(1, 2))
class Basket(Queue):
""" 自定義的倉庫 """
# 饅頭生產(chǎn)的計數(shù)器
counter = 0
def main():
lock = asyncio.Condition()
# 籃子,用于放饅頭,協(xié)程通信使用
basket = Basket(maxsize=5)
p1 = producer('P1', basket, lock)
p2 = producer('P2', basket, lock)
p3 = producer('P3', basket, lock)
c1 = consumer('C1', basket, lock)
c2 = consumer('C2', basket, lock)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(p1, p2, p3, c1, c2))
loop.close()
if __name__ == '__main__':
main()
官方文檔
https://docs.python.org/3/library/asyncio-task.html#coroutine