# -*- coding: utf-8 -*-
# @Time : 2019/11/28 16:42
# @Author : John
# @Email : 2398344557@qq.com
# @File : 111.py
# @Software: PyCharm
當使用單線程時,耗費時間特長
【例】單線程
import time
def run(n):
print("task ", n)
time.sleep(2)
start_time = time.time()
run("1")
run("2")
print(f'執(zhí)行所消耗的時間:{time.time() - start_time}秒')
# task 1
# task 2
# 執(zhí)行所消耗的時間:4.00098729133606秒
執(zhí)行上面這個程序要4秒,如果用多線程的話,函數(shù)“同時執(zhí)行”,只需一半的時間即可?。?!因此我們要引入多線程概念 ==>
【例】啟動多個線程(函數(shù)方式)
import threading
import time
def run(n):
print(f'任務{n}')
time.sleep(2)
# run('1')
# run('2')
start = time.time()
# t1和t2同時執(zhí)行
t1 = threading.Thread(target=run, args=('1',))
t2 = threading.Thread(target=run, args=('2',))
t1.start()
t2.start()
# 阻塞作用,等t1和t2執(zhí)行完畢后再執(zhí)行下面的內(nèi)容
t1.join()
t2.join()
print('finished')
end = time.time()
print(f'此程序執(zhí)行時間為:{end-start}秒')
# 任務1
# 任務2
# finished
# 此程序執(zhí)行時間為:2.001213788986206秒
【例】啟動多個線程(類方式)
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n, sleep_time):
super().__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print(f'任務{self.n}開始')
time.sleep(self.sleep_time)
print(f'任務{self.n}結束了')
t1 = MyThread(1, 1) # 傳入n=1,sleep_time=1
t2 = MyThread(2, 2) # 傳入n=2,sleep_time=2
t1.start()
t2.start()
t1.join()
t2.join()
print('finished')
# 任務1開始
# 任務2開始
# 任務1結束了
# 任務2結束了
# finished
【例】同時啟動20個線程
import threading
import time
def run(n):
print(f'任務{n}開始')
time.sleep(2)
print(f'任務{n}結束了')
start_time = time.time()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
print("----------all threads has finished...")
print("消耗的時間:", time.time() - start_time)
# 結果略
# 分析:
# 主線程直接結束,沒有等子線程,2s后子線程分別task done
# 代碼共有51個線程,一個主線程與50個子線程,主線程無法計算子線程執(zhí)行時間
# 因此,我們需要設置主線程等待子線程執(zhí)行結束,通過一個臨時列表,在線程啟動后分別join等待,子線程分別結束后,結束主進程,計算耗時約2.011415958s
【例】計算所有線程執(zhí)行時間
import threading
import time
def run(n):
print(f'任務{n}')
time.sleep(2)
print(f'任務{n}已完成')
start_time = time.time()
t_objs = [] # 存線程實例
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
t_objs.append(t) # 為了不阻塞后面線程的啟動,不在這里join,先放到一個列表里
for t in t_objs:
print(t.is_alive())
for t in t_objs: # 循環(huán)線程實例列表,等待所有線程執(zhí)行完畢
t.join()
for t in t_objs:
print(t.is_alive()) # is_alive()方法可以用來判斷一個線程是否結束,返回True或False
print("----------all threads has finished...")
print("消耗的時間:", time.time() - start_time)
結果如下:
任務0
任務1
任務2
任務3
任務4
任務5
任務6
任務7
任務8
任務9True
True
True
True
True
True
True
True
True
True
任務1已完成任務2已完成任務3已完成
任務0已完成
任務7已完成任務6已完成
任務5已完成
任務9已完成
任務4已完成
任務8已完成
False
False
False
False
False
False
False
False
False
False
----------all threads has finished...
消耗的時間: 2.12958025932312
【例】根據(jù)當前線程(Thread)活著的數(shù)量來查看線程生命周期
import threading
import time
def sing():
for i in range(3):
print('正在唱歌。。。。%d' % i)
# time.sleep(random.random()*3)
time.sleep(1)
def dance():
for i in range(3):
print('正在跳舞。。。。%d' % i)
# time.sleep(random.random()*5)
time.sleep(1)
if __name__=='__main__':
# print('晚會開始:%s'%time.time())返回的是長串的時間戳
print('晚會開始:%s' % time.ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
length = len(threading.enumerate()) # 枚舉返回個列表
print('當前運行的線程數(shù)為:%d' % length)
time.sleep(0.7)
if length <= 1:
break
結果如下:
晚會開始:Sun Dec 8 17:12:54 2019
正在唱歌。。。。0
正在跳舞。。。。0
當前運行的線程數(shù)為:3
當前運行的線程數(shù)為:3
正在唱歌。。。。1
正在跳舞。。。。1
當前運行的線程數(shù)為:3
正在唱歌。。。。2
正在跳舞。。。。2
當前運行的線程數(shù)為:3
當前運行的線程數(shù)為:3
當前運行的線程數(shù)為:1
多線程互斥鎖
作用:為了防止不同的線程訪問同一共享資源造成混亂
import threading
# 生成鎖對象,全局唯一
lock = threading.Lock()
# 獲取鎖。未獲取到會阻塞程序,直到獲取到鎖才會往下執(zhí)行
lock.acquire()
# 釋放鎖,歸回倘,其他人可以拿去用了
lock.release()
# 注:lock.acquire() 和 lock.release()必須成對出現(xiàn),否則就有可能造成死鎖
# 可以使用使用上下文管理器來加鎖(常用)
# with 語句會在這個代碼塊執(zhí)行前自動獲取鎖,在執(zhí)行結束后自動釋放鎖
import threading
lock = threading.Lock()
with lock:
pass
不上鎖的時候,對少量數(shù)據(jù)的修改有一定的作用;
但對大量數(shù)據(jù)的修改,不上鎖的話會出現(xiàn)資源競爭問題,從而數(shù)據(jù)結果會不正確
【例】測試1:未上鎖前,用多線程對全局變量進程修改
import threading
import time
g_num = 0
def work1(num):
global g_num
for i in range(num):
g_num += 1
print("----in work1, g_num is %d---" % g_num)
def work2(num):
global g_num
for i in range(num):
g_num += 1
print("----in work2, g_num is %d---" % g_num)
print("---線程創(chuàng)建之前g_num is %d---" % g_num)
t1 = threading.Thread(target=work1, args=(100,))
t1.start()
t2 = threading.Thread(target=work2, args=(100,))
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2個線程對同一個全局變量操作之后的最終結果是:%s" % g_num)
# ---線程創(chuàng)建之前g_num is 0---
# ----in work1, g_num is 100---
# ----in work2, g_num is 200---
# 2個線程對同一個全局變量操作之后的最終結果是:200
【例】測試2:未上鎖前,用多線程對全局變量進程修改
import threading
import time
g_num = 0
def work1(num):
global g_num
for i in range(num):
g_num += 1
print("----in work1, g_num is %d---" % g_num)
def work2(num):
global g_num
for i in range(num):
g_num += 1
print("----in work2, g_num is %d---" % g_num)
print("---線程創(chuàng)建之前g_num is %d---"%g_num)
t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()
t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2個線程對同一個全局變量操作之后的最終結果是:%s" % g_num)
# ---線程創(chuàng)建之前g_num is 0---
# ----in work1, g_num is 1208926---
# ----in work2, g_num is 1264177---
# 2個線程對同一個全局變量操作之后的最終結果是:1264177
死鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源,就會造成死鎖,一旦發(fā)生就會造成應用的停止響應。
【例】死鎖
#coding=utf-8
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對mutexA上鎖
mutexA.acquire()
# mutexA上鎖后,延時1秒,等待另外那個線程 把mutexB上鎖
print(self.name+'----do1---up----')
time.sleep(1)
# 此時會堵塞,因為這個mutexB已經(jīng)被另外的線程搶先上鎖了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 對mutexA解鎖
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 對mutexB上鎖
mutexB.acquire()
# mutexB上鎖后,延時1秒,等待另外那個線程 把mutexA上鎖
print(self.name+'----do2---up----')
time.sleep(1)
# 此時會堵塞,因為這個mutexA已經(jīng)被另外的線程搶先上鎖了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 對mutexB解鎖
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
print('done')
# Thread-1----do1---up----
# Thread-2----do2---up----
# (程序沒有結束)
# 分析:
# 對mutexA和mutexB同時上鎖,但沒有釋放他們時,那就不能對他們繼續(xù)訪問,他們現(xiàn)在是阻塞狀態(tài)。
GIL(全局鎖)
區(qū)分多進程和多線程:
(1)多進程是真正意義上的并行,
(2)而多線程只是偽并行(交替執(zhí)行)多線程偽并行(交替執(zhí)行)的原因:
GIL(Global Interpreter Lock,全局解釋器鎖)分析:
任何Python線程執(zhí)行前,必須先獲得GIL鎖,
然后每執(zhí)行100條字節(jié)碼時,解釋器就會自動釋放GIL鎖,讓別的線程有機會執(zhí)行。
而這個GIL全局鎖實際上把所有線程的執(zhí)行代碼都上鎖了。
因此,多線程在Python中只能交替執(zhí)行,
即使100個線程拋在100核CPU上,也只能用到1個核。注:
GIL并不是Python的特性,它是實現(xiàn)Python解釋器(CPython)時所引入的一個概念。
而Python解釋器并不只有CPython,還有PyPy,Psyco,JPython,IronPython等我們通常認為Python == CPython,所以也就默許了Python具有GIL鎖
GIL影響性能,如何避免受到GIL影響?
(1)使用多線程代替多線程(常用)
(2)更換Python解釋器,不使用CPython(不怎么用,因為CPthon挺好用的,真香~~)
Queue隊列
控制線程的觸發(fā)執(zhí)行 — — 用Queue隊列
格式:
from queue import Queue
q = Queue(maxsize=0) # maxsize默認為0,此時隊列長度不受限
q.get() # 等待隊列信息
q.get(timeout=5) # 設置超時時間,時間到之后執(zhí)行其他
q.put() # 發(fā)送信息
q.join() # 等待所有的消息被消費完
【例】生產(chǎn)者消費者
import random
import threading
import time
import queue
q = queue.Queue(maxsize=10) # 設置隊列的最大長度為10
# 生產(chǎn)者
def producer(name):
count = 1
while True:
q.put("雪碧%s" % count)
print("[Timmy]生產(chǎn)了雪碧", count)
count += 1
time.sleep(random.randrange(3))
# 消費者
def consumer(name):
while True:
print("[%s]取到[%s]并且喝了它..." % (name, q.get()))
time.sleep(random.randrange(5))
p = threading.Thread(target=producer, args=("Timmy",))
c1 = threading.Thread(target=consumer, args=("King",))
c2 = threading.Thread(target=consumer, args=("Wang",))
p.start()
c1.start()
c2.start()
結果如下:
[Timmy]生產(chǎn)了雪碧 1
[Timmy]生產(chǎn)了雪碧 2
[King]取到[雪碧1]并且喝了它...
[Wang]取到[雪碧2]并且喝了它...
[Timmy]生產(chǎn)了雪碧 3
[Wang]取到[雪碧3]并且喝了它...
[Timmy]生產(chǎn)了雪碧 4
[King]取到[雪碧4]并且喝了它...
[Timmy]生產(chǎn)了雪碧 5
[Wang]取到[雪碧5]并且喝了它...
[Timmy]生產(chǎn)了雪碧 6
[Timmy]生產(chǎn)了雪碧 7
[Wang]取到[雪碧6]并且喝了它...
[Wang]取到[雪碧7]并且喝了它...
# (程序沒有結束)
線程池
存儲線程,需要用的時候調出線程
跑完任務之后,這些線程不會被銷毀,而是返回到線程池等待下一次任務
- submit():返回一個future對象
- future對象:在未來的某一時刻完成操作的對象
【例】創(chuàng)建線程池
import time
from concurrent.futures.thread import ThreadPoolExecutor
# 線程執(zhí)行的函數(shù)
def add(n1,n2):
v = n1 + n2
print('add :', v, ', tid:', threading.currentThread().ident)
time.sleep(n1)
return v
# 通過submit把需要執(zhí)行的函數(shù)扔進線程池中.
# submit 直接返回一個future對象
ex = ThreadPoolExecutor(max_workers=3) # 制定最多運行N個線程
f1 = ex.submit(add, 2, 3)
f2 = ex.submit(add, 2, 2)
print('main thread running')
print(f1.done()) # future對象名.done():看看任務結束了沒
print(f2.done())
print(f1.result()) # future對象名.result():獲取結果 ,阻塞方法
print(f2.result())
print(f2.done())
print(f1.done())
# add : 5 , tid: 6324
# add : 4 , tid: 6712
# main thread running
# False
# False
# 5
# 4
# True
# True
- map():返回是跟你提交序列是一致的,是有序的
【例】map()的使用
import requests
from concurrent.futures.thread import ThreadPoolExecutor
URLS = ['http://www.sina.com.cn', 'http://www.baidu.com', 'http://www.qq.com']
def get_html(url):
print('thread id:', threading.currentThread().ident, ' 訪問了:', url)
return requests.get(url) # 這里使用了requests 模塊
ex = ThreadPoolExecutor(max_workers=3) # 制定最多運行3個線程
# thread id: 5596 訪問了: http://www.sina.com.cn
# thread id: 8968 訪問了: http://www.baidu.com
# thread id: 4316 訪問了: http://www.qq.com
# sumbit()函數(shù):
lst = []
for i in range(3):
f = ex.submit(get_html, URLS[i]) # f為future對象,提交一個任務,放入線程池中,準備執(zhí)行
lst.append(f)
print(lst)
# [<Future at 0x1330990 state=running>, <Future at 0x3015370 state=running>, <Future at 0x3015870 state=running>]
# map()函數(shù):
res_iter = ex.map(get_html, URLS) # 返回生成器res_iter
for res in res_iter:
print(res.url)
# 分別獲取sina、baidu、qq的網(wǎng)頁網(wǎng)址
# https://www.sina.com.cn/
# http://www.baidu.com/
# https://www.qq.com/
for res in res_iter:
print(res.text)
# 分別獲取sina、baidu、qq的網(wǎng)頁源代碼
# 略
# - as_completed()函數(shù):
from concurrent.futures import as_completed
# 下面用到的f,是上面第一個循環(huán)里面的futrue對象
for future in as_completed([f]): # as_completed()接受一個可迭代的Future序列,返回一個生成器,在完成或異常時返回這個Future對象
print('一個任務完成。')
print(future.result())
# thread id: 5484 訪問了: http://www.sina.com.cn
# 一個任務完成。
# <Response [200]>
# thread id: 5484 訪問了: http://www.baidu.com
# 一個任務完成。
# <Response [200]>
# thread id: 10280 訪問了: http://www.qq.com
# 一個任務完成。
# <Response [200]>
【例】as_completed 完整的例子
# as_completed 返回一個生成器,用于迭代, 一旦一個線程完成(或失敗) 就返回
import time
import requests
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
time.sleep(1)
print('thread id:', threading.currentThread().ident,' 訪問了:', url)
return requests.get(url) # 這里使用了requests 模塊
ex = ThreadPoolExecutor(max_workers=3) # 最多3個線程
future_tasks = [ex.submit(get_html, url) for url in URLS] #創(chuàng)建3個future對象
for future in as_completed(future_tasks): # 迭代生成器
try:
resp = future.result()
except Exception as e:
print('%s' % e)
else:
print('%s has %d bytes!' % (resp.url, len(resp.text)))
# thread id: 4912 訪問了: http://www.sina.com.cn
# thread id: 9060 訪問了: http://www.qq.com
# thread id: 5644 訪問了: http://www.baidu.com
# http://www.baidu.com/ has 2381 bytes!
# https://www.qq.com/ has 223356 bytes!
# https://www.sina.com.cn/ has 543959 bytes!
wait:阻塞函數(shù)
第一個參數(shù)和as_completed一樣,一個可迭代的future序列,返回一個元組,包含2個set,一個完成的,一個未完成的
【例】wait()函數(shù)的使用
import time
import requests
from concurrent.futures import wait, as_completed, FIRST_COMPLETED, ALL_COMPLETED
from concurrent.futures.thread import ThreadPoolExecutor
import threading
from concurrent import futures
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
"""
FIRST_COMPLETED 當任何未來完成或被取消時,該函數(shù)將返回。
FIRST_EXCEPTION 當任何未來通過提出異常完成時,函數(shù)將返回。如果沒有未來引發(fā)異常,那么它等同于 ALL_COMPLETED。
ALL_COMPLETED(默認) 當所有期貨完成或被取消時,函數(shù)將返回。
:param url:
:return:
"""
time.sleep(1)
print('thread id:', threading.currentThread().ident, ' 訪問了:', url)
return requests.get(url) # 這里使用了requests 模塊
ex = ThreadPoolExecutor(max_workers=3) # 最多3個線程
future_tasks = [ex.submit(get_html, url) for url in URLS] # 創(chuàng)建3個future對象
try:
result = wait(future_tasks, return_when=futures.ALL_COMPLETED)
done_set = result[0]
for future in done_set:
resp = future.result()
print('第一個網(wǎng)頁任務完成 url:%s , len:%d bytes! ' % (resp.url, len(resp.text)))
except Exception as e:
print('exception :', e)
# return_when=futures.FIRST_COMPLETED
# thread id: 11404 訪問了: http://www.sina.com.cn
# thread id: 10196 訪問了: http://www.baidu.com
# thread id: 2588 訪問了: http://www.qq.com
# 第一個網(wǎng)頁任務完成 url:http://www.baidu.com/ , len:2381 bytes!
# return_when=futures.FIRST_EXCEPTION
# thread id: 4776 訪問了: http://www.sina.com.cn
# thread id: 12104 訪問了: http://www.baidu.com
# thread id: 10692 訪問了: http://www.qq.com
# 第一個網(wǎng)頁任務完成 url:https://www.sina.com.cn/ , len:544154 bytes!
# 第一個網(wǎng)頁任務完成 url:https://www.qq.com/ , len:223122 bytes!
# 第一個網(wǎng)頁任務完成 url:http://www.baidu.com/ , len:2381 bytes!
# return_when=futures.ALL_COMPLETED
# thread id: 4612 訪問了: http://www.sina.com.cn
# thread id: 11460 訪問了: http://www.qq.com
# thread id: 7744 訪問了: http://www.baidu.com
# 第一個網(wǎng)頁任務完成 url:http://www.baidu.com/ , len:2381 bytes!
# 第一個網(wǎng)頁任務完成 url:https://www.sina.com.cn/ , len:544154 bytes!
# 第一個網(wǎng)頁任務完成 url:https://www.qq.com/ , len:223122 bytes!
回調函數(shù):add_done_callback(fn)
import os, sys, time, requests, threading
from concurrent import futures
URLS = [
'http://baidu.com',
'http://www.qq.com',
'http://www.sina.com.cn'
]
def load_url(url):
print('tid:', threading.currentThread().ident, ',url:', url)
with requests.get(url) as resp:
return resp.content
def call_back(obj):
print('->>>>>>>>>call_back , tid:', threading.currentThread().ident, ',obj:', obj)
with futures.ThreadPoolExecutor(max_workers=3) as ex:
# mp = {ex.submit(load_url,url) : url for url in URLS}
mp = dict()
for url in URLS:
f = ex.submit(load_url, url)
mp[f] = url
f.add_done_callback(call_back)
for f in futures.as_completed(mp):
url = mp[f]
try:
data = f.result()
except Exception as exc:
print(exc, ',url:', url)
else:
print('url:', url, ',len:', len(data), ',data[:20]:', data[:20])
# tid: 6036 ,url: http://baidu.com
# tid: 6044 ,url: http://www.qq.com
# tid: 3864 ,url: http://www.sina.com.cn
# ->>>>>>>>>call_back , tid: 6036 ,obj: <Future at 0x2f076d0 state=finished returned bytes>
# url: http://baidu.com ,len: 81 ,data[:20]: b'<html>\n<meta http-eq'
# ->>>>>>>>>call_back , tid: 6044 ,obj: <Future at 0x30158b0 state=finished returned bytes>
# url: http://www.qq.com ,len: 237492 ,data[:20]: b'<!doctype html>\n<htm'
# ->>>>>>>>>call_back , tid: 3864 ,obj: <Future at 0x3015c70 state=finished returned bytes>
# url: http://www.sina.com.cn ,len: 544154 ,data[:20]: b'<!DOCTYPE html>\n<!--'