streamsets利用jython實現(xiàn)數(shù)據(jù)校驗

一、需求:

利用mongo元數(shù)據(jù)庫中提供的資源描述,去校驗csv中的每條數(shù)據(jù)

二、實現(xiàn)

首先配置好原數(shù)據(jù),以及路徑傳遞,還有jython模塊

注意,jython有很多第三方包是沒辦法直接使用的,需要用sys去加載,這時我們會碰上一個最大的難題,就是第三方包的處理。由于jython是運行在jvm上的,所以,需要c語言運行環(huán)境的包在此時都無法調(diào)用成功,例如pandas,numpy等,但其他第三方包在sys成功加載后還是可以調(diào)用成功的,例如pymongo,要把這些包和其依賴包放在指定路徑下。

streamsets最惡心的一點就是云端調(diào)試,問題與bug都要放在records.output中去打印輸出

還有就是要非常注意streamsets本身的知識和結(jié)構(gòu),比如records是個list,而record是個對象;batch by batch ?和record by record是兩種不同的運行模式等,如何利用他們的性質(zhì)進行編程仍是我們需要學習的

注意python格式的問題,循環(huán)的問題,還有業(yè)務(wù)邏輯處理的問題

三、編程

import sys

#sys.path.append('D:\JavaWorkplace\jython\jpython')

sys.path.append("/home/fengwenke/usr/streamset/jar/JPS.jar")

sys.path.append("/home/fengwenke/usr/streamset/python")

sys.setrecursionlimit(1000000)

from pymongo import MongoClient

import datetime as dt

import re

import json

conn = MongoClient('114.115.156.237', 27027)

db = conn.bigdata?

db.authenticate("gwssi", "gwssi123")

res = db.resourceProfile

for record in records:

? name = record.value['filepath']

#這個6和0是寫死的,需要改,6需要根據(jù)csv路徑的不同進行修改 0可能不需要改

csvName = name.split('/')[6]

tableName = csvName.split('_')[0]

a =list(res.find({"essentialInfo.resCode":tableName}))

meteData = []

for i in a:


? ? for s in i['dataInfos']:


? ? ? ? meteData.append(s['isPrimaryKey'])

? ? ? ? meteData.append(s['dataName'])

? ? ? ? meteData.append(s['dataType'])

print dt

newDate = dt.datetime.utcnow().strftime("%Y-%m-%d")

meteNameCollection = []

meteTypeCollection = []

meteIsprikeyCollection = []

#從mongo里拿出元數(shù)據(jù)的名字

for meteNameIndex in range(len(meteData)):

? if (meteNameIndex+2)%3 ==0:

? ? meteName = meteData[meteNameIndex]

? ? meteNameCollection.append(meteName)

? ? meteType = meteData[meteNameIndex+1]

? ? meteTypeCollection.append(meteType)

? ? meteIspri = meteData[meteNameIndex -1]

? ? meteIsprikeyCollection.append(meteIspri)

dataNameCollection = []

for recordIndex in range(len(records)):

? try:


? ? # Create a string field to store the current date with the specified format

? ? #record.value["3"] = meteData[8]


? ? #從數(shù)據(jù)流里取出第一列

? ? if recordIndex == 0:

? ? ? #從第一列里拿出每個名字


? ? ? for dataNameIndex in range(len(records[0].value)):

? ? ? ? dataNameCollection.append(records[0].value['{0}'.format(dataNameIndex)])


? ? else:

? ? ? #利用這個數(shù)據(jù)匹配元數(shù)據(jù),并對其他的數(shù)據(jù)類型進行校驗? 為什么不拆成兩層循環(huán),因為record記錄會覆蓋

? ? ? for dataNameIndex2 in range(len(dataNameCollection)):

? ? ? ? for meteNameIndex in range(len(meteNameCollection)):

? ? ? ? ? if dataNameCollection[dataNameIndex2] == meteNameCollection[meteNameIndex]:

? ? ? ? ? ? if meteIsprikeyCollection[meteNameIndex] == 1:

? ? ? ? ? ? ? #讀取對應的元數(shù)據(jù)類型. 時間。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":

? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #讀取對應的元數(shù)據(jù)類型. 字母數(shù)字混合數(shù)據(jù)。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":

? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據(jù)

? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'

? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)

? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)

? ? ? ? ? ? ? ? if mixJudge :

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"

? ? ? ? ? ? ? ? #英文

? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #中文

? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #空值

? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "zhujianweikong"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #整數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":

? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #浮點數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":

? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? else:

? ? ? ? ? ? ? #讀取對應的元數(shù)據(jù)類型. 時間。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":

? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #讀取對應的元數(shù)據(jù)類型. 字母數(shù)字混合數(shù)據(jù)。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":

? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據(jù)

? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'

? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)

? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)

? ? ? ? ? ? ? ? if mixJudge :

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"

? ? ? ? ? ? ? ? #英文

? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #中文

? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #空值

? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #整數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":

? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #浮點數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":

? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"




? ? # Write record to processor output

? ? output.write(records[recordIndex])

? ? conn.close()

? except Exception as e:

? ? # Send record to error

? ? error.write(records[recordIndex], str(e))

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,929評論 0 13
  • "use strict";function _classCallCheck(e,t){if(!(e instanc...
    久些閱讀 2,153評論 0 2
  • 概念:邊際成本 邊際成本指的是每多生產(chǎn)或者每多賣一件產(chǎn)品,所帶來的總成本的增加。邊際成本的結(jié)構(gòu)性改變,是互聯(lián)網(wǎng)經(jīng)濟...
    任性的Cissy閱讀 298評論 0 0
  • 新工作入職的第一天,對周遭的環(huán)境及同事難免感到陌生,小心翼翼的打量每一個人,做每一件事 第一次接觸到秋秋是我在區(qū)域...
    竹與千尋閱讀 426評論 0 0
  • 訪妙玉乞紅梅原文 酒未開樽句未裁,尋春問臘到蓬萊。不求大士瓶中露,為乞孀娥檻外梅。入世冷挑紅雪去,離塵香割紫云來。...
    如是緣起閱讀 590評論 0 0

友情鏈接更多精彩內(nèi)容