Big Data: Flume

I. Installing the netcat tool
1. sudo yum install -y nc
2. Check whether port 44444 is already in use: sudo netstat -tunlp | grep 44444
3. Basic commands
1) Listen: nc -l <port>
2) Send: nc <hostname or IP> <port>
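
A quick two-terminal test to verify the install (assuming port 44444 is free):

# terminal 1: listen on port 44444
nc -l 44444
# terminal 2 (any machine that can reach it): connect and type; the text appears in terminal 1
nc hadoop102 44444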

II. Installing the Flume package
1. Extract the 1.9.0 tarball
2. Configure the Flume environment variables in /etc/profile (a sketch follows this list)
3. For Hadoop 3 compatibility, delete the bundled Guava jar, which conflicts with Hadoop 3's newer Guava: rm -rf /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
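
A minimal sketch of the /etc/profile entries, assuming the install path above (the variable names are the usual convention, not mandated by Flume):

#FLUME_HOME
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

Run source /etc/profile afterwards so the flume-ng command is on your PATH.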

III. Commands
1. Print log events to the console:
flume-ng agent --name a1 --conf conf/ --conf-file datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
2. The same command using short options:
flume-ng agent -n a1 -c conf/ -f datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
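
The contents of datas/netcatsource_loggersink.conf are not shown in this post; a minimal sketch of what it might look like (a netcat source feeding a logger sink through a memory channel; the bind address and port are assumptions):

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1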

IV. Configuration examples
How the three sources differ:
exec source: suits real-time appends (tailing a file), but does not support resuming after a restart
spooldir source: suits newly added files, but not files that are appended to in real time
taildir source: handles real-time appends and also supports resuming after a restart

1. exec example

# 1. Name the agent's source, channel, and sink components
a2.sources = r2
a2.channels = c2
a2.sinks = k2

# 2. Source type and configuration
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/flume-1.9.0/demo/123.log

# 3. Channel type and configuration
a2.channels.c2.type = memory
a2.channels.c2.capacity = 200

# 4. Sink type and configuration
a2.sinks.k2.type = hdfs
# To use time escape sequences (%Y%m%d/%H) in the path, the event headers must
# carry a timestamp; useLocalTimeStamp = true satisfies this with the local time
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# whether to round the directory down by time
a2.sinks.k2.hdfs.round = true
# how many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# the time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# how many events to accumulate before one flush to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# file type; compression is also supported
a2.sinks.k2.hdfs.fileType = DataStream
# roll a new file every N seconds
a2.sinks.k2.hdfs.rollInterval = 60
# roll the file at this size in bytes (just under the 128 MB block size)
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a2.sinks.k2.hdfs.rollCount = 0

# 5. Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
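
To try it, start the agent and append to the tailed file (the config filename datas/exec_hdfssink.conf is an assumption; use whatever you saved it as):

flume-ng agent -n a2 -c conf/ -f datas/exec_hdfssink.conf
echo "hello flume" >> /opt/module/flume-1.9.0/demo/123.log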

2. Directory-monitoring (spooldir) example

# 1. Name the agent's source, channel, and sink components
a2.sources = r2
a2.channels = c2
a2.sinks = k2

# 2. Source type and configuration
# Watches a directory and automatically collects files placed into it.
# Once a file has been fully read it is handled in one of two ways:
# 1) deleted, or 2) renamed with a suffix
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume-1.9.0/upload
a2.sources.r2.fileSuffix = .COMPLETED

# 3. Channel type and configuration
a2.channels.c2.type = memory
a2.channels.c2.capacity = 200

# 4. Sink type and configuration
a2.sinks.k2.type = hdfs
# To use time escape sequences (%Y%m%d/%H) in the path, the event headers must
# carry a timestamp; useLocalTimeStamp = true satisfies this with the local time
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# whether to round the directory down by time
a2.sinks.k2.hdfs.round = true
# how many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# the time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# how many events to accumulate before one flush to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# file type; compression is also supported
a2.sinks.k2.hdfs.fileType = DataStream
# roll a new file every N seconds
a2.sinks.k2.hdfs.rollInterval = 60
# roll the file at this size in bytes (just under the 128 MB block size)
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a2.sinks.k2.hdfs.rollCount = 0

# 5. Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
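
To test, drop a file into the watched directory; after it has been collected, the source renames it with the configured suffix:

cp /opt/module/flume-1.9.0/demo/123.log /opt/module/flume-1.9.0/upload/
ls /opt/module/flume-1.9.0/upload/
# shows 123.log.COMPLETED once the file has been fully read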

3. taildir example

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
# JSON file where taildir records its read position, enabling resume after restart
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/demo/123.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
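
The position file is what enables resuming: taildir records how far it has read in each file. After some data has flowed it looks roughly like this (the inode and pos values here are illustrative):

[{"inode": 943077, "pos": 4096, "file": "/opt/module/flume-1.9.0/demo/123.log"}]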

V. Replicating example

1. agent1 (hadoop102):

# a1 is the agent's name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source type and configuration
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log
# replicating selector: every event is copied to all channels
a1.sources.r1.selector.type = replicating

# Channel types and configuration
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100

# Sink types and configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

# Wire the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2. agent2 (hadoop103):

# a1 is the agent's name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source type and configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 33333

# Channel type and configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

# Sink type and configuration
a1.sinks.k1.type = logger

# Wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. agent3 (hadoop104):

# a1 is the agent's name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source type and configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 44444

# Channel type and configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

# Sink type and configuration
# file_roll stores event data on the local disk
a1.sinks.k1.type = file_roll
# directory where events are written
a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/demo

# Wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
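
Because agent1's avro sinks connect to the avro sources on hadoop103 and hadoop104, start agent2 and agent3 first, then agent1 (the config filenames here are assumptions):

# on hadoop103
flume-ng agent -n a1 -c conf/ -f datas/replicating2.conf -Dflume.root.logger=INFO,console
# on hadoop104
flume-ng agent -n a1 -c conf/ -f datas/replicating3.conf
# on hadoop102, last
flume-ng agent -n a1 -c conf/ -f datas/replicating1.conf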

VI. Multiplexing example

1. agent1 (hadoop102):

# a1 is the agent's name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source type and configuration
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log

# multiplexing selector: route events to channels by a header value
a1.sources.r1.selector.type = multiplexing
# "state" is the header key to inspect
a1.sources.r1.selector.header = state
# events whose state header value is CZ go to c1, US to c2
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2

# Interceptor: adds data to the event headers
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ

# Channel types and configuration
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100

# Sink types and configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

# Wire the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2. agent2 and agent3 are the same as in the replicating example.
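
Note that the static interceptor above stamps every event with state=CZ, so all traffic flows through c1 to hadoop103. To exercise the US branch you could, for example, change the interceptor value (in a real deployment you would use an interceptor that sets the header per event):

a1.sources.r1.interceptors.i1.value = US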

VII. Failover example

1. agent1

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Source type and configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222

# Channel type and configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

# Sink group for failover: the sink with the higher priority (k2 here) takes
# all traffic; k1 takes over only if k2 fails
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

# Alternative: load-balancing sink group (use instead of the failover block above)
#a1.sinkgroups = g1
#a1.sinkgroups.g1.sinks = k1 k2
#a1.sinkgroups.g1.processor.type = load_balance
#a1.sinkgroups.g1.processor.backoff = true
#a1.sinkgroups.g1.processor.selector = random

# Sink types and configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

# Wire the source and sinks to the channel (both sinks drain the same channel)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

2. agent2 and agent3 are the same as in the replicating example.
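
To observe the failover, send events through the netcat source, kill the higher-priority destination, and send again (a sketch; ports as configured above):

nc hadoop102 22222    # events arrive at hadoop104 (k2, priority 10)
# stop the agent on hadoop104, then:
nc hadoop102 22222    # events now arrive at hadoop103 (k1, priority 5)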
