Azkaban工作流引擎和Flume數(shù)據(jù)采集

.Azkaban工作流引擎和Flume數(shù)據(jù)采集

Azkaban介紹

一、Azkaban簡(jiǎn)介

   Azkaban是由Linkedin開源的一個(gè)**批量工作流任務(wù)調(diào)度器**。用于在一個(gè)工作流內(nèi)以一個(gè)特定的順序運(yùn)行一組工作和流程。Azkaban定義了一種KV文件格式來建立任務(wù)之間的依賴關(guān)系,并提供一個(gè)易于使用的web用戶界面維護(hù)和跟蹤你的工作流。簡(jiǎn)而言之就是一個(gè)工作流調(diào)度系統(tǒng)。

為什么需要工作流調(diào)度系統(tǒng)?

因?yàn)橐粋€(gè)完整的數(shù)據(jù)分析系統(tǒng)通常都是由大量任務(wù)單元組成:shell腳本程序,java程序,mapreduce程序、hive腳本等

而各任務(wù)單元之間存在時(shí)間先后及前后依賴關(guān)系

為了很好地組織起這樣的復(fù)雜執(zhí)行計(jì)劃,需要一個(gè)工作流調(diào)度系統(tǒng)來調(diào)度執(zhí)行;

常見工作流調(diào)度系統(tǒng)

在hadoop領(lǐng)域,常見工作流調(diào)度系統(tǒng)有:Oozie, Azkaban,Cascading,Hamake

下面的表格對(duì)上述四種hadoop工作流調(diào)度器的關(guān)鍵特性進(jìn)行了比較,盡管這些工作流調(diào)度器能夠解決的需求場(chǎng)景基本一致,但在設(shè)計(jì)理念,目標(biāo)用戶,應(yīng)用場(chǎng)景等方面還是存在顯著的區(qū)別,在做技術(shù)選型的時(shí)候,可以提供參考

|

特性

|

Hamake

|

Oozie

|

Azkaban

|

Cascading

|
|

工作流描述語言

|

XML

|

XML (xPDL based)

|

text file with key/value pairs

|

Java API

|
|

依賴機(jī)制

|

data-driven

|

explicit

|

explicit

|

explicit

|
|

是否要web容器

|

No

|

Yes

|

Yes

|

No

|
|

進(jìn)度跟蹤

|

console/log messages

|

web page

|

web page

|

Java API

|
|

Hadoop job調(diào)度支持

|

no

|

yes

|

yes

|

yes

|
|

運(yùn)行模式

|

command line utility

|

daemon

|

daemon

|

API

|
|

Pig支持

|

yes

|

yes

|

yes

|

yes

|
|

事件通知

|

no

|

no

|

no

|

yes

|
|

需要安裝

|

no

|

yes

|

yes

|

no

|
|

支持的hadoop版本

|

0.18+

|

0.20+

|

currently unknown

|

0.18+

|
|

重試支持

|

no

|

workflownode evel

|

yes

|

yes

|
|

運(yùn)行任意命令

|

yes

|

yes

|

yes

|

yes

|
|

Amazon EMR支持

|

yes

|

no

|

currently unknown

|

yes

|

其中比較常用的為Azkaban和Oozie。

Azkaban功能特點(diǎn)

1 Web用戶界面

2 方便上傳工作流

3 方便設(shè)置任務(wù)之間的關(guān)系

4 調(diào)度工作流

5 認(rèn)證/授權(quán)(權(quán)限的工作)

6 能夠殺死并重新啟動(dòng)工作流

7 模塊化和可插拔的插件機(jī)制

8 項(xiàng)目工作區(qū)

9 工作流和任務(wù)的日志記錄和審計(jì)

二、Azkaban使用

  Azkaban有web界面,輸入[https://localhost:8443](https://localhost:8443/) (注意是https)可以訪問Azkaban的用戶界面。如圖:

[圖片上傳失敗...(image-b8de71-1535698211232)]

首頁有四個(gè)菜單

  • projects:最重要的部分,創(chuàng)建一個(gè)工程,所有flows將在工程中運(yùn)行。
  • scheduling:顯示定時(shí)任務(wù)
  • executing:顯示當(dāng)前運(yùn)行的任務(wù)
  • history:顯示歷史運(yùn)行任務(wù)

2.1 創(chuàng)建工程

     一個(gè)工程包含一個(gè)或多個(gè)flows,一個(gè)flow包含多個(gè)job。job是你想在azkaban中運(yùn)行的一個(gè)進(jìn)程,可以是簡(jiǎn)單的linux命令,可是java程序,也可以是復(fù)雜的shell腳本,當(dāng)然,如果你安裝相關(guān)插件,也可以運(yùn)行插件。一個(gè)job可以依賴于另一個(gè)job,這種多個(gè)job和它們的依賴組成的圖表叫做flow。點(diǎn)擊右上角的create project,在彈出的窗口中填寫工程名和描述即可創(chuàng)建工程。
image.png

2.2 創(chuàng)建job

創(chuàng)建job很簡(jiǎn)單,只要?jiǎng)?chuàng)建一個(gè)以.job結(jié)尾的文本文件就行了。比如:
# foo.jobtype=commandcommand=echo foo

如果是多個(gè)job并且有依賴關(guān)系,可以使用dependencies參數(shù)指定依賴關(guān)系。如:

# bar.jobtype=commanddependencies=foocommand=echo bar

這樣job就創(chuàng)建好了。

2.3 將工作流打包上傳

  將上面兩個(gè)job打成zip包,在頁面上點(diǎn)擊update上傳。上傳之后如圖:
image.png

2.4 運(yùn)行

   之后點(diǎn)擊綠色的Execute Flow,彈出窗口:
image.png

左邊的選項(xiàng)卡依次為:

Flow view:流程視圖??梢越?,啟用某些job
Notification:定義任務(wù)成功或者失敗是否發(fā)送郵件
Failure Options:定義一個(gè)job失敗,剩下的job怎么執(zhí)行
Concurrent:并行任務(wù)執(zhí)行設(shè)置
Flow Parametters:參數(shù)設(shè)置。

左下角的Schedule是設(shè)置調(diào)度時(shí)間,右下角的Execute為直接運(yùn)行,點(diǎn)擊Execute。運(yùn)行之后在Graph可以看到:

image.png

在job List中可以看到個(gè)job運(yùn)行的起始終止時(shí)間。


image.png

這樣工作流的調(diào)度就執(zhí)行完了,Azkaban的使用還是挺簡(jiǎn)單的吧。

Azkaban安裝部署

Azkaban安裝

1、準(zhǔn)備工作
Azkaban Web服務(wù)器
azkaban-web-server-2.5.0.tar.gz
Azkaban執(zhí)行服務(wù)器
azkaban-executor-server-2.5.0.tar.gz

MySQL
目前azkaban只支持 mysql,需安裝mysql服務(wù)器,本文檔中默認(rèn)已安裝好mysql服務(wù)器,并建立了 root用戶,密碼 root.

azkaban下載地址:http://azkaban.github.io/downloads.html

2、安裝
將安裝文件上傳到集群,最好上傳到安裝 hive、sqoop的機(jī)器上,方便命令的執(zhí)行
在合適的位置新建azkaban目錄,用于存放azkaban運(yùn)行程序
azkaban web服務(wù)器安裝
解壓azkaban-web-server-2.5.0.tar.gz
命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz
將解壓后的azkaban-web-server-2.5.0 移動(dòng)到 azkaban目錄中,并重新命名 webserver
命令: mv azkaban-web-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-web-server-2.5.0 server

azkaban 執(zhí)行服器安裝
解壓azkaban-executor-server-2.5.0.tar.gz
命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz
將解壓后的azkaban-executor-server-2.5.0 移動(dòng)到 azkaban目錄中,并重新命名 executor
命令:mv azkaban-executor-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-executor-server-2.5.0 executor

azkaban數(shù)據(jù)庫腳本導(dǎo)入
解壓: azkaban-sql-script-2.5.0.tar.gz
命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz
將解壓后的mysql 腳本,導(dǎo)入到mysql中:
進(jìn)入mysql
mysql> create database azkaban;
mysql> use azkaban;
Database changed
mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;

創(chuàng)建SSL配置
命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA
運(yùn)行此命令后,會(huì)提示輸入當(dāng)前生成 keystor的密碼及相應(yīng)信息,輸入的密碼請(qǐng)勞記,信息如下:

輸入keystore密碼:
再次輸入新密碼:
您的名字與姓氏是什么?
[Unknown]:
您的組織單位名稱是什么?
[Unknown]:
您的組織名稱是什么?
[Unknown]:
您所在的城市或區(qū)域名稱是什么?
[Unknown]:
您所在的州或省份名稱是什么?
[Unknown]:
該單位的兩字母國家代碼是什么
[Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?
[否]: y

輸入的主密碼
(如果和 keystore 密碼相同,按回車):
再次輸入新密碼:
完成上述工作后,將在當(dāng)前目錄生成 keystore 證書文件,將keystore 考貝到 azkaban web服務(wù)器根目錄中的bin目錄下.如:cp keystore azkaban/webserver/bin

配置文件
注:先配置好服務(wù)器節(jié)點(diǎn)上的時(shí)區(qū)
1、先生成時(shí)區(qū)配置文件Asia/Shanghai,用交互式命令 tzselect 即可
2、拷貝該時(shí)區(qū)文件,覆蓋系統(tǒng)本地時(shí)區(qū)配置
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

3.配置文件的修改
azkaban web服務(wù)器配置
進(jìn)入azkaban web服務(wù)器安裝目錄 conf目錄

修改azkaban.properties文件
命令vi azkaban.properties
內(nèi)容說明如下:

#Azkaban Personalization Settings
azkaban.name=Test                           #服務(wù)器UI名稱,用于服務(wù)器上方顯示的名字
azkaban.label=My Local Azkaban                               #描述
azkaban.color=#FF3601                                                 #UI顏色
azkaban.default.servlet.path=/index                         #
web.resource.dir=web/                                                 #默認(rèn)根web目錄
default.timezone.id=Asia/Shanghai                           #默認(rèn)時(shí)區(qū),已改為亞洲/上海 默認(rèn)為美國

#Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager   #用戶權(quán)限管理默認(rèn)類
user.manager.xml.file=conf/azkaban-users.xml              #用戶配置,具體配置參加下文

#Loader for projects
executor.global.properties=conf/global.properties    # global配置文件所在位置
azkaban.project.dir=projects                                                #

database.type=mysql                                                              #數(shù)據(jù)庫類型
mysql.port=3306                                                                       #端口號(hào)
mysql.host=hadoop03                                                      #數(shù)據(jù)庫連接IP
mysql.database=azkaban                                                       #數(shù)據(jù)庫實(shí)例名
mysql.user=root                                                                 #數(shù)據(jù)庫用戶名
mysql.password=root                                                          #數(shù)據(jù)庫密碼
mysql.numconnections=100                                                  #最大連接數(shù)

# Velocity dev mode
velocity.dev.mode=false
# Jetty服務(wù)器屬性.
jetty.maxThreads=25                                                               #最大線程數(shù)
jetty.ssl.port=8443                                                                   #Jetty SSL端口
jetty.port=8081                                                                         #Jetty端口
jetty.keystore=keystore                                                          #SSL文件名
jetty.password=123456                                                             #SSL文件密碼
jetty.keypassword=123456                                                      #Jetty主密碼 與 keystore文件相同
jetty.truststore=keystore                                                                #SSL文件名
jetty.trustpassword=123456                                                   # SSL文件密碼

# 執(zhí)行服務(wù)器屬性
executor.port=12321                                                               #執(zhí)行服務(wù)器端口

# 郵件設(shè)置
mail.sender=xxxxxxxx@163.com                                       #發(fā)送郵箱
mail.host=smtp.163.com                                                       #發(fā)送郵箱smtp地址
mail.user=xxxxxxxx                                       #發(fā)送郵件時(shí)顯示的名稱
mail.password=**********                                                 #郵箱密碼
job.failure.email=xxxxxxxx@163.com                              #任務(wù)失敗時(shí)發(fā)送郵件的地址
job.success.email=xxxxxxxx@163.com                            #任務(wù)成功時(shí)發(fā)送郵件的地址
lockdown.create.projects=false                                           #
cache.directory=cache                                                            #緩存目錄

azkaban 執(zhí)行服務(wù)器配置
進(jìn)入執(zhí)行服務(wù)器安裝目錄conf,修改azkaban.properties
vi azkaban.properties

#Azkaban
default.timezone.id=Asia/Shanghai                                              #時(shí)區(qū)

# Azkaban JobTypes 插件配置
azkaban.jobtype.plugin.dir=plugins/jobtypes                   #jobtype 插件所在位置

#Loader for projects
executor.global.properties=conf/global.properties
azkaban.project.dir=projects

#數(shù)據(jù)庫設(shè)置
database.type=mysql                                                                       #數(shù)據(jù)庫類型(目前只支持mysql)
mysql.port=3306                                                                                #數(shù)據(jù)庫端口號(hào)
mysql.host=192.168.20.200                                                           #數(shù)據(jù)庫IP地址
mysql.database=azkaban                                                                #數(shù)據(jù)庫實(shí)例名
mysql.user=azkaban                                                                         #數(shù)據(jù)庫用戶名
mysql.password=oracle                                                                   #數(shù)據(jù)庫密碼
mysql.numconnections=100                                                           #最大連接數(shù)

# 執(zhí)行服務(wù)器配置
executor.maxThreads=50                                                                #最大線程數(shù)
executor.port=12321                                                               #端口號(hào)(如修改,請(qǐng)與web服務(wù)中一致)
executor.flow.threads=30                                                                #線程數(shù)

用戶配置
進(jìn)入azkaban web服務(wù)器conf目錄,修改azkaban-users.xml
vi azkaban-users.xml 增加 管理員用戶

<azkaban-users>
        <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
        <user username="metrics" password="metrics" roles="metrics"/>
        #添加下面這行
        <user username="admin" password="admin" roles="admin,metrics" />
        <role name="admin" permissions="ADMIN" />
        <role name="metrics" permissions="METRICS"/>
</azkaban-users>

4、啟動(dòng)
web服務(wù)器
在azkaban web服務(wù)器目錄下執(zhí)行啟動(dòng)命令
bin/azkaban-web-start.sh
注:在web服務(wù)器根目錄運(yùn)行

執(zhí)行服務(wù)器
在執(zhí)行服務(wù)器目錄下執(zhí)行啟動(dòng)命令
bin/azkaban-executor-start.sh ./
注:只能要執(zhí)行服務(wù)器根目錄運(yùn)行

啟動(dòng)完成后,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://服務(wù)器IP地址:8443 ,即可訪問azkaban服務(wù)了.在登錄中輸入剛才新的戶用名及密碼,點(diǎn)擊 login.

啟動(dòng)可能出現(xiàn)的問題:
啟動(dòng)azkaban時(shí)出現(xiàn)User xml file conf/azkaban-users.xml doesn’t exist問題或登錄頁沒有樣式:修改配置文件里面的內(nèi)容為絕對(duì)路徑;
啟動(dòng)azkaban時(shí)報(bào)錯(cuò)mysql無法連接,但配置文件都填的無誤:mysql沒有開放外網(wǎng)訪問權(quán)限。

使用實(shí)例

一、command類型單一job
1 創(chuàng)建job描述文件

#command.job
type=command
command=echo 'hello'

2 將job打包成zip文件

zip command.job

3 通過azkaban網(wǎng)頁創(chuàng)建project并將job壓縮包上傳

image.png
image.png

4 啟動(dòng)job


image.png

二、command類型多job工作流flow
1 創(chuàng)建有依賴關(guān)系的多個(gè)job描述
第一個(gè)job

#foo.job
type=command
command=echo foo

第二個(gè)job:bar.job依賴foo.job

type=command
dependencies=foo
command=echo bar

2 將所有的job資源文件打包到一個(gè)zip文件中


image.png

3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)工作流flow

三、HDFS操作任務(wù)
1 創(chuàng)建job描述文件

# fs.job
type=command
command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz

2 將job資源文件打包成zip文件


image.png

3、通過azkaban的web管理平臺(tái)創(chuàng)建project并上傳job壓縮包
4、啟動(dòng)執(zhí)行該job

四、MAPREDUCE任務(wù)
Mr任務(wù)依然可以使用command的job類型來執(zhí)行
1 創(chuàng)建job描述文件,及mr程序jar包(示例中直接使用hadoop自帶的example jar)

# mrwc.job
type=command
command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop  jar hadoop-mapreduce-examples-2.6.1.jar wordcount /wordcount/input /wordcount/azout

2 將所有job資源文件打到一個(gè)zip包中


image.png

3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)job

五、HIVE腳本任務(wù)
1 創(chuàng)建job描述文件和hive腳本
Hive腳本: test.sql

use default;
drop table aztest;
create table aztest(id int,name string) row format delimited fields terminated by ',';
load data inpath '/aztest/hiveinput' into table aztest;
create table azres as select * from aztest;
insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest; 

Job描述文件:hivef.job

# hivef.job
type=command
command=/home/hadoop/apps/hive/bin/hive -f 'test.sql'

2 將所有job資源文件打到一個(gè)zip包中
3 在azkaban的web管理界面創(chuàng)建工程并上傳zip包
4 啟動(dòng)job
</div>

Azkaban應(yīng)用

一、業(yè)務(wù)場(chǎng)景

在廣告追蹤系統(tǒng)中,我們通過提供SDK給用戶,把各種各樣的用戶數(shù)據(jù)采集到我們的服務(wù)器中,然后通過MR計(jì)算,統(tǒng)計(jì)各種輸出。在本文中,筆者將抽取其中一種業(yè)務(wù)場(chǎng)景:計(jì)算用戶留存和付費(fèi)LTV。

為了計(jì)算以上兩個(gè)指標(biāo),需要采集三類數(shù)據(jù):賬戶的激活、在線、付費(fèi)記錄。其中用戶留存和付費(fèi)LTV的計(jì)算過程如下:

1、用戶留存:把用戶今天在線的數(shù)據(jù),與一個(gè)月內(nèi)的用戶激活數(shù)據(jù)做對(duì)比,找出今天在線的用戶,是在那天激活的,并計(jì)算出差別的天數(shù),這就是用戶留存的計(jì)算方法。

2、付費(fèi)LTV:找出今天哪些用戶付費(fèi)了,把這些用戶,與一個(gè)月內(nèi)的用戶激活數(shù)據(jù)做對(duì)比,找出今天付費(fèi)的用戶,是在那天激活的,并計(jì)算出差別的天數(shù),然后把今天付費(fèi)的總額,除以差別的天數(shù),得出付費(fèi)LTV。

出于對(duì)公司數(shù)據(jù)安全考慮,這里不會(huì)貼出任何數(shù)據(jù)和計(jì)算代碼,只會(huì)把與Azkaban相關(guān)的job信息和思路寫出來,讀者可以作為參考。

二、處理思路

1、原始的用戶數(shù)據(jù)是混合在一起的,都放在按天分區(qū)的hdfs的指定目錄下,這樣,我們就需要寫一個(gè)作為數(shù)據(jù)清洗的MR類,把原始日志中的在線,激活,付費(fèi)三類數(shù)據(jù)分別輸出到獨(dú)立的文件中。這在hadoopMR中可以通過輸出文件后綴的方式進(jìn)行區(qū)分。

2、完成第一步后,我們需要把三類數(shù)據(jù)分別進(jìn)行統(tǒng)計(jì),比如按照appid進(jìn)行統(tǒng)計(jì),幣別需要轉(zhuǎn)換,激活時(shí)間需要從時(shí)間戳轉(zhuǎn)換為日期等步驟。

3、第三步就需要把這三類數(shù)據(jù)分別入庫到hive中,供后面的hiveSQL進(jìn)行join操作。

4、把在線數(shù)據(jù)與激活數(shù)據(jù)做join,得出用戶留存;把付費(fèi)數(shù)據(jù)與激活數(shù)據(jù)做join,得出付費(fèi)LTV。這兩類數(shù)據(jù)計(jì)算完成后,需要入庫到新的表中。

5、最后在kylin中進(jìn)行計(jì)算,用戶就可以在kylin中查詢統(tǒng)計(jì)結(jié)果了。

總的數(shù)據(jù)處理流程如下:

image.png

三、具體job編寫

1、logStat.job:數(shù)據(jù)拆分

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

tmpjars=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/ad-tracker-mr2-1.0.0-SNAPSHOT-jar-with-dependencies.jar

input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/*/input/self-event*

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat

calculate.date=all

job.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper

mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatReducer

mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.value.class=org.apache.hadoop.io.NullWritable

mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=com.dataeye.tracker.mr.common.SuffixMultipleOutputFormat

2、onlineLogStat.job:在線數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ONLINE

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}

job.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper

mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatReducer

mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable

mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

3、activeLogStat.job:激活數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ACTIVE,/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/00/adtActiveLogStat

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}

job.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper

mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatReducer

mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable

mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

4、paymentLogStat.job:付費(fèi)數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_PAYMENT

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}

job.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper

mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatReducer

mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable

mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

5、onlineHive.job:在線數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_online.sql

dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=onlineLogStat

hive_online.sql:

use azkaban;

load data inpath '${dataPath}' overwrite into table adt_logstat_online PARTITION(day_p='${day_p}');

6、activeHive.job:激活數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active.sql

dataPath=hdfs://de-hdfs/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=activeLogStat

hive_active.sql


use azkaban;

alter table adt_logstat_active_ext set location '${dataPath}';

INSERT overwrite TABLE adt_logstat_active PARTITION (day_p='${day_p}') SELECT appid,channel,compaign,publisher,site,country,province,city,deviceId,activeDate FROM adt_logstat_active_ext;

7、paymentHive.job:付費(fèi)數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_payment.sql

dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=paymentLogStat

hive_payment.sql

use azkaban;

load data inpath '${dataPath}' overwrite into table adt_logstat_payment PARTITION(day_p='${day_p}');

8、activeOnlineHive.job:用戶留存統(tǒng)計(jì)

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active_online.sql

now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}

bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}

dependencies=activeHive,onlineHive

hive_active_online.sql

use azkaban;

INSERT overwrite TABLE user_retain_roll PARTITION (day_p='${now_day}') SELECT av.appid as appid, ol.channel as channel,ol.compaign as compaign,ol.publisher as publisher,ol.site as site, count(av.deviceId) AS total FROM adt_logstat_online AS ol INNER JOIN adt_logstat_active AS av ON ol.deviceId = av.deviceId and ol.appid = av.appid WHERE ol.day_p = '${now_day}' AND av.activeDate BETWEEN '${bef_day}' AND '${now_day}' GROUP BY av.appid, ol.channel,ol.compaign,ol.publisher,ol.site,av.activeDate;

9、activePaymentHive.job:付費(fèi)LTV統(tǒng)計(jì)

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active_payment.sql

now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}

bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}

dependencies=activeHive,paymentHive

hive_active_payment.sql

use azkaban;

INSERT overwrite TABLE user_retain_ltv PARTITION (day_p='${now_day}') select  av.appid as appid, py.channel as channel,py.compaign as compaign,py.publisher as publisher,py.site as site, py.deviceId as deviceId, py.paymentCount AS payment from adt_logstat_payment as py  inner join adt_logstat_active as av on av.deviceId = py.deviceId and av.appid = py.appid where py.day_p = '${now_day}' and av.day_p = '${now_day}';

10、kylin.job:kylin計(jì)算

type=hadoopJava

job.extend=false

job.class=com.dataeye.kylin.azkaban.JavaMain

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

kylin.url=http://mysql8:7070/kylin/api

cube.name=user_retain_roll_cube,user_retain_ltv_cube

cube.date=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=activeOnlineHive,activePaymentHive

四、打包運(yùn)行

打包過程與之前一致,最終的目錄結(jié)構(gòu)如下:

image.png

運(yùn)行結(jié)果如下:

可以查看執(zhí)行日志:

image.png

Flume介紹

一、Flume簡(jiǎn)介
  flume 作為 cloudera 開發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera。

但隨著 FLume 功能的擴(kuò)展,F(xiàn)lume OG 代碼工程臃腫、核心組件設(shè)計(jì)不合理、核心配置不標(biāo)準(zhǔn)等缺點(diǎn)暴露出來,尤其是在 Flume OG 的最后一個(gè)發(fā)行版本 0.9.4. 中,日

志傳輸不穩(wěn)定的現(xiàn)象尤為嚴(yán)重,為了解決這些問題,2011 年 10 月 22 號(hào),cloudera 完成了 Flume-728,對(duì) Flume 進(jìn)行了里程碑式的改動(dòng):重構(gòu)核心組件、核心配置以

及代碼架構(gòu),重構(gòu)后的版本統(tǒng)稱為 Flume NG(next generation);改動(dòng)的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。

備注:Flume參考資料

官方網(wǎng)站: http://flume.apache.org/
    用戶文檔: http://flume.apache.org/FlumeUserGuide.html
    開發(fā)文檔: http://flume.apache.org/FlumeDeveloperGuide.html
</div>

Flume安裝

<div class="mdContent">

Flume安裝

系統(tǒng)要求:
需安裝JDK 1.7及以上版本

1、 下載二進(jìn)制包
下載頁面:http://flume.apache.org/download.html
1.7.0下載地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2、解壓

$ cp ~/Downloads/apache-flume-1.7.0-bin.tar.gz ~
$ cd 
$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
$ cd apache-flume-1.7.0-bin

3、創(chuàng)建flume-env.sh文件


$ cp conf/flume-env.sh.template conf/flume-env.sh

</div>

Flume內(nèi)部原理

<div class="mdContent">
每個(gè)flume agent包含三個(gè)主要組件:source、channel、sink。
source是從一些其他產(chǎn)生數(shù)據(jù)的應(yīng)用中接收數(shù)據(jù)的活躍組件,有自己產(chǎn)生數(shù)據(jù)的source,不過這些source通常用于測(cè)試目的,source可以監(jiān)聽一個(gè)或者多個(gè)網(wǎng)絡(luò)端口,用于接收數(shù)據(jù)或者可以從本地文件系統(tǒng)讀取數(shù)據(jù),每個(gè)source必須至少連接一個(gè)channel,基于一些標(biāo)準(zhǔn),一個(gè)source可以寫入幾個(gè)channel,復(fù)制事件到所有或某些channel。
一般來說,channel是被動(dòng)組件(雖然它們可以為了清理或者垃圾回收運(yùn)行自己的線程),緩沖agent已經(jīng)接收,但尚未寫出到另一個(gè)agent或者存儲(chǔ)系統(tǒng)的數(shù)據(jù),channel的行為像隊(duì)列,source寫入到它們,sink從它們中讀取,多個(gè)source可以安全地寫入到相同channel,并且多個(gè)sink可以從相同的channel進(jìn)行讀取,可是一個(gè)sink只能從一個(gè)channel讀取,如果多個(gè)sink從相同的channel讀取,它可以保證只有一個(gè)sink將會(huì)從channel讀取一個(gè)指定特定的事件,
sink連續(xù)輪詢各自的channel來讀取和刪除事件,sink將事件推送到下一階段,或者最終目的地。一旦在下一階段或其目的地中數(shù)據(jù)是安全的,sink通過事務(wù)提交通知channel,可以從channel中刪除這些事件。

image.png

flume本身不限制agent中source、channel和sink的數(shù)量,因此flume source可以接收事件,并可以通過配置將事件復(fù)制到多個(gè)目的地,這使得source通過channel處理器、攔截器和channel選擇器,寫入數(shù)據(jù)到channel成為可能
每個(gè)source都有自己的channel處理器,每次source將數(shù)據(jù)寫入channel,它是通過委派該任務(wù)到其channel處理器來完成的,然后,channel處理器將這些事件傳到一個(gè)或多個(gè)source配置的攔截器中,
攔截器是一段代碼,可以基于某些它完成的處理來讀取事件和修改或刪除事件,基于某些標(biāo)準(zhǔn),如正則表達(dá)式,攔截器可以用來刪除事件,為事件添加新報(bào)頭或移除現(xiàn)有的報(bào)頭等,每個(gè)source可以配置成使用多個(gè)攔截器,按照配置中定義的順序被調(diào)用,將攔截器的結(jié)果傳遞給鏈的下一個(gè)單元,這就是所謂的責(zé)任鏈的設(shè)計(jì)模式,一旦攔截器處理完事件,攔截器鏈返回的事件列表傳遞到channel列表,即通過channel選擇器為每個(gè)事件選擇channel。
source可以通過處理器-攔截器-選擇器路由寫入多個(gè)channel,channel選擇器的決定每個(gè)事件必須寫入到source附帶的哪個(gè)channel的組件。因此攔截器可以用來插入或刪除事件中的數(shù)據(jù),這樣channel選擇器可以應(yīng)用一些條件在這些事件上,來決定事件必須寫入哪些channel,channel選擇器可以對(duì)事件應(yīng)用任意過濾條件,來決定每個(gè)事件必須寫入哪些channel,以及哪些channel是必須的或可選的。
寫入到必需的channel失敗將會(huì)導(dǎo)致channel處理器拋出channelexception,表明source必須重新重試該事件,而未能寫入可選channel失敗僅僅忽略它,一旦寫出事件,處理器會(huì)對(duì)source指示成功狀態(tài),可能發(fā)送確認(rèn)給發(fā)送該事件的系統(tǒng),并繼續(xù)接受更多的事件。

image.png

sink運(yùn)行器運(yùn)行一個(gè)sink組,sink組可含有一個(gè)或多個(gè)sink,如果組中只存在一個(gè)sink,那么沒有組將更有效率,sink運(yùn)行器僅僅是一個(gè)詢問sink組來處理下一批事件的線程,每個(gè)sink組有一個(gè)sink處理器,處理器選擇組中的sink之一去處理下一個(gè)事件集合,每個(gè)sink只能從一個(gè)channel獲取數(shù)據(jù),盡管多個(gè)sink可以從同一個(gè)channel獲取數(shù)據(jù),選定的sink從channel中接收事件,并將事件寫入到下一階段或最終目的地。

image.png

</div>

Source、Channel、Sink

<div class="mdContent">
Source、Channel、Sink有哪些類型
Flume Source
Source類型 | 說明
Avro Source | 支持Avro協(xié)議(實(shí)際上是Avro RPC),內(nèi)置支持
Thrift Source | 支持Thrift協(xié)議,內(nèi)置支持
Exec Source | 基于Unix的command在標(biāo)準(zhǔn)輸出上生產(chǎn)數(shù)據(jù)
JMS Source | 從JMS系統(tǒng)(消息、主題)中讀取數(shù)據(jù)
Spooling Directory Source | 監(jiān)控指定目錄內(nèi)數(shù)據(jù)變更
Twitter 1% firehose Source| 通過API持續(xù)下載Twitter數(shù)據(jù),試驗(yàn)性質(zhì)
Netcat Source | 監(jiān)控某個(gè)端口,將流經(jīng)端口的每一個(gè)文本行數(shù)據(jù)作為Event輸入
Sequence Generator Source | 序列生成器數(shù)據(jù)源,生產(chǎn)序列數(shù)據(jù)
Syslog Sources | 讀取syslog數(shù)據(jù),產(chǎn)生Event,支持UDP和TCP兩種協(xié)議
HTTP Source | 基于HTTP POST或GET方式的數(shù)據(jù)源,支持JSON、BLOB表示形式
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本)

Flume Channel
Channel類型       說明
Memory Channel                | Event數(shù)據(jù)存儲(chǔ)在內(nèi)存中
JDBC Channel                  | Event數(shù)據(jù)存儲(chǔ)在持久化存儲(chǔ)中,當(dāng)前Flume Channel內(nèi)置支持Derby
File Channel                  | Event數(shù)據(jù)存儲(chǔ)在磁盤文件中
Spillable Memory Channel   | Event數(shù)據(jù)存儲(chǔ)在內(nèi)存中和磁盤上,當(dāng)內(nèi)存隊(duì)列滿了,會(huì)持久化到磁盤文件
Pseudo Transaction Channel | 測(cè)試用途
Custom Channel                | 自定義Channel實(shí)現(xiàn)

Flume Sink
Sink類型     說明
HDFS Sink             | 數(shù)據(jù)寫入HDFS
Logger Sink           | 數(shù)據(jù)寫入日志文件
Avro Sink             | 數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到配置的RPC端口上
Thrift Sink           | 數(shù)據(jù)被轉(zhuǎn)換成Thrift Event,然后發(fā)送到配置的RPC端口上
IRC Sink              | 數(shù)據(jù)在IRC上進(jìn)行回放
File Roll Sink         | 存儲(chǔ)數(shù)據(jù)到本地文件系統(tǒng)
Null Sink             | 丟棄到所有數(shù)據(jù)
HBase Sink             | 數(shù)據(jù)寫入HBase數(shù)據(jù)庫
Morphline Solr Sink | 數(shù)據(jù)發(fā)送到Solr搜索服務(wù)器(集群)
ElasticSearch Sink     | 數(shù)據(jù)發(fā)送到Elastic Search搜索服務(wù)器(集群)
Kite Dataset Sink     | 寫數(shù)據(jù)到Kite Dataset,試驗(yàn)性質(zhì)的
Custom Sink           | 自定義Sink實(shí)現(xiàn)

</div>

Flume應(yīng)用案例

<div class="mdContent">
案例1、 A simple example
http://flume.apache.org/FlumeUserGuide.html#a-simple-example

配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

啟動(dòng)flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 指定配置目錄
flume-ng agent -n a1 -f op5 -Dflume.root.logger=INFO,console 不用指定配置目錄,將上訴source,channel,sink的文件起名為a1,同時(shí)指定這個(gè)文件在哪

安裝telnet
yum install telnet
退出 ctrl+] quit

Memory Chanel 配置
capacity:默認(rèn)該通道中最大的可以存儲(chǔ)的event數(shù)量是100,
trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數(shù)量也是100
keep-alive:event添加到通道中或者移出的允許時(shí)間
byte**:即event的字節(jié)量的限制,只包括eventbody

案例2、兩個(gè)flume做集群(第一個(gè)agent的sink作為第二個(gè)agent的source)

node01服務(wù)器中,配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

node02服務(wù)器中,安裝Flume(步驟略)
配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

先啟動(dòng)node02的Flume
flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console

再啟動(dòng)node01的Flume
flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console

打開telnet 測(cè)試  node02控制臺(tái)輸出結(jié)果

案例3、Exec Source(監(jiān)聽一個(gè)文件)
http://flume.apache.org/FlumeUserGuide.html#exec-source

配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console

創(chuàng)建空文件演示 touch flume.exec.log
循環(huán)添加數(shù)據(jù)
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done

案例4、Spooling Directory Source(監(jiān)聽一個(gè)目錄)
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

拷貝文件演示
mkdir logs
cp flume.exec.log logs/

案例5、hdfs sink
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

    配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
***只修改上一個(gè)spool sink的配置代碼塊 a1.sinks.k1.type = logger
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M

##每隔60s或者文件大小超過10M的時(shí)候產(chǎn)生新文件
# hdfs有多少條消息時(shí)新建文件,0不基于消息個(gè)數(shù)
a1.sinks.k1.hdfs.rollCount=0
# hdfs創(chuàng)建多長時(shí)間新建文件,0不基于時(shí)間
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大時(shí)新建文件,0不基于文件大小
a1.sinks.k1.hdfs.rollSize=10240
# 當(dāng)目前被打開的臨時(shí)文件在該參數(shù)指定的時(shí)間(秒)內(nèi),沒有任何數(shù)據(jù)寫入,則將該臨時(shí)文件關(guān)閉并重命名成目標(biāo)文件
a1.sinks.k1.hdfs.idleTimeout=3

a1.sinks.k1.hdfs.fileType=DataStream

時(shí)間參數(shù)一定要帶上 true

a1.sinks.k1.hdfs.useLocalTimeStamp=true

## 每五分鐘生成一個(gè)目錄:
# 是否啟用時(shí)間上的”舍棄”,這里的”舍棄”,類似于”四舍五入”,后面再介紹。如果啟用,則會(huì)影響除了%t的其他所有時(shí)間表達(dá)式
a1.sinks.k1.hdfs.round=true
# 時(shí)間上進(jìn)行“舍棄”的值;
a1.sinks.k1.hdfs.roundValue=5
# 時(shí)間上進(jìn)行”舍棄”的單位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1(將source,channel,sink關(guān)聯(lián))
############################################################
創(chuàng)建HDFS目錄
hadoop fs -mkdir /flume

啟動(dòng)Flume
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

查看hdfs文件
hadoop fs -ls /flume/...
hadoop fs -get /flume/...

</div>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 博客原文 翻譯作品,水平有限,如有錯(cuò)誤,煩請(qǐng)留言指正。原文請(qǐng)見 官網(wǎng)英文文檔 引言 概述 Apache Flume...
    rabbitGYK閱讀 11,694評(píng)論 13 34
  • 介紹 概述 Apache Flume是為有效收集聚合和移動(dòng)大量來自不同源到中心數(shù)據(jù)存儲(chǔ)而設(shè)計(jì)的可分布,可靠的,可用...
    ximengchj閱讀 3,665評(píng)論 0 13
  • 閱讀目錄(Content) 一、Flume簡(jiǎn)介 二、Flume特點(diǎn) 三、Flume的一些核心概念 3.1、Agen...
    達(dá)微閱讀 4,874評(píng)論 0 9
  • title: Flume構(gòu)建日志采集系統(tǒng)date: 2018-02-03 19:45tags: [flume,k...
    溯水心生閱讀 16,275評(píng)論 3 25
  • 一、Flume簡(jiǎn)介 flume 作為 cloudera 開發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flu...
    superxcp閱讀 1,064評(píng)論 0 2

友情鏈接更多精彩內(nèi)容