過濾列表中的數(shù)據(jù)
實(shí)際案例:
過濾掉列表里面的負(fù)數(shù)
案例分析:
- filter(function or None, iterable) py2返回一個(gè)列表,py3返回一個(gè)迭代器
- 列表解析
- 使用
timeit來測(cè)試函數(shù)的運(yùn)行時(shí)間
案例代碼:
# python3
from random import randint
data = [randint(-10, 10) for _ in range(10)]
# 方法一,直接迭代
new_data0 = []
for x in data:
if x >= 0:
new_data0.append(X)
# 方法二,使用filter函數(shù)
new_data1 = list(filter(lambda x: x>=10, data))
# 使用timeit測(cè)試函數(shù)運(yùn)行時(shí)間
# 00000 loops, best of 3: 1.45 μs per loop
# 方法三,使用列表解析
new_data2 = [x for x in data if x >= 0]
# 使用timeit測(cè)試函數(shù)運(yùn)行時(shí)間
# 1000000 loops, best of 3: 559 ns per loop
# new_data0, new_data1, new_data2
# [8, 0, 1, 10, 9]
結(jié)論:
? 運(yùn)行速度:列表解析 > filter函數(shù) > 直接迭代
過濾字典,集合中的數(shù)據(jù)
- 字典解析
# python3
from random impory randint
data = {x: randint(60, 100) for x in range(1, 21)}
new_data = {key: value for key, value in data.items() if value >= 80}
# new_data
# {16: 80, 1: 100, 2: 96, 17: 86, 11: 81, 12: 83, 15: 87}
- 集合解析
# python3
from random import randint
data = [randint(-10, 10) for _ in range(10)]
s = set(data)
new_data = {x for x in s if x % 3 == 0}
# new_data
# {-6, 3, 6}
為元組的每個(gè)元素命名
元組的優(yōu)勢(shì):
- 存儲(chǔ)空間小
- 訪問速度快
實(shí)際案例:
學(xué)生信息系統(tǒng)中數(shù)據(jù)為固定格式:
(名字,年齡,性別,郵箱地址,...)
學(xué)生數(shù)量很大為了減小存儲(chǔ)的開銷,對(duì)每個(gè)學(xué)生信息用元組表示:
(name='jim', age=16, sex='male', email='jim@163.com')
(name='jack', age=16, sex='male', email='jack@163.com')
(name='bob', age=16, sex='male', email='bob@163.com')
......
訪問是,我們使用索引(index)訪問,大量索引降低程序的可讀性。如何解決這個(gè)問題?
# 輸出一個(gè)學(xué)生的信息
# python3
# 使用內(nèi)置庫
from collections import namedtuple
# 創(chuàng)建一個(gè)有名字的元組,元組名:'Student'
Student = namedtuple('Student', ['name', 'age', 'sex', 'email'])
# 位置傳參
s1 = Student實(shí)際案例:
過濾掉列表里面的負(fù)數(shù)
# 關(guān)鍵字傳參
s2 = Student(name='jim', age=16, sex='male', email='jim@163.com')
# 可以通過類的屬性來訪問
print(s1.name) # 'jim'
print(s1.age) # 16
print(s1.sex) # male
print(s1.email) # 'jim@163.com'
# s1 是元組類的一個(gè)子類
isinstance(s1, tuple) # True
# 常規(guī)方法, 借鑒C語言
NAME, AGE, SEX, EMAIL = range(4)
student = ('jim', 16, 'male', 'jim@163.com')
# name
print(student[NAME]) # 'jim'
# age
print(student[AGE]) # 16
# sex
print(student[SEX]) # male
# email
print(student[EMAIL]) # 'jim@163.com'
統(tǒng)計(jì)序列中元素出現(xiàn)的頻度
實(shí)際案例:
- 某隨機(jī)序列[12, 5, 9, 8, 7, ....]中,找到出現(xiàn)次數(shù)最高的三個(gè)元素,它們出現(xiàn)的次數(shù)是多少?
- 對(duì)某英文文章的單詞,進(jìn)行詞頻統(tǒng)計(jì),找到出現(xiàn)次數(shù)最高的10個(gè)單詞,它們出現(xiàn)的次數(shù)是多少?
使用collections.Counter對(duì)象
- 將序列傳入Counter的構(gòu)造器,得到Counter對(duì)象是元素頻度的字典{元素值: 頻數(shù)}
- Counter.most_common(n) 方法得到平度最高的n個(gè)元素為元素和頻數(shù)的列表
案例一:
# python3
from random import randint
data = [randint(0, 20) for _ in range(30)]
c1 = dict.fromkeys(data, 0)
from collections import Counter
c2 = Counter(data)
# 找到出現(xiàn)頻度最高的三個(gè)元素
c2.most_common(3)
# 常規(guī)方法
for x in data:
c[x] += 1
案例二:
# python3
import re
from collections import Counter
txt = open('a.txt').read()
count = Counter(re.split('\W+', txt))
count.most_common(3)
根據(jù)字典中值的大小,對(duì)字典進(jìn)行排序
iter(iterobj) 返回可迭代對(duì)象的信息
sorted函數(shù):sorted(iterable[, cmp[, key[, reverse]]])
參數(shù)說明:
- iterable -- 可迭代對(duì)象。
- cmp -- 比較的函數(shù),這個(gè)具有兩個(gè)參數(shù),參數(shù)的值都是從可迭代對(duì)象中取出,此函數(shù)必須遵守的規(guī)則為,大于則返回1,小于則返回-1,等于則返回0。
- key -- 主要是用來進(jìn)行比較的元素,只有一個(gè)參數(shù),具體的函數(shù)的參數(shù)就是取自于可迭代對(duì)象中,指定可迭代對(duì)象中的一個(gè)元素來進(jìn)行排序。
- reverse -- 排序規(guī)則,reverse = True 降序 , reverse = False 升序(默認(rèn))。
實(shí)際案例:
某班英語成績(jī)以字典形式存儲(chǔ)為:{'LiLei': 79, 'Jim': 88, 'Lucy': 92, ...}
根據(jù)成績(jī)高低,計(jì)算學(xué)生排名。
解決方案:使用內(nèi)置函數(shù)sorted
- 利用zip將字典數(shù)據(jù)轉(zhuǎn)化為元組
from random import randint
data = {x: randint(60, 100)for x in 'xyzabc'}
# 原理:元組的比較方式:從第一個(gè)元素開始,逐個(gè)開始比較
sorted(list(zip(data.values(), data.keys())))
# [(62, 'y'), (76, 'c'), (88, 'a'), (92, 'x'), (96, 'b'), (99, 'z')]
- 傳遞sorted函數(shù)的key參數(shù)
from random import randint
data = {x: randint(60, 100)for x in 'xyzabc'}
sorted(data.items(), key=lambda x: x[1])
# [('y', 62), ('c', 76), ('a', 88), ('x', 92), ('b', 96), ('z', 99)]
如何快速找到多個(gè)字典中的公共鍵
map函數(shù)和reduce函數(shù):
map(func, *iterable)
在Python 3里,reduce()函數(shù)已經(jīng)被從全局名字空間里移除了,它現(xiàn)在被放置在fucntools模塊里 用的話要 先引 入: from functools import reduce
參數(shù)說明:
- function -- 函數(shù),有兩個(gè)參數(shù)
- iterable -- 一個(gè)或多個(gè)序列
返回值:
- Python 2.x 返回列表。
- Python 3.x 返回迭代器。
reduce(function, iterable[, initial])
參數(shù)說明:
- function -- 函數(shù),有兩個(gè)參數(shù)
- iterable -- 可迭代對(duì)象
- initializer -- 可選,初始參數(shù)
返回值:
- 返回函數(shù)計(jì)算結(jié)果。
function參數(shù)是一個(gè)有兩個(gè)參數(shù)的函數(shù),reduce依次從sequence中取一個(gè)元素,和上一次調(diào)用function的結(jié)果做參數(shù)再次調(diào)用function。
第一次調(diào)用function時(shí),如果提供initial參數(shù),會(huì)以sequence中的第一個(gè)元素和initial作為參數(shù)調(diào)用function,否則會(huì)以序列sequence中的前兩個(gè)元素做參數(shù)調(diào)用function
實(shí)際案例:
西班牙足球甲級(jí)聯(lián)賽,每輪球員進(jìn)球統(tǒng)計(jì):
第一輪:{'蘇亞雷斯': 1, '梅西': 2, '本澤馬': 1, 'C羅': 3, ...}
第二輪:{'蘇亞雷斯': 2, 'C羅': 1, '格里茲曼': 2, '貝爾': 1, ...}
第三輪:{'蘇亞雷斯': 1, '托雷斯': 2, '貝爾': 1, '內(nèi)馬爾': 1, ...}
......
統(tǒng)計(jì)出前N輪,每場(chǎng)比賽都有進(jìn)球的球員
案例分析:
字典中的key表示每一次進(jìn)球的球員 --> 求出所有字典里面的公共key
解決方案:利用集合的交集操作
- 使用字典的keys()方法,得到一個(gè)字典keys的集合
- 使用map函數(shù),得到所有字典的keys集合
- 使用reduce函數(shù),取所有字典的keys的集合的交集
# python3
from functools import reduce
from random import randint, sample
# 生成三個(gè)隨機(jī)字典
s_list = [{x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))} for _ in range(3)]
reduce(lambda a, b: a & b, map(dict.keys, s_list))
一般處理方法:
from random import randint, sample
# 生成三個(gè)隨機(jī)字典
s1 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s2 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
s3 = {x: randint(1, 4) for x in sample('abcdefg', randint(3, 6))}
res = []
for k in s1:
if k in s2 and k in s3:
res.append(s2)
如何讓字典保持有序
OrderedDict函數(shù):
OrderedDict的Key會(huì)按照插入的順序排列,不是Key本身排序。OrderedDict可以實(shí)現(xiàn)一個(gè)FIFO(先進(jìn)先出)的dict,當(dāng)容量超出限制時(shí),先刪除最早添加的Key
實(shí)際案例:
某編程競(jìng)賽系統(tǒng),對(duì)參數(shù)選手編程解題進(jìn)行計(jì)時(shí),選手完成題目后,把該選手解題用時(shí)記錄到字典中,以便賽后按選手名查詢成績(jī)。
(答題用時(shí)越短,成績(jī)?cè)絻?yōu))
{’Lilie': (2, 43), 'HanMeimei': (5,52), 'Jim': (1, 39) ...}
比賽結(jié)束后,須按排名順序依次打印選手成績(jī),如何實(shí)現(xiàn)?
案例分析:
以進(jìn)入字典的為選手的排名 --> 按進(jìn)入字典的順序輸出選手的比賽信息
使用collections.OrderedDict,以O(shè)rderedDict代替內(nèi)置字典Dict,一次將選手的成績(jī)存入OrderedDict
案例代碼:
# python3
from collections import OrderedDict
from random import randint
import time
d = OrderedDict()
players = list('ABCDEFGH') # 模擬選手A,B,C,D,E,F,G,H
start = time.time() # 記錄比賽開始時(shí)間
for i in range(8):
input('>>>')
p = players.pop(randint(0, 7 - i))
end = time.time() # 每個(gè)選手完成比賽的時(shí)間
d[p] = (i + 1, end - start)
實(shí)現(xiàn)用戶的歷史記錄功能(最多n條)
**deque函數(shù):from collections import deque
- 創(chuàng)建
deque序列: d=deque() -
deque提供了類似list的操作方法: d.append(3) ;d.append(8);d.append(1); 此時(shí)d=deque([3,8,1]) - 兩端都使用pop:
-
d.pop()--> 拋出隊(duì)列中的最后一個(gè)元素 -
d.leftpop()--> 拋出隊(duì)列中的第一個(gè)元素
-
- 限制deque的長度:d=deque(maxlen=20)
**pickle模塊:
-
pickle.dump(obj, file, protocol=None, *, fix_imports=True)- 參數(shù)說明:
- obj --> python對(duì)象
- file --> python中的文件對(duì)象
- protocol --> 可選的協(xié)議參數(shù)
- 如果fix_imports為true且protocol小于3,pickle會(huì)嘗試將新的Python 3名稱映射到Python 2中使用的舊模塊名稱,以便pickle數(shù)據(jù)流可以用Python 2讀取。
說明:
序列化對(duì)象,并將結(jié)果數(shù)據(jù)流寫入到文件對(duì)象中。參數(shù)protocol是序列化模式,默認(rèn)值為0,表示以文本的形式序列化。protocol的值還可以是1或2,表示以二進(jìn)制的形式序列化。
- 參數(shù)說明:
-
pickle.dumps(obj, protocol=None, fix_imports=True)將對(duì)象的pickle d表示形式作為
bytes對(duì)象返回,而不是將其寫入文件。 -
pickle.load(file, *, fix_imports = True, encoding ="ASCII", errors ="strict ")從打開的
文件對(duì)象文件中*讀取一個(gè)pickle d對(duì)象表示并返回其中指定的重組對(duì)象層次結(jié)構(gòu)。 -
pickle.loads(bytes_object, *, fix_imports = True, encoding ="ASCII", errors ="strict ")從對(duì)象中讀取pickle d對(duì)象層次結(jié)構(gòu)
bytes并返回其中指定的重組對(duì)象層次結(jié)構(gòu)。可選參數(shù):fix_imports, encoding, errors ---->
? 用于控制由Python 2生成的pickle stream的兼容性支持。如果fix_imports為true,pickle將嘗試將舊的Python 2名稱映射到Python 3中使用的新名稱。編碼和 錯(cuò)誤告訴pickle如何解碼由Python 2腌制的8位字符串實(shí)例; 這些默認(rèn)值分別為'ASCII'和'strict'。該編碼可以是“字節(jié)”作為字節(jié)對(duì)象讀取這些8位串的實(shí)例。
實(shí)際案例:
很多應(yīng)用程序都有瀏覽用戶的歷史記錄的功能,
例如:
瀏覽器可以查看最近訪問過的網(wǎng)頁。
視頻播放器可以查看最近播放過的視頻文件。
Shell可以查看用戶輸入過的命令
......
現(xiàn)在我們制作了一個(gè)簡(jiǎn)單的猜數(shù)字小游戲,添加歷史記錄的功能,顯示用戶最近猜過的數(shù)字,如何實(shí)現(xiàn)?
解決方案:使用容量為n的隊(duì)列存儲(chǔ)歷史記錄
- 使用標(biāo)準(zhǔn)庫collections中的deque,它是一個(gè)雙端循環(huán)隊(duì)列。
- 程序退出前,可以使用pickle將隊(duì)列對(duì)象存入文件,再次運(yùn)行程序時(shí)將其導(dǎo)入。
from collections import deque
from random import randint
N = randint(0, 100)
history = deque([], 5)
def guess(num):
if num == N:
print("right")
return True
if num < N:
print("{} is less than num".format(num))
else:
print("{} is greater than num".format(num))
return False
while True:
line = input('Please input a number: ')
if line.isdigit():
k = int(line)
history.append(k)
if guess(k):
break
elif line == 'history' or line == 'h?':
print(list(history))
實(shí)現(xiàn)可迭代對(duì)象和迭代器對(duì)象
實(shí)際案例:
某軟件要求,從網(wǎng)絡(luò)抓取各個(gè)城市氣溫信息,并依次顯示:
北京:15~20
天津:17~22
長春:12~18
......
如果一次抓取所有城市天氣再顯示,顯示第一個(gè)城市氣溫時(shí),有很高的延時(shí),并且浪費(fèi)存儲(chǔ)空間,我們期望以 “用時(shí)訪問” 的策略,并且能把所有城市氣溫封裝到一個(gè)對(duì)象里, 可用for語句進(jìn)行迭代,怎么解決?
案例思路:
- 實(shí)現(xiàn)一個(gè)迭代器對(duì)象WheatherIterator,next方法每次返回一個(gè)城市的氣溫。
- 實(shí)現(xiàn)一個(gè)可迭代對(duì)象WeatherIterable,__iter__方法返回一個(gè)迭代器對(duì)象。
案例代碼:
# python3
import requests
from collections import Iterable, Iterator
# 實(shí)現(xiàn)一個(gè)天氣的迭代器對(duì)象
class WeatherIterator(Iterator):
def __init__(self, cities):
self.cities = cities
self.index = 0
def getWhether(self, city):
response = requests.get('http://wthrcdn.etouch.cn/weather_mini?city=' + city)
data = response.json()['data']['forecast'][0]
return "{}: {}, {}".format(city, data['low'], data['high'])
# python2 中直接使用next()方法
def __next__(self):
# 當(dāng)索引越界的時(shí)候觸發(fā)異常
if self.index == len(self.cities):
raise StopIteration
city = self.cities[self.index]
self.index += 1
return self.getWhether(city)
# 實(shí)現(xiàn)天氣的可迭代對(duì)象
class WeatherIterable(Iterable):
def __init__(self, cities):
self.cities = cities
def __iter__(self):
return WeatherIterator(self.cities)
for x in WeatherIterator(['北京', '上海', '廣州']):
print(x)
# 北京: 低溫 23℃, 高溫 35℃
# 上海: 低溫 23℃, 高溫 29℃
# 廣州: 低溫 27℃, 高溫 33℃
使用生成器函數(shù)實(shí)現(xiàn)可迭代對(duì)象
實(shí)際案例:
實(shí)現(xiàn)一個(gè)可迭代對(duì)象的類,它能迭代出給定范圍內(nèi)的所有素?cái)?shù):
pn = PrimeNumbers(1, 30)
for k in pn:
? print(k)
輸出結(jié)果:
2 3 5 7 11 13 17 19 23 29
解決方案:
將該類的__iter__方法實(shí)現(xiàn)生成器函數(shù),每次yield返回一個(gè)素?cái)?shù).
案例代碼:
# python3
class PrimeNumbers:
def __init__(self, start, end):
self.start = start
self.end = end
def isPrimerNum(self, k):
if k < 2:
return False
for i in range(2, k):
if k % i == 0:
return False
return True
def __iter__(self):
for k in range(self.start, self.end + 1):
if self.isPrimerNum(k):
yield k
for x in PrimeNumbers(1, 30):
print(x)
實(shí)現(xiàn)序列的反向迭代操作
內(nèi)置方法:
reversed --> 其實(shí)是調(diào)用了__reversed__()方法
返回一個(gè)反向迭代器,
iter --> 調(diào)用了__iter__() 方法
返回一個(gè)正向迭代器
實(shí)際案例:
實(shí)現(xiàn)一個(gè)連續(xù)浮點(diǎn)數(shù)發(fā)生器FloatRange (和range類似),根據(jù)給定范圍 (start, end) 和步進(jìn)值 (step) 產(chǎn)生一系列連續(xù)浮點(diǎn)數(shù),如迭代FloatRange(3.0, 4.0, 0.2) 可產(chǎn)生序列:
正向:3.0 -> 3.2 -> 3.4 -> 3.6 -> 3.8 -> 4.0
反向:4.0 -> 3.8 -> 3.6 -> 3.4 -> 3.2 -> 3.0
案例思路:
重寫類里面的__iter__() 和 _reversred_() 方法
案例代碼:
# python3
class FloatRange:
def __init__(self, start, end, step=0.1):
self.start = start
self.end = end
self.step = step
def __iter__(self):
t = self.start
while t <= self.end:
yield t
t += self.step
def __reversed__(self):
t = self.end
while t >= self.start:
yield t
t -= self.step
for x in iter(FloatRange(1.0, 4.0, 0.2)):
print(x)
for x in reversed(FloatRange(1.0, 4.0, 0.2)):
print(x)
如何對(duì)迭代器做切片操作
itertools.islice 方法:islice(iterable, start, stop[, step]) --> islice object
參數(shù)說明:
- iterable --> 可迭代對(duì)象
- start --> 迭代的起始位置
- end --> 迭代的終止位置(end 為None的時(shí)候迭代到末尾)
- step --> 兩個(gè)元素之間的索引差值
實(shí)際案例:
有某個(gè)文本文件,我們想讀取其中某范圍的內(nèi)容,如100 ~ 300 行之間的內(nèi)容,python中文本文件是可迭代對(duì)象,我們是否可以使用類似列表切片的方式得到一個(gè)100 ~ 300 行文件內(nèi)容的生成器?
f = open('a.txt')
f[100:300] # 可以么?
解決方案:
使用標(biāo)準(zhǔn)庫中的itertools.islice,它能返回一個(gè)迭代器對(duì)象切片的生成器
案例代碼:
# python3
from itertools import islice
f = open('/var/log/dpkg.log')
# 在Python中,文件對(duì)象也是可迭代對(duì)象
f_iter = islice(f, 100, 300)
for x in f_iter:
print(x)
在for語句中迭代多個(gè)可迭代對(duì)象
zip 函數(shù):zip(iter1 [,iter2 [...]]) --> zip object
itertools.chain 方法:chain(*iterables) --> chain object
實(shí)際案例:
- 某班學(xué)生期末考試成績(jī),語文,數(shù)學(xué),英語分別存儲(chǔ)在3個(gè)列表中,同時(shí)迭代三個(gè)列表,計(jì)算每個(gè)學(xué)生的總分。(并行)
- 某年級(jí)有四個(gè)班,某次考試沒辦英語成績(jī)分別存儲(chǔ)在4個(gè)列表中,一次迭代每個(gè)列表,統(tǒng)計(jì)全學(xué)年成績(jī)高于90分人數(shù)。(串行)
解決方案:
并行:使用內(nèi)置函數(shù)zip,它能將多個(gè)可迭代對(duì)象合并,每次迭代返回一個(gè)元組
串行:使用標(biāo)準(zhǔn)庫中的itertools.chain,它能將多個(gè)可迭代對(duì)象連接
案例代碼:
from random import randint
chinese = [randint(60, 100) for _ in range(40)]
math = [randint(60, 100) for _ in range(40)]
english = [randint(60, 100) for _ in range(40)]
total_sorce = []
# 一般方法:生成一個(gè)索引序列
for i in range(len(chinese)):
total_sorce.append(chinese[i] + math[i] + english[I])
# 使用內(nèi)置zip函數(shù)
for c , m ,e in zip(chinese, math, English):
total_sorce.append(c + m + e)
print(total_sorce)
# python3
from random import randint
e1 = [randint(60, 100) for _ in range(42)]
e2 = [randint(60, 100) for _ in range(41)]
e3 = [randint(60, 100) for _ in range(38)]
e4 = [randint(60, 100) for _ in range(37)]
count = 0
for s in chain(e1, e2, e3, e4):
if s > 90:
count += 1
拆分含有多種分隔符的字符串
實(shí)際案例:
我們要把某個(gè)字符串依據(jù)分隔符號(hào)拆分不同的字段,該字符串包含多種不同的分隔符,例如:
s = 'ab;sdfsgsef|sgegssg\gfsdf\tasdavsdfup'
其中<,>,<;>,<|>,<\t>都是分隔符號(hào),如何處理?
解決方案:
- 連續(xù)使用str.split() 方法,每次數(shù)理一種分隔符號(hào)。
- 使用正則表達(dá)式的re.split() 方法,一次性拆分字符串。(推薦)
# python3
def mySplit(s, ds):
res = [s]
for d in ds:
t = []
map(lambda x: t.extend(x.split(d)), res)
res = t
return [x for x in res if x]
s = 'fasfgajhgijaseg|adfg、/vg.Adafsda'
print(mySplit(s, '/\?;,'))
import re
s = 'fasfgajhgijaseg|adfg、/vg.Adafsda'
res = re.split(r'[,;\t|]+', s) # + 表示匹配多次
如何判斷字符串a(chǎn)是否以字符串b開頭或結(jié)尾
實(shí)際案例:
某文件系統(tǒng)目錄下有一系列文件:
quicksort.c
graph.py
install.sh
stack.cpp
......
編寫程序給其中所有的.sh文件和.py文件加上用戶可執(zhí)行權(quán)限。
案例代碼:
# endwith 的參數(shù)只能是元組或者字符串,不能為列表
f_li = [name for name in os.listdir('.') if name.endswith(('.sh', '.py'))]
[os.chmod(x, os.stat(x).st_mode | stat.S_IXUSR) for x in f_li]
如何調(diào)整字符串中文本的格式
實(shí)際案例:
某軟件的log文件,其中的日期格式為 'yyyy-mm-dd':
......
2018-06-20 18:32:25 status unpacked python3-pip:all 8.1.1-2ubuntu0.4
2018-06-20 18:32:25 status half-configured python3-pip:all 8.1.1-2ubuntu0.4
2018-06-20 18:32:25 status installed python3-pip:all 8.1.1-2ubuntu0.4
2018-06-20 18:32:26 startup packages configure......
我們想把其中日期改為美國日期的格式 'mm/dd/yyyy'.
'2018-06-20' => '06/20/2018',應(yīng)如何處理?
解決方案:
使用正則表達(dá)式
re.sub()方法做字符串替換,利用正則表達(dá)式的補(bǔ)貨組,捕獲每個(gè)部分內(nèi)容,在替換字符串中調(diào)整各個(gè)捕獲組的順序。
import re
log = open('/var/log/dpkg.log').read()
re.sub('(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', log) # 給小組取別名?P<name>
如何將多個(gè)小字符串拼接成一個(gè)大的字符串
實(shí)際案例:
在設(shè)計(jì)某網(wǎng)絡(luò)程序時(shí),我們自定義了一個(gè)基于UDP的網(wǎng)絡(luò)協(xié)議。
在按照固定次序向服務(wù)器傳遞一系列參數(shù):
hwDetect: "<0112>"
gxDepthBits: "<32>"
gxResolution: "<1024x768>"
gxRefresh: "<60>"
fullAlpha: "<1>"
lodDist: "<100.0>"
DistCull: "<500.0>"
在程序中我們將各個(gè)參數(shù)按次序收集到列表中:
[ "<0112>", "<32>", "<1024x768>", "<60>", "<1>", "<100.0>", "<500.0>"]
最終我們要把各個(gè)參數(shù)拼成一個(gè)數(shù)據(jù)報(bào)進(jìn)行發(fā)送,
'<0112><32><1024x768><60><1><100.0><500.0>'
解決方案:
- 迭代列表,連續(xù)使用
+操作依次拼接每一個(gè)字符串。- 使用str.join() 方法,更加快速的拼接列表中所有字符串。(推薦)
案例代碼:
pl = ["<0112>", "<32>", "<1024x768>", "<60>", "<1>", "<100.0>", "<500.0>"]
s = ''
for p in pl:
s += p
pl = [ "<0112>", "<32>", "<1024x768>", "<60>", "<1>", "<100.0>", "<500.0>"]
s = ''.join(pl)
對(duì)字符串進(jìn)行左,右對(duì)齊
str.ljust()方法:ljust(width[, fillchar]) -> str
參數(shù)說明:
- width --> 字符串填充后的寬度
- fillchar --> 填充的字符
實(shí)際案例:
某個(gè)字典存儲(chǔ)了一系列的屬性值:
{
? "lodDist": 100.0,
? "SmallCull": 0.04,
? "DistCull": 500.0,
? "trilinear": 40,
? "farclip": 477
}
在程序中,我們想以以下工整的格式將其內(nèi)容輸出,如何處理?
lodDist : 100.0
farclip : 477
trilinear : 40
SmallCull : 0.04
DistCull : 500.0
解決方案:
- 使用字符串的str.ljust(), str.rjust(), str.center() 進(jìn)行左,右,居中對(duì)齊。
- 使用format() 方法,傳遞類似'<20', '>20', '^20' 參數(shù)完成同樣任務(wù)。例如:format(s, '<20') --> 左對(duì)齊
案例代碼:
d = {"lodDist": 100.0, "SmallCull": 0.04, "DistCull": 500.0, "trilinear": 40, "farclip": 477}
w = max(map(len, d.keys()))
for k in d.keys():
print(k.ljust(w), ':', d[k])
# lodDist : 100.0
# farclip : 477
# trilinear : 40
# SmallCull : 0.04
# DistCull : 500.0
如何去掉字符串中不需要的字符
實(shí)際案例:
過濾掉用戶輸入中前后多余的空白字符:
過濾某windows下編輯文本中的'\r':
'hello world\r\n'
去掉文本中的unicode組合符號(hào)(音調(diào)):
u'ni hao chi fan'
解決方案:
- 字符串strip(), lstrip(), rstrip() 方法去掉字符串兩端的字符(不限于空格)。
- 刪除單個(gè)固定位置的字符,可以使用切片 + 拼接的方式。
- 字符串的replace() 方法或者在正則表達(dá)式re.sub()刪除任意位置字符。
- 字符串translate() 方法,可以同時(shí)刪除多種不同字符。(python2)
如何設(shè)置文件的緩沖
實(shí)際案例:
將文件內(nèi)容寫入到硬件設(shè)備時(shí),使用系統(tǒng)調(diào)用,這類I/O操作的時(shí)間很長。為了減少I/O操作的次數(shù),文件通常使用緩沖區(qū)。(有足夠多的數(shù)據(jù)才進(jìn)行系統(tǒng)調(diào)用 )文件的緩沖行為,分為全緩沖,行緩沖,無緩沖。
如何設(shè)置python中文件對(duì)象的無緩沖行為?
解決方案:
全緩沖:open函數(shù)的buffering設(shè)置為大于1的證書n,n為緩沖區(qū)的大小。
行緩沖:open函數(shù)的buffering設(shè)置為1。
無緩沖:open函數(shù)的buffering設(shè)置為0。
如何將文件映射到內(nèi)存
實(shí)際案例:
- 在訪問某些二進(jìn)制文件時(shí),希望能把文件映射到內(nèi)存中,可以實(shí)現(xiàn)隨機(jī)訪問。(framebuffer設(shè)備文件)
- 某些嵌入式設(shè)備,寄存器被編址到內(nèi)存地址空間,我們可以映射/dev/mem某范圍,去訪問這些寄存器。
- 如果多個(gè)進(jìn)程映射同一個(gè)文件,還能實(shí)現(xiàn)進(jìn)程通信的目的。
解決方法:
使用標(biāo)準(zhǔn)庫中mmp模塊的mmap() 函數(shù),他需要一個(gè)打開的文件描述符作為參數(shù)。
如何訪問文件的狀態(tài)
os模塊:python與操作系統(tǒng)通信的模塊。
實(shí)際案例:
在某些項(xiàng)目中,我們需要獲得文件狀態(tài),例如:
文件的類型(普通文件,目錄,符號(hào)鏈接,設(shè)備文件 ... ).
文件的訪問權(quán)限。
文件的最后的訪問/修改/節(jié)點(diǎn)狀態(tài)更改時(shí)間。
普通文件的大小。
......
解決方案:
- 系統(tǒng)調(diào)用:標(biāo)準(zhǔn)庫中os模塊下的三個(gè)系統(tǒng)調(diào)用 stat, fstat, lstat 獲取文件狀態(tài)。
- 快捷函數(shù):標(biāo)準(zhǔn)庫中os.path 下一些函數(shù),使用起來更加簡(jiǎn)潔。
如何使用臨時(shí)文件
實(shí)際案例:
某項(xiàng)目中,我們從傳感器菜雞數(shù)據(jù),每手機(jī)到1G數(shù)據(jù)后,做數(shù)據(jù)分析,最終只保存分析結(jié)果。這樣很大的臨時(shí)數(shù)據(jù)如果常駐內(nèi)存,將消耗大量?jī)?nèi)存資源,我們可以使用臨時(shí)文件存儲(chǔ)這些臨時(shí)數(shù)據(jù)(外部存儲(chǔ))。
臨時(shí)文件不用命名,且關(guān)閉后悔自動(dòng)被刪除
解決方案:
使用標(biāo)準(zhǔn)庫中 tempfil撒大聲地e 下的 Temporaryfile, NamedTemporaryfile
from tempfile import Temporaryfile, NamedTemporaryfile
f = Temporaryfile() # 創(chuàng)建一個(gè)臨時(shí)文件對(duì)象,只能由對(duì)象f來訪問
f.write('abcdef' * 1000)
f.seek(0)
f.read(100)
ntf = NamedTemporaryfile() # 創(chuàng)建一個(gè)有名字的文件對(duì)象,可以在文件系統(tǒng)中找到,當(dāng)對(duì)象被重新實(shí)例化的時(shí)候,原來的文件會(huì)消失
ntf.name # 臨時(shí)文件的名字
ntf = NamedTemporaryfile(delete=False) # 創(chuàng)建新實(shí)例的時(shí)候,原來的文件不會(huì)被刪除
如何讀寫csv數(shù)據(jù)
實(shí)際案例:
將一個(gè)csv文件里面的數(shù)據(jù)讀取出來并存到另一個(gè)csv文件中。
解決方案:
使用標(biāo)準(zhǔn)庫中的csv模塊,可以使用其中 reader 和 writer 完成 csv 文件讀寫
import csv
rf = open('test1.csv', 'rb')
reader = csv.reader(rf) # 得到一個(gè)讀取 csv 文件的對(duì)象的迭代器
reader.next() # 得到第一行的數(shù)據(jù)
wf = open('test2.csv', 'wb')
writer = csv.writer(wf)
writer.writerow(['a', 'b']) # 將一個(gè)一維列表寫入csv文件中
writer.writerows([['a', 'b'], ['c', 'd']]) # 將一個(gè)二維寫入csv文件中
如何讀寫json數(shù)據(jù)
解決方案:
使用json模塊里的loads、load和dumps、dump進(jìn)行處理
json模塊:
-
json.dumps(obj, sort_key=True)# 將python對(duì)象轉(zhuǎn)化為json對(duì)象,對(duì)鍵進(jìn)行排序 -
json.dump()# 將python的文件對(duì)象轉(zhuǎn)化為json對(duì)象 -
json.loads()# 將json字符串轉(zhuǎn)化為python對(duì)象(一般為字典) -
json.load()# 講json文件對(duì)象轉(zhuǎn)化為Python對(duì)象
如何解析簡(jiǎn)單的xml文檔
解決方案:
使用標(biāo)準(zhǔn)庫中的xml.etree.ElementTree,其中的parse函數(shù)可以解析xml文檔。
from xml.etree.ElementTree import parse
f = open('demo.xml', 'r')
et = parse(f) # 得到一個(gè)元素樹對(duì)象
root = et.getroot() # 得到元素樹的根節(jié)點(diǎn)(可迭代元素對(duì)象)
for child in root:
child.get('name') # 獲取每個(gè)子元素的屬性
root.tag # 查看標(biāo)簽名
root.attrib # root的屬性(一個(gè)字典)
root.text # root的內(nèi)容
root.text.strip() # 過濾空白字符
# 支持 xpath 語法
root.find('tag_name') # 找到子節(jié)點(diǎn)下標(biāo)簽名為 tag_name 的元素,找到第一個(gè)就停止
root.findall('tag_name') # 找到子節(jié)點(diǎn)下所有標(biāo)簽名為 tag_name 的元素,返回一個(gè)列表
root.iterfind('tag_name') # 找到子節(jié)點(diǎn)下所有標(biāo)簽名為 tag_name 的元素,返回一個(gè)迭代器
root.iter('rank') # 遞歸地查找標(biāo)簽名為 rank 的子節(jié)點(diǎn),無論在哪個(gè)層級(jí)下
如何構(gòu)建xml文檔
解決方案:
使用標(biāo)準(zhǔn)庫中的xml.etree.ElementTree,使用其中的write方法寫入xml文件。
from xml.etree.ElementTree import Element, ElementTree
from xml.etree.ElementTree import tostring
e = Element('Data') # 創(chuàng)建一個(gè) xml 元素
e.tag # 得到元素的標(biāo)簽名
e.set('name', 'abc') # 設(shè)置元素的屬性
tostring(e) # 以字符串的方式查看xml元素
e.text = '123' # 設(shè)置xml元素的內(nèi)容
e2 = Element('Row')
e2.text = 'abc'
e3 = Element('Open')
e3.text = 'def'
e2.append(e3) # 將e3設(shè)置為e2的子元素
e.text = None # 將元素e的內(nèi)容刪除
e.append(e2) # 將e2設(shè)置為e的子元素
et = ElementTree(e) # 需要傳入一個(gè)根節(jié)點(diǎn)元素
et.write('demo1.xml')
# 美化xml文件
def pretty(e, level=0):
if len(e) > 0:
e.text = '\n' + '\t' * (level + 1)
for child in e:
pretty(child, level + 1)
child.tail = child.tail[:-1]
e.tail = '\n' + '\t' * level
如何讀寫excel文件
實(shí)際案例:
利用python讀寫excel文件,并添加新的一列
解決方案:
使用第三方庫
xlrd和xlwt,這兩個(gè)庫分別用于excel讀和寫
# 讀取excel文件
import xlrd
book = xlrd.open_work_book('demo.xlsx')
book.sheets() # 返回一個(gè)sheet對(duì)象的列表
sheet = book.sheet_by_index(0) # 得到一個(gè)sheet對(duì)象
sheet.nrows # 表的行數(shù)
sheet.ncols # 表的列數(shù)
# 訪問單元格
cell = sheet.cell(0, 0) # 得到坐標(biāo)為(0,0)的對(duì)象
cell.ctype # 內(nèi)容的類型,枚舉類型 xlrd.XL_CELL_XXX 來查看具體所指
cell.value # cell的內(nèi)容
# 獲取一行或者一列
sheet.row(1) # 返回第一行
sheet.row_values(1) # 得到第一行的值
sheet.row_values(1, start_colx=1, end_colx=None) # 從第一個(gè)開始
# 添加一個(gè)單元格
sheet.put_cell(rowx=2, colx=2, ctype=1, value="ok", xf_index=None) # xf_index 設(shè)置字體、對(duì)齊等
# 寫入excel文件
import xlwt
wbook = xlwt.Workbook() # 實(shí)例一個(gè)Workbook對(duì)象
wsheet = wbook.add_sheet('sheet1') # 創(chuàng)建一個(gè)名字為sheet1的表
wsheet.write(r=0, c=0, label='ok', style=None)
wsheet.save('demo.xlsx')
如何派生內(nèi)置不可變類型并修改實(shí)例化行為
實(shí)際案例:
我們想自定義一種新類型的元組,對(duì)于傳入的可迭代對(duì)象,我們只保留作其中int類型且值大于0的元素,例如:
IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3]) => (1, 6, 3)
要求IntTuple是內(nèi)置tuple的子類,如何實(shí)現(xiàn)?
解決方案:
定義IntTuple繼承內(nèi)置tuple,并實(shí)現(xiàn)__new__,修噶愛實(shí)例化行為
class IntTuple(tuple):
# 參數(shù)與__init__方法的參數(shù)一致
def __new__(cls, iterable):
g = (x for x in iterable if isinstance(x, int) and x > 0)
# 返回一個(gè) IntTuple 的實(shí)例對(duì)象
return super(IntTuple, cls).__new__(cls, g)
def __init__(self, iterable):
super(IntTuple, self).__init__() # super().__init__() 也可以
t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3])
print(t)
如何創(chuàng)建大量實(shí)例節(jié)省內(nèi)存
實(shí)際案例:
某網(wǎng)絡(luò)游戲中,定義了玩家類Player(id, name, status, ...) 每有一個(gè)在線玩家,在服務(wù)器程序內(nèi)則有一個(gè)Player的實(shí)例,當(dāng)在線人數(shù)很多時(shí),將產(chǎn)生大量實(shí)例。(如百萬級(jí))
如何降低這些大量實(shí)例的內(nèi)存開銷?
解決方案:
定義類的__slots__屬性, 它用來聲明實(shí)例屬性名字的列表。
案例代碼:
# python3
import sys
class Player(object):
def __init__(self, uid, name, status=0, level=1):
self.uid = uid
self.name = name
self.stat = status
self.level = level
class Player2(object):
__slots__ = ['uid', 'name', 'stat', 'level'] # 關(guān)閉__dict__的屬性,不能動(dòng)態(tài)綁定屬性
def __init__(self, uid, name, status=0, level=1):
self.uid = uid
self.name = name
self.stat = status
self.level = level
p1 = Player('0001', 'Jim')
p2 = Player('0001', 'Jim')
dic = p1.__dict__ # 類對(duì)象屬性的字典, 可以動(dòng)態(tài)的增加和刪除值
sys.getsizeof(dic) # 獲取dic的內(nèi)存值
如何讓對(duì)象支持上下文管理
實(shí)際案例:
我們事先了一個(gè)telent客戶端的類TelentClient,調(diào)用實(shí)例的start() 方法啟動(dòng)客戶端與服務(wù)器的交互,交互完畢后需調(diào)用cleanup() 方法,關(guān)閉已連接的socket,以及將操作歷史記錄寫入文件并關(guān)閉。
能否讓TelentClient的實(shí)例支持上下文管理協(xié)議,從而代替手工調(diào)用cleanup() 方法。
解決方案:
實(shí)現(xiàn)上下文管理協(xié)議,需定義實(shí)例的__enter__和__exit__方法,它們分別在with開始和結(jié)束時(shí)被調(diào)用。
# python
from telnetlib import Telnet
from sys import stdin, stdout
from collections import deque
class TelentClient(object):
def __init__(self, addr, port=23):
self.addr = addr
self.port = port
self.tn = None
self.history = None
def start(self):
# user
t = self.tn.read_until('login: ')
stdout.write(t)
user = stdin.readline()
self.tn.write(user)
# password
t = self.tn.read_until('Password: ')
if t.startswith(user[:-1]):
t = t[len(user) + 1:]
stdout.write(t)
self.tn.write(stdin.readline())
t = self.tn.read_until('$ ')
stdout.write(t)
while True:
uinput = stdin.readline()
if not uinput:
break
self.history.append(uinput)
self.tn.write(uinput)
t = self.tn.read_until('$ ')
stdout.write(t[len(uinput) + 1:])
def cleanup(self):
self.tn.close()
self.tn = None
with open(self.addr + '_history.txt', 'w') as f:
f.write(self.history)
def __enter__(self):
self.tn = Telnet(self.addr, self.port)
self.history = deque()
# 返回 TelentClient 實(shí)例化后的對(duì)象,也就是自身
return self
# 當(dāng)出現(xiàn)異常的時(shí)候,直接跳到__exit__方法并執(zhí)行
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
return True # 寫了 'return True' 后不會(huì)向外拋出異常
with TelentClient('finlu.com.cn') as client:
client.start()
一般的telent客戶端:
# python3
from telnetlib import Telnet
from sys import stdin, stdout
from collections import deque
class TelentClient(object):
def __init__(self, addr, port=23):
self.addr = addr
self.port = port
self.tn = None
self.history = None
def start(self):
self.tn = Telnet(self.addr, self.port)
self.history = deque()
# user
t = self.tn.read_until('login: ')
stdout.write(t)
user = stdin.readline()
self.tn.write(user)
# password
t = self.tn.read_until('Password: ')
if t.startswith(user[:-1]):
t = t[len(user) + 1:]
stdout.write(t)
self.tn.write(stdin.readline())
t = self.tn.read_until('$ ')
stdout.write(t)
while True:
uinput = stdin.readline()
if not uinput:
break
self.history.append(uinput)
self.tn.write(uinput)
t = self.tn.read_until('$ ')
stdout.write(t[len(uinput) + 1:])
def cleanup(self):
self.tn.close()
self.tn = None
with open(self.addr + '_history.txt', 'w') as f:
f.write(self.history)
client = TelentClient('finlu.com.cn')
print('\nstart...')
client.start()
print('\ncleanup...')
如何創(chuàng)建可管理的對(duì)象屬性
實(shí)際案例:
在面向?qū)ο缶幊讨?,我們把方法(函?shù))看作對(duì)象的接口。直接訪問對(duì)象的屬性可能是不安全的,或設(shè)計(jì)上不夠靈活。但是使用調(diào)用方法在形式上不如訪問屬性簡(jiǎn)介。
circle.get_radius()
circle.get_radius(5.0) # 繁
circle.radius
circle.radius = 5.0 # 簡(jiǎn)
是否在形式上是屬性訪問,但實(shí)際上調(diào)用方法?
解決方案:
使用property函數(shù)為類創(chuàng)建可管理屬性,fget/fset/fdel對(duì)應(yīng)屬性訪問。
案例代碼:
from math import pi
class Circle(object):
def __init__(self, radius):
self.radius = radius
def get_radius(self):
return round(self.radius, 2)
def set_radius(self, value):
if not isinstance(value, (int, float)):
raise ValueError('wrong type.')
self.radius = float(value)
def get_area(self):
return round(self.radius ** 2 * pi, 2)
R = property(get_radius, set_radius)
c = Circle(3.2)
r1 = c.R # r1 = 3.2
c.R = 4.2
r2 = c.R # r2 = 4.2
如何讓類支持比較操作
實(shí)際案例:
有時(shí)我們希望自定義的類,勢(shì)力見可以使用<, <=, >, >=, ==, !=符號(hào)進(jìn)行比較,我們自定義比較的行為。例如,有一個(gè)矩陣的類,我們希望比較兩個(gè)矩形的實(shí)例時(shí),比較的是他們的面積。
解決方案:
比較符號(hào)運(yùn)算符重載,需要實(shí)現(xiàn)以下方法:
__lt__,__le__,___gt,__ge__,__eq__,__ne__
使用標(biāo)準(zhǔn)庫下的functools下的類裝飾器abstractmethod可以簡(jiǎn)化此過程。
案例代碼:
from math import pi
from abc import abstractmethod
class Shape(object):
# 子類中一定要實(shí)現(xiàn)該方法
@abstractmethod
def area(self):
pass
def __lt__(self, other):
print('In __lt__')
if not isinstance(other, Shape):
raise TypeError('Type Error')
return self.area() < other.area()
def __eq__(self, other):
print('In __lt__')
if not isinstance(other, Shape):
raise TypeError('Type Error')
return self.area() == other.area()
def __gt__(self, other):
if not isinstance(other, Shape):
raise TypeError('Type Error')
return self.area() > other.area()
class Rectangle(Shape):
def __init__(self, w, h):
self.w = w
self.h = h
def area(self):
return self.w * self.h
class Circle(Shape):
def __init__(self, r):
self.r = r
def area(self):
return self.r ** 2 * pi
r = Rectangle(6, 6)
c = Circle(6)
print(r < c)
如何使用描述符對(duì)實(shí)例屬性做類型檢查
描述符:__get__,__set__, __delete__
實(shí)際案例:
在某項(xiàng)目中,我們實(shí)現(xiàn)了一些類,并希望能像靜態(tài)語言那樣(C, C++, Java)對(duì)它們的實(shí)例屬性做類型檢查。
p = Person()
p.name = 'Bob' # 必須是str
p.age = 18 # 必須是int
p.height = 1.83 # 必須是float
要求:
- 可以對(duì)實(shí)例變量名指定類型
- 賦予不正確時(shí)拋出異常
解決方案:
使用描述符來實(shí)現(xiàn)需要類型檢查的屬性:分別實(shí)現(xiàn)__get__,__set__, ___delete__方法,在__set__內(nèi)部使用isinstance函數(shù)做類型檢查。
案例代碼:
# 檢查Person類的屬性類型是否符合要求
class Attr(object):
def __init__(self, name, type_):
self.name = name
self.type_ = type_
def __get__(self, instance, owner):
print('In __get__')
return instance.__dict__[self.name]
def __set__(self, instance, value):
print('In __set__')
if not isinstance(value, self.type_):
raise TypeError('expected an {}'.format(self.type_))
instance.__dict__[self.name] = value
def __delete__(self, instance):
print('In delete')
del instance.__dict__[self.name]
class Person(object):
name = Attr('name', str)
age = Attr('age', int)
height = Attr('height', float)
person = Person()
person.name = 'Bob'
person.age = 18
person.height = 1.83
如何在環(huán)狀結(jié)構(gòu)中管理內(nèi)存
sys.getrefcount(a): 返回a被引用的次數(shù)
實(shí)際案例:
在python中,垃圾回收器通過引用計(jì)數(shù)來回收垃圾對(duì)象,但某些環(huán)狀數(shù)據(jù)結(jié)構(gòu)(樹, 圖,...),存在對(duì)象間的循環(huán)引用,比如樹的父節(jié)點(diǎn)引用子節(jié)點(diǎn),子節(jié)點(diǎn)也同時(shí)引用父節(jié)點(diǎn)。此時(shí)同時(shí)del掉引用父子節(jié)點(diǎn),兩個(gè)對(duì)象不能被立即回收。
如何解決此類的內(nèi)存管理問題?
解決方案:
使用標(biāo)準(zhǔn)庫weakref,它可以創(chuàng)建一種能訪問對(duì)象但不增加引用計(jì)數(shù)的對(duì)象。
案例代碼:
# 當(dāng)對(duì)象循環(huán)引用的時(shí)候,刪除其中一個(gè)對(duì)象,兩個(gè)對(duì)象不能被立即回收(執(zhí)行類的__del__方法)
import weakref
class Data(object):
def __init__(self, value, owner):
# self.owner = owner
self.owner = weakref.ref(owner)
self.value = value
def __str__(self):
# return "%s's data, value is %s" % (self.owner, self.value)
return "%s's data, value is %s" % (self.owner(), self.value)
def __del__(self):
print('In Data.__del__')
class Node(object):
def __init__(self, value):
# Node類和Data類循環(huán)引用
self.data = Data(value, self)
def __del__(self):
print('In Node.__del__')
node = Node(100)
del node
# In Node.__del__
# In Data.__del__
如何通過實(shí)例方法名字的字符串調(diào)用方法
實(shí)際案例:
某項(xiàng)目中,我們的代碼使用了三個(gè)不同庫中的圖形類:
? Circle, Triangle, Rectangle
他們都有一個(gè)獲取圖形面積的接口(方法),但接口名字不同。我們可以實(shí)現(xiàn)一個(gè)統(tǒng)一的獲取面積的函數(shù),使用每種方法名進(jìn)行嘗試,調(diào)用相應(yīng)類的接口。
解決方案:
- 使用內(nèi)置函數(shù)getattr,通過名字在實(shí)例上獲取方法對(duì)象,然后調(diào)用。
- 使用標(biāo)準(zhǔn)庫operator下的methodcaller函數(shù)調(diào)用。
案例代碼:
from math import pi
from operator import methodcaller
class Circle(object):
def __init__(self, r):
self.r = r
def area(self):
return self.r ** 2 * pi
class Rectangle(object):
def __init__(self, w, h):
self.w = w
self.h = h
def get_area(self):
return self.w * self.h
class Triangle(object):
def __init__(self, a, b ,c):
self.a = a
self.b = b
self.c = c
def getArea(self):
a, b, c = self.a, self.b, self.c
p = (a + b + c) / 2
area = (p * (p - a) * (p - b) * (p - c)) ** 0.5
return area
def getArea(shape):
for name in ('area', 'getArea', 'get_area'):
f = getattr(shape, name, None)
if f:
return f()
shape1 = Circle(2)
shape2 = Rectangle(6, 4)
shape3 = Triangle(3, 4, 5)
shapes = [shape1, shape2, shape3]
print(list(map(getArea, shapes)))
# methodcaller函數(shù)的基本使用案例
s = 'abc123abc123'
s.find('abc', 4)
# 6
methodcaller('find', 'abc', 4)(s)
# 6
如何使用多線程
實(shí)際案例:
希望提高從互聯(lián)網(wǎng)下載資源的速度
解決方案:
使用標(biāo)準(zhǔn)庫threading.Thread創(chuàng)建線程,在每一個(gè)線程下載并處理數(shù)據(jù)。
案例代碼:
# 使用線程類
from threading import Thread
import requests
def download():
url = 'http://baidu.com'
response = requests.get(url, timeout=5)
if response.ok:
print('ok')
return response.text
for _ in range(10):
t = Thread(target=download, args=())
t.start()
t.join()
print('__main__ END')
自定義線程類:
from threading import Thread
import requests
class MyThread(Thread):
def __init__(self):
super().__init__()
def run(self):
self.download()
@classmethod
def download(cls):
url = 'http://baidu.com'
response = requests.get(url, timeout=5)
if response.ok:
print('ok')
return response.text
threads = []
for i in range(1, 11):
t = MyThread()
threads.append(t)
t.start()
for t in threads:
t.join()
print('__main__ END')
如何線程間通信
實(shí)際案例:
由于全局解釋器鎖的存在,多線程進(jìn)程CPU密集型操作并不能提高執(zhí)行效率,我們修改程序架構(gòu):
- 使用多個(gè)DownloadThread線程進(jìn)行下載(I/O操作)
- 使用一次ConvertThread線程進(jìn)行轉(zhuǎn)換(CPU密集型操作)
- 下載線程把下載數(shù)據(jù)安全地傳遞給轉(zhuǎn)換線程
解決方案:
使用標(biāo)準(zhǔn)庫中的queue.Queue,它是一個(gè)線程安全的隊(duì)列。Download線程把下載數(shù)據(jù)放入Covert線程從隊(duì)列里提取數(shù)據(jù)。
案例代碼:
from threading import Thread
import requests
from queue import Queue
class DownloadThread(Thread):
def __init__(self, sid, queue):
super().__init__()
self.sid = sid
self.queue = queue
@classmethod
def download(cls, sid):
url = 'http://baidu.com'
sid = sid
response = requests.get(url, timeout=5)
if response.ok:
print('ok')
return response.text
def run(self):
print('DownloadThread', self.sid)
data = self.download(self.sid)
self.queue.put((self.sid, data))
class ConverThread(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
@classmethod
# 模擬CPU運(yùn)算
def go(self):
sum = 0
for i in range(100):
sum += I
return sum
def run(self):
while True:
sid, data = self.queue.get()
print('ConverThread', sid)
# 當(dāng)隊(duì)列中的sid值為-1的時(shí)候表示沒有數(shù)據(jù)了,可以退出循環(huán)了
if sid == -1:
break
if data:
self.go()
q = Queue()
dthreads = [DownloadThread(i, q) for i in range(1, 11)]
cThread = ConverThread(q)
for t in dthreads:
t.start()
t.join()
cThread.start()
# 當(dāng)子線程結(jié)束后往隊(duì)列中put sid 的值為-1
q.put((-1, None))
print('__main__ END')
如何在線程間進(jìn)行事件通知
[圖片上傳失敗...(image-4c542a-1555816204408)]
解決方案:
線程間的事件通知,可以使用標(biāo)準(zhǔn)庫中的Threading.Event:
- 等待事件一端調(diào)用wait,等待事件。
- 通知事件一端調(diào)用set,通知事件。
案例代碼:
import csv
import os
import tarfile
from xml.etree.ElementTree import Element, ElementTree
import requests
from xml pretty import pretty
from threading import Event, Thread
class DownloadThread(Thread):
def __init__(self, sid, queue):
super().__init__()
self.sid = sid
self.url = 'http://table.fince.yahoo.com/table.csv?s=%s.sz'
self.url %= str(sid).rjust(6, '0')
self.queue = queue
def download(cls, url):
response = requests.get(url, timeout=3)
if response.ok:
return response.content
def run(self):
print('DownloadThread', self.sid)
data = self.download(self.sid)
self.queue.put((self.sid, data))
class ConverThread(Thread):
def __init__(self, queue, cEvent, tEvent):
super().__init__()
self.queue = queue
self.cEvent = cEvent
self.tEvent = tEvent
def csv_to_xml(self):
reader = csv.reader(scsv)
headers = reader.__next__()
headers = map(lambda h: h.replace(' ', ''), headers)
root = Element('Data')
for row in reader:
eRow = Element('Row')
root.append(eRow)
for tag, text in zip(headers, row):
e = Element(tag)
e.text = text
eRow.append(e)
pretty(root)
et.ElementTree(root)
et.write(fxml)
def run(self):
while True:
count = 0
sid, data = self.queue.get()
print('Convert', sid)
if sid == -1:
self.cEvent.set()
self.tEvent.wait()
break
if data:
fname = str(sid).rjust(6, '0') + '.xml'
with open(fname, 'wb') as wf:
self.csv_to_xml(data, wf)
count += 1
if count == 5:
self.cEvent.set()
self.tEvent.wait()
self.tEvent.clear()
count = 0
class TarThread(Thread):
def __init__(self, cEvent, tEvent):
Thread.__init__()
self.count = 0
self.cEvent = cEvent
self.tEvent = tEvent
self.setDaemon(True) # 設(shè)置守護(hù)線程,當(dāng)其他線程退出的時(shí)候,其自動(dòng)退出
def tarXML(self):
self.count += 1
tfname = '%d.tgz' % self.count
tf = tarfile.open(tfname, 'w:gz')
for fname in os.listdir('.'):
if fname in os.listdir('.'):
tf.add(fname)
os.remove(fname)
tf.close()
if not tf.members:
os.remove(tfname)
def run(self):
while True:
self.cEvent.wait()
self.tarXML()
self.cEvent.clear()
self.tEvent.set()
if __name__ == '__main__':
q = Queue()
dthreads = [DownloadThread(i, q) for i in range(1, 11)]
cEvent = Event()
tEvent = Event()
cThread = ConverThread(q, cEvent, tEvent)
tThread = TarThread(cEvent, tEvent)
tThread.start()
for t in dthreads:
t.start()
t.join()
cThread.start()
# 當(dāng)子線程結(jié)束后往隊(duì)列中put sid 的值為-1
q.put((-1, None))
print('__main__ END')
如何使用線程本地?cái)?shù)據(jù)
實(shí)際案例:


案例代碼:






如何使用線程池



在上面的代碼基礎(chǔ)上重寫TCPServer類。
如何使用多進(jìn)程

如何使用函數(shù)裝飾器
實(shí)際案例:

案例代碼:
# 斐波拉契數(shù)列求值函數(shù)
def fibonacci(n):
if n <= 1:
return 1
return fibonacci(n - 1) + fibonacci(n - 2)
# 對(duì)斐波拉契數(shù)列求值函數(shù)的優(yōu)化
def fibonacci(n, cache=None):
if cache is None:
cache = {}
if n in cache:
return cache[n]
if n <= 1:
return 1
cache[n] = fibonacci(n - 1, cache) + fibonacci(n - 2, cache)
return cache[n]
# 使用裝飾器實(shí)現(xiàn)對(duì)斐波拉契數(shù)列的優(yōu)化
def memo(func):
cache = {}
def wrap(*args):
if args not in cache:
print(args)
cache[args] = func(*args)
return cache[args]
return wrap
@memo
def fibonacci(n):
if n <= 1:
return 1
return fibonacci(n - 1) + fibonacci(n - 2)
# 一共有10個(gè)臺(tái)階,一次只能邁1~3個(gè)臺(tái)階,且不能后退,求有幾種方法
def climb(n, steps):
count = 0
if n == 0:
count = 1
elif n > 0:
for step in steps:
count += climb(n - step, steps)
return count
# 裝飾器緩存避免重復(fù)運(yùn)算
def memo(func):
cache = {}
def wrap(*args):
if args not in cache:
print(args)
cache[args] = func(*args)
return cache[args]
return wrap
@memo
def climb(n, steps):
count = 0
if n == 0:
count = 1
elif n > 0:
for step in steps:
count += climb(n - step, steps)
return count
如何為被裝飾的函數(shù)保存元數(shù)據(jù)
函數(shù)的元數(shù)據(jù):
- __name__ 函數(shù)的名字,在define的時(shí)候?qū)懭氲胶瘮?shù)里面的
- __doc__ 函數(shù)的文檔字符串
- _module_ 函數(shù)所屬模塊名
- _defaults_ 保存函數(shù)的默認(rèn)參數(shù)(關(guān)鍵字參數(shù)),默認(rèn)以元組的形式
- _closure_ 返回函數(shù)的閉包
實(shí)際案例:

案例代碼:
from functools import update_wrapper, wraps, WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES
def memo(func):
cache = {}
# 方法二:使用wraps裝飾器
@wraps(func)
def wrap(*args, *kargs):
"""wraper function"""
# 方法一:使用update_wrapper函數(shù)
update_wrapper(wrap, func, assigned=('__name__', '__doc__'), updated=('__dict__', ))
# 也可以使用默認(rèn)參數(shù)
# assigned=WRAPPER_ASSIGNMENTS==('__module__', '__name__', '__qualname__', '__doc__', '__annotations__')
# updated=WRAPPER_UPDATES==('__dict__',)
if args not in cache:
print(args)
cache[args] = func(*args)
return cache[args]
return wrap
@memo
def climb(n, steps):
"""climb function"""
count = 0
if n == 0:
count = 1
elif n > 0:
for step in steps:
count += climb(n - step, steps)
return count
如何定義帶參數(shù)的裝飾器
實(shí)際案例:

案例代碼:
from inspect import signature
def f(a, b, c=1):
pass
sig = signature(f)
print(sig.parameters) # 獲得函數(shù)參數(shù)的一個(gè)字典
a = sig.parameters['a']
b = sig.parameters['b']
c = sig.parameters['c']
# a.kind # 參數(shù)類型
# a.default # 參數(shù)的默認(rèn)值
# 創(chuàng)建一個(gè)如上的字典
bargs = sig.bind(str, int, int) # 對(duì)所有參數(shù)做類型檢查
# bargs.arguments # OrderedDict([('a', <class 'str'>), ('b', <class 'int'>), ('c', <class 'int'>)])
bargs2 = sig.bind_partial() # 對(duì)指定參數(shù)做類型檢查
from inspect import signature
# 定義帶參數(shù)的裝飾器
def typeassert(*ty_types, **ty_kwargs):
def decorator(func):
# func -> a, b
# d = {'a': int, 'b': str}
sig = signature(func) # 獲取func函數(shù)的簽名
btypes = sig.bind_partial(*ty_types, **ty_kwargs).arguments
def wrapper(*args, **kwargs):
# arg in d, isinstance(arg, d[arg])
for name, obj in sig.bind(*args, **kwargs).arguments.items():
if name in btypes:
if not isinstance(obj, btypes[name]):
raise TypeError('{} must be {}'.format(name, btypes[name]))
return func(*args, **kwargs)
return wrapper
return decorator
# 測(cè)試代碼
@typeassert(int, str, list)
def f(a, b, c):
print(a, b, c)
f(1, 'abc', [1, 2, 3])
f(1, 2, [1, 2, 3])
如何實(shí)現(xiàn)屬性可修改的函數(shù)裝飾器
實(shí)際案例:

案例代碼:
from functools import wraps
import time
import logging
from random import randint
def wran(timeout):
def decorator(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
used = time.time() - start
if used > timeout:
msg = '%s: %s > %s' % (func.__name__, used, timeout)
logging.warn(msg)
return res
# 動(dòng)態(tài)的修改timeout的值
def set_timeout(k):
nonlocal timeout
timeout = k
wrapper.set_timeout = set_timeout
return wrapper
return decorator
@wran(1.5)
def test():
print('In test')
while randint(0, 1):
time.sleep(0.5)
for _ in range(30):
test()
# 將timeout的值設(shè)置為1
test.set_timeout(1)
for _ in range(30):
test()