多進(jìn)程之間通信的限制
- 進(jìn)程之間是獨(dú)立的,互不干擾的內(nèi)存空間。
我們先看個(gè)例子
a = 1 #定義全局變量
def func():
global a
a=2 #修改全局變量值
print(a)
func()
print(a)
運(yùn)行結(jié)果:

image.png
再看利用進(jìn)程運(yùn)行的例子:
import multiprocessing
a = 1 #定義全局變量
def func():
global a
a=2 #修改全局變量值
print(a)
process = multiprocessing.Process(target=func)
process.start()
process.join() #等待子進(jìn)程執(zhí)行完再繼續(xù)執(zhí)行
print(a)

image.png
通過上面2個(gè)例子運(yùn)行結(jié)果分析:
按通常應(yīng)該都是2,應(yīng)該修改了全局變量值,但是這里只有子進(jìn)程是2,主進(jìn)程是1。
這是因?yàn)檫M(jìn)程之間是獨(dú)立的,互不干擾的內(nèi)存空間,故子進(jìn)程修改的,不影響主進(jìn)程的。
進(jìn)程間通信的解決方案

image.png
print('--------------進(jìn)程間通信的解決方案--------------')
manager = multiprocessing.Manager() #創(chuàng)建一個(gè)服務(wù)器進(jìn)程,并返回與其通信的管理器
list_proxy = manager.list() #通過管理器在服務(wù)器進(jìn)程中開辟一個(gè)列表空間,并返回一個(gè)代理
print(list_proxy) #用法和list一樣
def func2(list):
list.append('a')
print(list)
#把代理傳給子進(jìn)程,子進(jìn)程里就可以通過這個(gè)代理,來操作共享空間來進(jìn)行通信
process2 = multiprocessing.Process(target=func2, args=(list_proxy,))
process2.start()
process2.join() #等待子進(jìn)程執(zhí)行完再繼續(xù)執(zhí)行
print(list_proxy)
運(yùn)行結(jié)果:
image.png
- 一般常用的空間類型是:
mgr.list()、mgr.dict()、mgr.Queue()
多線程之間通信的限制
注意:因?yàn)榫€程屬于同一個(gè)進(jìn)程,因此它們之間共享內(nèi)存區(qū)域,因此全局變量是公共的。
import threading
a = 1
def func3():
global a
a = 2
print(a)
thread = threading.Thread(target=func3)
thread.start()
thread.join()
print(a)
運(yùn)行結(jié)果:
image.png
但是多線程間共享內(nèi)存間存在競(jìng)爭(zhēng)問題。
print('--------------多線程共享內(nèi)存間存在競(jìng)爭(zhēng)問題--------------')
import threading
data = 0
n = 100000
def add(n):
global data
for i in range(n):
data +=i
def sub(n):
global data
for i in range(n):
data -=i
t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join() #這2個(gè)地方加join阻塞目的是為了讓子進(jìn)程執(zhí)行完,最后能在主進(jìn)程看到data,所以用join來阻塞
print(data)

image.png
加了n次減了n次,結(jié)果卻為負(fù)數(shù),按正常應(yīng)該為0。
使用鎖來控制共享資源的訪問。
print('--------------使用鎖來控制共享資源的訪問--------------')
import threading
data = 0
n = 1000000
lock = threading.Lock() #生成一把鎖
def add(n):
global data
for i in range(n):
# lock.acquire() #加鎖
# data +=i
# lock.release() #釋放鎖
#可以寫生上下文格式
with lock:
data +=i
def sub(n):
global data
for i in range(n):
# lock.acquire() #加鎖
# data -=i
# lock.release() #釋放鎖
with lock:
data -=i
t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join() #這2個(gè)地方加join阻塞目的是為了讓子進(jìn)程執(zhí)行完,最后能在主進(jìn)程看到data,所以用join來阻塞
print(data)
運(yùn)行結(jié)果:
image.png
這樣才達(dá)到目的,就像去銀行存錢取錢,存取不多不少!
線程與進(jìn)程的安全隊(duì)列
隊(duì)列:先進(jìn)先出,一個(gè)入口,一個(gè)出口。
image.png
- 線程安全隊(duì)列操作
queue.Queue:
入隊(duì): put(item)
出隊(duì): get()
測(cè)試空: empty() # 近似
測(cè)試滿: full() # 近似
隊(duì)列長(zhǎng)度: qsize() # 近似
任務(wù)結(jié)束: task_done()
等待完成: join() - 進(jìn)程安全隊(duì)列操作
mgr.Queue:
入隊(duì): put(item)
出隊(duì): get()
測(cè)試空: empty() # 近似
測(cè)試滿: full() # 近似
隊(duì)列長(zhǎng)度: qsize() # 近似
進(jìn)程比線程少了task_done()和 join()方法。
生產(chǎn)者和消費(fèi)者模型
所謂,生產(chǎn)者與消費(fèi)者模型,本質(zhì)上是把進(jìn)程通信的問題分開考慮生產(chǎn)只需要往隊(duì)列里面丟東西(生產(chǎn)者不需要關(guān)心消費(fèi)者)消費(fèi)者,只需要從隊(duì)列里面拿東西(消費(fèi)者也不需要關(guān)心生產(chǎn)者)。

image.png

image.png
線程實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型
print('--------------生產(chǎn)者與消費(fèi)者模型--------------')
'''
所謂,生產(chǎn)者與消費(fèi)者模型,本質(zhì)上是把進(jìn)程通信的問題分開考慮
生產(chǎn)者,只需要往隊(duì)列里面丟東西(生產(chǎn)者不需要關(guān)心消費(fèi)者)
消費(fèi)者,只需要從隊(duì)列里面拿東西(消費(fèi)者也不需要關(guān)心生產(chǎn)者)
'''
print('--------------多線程的消費(fèi)者與生產(chǎn)者模式--------------')
'''
生產(chǎn)者:沒滿,則生產(chǎn),只關(guān)心隊(duì)列是否已滿。滿了就阻塞。
消費(fèi)者:只關(guān)心隊(duì)列是否為空。不為空,則消費(fèi),為空則阻塞。
'''
import threading
import queue
import random
import time
class Producer(threading.Thread): #生產(chǎn)者
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
item = random.randint(0, 10) #創(chuàng)建0~99
#只要隊(duì)列沒滿,就向隊(duì)列中添加數(shù)據(jù)
self.queue.put(item)
print('生產(chǎn)者-->生產(chǎn):%s'%item)
time.sleep(1)
class Customer(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
#只要隊(duì)列不為空,就從隊(duì)列中取數(shù)據(jù)
itme = self.queue.get()
print('消費(fèi)者-->消費(fèi):%s'%itme)
time.sleep(1)
q =queue.Queue(5) #長(zhǎng)度為5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
運(yùn)行結(jié)果:
image.png
進(jìn)程實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型
import multiprocessing
import queue
import random
import time
class Producer(multiprocessing.Process): #生產(chǎn)者
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
item = random.randint(0, 10) #創(chuàng)建0~99
#只要隊(duì)列沒滿,就向隊(duì)列中添加數(shù)據(jù)
self.queue.put(item)
print('生產(chǎn)者-->生產(chǎn):%s'%item)
time.sleep(1)
class Customer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
#只要隊(duì)列不為空,就從隊(duì)列中取數(shù)據(jù)
itme = self.queue.get()
print('消費(fèi)者-->消費(fèi):%s'%itme)
time.sleep(1)
manager = multiprocessing.Manager() #創(chuàng)建一個(gè)服務(wù)器進(jìn)程,并返回與其通信的管理器
q =manager.Queue(5) #長(zhǎng)度為5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
運(yùn)行結(jié)果:
image.png