Python爬蟲(chóng)(四)--多線(xiàn)程


Python-Socket網(wǎng)絡(luò)編程

1. thread模塊


  • python是支持多線(xiàn)程的, 主要是通過(guò)thread和threading這兩個(gè)模塊來(lái)實(shí)現(xiàn)的。
  • python的thread模塊是比較底層的模塊(或者說(shuō)輕量級(jí)),python的threading模塊是對(duì)thread做了一些包裝的,可以更加方便的被使用。

簡(jiǎn)要的看一下thread模塊中含函數(shù)和常量

import thread

thread.LockType  #鎖對(duì)象的一種, 用于線(xiàn)程的同步
thread.error  #線(xiàn)程的異常

thread.start_new_thread(function, args[, kwargs])  #創(chuàng)建一個(gè)新的線(xiàn)程
function : 線(xiàn)程執(zhí)行函數(shù)
args : 線(xiàn)程執(zhí)行函數(shù)的參數(shù), 類(lèi)似為tuple,
kwargs : 是一個(gè)字典
返回值: 返回線(xiàn)程的標(biāo)識(shí)符

thread.exit()  #線(xiàn)程退出函數(shù)
thread.allocate_lock()  #生成一個(gè)未鎖狀態(tài)的鎖對(duì)象
返回值: 返回一個(gè)鎖對(duì)象

鎖對(duì)象的方法

lock.acquire([waitflag]) #獲取鎖
無(wú)參數(shù)時(shí), 無(wú)條件獲取鎖, 無(wú)法獲取時(shí), 會(huì)被阻塞, 知道可以鎖被釋放
有參數(shù)時(shí), waitflag = 0 時(shí),表示只有在不需要等待的情況下才獲取鎖, 非零情況與上面相同
返回值 : 獲得鎖成功返回True, 獲得鎖失敗返回False

lock.release() #釋放鎖

lock.locked() #獲取當(dāng)前鎖的狀態(tài)
返回值 : 如果鎖已經(jīng)被某個(gè)線(xiàn)程獲取,返回True, 否則為False

1.1. thread多線(xiàn)程

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import thread
import time

def print_time(thread_name, delay) :
    count = 0
    while count < 5 :
        time.sleep(delay)
        count += 1
        print "%s : %s" % (thread_name, time.ctime(time.time()))

try :
    thread.start_new_thread(print_time, ("Thread-1", 2, ))
    thread.start_new_thread(print_time, ("Thread-2", 4, ))
except : 
    print "Error: unable to start the thread"

while True :
    pass

2. threading模塊


python的threading模塊是對(duì)thread做了一些包裝的,可以更加方便的被使用。經(jīng)常和Queue結(jié)合使用,Queue模塊中提供了同步的、線(xiàn)程安全的隊(duì)列類(lèi),包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ),能夠在多線(xiàn)程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線(xiàn)程間的同步

2.1. 常用函數(shù)和對(duì)象

#函數(shù)
threading.active_count()  #返回當(dāng)前線(xiàn)程對(duì)象Thread的個(gè)數(shù)
threading.enumerate()  #返回當(dāng)前運(yùn)行的線(xiàn)程對(duì)象Thread(包括后臺(tái)的)的list
threading.Condition()  #返回條件變量對(duì)象的工廠函數(shù), 主要用戶(hù)線(xiàn)程的并發(fā)
threading.current_thread()  #返回當(dāng)前的線(xiàn)程對(duì)象Thread, 文檔后面解釋沒(méi)看懂
threading.Lock()  #返回一個(gè)新的鎖對(duì)象, 是在thread模塊的基礎(chǔ)上實(shí)現(xiàn)的 與acquire()和release()結(jié)合使用

#類(lèi)
threading.Thread  #一個(gè)表示線(xiàn)程控制的類(lèi), 這個(gè)類(lèi)常被繼承
thraeding.Timer  #定時(shí)器,線(xiàn)程在一定時(shí)間后執(zhí)行
threading.ThreadError  #引發(fā)中各種線(xiàn)程相關(guān)異常

2.1.1. Thread對(duì)象

一般來(lái)說(shuō),使用線(xiàn)程有兩種模式, 一種是創(chuàng)建線(xiàn)程要執(zhí)行的函數(shù), 把這個(gè)函數(shù)傳遞進(jìn)Thread對(duì)象里,讓它來(lái)執(zhí)行. 另一種是直接從Thread繼承,創(chuàng)建一個(gè)新的class,把線(xiàn)程執(zhí)行的代碼放到這個(gè)新的class里。

常用兩種方式運(yùn)行線(xiàn)程(線(xiàn)程中包含name屬性) :

  • 在構(gòu)造函數(shù)中傳入用于線(xiàn)程運(yùn)行的函數(shù)(這種方式更加靈活)
  • 在子類(lèi)中重寫(xiě)threading.Thread基類(lèi)中run()方法(只重寫(xiě)__init__()和run()方法)

創(chuàng)建線(xiàn)程對(duì)象后, 通過(guò)調(diào)用start()函數(shù)運(yùn)行線(xiàn)程, 然后會(huì)自動(dòng)調(diào)用run()方法.

通過(guò)設(shè)置`daemon`屬性, 可以將線(xiàn)程設(shè)置為守護(hù)線(xiàn)程

threading.Thread(group = None, target = None, name = None, args = () kwars = {})
group : 應(yīng)該為None
target : 可以傳入一個(gè)函數(shù)用于run()方法調(diào)用,
name : 線(xiàn)程名 默認(rèn)使用"Thread-N"
args : 元組, 表示傳入target函數(shù)的參數(shù)
kwargs : 字典, 傳入target函數(shù)中關(guān)鍵字參數(shù)

屬性:
name  #線(xiàn)程表示, 沒(méi)有任何語(yǔ)義
doemon  #布爾值, 如果是守護(hù)線(xiàn)程為T(mén)rue, 不是為False, 主線(xiàn)程不是守護(hù)線(xiàn)程, 默認(rèn)threading.Thread.damon = False

類(lèi)方法: 
run()  #用以表示線(xiàn)程活動(dòng)的方法。
start()  #啟動(dòng)線(xiàn)程活動(dòng)。
join([time])  #等待至線(xiàn)程中止。這阻塞調(diào)用線(xiàn)程直至線(xiàn)程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生。
isAlive(): 返回線(xiàn)程是否活動(dòng)的。
getName(): 返回線(xiàn)程名。
setName(): 設(shè)置線(xiàn)程名。

范例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def test_thread(count) :
    while count > 0 :
        print "count = %d" % count
        count = count - 1
        time.sleep(1)

def main() :
    my_thread = threading.Thread(target = test_thread, args = (10, ))
    my_thread.start()
    my_thread.join()

if __name__ == '__main__':
    main()

2.2. 常用多線(xiàn)程寫(xiě)法

  • 固定線(xiàn)程運(yùn)行的函數(shù)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading, thread
import time


class MyThread(threading.Thread):
    """docstring for MyThread"""

    def __init__(self, thread_id, name, counter) :
        super(MyThread, self).__init__()  #調(diào)用父類(lèi)的構(gòu)造函數(shù) 
        self.thread_id = thread_id
        self.name = name
        self.counter = counter

    def run(self) :
        print "Starting " + self.name
        print_time(self.name, self.counter, 5)
        print "Exiting " + self.name

def print_time(thread_name, delay, counter) :
    while counter :
        time.sleep(delay)
        print "%s %s" % (thread_name, time.ctime(time.time()))
        counter -= 1

def main():
    #創(chuàng)建新的線(xiàn)程
    thread1 = MyThread(1, "Thread-1", 1)
    thread2 = MyThread(2, "Thread-2", 2)

    #開(kāi)啟線(xiàn)程
    thread1.start()
    thread2.start()


    thread1.join()
    thread2.join()
    print "Exiting Main Thread"

if __name__ == '__main__':
    main()
  • 外部傳入線(xiàn)程運(yùn)行的函數(shù)
#/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time

class MyThread(threading.Thread):
    """
    屬性:
    target: 傳入外部函數(shù), 用戶(hù)線(xiàn)程調(diào)用
    args: 函數(shù)參數(shù)
    """
    def __init__(self, target, args):
        super(MyThread, self).__init__()  #調(diào)用父類(lèi)的構(gòu)造函數(shù) 
        self.target = target
        self.args = args

    def run(self) :
        self.target(self.args)

def print_time(counter) :
    while counter :
        print "counter = %d" % counter
        counter -= 1
        time.sleep(1)

def main() :
    my_thread = MyThread(print_time, 10)
    my_thread.start()
    my_thread.join()

if __name__ == '__main__':
    main()

2.3. 生產(chǎn)者消費(fèi)者問(wèn)題

試著用python寫(xiě)了一個(gè)生產(chǎn)者消費(fèi)者問(wèn)題(偽生產(chǎn)者消費(fèi)者), 只是使用簡(jiǎn)單的鎖, 感覺(jué)有點(diǎn)不太對(duì), 下面另一個(gè)程序會(huì)寫(xiě)出正確的生產(chǎn)者消費(fèi)者問(wèn)題

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import thread, threading
import urllib2
import time, random
import Queue

share_queue = Queue.Queue()  #共享隊(duì)列
my_lock = thread.allocate_lock()
class Producer(threading.Thread) :

    def run(self) :
        products = range(5)
        global share_queue
        while True :
            num = random.choice(products)
            my_lock.acquire()
            share_queue.put(num)
            print  "Produce : ", num
            my_lock.release()
            time.sleep(random.random())

class Consumer(threading.Thread) :

    def run(self) :
        global share_queue
        while True:
            my_lock.acquire()
            if share_queue.empty() : #這里沒(méi)有使用信號(hào)量機(jī)制進(jìn)行阻塞等待, 
                print "Queue is Empty..."  
                my_lock.release()
                time.sleep(random.random())
                continue
            num = share_queue.get()
            print "Consumer : ", num
            my_lock.release()
            time.sleep(random.random())

def main() :
    producer = Producer()
    consumer = Consumer()
    producer.start()
    consumer.start()

if __name__ == '__main__':
    main()

殺死多線(xiàn)程程序方法: 使用control + z掛起程序(程序依然在后臺(tái), 可以使用ps aux查看), 獲得程序的進(jìn)程號(hào), 然后使用kill -9 進(jìn)程號(hào)殺死進(jìn)程

參考一篇帖子解決了上述問(wèn)題,重寫(xiě)了生產(chǎn)者消費(fèi)者問(wèn)題程序, 參考鏈接慣例放在最后.

使用了wait()和notify()解決

當(dāng)然最簡(jiǎn)答的方法是直接使用Queue,Queue封裝了Condition的行為, 如wait(), notify(), acquire(), 沒(méi)看文檔就這樣, 使用了Queue竟然不知道封裝了這些函數(shù), 繼續(xù)滾去看文檔了

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import random, time, Queue

MAX_SIZE = 5
SHARE_Q = []  #模擬共享隊(duì)列
CONDITION = threading.Condition()

class Producer(threading.Thread) :

    def run(self) :
        products = range(5)
        global SHARE_Q
        while True :
            CONDITION.acquire()
            if len(SHARE_Q) == 5 :
                print "Queue is full.."
                CONDITION.wait()
                print "Consumer have comsumed something"
            product = random.choice(products)
            SHARE_Q.append(product)
            print "Producer : ", product
            CONDITION.notify()
            CONDITION.release()
            time.sleep(random.random())

class Consumer(threading.Thread) :

    def run(self) :
        global SHARE_Q
        while True:
            CONDITION.acquire()
            if not SHARE_Q :
                print "Queue is Empty..."
                CONDITION.wait()
                print "Producer have producted something"
            product = SHARE_Q.pop(0)
            print "Consumer :", product
            CONDITION.notify()
            CONDITION.release()
            time.sleep(random.random())

def main() :
    producer = Producer()
    consumer = Consumer()
    producer.start()
    consumer.start()

if __name__ == '__main__':
    main()

2.4.簡(jiǎn)單鎖

如果只是簡(jiǎn)單的加鎖解鎖可以直接使用threading.Lock()生成鎖對(duì)象, 然后使用acquire()和release()方法

例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*- 

import threading
import time

class MyThread(threading.Thread) :
    
    def __init__(self, thread_id, name, counter) :
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
        self.counter = counter

    def run(self) :
        #重寫(xiě)run方法, 添加線(xiàn)程執(zhí)行邏輯, start函數(shù)運(yùn)行會(huì)自動(dòng)執(zhí)行
        print  "Starting " + self.name
        threadLock.acquire() #獲取所
        print_time(self.name, self.counter, 3)
        threadLock.release() #釋放鎖

def print_time(thread_name, delay, counter) :
    while counter :
        time.sleep(delay)
        print "%s %s" % (thread_name, time.ctime(time.time()))
        counter -= 1

threadLock = threading.Lock()
threads = [] #存放線(xiàn)程對(duì)象

thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)

#開(kāi)啟線(xiàn)程
thread1.start()
thread2.start()

for t in threads :
    t.join()  #等待線(xiàn)程直到終止
print "Exiting Main Thread"

2.5. Condition

如果是向生產(chǎn)者消費(fèi)者類(lèi)似的情形, 使用Condition類(lèi) 或者直接使用Queue模塊

Condition

條件變量中有acquire()和release方法用來(lái)調(diào)用鎖的方法, 有wait(), notify(), notifyAll()方法, 后面是三個(gè)方法必須在獲取鎖的情況下調(diào)用, 否則產(chǎn)生RuntimeError錯(cuò)誤.

  • 當(dāng)一個(gè)線(xiàn)程獲得鎖后, 發(fā)現(xiàn)沒(méi)有期望的資源或者狀態(tài), 就會(huì)調(diào)用wait()阻塞, 并釋放已經(jīng)獲得鎖, 知道期望的資源或者狀態(tài)發(fā)生改變
  • 當(dāng)一個(gè)線(xiàn)程獲得鎖, 改變了資源或者狀態(tài), 就會(huì)調(diào)用notify()和notifyAll()去通知其他線(xiàn)程,
#官方文檔中提供的生產(chǎn)者消費(fèi)者模型
# Consume one item
cv.acquire()
while not an_item_is_available():
    cv.wait()
get_an_available_item()
cv.release()

# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
#threading.Condition類(lèi)
thread.Condition([lock])
可選參數(shù)lock: 必須是Lock或者RLock對(duì)象, 并被作為underlying鎖(悲觀鎖?), 否則, 會(huì)創(chuàng)建一個(gè)新的RLock對(duì)象作為underlying鎖

類(lèi)方法:
acquire()  #獲得鎖
release()  #釋放鎖
wait([timeout])  #持續(xù)等待直到被notify()或者notifyAll()通知或者超時(shí)(必須先獲得鎖),
#wait()所做操作, 先釋放獲得的鎖, 然后阻塞, 知道被notify或者notifyAll喚醒或者超時(shí), 一旦被喚醒或者超時(shí), 會(huì)重新獲取鎖(應(yīng)該說(shuō)搶鎖), 然后返回
notify()  #喚醒一個(gè)wait()阻塞的線(xiàn)程.
notify_all()或者notifyAll()  #喚醒所有阻塞的線(xiàn)程

參考程序可以查看上面的生產(chǎn)者消費(fèi)者程序

3. 參考鏈接


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線(xiàn)程 引言&動(dòng)機(jī) 考慮一下這個(gè)場(chǎng)景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0....
    不浪漫的浪漫_ea03閱讀 416評(píng)論 0 0
  • 引言&動(dòng)機(jī) 考慮一下這個(gè)場(chǎng)景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,...
    chen_000閱讀 586評(píng)論 0 0
  • 1.進(jìn)程和線(xiàn)程 隊(duì)列:1、進(jìn)程之間的通信: q = multiprocessing.Queue()2、...
    一只寫(xiě)程序的猿閱讀 1,237評(píng)論 0 17
  • 線(xiàn)程 1.同步概念 1.多線(xiàn)程開(kāi)發(fā)可能遇到的問(wèn)題 同步不是一起的意思,是協(xié)同步調(diào) 假設(shè)兩個(gè)線(xiàn)程t1和t2都要對(duì)nu...
    TENG書(shū)閱讀 696評(píng)論 0 1
  • 我的自我簡(jiǎn)介: 一名充滿(mǎn)愛(ài)和溫暖的醫(yī)者,懷著一顆慈悲和感恩的心走在傳播健康的路上,希望將健康和愛(ài)傳播出...
    杏子心語(yǔ)閱讀 325評(píng)論 0 0

友情鏈接更多精彩內(nèi)容