在python中Process比Thread更穩(wěn)定,且Process能分布到多臺(tái)機(jī)器,而Thread只能分布到同一臺(tái)機(jī)器的多個(gè)CPU。
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。
task_master.py
# coding:utf-8
import random,time,queue
from multiprocessing.managers import BaseManager
#發(fā)送任務(wù)的隊(duì)列
task_queue = queue.Queue()
#接收任務(wù)的隊(duì)列
result_queue = queue.Queue()
#把兩個(gè)任務(wù)隊(duì)列在網(wǎng)絡(luò)上注冊(cè)
BaseManager.register('get_task_queue',callable=lambda: task_queue)
BaseManager.register('get_result_queue',callable=lambda: result_queue)
#綁定端口5000,設(shè)置驗(yàn)證碼:8e8b55261098a425273f31a
manager = BaseManager(address=('',5000),authkey=b'8e8b55261098a425273f31a')
#啟動(dòng)隊(duì)列
manager.start()
# 獲取通過(guò)網(wǎng)絡(luò)訪問(wèn)的queue對(duì)象:
task = manager.get_task_queue()
result = manager.get_result_queue()
begintime = time.time()
for i in range(50):
r = random.randint(10001,99999)
print("Put task %d ..." % r)
task.put(r)
for i in range(50):
r = result.get(timeout=10)
print("Result is %s" % r)
manager.shutdown()
print("master exit.")
endtime = time.time()
print('用時(shí):%0.5f' %(endtime-begintime))
task_worker.py
#task_worker.py
#coding:utf-8
import time,sys,queue
from multiprocessing.managers import BaseManager
#獲取網(wǎng)絡(luò)中的Queue,并注冊(cè)
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
#連接到manager.py所在機(jī)器 server_addr 為遠(yuǎn)程master服務(wù)器的ip地址
server_addr = '127.0.0.1'
print("Connecting to server %s" % server_addr)
m = BaseManager(address=(server_addr,5000),authkey=b'8e8b55261098a425273f31a')
m.connect()
#獲取Queue對(duì)象
task = m.get_task_queue()
result = m.get_result_queue()
#從task中獲取任務(wù),并把結(jié)果寫入result隊(duì)列
for i in range(50):
try:
n = task.get(timeout=2)
print('run task %d * %d' %(n,n))
r = '%d * %d = %d ' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty')
#處理結(jié)束
print('Worker exit .')