參考文章1:Flume 自定義 Interceptor(攔截器)
參考文章2:java靜態(tài)內(nèi)部類和非靜態(tài)內(nèi)部類對(duì)外部類屬性的使用
問(wèn)題1:flume自定義攔截器時(shí),為什么要分單event處理,和多個(gè)event處理
問(wèn)題2:靜態(tài)內(nèi)部類,創(chuàng)建外部類對(duì)象并訪問(wèn)外部類對(duì)象
問(wèn)題3:avro source 、avro sink 定義
問(wèn)題4:avro source 、avro sink 采集通道還有問(wèn)題,數(shù)據(jù)傳輸不過(guò)去
一、flume攔截器介紹
攔截器是簡(jiǎn)單的插件式組件,設(shè)置在source和channel之間。source接收到的事件event,在寫(xiě)入channel之前,攔截器都可以進(jìn)行轉(zhuǎn)換或者刪除這些事件。每個(gè)攔截器只處理同一個(gè)source接收到的事件??梢宰远x攔截器。
本篇文章主要講解自定義連接器。flume內(nèi)置連接器,可參考該文章。
二、自定義連接器
需求:在bigdata02機(jī)器上,監(jiān)聽(tīng)44444端口。將包含hello的數(shù)據(jù)發(fā)送到bigdata03機(jī)器控制臺(tái),將不包含hello的數(shù)據(jù)發(fā)送到bigdata03機(jī)器控制臺(tái)。
步驟:
1、自定義flume攔截器
2、在bigdata02、bigdata03、bigdata04服務(wù)器上編寫(xiě)conf配置文件。flume2.conf 、flume3.conf、flume4.conf
3、測(cè)試
1、自定義flume攔截器
1)引入 pom 依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ydm</groupId>
<artifactId>flumeinterceptor1127</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<!--包名,一般是域名的反寫(xiě)-->
<groupId>org.apache.flume</groupId>
<!--項(xiàng)目名-->
<artifactId>flume-ng-core</artifactId>
<!--所需要的jar的版本-->
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2)編寫(xiě)攔截器類
package com.atguigu.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 定義flume連接器
* 1、實(shí)現(xiàn)org.apache.flume.interceptor.Interceptor 接口,需要重寫(xiě)接口中的方法
* 接口比如電腦的接口,插座的接口,雖然是不同廠家生產(chǎn)的,但是我們都可以用。
* 接口是一個(gè)公共的規(guī)范,只要符合規(guī)范,大家都可以使用,java中的接口更多體現(xiàn)在對(duì)行為的抽象。
*/
public class TypeInterceptor implements Interceptor {
//聲明一個(gè)存放事件的集合
private List<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化
addHeaderEvents = new ArrayList<>();
}
//單個(gè)事件攔截
@Override
public Event intercept(Event event) {
//1.獲取事件中的投信息
Map<String, String> headers = event.getHeaders();
//2.獲取事件中的body 信息
// event.getBody(); 返回字節(jié)數(shù)組,需要將數(shù)組轉(zhuǎn)化為字符串
String body = new String(event.getBody());
//3.根據(jù)body中是否含有hello單詞來(lái)決定添加怎樣的頭信息
if(body.contains("hello")){
//4.添加頭信息
headers.put("type","hello");
}else {
//4.添加頭信息
headers.put("type","unhello");
}
return event;
}
//批量事件攔截
@Override
public List<Event> intercept(List<Event> events) {
//1.清空集合
addHeaderEvents.clear();
//2.遍歷events
for(Event event: events){
//3.給每一個(gè)事件添加頭信息
addHeaderEvents.add(intercept(event));
}
//4.返回結(jié)果
return addHeaderEvents;
}
@Override
public void close() {
}
/**
* 定義一個(gè)靜態(tài)內(nèi)部類
* 內(nèi)部類:就是在一個(gè)類中定義一個(gè)類。舉例:在一個(gè)類A的內(nèi)部定義一個(gè)類B,類B就稱為內(nèi)部類
* EG:生活中,在筆記本內(nèi)部有CPU,筆記本可以看成外部類,CPU可以看成內(nèi)部類
*
* 靜態(tài)內(nèi)部類
* 1.只可以訪問(wèn)外部類的靜態(tài)屬性,包括靜態(tài)私有屬性
* 2.不可以訪問(wèn)外部類的非靜態(tài)屬性,包括私有屬性。但可以通過(guò)new 外部類().成員的方式訪問(wèn)
*
* 使用內(nèi)部類最吸引人的原因是:每個(gè)內(nèi)部類都能獨(dú)立地繼承一個(gè)(接口的)實(shí)現(xiàn)
*
*/
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TypeInterceptor();
}
//配置信息
@Override
public void configure(Context context) {
}
}
}
2、在bigdata02、bigdata03、bigdata04服務(wù)器上編寫(xiě)conf配置文件
1) 在bigdata02服務(wù)器上,cd /usr/flume/conf 在conf目錄下新建flume2.conf文件
#name
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2
#source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
#全類名
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
#channel selector
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = type
a2.sources.r1.selector.mapping.hello = c1
a2.sources.r1.selector.mapping.unhello = c2
#channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4142
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop104
a2.sinks.k2.port = 4142
#bind
a2.sources.r1.channels = c1 c2
a2.sources.k1.channel = c1
a2.sources.k2.channel = c2
2) 在bigdata03服務(wù)器上,cd /usr/flume/conf 在conf目錄下新建flume3.conf文件
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata03
a3.sources.r1.prot = 4142
#channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3) 在bigdata03服務(wù)器上,cd /usr/flume/conf 在conf目錄下新建flume4.conf文件
#name
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#source
a4.sources.r1.type = avro
a4.sources.r1.bind = bigdata04
a4.sources.r1.prot = 4142
#channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
#sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
3、測(cè)試
flume3 和 flume4 需要先啟動(dòng),flume2 需要連接flume3 和 flume4,若先啟動(dòng) flume2 會(huì)報(bào)連接不上(也可以無(wú)視錯(cuò)誤日志,先啟動(dòng))
cd /opt/apache-flume-1.7.0-bin
bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/interceptor/flume3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file /tmp/flume-job/interceptor/flume2 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/interceptor/flume1 -Dflume.root.logger=INFO,console
先啟動(dòng)bigdata03\bigdata04 的配置文件
