Python:多進(jìn)程同步共享全局變量(鎖,計(jì)數(shù)器,原子布爾)

摘要:Python,多進(jìn)程

多進(jìn)程變量同步的場(chǎng)景和方法

場(chǎng)景:在使用Python多進(jìn)程并行時(shí)需要在進(jìn)程間共享變量,這些共享的變量可以更好地控制和把握任務(wù)執(zhí)行的情況,比如查看任務(wù)進(jìn)度,提前停止任務(wù)等
方法:在多線程中變量共享在主線程中定義變量,在每個(gè)子線程中使用global關(guān)鍵字拿到變量,再配合threading.RLock()在對(duì)變量操作時(shí)拿到和釋放鎖(acquirerelease)即可,但是在多進(jìn)程中,變量是放在不同子進(jìn)程的數(shù)據(jù)區(qū)中,每個(gè)進(jìn)程都是獨(dú)立的地址空間,所以用一般的方法是不能共享變量的,multiprocessing模塊提供了Array,Manager,Value類來(lái)定義共享變量,能夠?qū)崿F(xiàn)進(jìn)程間共享數(shù)字,字符串,列表,字典,實(shí)例對(duì)象的變量共享


共享整數(shù)變量

對(duì)于整數(shù)的多進(jìn)程共享是常用的場(chǎng)景,比如使用多進(jìn)程并行任務(wù),需要記錄執(zhí)行日志記錄任務(wù)進(jìn)度,實(shí)例代碼如下

import multiprocessing
from multiprocessing import Pool, Lock, Value

from utils.logger_utils import logging_

LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
lock = Lock()
Counter = Value('i', 0)
ENT_LIST = list(set([line.strip().replace("(", "(").replace(")", ")") for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()]))
TOTAL = len(ENT_LIST)

def get_one_res(data):
    global TOTAL, lock, Counter
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error(data + ":錯(cuò)誤:" + e.args[0])
    finally:
        with lock:
            Counter.value += 1
        LOGGER.info("執(zhí)行完成:(%d / %d) 進(jìn)程號(hào): %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("全部執(zhí)行完成,關(guān)閉進(jìn)程池")
    pool.close()
    pool.join()

運(yùn)行查看執(zhí)行日志

2021-11-18 15:19:15 [predict_main] INFO [42] 執(zhí)行完成:(1 / 1400) 進(jìn)程號(hào): 15 --------------- 深圳順亞投資有限公司
2021-11-18 15:19:16 [predict_main] INFO [42] 執(zhí)行完成:(2 / 1400) 進(jìn)程號(hào): 25 --------------- 蕪湖新?lián)P投資合伙企業(yè)(有限合伙)
2021-11-18 15:19:18 [predict_main] INFO [42] 執(zhí)行完成:(3 / 1400) 進(jìn)程號(hào): 24 --------------- 保定隆瑞房地產(chǎn)開發(fā)有限公司
2021-11-18 15:19:19 [predict_main] INFO [42] 執(zhí)行完成:(4 / 1400) 進(jìn)程號(hào): 11 --------------- 云南俊發(fā)凱豐房地產(chǎn)開發(fā)有限公司

在全局定義鎖和計(jì)數(shù)器,Value('i', 0)代表定義的共享變量是int類型初始值是0,如果要定義double變量則使用Value('d', 0),相當(dāng)于java里面的原子變量,在執(zhí)行函數(shù)中調(diào)用with上下文在實(shí)行完任務(wù)后調(diào)用Counter.value += 1實(shí)現(xiàn)計(jì)數(shù)+1,最后在進(jìn)程池中調(diào)用執(zhí)行方法,每個(gè)并行的任務(wù)在執(zhí)行完畢會(huì)調(diào)用鎖進(jìn)行計(jì)數(shù)器+1,同一時(shí)刻只有一個(gè)子進(jìn)程拿到鎖實(shí)現(xiàn)進(jìn)程同步,如果不采用鎖的方式,在日志中計(jì)數(shù)器會(huì)亂序,但是最終總的值相等


共享布爾變量

這種情況在全局中記錄一個(gè)布爾變量,每次執(zhí)行任務(wù)前拿到變量判斷是否與預(yù)期一致,如果執(zhí)行報(bào)錯(cuò)修改變量狀態(tài),多用于子進(jìn)程中任務(wù)報(bào)錯(cuò)提前結(jié)束全部任務(wù)全部退出,代碼如下

from multiprocessing import Pool, Lock, Manager
from ctypes import c_bool
import os

lock = Lock()
ERROR = Manager().Value(c_bool, False)


def run(fn):
    global tests_count, lock, ERROR
    if not ERROR.value:
        try:
            print('執(zhí)行任務(wù). PID: %d ' % (os.getpid()))
            1 / 0
        except Exception as e:
            with lock:
                ERROR.value = True
    else:
        print("子進(jìn)程報(bào)錯(cuò),任務(wù)結(jié)束")


if __name__ == "__main__":
    pool = Pool(10)
    # 80個(gè)任務(wù),會(huì)運(yùn)行run()80次,每次傳入xrange數(shù)組一個(gè)元素
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()

查看執(zhí)行輸出

執(zhí)行任務(wù). PID: 27374 
子進(jìn)程報(bào)錯(cuò),任務(wù)結(jié)束
子進(jìn)程報(bào)錯(cuò),任務(wù)結(jié)束
子進(jìn)程報(bào)錯(cuò),任務(wù)結(jié)束
...
Process finished with exit code 0

初始化一個(gè)共享變量為布爾類型為False,每個(gè)進(jìn)程在執(zhí)行前先拿到共享變量判斷是否為False,是則執(zhí)行任務(wù),否則直接跳過(guò)執(zhí)行。初始化布爾變量使用Manager類實(shí)例化后調(diào)用Value方法,c_bool是Ctypes下的數(shù)據(jù)類型,相關(guān)類型如下

另一種是在主進(jìn)程中判斷共享變量,調(diào)用map_async使得主進(jìn)程不被子進(jìn)程阻塞,主進(jìn)程判斷全局變量如果不符合預(yù)期直接退出,調(diào)用terminate終止線程池

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
ERROR = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        1 / 0
    except:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('執(zhí)行任務(wù)(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主進(jìn)程判斷...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if ERROR.value:
            print("子進(jìn)程報(bào)錯(cuò),主進(jìn)程提前退出")
            pool.terminate()
            break
    pool.join()

輸出如下,每隔1秒中檢查全局變量ERROR,如果變?yōu)門rue主進(jìn)程終止進(jìn)程池

主進(jìn)程判斷...
執(zhí)行任務(wù)(1 / 80). PID: 4168 
執(zhí)行任務(wù)(2 / 80). PID: 4169 
執(zhí)行任務(wù)(3 / 80). PID: 4177 
執(zhí)行任務(wù)(4 / 80). PID: 4171 
執(zhí)行任務(wù)(5 / 80). PID: 4173 
執(zhí)行任務(wù)(6 / 80). PID: 4182 
執(zhí)行任務(wù)(7 / 80). PID: 4183 
執(zhí)行任務(wù)(8 / 80). PID: 4174 
執(zhí)行任務(wù)(9 / 80). PID: 4179 
執(zhí)行任務(wù)(10 / 80). PID: 4181 
子進(jìn)程報(bào)錯(cuò),主進(jìn)程提前退出

Process finished with exit code 0

一個(gè)實(shí)用的例子是多進(jìn)程找一個(gè)列表中符合要求第一個(gè)值,如果找到則退出多進(jìn)程

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
FOUND = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:
            print("結(jié)果是:{}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('執(zhí)行任務(wù)(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主進(jìn)程判斷...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if FOUND.value:
            print("已找到結(jié)果")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)

共享字典和數(shù)組變量

使用Manager近創(chuàng)建,Manager().dict()Manager().list(),測(cè)試代碼如下

from multiprocessing.pool import Pool
from multiprocessing import Manager, Lock
import time
import datetime

LOCK = Lock()
DICT = Manager().dict()
LIST = Manager().list()


def job(ent):
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))


def job2(ent):
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
          "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    pool.map(job2, list(range(10)) * 2)
    pool.close()
    pool.join()

執(zhí)行job輸出如下,輸出結(jié)果和單進(jìn)程單線程的結(jié)果一致,按照順序每個(gè)進(jìn)程在鎖外排隊(duì)

dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]

Process finished with exit code 0

如果執(zhí)行job2則不控制共享變量的同步則完全失控,報(bào)錯(cuò)pop index out of range,原因是在對(duì)變量操作時(shí)有其他進(jìn)程也在操作得到意想不到的結(jié)果

dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
    raise self._value
IndexError: pop index out of range

共享自定義對(duì)象

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容