#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from kafka.producer import KafkaProducer
import sys
import logging
import os
import datetime
import random
import threading
import time
# Base directory for this app's event data and logs.
parentPath = "/opt/git/xx/app/event"
# Ledger of incoming log files that have already been pushed to Kafka.
finishedLogPath = parentPath+"/finished_log.dat"
logger = logging.getLogger("eventToKafka")
logger.setLevel(logging.DEBUG)
# File handler: record DEBUG-and-above messages to a log file.
fh = logging.FileHandler(parentPath+"/event_to_kafka.log")
fh.setLevel(logging.DEBUG)
# Stream handler: echo ERROR-and-above messages to the console.
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# One shared log-line format for both handlers.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# Attach both handlers to the module logger.
logger.addHandler(ch)
logger.addHandler(fh)
class Kafka_producer():
    """
    Thin wrapper around kafka-python's KafkaProducer.

    kafkatopic     -- topic name that every send targets
    kafkapartition -- number of chunks sendBatchJsonData splits a batch
                      into (a chunk count, not a real Kafka partition id)
    """
    def __init__(self, kafkatopic, kafkapartition):
        self.kafkaTopic = kafkatopic
        self.kafkaPartition = kafkapartition
        # NOTE(review): broker address is hard-coded; consider making it configurable.
        self.producer = KafkaProducer(bootstrap_servers = ['111.111.111.111:9092'])
    def sendjsondata(self, jsonData):
        """Asynchronously send one message; errors are logged, not raised."""
        try:
            self.producer.send(self.kafkaTopic, jsonData)
        except Exception as e:
            logger.error(e)
    def flush(self):
        """Block until all buffered messages have been delivered."""
        self.producer.flush()
    def sendBatchJsonData(self, jsonData):
        """Split jsonData into self.kafkaPartition chunks and send each,
        flushing after every chunk; the last chunk absorbs the remainder.
        Errors are logged, not raised."""
        try:
            # Floor division: every chunk except the last gets exactly this many items.
            curcount = len(jsonData) // self.kafkaPartition
            for i in range(0, self.kafkaPartition):
                start = i * curcount
                if i != (self.kafkaPartition - 1):
                    curdata = jsonData[start:start + curcount]
                else:
                    # Last chunk takes everything left, including the remainder.
                    curdata = jsonData[start:]
                self.producer.send(self.kafkaTopic, curdata)
                self.producer.flush()
        except Exception as e:
            logger.error(e)
def searchFile(path, keyword):
    """Return the full paths of the regular files directly inside *path*
    whose file names contain *keyword* (directories are skipped)."""
    return [os.path.join(path, name)
            for name in os.listdir(path)
            if keyword in name and os.path.isfile(os.path.join(path, name))]
def insertIntoSet(filePath):
    """Read *filePath* and return the set of its lines with trailing
    newlines removed.

    A failure while reading is logged and the lines collected so far are
    returned (best-effort, matching the rest of this script); a failure
    to open the file still propagates, as in the original.
    """
    tempSet = set()
    # 'with' replaces the manual close(); also avoids shadowing builtin 'file'.
    with open(filePath) as fh:
        try:
            for line in fh:
                tempSet.add(line.replace('\n', ''))
        except Exception as e:
            logger.error(e)
    return tempSet
class calthread(threading.Thread):
    """Worker thread that streams a slice of log files into Kafka.

    Processes files[startN..endN] (inclusive): each line is split on
    tabs and the third column (arr[2]) is sent, one message at a time,
    to the "event" topic.
    """
    def __init__(self, threadname, cond, startN, endN, files):
        threading.Thread.__init__(self, name = threadname)
        # cond is accepted for interface compatibility; run() never uses it.
        self.cond = cond
        self.startN = startN  # index of the first file this thread owns
        self.endN = endN      # index of the last file it owns (inclusive)
        self.files = files
    def run(self):
        for i in range(self.startN, self.endN + 1):
            filePath = self.files[i]
            logger.info("current file is " + filePath)
            # One producer per file, mirroring the original design.
            producer = Kafka_producer("event", 1)
            try:
                # 'with' guarantees the handle closes; open failures are now
                # logged instead of killing the thread with a bare traceback.
                with open(filePath) as fh:
                    fileLines = 0
                    for line in fh:
                        arr = line.strip().split('\t')
                        if len(arr) > 0:
                            try:
                                producer.sendjsondata(arr[2])
                                # Per-message flush is slow but preserves the
                                # original delivery behavior.
                                producer.flush()
                                # Sample roughly 1 in 100k events into the log.
                                if random.random() < 0.00001:
                                    logger.info(arr[2])
                                fileLines += 1
                            except Exception as e:
                                # arr[2] raises IndexError on short lines; log and keep going.
                                logger.error("current wrong file is %s" % (filePath))
                                logger.error("The wrong event log is %s" % (arr[2]))
                                logger.error(e)
                                continue
                    logger.info("insert into kafka %s lines" % (str(fileLines)))
            except Exception as e:
                logger.error(e)
def main(argv=None):
    """Diff the incoming log directory against finished_log.dat, record
    the new files, and fan them out across worker threads that push
    their contents to Kafka."""
    if argv is None:
        argv = sys.argv
    # Condition object handed to the workers (currently unused by them).
    cond = threading.Condition()
    # Files already pushed to Kafka in earlier runs.
    finishedLog = set()
    with open(finishedLogPath) as finishedFile:
        for line in finishedFile:
            finishedLog.add(line.strip('\n'))
    # Files currently sitting in the incoming directory.
    incomingLog = set(searchFile("/xx/xx/staging/tracking/incoming/", "xxx"))
    # Only the files we have not processed yet.
    todoLog = incomingLog - finishedLog
    if len(todoLog) == 0:
        return
    for i in todoLog:
        print(i)
    # Record the new files up front so a rerun will not resend them.
    with open(finishedLogPath, 'a') as outfile:
        for i in todoLog:
            outfile.write(i + "\n")
    todoList = list(todoLog)
    alen = len(todoList)
    # One thread per file: threadN == alen, so each slice is one file.
    threadN = alen
    threadL = []
    t = alen // threadN  # files per thread; floor division (always 1 here)
    logger.info( "初始化線程" )
    for x in range(0, threadN):
        startN = x * t
        # The last thread (or an overflowing slice) absorbs the tail of the list.
        if (x + 1) * t >= alen or x == threadN - 1:
            endN = alen - 1
        else:
            endN = (x + 1) * t - 1
        threadL.append(calthread("Thread--" + str(x), cond, startN, endN, todoList))
    logger.info("Start time of threadCal--" + str(time.time()))
    for a in threadL:
        a.start()
    logger.info("done")
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
# --- Scraped article boilerplate below (not code); commented out so the file parses ---
# 讀取文件內(nèi)容并寫到kafka
# ?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
# 【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
# 平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
# 【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
# 平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
# 相關閱讀更多精彩內(nèi)容
# - 進入flume目錄D:\apache-flume-1.9.0-bin\conf,創(chuàng)建配置文件kafka.conf ...
# - 導言 近來對electron有點感興趣,奈何干啃官方文檔實在難受,還是直接從demo入手會比較好。受到一兩個教程的...
# - 一、實例代碼 二、一些補充 在上述代碼中,已經(jīng)對內(nèi)容進行了詳細的注釋,仔細閱讀應該可以輕松掌握。在此處只對一些屬性...
# - 今日關鍵詞:認知差距,說話之道,不等價交易,契約精神。認知差距存在于不同的社會階層,不同的文化背景,不同的社會閱歷...