flume:一個(gè)例子的分析(二)

在 上篇中,flume 使用的是自定義攔截器:LogAnalysisInterceptor ,下面看下代碼:

package com.glbg.flume.interceptors;


/**
 * @Description: TODO 日志解析攔截器
 */
public class LogAnalysisInterceptor implements Interceptor {

    @Override
    public void initialize(){
    }
    
    @Override
    public void close(){
    }

    @Override
    public Event intercept(Event event){
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(),"UTF-8");

        headers.put(Constant.LOGTYPE, Constant.APP_LOG);
        
        body = URLDecoder.decode(body,"UTF-8");  //先解碼
        
        //UTC時(shí)區(qū)
        String eventDate = PatternUtil.getValueByPattern(Constant.APPFLAYER_EVENT_DATE_PATTERN, body);
        
        String appId = PatternUtil.getValueByPattern(Constant.APPFLAYER_APP_ID_PATTERN, body);
        
        if (!StringUtils.isEmpty(eventDate) && !StringUtils.isEmpty(appId)){
            String[] dateArray = eventDate.split("-");
            
            headers.put(Constant.YEAR, dateArray[0]);
            headers.put(Constant.MONTH, dateArray[1]);
            headers.put(Constant.DAY, dateArray[2]);
            headers.put(Constant.UBCD, Constant.APP_SITE_CODE_MAP.get(appId));
            headers.put(Constant.DATAROUTE, Constant.RIGHT);

        } else {
            headers.put(Constant.DATAROUTE, Constant.WRONG);
        }
        
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events){
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events){
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null){
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
}
public class Constant {
    
    public static final String LOGTYPE = "logtype";
    public static final String APP_LOG = "app-log";
    public static final String APPFLAYER_EVENT_DATE_PATTERN = "event_date_pattern";
    public static final String APPFLAYER_APP_ID_PATTERN = "app_id_pattern";
    public static final String ubcd="ubcd";
    
    public static final String TIME_ZONE_EST = "EST";   //西五區(qū)
    public static final String YEAR = "YEAR";
    public static final String MONTH = "MONTH";
    public static final String DAY = "DAY";
    
    public static final String wrong = "wrong";

    
    public static Map<String,String> APP_SITE_CODE_MAP = new HashMap<String,String>();
    
    static {
        APP_SITE_CODE_MAP.put("id1078789949", "10013");
        APP_SITE_CODE_MAP.put("com.zaful", "10013");
        APP_SITE_CODE_MAP.put("id1131090631", "10002");
        APP_SITE_CODE_MAP.put("com.globalegrow.app.gearbest", "10002");
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容