I. Install the netcat tool
1. sudo yum install -y nc
2. Check whether port 44444 is already in use: sudo netstat -tunlp | grep 44444
3. Basic commands
1) Listen: nc -l <port>
2) Send: nc <hostname or IP> <port>
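For example, to try it out on a single machine with two terminals:
# terminal 1: listen on port 44444
nc -l 44444
# terminal 2: connect and type; each line appears in terminal 1
nc localhost 44444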
II. Install the Flume package
1. Unpack the Flume 1.9.0 tarball
2. Add the Flume environment variables to /etc/profile
3. For Hadoop 3 compatibility, delete the bundled guava jar (it conflicts with the newer guava that Hadoop 3 ships): rm -rf /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
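A minimal sketch of the /etc/profile entries, assuming Flume is unpacked under /opt/module/flume-1.9.0:
# /etc/profile  (apply with: source /etc/profile)
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin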
III. Launch commands
1. Run an agent and print log events to the console:
flume-ng agent --name a1 --conf conf/ --conf-file datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
2. The same launch using short options:
flume-ng agent -n a1 -c conf/ -f datas/netcatsource_loggersink.conf -Dflume.root.logger=INFO,console
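The commands above reference datas/netcatsource_loggersink.conf, which these notes never show; a minimal sketch of what it likely contains (the bind address is an assumption; port 44444 matches the check in section I):
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# netcat source: listens on a TCP port, turning each received line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# logger sink: writes events to the Flume log (the console with -Dflume.root.logger=INFO,console)
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1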
IV. Configuration examples
How to choose a source:
exec source suits data appended in real time, but cannot resume from where it left off after a restart
spooldir source suits newly added files, but not files that are still being appended to
taildir source handles real-time appends and can also resume from where it left off
1. exec example
#1. Name the agent and its source, channel, and sink components
a2.sources = r2
a2.channels = c2
a2.sinks = k2
#2. Source type and configuration
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/flume-1.9.0/demo/123.log
#3. Channel type and configuration
a2.channels.c2.type = memory
a2.channels.c2.capacity = 200
#4. Sink type and configuration
a2.sinks.k2.type = hdfs
# Time escape sequences in the path need a timestamp: either set useLocalTimeStamp = true, or make sure the event headers carry a timestamp
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories by time
a2.sinks.k2.hdfs.round = true
# How many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# How many events to accumulate before one flush to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported (DataStream = plain, uncompressed)
a2.sinks.k2.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a2.sinks.k2.hdfs.rollInterval = 60
# Roll to a new file at this size in bytes (just under a 128 MB block)
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 = never roll based on event count
a2.sinks.k2.hdfs.rollCount = 0
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
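To run this agent, save the config (exec_hdfs.conf is a hypothetical name) and launch it as in section III, using the agent name a2:
flume-ng agent -n a2 -c conf/ -f datas/exec_hdfs.conf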
2. Spooling-directory example
a2.sources = r2
a2.channels = c2
a2.sinks = k2
# Watches a directory and automatically collects the contents of files that appear in it
# Once a file has been fully read, it is handled one of two ways: 1) deleted, or 2) renamed with a new extension
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume-1.9.0/upload
a2.sources.r2.fileSuffix = .COMPLETED
#3. Channel type and configuration
a2.channels.c2.type = memory
a2.channels.c2.capacity = 200
#4. Sink type and configuration
a2.sinks.k2.type = hdfs
# Time escape sequences in the path need a timestamp: either set useLocalTimeStamp = true, or make sure the event headers carry a timestamp
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories by time
a2.sinks.k2.hdfs.round = true
# How many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# How many events to accumulate before one flush to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported (DataStream = plain, uncompressed)
a2.sinks.k2.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a2.sinks.k2.hdfs.rollInterval = 60
# Roll to a new file at this size in bytes (just under a 128 MB block)
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 = never roll based on event count
a2.sinks.k2.hdfs.rollCount = 0
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
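To test, copy a finished file into the watched directory (app.log is a hypothetical name); once Flume has consumed it, the configured suffix is appended:
cp app.log /opt/module/flume-1.9.0/upload/
ls /opt/module/flume-1.9.0/upload/    # app.log.COMPLETED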
3. taildir example
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = TAILDIR
# The position file records how far each file has been read, so tailing resumes after a restart
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
# One or more file groups, each naming a set of files to tail
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/demo/123.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
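For reference, the position file is a JSON array with one entry per tailed file; the inode and pos values below are illustrative:
[{"inode": 2496989, "pos": 12, "file": "/opt/module/flume-1.9.0/demo/123.log"}]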
V. Replication example
1. agent1 (hadoop102):
# a1: the agent's name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# Source type and its configuration
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log
# Replicating selector: every event is copied to all channels
a1.sources.r1.selector.type = replicating
# Channel types and their configuration
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100
#a1.channels.c1.keep-alive = 3
# Sink types and their configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
# Wire the source, channels, and sinks together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2. agent2 (hadoop103):
# a1: the agent's name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source type and its configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 33333
# Channel type and its configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
# Sink type and its configuration
a1.sinks.k1.type = logger
# Wire the source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. agent3 (hadoop104):
# a1: the agent's name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source type and its configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 44444
# Channel type and its configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
# Sink type and its configuration
# file_roll stores events on the local disk
a1.sinks.k1.type = file_roll
# Directory where events are written
a1.sinks.k1.sink.directory = /opt/module/flume-1.9.0/demo
# Wire the source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
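Start order matters: the avro sources on hadoop103 and hadoop104 must be listening before agent1's avro sinks try to connect, so bring up agent2 and agent3 first (the .conf file names here are assumptions):
# on hadoop103
flume-ng agent -n a1 -c conf/ -f datas/replicate_agent2.conf -Dflume.root.logger=INFO,console
# on hadoop104
flume-ng agent -n a1 -c conf/ -f datas/replicate_agent3.conf
# finally, on hadoop102
flume-ng agent -n a1 -c conf/ -f datas/replicate_agent1.conf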
VI. Multiplexing example
1. agent1 (hadoop102):
# a1: the agent's name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# Source type and its configuration
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/demo/123.log
#a1.sources.r1.selector.type = replicating
# Multiplexing selector: routes each event by a header value
a1.sources.r1.selector.type = multiplexing
# "state" is the header key to inspect
a1.sources.r1.selector.header = state
# Events whose state header is CZ go to channel c1
a1.sources.r1.selector.mapping.CZ = c1
# Events whose state header is US go to channel c2
a1.sources.r1.selector.mapping.US = c2
# Interceptor: adds data to each event's headers
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ
# Channel types and their configuration
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c1.capacity = 100
a1.channels.c2.capacity = 100
#a1.channels.c1.keep-alive = 3
# Sink types and their configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
# Wire the source, channels, and sinks together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2. agent2 and agent3 are the same as in the replication example.
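Note that the static interceptor stamps every event with state=CZ, so as written all traffic flows to c1 and on to hadoop103; to route events to hadoop104 instead, change the interceptor value:
a1.sources.r1.interceptors.i1.value = US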
VII. Failover example
1. agent1
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
# Source type and its configuration (the netcat source requires both bind and port)
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222
# Channel type and its configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
# Sink group configured for failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# The sink with the higher priority value is active; if it fails, events fall back to the other sink
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Alternative: sink group configured for load balancing
#a1.sinkgroups = g1
#a1.sinkgroups.g1.sinks = k1 k2
#a1.sinkgroups.g1.processor.type = load_balance
#a1.sinkgroups.g1.processor.backoff = true
#a1.sinkgroups.g1.processor.selector = random
# Sink types and their configuration
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444
# Wire the source, channel, and sinks together (both sinks drain the same channel)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
2. agent2 and agent3 are the same as in the replication example.
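To watch the failover happen, feed events into agent1's netcat source, then stop the agent on hadoop104 (its sink k2 has the higher priority, so it is the active one); subsequent events should arrive at hadoop103 instead:
# on hadoop102: each line typed becomes an event
nc hadoop102 22222
# on hadoop104: stop the Flume agent (Ctrl+C), then keep typing in the nc session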