Python-Socket網(wǎng)絡(luò)編程
1. thread模塊
- python是支持多線(xiàn)程的, 主要是通過(guò)thread和threading這兩個(gè)模塊來(lái)實(shí)現(xiàn)的。
- python的thread模塊是比較底層的模塊(或者說(shuō)輕量級(jí)),python的threading模塊是對(duì)thread做了一些包裝的,可以更加方便的被使用。
簡(jiǎn)要的看一下thread模塊中含函數(shù)和常量
import thread
thread.LockType #鎖對(duì)象的一種, 用于線(xiàn)程的同步
thread.error #線(xiàn)程的異常
thread.start_new_thread(function, args[, kwargs]) #創(chuàng)建一個(gè)新的線(xiàn)程
function : 線(xiàn)程執(zhí)行函數(shù)
args : 線(xiàn)程執(zhí)行函數(shù)的參數(shù), 類(lèi)似為tuple,
kwargs : 是一個(gè)字典
返回值: 返回線(xiàn)程的標(biāo)識(shí)符
thread.exit() #線(xiàn)程退出函數(shù)
thread.allocate_lock() #生成一個(gè)未鎖狀態(tài)的鎖對(duì)象
返回值: 返回一個(gè)鎖對(duì)象
鎖對(duì)象的方法
lock.acquire([waitflag]) #獲取鎖
無(wú)參數(shù)時(shí), 無(wú)條件獲取鎖, 無(wú)法獲取時(shí), 會(huì)被阻塞, 知道可以鎖被釋放
有參數(shù)時(shí), waitflag = 0 時(shí),表示只有在不需要等待的情況下才獲取鎖, 非零情況與上面相同
返回值 : 獲得鎖成功返回True, 獲得鎖失敗返回False
lock.release() #釋放鎖
lock.locked() #獲取當(dāng)前鎖的狀態(tài)
返回值 : 如果鎖已經(jīng)被某個(gè)線(xiàn)程獲取,返回True, 否則為False
1.1. thread多線(xiàn)程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import thread
import time
def print_time(thread_name, delay) :
count = 0
while count < 5 :
time.sleep(delay)
count += 1
print "%s : %s" % (thread_name, time.ctime(time.time()))
try :
thread.start_new_thread(print_time, ("Thread-1", 2, ))
thread.start_new_thread(print_time, ("Thread-2", 4, ))
except :
print "Error: unable to start the thread"
while True :
pass
2. threading模塊
python的threading模塊是對(duì)thread做了一些包裝的,可以更加方便的被使用。經(jīng)常和Queue結(jié)合使用,Queue模塊中提供了同步的、線(xiàn)程安全的隊(duì)列類(lèi),包括
FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ),能夠在多線(xiàn)程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線(xiàn)程間的同步
2.1. 常用函數(shù)和對(duì)象
#函數(shù)
threading.active_count() #返回當(dāng)前線(xiàn)程對(duì)象Thread的個(gè)數(shù)
threading.enumerate() #返回當(dāng)前運(yùn)行的線(xiàn)程對(duì)象Thread(包括后臺(tái)的)的list
threading.Condition() #返回條件變量對(duì)象的工廠函數(shù), 主要用戶(hù)線(xiàn)程的并發(fā)
threading.current_thread() #返回當(dāng)前的線(xiàn)程對(duì)象Thread, 文檔后面解釋沒(méi)看懂
threading.Lock() #返回一個(gè)新的鎖對(duì)象, 是在thread模塊的基礎(chǔ)上實(shí)現(xiàn)的 與acquire()和release()結(jié)合使用
#類(lèi)
threading.Thread #一個(gè)表示線(xiàn)程控制的類(lèi), 這個(gè)類(lèi)常被繼承
thraeding.Timer #定時(shí)器,線(xiàn)程在一定時(shí)間后執(zhí)行
threading.ThreadError #引發(fā)中各種線(xiàn)程相關(guān)異常
2.1.1. Thread對(duì)象
一般來(lái)說(shuō),使用線(xiàn)程有兩種模式, 一種是創(chuàng)建線(xiàn)程要執(zhí)行的函數(shù), 把這個(gè)函數(shù)傳遞進(jìn)Thread對(duì)象里,讓它來(lái)執(zhí)行. 另一種是直接從Thread繼承,創(chuàng)建一個(gè)新的class,把線(xiàn)程執(zhí)行的代碼放到這個(gè)新的class里。
常用兩種方式運(yùn)行線(xiàn)程(線(xiàn)程中包含name屬性) :
- 在構(gòu)造函數(shù)中傳入用于線(xiàn)程運(yùn)行的函數(shù)(這種方式更加靈活)
- 在子類(lèi)中重寫(xiě)threading.Thread基類(lèi)中run()方法(
只重寫(xiě)__init__()和run()方法)
創(chuàng)建線(xiàn)程對(duì)象后, 通過(guò)調(diào)用start()函數(shù)運(yùn)行線(xiàn)程, 然后會(huì)自動(dòng)調(diào)用run()方法.
通過(guò)設(shè)置`daemon`屬性, 可以將線(xiàn)程設(shè)置為守護(hù)線(xiàn)程
threading.Thread(group = None, target = None, name = None, args = () kwars = {})
group : 應(yīng)該為None
target : 可以傳入一個(gè)函數(shù)用于run()方法調(diào)用,
name : 線(xiàn)程名 默認(rèn)使用"Thread-N"
args : 元組, 表示傳入target函數(shù)的參數(shù)
kwargs : 字典, 傳入target函數(shù)中關(guān)鍵字參數(shù)
屬性:
name #線(xiàn)程表示, 沒(méi)有任何語(yǔ)義
doemon #布爾值, 如果是守護(hù)線(xiàn)程為T(mén)rue, 不是為False, 主線(xiàn)程不是守護(hù)線(xiàn)程, 默認(rèn)threading.Thread.damon = False
類(lèi)方法:
run() #用以表示線(xiàn)程活動(dòng)的方法。
start() #啟動(dòng)線(xiàn)程活動(dòng)。
join([time]) #等待至線(xiàn)程中止。這阻塞調(diào)用線(xiàn)程直至線(xiàn)程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生。
isAlive(): 返回線(xiàn)程是否活動(dòng)的。
getName(): 返回線(xiàn)程名。
setName(): 設(shè)置線(xiàn)程名。
范例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def test_thread(count) :
while count > 0 :
print "count = %d" % count
count = count - 1
time.sleep(1)
def main() :
my_thread = threading.Thread(target = test_thread, args = (10, ))
my_thread.start()
my_thread.join()
if __name__ == '__main__':
main()
2.2. 常用多線(xiàn)程寫(xiě)法
- 固定線(xiàn)程運(yùn)行的函數(shù)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, thread
import time
class MyThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, thread_id, name, counter) :
super(MyThread, self).__init__() #調(diào)用父類(lèi)的構(gòu)造函數(shù)
self.thread_id = thread_id
self.name = name
self.counter = counter
def run(self) :
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(thread_name, delay, counter) :
while counter :
time.sleep(delay)
print "%s %s" % (thread_name, time.ctime(time.time()))
counter -= 1
def main():
#創(chuàng)建新的線(xiàn)程
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)
#開(kāi)啟線(xiàn)程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print "Exiting Main Thread"
if __name__ == '__main__':
main()
- 外部傳入線(xiàn)程運(yùn)行的函數(shù)
#/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time
class MyThread(threading.Thread):
"""
屬性:
target: 傳入外部函數(shù), 用戶(hù)線(xiàn)程調(diào)用
args: 函數(shù)參數(shù)
"""
def __init__(self, target, args):
super(MyThread, self).__init__() #調(diào)用父類(lèi)的構(gòu)造函數(shù)
self.target = target
self.args = args
def run(self) :
self.target(self.args)
def print_time(counter) :
while counter :
print "counter = %d" % counter
counter -= 1
time.sleep(1)
def main() :
my_thread = MyThread(print_time, 10)
my_thread.start()
my_thread.join()
if __name__ == '__main__':
main()
2.3. 生產(chǎn)者消費(fèi)者問(wèn)題
試著用python寫(xiě)了一個(gè)生產(chǎn)者消費(fèi)者問(wèn)題(偽生產(chǎn)者消費(fèi)者), 只是使用簡(jiǎn)單的鎖, 感覺(jué)有點(diǎn)不太對(duì), 下面另一個(gè)程序會(huì)寫(xiě)出正確的生產(chǎn)者消費(fèi)者問(wèn)題
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import thread, threading
import urllib2
import time, random
import Queue
share_queue = Queue.Queue() #共享隊(duì)列
my_lock = thread.allocate_lock()
class Producer(threading.Thread) :
def run(self) :
products = range(5)
global share_queue
while True :
num = random.choice(products)
my_lock.acquire()
share_queue.put(num)
print "Produce : ", num
my_lock.release()
time.sleep(random.random())
class Consumer(threading.Thread) :
def run(self) :
global share_queue
while True:
my_lock.acquire()
if share_queue.empty() : #這里沒(méi)有使用信號(hào)量機(jī)制進(jìn)行阻塞等待,
print "Queue is Empty..."
my_lock.release()
time.sleep(random.random())
continue
num = share_queue.get()
print "Consumer : ", num
my_lock.release()
time.sleep(random.random())
def main() :
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
if __name__ == '__main__':
main()
殺死多線(xiàn)程程序方法: 使用control + z掛起程序(程序依然在后臺(tái), 可以使用ps aux查看), 獲得程序的進(jìn)程號(hào), 然后使用kill -9 進(jìn)程號(hào)殺死進(jìn)程
參考一篇帖子解決了上述問(wèn)題,重寫(xiě)了生產(chǎn)者消費(fèi)者問(wèn)題程序, 參考鏈接慣例放在最后.
使用了wait()和notify()解決
當(dāng)然最簡(jiǎn)答的方法是直接使用
Queue,Queue封裝了Condition的行為, 如wait(), notify(), acquire(), 沒(méi)看文檔就這樣, 使用了Queue竟然不知道封裝了這些函數(shù), 繼續(xù)滾去看文檔了
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import random, time, Queue
MAX_SIZE = 5
SHARE_Q = [] #模擬共享隊(duì)列
CONDITION = threading.Condition()
class Producer(threading.Thread) :
def run(self) :
products = range(5)
global SHARE_Q
while True :
CONDITION.acquire()
if len(SHARE_Q) == 5 :
print "Queue is full.."
CONDITION.wait()
print "Consumer have comsumed something"
product = random.choice(products)
SHARE_Q.append(product)
print "Producer : ", product
CONDITION.notify()
CONDITION.release()
time.sleep(random.random())
class Consumer(threading.Thread) :
def run(self) :
global SHARE_Q
while True:
CONDITION.acquire()
if not SHARE_Q :
print "Queue is Empty..."
CONDITION.wait()
print "Producer have producted something"
product = SHARE_Q.pop(0)
print "Consumer :", product
CONDITION.notify()
CONDITION.release()
time.sleep(random.random())
def main() :
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
if __name__ == '__main__':
main()
2.4.簡(jiǎn)單鎖
如果只是簡(jiǎn)單的加鎖解鎖可以直接使用threading.Lock()生成鎖對(duì)象, 然后使用acquire()和release()方法
例如:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
class MyThread(threading.Thread) :
def __init__(self, thread_id, name, counter) :
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.counter = counter
def run(self) :
#重寫(xiě)run方法, 添加線(xiàn)程執(zhí)行邏輯, start函數(shù)運(yùn)行會(huì)自動(dòng)執(zhí)行
print "Starting " + self.name
threadLock.acquire() #獲取所
print_time(self.name, self.counter, 3)
threadLock.release() #釋放鎖
def print_time(thread_name, delay, counter) :
while counter :
time.sleep(delay)
print "%s %s" % (thread_name, time.ctime(time.time()))
counter -= 1
threadLock = threading.Lock()
threads = [] #存放線(xiàn)程對(duì)象
thread1 = MyThread(1, "Thread-1", 1)
thread2 = MyThread(2, "Thread-2", 2)
#開(kāi)啟線(xiàn)程
thread1.start()
thread2.start()
for t in threads :
t.join() #等待線(xiàn)程直到終止
print "Exiting Main Thread"
2.5. Condition
如果是向生產(chǎn)者消費(fèi)者類(lèi)似的情形, 使用Condition類(lèi) 或者直接使用
Queue模塊
Condition
條件變量中有acquire()和release方法用來(lái)調(diào)用鎖的方法, 有wait(), notify(), notifyAll()方法, 后面是三個(gè)方法必須在獲取鎖的情況下調(diào)用, 否則產(chǎn)生RuntimeError錯(cuò)誤.
- 當(dāng)一個(gè)線(xiàn)程獲得鎖后, 發(fā)現(xiàn)沒(méi)有期望的資源或者狀態(tài), 就會(huì)調(diào)用wait()阻塞, 并釋放已經(jīng)獲得鎖, 知道期望的資源或者狀態(tài)發(fā)生改變
- 當(dāng)一個(gè)線(xiàn)程獲得鎖, 改變了資源或者狀態(tài), 就會(huì)調(diào)用notify()和notifyAll()去通知其他線(xiàn)程,
#官方文檔中提供的生產(chǎn)者消費(fèi)者模型
# Consume one item
cv.acquire()
while not an_item_is_available():
cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
#threading.Condition類(lèi)
thread.Condition([lock])
可選參數(shù)lock: 必須是Lock或者RLock對(duì)象, 并被作為underlying鎖(悲觀鎖?), 否則, 會(huì)創(chuàng)建一個(gè)新的RLock對(duì)象作為underlying鎖
類(lèi)方法:
acquire() #獲得鎖
release() #釋放鎖
wait([timeout]) #持續(xù)等待直到被notify()或者notifyAll()通知或者超時(shí)(必須先獲得鎖),
#wait()所做操作, 先釋放獲得的鎖, 然后阻塞, 知道被notify或者notifyAll喚醒或者超時(shí), 一旦被喚醒或者超時(shí), 會(huì)重新獲取鎖(應(yīng)該說(shuō)搶鎖), 然后返回
notify() #喚醒一個(gè)wait()阻塞的線(xiàn)程.
notify_all()或者notifyAll() #喚醒所有阻塞的線(xiàn)程
參考程序可以查看上面的生產(chǎn)者消費(fèi)者程序