Business scenario: collect selected request data from nginx logs into Kafka. To avoid putting too much pressure on Kafka, two optimizations were made:
- filter out data we don't need to analyze before it enters Kafka
- spread messages as evenly as possible across the different brokers

Filtering the data
- Filter out the data we don't need
- Write a custom Interceptor
```xml
<!-- Maven dependency; we are on Flume 1.6.0 -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>
```
```java
// Keep only the events for the two API endpoints we care about
package deng.yb.flume_ng_Interceptor;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MyInterceptor implements Interceptor {
    /**
     * EPP endpoint - request line
     */
    private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";
    /**
     * APP endpoint - request line
     */
    private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";

    public void initialize() {
    }

    public void close() {
    }

    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        // Keep the event only if it hits one of the two endpoints;
        // returning null drops it (the body itself is left untouched)
        if (body.contains(EPP_REQUEST) || body.contains(APP_REQUEST)) {
            return event;
        }
        return null;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = new ArrayList<Event>(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    public static class Builder implements Interceptor.Builder {
        public void configure(Context context) {
            // no configuration needed
        }

        public Interceptor build() {
            return new MyInterceptor();
        }
    }
}
```
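A quick way to sanity-check the interceptor before deploying it is a tiny driver like the one below. This is just a sketch: `EventBuilder` ships with flume-ng-core, but the sample log lines are made up.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import deng.yb.flume_ng_Interceptor.MyInterceptor;

public class MyInterceptorCheck {
    public static void main(String[] args) {
        MyInterceptor interceptor = new MyInterceptor();

        // Hits one of the two whitelisted request lines: kept
        Event kept = EventBuilder.withBody(
                "1.2.3.4 - - \"POST /api/sky_server_data_app/track/user_time HTTP/1.1\" 200",
                StandardCharsets.UTF_8);
        // Anything else: dropped (intercept returns null)
        Event dropped = EventBuilder.withBody(
                "1.2.3.4 - - \"GET /favicon.ico HTTP/1.1\" 404",
                StandardCharsets.UTF_8);

        System.out.println(interceptor.intercept(kept) != null);    // true
        System.out.println(interceptor.intercept(dropped) != null); // false
    }
}
```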
- Modify the CDH Flume configuration: add the following to the agent

```properties
epplog.sources.r1.interceptors=i1
epplog.sources.r1.interceptors.i1.type=deng.yb.flume_ng_Interceptor.MyInterceptor$Builder
```
- Build the custom interceptor into a jar and put it into the $FLUME_HOME/lib directory
- Restart Flume
- The data flowing from Flume to Kafka is now pre-filtered, so Kafka is no longer flooded with useless messages that would cause IO problems
Kafka load balancing
- Messages need to be distributed evenly across the brokers so that no single broker comes under too much pressure.
- The official documentation says:

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

- In short: the Kafka sink reads the key entry of each event's headers to decide which partition the data goes to, and a null key is supposed to mean random partitions. In my tests, however, Flume published everything into a single partition (likely because the old producer only re-picks the "random" partition on its periodic metadata refresh, so between refreshes every event sticks to the same one).
- The fix is a second interceptor that writes a random, unique key into each event's headers; only then does the data really get spread across the Kafka partitions (see the partitioning sketch after the configuration below).
- Add/modify the following configuration on the Flume agent

```properties
epplog.sources.r1.interceptors=i1 i2
epplog.sources.r1.interceptors.i1.type=deng.yb.flume_ng_Interceptor.MyInterceptor$Builder
epplog.sources.r1.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
epplog.sources.r1.interceptors.i2.headerName=key
epplog.sources.r1.interceptors.i2.preserveExisting=false
```
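Why a random key spreads the load: for keyed messages, Kafka's default partitioner derives the partition from a hash of the key modulo the partition count, so fresh UUID keys land on all partitions roughly uniformly. Below is a simplified sketch of that idea, not Kafka's exact hash function (newer Java clients use murmur2):

```java
import java.util.UUID;

public class KeyedPartitioningSketch {
    // Simplified stand-in for Kafka's keyed partitioning:
    // partition = abs(hash(key)) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // matches the topic created below
        int[] counts = new int[numPartitions];
        // The UUIDInterceptor gives every event a fresh key, so the
        // hash, and therefore the partition, varies from event to event
        for (int i = 0; i < 9000; i++) {
            counts[partitionFor(UUID.randomUUID().toString(), numPartitions)]++;
        }
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " events");
        }
    }
}
```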
- Create the topic

```shell
# Choose the partition count based on the number of brokers, ideally an integer multiple of it
kafka-topics --create --zookeeper bi-slave1:2181,bi-slave2:2181,bi-master:2181 --replication-factor 1 --partitions 3 --topic epplog1
```

- Point the Flume sink at the new topic and restart Flume
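To verify the spread without logging into each broker, a small consumer can print the partition every message lands in. A minimal sketch, assuming the kafka-clients new-consumer API (Kafka 0.9+) is on the classpath and a broker listens on the hypothetical address bi-slave1:9092:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PartitionSpreadCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bi-slave1:9092"); // assumed broker address
        props.put("group.id", "partition-spread-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("epplog1"));
        try {
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                // The UUID key written by the interceptor, and the partition it hashed to
                System.out.println("partition=" + record.partition() + " key=" + record.key());
            }
        } finally {
            consumer.close();
        }
    }
}
```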
- Check the messages
  (screenshot: 測試結(jié)果.png) The messages now carry an automatically generated UUID key and contain only the filtered requests.
- Check the topic's partition directories on the different brokers
  (screenshots: 1分區(qū).png, 2分區(qū).png, 3分區(qū).png, one per partition) The directories are named topic-partitionIndex, with the index starting at 0.
  The messages are spread fairly evenly across the 3 partitions, i.e. across three machines, which gives us the Kafka load balancing we were after.



