logstash 實(shí)時(shí)增量同步oracle 到datahub streamcompute odps maxcompute

配置文件:

    stdin{     
    }  
    jdbc{  
        jdbc_connection_string => "jdbc:oracle:thin:scott/tiger@//127.0.0.1:1521/orcl"  # jdbc數(shù)據(jù)庫(kù)連接
        jdbc_user => "ncbs"  
        jdbc_password => "ncbs"  
        jdbc_driver_library => "D:\logstash-5.6.10\bin\logstash_jdbc_test\ojdbc6.jar"  # oracle jar包文件路徑
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"  # jdbc數(shù)據(jù)庫(kù)驅(qū)動(dòng)
        jdbc_paging_enabled => "true"  
        jdbc_page_size => "5000000"  # 同步數(shù)據(jù)分頁(yè)設(shè)置
        statement_filepath => "D:\logstash-5.6.10\bin\logstash_jdbc_test\jdbc.sql"  # 需同步的數(shù)據(jù)執(zhí)行的SQL文件路徑
       # statement => "select * from cxx_article_info where id > :sql_last_value" # 也可以直接將sql配置在這里
        schedule => "* * * * *"   # 各字段含義(由左至右)分、時(shí)、天、月、年,全部為*默認(rèn)含義為每分鐘都更新
        type => "T_YW_GA_JWRK"  # ES的type類(lèi)型,相當(dāng)于數(shù)據(jù)庫(kù)中的table,需要配置多個(gè)表時(shí),將jdbc整體復(fù)制一份,修改對(duì)應(yīng)type
    }  
}     
    
output {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/ph0ly/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}  
} 

配置文件說(shuō)明

input

input{
jdbc{
#jdbc sql server 驅(qū)動(dòng),各個(gè)數(shù)據(jù)庫(kù)都有對(duì)應(yīng)的驅(qū)動(dòng),需自己下載
jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
#jdbc class 不同數(shù)據(jù)庫(kù)有不同的 class 配置
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
#配置數(shù)據(jù)庫(kù)連接 ip 和端口,以及數(shù)據(jù)庫(kù)
jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db"
#配置數(shù)據(jù)庫(kù)用戶(hù)名
jdbc_user =>
#配置數(shù)據(jù)庫(kù)密碼
jdbc_password =>
#上面這些都不重要,要是這些都看不懂的話(huà),你的老板估計(jì)要考慮換人了。重要的是接下來(lái)的內(nèi)容。
# 定時(shí)器 多久執(zhí)行一次SQL,默認(rèn)是一分鐘
# schedule => 分 時(shí) 天 月 年
# schedule => 22 表示每天22點(diǎn)執(zhí)行一次
schedule => " *"
#是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當(dāng)于從頭開(kāi)始查詢(xún)所有的數(shù)據(jù)庫(kù)記錄
clean_run => false
#是否需要記錄某個(gè)column 的值,如果 record_last_run 為真,可以自定義我們需要表的字段名稱(chēng),
#此時(shí)該參數(shù)就要為 true. 否則默認(rèn) track 的是 timestamp 的值.
use_column_value => true
#如果 use_column_value 為真,需配置此參數(shù). 這個(gè)參數(shù)就是數(shù)據(jù)庫(kù)給出的一個(gè)字段名稱(chēng)。當(dāng)然該字段必須是遞增的,可以是 數(shù)據(jù)庫(kù)的數(shù)據(jù)時(shí)間這類(lèi)的
tracking_column => create_time
#是否記錄上次執(zhí)行結(jié)果, 如果為真,將會(huì)把上次執(zhí)行到的 tracking_column 字段的值記錄下來(lái),保存到 last_run_metadata_path 指定的文件中
record_last_run => true
#們只需要在 SQL 語(yǔ)句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是該文件中的值
last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
#是否將字段名稱(chēng)轉(zhuǎn)小寫(xiě)。
#這里有個(gè)小的提示,如果你這前就處理過(guò)一次數(shù)據(jù),并且在Kibana中有對(duì)應(yīng)的搜索需求的話(huà),還是改為true,
#因?yàn)槟J(rèn)是true,并且Kibana是大小寫(xiě)區(qū)分的。準(zhǔn)確的說(shuō)應(yīng)該是ES大小寫(xiě)區(qū)分
lowercase_column_names => false
#你的SQL的位置,當(dāng)然,你的SQL也可以直接寫(xiě)在這里。
#statement => SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value
statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" #數(shù)據(jù)類(lèi)型,標(biāo)明你屬于那一方勢(shì)力。單了ES哪里好給你安排不同的山頭。 type => "my_info" } #注意:外載的SQL文件就是一個(gè)文本文件就可以了,還有需要注意的是,一個(gè)jdbc{}插件就只能處理一個(gè)SQL語(yǔ)句, #如果你有多個(gè)SQL需要處理的話(huà),只能在重新建立一個(gè)jdbc{}插件。 }

filter

filter的配置暫時(shí)為空。

output

access_id(Required): 阿里云access id
access_key(Required): 阿里云access key
endpoint(Required): 阿里云datahub的服務(wù)地址
project_name(Required): datahub項(xiàng)目名稱(chēng)
topic_name(Required): datahub topic名稱(chēng)
retry_times(Optional): 重試次數(shù),-1為無(wú)限重試、0為不重試、>0表示需要有限次數(shù), 默認(rèn)值為-1
retry_interval(Optional): 下一次重試的間隔,單位為秒,默認(rèn)值為5
skip_after_retry(Optional): 當(dāng)由Datahub異常導(dǎo)致的重試超過(guò)重試次數(shù),是否跳過(guò)這一輪上傳的數(shù)據(jù),默認(rèn)為false
approximate_request_bytes(Optional): 用于限制每次發(fā)送請(qǐng)求的字節(jié)數(shù),是一個(gè)近似值,防止因request body過(guò)大而被拒絕接收,默認(rèn)為2048576(2MB)
shard_keys(Optional):數(shù)組類(lèi)型,數(shù)據(jù)落shard的字段名稱(chēng),插件會(huì)根據(jù)這些字段的值計(jì)算hash將每條數(shù)據(jù)落某個(gè)shard, 注意shard_keys和shard_id都未指定,默認(rèn)輪詢(xún)落shard
shard_id(Optional): 所有數(shù)據(jù)落指定的shard,注意shard_keys和shard_id都未指定,默認(rèn)輪詢(xún)落shard
dirty_data_continue(Optional): 臟數(shù)據(jù)是否繼續(xù)運(yùn)行,默認(rèn)為false,如果指定true,則遇到臟數(shù)據(jù)直接無(wú)視,繼續(xù)處理數(shù)據(jù)。當(dāng)開(kāi)啟該開(kāi)關(guān),必須指定@dirty_data_file文件
dirty_data_file(Optional): 臟數(shù)據(jù)文件名稱(chēng),當(dāng)數(shù)據(jù)文件名稱(chēng),在@dirty_data_continue開(kāi)啟的情況下,需要指定該值。特別注意:臟數(shù)據(jù)文件將被分割成兩個(gè)部分.part1和.part2,part1作為更早的臟數(shù)據(jù),part2作為更新的數(shù)據(jù)
dirty_data_file_max_size(Optional): 臟數(shù)據(jù)文件的最大大小,該值保證臟數(shù)據(jù)文件最大大小不超過(guò)這個(gè)值,目前該值僅是一個(gè)參考值

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容