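Summary: Python, multiprocessing
Scenarios and methods for synchronizing variables across processes
Scenario: When running Python work in parallel across multiple processes, you often need to share variables between the processes. Shared variables make it easier to monitor and control task execution, for example to check progress or to stop all tasks early.
Method: With multithreading, sharing is simple: define the variable in the main thread, access it in each child thread with the global keyword, and guard every operation on it with threading.RLock() (acquire before, release after). With multiprocessing this does not work. Each process has its own independent address space, and a variable lives in the data segment of whichever process holds it, so ordinary globals are not shared. The multiprocessing module provides Value, Array, and Manager for defining shared variables, which makes it possible to share numbers, strings, lists, dictionaries, and object instances across processes.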
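To make the difference concrete, here is a minimal sketch (not taken from the original project) that compares an ordinary global with a multiprocessing.Value. The names plain_counter, bump, and run_children are made up for illustration.

```python
import multiprocessing as mp

plain_counter = 0  # ordinary module-level global


def bump(shared_counter):
    """Increment both the plain global and the shared Value."""
    global plain_counter
    plain_counter += 1               # changes only this process's copy
    with shared_counter.get_lock():  # every Value carries its own lock
        shared_counter.value += 1    # lives in shared memory


def run_children(n_children):
    """Start n_children processes that each call bump() once."""
    shared_counter = mp.Value("i", 0)
    procs = [mp.Process(target=bump, args=(shared_counter,))
             for _ in range(n_children)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return plain_counter, shared_counter.value


if __name__ == "__main__":
    plain, shared = run_children(4)
    print("plain global in parent:", plain)
    print("shared Value in parent:", shared)
```

Run as a script, this should print 0 for the plain global and 4 for the shared Value: each child increments only its own copy of plain_counter, while all of them update the same Value.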
Sharing an integer variable
Sharing an integer across processes is a common need, for example to log progress while tasks run in parallel. Sample code:
import os
from multiprocessing import Pool, Lock, Value
from utils.logger_utils import logging_

# ROOT_PATH, BASIC_PATH, PREDICT_DATE, get_feature, get_md5, formatted_ent and
# get_string are project-specific and defined elsewhere in the project.
LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
lock = Lock()
Counter = Value('i', 0)  # shared int, initial value 0

# Normalize full-width parentheses to ASCII (assumed intent), then deduplicate
with open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8") as f:
    ENT_LIST = list(set(line.strip().replace("(", "(").replace(")", ")") for line in f))
TOTAL = len(ENT_LIST)


def get_one_res(data):
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error("%s: error: %s", data, e)
    finally:
        # Only one process at a time may update and log the counter
        with lock:
            Counter.value += 1
            LOGGER.info("Done: (%d / %d) PID: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("All tasks finished, closing the process pool")
    pool.close()
    pool.join()
Run it and check the log:
2021-11-18 15:19:15 [predict_main] INFO [42] Done: (1 / 1400) PID: 15 --------------- 深圳順亞投資有限公司
2021-11-18 15:19:16 [predict_main] INFO [42] Done: (2 / 1400) PID: 25 --------------- 蕪湖新揚投資合伙企業(有限合伙)
2021-11-18 15:19:18 [predict_main] INFO [42] Done: (3 / 1400) PID: 24 --------------- 保定隆瑞房地產開發有限公司
2021-11-18 15:19:19 [predict_main] INFO [42] Done: (4 / 1400) PID: 11 --------------- 云南俊發凱豐房地產開發有限公司
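The lock and the counter are defined at module level. Value('i', 0) defines a shared variable of type int with an initial value of 0; for a double, use Value('d', 0.0). A Value plays a role similar to an atomic variable in Java, but Counter.value += 1 is a read followed by a write and is not atomic by itself, so the increment has to happen while a lock is held. Each task enters the with block after it finishes, adds 1 to the counter, and logs the progress. Only one child process can hold the lock at a time, which is what synchronizes the processes. Without the lock, the counter values in the log can appear out of order or repeated, and concurrent increments can be lost, so the final total may fall short of the number of tasks. This code also relies on the workers inheriting the module-level lock and Counter, which is how the fork start method behaves; under spawn (the default on Windows and macOS) each worker re-imports the module and gets its own copies.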
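The counter above depends on workers inheriting module-level globals. A more portable variant, sketched here as an assumption rather than the author's method, passes the Value to each worker through the Pool initializer and uses the lock that every Value already carries (get_lock()). The names init_worker, count_done, and work are illustrative.

```python
import os
from multiprocessing import Pool, Value

_counter = None  # set in each worker by init_worker()


def init_worker(counter):
    """Pool initializer: keep the shared counter in a worker-local global."""
    global _counter
    _counter = counter


def count_done(counter):
    """Add 1 to the counter under its own lock and return the new value."""
    with counter.get_lock():
        counter.value += 1
        return counter.value


def work(item):
    # ... the real work on item would go here ...
    done = count_done(_counter)
    return item, done, os.getpid()


if __name__ == "__main__":
    total = 20
    counter = Value("i", 0)
    with Pool(4, initializer=init_worker, initargs=(counter,)) as pool:
        results = pool.map(work, range(total))
    print("final count:", counter.value)
```

Because the read and the write happen under one lock, no increment is lost, and the final count should equal the number of tasks (20 here).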
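Sharing a boolean variable
Here a boolean is kept as a shared variable. Each task reads it before running to check whether it still has the expected value, and a task that raises an error flips it. This is typically used to make all remaining tasks quit early once any child process fails. Code: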
from multiprocessing import Pool, Lock, Manager
from ctypes import c_bool
import os

lock = Lock()
ERROR = Manager().Value(c_bool, False)  # shared flag, initially False


def run(fn):
    if not ERROR.value:
        try:
            print('Running task. PID: %d ' % (os.getpid()))
            1 / 0  # simulate a failure
        except Exception:
            with lock:
                ERROR.value = True
    else:
        print("A child process failed, skipping task")


if __name__ == "__main__":
    pool = Pool(10)
    # 80 tasks: run() is called 80 times, each time with one element of the range
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()
Output:
Running task. PID: 27374
A child process failed, skipping task
A child process failed, skipping task
A child process failed, skipping task
...
Process finished with exit code 0
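The shared variable is initialized as a boolean with the value False. Before running, each task reads the shared variable: if it is still False the task runs, otherwise the task is skipped. The boolean is created by instantiating Manager and calling its Value method. c_bool is a data type from the ctypes module, which also defines c_int, c_double, c_char, and other C-compatible types.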
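As a small illustration of how the type argument works, the sketch below creates a few shared values with multiprocessing.Value, using either a ctypes type or the equivalent array-style typecode. It uses multiprocessing.Value rather than Manager().Value, and the variable names are made up for this example.

```python
from ctypes import c_bool, c_double, c_int
from multiprocessing import Value

stop_flag = Value(c_bool, False)  # boolean, given as a ctypes type
count_ct = Value(c_int, 0)        # int, given as a ctypes type
count_tc = Value("i", 0)          # int, given as the typecode 'i'
ratio = Value(c_double, 0.0)      # double; the typecode would be 'd'

# Writes go through .value, ideally under the Value's own lock
with stop_flag.get_lock():
    stop_flag.value = True

print(stop_flag.value, count_ct.value, count_tc.value, ratio.value)
```

This prints True 0 0 0.0. Reading .value gives back an ordinary Python bool, int, or float.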

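Another approach is to check the shared variable in the main process. Calling map_async keeps the main process from blocking on the child processes. The main process then polls the shared variable and, if it no longer has the expected value, calls terminate to stop the process pool and exits early.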
from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
ERROR = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)
TOTAL = 80


def run(fn):
    try:
        time.sleep(2)
        1 / 0  # simulate a failure
    except Exception:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('Running task (%d / %d). PID: %d ' % (COUNTER.value, TOTAL, os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, list(range(TOTAL)))  # returns immediately
    pool.close()
    print("Main process checking...")
    # Poll once per second until every task is done or an error is flagged
    while COUNTER.value != TOTAL:
        time.sleep(1)
        if ERROR.value:
            print("A child process failed, main process exiting early")
            pool.terminate()
            break
    pool.join()
The output is shown below. The main process checks the shared ERROR flag once per second and terminates the process pool as soon as the flag becomes True.
Main process checking...
Running task (1 / 80). PID: 4168
Running task (2 / 80). PID: 4169
Running task (3 / 80). PID: 4177
Running task (4 / 80). PID: 4171
Running task (5 / 80). PID: 4173
Running task (6 / 80). PID: 4182
Running task (7 / 80). PID: 4183
Running task (8 / 80). PID: 4174
Running task (9 / 80). PID: 4179
Running task (10 / 80). PID: 4181
A child process failed, main process exiting early
Process finished with exit code 0
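The same stop-on-first-error idea can also be sketched with multiprocessing.Event in place of the boolean Value. This is a swapped-in technique, not what the code above does. The names guarded, init_worker, and run are hypothetical, and the failing input n == 3 is arbitrary.

```python
from multiprocessing import Event, Pool

_stop = None  # set in each worker by init_worker()


def init_worker(stop_event):
    """Pool initializer: keep the shared Event in a worker-local global."""
    global _stop
    _stop = stop_event


def guarded(task, stop_event):
    """Run task() unless stop_event is set; set the event if task raises."""
    if stop_event.is_set():
        return "skipped"
    try:
        task()
        return "ok"
    except Exception:
        stop_event.set()
        return "failed"


def run(n):
    # Divides by zero when n == 3, which simulates one failing task
    return guarded(lambda: 1 / (n - 3), _stop)


if __name__ == "__main__":
    stop = Event()
    with Pool(4, initializer=init_worker, initargs=(stop,)) as pool:
        outcomes = pool.map(run, range(20))
    print(outcomes.count("failed"), "failed,", outcomes.count("skipped"), "skipped")
```

An Event needs no separate lock for set() and is_set(). How many tasks end up skipped depends on scheduling, so the counts vary from run to run.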
A practical example: use multiple processes to search a list for the first value that meets a condition, and shut the pool down as soon as that value is found.
from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
FOUND = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)
TOTAL = 80


def run(fn):
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:
            print("Result: {}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('Running task (%d / %d). PID: %d ' % (COUNTER.value, TOTAL, os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, list(range(TOTAL)))
    pool.close()
    print("Main process checking...")
    while COUNTER.value != TOTAL:
        time.sleep(1)
        if FOUND.value:
            print("Result found")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)  # elapsed seconds
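For this find-the-first-match case, a shared flag is not the only option. The sketch below, offered as an alternative and not as the approach used above, relies on Pool.imap_unordered, which yields results as tasks finish, so the main process can stop at the first hit without polling. The helpers check and first_match are illustrative names.

```python
from multiprocessing import Pool


def check(n):
    """Return n if it meets the condition (n + 1 == 10), else None."""
    return n if n + 1 == 10 else None


def first_match(results):
    """Return the first non-None item from an iterable of results."""
    for r in results:
        if r is not None:
            return r
    return None


if __name__ == "__main__":
    with Pool(4) as pool:
        # Results arrive as soon as each task finishes, in no fixed order
        found = first_match(pool.imap_unordered(check, range(80)))
        # Leaving the with-block calls pool.terminate(), dropping pending tasks
    print("found:", found)
```

With the condition above only n == 9 matches, so this prints found: 9.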
Sharing dictionaries and lists
Create them with Manager: Manager().dict() and Manager().list(). Test code:
from multiprocessing.pool import Pool
from multiprocessing import Manager, Lock
import time
import datetime

LOCK = Lock()
MANAGER = Manager()  # one manager process serves both shared objects
DICT = MANAGER.dict()
LIST = MANAGER.list()


def job(ent):
    # Synchronized version: the check and the update run under one lock
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))


def job2(ent):
    # Unsynchronized version: other processes can change LIST between the check and the update
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
          "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    pool.map(job, list(range(10)) * 2)  # replace job with job2 to run the unsynchronized version
    pool.close()
    pool.join()
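The output of running job is shown below. It matches what a single process with a single thread would produce: the processes queue up for the lock and run one at a time, in order.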
dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]
Process finished with exit code 0
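Running job2 instead, with no synchronization of the shared variable, things go wrong and the run fails with pop index out of range. The cause is that other processes modify the list between one process's length check and its pop, so the check no longer holds by the time the pop runs.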
dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
raise self._value
IndexError: pop index out of range
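The fix is to make the length check and the mutation one critical section. The sketch below shows one way to package that, under the assumption that the list and the lock are passed to the task as arguments, not read from globals; manager.Lock() returns a proxy that can be pickled, so it can travel with each task. append_or_pop and job are illustrative names.

```python
from multiprocessing import Manager, Pool


def append_or_pop(shared_list, lock, ent, limit=5):
    """Append ent while the list is short; otherwise pop index ent if valid.

    The length check and the mutation run under one lock, so no other
    holder of the same lock can change the list in between.
    """
    with lock:
        if len(shared_list) < limit:
            shared_list.append(ent)
        elif ent < len(shared_list):
            shared_list.pop(ent)
        return list(shared_list)  # snapshot taken while still locked


def job(args):
    return append_or_pop(*args)


if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()
        lock = manager.Lock()  # proxy lock, safe to send as a task argument
        tasks = [(shared_list, lock, ent) for ent in list(range(10)) * 2]
        with Pool(4) as pool:
            pool.map(job, tasks)
        print("final list:", list(shared_list))
```

The final contents depend on the order in which the workers get the lock, but the list never grows beyond limit and pop is never called with an out-of-range index.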