基于python3,簡單實現(xiàn)tail -n、tail -f功能
原理
- 把文件大小分頁進行讀取,這樣讀取大日志就無需全部加載到內(nèi)存中
- 文件倒讀輸出
代碼實現(xiàn)
#!/usr/bin/env python
# coding: utf-8
"""
原理:
https://yq.aliyun.com/articles/60684
功能:
實現(xiàn)tail -n
實現(xiàn)tail -f
其它實現(xiàn):
http://www.cnblogs.com/bufferfly/p/4878688.html
https://github.com/shengxinjing/my_blog/issues/11
BUG:
重定向相同數(shù)據(jù)到日志文件里,使用>而不是>>的話,輸入無法打印出來
"""
import os
import sys
import time
PAGE = 4096
class Tail:
def __init__(self, filename, callback=sys.stdout.write):
self.filename = filename
self.callback = callback
def reverse(self, n=10):
"""
實現(xiàn) tail -n
"""
with open(self.filename, 'rb') as f:
f_len = f.seek(0, 2)
rem = f_len % PAGE
page_n = f_len // PAGE
r_len = rem if rem else PAGE
while True:
# 如果讀取的頁大小>=文件大小,直接讀取數(shù)據(jù)輸出
if r_len >= f_len:
f.seek(0)
lines = f.readlines()[::-1]
break
f.seek(-r_len, 2)
# print('f_len: {}, rem: {}, page_n: {}, r_len: {}'.format(f_len, rem, page_n, r_len))
lines = f.readlines()[::-1]
count = len(lines) -1 # 末行可能不完整,減一行,加大讀取量
if count >= n: # 如果讀取到的行數(shù)>=指定行數(shù),則退出循環(huán)讀取數(shù)據(jù)
break
else: # 如果讀取行數(shù)不夠,載入更多的頁大小讀取數(shù)據(jù)
r_len += PAGE
page_n -= 1
for line in lines[:n]:
self.callback(line.decode('utf-8'))
def follow(self):
"""
實現(xiàn) tail -f
"""
with open(self.filename, 'rb') as fd:
pos = fd.seek(0, 2) # 打開文件時大小
try:
while True:
curr_pos = fd.seek(0,2)
# print('pos: {}, curr_pos: {}'.format(pos, curr_pos))
if pos > curr_pos: # 表示文件數(shù)據(jù)減少或清空
pos = fd.seek(0, 2)
# time.sleep(0.3)
continue
line = fd.readline()
if line:
self.callback(line.decode('utf-8'))
# time.sleep(0.1)
except KeyboardInterrupt as e:
pass
if __name__ == '__main__':
if len(sys.argv) != 3:
print('Usage: {} [ -f | -# ] file'.format(sys.argv[0]), file=sys.stderr)
raise SystemExit(1)
if not os.path.isfile(sys.argv[2]):
print('File does not exist.')
raise SystemExit(1)
else:
tail = Tail(sys.argv[2])
if '-f' == sys.argv[1]:
tail.reverse()
tail.follow()
elif '-' in sys.argv[1]:
try:
n = int(sys.argv[1].strip('-'))
tail.reverse(n)
except ValueError:
print('Unknown command {!r}'.format(sys.argv[1]), file=sys.stderr)
raise SystemExit(1)
else:
print('Unknown command {!r}'.format(sys.argv[1]), file=sys.stderr)
raise SystemExit(1)
測試
- 生成測試數(shù)據(jù)
with open('test.log', 'w+') as f:
for l in range(1, 21):
print('This is line {}'.format(l), file=f )
- 讀取測試
[tail] python test_data.py 15:49:04
[tail] python tail.py -5 test.log 15:50:09
This is line 20
This is line 19
This is line 18
This is line 17
This is line 16