.Azkaban工作流引擎和Flume數(shù)據(jù)采集
Azkaban介紹
一、Azkaban簡(jiǎn)介
Azkaban是由Linkedin開源的一個(gè)**批量工作流任務(wù)調(diào)度器**。用于在一個(gè)工作流內(nèi)以一個(gè)特定的順序運(yùn)行一組工作和流程。Azkaban定義了一種KV文件格式來建立任務(wù)之間的依賴關(guān)系,并提供一個(gè)易于使用的web用戶界面維護(hù)和跟蹤你的工作流。簡(jiǎn)而言之就是一個(gè)工作流調(diào)度系統(tǒng)。
為什么需要工作流調(diào)度系統(tǒng)?
因?yàn)橐粋€(gè)完整的數(shù)據(jù)分析系統(tǒng)通常都是由大量任務(wù)單元組成:shell腳本程序,java程序,mapreduce程序、hive腳本等
而各任務(wù)單元之間存在時(shí)間先后及前后依賴關(guān)系
為了很好地組織起這樣的復(fù)雜執(zhí)行計(jì)劃,需要一個(gè)工作流調(diào)度系統(tǒng)來調(diào)度執(zhí)行;
常見工作流調(diào)度系統(tǒng)
在hadoop領(lǐng)域,常見工作流調(diào)度系統(tǒng)有:Oozie, Azkaban,Cascading,Hamake
下面的表格對(duì)上述四種hadoop工作流調(diào)度器的關(guān)鍵特性進(jìn)行了比較,盡管這些工作流調(diào)度器能夠解決的需求場(chǎng)景基本一致,但在設(shè)計(jì)理念,目標(biāo)用戶,應(yīng)用場(chǎng)景等方面還是存在顯著的區(qū)別,在做技術(shù)選型的時(shí)候,可以提供參考
|
特性
|
Hamake
|
Oozie
|
Azkaban
|
Cascading
|
|
工作流描述語言
|
XML
|
XML (xPDL based)
|
text file with key/value pairs
|
Java API
|
|
依賴機(jī)制
|
data-driven
|
explicit
|
explicit
|
explicit
|
|
是否要web容器
|
No
|
Yes
|
Yes
|
No
|
|
進(jìn)度跟蹤
|
console/log messages
|
web page
|
web page
|
Java API
|
|
Hadoop job調(diào)度支持
|
no
|
yes
|
yes
|
yes
|
|
運(yùn)行模式
|
command line utility
|
daemon
|
daemon
|
API
|
|
Pig支持
|
yes
|
yes
|
yes
|
yes
|
|
事件通知
|
no
|
no
|
no
|
yes
|
|
需要安裝
|
no
|
yes
|
yes
|
no
|
|
支持的hadoop版本
|
0.18+
|
0.20+
|
currently unknown
|
0.18+
|
|
重試支持
|
no
|
workflownode evel
|
yes
|
yes
|
|
運(yùn)行任意命令
|
yes
|
yes
|
yes
|
yes
|
|
Amazon EMR支持
|
yes
|
no
|
currently unknown
|
yes
|
其中比較常用的為Azkaban和Oozie。
Azkaban功能特點(diǎn)
1 Web用戶界面
2 方便上傳工作流
3 方便設(shè)置任務(wù)之間的關(guān)系
4 調(diào)度工作流
5 認(rèn)證/授權(quán)(權(quán)限的工作)
6 能夠殺死并重新啟動(dòng)工作流
7 模塊化和可插拔的插件機(jī)制
8 項(xiàng)目工作區(qū)
9 工作流和任務(wù)的日志記錄和審計(jì)
二、Azkaban使用
Azkaban有web界面,輸入[https://localhost:8443](https://localhost:8443/) (注意是https)可以訪問Azkaban的用戶界面。如圖:
[圖片上傳失敗...(image-b8de71-1535698211232)]
首頁有四個(gè)菜單
- projects:最重要的部分,創(chuàng)建一個(gè)工程,所有flows將在工程中運(yùn)行。
- scheduling:顯示定時(shí)任務(wù)
- executing:顯示當(dāng)前運(yùn)行的任務(wù)
- history:顯示歷史運(yùn)行任務(wù)
2.1 創(chuàng)建工程
一個(gè)工程包含一個(gè)或多個(gè)flows,一個(gè)flow包含多個(gè)job。job是你想在azkaban中運(yùn)行的一個(gè)進(jìn)程,可以是簡(jiǎn)單的linux命令,可是java程序,也可以是復(fù)雜的shell腳本,當(dāng)然,如果你安裝相關(guān)插件,也可以運(yùn)行插件。一個(gè)job可以依賴于另一個(gè)job,這種多個(gè)job和它們的依賴組成的圖表叫做flow。點(diǎn)擊右上角的create project,在彈出的窗口中填寫工程名和描述即可創(chuàng)建工程。

2.2 創(chuàng)建job
創(chuàng)建job很簡(jiǎn)單,只要?jiǎng)?chuàng)建一個(gè)以.job結(jié)尾的文本文件就行了。比如:
# foo.jobtype=commandcommand=echo foo
如果是多個(gè)job并且有依賴關(guān)系,可以使用dependencies參數(shù)指定依賴關(guān)系。如:
# bar.jobtype=commanddependencies=foocommand=echo bar
這樣job就創(chuàng)建好了。
2.3 將工作流打包上傳
將上面兩個(gè)job打成zip包,在頁面上點(diǎn)擊update上傳。上傳之后如圖:

2.4 運(yùn)行
之后點(diǎn)擊綠色的Execute Flow,彈出窗口:

左邊的選項(xiàng)卡依次為:
Flow view:流程視圖??梢越?,啟用某些job
Notification:定義任務(wù)成功或者失敗是否發(fā)送郵件
Failure Options:定義一個(gè)job失敗,剩下的job怎么執(zhí)行
Concurrent:并行任務(wù)執(zhí)行設(shè)置
Flow Parametters:參數(shù)設(shè)置。
左下角的Schedule是設(shè)置調(diào)度時(shí)間,右下角的Execute為直接運(yùn)行,點(diǎn)擊Execute。運(yùn)行之后在Graph可以看到:

在job List中可以看到個(gè)job運(yùn)行的起始終止時(shí)間。

這樣工作流的調(diào)度就執(zhí)行完了,Azkaban的使用還是挺簡(jiǎn)單的吧。
Azkaban安裝部署
Azkaban安裝
1、準(zhǔn)備工作
Azkaban Web服務(wù)器
azkaban-web-server-2.5.0.tar.gz
Azkaban執(zhí)行服務(wù)器
azkaban-executor-server-2.5.0.tar.gz
MySQL
目前azkaban只支持 mysql,需安裝mysql服務(wù)器,本文檔中默認(rèn)已安裝好mysql服務(wù)器,并建立了 root用戶,密碼 root.
azkaban下載地址:http://azkaban.github.io/downloads.html
2、安裝
將安裝文件上傳到集群,最好上傳到安裝 hive、sqoop的機(jī)器上,方便命令的執(zhí)行
在合適的位置新建azkaban目錄,用于存放azkaban運(yùn)行程序
azkaban web服務(wù)器安裝
解壓azkaban-web-server-2.5.0.tar.gz
命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz
將解壓后的azkaban-web-server-2.5.0 移動(dòng)到 azkaban目錄中,并重新命名 webserver
命令: mv azkaban-web-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-web-server-2.5.0 server
azkaban 執(zhí)行服器安裝
解壓azkaban-executor-server-2.5.0.tar.gz
命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz
將解壓后的azkaban-executor-server-2.5.0 移動(dòng)到 azkaban目錄中,并重新命名 executor
命令:mv azkaban-executor-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-executor-server-2.5.0 executor
azkaban數(shù)據(jù)庫腳本導(dǎo)入
解壓: azkaban-sql-script-2.5.0.tar.gz
命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz
將解壓后的mysql 腳本,導(dǎo)入到mysql中:
進(jìn)入mysql
mysql> create database azkaban;
mysql> use azkaban;
Database changed
mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;
創(chuàng)建SSL配置
命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA
運(yùn)行此命令后,會(huì)提示輸入當(dāng)前生成 keystor的密碼及相應(yīng)信息,輸入的密碼請(qǐng)勞記,信息如下:
輸入keystore密碼:
再次輸入新密碼:
您的名字與姓氏是什么?
[Unknown]:
您的組織單位名稱是什么?
[Unknown]:
您的組織名稱是什么?
[Unknown]:
您所在的城市或區(qū)域名稱是什么?
[Unknown]:
您所在的州或省份名稱是什么?
[Unknown]:
該單位的兩字母國家代碼是什么
[Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?
[否]: y
輸入的主密碼
(如果和 keystore 密碼相同,按回車):
再次輸入新密碼:
完成上述工作后,將在當(dāng)前目錄生成 keystore 證書文件,將keystore 考貝到 azkaban web服務(wù)器根目錄中的bin目錄下.如:cp keystore azkaban/webserver/bin
配置文件
注:先配置好服務(wù)器節(jié)點(diǎn)上的時(shí)區(qū)
1、先生成時(shí)區(qū)配置文件Asia/Shanghai,用交互式命令 tzselect 即可
2、拷貝該時(shí)區(qū)文件,覆蓋系統(tǒng)本地時(shí)區(qū)配置
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
3.配置文件的修改
azkaban web服務(wù)器配置
進(jìn)入azkaban web服務(wù)器安裝目錄 conf目錄
修改azkaban.properties文件
命令vi azkaban.properties
內(nèi)容說明如下:
#Azkaban Personalization Settings
azkaban.name=Test #服務(wù)器UI名稱,用于服務(wù)器上方顯示的名字
azkaban.label=My Local Azkaban #描述
azkaban.color=#FF3601 #UI顏色
azkaban.default.servlet.path=/index #
web.resource.dir=web/ #默認(rèn)根web目錄
default.timezone.id=Asia/Shanghai #默認(rèn)時(shí)區(qū),已改為亞洲/上海 默認(rèn)為美國
#Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager #用戶權(quán)限管理默認(rèn)類
user.manager.xml.file=conf/azkaban-users.xml #用戶配置,具體配置參加下文
#Loader for projects
executor.global.properties=conf/global.properties # global配置文件所在位置
azkaban.project.dir=projects #
database.type=mysql #數(shù)據(jù)庫類型
mysql.port=3306 #端口號(hào)
mysql.host=hadoop03 #數(shù)據(jù)庫連接IP
mysql.database=azkaban #數(shù)據(jù)庫實(shí)例名
mysql.user=root #數(shù)據(jù)庫用戶名
mysql.password=root #數(shù)據(jù)庫密碼
mysql.numconnections=100 #最大連接數(shù)
# Velocity dev mode
velocity.dev.mode=false
# Jetty服務(wù)器屬性.
jetty.maxThreads=25 #最大線程數(shù)
jetty.ssl.port=8443 #Jetty SSL端口
jetty.port=8081 #Jetty端口
jetty.keystore=keystore #SSL文件名
jetty.password=123456 #SSL文件密碼
jetty.keypassword=123456 #Jetty主密碼 與 keystore文件相同
jetty.truststore=keystore #SSL文件名
jetty.trustpassword=123456 # SSL文件密碼
# 執(zhí)行服務(wù)器屬性
executor.port=12321 #執(zhí)行服務(wù)器端口
# 郵件設(shè)置
mail.sender=xxxxxxxx@163.com #發(fā)送郵箱
mail.host=smtp.163.com #發(fā)送郵箱smtp地址
mail.user=xxxxxxxx #發(fā)送郵件時(shí)顯示的名稱
mail.password=********** #郵箱密碼
job.failure.email=xxxxxxxx@163.com #任務(wù)失敗時(shí)發(fā)送郵件的地址
job.success.email=xxxxxxxx@163.com #任務(wù)成功時(shí)發(fā)送郵件的地址
lockdown.create.projects=false #
cache.directory=cache #緩存目錄
azkaban 執(zhí)行服務(wù)器配置
進(jìn)入執(zhí)行服務(wù)器安裝目錄conf,修改azkaban.properties
vi azkaban.properties
#Azkaban
default.timezone.id=Asia/Shanghai #時(shí)區(qū)
# Azkaban JobTypes 插件配置
azkaban.jobtype.plugin.dir=plugins/jobtypes #jobtype 插件所在位置
#Loader for projects
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
#數(shù)據(jù)庫設(shè)置
database.type=mysql #數(shù)據(jù)庫類型(目前只支持mysql)
mysql.port=3306 #數(shù)據(jù)庫端口號(hào)
mysql.host=192.168.20.200 #數(shù)據(jù)庫IP地址
mysql.database=azkaban #數(shù)據(jù)庫實(shí)例名
mysql.user=azkaban #數(shù)據(jù)庫用戶名
mysql.password=oracle #數(shù)據(jù)庫密碼
mysql.numconnections=100 #最大連接數(shù)
# 執(zhí)行服務(wù)器配置
executor.maxThreads=50 #最大線程數(shù)
executor.port=12321 #端口號(hào)(如修改,請(qǐng)與web服務(wù)中一致)
executor.flow.threads=30 #線程數(shù)
用戶配置
進(jìn)入azkaban web服務(wù)器conf目錄,修改azkaban-users.xml
vi azkaban-users.xml 增加 管理員用戶
<azkaban-users>
<user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
<user username="metrics" password="metrics" roles="metrics"/>
#添加下面這行
<user username="admin" password="admin" roles="admin,metrics" />
<role name="admin" permissions="ADMIN" />
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
4、啟動(dòng)
web服務(wù)器
在azkaban web服務(wù)器目錄下執(zhí)行啟動(dòng)命令
bin/azkaban-web-start.sh
注:在web服務(wù)器根目錄運(yùn)行
執(zhí)行服務(wù)器
在執(zhí)行服務(wù)器目錄下執(zhí)行啟動(dòng)命令
bin/azkaban-executor-start.sh ./
注:只能要執(zhí)行服務(wù)器根目錄運(yùn)行
啟動(dòng)完成后,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://服務(wù)器IP地址:8443 ,即可訪問azkaban服務(wù)了.在登錄中輸入剛才新的戶用名及密碼,點(diǎn)擊 login.
啟動(dòng)可能出現(xiàn)的問題:
啟動(dòng)azkaban時(shí)出現(xiàn)User xml file conf/azkaban-users.xml doesn’t exist問題或登錄頁沒有樣式:修改配置文件里面的內(nèi)容為絕對(duì)路徑;
啟動(dòng)azkaban時(shí)報(bào)錯(cuò)mysql無法連接,但配置文件都填的無誤:mysql沒有開放外網(wǎng)訪問權(quán)限。
使用實(shí)例
一、command類型單一job
1 創(chuàng)建job描述文件
#command.job
type=command
command=echo 'hello'
2 將job打包成zip文件
zip command.job
3 通過azkaban網(wǎng)頁創(chuàng)建project并將job壓縮包上傳


4 啟動(dòng)job

二、command類型多job工作流flow
1 創(chuàng)建有依賴關(guān)系的多個(gè)job描述
第一個(gè)job
#foo.job
type=command
command=echo foo
第二個(gè)job:bar.job依賴foo.job
type=command
dependencies=foo
command=echo bar
2 將所有的job資源文件打包到一個(gè)zip文件中

3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)工作流flow
三、HDFS操作任務(wù)
1 創(chuàng)建job描述文件
# fs.job
type=command
command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz
2 將job資源文件打包成zip文件

3、通過azkaban的web管理平臺(tái)創(chuàng)建project并上傳job壓縮包
4、啟動(dòng)執(zhí)行該job
四、MAPREDUCE任務(wù)
Mr任務(wù)依然可以使用command的job類型來執(zhí)行
1 創(chuàng)建job描述文件,及mr程序jar包(示例中直接使用hadoop自帶的example jar)
# mrwc.job
type=command
command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop jar hadoop-mapreduce-examples-2.6.1.jar wordcount /wordcount/input /wordcount/azout
2 將所有job資源文件打到一個(gè)zip包中

3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)job
五、HIVE腳本任務(wù)
1 創(chuàng)建job描述文件和hive腳本
Hive腳本: test.sql
use default;
drop table aztest;
create table aztest(id int,name string) row format delimited fields terminated by ',';
load data inpath '/aztest/hiveinput' into table aztest;
create table azres as select * from aztest;
insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest;
Job描述文件:hivef.job
# hivef.job
type=command
command=/home/hadoop/apps/hive/bin/hive -f 'test.sql'
2 將所有job資源文件打到一個(gè)zip包中
3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)job
</div>
Azkaban應(yīng)用
一、業(yè)務(wù)場(chǎng)景
在廣告追蹤系統(tǒng)中,我們通過提供SDK給用戶,把各種各樣的用戶數(shù)據(jù)采集到我們的服務(wù)器中,然后通過MR計(jì)算,統(tǒng)計(jì)各種輸出。在本文中,筆者將抽取其中一種業(yè)務(wù)場(chǎng)景:計(jì)算用戶留存和付費(fèi)LTV。
為了計(jì)算以上兩個(gè)指標(biāo),需要采集三類數(shù)據(jù):賬戶的激活、在線、付費(fèi)記錄。其中用戶留存和付費(fèi)LTV的計(jì)算過程如下:
1、用戶留存:把用戶今天在線的數(shù)據(jù),與一個(gè)月內(nèi)的用戶激活數(shù)據(jù)做對(duì)比,找出今天在線的用戶,是在那天激活的,并計(jì)算出差別的天數(shù),這就是用戶留存的計(jì)算方法。
2、付費(fèi)LTV:找出今天哪些用戶付費(fèi)了,把這些用戶,與一個(gè)月內(nèi)的用戶激活數(shù)據(jù)做對(duì)比,找出今天付費(fèi)的用戶,是在那天激活的,并計(jì)算出差別的天數(shù),然后把今天付費(fèi)的總額,除以差別的天數(shù),得出付費(fèi)LTV。
出于對(duì)公司數(shù)據(jù)安全考慮,這里不會(huì)貼出任何數(shù)據(jù)和計(jì)算代碼,只會(huì)把與Azkaban相關(guān)的job信息和思路寫出來,讀者可以作為參考。
二、處理思路
1、原始的用戶數(shù)據(jù)是混合在一起的,都放在按天分區(qū)的hdfs的指定目錄下,這樣,我們就需要寫一個(gè)作為數(shù)據(jù)清洗的MR類,把原始日志中的在線,激活,付費(fèi)三類數(shù)據(jù)分別輸出到獨(dú)立的文件中。這在hadoopMR中可以通過輸出文件后綴的方式進(jìn)行區(qū)分。
2、完成第一步后,我們需要把三類數(shù)據(jù)分別進(jìn)行統(tǒng)計(jì),比如按照appid進(jìn)行統(tǒng)計(jì),幣別需要轉(zhuǎn)換,激活時(shí)間需要從時(shí)間戳轉(zhuǎn)換為日期等步驟。
3、第三步就需要把這三類數(shù)據(jù)分別入庫到hive中,供后面的hiveSQL進(jìn)行join操作。
4、把在線數(shù)據(jù)與激活數(shù)據(jù)做join,得出用戶留存;把付費(fèi)數(shù)據(jù)與激活數(shù)據(jù)做join,得出付費(fèi)LTV。這兩類數(shù)據(jù)計(jì)算完成后,需要入庫到新的表中。
5、最后在kylin中進(jìn)行計(jì)算,用戶就可以在kylin中查詢統(tǒng)計(jì)結(jié)果了。
總的數(shù)據(jù)處理流程如下:

三、具體job編寫
1、logStat.job:數(shù)據(jù)拆分
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
tmpjars=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/ad-tracker-mr2-1.0.0-SNAPSHOT-jar-with-dependencies.jar
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/*/input/self-event*
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat
calculate.date=all
job.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=com.dataeye.tracker.mr.common.SuffixMultipleOutputFormat
2、onlineLogStat.job:在線數(shù)據(jù)清洗
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ONLINE
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
3、activeLogStat.job:激活數(shù)據(jù)清洗
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ACTIVE,/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/00/adtActiveLogStat
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
4、paymentLogStat.job:付費(fèi)數(shù)據(jù)清洗
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_PAYMENT
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
5、onlineHive.job:在線數(shù)據(jù)入庫
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script = res/hive_online.sql
dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=onlineLogStat
hive_online.sql:
use azkaban;
load data inpath '${dataPath}' overwrite into table adt_logstat_online PARTITION(day_p='${day_p}');
6、activeHive.job:激活數(shù)據(jù)入庫
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script = res/hive_active.sql
dataPath=hdfs://de-hdfs/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=activeLogStat
hive_active.sql
use azkaban;
alter table adt_logstat_active_ext set location '${dataPath}';
INSERT overwrite TABLE adt_logstat_active PARTITION (day_p='${day_p}') SELECT appid,channel,compaign,publisher,site,country,province,city,deviceId,activeDate FROM adt_logstat_active_ext;
7、paymentHive.job:付費(fèi)數(shù)據(jù)入庫
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script = res/hive_payment.sql
dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=paymentLogStat
hive_payment.sql
use azkaban;
load data inpath '${dataPath}' overwrite into table adt_logstat_payment PARTITION(day_p='${day_p}');
8、activeOnlineHive.job:用戶留存統(tǒng)計(jì)
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script = res/hive_active_online.sql
now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}
bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}
dependencies=activeHive,onlineHive
hive_active_online.sql
use azkaban;
INSERT overwrite TABLE user_retain_roll PARTITION (day_p='${now_day}') SELECT av.appid as appid, ol.channel as channel,ol.compaign as compaign,ol.publisher as publisher,ol.site as site, count(av.deviceId) AS total FROM adt_logstat_online AS ol INNER JOIN adt_logstat_active AS av ON ol.deviceId = av.deviceId and ol.appid = av.appid WHERE ol.day_p = '${now_day}' AND av.activeDate BETWEEN '${bef_day}' AND '${now_day}' GROUP BY av.appid, ol.channel,ol.compaign,ol.publisher,ol.site,av.activeDate;
9、activePaymentHive.job:付費(fèi)LTV統(tǒng)計(jì)
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script = res/hive_active_payment.sql
now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}
bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}
dependencies=activeHive,paymentHive
hive_active_payment.sql
use azkaban;
INSERT overwrite TABLE user_retain_ltv PARTITION (day_p='${now_day}') select av.appid as appid, py.channel as channel,py.compaign as compaign,py.publisher as publisher,py.site as site, py.deviceId as deviceId, py.paymentCount AS payment from adt_logstat_payment as py inner join adt_logstat_active as av on av.deviceId = py.deviceId and av.appid = py.appid where py.day_p = '${now_day}' and av.day_p = '${now_day}';
10、kylin.job:kylin計(jì)算
type=hadoopJava
job.extend=false
job.class=com.dataeye.kylin.azkaban.JavaMain
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
kylin.url=http://mysql8:7070/kylin/api
cube.name=user_retain_roll_cube,user_retain_ltv_cube
cube.date=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=activeOnlineHive,activePaymentHive
四、打包運(yùn)行
打包過程與之前一致,最終的目錄結(jié)構(gòu)如下:

運(yùn)行結(jié)果如下:
可以查看執(zhí)行日志:

Flume介紹
一、Flume簡(jiǎn)介
flume 作為 cloudera 開發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera。
但隨著 FLume 功能的擴(kuò)展,F(xiàn)lume OG 代碼工程臃腫、核心組件設(shè)計(jì)不合理、核心配置不標(biāo)準(zhǔn)等缺點(diǎn)暴露出來,尤其是在 Flume OG 的最后一個(gè)發(fā)行版本 0.9.4. 中,日
志傳輸不穩(wěn)定的現(xiàn)象尤為嚴(yán)重,為了解決這些問題,2011 年 10 月 22 號(hào),cloudera 完成了 Flume-728,對(duì) Flume 進(jìn)行了里程碑式的改動(dòng):重構(gòu)核心組件、核心配置以
及代碼架構(gòu),重構(gòu)后的版本統(tǒng)稱為 Flume NG(next generation);改動(dòng)的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。
備注:Flume參考資料
官方網(wǎng)站: http://flume.apache.org/
用戶文檔: http://flume.apache.org/FlumeUserGuide.html
開發(fā)文檔: http://flume.apache.org/FlumeDeveloperGuide.html
</div>
Flume安裝
<div class="mdContent">
Flume安裝
系統(tǒng)要求:
需安裝JDK 1.7及以上版本
1、 下載二進(jìn)制包
下載頁面:http://flume.apache.org/download.html
1.7.0下載地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
2、解壓
$ cp ~/Downloads/apache-flume-1.7.0-bin.tar.gz ~
$ cd
$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
$ cd apache-flume-1.7.0-bin
3、創(chuàng)建flume-env.sh文件
$ cp conf/flume-env.sh.template conf/flume-env.sh
</div>
Flume內(nèi)部原理
<div class="mdContent">
每個(gè)flume agent包含三個(gè)主要組件:source、channel、sink。
source是從一些其他產(chǎn)生數(shù)據(jù)的應(yīng)用中接收數(shù)據(jù)的活躍組件,有自己產(chǎn)生數(shù)據(jù)的source,不過這些source通常用于測(cè)試目的,source可以監(jiān)聽一個(gè)或者多個(gè)網(wǎng)絡(luò)端口,用于接收數(shù)據(jù)或者可以從本地文件系統(tǒng)讀取數(shù)據(jù),每個(gè)source必須至少連接一個(gè)channel,基于一些標(biāo)準(zhǔn),一個(gè)source可以寫入幾個(gè)channel,復(fù)制事件到所有或某些channel。
一般來說,channel是被動(dòng)組件(雖然它們可以為了清理或者垃圾回收運(yùn)行自己的線程),緩沖agent已經(jīng)接收,但尚未寫出到另一個(gè)agent或者存儲(chǔ)系統(tǒng)的數(shù)據(jù),channel的行為像隊(duì)列,source寫入到它們,sink從它們中讀取,多個(gè)source可以安全地寫入到相同channel,并且多個(gè)sink可以從相同的channel進(jìn)行讀取,可是一個(gè)sink只能從一個(gè)channel讀取,如果多個(gè)sink從相同的channel讀取,它可以保證只有一個(gè)sink將會(huì)從channel讀取一個(gè)指定特定的事件,
sink連續(xù)輪詢各自的channel來讀取和刪除事件,sink將事件推送到下一階段,或者最終目的地。一旦在下一階段或其目的地中數(shù)據(jù)是安全的,sink通過事務(wù)提交通知channel,可以從channel中刪除這些事件。

flume本身不限制agent中source、channel和sink的數(shù)量,因此flume source可以接收事件,并可以通過配置將事件復(fù)制到多個(gè)目的地,這使得source通過channel處理器、攔截器和channel選擇器,寫入數(shù)據(jù)到channel成為可能
每個(gè)source都有自己的channel處理器,每次source將數(shù)據(jù)寫入channel,它是通過委派該任務(wù)到其channel處理器來完成的,然后,channel處理器將這些事件傳到一個(gè)或多個(gè)source配置的攔截器中,
攔截器是一段代碼,可以基于某些它完成的處理來讀取事件和修改或刪除事件,基于某些標(biāo)準(zhǔn),如正則表達(dá)式,攔截器可以用來刪除事件,為事件添加新報(bào)頭或移除現(xiàn)有的報(bào)頭等,每個(gè)source可以配置成使用多個(gè)攔截器,按照配置中定義的順序被調(diào)用,將攔截器的結(jié)果傳遞給鏈的下一個(gè)單元,這就是所謂的責(zé)任鏈的設(shè)計(jì)模式,一旦攔截器處理完事件,攔截器鏈返回的事件列表傳遞到channel列表,即通過channel選擇器為每個(gè)事件選擇channel。
source可以通過處理器-攔截器-選擇器路由寫入多個(gè)channel,channel選擇器的決定每個(gè)事件必須寫入到source附帶的哪個(gè)channel的組件。因此攔截器可以用來插入或刪除事件中的數(shù)據(jù),這樣channel選擇器可以應(yīng)用一些條件在這些事件上,來決定事件必須寫入哪些channel,channel選擇器可以對(duì)事件應(yīng)用任意過濾條件,來決定每個(gè)事件必須寫入哪些channel,以及哪些channel是必須的或可選的。
寫入到必需的channel失敗將會(huì)導(dǎo)致channel處理器拋出channelexception,表明source必須重新重試該事件,而未能寫入可選channel失敗僅僅忽略它,一旦寫出事件,處理器會(huì)對(duì)source指示成功狀態(tài),可能發(fā)送確認(rèn)給發(fā)送該事件的系統(tǒng),并繼續(xù)接受更多的事件。

sink運(yùn)行器運(yùn)行一個(gè)sink組,sink組可含有一個(gè)或多個(gè)sink,如果組中只存在一個(gè)sink,那么沒有組將更有效率,sink運(yùn)行器僅僅是一個(gè)詢問sink組來處理下一批事件的線程,每個(gè)sink組有一個(gè)sink處理器,處理器選擇組中的sink之一去處理下一個(gè)事件集合,每個(gè)sink只能從一個(gè)channel獲取數(shù)據(jù),盡管多個(gè)sink可以從同一個(gè)channel獲取數(shù)據(jù),選定的sink從channel中接收事件,并將事件寫入到下一階段或最終目的地。

</div>
Source、Channel、Sink
<div class="mdContent">
Source、Channel、Sink有哪些類型
Flume Source
Source類型 | 說明
Avro Source | 支持Avro協(xié)議(實(shí)際上是Avro RPC),內(nèi)置支持
Thrift Source | 支持Thrift協(xié)議,內(nèi)置支持
Exec Source | 基于Unix的command在標(biāo)準(zhǔn)輸出上生產(chǎn)數(shù)據(jù)
JMS Source | 從JMS系統(tǒng)(消息、主題)中讀取數(shù)據(jù)
Spooling Directory Source | 監(jiān)控指定目錄內(nèi)數(shù)據(jù)變更
Twitter 1% firehose Source| 通過API持續(xù)下載Twitter數(shù)據(jù),試驗(yàn)性質(zhì)
Netcat Source | 監(jiān)控某個(gè)端口,將流經(jīng)端口的每一個(gè)文本行數(shù)據(jù)作為Event輸入
Sequence Generator Source | 序列生成器數(shù)據(jù)源,生產(chǎn)序列數(shù)據(jù)
Syslog Sources | 讀取syslog數(shù)據(jù),產(chǎn)生Event,支持UDP和TCP兩種協(xié)議
HTTP Source | 基于HTTP POST或GET方式的數(shù)據(jù)源,支持JSON、BLOB表示形式
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本)
Flume Channel
Channel類型 說明
Memory Channel | Event數(shù)據(jù)存儲(chǔ)在內(nèi)存中
JDBC Channel | Event數(shù)據(jù)存儲(chǔ)在持久化存儲(chǔ)中,當(dāng)前Flume Channel內(nèi)置支持Derby
File Channel | Event數(shù)據(jù)存儲(chǔ)在磁盤文件中
Spillable Memory Channel | Event數(shù)據(jù)存儲(chǔ)在內(nèi)存中和磁盤上,當(dāng)內(nèi)存隊(duì)列滿了,會(huì)持久化到磁盤文件
Pseudo Transaction Channel | 測(cè)試用途
Custom Channel | 自定義Channel實(shí)現(xiàn)
Flume Sink
Sink類型 說明
HDFS Sink | 數(shù)據(jù)寫入HDFS
Logger Sink | 數(shù)據(jù)寫入日志文件
Avro Sink | 數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到配置的RPC端口上
Thrift Sink | 數(shù)據(jù)被轉(zhuǎn)換成Thrift Event,然后發(fā)送到配置的RPC端口上
IRC Sink | 數(shù)據(jù)在IRC上進(jìn)行回放
File Roll Sink | 存儲(chǔ)數(shù)據(jù)到本地文件系統(tǒng)
Null Sink | 丟棄到所有數(shù)據(jù)
HBase Sink | 數(shù)據(jù)寫入HBase數(shù)據(jù)庫
Morphline Solr Sink | 數(shù)據(jù)發(fā)送到Solr搜索服務(wù)器(集群)
ElasticSearch Sink | 數(shù)據(jù)發(fā)送到Elastic Search搜索服務(wù)器(集群)
Kite Dataset Sink | 寫數(shù)據(jù)到Kite Dataset,試驗(yàn)性質(zhì)的
Custom Sink | 自定義Sink實(shí)現(xiàn)
</div>
Flume應(yīng)用案例
<div class="mdContent">
案例1、 A simple example
http://flume.apache.org/FlumeUserGuide.html#a-simple-example
配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
啟動(dòng)flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 指定配置目錄
flume-ng agent -n a1 -f op5 -Dflume.root.logger=INFO,console 不用指定配置目錄,將上訴source,channel,sink的文件起名為a1,同時(shí)指定這個(gè)文件在哪
安裝telnet
yum install telnet
退出 ctrl+] quit
Memory Chanel 配置
capacity:默認(rèn)該通道中最大的可以存儲(chǔ)的event數(shù)量是100,
trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數(shù)量也是100
keep-alive:event添加到通道中或者移出的允許時(shí)間
byte**:即event的字節(jié)量的限制,只包括eventbody
案例2、兩個(gè)flume做集群(第一個(gè)agent的sink作為第二個(gè)agent的source)
node01服務(wù)器中,配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444
# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
node02服務(wù)器中,安裝Flume(步驟略)
配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
先啟動(dòng)node02的Flume
flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
再啟動(dòng)node01的Flume
flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
打開telnet 測(cè)試 node02控制臺(tái)輸出結(jié)果
案例3、Exec Source(監(jiān)聽一個(gè)文件)
http://flume.apache.org/FlumeUserGuide.html#exec-source
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
創(chuàng)建空文件演示 touch flume.exec.log
循環(huán)添加數(shù)據(jù)
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
案例4、Spooling Directory Source(監(jiān)聽一個(gè)目錄)
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console
拷貝文件演示
mkdir logs
cp flume.exec.log logs/
案例5、hdfs sink
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
***只修改上一個(gè)spool sink的配置代碼塊 a1.sinks.k1.type = logger
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
##每隔60s或者文件大小超過10M的時(shí)候產(chǎn)生新文件
# hdfs有多少條消息時(shí)新建文件,0不基于消息個(gè)數(shù)
a1.sinks.k1.hdfs.rollCount=0
# hdfs創(chuàng)建多長時(shí)間新建文件,0不基于時(shí)間
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大時(shí)新建文件,0不基于文件大小
a1.sinks.k1.hdfs.rollSize=10240
# 當(dāng)目前被打開的臨時(shí)文件在該參數(shù)指定的時(shí)間(秒)內(nèi),沒有任何數(shù)據(jù)寫入,則將該臨時(shí)文件關(guān)閉并重命名成目標(biāo)文件
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
時(shí)間參數(shù)一定要帶上 true
a1.sinks.k1.hdfs.useLocalTimeStamp=true
## 每五分鐘生成一個(gè)目錄:
# 是否啟用時(shí)間上的”舍棄”,這里的”舍棄”,類似于”四舍五入”,后面再介紹。如果啟用,則會(huì)影響除了%t的其他所有時(shí)間表達(dá)式
a1.sinks.k1.hdfs.round=true
# 時(shí)間上進(jìn)行“舍棄”的值;
a1.sinks.k1.hdfs.roundValue=5
# 時(shí)間上進(jìn)行”舍棄”的單位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1(將source,channel,sink關(guān)聯(lián))
############################################################
創(chuàng)建HDFS目錄
hadoop fs -mkdir /flume
啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console
查看hdfs文件
hadoop fs -ls /flume/...
hadoop fs -get /flume/...
</div>