雖然在終端日志的上報(bào)程序中,我接觸到并第一次使用了事件循環(huán)(使用的是libev的python實(shí)現(xiàn)pyev,現(xiàn)在該模塊改名為pyev-static):
# worker.py
class Worker:
...
def sib_hd(self, watcher, revents):
logging.......
# 使用日志記錄
watcher.loop.stop(pyev.EVBREAK_ALL)
# 主動(dòng)結(jié)束循環(huán)
def run(self):
self.loop = pyev.default_loop(debug=self.config.get('debug',False))
# 創(chuàng)建事件循環(huán)
sig_event = self.loop.signal(signal.SIGINT | signal.SIGTERM, self.sig_hd)
# 如果進(jìn)程接收到了SIGINT SIGTERM則執(zhí)行sig_hd方法(也就是‘優(yōu)雅’地退出循環(huán))
sig_event.start()
self.Server = Server(self.loop, self.config)
# Server是解析類,給傳入的self.loop添加了 pyev的IO事件
self.loop.start()
# 啟動(dòng)事件循環(huán)
Server類中給loop添加的事件與對(duì)應(yīng)行為的代碼為:
class Server(EventEmitter):
# Server繼承pyee的EventEmitter類
def __init__(self, loop, config):
...
self.loop = loop
self.config = config
self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp.bind(self.config.get('address'))
# 使用socket創(chuàng)建udp端口監(jiān)聽
self.udp_io = self.loop.io(self.udp, pyev.EV_READ, self.udp_hd)
self.udp_io.start()
# 給事件循環(huán)添加IO事件,當(dāng)self.udp 觸發(fā) pyev.EV_READ事件, 則執(zhí)行self.udp_hd方法
libev除了上文中出現(xiàn)的IO事件、SIGNAL系統(tǒng)信號(hào)事件外,還支持Child Watcher、Filestat Watcher(監(jiān)聽文件系統(tǒng),個(gè)人猜想應(yīng)該是利用inotify)、timer Watcher、periodic Watcher等
這個(gè)self.udp_hd 就是對(duì)接收包數(shù)據(jù)進(jìn)行解碼的方法。那么處理完了之后,也可以在事件循環(huán)中添加存儲(chǔ)的方法,比如存儲(chǔ)到本地?cái)?shù)據(jù)庫。這樣就可以讓某個(gè)事件的IO操作不再阻塞整個(gè)代碼的運(yùn)行。
在這個(gè)收集程序中,我并未考慮存儲(chǔ)到數(shù)據(jù)庫的問題。因?yàn)橐环矫孢@樣會(huì)耦合日志收集和日志分析。而是使用logging模塊,將解碼后的日志以日志的形式存儲(chǔ)在本地。再通過flume將日志收集至HDFS或Kafka.之后再進(jìn)行處理。
這樣便可提升代碼的運(yùn)行效率,并且相對(duì)比直接使用多線程編程,代碼會(huì)更加簡潔,也不需要考慮數(shù)據(jù)鎖與創(chuàng)建線程開銷的問題。但是,公司里的C語言工程師指出這樣也可能會(huì)導(dǎo)致偶爾丟失數(shù)據(jù)的情況。不采用先緩存再處理,而是一邊取一遍處理,若UDP數(shù)據(jù)過多,則網(wǎng)卡會(huì)主動(dòng)拋棄udp包(網(wǎng)卡隊(duì)列被吃滿)
并且在python2下,實(shí)現(xiàn)事件循環(huán),是需要第三方庫的支持的,比如gevent,pyev,pyuv等。并且參與具體業(yè)務(wù)實(shí)現(xiàn)的庫必須也支持事件循環(huán)的實(shí)現(xiàn)。比如我們?cè)诰W(wǎng)上搜索gevent的代碼示例,IO操作都用gevent.sleep(x)代替代。如果單純寫time.sleep(x)則會(huì)報(bào)錯(cuò)。這也從側(cè)面暴露了這個(gè)問題。