簡介
進程是運行的程序,每個進程有自己的系統(tǒng)狀態(tài),包含了內(nèi)存、打開文件列表、程序計數(shù)器(跟蹤執(zhí)行的指令)、存儲函數(shù)本地調(diào)用變量的堆棧。
使用os或subprocess可以創(chuàng)建新進程,比如:os.fork(), subprocess.Popen()。子進程和父進程是相互獨立執(zhí)行的。
interprocess communication (IPC)進程間的通信: 最常見的形式是基于消息傳遞(message passing)。message是原始字節(jié)的緩存,通過I/O channel比如網(wǎng)絡(luò)socket和管道,使用原語比如send() and recv()來發(fā)送接收消息。次常用的有內(nèi)存映射區(qū):memory-mapped regions,見mmap模塊,實際上是共享內(nèi)存。
線程有自己的控制流和執(zhí)行堆棧,但是共享系統(tǒng)資源和數(shù)據(jù)。
并發(fā)的難點:同步和數(shù)據(jù)共享。解決的方法一般是使用互斥鎖。
write_lock = Lock()
...
# Critical section where writing occurs
write_lock.acquire()
f.write("Here's some data.\n")
f.write("Here's more data.\n")
...
write_lock.release()
python的并發(fā)程序設(shè)計
多數(shù)系統(tǒng)上,Python支持消息傳遞和基于線程的并發(fā)程序設(shè)計。global interpreter lock (the GIL)機制實際每個時間單元只允許單個線程執(zhí)行,哪怕有多個CPU。如果瓶頸在I/O,使用多線程效果不錯;如果在cpu,效果則會更差。還不如使用子進程和消息傳遞。線程數(shù)一多經(jīng)常出現(xiàn)以下怪異的問題,比如100個線程工作良好,1000個線程就可能出問題了,這種情況一般需要使用異步事件處理系統(tǒng),比如中央事件循環(huán)可能使用select模塊監(jiān)控I/O資源和分發(fā)異步到大量的I/O 處理器。asyncore和流行的第三方的Twisted (http://twistedmatrix/com)可以實現(xiàn)這點。
消息傳遞在python使用很廣,甚至在線程中。它難于出錯,減少了鎖和同步原語的使用??梢詳U展至網(wǎng)絡(luò)和分布式系統(tǒng)。Python的高級特性比如協(xié)程序(coroutines)也使用消息傳遞抽象。
multiprocessing支持子進程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步。
multiprocessing
Process類
這個類表示子進程中運行的任務(wù):Process([group [, target [, name [, args [, kwargs]]]]]),構(gòu)造函數(shù)中必須使用關(guān)鍵字參數(shù),target表示可調(diào)用對象,args表示調(diào)用對象的位置參數(shù)元組。kwargs表示調(diào)用對象的字典。Name為別名。Group實質(zhì)上不使用。
方法有:is_alive()、.join([timeout])、run()、start()、terminate()。
屬性有:authkey、daemon(要通過start()設(shè)置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結(jié)束)、name、pid。
Process類中,注意daemon是父進程終止后自動終止,且自己不能產(chǎn)生新進程,必須在start()之前設(shè)置。
創(chuàng)建函數(shù)并將其作為單個進程。
import multiprocessing
import time
def clock(interval):
for i in range(3):
print("The time is {0}".format(time.ctime()))
time.sleep(interval)
if __name__ == '__main__':
p = multiprocessing.Process(target=clock, args=(2,))
p.start()
將進程定義為類:
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
for i in range(3):
print("The time is {0}".format(time.ctime()))
time.sleep(self.interval)
if __name__ == '__main__':
p = ClockProcess(2)
p.start()
注意,要在命令行才能執(zhí)行,用IDE是不行的。
進程通信
multiprocessing支持管道和隊列,都是用消息傳遞來實現(xiàn)的,隊列接口和線程中的隊列類似。
Queue([maxsize]):默認不限制大小,隊列實質(zhì)是用管道和鎖來實現(xiàn)的。支持線程會給底層管道傳送數(shù)據(jù)。
方法有:cancel_join_thread()、close()、empty()、full()、get([block [, timeout]])、get_nowait()(等同于get(False))、join_thread()、put(item [, block [, timeout]])、put_nowait(item)(等同于put(item, False))、qsize()、JoinableQueue([maxsize])、task_done()、join()
下例使用隊列進行通信:
JoinableQueue創(chuàng)建連接的進程隊列。隊列和普通隊列基本一樣,不過消費者在處理完畢之后可以通知生產(chǎn)者(q.task_done())。使用共享信號和條件變量實現(xiàn)。join()由生產(chǎn)者使用,等待所有成員都收到task_done。
import multiprocessing
def consumer(input_q):
while True:
item = input_q.get()
print(item)
input_q.task_done()
def producer(sequence, output_q):
for item in sequence:
output_q.put(item)
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
cons_p = multiprocessing.Process(target=consumer, args=(q,))
cons_p.daemon = True
cons_p.start()
sequence = [1, 2, 3, 4]
producer(sequence, q)
q.join()
這里控制多進程的關(guān)鍵在于隊列g(shù)et()之后,使用task_done()指示該元素處理完畢;進程啟動之前設(shè)置了daemon為True;對隊列使用join()。
這種方法可以啟動多個進程,如下:
process = []
key_list = multiprocessing.JoinableQueue()
# Launch the consumer process
for i in range(10):
t = multiprocessing.Process(target=consumer,args=(key_list,lock))
t.daemon=True
process.append(t)
for i in range(10):
process[i].start()
producer( key_list )
key_list.join()
下面有個應(yīng)用實例:
在某些程序中,生產(chǎn)者需要告知消費者沒有更多項目了,消費者可以關(guān)閉了。這時需要使用哨兵(sentinel)。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_sentinel.py
# Author Rongzhong Xu 2016-08-11 wechat: pythontesting
"""
multiprocessing sentinel demo,
Tesed in python2.7/3.5/2.6
"""
import multiprocessing
def consumer(input_q):
while True:
item = input_q.get()
if item is None:
break
# Process item
print(item) # Replace with useful work
# Shutdown
print("Consumer done")
def producer(sequence, output_q):
for item in sequence:
# Put the item on the queue
output_q.put(item)
if __name__ == '__main__':
q = multiprocessing.Queue()
# Launch the consumer process
cons_p = multiprocessing.Process(target=consumer, args=(q,))
cons_p.start()
# Produce items
sequence = [1, 2, 3, 4]
producer(sequence, q)
# Signal completion by putting the sentinel on the queue
q.put(None)
# Wait for the consumer process to shutdown
cons_p.join()
注意:每個消費者都需要一個:sentinel,可以使用for語句來實現(xiàn)
for i in range(10):
q.put(None)
實際使用中不局限于使用None,使用其他特殊符號等也是可以的。上面程序從表面看比使用JoinableQueue要復(fù)雜,實現(xiàn)的效果又是一樣的。實際上這種場景應(yīng)用更廣泛,在consumer比較耗時的情況下,JoinableQueue如果鎖住整個函數(shù)則互相等待的時間太長,如果不鎖,后面幾次執(zhí)行可能丟失數(shù)據(jù)。
管道
使用管道:Pipe([duplex]),返回值:元組(conn1, conn2)。conn1和conn2為Connection對象,代表管道的末端。管道默認是雙向的,如果設(shè)置duplex為False,conn1只能接收,conn2只能發(fā)送。
Connection對象的方法和屬性如下:
close()、fileno()、poll([timeout])、recv()、recv_bytes([maxlength])、recv_bytes_into(buffer [, offset])、send(obj)、send_bytes(buffer [, offset [, size]])
下面例子實現(xiàn)和之前類似的功能:
def consumer(pipe):
output_p, input_p = pipe
input_p.close() # Close the input end of the pipe
while True:
try:
item = output_p.recv()
except EOFError:
break
# Process item
print(item) # Replace with useful work
# Shutdown
print("Consumer done")
# Produce items and put on a queue. sequence is an
# iterable representing items to be processed.
def producer(sequence, input_p):
for item in sequence:
# Put the item on the queue
input_p.send(item)
if __name__ == '__main__':
(output_p, input_p) = multiprocessing.Pipe()
# Launch the consumer process
cons_p = multiprocessing.Process(
target=consumer, args=((output_p, input_p),))
cons_p.start()
# Close the output pipe in the producer
output_p.close()
# Produce items
sequence = [1, 2, 3, 4]
producer(sequence, input_p)
# Signal completion by closing the input pipe
input_p.close()
# Wait for the consumer process to shutdown
cons_p.join()
管道還可以用于雙向通信,比如下例的C/S模式:
import multiprocessing
# A server process
def adder(pipe):
server_p, client_p = pipe
client_p.close()
while True:
try:
x, y = server_p.recv()
except EOFError:
break
result = x + y
server_p.send(result)
# Shutdown
print("Server done")
if __name__ == '__main__':
(server_p, client_p) = multiprocessing.Pipe()
# Launch the server process
adder_p = multiprocessing.Process(
target=adder, args=((server_p, client_p),))
adder_p.start()
# Close the server pipe in the client
server_p.close()
# Make some requests on the server
client_p.send((3, 4))
print(client_p.recv())
client_p.send(('Hello', 'World'))
print(client_p.recv())
# Done. Close the pipe
client_p.close()
# Wait for the consumer process to shutdown
adder_p.join()
send()和recv()使用pickle序列化對象。更高級的程序需要使用遠程過程調(diào)用,需要使用到進程池。
進程池
Pool類在簡單的情況下可用于管理固定數(shù)量的消費者。進程池的功能和列表解析及函數(shù)式編程中的map-reduce類似。
import multiprocessing
import time
def do_calculation(data):
return data * 2
def start_process():
print('Starting {0}'.format(multiprocessing.current_process().name))
if __name__ == '__main__':
# convert range to list for python3
inputs = list(range(100))
time1 = time.time()
builtin_outputs = map(do_calculation, inputs)
# convert to list for python3
print('Built-in: {0}'.format(list(builtin_outputs)))
time2 = time.time()
print(time2 - time1)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size,
initializer=start_process,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
time3 = time.time()
print('Pool : {0}'.format(pool_outputs))
print(time3 - time2)
執(zhí)行結(jié)果:
$ python3 multiprocessing_pool.py
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.790855407714844e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.2203056812286377
上面例子先計算map的時間,然后用進程池的map,計算出時間。在列表數(shù)比較少的情況下,多進程的執(zhí)行時間更短。列表數(shù)比較多的情況下,多進程的執(zhí)行時間更長,可見python內(nèi)置的map是效率比較高的。
如果消費者函數(shù)有內(nèi)存泄露,可以在執(zhí)行任務(wù)之后重啟,設(shè)定maxtasksperchild參數(shù)即可。
import time
def do_calculation(data):
return data * 2
def start_process():
print('Starting {0}'.format(multiprocessing.current_process().name))
if __name__ == '__main__':
# convert range to list for python3
inputs = list(range(100))
time1 = time.time()
builtin_outputs = map(do_calculation, inputs)
# convert to list for python3
print('Built-in: {0}'.format(list(builtin_outputs)))
time2 = time.time()
print(time2 - time1)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size,
initializer=start_process,
maxtasksperchild=3,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
time3 = time.time()
print('Pool : {0}'.format(pool_outputs))
print(time3 - time2)
執(zhí)行結(jié)果:
$ python3 multiprocessing_pool2.py
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.600120544433594e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-3
Starting ForkPoolWorker-2
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Starting ForkPoolWorker-17
Starting ForkPoolWorker-18
Starting ForkPoolWorker-19
Starting ForkPoolWorker-20
Starting ForkPoolWorker-21
Starting ForkPoolWorker-22
Starting ForkPoolWorker-23
Starting ForkPoolWorker-24
Starting ForkPoolWorker-25
Starting ForkPoolWorker-26
Starting ForkPoolWorker-27
Starting ForkPoolWorker-28
Starting ForkPoolWorker-29
Starting ForkPoolWorker-30
Starting ForkPoolWorker-31
Starting ForkPoolWorker-32
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.23842501640319824
從結(jié)果看,進程數(shù)有所增加。(注意,進程數(shù)似乎比預(yù)期的要少)
Pool([numprocess [,initializer [, initargs]]])
numprocess的默認值是cpu_count()。方法有:apply(func [, args [, kwargs]]),apply_async(func [, args [, kwargs [, callback]]]),close(),join(),imap(func, iterable [, chunksize]),imap_unordered(func, iterable [, chunksize]]),map(func, iterable [, chunksize]),map_async(func, iterable [, chunksize [, callback]]),terminate().
返回結(jié)果AsyncResult的方法:get([timeout])、ready()、sucessful()、wait([timeout])、wait([timeout])
以下代碼生成指定目錄的文件名和SHA512對應(yīng)表的字典。
import multiprocessing
import hashlib
import binascii
# Some parameters you can tweak
BUFSIZE = 8192 # Read buffer size
POOLSIZE = 2 # Number of workers
def compute_digest(filename):
try:
f = open(filename, "rb")
except IOError:
return None
digest = hashlib.sha512()
while True:
chunk = f.read(BUFSIZE)
if not chunk:
break
digest.update(chunk)
f.close()
return filename, digest.digest()
def build_digest_map(topdir):
digest_pool = multiprocessing.Pool(POOLSIZE)
allfiles = (os.path.join(path, name)
for path, dirs, files in os.walk(topdir)
for name in files)
digest_map = dict(digest_pool.imap_unordered(compute_digest, allfiles, 20))
digest_pool.close()
return digest_map
# Try it out. Change the directory name as desired.
if __name__ == '__main__':
digest_map = build_digest_map("/home/andrew/data/code/python/\
python-chinese-library/libraries/multiprocessing")
print(len(digest_map))
for key in digest_map.keys():
print("{0}: {1}".format(key, binascii.hexlify(digest_map[key])))
共享數(shù)據(jù)和同步
共享內(nèi)存通過mmap實現(xiàn)。共享內(nèi)存中創(chuàng)建的是ctypes對象,不需要管道中的序列化。
Value(typecode, arg1, ... argN, lock),RawValue(typecode, arg1, ..., argN),Array(typecode, initializer, lock),RawArray(typecode, initializer)
原語有: Lock,Rlock,Semaphore,BoundedSemaphore,Event,Condition.
import multiprocessing
class FloatChannel(object):
def __init__(self, maxsize):
self.buffer = multiprocessing.RawArray('d', maxsize)
self.buffer_len = multiprocessing.Value('i')
self.empty = multiprocessing.Semaphore(1)
self.full = multiprocessing.Semaphore(0)
def send(self, values):
self.empty.acquire() # Only proceed if buffer empty
nitems = len(values)
self.buffer_len = nitems # Set the buffer size
self.buffer[:nitems] = values # Copy values into the buffer
self.full.release() # Signal that buffer is full
def recv(self):
self.full.acquire() # Only proceed if buffer full
values = self.buffer[:self.buffer_len.value] # Copy values
self.empty.release() # Signal that buffer is empty
return values
# Performance test. Receive a bunch of messages
def consume_test(count, ch):
for i in range(count):
values = ch.recv()
# Performance test. Send a bunch of messages
def produce_test(count, values, ch):
for i in range(count):
ch.send(values)
if __name__ == '__main__':
ch = FloatChannel(100000)
p = multiprocessing.Process(target=consume_test,
args=(1000, ch))
p.start()
values = [float(x) for x in range(100000)]
produce_test(1000, values, ch)
print("Done")
p.join()
參考資料
- 討論qq群144081101 591302926 567351477 釘釘免費群21745728
- 本文相關(guān)書籍下載
- 本文最新版本地址
- 本文涉及的python測試開發(fā)庫 謝謝點贊!
- pymotw multiprocessing參考