Python進(jìn)程和線程(13)

學(xué)習(xí)Python也有很長(zhǎng)一段時(shí)間了,進(jìn)程和線程這塊一直沒(méi)作記錄。直到最近在寫(xiě)一個(gè)爬蟲(chóng),如果不搞多進(jìn)程并發(fā)執(zhí)行,爬蟲(chóng)時(shí)間令人發(fā)指。

說(shuō)到進(jìn)程,必然少不了線程。系統(tǒng)可以有多個(gè)進(jìn)程,其中一個(gè)進(jìn)程中可以有多個(gè)線程。
每個(gè)進(jìn)程都有一個(gè)獨(dú)立的GIL(全局解釋器鎖),多進(jìn)程可以有效的利用多核CPU,而多線程只能占用CPU的一個(gè)核,因?yàn)榫€程執(zhí)行時(shí),必須先獲取GIL才能執(zhí)行,等到釋放GIL,其他線程才有執(zhí)行的機(jī)會(huì),即一個(gè)進(jìn)程中,不可能存在多個(gè)線程同時(shí)執(zhí)行,現(xiàn)在系統(tǒng)均是多核CPU,所以都不推薦使用多線程。

先說(shuō)說(shuō)進(jìn)程的事。Python提供了進(jìn)程包multiprocessing

1. 進(jìn)程,創(chuàng)建進(jìn)程,定義一個(gè)target即可,如下:
from multiprocessing import Process

# 定義任務(wù)
def p_task(a):
    print('process args is %s' % a)

# 定義一個(gè)進(jìn)程,執(zhí)行一個(gè)任務(wù)
p = Process(target=p_task, args=(1,))
# 啟動(dòng)進(jìn)程
p.start()
# 等待進(jìn)程執(zhí)行完畢
p.join()
print('over...')

輸出結(jié)果:

process args is 1
over...
2. 進(jìn)程池

如果需要?jiǎng)?chuàng)建多個(gè)進(jìn)程并復(fù)用,一般采取進(jìn)程池的方式,因?yàn)閯?chuàng)建進(jìn)程和銷毀進(jìn)程的開(kāi)銷很大。
創(chuàng)建進(jìn)程池,池的大小一般等于系統(tǒng)CPU核數(shù)。cpu核數(shù)可以通過(guò)multiprocessing.cpu_count() 查看。
Python提供了兩種創(chuàng)建方式:Pool()ProcessPoolExecutor()
兩種執(zhí)行效率如何呢?直接給結(jié)果:Pool的效率高于ProcessPoolExecutor,測(cè)試代碼很容易些,這里就不貼多余的代碼了。

第一種:Pool(),進(jìn)程池有兩個(gè)方法:
apply(self, func, args=(), kwds={}):同步執(zhí)行func,源碼如下:

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

可以看出,還是調(diào)用的異步執(zhí)行方法 self.apply_async(func, args, kwds),只不過(guò)apply_async后又調(diào)用了get()方法等待結(jié)果返回。
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):異步執(zhí)行func
所以異步執(zhí)行任務(wù)需要返回結(jié)果時(shí),注意get()方法的使用,避免把異步變成同步??词纠?/p>

import time
from multiprocessing import Pool

def task(args):
    time.sleep(2)
    return args + 1

pool = Pool(3)

l = []
start = time.time()
for i in range(5):
    # 循環(huán)里面千萬(wàn)別調(diào)用r.get()方法,否則會(huì)等待結(jié)果返回,異步變同步
    r = pool.apply_async(task, args=(i,))
    l.append(r)

# close方法表示不再接收新任務(wù),之前的任務(wù)依舊執(zhí)行
pool.close()
pool.join()
# pool.terminate()會(huì)立即終止進(jìn)程池,包括正在執(zhí)行的子進(jìn)程
print('over... ', time.time() - start)

第二種:ProcessPoolExecutor(),說(shuō)說(shuō)它的map方法和submit方法。
map方法:按任務(wù)順序返回進(jìn)程return的值
submit方法:無(wú)序執(zhí)行,返回future對(duì)象,但返回的future對(duì)象是有序的,等進(jìn)程執(zhí)行結(jié)束,可以通過(guò)future.result()獲取進(jìn)程return的值

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from concurrent.futures import ProcessPoolExecutor


def task(i):
    time.sleep(0.5)
    return i


if __name__ == '__main__':
    count = 100

    list = [i for i in range(count)]
    start = time.time()
    with ProcessPoolExecutor(max_workers=8) as pool:
        # 有序輸出,直接返回結(jié)果
        print([data for data in pool.map(task, list)])

    print('map time: ', time.time() - start)

    start = time.time()
    future_list = []
    with ProcessPoolExecutor(max_workers=8) as pool:
        # 返回future對(duì)象
        future = pool.submit(task, list)
        future_list.append(future)

    print([future.result() for future in future_list][0])
    print('submit time: ', time.time() - start)

輸出結(jié)果:

map time:  6.669000148773193
submit time:  0.623999834060669

很顯然,有序輸出效率很低。一般情況下,推薦用submit方法。

3. 進(jìn)程之間通信

進(jìn)程都擁有自己獨(dú)立的數(shù)據(jù),它們之間默認(rèn)無(wú)法共享數(shù)據(jù)。下面介紹幾種進(jìn)程間通信的方式。

方法一:使用Array

from multiprocessing import Process, Array

g = []

def share_task(a):
    g.extend(a)
    for i in range(len(a)):
        a[i] = a[i] * 2

if __name__ == '__main__':
    arr = Array('i', range(10))
    p = Process(target=share_task, args=(arr,))
    p.start()
    p.join()

    print('arr: ', arr[:])
    print('g: ', g[:])

輸出結(jié)果:

arr:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
g:  []

g還是為空列表,表明默認(rèn)數(shù)據(jù)不共享,然而通過(guò)Array,子進(jìn)程改變了主進(jìn)程的數(shù)組中的元素。

方法二:使用Manager

from multiprocessing import Process, Manager

def m_task(d, l):
    d[1] = '1'
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as m:
        m_dict = m.dict()
        m_list = m.list(range(10))
        p = Process(target=m_task, args=(m_dict, m_list))
        p.start()
        p.join()

        print('m_dict: ', m_dict)
        print('m_list: ', m_list)

輸出結(jié)果:

m_dict:  {1: '1', 0.25: None}
m_list:  [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

方法三: 使用Queue

from multiprocessing import Process, Queue

def producer_task(q):
    for i in range(10):
        q.put(i)
        print('producer: ', i)


def consumer_task(q):
    for i in range(10):
        v = q.get()
        print('consumer: ', v)

if __name__ == '__main__':
    q = Queue()
    p_producer = Process(target=producer_task, args=(q,))
    p_consumer = Process(target=consumer_task, args=(q,))

    p_producer.start()
    p_consumer.start()

    p_producer.join()
    p_consumer.join()

輸出結(jié)果:

producer:  0
producer:  1
producer:  2
producer:  3
consumer:  0
producer:  4
consumer:  1
producer:  5
producer:  6
consumer:  2
consumer:  3
producer:  7
producer:  8
consumer:  4
producer:  9
consumer:  5
consumer:  6
consumer:  7
consumer:  8
consumer:  9

還有Pipes等通信方式,這里就不多說(shuō)了。


說(shuō)完進(jìn)程,就來(lái)說(shuō)說(shuō)線程,雖說(shuō)都不推薦使用多線程,但是本人測(cè)試過(guò)多進(jìn)程和多線程執(zhí)行效率的差距,并沒(méi)感覺(jué)到有多大差別。等后面寫(xiě)爬蟲(chóng)的時(shí)候會(huì)比較它們的執(zhí)行效率。

1. 線程:

多線程共享一個(gè)變量的時(shí)候,記得加鎖threading.Lock(),防止多個(gè)線程同時(shí)修改變量,造成數(shù)據(jù)錯(cuò)誤。
threading.local()能記錄每個(gè)線程獨(dú)有的變量。

看代碼和注釋:

#!usr/bin/env python
# -*- coding:utf-8 _*-
import multiprocessing
import threading

g = 0
local = threading.local()


def task():
    lock = threading.Lock()
    lock.acquire()
    try:
        global g
        print('thread %s is running...' % threading.current_thread().name)
        for i in range(100000):
            g = g + 1
            g = g - 1
    finally:
        lock.release()


def local_task(name):
    local.name = name
    print(name)
    print_local()


def print_local():
    print('thread name: %s, local var is %s' % (threading.current_thread().name, local.name))


if __name__ == '__main__':
    print('thread %s is running...' % threading.current_thread().name)
    # 創(chuàng)建線程
    thread1 = threading.Thread(target=task, name='SonThread1')
    thread2 = threading.Thread(target=task, name='SonThread2')
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(g)

    thread3 = threading.Thread(target=local_task, name='SonThread3', args=('Tom',))
    thread4 = threading.Thread(target=local_task, name='SonThread4', args=('Lucy',))
    thread3.start()
    thread4.start()

    print('thread %s is ended...' % threading.current_thread().name)

輸出結(jié)果:

thread MainThread is running...
thread SonThread1 is running...
thread SonThread2 is running...
0
Tom
thread name: SonThread3, local var is Tom
Lucy
thread name: SonThread4, local var is Lucy
thread MainThread is ended...
2. 線程池ThreadPoolExecutor:

其使用方式和進(jìn)程池ProcessPoolExecutor一模一樣。示例代碼如下:

def task(i):
    time.sleep(0.5)
    return i


list = [i for i in range(count)]
with ThreadPoolExecutor(max_workers=8) as pool:
    # 有序輸出,直接返回結(jié)果
    print([data for data in pool.map(task, list)])
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

  • 本系列主要學(xué)習(xí)Python的基本使用和語(yǔ)法知識(shí),后續(xù)可能會(huì)圍繞著AI學(xué)習(xí)展開(kāi)。Python3 (1) Python...
    猿來(lái)如癡閱讀 1,720評(píng)論 0 5
  • 進(jìn)程與線程的區(qū)別 現(xiàn)在,多核CPU已經(jīng)非常普及了,但是,即使過(guò)去的單核CPU,也可以執(zhí)行多任務(wù)。由于CPU執(zhí)行代碼...
    蘇糊閱讀 848評(píng)論 0 2
  • 轉(zhuǎn)自: http://www.liaoxuefeng.com/wiki/0014316089557264a6b34...
    放風(fēng)箏的小小馬閱讀 245評(píng)論 0 0
  • 多進(jìn)程 forkUnix/Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,...
    carolwhite閱讀 266評(píng)論 0 0
  • 今天在匆忙回家的路上,腦子里突然間就閃過(guò)一個(gè)念頭:我為什么會(huì)對(duì)別人給我的沒(méi)有期限的任務(wù)無(wú)限的拖延?因?yàn)槲业臐撘庾R(shí)太...
    今天安好閱讀 275評(píng)論 0 0

友情鏈接更多精彩內(nèi)容