學(xué)習(xí)Python也有很長(zhǎng)一段時(shí)間了,進(jìn)程和線程這塊一直沒(méi)作記錄。直到最近在寫(xiě)一個(gè)爬蟲(chóng),如果不搞多進(jìn)程并發(fā)執(zhí)行,爬蟲(chóng)時(shí)間令人發(fā)指。
說(shuō)到進(jìn)程,必然少不了線程。系統(tǒng)可以有多個(gè)進(jìn)程,其中一個(gè)進(jìn)程中可以有多個(gè)線程。
每個(gè)進(jìn)程都有一個(gè)獨(dú)立的GIL(全局解釋器鎖),多進(jìn)程可以有效的利用多核CPU,而多線程只能占用CPU的一個(gè)核,因?yàn)榫€程執(zhí)行時(shí),必須先獲取GIL才能執(zhí)行,等到釋放GIL,其他線程才有執(zhí)行的機(jī)會(huì),即一個(gè)進(jìn)程中,不可能存在多個(gè)線程同時(shí)執(zhí)行,現(xiàn)在系統(tǒng)均是多核CPU,所以都不推薦使用多線程。
先說(shuō)說(shuō)進(jìn)程的事。Python提供了進(jìn)程包multiprocessing。
1. 進(jìn)程,創(chuàng)建進(jìn)程,定義一個(gè)target即可,如下:
from multiprocessing import Process
# 定義任務(wù)
def p_task(a):
print('process args is %s' % a)
# 定義一個(gè)進(jìn)程,執(zhí)行一個(gè)任務(wù)
p = Process(target=p_task, args=(1,))
# 啟動(dòng)進(jìn)程
p.start()
# 等待進(jìn)程執(zhí)行完畢
p.join()
print('over...')
輸出結(jié)果:
process args is 1
over...
2. 進(jìn)程池
如果需要?jiǎng)?chuàng)建多個(gè)進(jìn)程并復(fù)用,一般采取進(jìn)程池的方式,因?yàn)閯?chuàng)建進(jìn)程和銷毀進(jìn)程的開(kāi)銷很大。
創(chuàng)建進(jìn)程池,池的大小一般等于系統(tǒng)CPU核數(shù)。cpu核數(shù)可以通過(guò)multiprocessing.cpu_count() 查看。
Python提供了兩種創(chuàng)建方式:Pool()和ProcessPoolExecutor()
兩種執(zhí)行效率如何呢?直接給結(jié)果:Pool的效率高于ProcessPoolExecutor,測(cè)試代碼很容易些,這里就不貼多余的代碼了。
第一種:Pool(),進(jìn)程池有兩個(gè)方法:
apply(self, func, args=(), kwds={}):同步執(zhí)行func,源碼如下:
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
可以看出,還是調(diào)用的異步執(zhí)行方法 self.apply_async(func, args, kwds),只不過(guò)apply_async后又調(diào)用了get()方法等待結(jié)果返回。
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):異步執(zhí)行func
所以異步執(zhí)行任務(wù)需要返回結(jié)果時(shí),注意get()方法的使用,避免把異步變成同步??词纠?/p>
import time
from multiprocessing import Pool
def task(args):
time.sleep(2)
return args + 1
pool = Pool(3)
l = []
start = time.time()
for i in range(5):
# 循環(huán)里面千萬(wàn)別調(diào)用r.get()方法,否則會(huì)等待結(jié)果返回,異步變同步
r = pool.apply_async(task, args=(i,))
l.append(r)
# close方法表示不再接收新任務(wù),之前的任務(wù)依舊執(zhí)行
pool.close()
pool.join()
# pool.terminate()會(huì)立即終止進(jìn)程池,包括正在執(zhí)行的子進(jìn)程
print('over... ', time.time() - start)
第二種:ProcessPoolExecutor(),說(shuō)說(shuō)它的map方法和submit方法。
map方法:按任務(wù)順序返回進(jìn)程return的值
submit方法:無(wú)序執(zhí)行,返回future對(duì)象,但返回的future對(duì)象是有序的,等進(jìn)程執(zhí)行結(jié)束,可以通過(guò)future.result()獲取進(jìn)程return的值
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from concurrent.futures import ProcessPoolExecutor
def task(i):
time.sleep(0.5)
return i
if __name__ == '__main__':
count = 100
list = [i for i in range(count)]
start = time.time()
with ProcessPoolExecutor(max_workers=8) as pool:
# 有序輸出,直接返回結(jié)果
print([data for data in pool.map(task, list)])
print('map time: ', time.time() - start)
start = time.time()
future_list = []
with ProcessPoolExecutor(max_workers=8) as pool:
# 返回future對(duì)象
future = pool.submit(task, list)
future_list.append(future)
print([future.result() for future in future_list][0])
print('submit time: ', time.time() - start)
輸出結(jié)果:
map time: 6.669000148773193
submit time: 0.623999834060669
很顯然,有序輸出效率很低。一般情況下,推薦用submit方法。
3. 進(jìn)程之間通信
進(jìn)程都擁有自己獨(dú)立的數(shù)據(jù),它們之間默認(rèn)無(wú)法共享數(shù)據(jù)。下面介紹幾種進(jìn)程間通信的方式。
方法一:使用Array
from multiprocessing import Process, Array
g = []
def share_task(a):
g.extend(a)
for i in range(len(a)):
a[i] = a[i] * 2
if __name__ == '__main__':
arr = Array('i', range(10))
p = Process(target=share_task, args=(arr,))
p.start()
p.join()
print('arr: ', arr[:])
print('g: ', g[:])
輸出結(jié)果:
arr: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
g: []
g還是為空列表,表明默認(rèn)數(shù)據(jù)不共享,然而通過(guò)Array,子進(jìn)程改變了主進(jìn)程的數(shù)組中的元素。
方法二:使用Manager
from multiprocessing import Process, Manager
def m_task(d, l):
d[1] = '1'
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as m:
m_dict = m.dict()
m_list = m.list(range(10))
p = Process(target=m_task, args=(m_dict, m_list))
p.start()
p.join()
print('m_dict: ', m_dict)
print('m_list: ', m_list)
輸出結(jié)果:
m_dict: {1: '1', 0.25: None}
m_list: [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
方法三: 使用Queue
from multiprocessing import Process, Queue
def producer_task(q):
for i in range(10):
q.put(i)
print('producer: ', i)
def consumer_task(q):
for i in range(10):
v = q.get()
print('consumer: ', v)
if __name__ == '__main__':
q = Queue()
p_producer = Process(target=producer_task, args=(q,))
p_consumer = Process(target=consumer_task, args=(q,))
p_producer.start()
p_consumer.start()
p_producer.join()
p_consumer.join()
輸出結(jié)果:
producer: 0
producer: 1
producer: 2
producer: 3
consumer: 0
producer: 4
consumer: 1
producer: 5
producer: 6
consumer: 2
consumer: 3
producer: 7
producer: 8
consumer: 4
producer: 9
consumer: 5
consumer: 6
consumer: 7
consumer: 8
consumer: 9
還有Pipes等通信方式,這里就不多說(shuō)了。
說(shuō)完進(jìn)程,就來(lái)說(shuō)說(shuō)線程,雖說(shuō)都不推薦使用多線程,但是本人測(cè)試過(guò)多進(jìn)程和多線程執(zhí)行效率的差距,并沒(méi)感覺(jué)到有多大差別。等后面寫(xiě)爬蟲(chóng)的時(shí)候會(huì)比較它們的執(zhí)行效率。
1. 線程:
多線程共享一個(gè)變量的時(shí)候,記得加鎖threading.Lock(),防止多個(gè)線程同時(shí)修改變量,造成數(shù)據(jù)錯(cuò)誤。
threading.local()能記錄每個(gè)線程獨(dú)有的變量。
看代碼和注釋:
#!usr/bin/env python
# -*- coding:utf-8 _*-
import multiprocessing
import threading
g = 0
local = threading.local()
def task():
lock = threading.Lock()
lock.acquire()
try:
global g
print('thread %s is running...' % threading.current_thread().name)
for i in range(100000):
g = g + 1
g = g - 1
finally:
lock.release()
def local_task(name):
local.name = name
print(name)
print_local()
def print_local():
print('thread name: %s, local var is %s' % (threading.current_thread().name, local.name))
if __name__ == '__main__':
print('thread %s is running...' % threading.current_thread().name)
# 創(chuàng)建線程
thread1 = threading.Thread(target=task, name='SonThread1')
thread2 = threading.Thread(target=task, name='SonThread2')
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(g)
thread3 = threading.Thread(target=local_task, name='SonThread3', args=('Tom',))
thread4 = threading.Thread(target=local_task, name='SonThread4', args=('Lucy',))
thread3.start()
thread4.start()
print('thread %s is ended...' % threading.current_thread().name)
輸出結(jié)果:
thread MainThread is running...
thread SonThread1 is running...
thread SonThread2 is running...
0
Tom
thread name: SonThread3, local var is Tom
Lucy
thread name: SonThread4, local var is Lucy
thread MainThread is ended...
2. 線程池ThreadPoolExecutor:
其使用方式和進(jìn)程池ProcessPoolExecutor一模一樣。示例代碼如下:
def task(i):
time.sleep(0.5)
return i
list = [i for i in range(count)]
with ThreadPoolExecutor(max_workers=8) as pool:
# 有序輸出,直接返回結(jié)果
print([data for data in pool.map(task, list)])