10_Flume Interceptors

1. Introduction

Interceptors are mainly used to filter and modify events. An Interceptor can pass its result on to the next Interceptor, forming an InterceptorChain. Multiple Interceptors are listed in the configuration file separated by spaces; their order is the order in which events are processed, and an event only moves on to the next interceptor after passing the current one. The Interceptor source code lives under org.apache.flume.interceptor in flume-ng-core.
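As an illustration, chaining two built-in interceptors on a source might look like the following (a minimal sketch; the agent name `a1`, source name `r1`, and interceptor names `i1`/`i2` are placeholders):

```properties
# Interceptor names are space-separated; events pass through i1 first, then i2
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
```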

2. Built-in Interceptors

Flume ships with the following Interceptors. You can also implement your own: for example, galaxy-tunnel implements a DataFlowDest Interceptor that selects a data channel based on the ChannelGroup field.

  • Timestamp Interceptor: inserts a KV pair with key timestamp into the Event header, whose value is the relevant timestamp. It can be configured to preserve an existing timestamp header.
  • Host Interceptor: inserts the hostname or IP of the machine the agent runs on into the Event header as a KV pair.
  • Static Interceptor: lets the user append a static header to all Events.
  • UUID Interceptor: generates a UUID string in each event's header.
  • Search and Replace Interceptor: provides simple string search-and-replace based on Java regular expressions, following the same rules as Java's Matcher.replaceAll() method.
  • Regex Extractor Interceptor: adds specified keys to the header via regular expressions, with the values taken from the matched portions.
  • Regex Filtering Interceptor: during log collection some data may be unwanted; this interceptor filters out unneeded logs, or keeps only the logs that match a given regex.
  • Morphline Interceptor: uses Morphline to transform each event's data. For Morphline usage, see http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
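For example, the Regex Filtering Interceptor can be configured as follows (a sketch; the agent and source names are placeholders):

```properties
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
# Drop events whose body matches the regex;
# set excludeEvents = false to instead keep only matching events
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*
a1.sources.r1.interceptors.i1.excludeEvents = true
```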

3. Custom Interceptor

  1. Environment: Java - Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.com.gc</groupId>
    <artifactId>flume-custom-interceptor</artifactId>
    <version>1.11</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <!--  <scope>provided</scope>  -->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- This plugin packages all dependencies into a single jar -->
            <!--            <plugin>-->
            <!--                <groupId>org.apache.maven.plugins</groupId>-->
            <!--                <artifactId>maven-assembly-plugin</artifactId>-->
            <!--                <version>3.3.0</version>-->
            <!--                <configuration>-->
            <!--                    <descriptorRefs>-->
            <!--                        <descriptorRef>jar-with-dependencies</descriptorRef>-->
            <!--                    </descriptorRefs>-->
            <!--                    <archive>-->
            <!--                        <manifest>-->
            <!--                            &lt;!&ndash;Set the main class FQCN via the mainClass tag&ndash;&gt;-->
            <!--                            &lt;!&ndash;<mainClass></mainClass>&ndash;&gt;-->
            <!--                        </manifest>-->
            <!--                    </archive>-->
            <!--                </configuration>-->
            <!--                <executions>-->
            <!--                    <execution>-->
            <!--                        <id>make-assembly</id>-->
            <!--                        <phase>package</phase>-->
            <!--                        <goals>-->
            <!--                            <goal>single</goal>-->
            <!--                        </goals>-->
            <!--                    </execution>-->
            <!--                </executions>-->
            <!--            </plugin>-->
        </plugins>
    </build>
</project>
  2. Convert the st field in the data into a timestamp and write it into the header; HDFSSink uses it to determine the time partition
package xxx.xxx.xxx.flume;

import xxx.xxx.xxx.flume.utils.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Adds a timestamp to the Event header based on the st field.
 */
public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
            JSONObject jsonObject = JSONObject.parseObject(log);
            String st = jsonObject.getString("st");
            // Flume HDFSSink expects the timestamp in milliseconds
            long timeStamp = DateUtil.getTimeStamp(st, "yyyy-MM-dd HH:mm:ss.SSS");
            headers.put("timestamp", String.valueOf(timeStamp));

            return event;
        } catch (Exception e) {
            e.printStackTrace();
            // Discard malformed events
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(list.size());
        for (Event event : list) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Override
    public void close() {
    }
}
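DateUtil above is a project-local helper whose source is not shown; a minimal sketch of what its getTimeStamp method might look like, assuming the st field matches the given SimpleDateFormat pattern (the class and method names mirror the call site; everything else is an assumption):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical sketch of the DateUtil helper used by TimestampInterceptor.
public class DateUtil {
    // Parses dateStr with the given pattern and returns epoch milliseconds.
    public static long getTimeStamp(String dateStr, String pattern) {
        try {
            return new SimpleDateFormat(pattern).parse(dateStr).getTime();
        } catch (ParseException e) {
            // Rethrow unchecked so the interceptor's catch block discards the event
            throw new RuntimeException(e);
        }
    }
}
```

Rethrowing the checked ParseException as a RuntimeException keeps the call site in intercept() simple while still letting its catch (Exception e) block drop unparseable events.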
  3. Package and upload

Put the built jar into ${FLUME_HOME}/lib. If the interceptor uses other dependencies, either bundle them directly into the custom interceptor's jar or download them into lib as well.

  4. Usage
# Name the components on this agent
a_app_info_to_hdfs.sources = s_app_info
a_app_info_to_hdfs.channels = c_app_info
a_app_info_to_hdfs.sinks = k_app_info
#############################################################################

# Data collection - Kafka to HDFS
#
# Source:
# Type = KafkaSource
# Topic = app_info_full
#
# Channel:
# Type = file
# State = ${FLUME_JOB_CONFIG_PATH}/log_channel_datas/../ app_info_dataDir | app_info_checkpointDir
#
# Sink:
# Type = HDFSSink
# HDFS Path = hdfs://${hadoopClusterName}/data/origin_data/log/app_info_full/yr=%Y/mon=%m/day=%d/hr=%H
# Hive TableName = app_info_full
# source
a_app_info_to_hdfs.sources.s_app_info.type = org.apache.flume.source.kafka.KafkaSource
a_app_info_to_hdfs.sources.s_app_info.batchSize = 5000
a_app_info_to_hdfs.sources.s_app_info.batchDurationMillis = 2000
# a_app_info_to_hdfs.sources.s_app_info.kafka.bootstrap.servers = ${kafkaCluster}
a_app_info_to_hdfs.sources.s_app_info.kafka.bootstrap.servers = ${kafkaCluster_acl}
a_app_info_to_hdfs.sources.s_app_info.kafka.consumer.security.protocol=SASL_PLAINTEXT
a_app_info_to_hdfs.sources.s_app_info.kafka.consumer.sasl.mechanism = PLAIN
a_app_info_to_hdfs.sources.s_app_info.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${kfk_user}" password="${kfk_pwd}" ;
a_app_info_to_hdfs.sources.s_app_info.kafka.topics = app_info_full
a_app_info_to_hdfs.sources.s_app_info.kafka.consumer.group.id = bigdata_flume
a_app_info_to_hdfs.sources.s_app_info.kafka.setTopicHeader = true
a_app_info_to_hdfs.sources.s_app_info.kafka.topicHeader = topic
a_app_info_to_hdfs.sources.s_app_info.interceptors = i1
a_app_info_to_hdfs.sources.s_app_info.interceptors.i1.type = xxx.xxx.xxx.flume.TimestampInterceptor$Builder

# channel
a_app_info_to_hdfs.channels.c_app_info.type = file
a_app_info_to_hdfs.channels.c_app_info.dataDirs = ${exec_log_path}/app_info_dataDir
a_app_info_to_hdfs.channels.c_app_info.checkpointDir = ${exec_log_path}/app_info_checkpointDir
a_app_info_to_hdfs.channels.c_app_info.capacity = 3000000
a_app_info_to_hdfs.channels.c_app_info.transactionCapacity = 20000
a_app_info_to_hdfs.channels.c_app_info.keep-alive = 5

# sink
a_app_info_to_hdfs.sinks.k_app_info.type = hdfs
a_app_info_to_hdfs.sinks.k_app_info.hdfs.path = hdfs://${hadoopClusterName}/data/origin_data/log/%{topic}/yr=%Y/mon=%m/day=%d/hr=%H
a_app_info_to_hdfs.sinks.k_app_info.hdfs.fileSuffix = _${hdfsFileSuffix}.gz
a_app_info_to_hdfs.sinks.k_app_info.hdfs.filePrefix = log_%Y%m%d%H%M
a_app_info_to_hdfs.sinks.k_app_info.hdfs.rollInterval = 0
a_app_info_to_hdfs.sinks.k_app_info.hdfs.rollSize = 125829120
a_app_info_to_hdfs.sinks.k_app_info.hdfs.rollCount = 0
a_app_info_to_hdfs.sinks.k_app_info.hdfs.minBlockReplicas = 1
a_app_info_to_hdfs.sinks.k_app_info.hdfs.round = true
a_app_info_to_hdfs.sinks.k_app_info.hdfs.roundValue = 1
a_app_info_to_hdfs.sinks.k_app_info.hdfs.roundUnit = hour
a_app_info_to_hdfs.sinks.k_app_info.hdfs.idleTimeout = 600
a_app_info_to_hdfs.sinks.k_app_info.hdfs.fileType = CompressedStream
a_app_info_to_hdfs.sinks.k_app_info.hdfs.codeC = gzip
a_app_info_to_hdfs.sinks.k_app_info.hdfs.writeFormat = Text

# Bind source | channel | sink
a_app_info_to_hdfs.sources.s_app_info.channels = c_app_info
a_app_info_to_hdfs.sinks.k_app_info.channel = c_app_info
#############################################################################