理解Python進(jìn)程Process

Demo代碼和引用知識點都參考自<a >《理解Python并發(fā)編程一篇就夠了 - 進(jìn)程篇》--董偉明</a>或作者個人公眾號Python之美, 《Python Cookbook》和<a >廖雪峰Python3教程</a>。

基本使用

運用多進(jìn)程時,需將啟動子進(jìn)程的代碼放在 `if __name__ == '__main__':` 保護(hù)塊中,否則(尤其在 Windows 等以 spawn 方式啟動進(jìn)程的平臺上)會出現(xiàn)異常或警告。

Process() 基本使用:與Thread()類似。

Pool() 基本使用:

其中map方法用起來和內(nèi)置的map函數(shù)一樣,且自帶多進(jìn)程的支持。

from multiprocessing import Pool
pool = Pool(2)
pool.map(fib, [35] * 2)

multiprocessing.dummy 模塊:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

對于以上部分知識點,沒有實際運用過,只是單純了解并編寫Demo進(jìn)行了練習(xí),理解沒有很透徹。

# -*- coding: utf-8 -*-
from multiprocessing import Process, Pool
from multiprocessing.dummy import Pool as DummyPool
import time
import datetime

def log_time(method_name):
    """Decorator factory: wrap a function so the wall-clock time of each
    call is printed, labelled with `method_name`.

    Fixes the original `methond_name` typo; callers pass the label
    positionally, so the rename is backward-compatible.
    """
    def decorator(f):
        from functools import wraps

        @wraps(f)  # preserve the wrapped function's __name__/__doc__
        def wrapper(*args, **kwargs):
            start_time = time.time()
            res = f(*args, **kwargs)
            end_time = time.time()
            print('%s cost %ss' % (method_name, (end_time - start_time)))
            return res
        return wrapper
    return decorator

def fib(n):
    """Naive exponential-time Fibonacci; fib(1) == fib(2) == 1.

    Intentionally slow so the multiprocessing demos have real CPU work
    to measure.
    """
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

@log_time('single_process')
def single_process():
    """Baseline: compute fib(33) twice, sequentially, in this process."""
    for _ in range(2):
        fib(33)

@log_time('multi_process')
def multi_process():
    """Compute fib(33) twice, each call in its own child process."""
    workers = [Process(target=fib, args=(33,)) for _ in range(2)]
    for worker in workers:
        worker.start()
    # Wait for both children before returning so the timing is meaningful.
    for worker in workers:
        worker.join()


@log_time('pool_process')
def pool_process():
    """Compute fib(33) twice via a 2-worker process pool."""
    # Use the pool as a context manager so worker processes are always
    # cleaned up; the original leaked the pool (no close()/join()/terminate()).
    with Pool(2) as pool:
        pool.map(fib, [33] * 2)


@log_time('dummy_pool')
def dummy_pool():
    """Compute fib(33) twice via a 2-worker *thread* pool.

    multiprocessing.dummy replicates the multiprocessing API on top of
    the threading module, so these are threads, not processes.
    """
    # Context manager guarantees the thread pool is terminated afterwards;
    # the original never closed it.
    with DummyPool(2) as pool:
        pool.map(fib, [33] * 2)


if __name__ == '__main__':
    # Run each timing demo in turn. The guard is required so child
    # processes re-importing this module do not re-run the demos.
    for demo in (single_process, multi_process, pool_process, dummy_pool):
        demo()

基于Pipe的parmap

理解稍有困難。


隊列

實現(xiàn)生產(chǎn)消費者模型,一個隊列存放任務(wù),一個隊列存放結(jié)果。
multiprocessing模塊下也有Queue,但不提供task_done()和join()方法。故利用Queue存放結(jié)果,用JoinableQueue()來存放任務(wù)。

仿照的Demo,一個消費者進(jìn)程和一個生產(chǎn)者進(jìn)程:

# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue, JoinableQueue
import time
import random

def double(n):
    """Return twice the given value."""
    return 2 * n

def producer(name, task_q):
    """Enqueue (double, n) tasks until a random draw exceeds 0.8, then
    enqueue a None sentinel so the consumer knows to stop."""
    while True:
        draw = random.random()
        if draw > 0.8:  # stop condition: draw above the 0.8 threshold
            task_q.put(None)
            print('%s break.' % name)
            break
        print('%s produce %s.' % (name, draw))
        task_q.put((double, draw))
    

def consumer(name, task_q, result_q):
    """Pull (func, arg) tasks off task_q until the None sentinel arrives;
    push each computed result onto result_q."""
    while True:
        fetched = task_q.get()
        if fetched is None:  # sentinel from the producer: no more work
            print('%s break.' % name)
            break
        func, arg = fetched
        value = func(arg)
        time.sleep(0.5)  # simulate slow processing
        task_q.task_done()
        result_q.put(value)
        print('%s consume %s, result %s' % (name, arg, value))

def run():
    """Wire one producer and one consumer process together and print the
    collected results once both have exited."""
    task_q = JoinableQueue()
    result_q = Queue()

    workers = [
        Process(name='p1', target=producer, args=('p1', task_q)),
        Process(name='c1', target=consumer, args=('c1', task_q, result_q)),
    ]
    for worker in workers:
        worker.start()

    # join() blocks the parent until both children finish.
    for worker in workers:
        worker.join()

    # Children are done: drain whatever results were queued.
    while not result_q.empty():
        result = result_q.get()
        print('result is: %s' % result)

# Entry-point guard: required so child processes re-importing this module
# do not re-run run().
if __name__ == '__main__':
    run()

如果存在多個consumer()進(jìn)程,只會有一個consumer()進(jìn)程能取出None并break,其他的則會在task_q.get()一直掛起,嘗試在consumer()方法中添加超時退出。注意:get()的第一個位置參數(shù)是block而非超時時間,超時必須通過timeout關(guān)鍵字參數(shù)傳入;并且捕獲超時異常后應(yīng)跳出循環(huán),否則task變量未被賦值,后續(xù)引用會出錯。

import queue

def consumer(name, task_q, result_q):
    """Consume (func, arg) tasks; exit on the None sentinel, or after
    waiting 1 second with no task available.

    Fixes two bugs in the original:
    - `task_q.get(1)` passed 1 as the *block* argument, not a timeout;
      the timeout must be given as `get(timeout=1)`.
    - After catching queue.Empty the loop fell through with `task`
      unbound, raising NameError; it must break (or continue) instead.
    """
    while True:
        try:
            task = task_q.get(timeout=1)  # wait at most 1s for a task
        except queue.Empty:
            print('%s time out, break.' % name)
            break  # without this, `task` below would be unbound
        if task is None:  # producer's sentinel: no more work
            print('%s break.' % name)
            break
        func, arg = task
        res = func(arg)
        time.sleep(0.5)  # simulate slow processing
        task_q.task_done()
        result_q.put(res)
        print('%s consume %s, result %s' % (name, arg, res))

共享內(nèi)存

利用sharedctypes中的Array, Value來共享內(nèi)存。
下例為仿照。

# -*- coding: utf-8 -*-

from pprint import pprint

# 共享內(nèi)存
from multiprocessing import sharedctypes, Process, Lock
from ctypes import Structure, c_bool, c_double

# Show the mapping from typecode characters (e.g. 'i', 'd') to ctypes types.
pprint(sharedctypes.typecode_to_type)

lock = Lock()  # one lock shared by several of the shared objects below


class Point(Structure):
    """A shareable ctypes structure holding two C doubles."""
    _fields_ = [('x', c_double), ('y', c_double)]  # (field name, C type) pairs


def modify(n, b, s, arr, A):
    """Mutate every kind of shared object in place (runs in the child):
    square the int, set the bool, uppercase the bytes, overwrite the first
    array slot, and square both coordinates of every Point."""
    n.value = n.value ** 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for point in A:
        point.x = point.x ** 2
        point.y = point.y ** 2

if __name__ == '__main__':

    # Shared ctypes objects. The `lock=` argument controls synchronization:
    # False -> raw unsynchronized object, True -> a fresh lock is created,
    # or pass an existing Lock to share one lock between several objects.
    n = sharedctypes.Value('i', 7)
    b = sharedctypes.Value(c_bool, False, lock=False)
    s = sharedctypes.Array('c', b'hello world', lock=lock)  # 'c' typecode: bytes
    arr = sharedctypes.Array('i', range(5), lock=True)
    A = sharedctypes.Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)
    # The child mutates all five shared objects in place.
    p = Process(target=modify, args=(n, b, s, arr, A))
    p.start()
    p.join()
    # The parent observes the child's writes because the memory is shared.
    print(n.value)
    print(b.value)
    print(s.value)
    print(arr[:])
    print([(a.x, a.y) for a in A])

實際項目中利用Value來監(jiān)測子進(jìn)程的任務(wù)狀態(tài),并通過memcached對狀態(tài)進(jìn)行存儲、更新和刪除。

# -*- coding: utf-8 -*-

from multiprocessing import Process, Value
import time
import datetime
import random


# Task status codes stored in the shared Value('i').
FINISHED = 3   # worker completed successfully
FAILED = 4     # worker failed (declared but unused in this demo)
INPROCESS = 2  # worker currently executing
WAITING = 1    # initial state before the worker starts

def execute_method(status, process):
    """Simulated worker: step the shared status through INPROCESS and then
    FINISHED with one-second pauses (`process` would carry a progress
    fraction in real use; it is untouched here)."""
    for delay, state in ((1, INPROCESS), (1, FINISHED)):
        time.sleep(delay)
        status.value = state  # test transition
    time.sleep(0.5)

def run(execute_code):
    """Spawn a worker and poll its shared status every 0.5s until it
    reports FINISHED or 30 seconds have elapsed.

    `execute_code` keys the (commented-out) memcached entries that would
    publish the status/progress for other services to read.
    """
    status = Value('i', WAITING)
    process = Value('f', 0.0)  # progress placeholder, updated by real workers
    # mem_cache.set('%s_status' % execute_code, status.value, 0)
    # mem_cache.set('%s_process' % execute_code, process.value, 0)
    p = Process(target=execute_method, args=(status, process))
    p.start()
    start_time = datetime.datetime.now()
    while True:
        print(status.value)
        now_time = datetime.datetime.now()
        # total_seconds() instead of .seconds: .seconds ignores the days
        # component of the timedelta.
        if (now_time - start_time).total_seconds() > 30:  # give up after 30s
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('execute failed')
            p.terminate()
            break
        if status.value == FINISHED:  # was the magic number 3
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('end execute')
            break
        else:
            # mem_cache.set('%s_status' % execute_code, status.value, 0)
            # mem_cache.set('%s_process' % execute_code, process.value, 0)
            print('waiting or executing')
        time.sleep(0.5)
    p.join()

服務(wù)進(jìn)程

下例為仿照博客中的服務(wù)進(jìn)程的例子,簡單的展示了Manager的常見的共享方式。

一個multiprocessing.Manager對象會控制一個服務(wù)器進(jìn)程,其他進(jìn)程可以通過代理的方式來訪問這個服務(wù)器進(jìn)程。 常見的共享方式有以下幾種:

  1. Namespace。創(chuàng)建一個可分享的命名空間。
  2. Value/Array。和上面共享ctypes對象的方式一樣。
  3. dict/list。創(chuàng)建一個可分享的dict/list,支持對應(yīng)數(shù)據(jù)結(jié)構(gòu)的方法。
  4. Condition/Event/Lock/Queue/Semaphore。創(chuàng)建一個可分享的對應(yīng)同步原語的對象。
# -*- coding: utf-8 -*-
from multiprocessing import Manager, Process

def modify(ns, lproxy, dproxy):
    """Mutate the shared Namespace, list and dict through their proxies
    (runs in the child process)."""
    dproxy['new'] = 'new_value'
    lproxy.append('new_value')
    ns.name = 'new_name'

def run():
    """Share a Namespace, list and dict with a child process through a
    Manager server, then show the child's mutations in the parent."""
    # Prepare the shared (proxied) objects.
    manager = Manager()
    ns = manager.Namespace()
    ns.name = 'origin_name'
    lproxy = manager.list()
    lproxy.append('origin_value')
    dproxy = manager.dict()
    dproxy['origin'] = 'origin_value'

    # Let a child process mutate them via the proxies.
    child = Process(target=modify, args=(ns, lproxy, dproxy))
    child.start()
    print(child.pid)
    child.join()

    # The parent sees the child's changes through the same proxies.
    print('ns.name: %s' % ns.name)
    print('lproxy: %s' % lproxy)
    print('dproxy: %s' % dproxy)

# Entry-point guard: required so child processes re-importing this module
# do not re-run run().
if __name__ == '__main__':
    run()

上例主要是展示了Manager中的共享對象類型和代理,查看源碼知是通過register()方法。

multiprocessing/managers.py:

# NOTE(review): verbatim excerpt from CPython's multiprocessing/managers.py,
# quoted here to show how SyncManager exposes shared types via register().
#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

除了在子進(jìn)程中,還可利用Manager()來在不同進(jìn)程間通信,如下面的分布式進(jìn)程簡單實現(xiàn)。


分布進(jìn)程

和上例的主要區(qū)別是,非子進(jìn)程間進(jìn)行通信。

manager_server.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'  # clients must present the same authkey to connect

# The object shared with remote clients through the manager server.
shared_list = []

class ServerManager(BaseManager):
    # Empty subclass: the shared API is attached via register() below.
    pass

# Expose shared_list to clients under the remote method name 'get_list'.
ServerManager.register('get_list', callable=lambda: shared_list)
server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()
server.serve_forever()  # blocks forever, serving proxy requests

manager_client.py

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'  # must match the server's authkey

class ClientManager(BaseManager):
    # Empty subclass: the remote method is declared via register() below.
    pass

# Register by name only: the implementation lives on the server.
ClientManager.register('get_list')
client_manager = ClientManager(address=(host, port), authkey=authkey)
client_manager.connect()

# l is a proxy object; reads and mutations are forwarded to the server.
l = client_manager.get_list()
print(l)

l.append('new_value')  # mutates the server-side list
print(l)

運行多次后,shared_list中會不斷添加new_value。

仿照廖雪峰教程上的分布式進(jìn)程加以適當(dāng)修改。

manager_server.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager
from multiprocessing import Condition, Value
import queue

host = '127.0.0.1'
port = 8080
authkey = b'python'  # clients must present the same authkey


# Shared state served to remote producers/consumers.
task_q = queue.Queue(10)    # pending tasks, bounded at 10
result_q = queue.Queue(20)  # finished results, bounded at 20
cond = Condition()          # coordinates producer/consumer across processes
done = Value('i', 0)        # completion flag (registered but unused by the demo clients)

def double(n):
    """Return twice the given value (executed server-side via get_double)."""
    return 2 * n

class ServerManager(BaseManager):
    # Empty subclass: shared objects are attached via register() below.
    pass

# Each register() exposes one callable to remote clients by name.
ServerManager.register('get_task_queue', callable=lambda: task_q)
ServerManager.register('get_result_queue', callable=lambda: result_q)
ServerManager.register('get_cond', callable=lambda: cond)
ServerManager.register('get_done', callable=lambda: done)
ServerManager.register('get_double', callable=double)  # remote function call

server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()

print('start server')
server.serve_forever()  # blocks forever handling proxy requests


manager_producer.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager
import random
import time

host = '127.0.0.1'
port = 8080
authkey = b'python'  # must match the server's authkey

class ProducerManager(BaseManager):
    # Empty subclass: remote methods are declared via register() below.
    pass

# Register by name only; the implementations live on the server.
ProducerManager.register('get_task_queue')
ProducerManager.register('get_cond')
ProducerManager.register('get_done')
producer_manager = ProducerManager(address=(host, port), authkey=authkey)

producer_manager.connect()
task_q  = producer_manager.get_task_queue()
cond = producer_manager.get_cond()
# done = producer_manager.get_done()
count = 20  # deliver at most 20 tasks in total

while count > 0:
    if cond.acquire():
        if not task_q.full():
            n = random.randint(0, 10)
            task_q.put(n)
            print("Producer:deliver one, now tasks:%s" % task_q.qsize())
            cond.notify()  # wake a consumer waiting for tasks
            count -= 1
            time.sleep(0.5)
        else:
            # Queue full: wait until a consumer signals it took something.
            print("Producer:already full, stop deliver, now tasks:%s" % task_q.qsize())
            cond.wait() 
        cond.release()
# done.value = 1
print('Producer break')

manager_consumer.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'  # must match the server's authkey

class ConsumerManager(BaseManager):
    # Empty subclass: remote methods are declared via register() below.
    pass

# Register by name only; the implementations live on the server.
ConsumerManager.register('get_task_queue')
ConsumerManager.register('get_result_queue')
ConsumerManager.register('get_cond')
# ConsumerManager.register('get_done')
ConsumerManager.register('get_double')

consumer_manager = ConsumerManager(address=(host, port), authkey=authkey)
consumer_manager.connect()

task_q = consumer_manager.get_task_queue()
result_q = consumer_manager.get_result_queue()
cond = consumer_manager.get_cond()
# done = consumer_manager.get_done()

# Consume until the result queue fills (20 slots, matching the producer's
# 20-task cap).
while 1:
    if result_q.full():
        print('result queue is full')
        break
    if cond.acquire():
        if not task_q.empty():
            arg = task_q.get()
            res = consumer_manager.get_double(arg)  # executed on the server
            print("Consumer:consume one, now tasks:%s" % task_q.qsize())
            result_q.put(res)
            cond.notify()  # wake the producer if it is blocked on a full queue
        else:
            # NOTE(review): this wait has no timeout; if the producer has
            # already exited, the consumer can block here forever.
            print("Consumer:only 0, stop consume, products")
            cond.wait()
        cond.release()

# Drain and display the accumulated results.
while 1:
    if result_q.empty():
        break
    result = result_q.get()
    print('result is: %s' % result)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • @(python)[筆記] 目錄 一、什么是進(jìn)程 1.1 進(jìn)程的概念 進(jìn)程的概念起源于操作系統(tǒng),是操作系統(tǒng)最核心的...
    CaiGuangyin閱讀 1,329評論 0 9
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評論 19 139
  • 今天我要在我的粉圈得瑟我媽腌的咸鴨蛋。 這位退休后打理了樓頂菜園,現(xiàn)階段各種蔬菜,幾只雞鴨,生機(jī)勃勃。...
    fionacy閱讀 200評論 0 1
  • 剛網(wǎng)上閑逛,發(fā)現(xiàn)了一則2009年百度問答里挺火熱的問題,即“如何將QQ下載到桌面上”。 這不禁讓我想起了自己第一次...
    化濁閱讀 442評論 0 0
  • 精確率、召回率、F1 精確率 = TP / (TP + FP),表示返回的正例中真正例所占的比例;召回率 = TP...
    貳拾貳畫生閱讀 7,565評論 0 7

友情鏈接更多精彩內(nèi)容