User behavior log analysis is a very common application of real-time data processing; PV and UV statistics are typical examples. This article builds a user behavior log analysis system from scratch with Flink, covering both the architecture design and the code, and walks through the complete data processing pipeline of such a system. From this article you will learn:
- How to set up a forum platform with Discuz
- How to use the Flume log collection system
- The Apache log format
- How to integrate Flume with Kafka
- The log analysis processing flow
- The architecture design and the complete code implementation
Project Overview
This article implements a real-time user behavior log analysis system from scratch on Flink. The basic architecture is shown below:

First we set up a forum platform and analyze the user click logs it produces. Flume collects the Apache logs and pushes them to Kafka. Flink then processes the logs in real time and writes the results to MySQL, where a front-end application can visualize them. This article computes the following three metrics:
- Hot sections, i.e. the sections with the most visits
- Hot articles, i.e. the posts with the most visits
- The total number of section and article visits from each client
Setting Up a Forum Platform with Discuz
Install XAMPP
- Download
wget https://www.apachefriends.org/xampp-files/5.6.33/xampp-linux-x64-5.6.33-0-installer.run
- Install
# Grant the installer execute permission
chmod u+x xampp-linux-x64-5.6.33-0-installer.run
# Run the installer
./xampp-linux-x64-5.6.33-0-installer.run
- Configure environment variables
Add the following to ~/.bash_profile:
export XAMPP=/opt/lampp/
export PATH=$PATH:$XAMPP:$XAMPP/bin
- Reload the environment variables
source ~/.bash_profile
- Start XAMPP
xampp restart
- Change the MySQL root password and privileges
# Set the root password to 123qwe
update mysql.user set password=PASSWORD('123qwe') where user='root';
flush privileges;
# Grant root remote login privileges
grant all privileges on *.* to 'root'@'%' identified by '123qwe' with grant option;
flush privileges;
Install Discuz
- Download Discuz
wget http://download.comsenz.com/DiscuzX/3.2/Discuz_X3.2_SC_UTF8.zip
- Install
# Remove the existing web application
rm -rf /opt/lampp/htdocs/*
unzip Discuz_X3.2_SC_UTF8.zip -d /opt/lampp/htdocs/
cd /opt/lampp/htdocs/
mv upload/* .
# Adjust directory permissions
chmod 777 -R /opt/lampp/htdocs/config/
chmod 777 -R /opt/lampp/htdocs/data/
chmod 777 -R /opt/lampp/htdocs/uc_client/
chmod 777 -R /opt/lampp/htdocs/uc_server/
Basic Discuz Operations
- Custom sections
- Open the Discuz admin panel: http://kms-4/admin.php
- Click the Forum menu at the top
- Create the sections you need by following the on-page prompts; parent and child sections are both supported

Discuz Post/Section Database Tables
-- Log in to the ultrax database
mysql -uroot -p123qwe ultrax
-- Table mapping post ids to titles
-- tid, subject (post id, title)
select tid, subject from pre_forum_post limit 10;
-- fid, name (section id, section name)
select fid, name from pre_forum_forum limit 40;
當(dāng)我們?cè)诟鱾€(gè)板塊添加帖子之后,如下所示:

Modify the Log Format
- View the access log
# Default log location
/opt/lampp/logs/access_log
# Tail the log in real time
tail -f /opt/lampp/logs/access_log
- Change the log format
The Apache configuration file is httpd.conf, and its full path is /opt/lampp/etc/httpd.conf. The default log type is the common format, which has 7 fields in total. To capture more information, we switch to the combined format, which has 9 fields. Modify it as follows:
# Enable the combined log format
CustomLog "logs/access_log" combined

- Reload the configuration
xampp reload
The Apache Log Format
192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" 200 30647 "http://kms-4/forum.php" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"
The log format above has 9 fields, separated by spaces. The meaning of each field is as follows:
192.168.10.1 ## (1) Client IP address
- ## (2) Client identity; this field is "-"
- ## (3) Client userid; this field is "-"
[30/Aug/2020:15:53:15 +0800] ## (4) Time at which the server finished processing the request
"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" ## (5) Request method, requested resource and protocol
200 ## (6) Status code returned to the client; 200 means success
30647 ## (7) Number of bytes returned to the client, excluding response headers; "-" if nothing was returned
"http://kms-4/forum.php" ## (8) Referer request header
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36" ## (9) Client browser information
This log format can be matched with the following regular expression:
(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (\S+) (\S+) (\[.+?\]) (\"(.*?)\") (\d{3}) (\S+) (\"(.*?)\") (\"(.*?)\")
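As a quick sanity check, the pattern can be run against the sample line above (a standalone snippet, not part of the project code). Note that each of the three quoted fields contributes a nested capture group, so the inner groups sit at indices 6, 10 and 12:
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogRegexCheck {
    public static void main(String[] args) {
        String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) (\\\"(.*?)\\\") (\\d{3}) (\\S+) (\\\"(.*?)\\\") (\\\"(.*?)\\\")";
        String line = "192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "
                + "\"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1\" 200 30647 "
                + "\"http://kms-4/forum.php\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\"";
        Matcher m = Pattern.compile(regex).matcher(line);
        if (m.find()) {
            System.out.println(m.group(1));  // client IP:    192.168.10.1
            System.out.println(m.group(4));  // datetime:     [30/Aug/2020:15:53:15 +0800]
            System.out.println(m.group(6));  // request:      GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1
            System.out.println(m.group(7));  // status code:  200
            System.out.println(m.group(8));  // bytes sent:   30647
            System.out.println(m.group(10)); // referer:      http://kms-4/forum.php
            System.out.println(m.group(12)); // user agent:   Mozilla/5.0 ...
        }
    }
}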
Integrating Flume with Kafka
We use Flume to collect the Apache logs and push them to Kafka. A Flume agent must be started to collect the logs; its configuration file is as follows:
# The agent is named a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# set source
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /opt/lampp/logs/access_log
a1.sources.source1.fileHeader = false
# Configure the sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.brokerList=kms-2:9092,kms-3:9092,kms-4:9092
a1.sinks.sink1.topic= user_access_logs
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy
# Configure the channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /home/kms/data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /home/kms/data/flume_data/data
# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
Side note:
What advantages does the Taildir Source have over the Exec Source and the Spooling Directory Source?
Taildir Source: supports resumable reads and multiple directories. Before Flume 1.6 you had to write a custom source that recorded the last read position in each file to get resumable reads.
Exec Source: collects data in real time, but data is lost if Flume is not running or the shell command fails.
Spooling Directory Source: monitors a directory, but does not support resuming from where it left off.
Note that the configuration above pushes the raw logs straight to Kafka. Alternatively, you can write a custom Flume interceptor to filter the raw logs first, or to route different logs to different Kafka topics.
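For reference, a custom interceptor is just a Java class implementing org.apache.flume.interceptor.Interceptor. The sketch below is a hypothetical example (the class name and filter rule are invented for illustration): it drops any event whose body does not start with an IPv4 address, i.e. anything that is not an access-log line. It would be registered on the source with a1.sources.source1.interceptors = i1 and a1.sources.source1.interceptors.i1.type = <full class name>$Builder.
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical interceptor: keep only events that look like access-log lines
public class AccessLogFilterInterceptor implements Interceptor {
    private static final Pattern IP_PREFIX =
            Pattern.compile("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3} ");

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Returning null drops the event
        return IP_PREFIX.matcher(body).find() ? event : null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>(events.size());
        for (Event e : events) {
            Event out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override
    public void close() { }

    // Builder referenced from the agent configuration
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new AccessLogFilterInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}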
Start the Flume Agent
Wrap the agent start command in a shell script, **start-log-collection.sh**, with the following content:
#!/bin/bash
echo "start log agent !!!"
/opt/modules/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/modules/apache-flume-1.9.0-bin/conf/log_collection.conf --name a1 -Dflume.root.logger=INFO,console
Inspect the Log Data Pushed to Kafka
Wrap the console consumer command in a shell script, kafka-consumer.sh, with the following content:
#!/bin/bash
echo "kafka consumer "
bin/kafka-console-consumer.sh --bootstrap-server kms-2.apache.com:9092,kms-3.apache.com:9092,kms-4.apache.com:9092 --topic $1 --from-beginning
Consume the data in Kafka with the following command:
[kms@kms-2 kafka_2.11-2.1.0]$ ./kafka-consumer.sh user_access_logs
Log Analysis Processing Pipeline
For readability, only the important code is explained below; the complete code is on GitHub: https://github.com/jiamx/flink-log-analysis

Create the MySQL Database and Target Tables
-- Per-client access statistics
CREATE TABLE `client_ip_access` (
`client_ip` char(50) NOT NULL COMMENT 'client ip',
`client_access_cnt` bigint(20) NOT NULL COMMENT 'access count',
`statistic_time` text NOT NULL COMMENT 'statistic time',
PRIMARY KEY (`client_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Hot article statistics
CREATE TABLE `hot_article` (
`article_id` int(10) NOT NULL COMMENT 'article id',
`subject` varchar(80) NOT NULL COMMENT 'article title',
`article_pv` bigint(20) NOT NULL COMMENT 'access count',
`statistic_time` text NOT NULL COMMENT 'statistic time',
PRIMARY KEY (`article_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Hot section statistics
CREATE TABLE `hot_section` (
`section_id` int(10) NOT NULL COMMENT 'section id',
`name` char(50) NOT NULL COMMENT 'section name',
`section_pv` bigint(20) NOT NULL COMMENT 'access count',
`statistic_time` text NOT NULL COMMENT 'statistic time',
PRIMARY KEY (`section_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
The AccessLogRecord Class
This class encapsulates the 9 fields contained in each log record.
/**
* Wrapper class for the raw log fields (uses Lombok)
*/
@Data
public class AccessLogRecord {
public String clientIpAddress; // client IP address
public String clientIdentity; // client identity; this field is `-`
public String remoteUser; // user id; this field is `-`
public String dateTime; // date, in the form [day/month/year:hour:minute:second zone]
public String request; // request line, e.g. `GET /foo ...`
public String httpStatusCode; // status code, e.g. 200, 404
public String bytesSent; // number of bytes sent, may be `-`
public String referer; // referer, i.e. the page the request came from
public String userAgent; // browser and operating system information
}
The LogParse Class
This class parses log lines: it matches each line against the regular expression and, when a line matches, extracts the individual fields.
public class LogParse implements Serializable {
// The regular expression for a combined log line
private String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) (\\\"(.*?)\\\") (\\d{3}) (\\S+) (\\\"(.*?)\\\") (\\\"(.*?)\\\")";
private Pattern p = Pattern.compile(regex);
/*
* Build the access-log wrapper object from a successful match
* */
public AccessLogRecord buildAccessLogRecord(Matcher matcher) {
AccessLogRecord record = new AccessLogRecord();
record.setClientIpAddress(matcher.group(1));
record.setClientIdentity(matcher.group(2));
record.setRemoteUser(matcher.group(3));
record.setDateTime(matcher.group(4));
// The three quoted fields each add a nested capture group, so the inner groups are used below
record.setRequest(matcher.group(6));
record.setHttpStatusCode(matcher.group(7));
record.setBytesSent(matcher.group(8));
record.setReferer(matcher.group(10));
record.setUserAgent(matcher.group(12));
return record;
}
/**
* @param record a single Apache combined-format log line
* @return the parsed record wrapped in an AccessLogRecord, or null if the line does not match
*/
public AccessLogRecord parseRecord(String record) {
Matcher matcher = p.matcher(record);
if (matcher.find()) {
return buildAccessLogRecord(matcher);
}
return null;
}
/**
* @param request the request line as a string, e.g. "GET /the-uri-here HTTP/1.1"
* @return a Tuple3 of (requestType, uri, httpVersion); requestType is the HTTP method, e.g. GET or POST
*/
public Tuple3<String, String, String> parseRequestField(String request) {
// The request line looks like "GET /test.php HTTP/1.1"; split it on spaces
String[] arr = request.split(" ");
if (arr.length == 3) {
return Tuple3.of(arr[0], arr[1], arr[2]);
} else {
return null;
}
}
/**
* Convert the date in an Apache log line into the yyyy-MM-dd HH:mm:ss format
*
* @param dateTime the date string from the Apache log, e.g. "[21/Jul/2009:02:48:13 -0700]"
* @return the formatted date, or "" if the field cannot be parsed
*/
public String parseDateField(String dateTime) throws ParseException {
// Input date format (English month abbreviation)
String inputFormat = "dd/MMM/yyyy:HH:mm:ss";
// Output date format
String outPutFormat = "yyyy-MM-dd HH:mm:ss";
String dateRegex = "\\[(.*?) .+]";
Pattern datePattern = Pattern.compile(dateRegex);
Matcher dateMatcher = datePattern.matcher(dateTime);
if (dateMatcher.find()) {
String dateString = dateMatcher.group(1);
SimpleDateFormat dateInputFormat = new SimpleDateFormat(inputFormat, Locale.ENGLISH);
Date date = dateInputFormat.parse(dateString);
SimpleDateFormat dateOutFormat = new SimpleDateFormat(outPutFormat);
String formatDate = dateOutFormat.format(date);
return formatDate;
} else {
return "";
}
}
/**
* Parse the request line, i.e. the URL of the visited page, e.g.
* "GET /about/forum.php?mod=viewthread&tid=5&extra=page%3D1 HTTP/1.1"
* and extract fid (the section id) and tid (the article id)
* @param request the request line
* @return a Tuple2 of (sectionId, articleId); an empty string means the id was not present
*/
public Tuple2<String, String> parseSectionIdAndArticleId(String request) {
// The digits following "forumdisplay&fid=" are the section id
String sectionIdRegex = "(\\?mod=forumdisplay&fid=)(\\d+)";
Pattern sectionPattern = Pattern.compile(sectionIdRegex);
// The digits following "viewthread&tid=" are the article id
String articleIdRegex = "(\\?mod=viewthread&tid=)(\\d+)";
Pattern articlePattern = Pattern.compile(articleIdRegex);
String[] arr = request.split(" ");
String sectionId = "";
String articleId = "";
if (arr.length == 3) {
Matcher sectionMatcher = sectionPattern.matcher(arr[1]);
Matcher articleMatcher = articlePattern.matcher(arr[1]);
sectionId = (sectionMatcher.find()) ? sectionMatcher.group(2) : "";
articleId = (articleMatcher.find()) ? articleMatcher.group(2) : "";
}
return Tuple2.of(sectionId, articleId);
}
}
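A quick way to check the two helper methods is to call them directly on sample values (a hypothetical snippet; it assumes the LogParse class above is on the classpath):
public class LogParseDemo {
    public static void main(String[] args) throws Exception {
        LogParse parse = new LogParse();
        // fid is extracted from forumdisplay URLs, tid from viewthread URLs
        System.out.println(parse.parseSectionIdAndArticleId(
                "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1"));                    // (43,)
        System.out.println(parse.parseSectionIdAndArticleId(
                "GET /about/forum.php?mod=viewthread&tid=5&extra=page%3D1 HTTP/1.1"));  // (,5)
        // Apache timestamp -> "yyyy-MM-dd HH:mm:ss"
        System.out.println(parse.parseDateField("[30/Aug/2020:15:53:15 +0800]"));       // 2020-08-30 15:53:15
    }
}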
The LogAnalysis Class
This class contains the main log processing logic.
public class LogAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 5000 ms
senv.enableCheckpointing(5000L);
// Choose a state backend
// For local testing:
// senv.setStateBackend(new FsStateBackend("file:///E://checkpoint"));
// For running on the cluster:
senv.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/flink-checkpoints"));
// Restart strategy
senv.setRestartStrategy(
RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS) ));
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);
// Kafka consumer configuration
Properties props = new Properties();
// Kafka broker addresses
props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
// Consumer group
props.put("group.id", "log_consumer");
// Deserializer for message keys
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Deserializer for message values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
"user_access_logs",
new SimpleStringSchema(),
props);
DataStreamSource<String> logSource = senv.addSource(kafkaConsumer);
// Keep only valid log records
DataStream<AccessLogRecord> availableAccessLog = LogAnalysis.getAvailableAccessLog(logSource);
// Extract [clientIP, accessDate, sectionId, articleId]
DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = LogAnalysis.getFieldFromLog(availableAccessLog);
// Create a temporary view named logs from the DataStream
// Add a computed column, proctime, used for the dimension-table (temporal) join
tEnv.createTemporaryView("logs",
fieldFromLog,
$("clientIP"),
$("accessDate"),
$("sectionId"),
$("articleId"),
$("proctime").proctime());
// Requirement 1: hot sections
LogAnalysis.getHotSection(tEnv);
// Requirement 2: hot articles
LogAnalysis.getHotArticle(tEnv);
// Requirement 3: total section/article visits per client IP
LogAnalysis.getClientAccess(tEnv);
senv.execute("log-analysis");
}
/**
* Total number of section and article visits per client IP
* @param tEnv the table environment
*/
private static void getClientAccess(StreamTableEnvironment tEnv) {
// Sink table
// [client_ip, client_access_cnt, statistic_time]
// [client ip, access count, statistic time]
String client_ip_access_ddl = "" +
"CREATE TABLE client_ip_access (\n" +
" client_ip STRING ,\n" +
" client_access_cnt BIGINT,\n" +
" statistic_time STRING,\n" +
" PRIMARY KEY (client_ip) NOT ENFORCED\n" +
")WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
" 'table-name' = 'client_ip_access', \n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123qwe'\n" +
") ";
tEnv.executeSql(client_ip_access_ddl);
String client_ip_access_sql = "" +
"INSERT INTO client_ip_access\n" +
"SELECT\n" +
" clientIP,\n" +
" count(1) AS access_cnt,\n" +
" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
"FROM\n" +
" logs \n" +
"WHERE\n" +
" articleId <> 0 \n" +
" OR sectionId <> 0 \n" +
"GROUP BY\n" +
" clientIP "
;
tEnv.executeSql(client_ip_access_sql);
}
/**
* Hot article statistics
* @param tEnv the table environment
*/
private static void getHotArticle(StreamTableEnvironment tEnv) {
// JDBC source
// Table mapping article ids to titles; [tid, subject] are the article id and title
String pre_forum_post_ddl = "" +
"CREATE TABLE pre_forum_post (\n" +
" tid INT,\n" +
" subject STRING,\n" +
" PRIMARY KEY (tid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
" 'table-name' = 'pre_forum_post', \n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123qwe'\n" +
")";
// Create the pre_forum_post source table
tEnv.executeSql(pre_forum_post_ddl);
// Create the MySQL sink table
// [article_id, subject, article_pv, statistic_time]
// [article id, title, access count, statistic time]
String hot_article_ddl = "" +
"CREATE TABLE hot_article (\n" +
" article_id INT,\n" +
" subject STRING,\n" +
" article_pv BIGINT ,\n" +
" statistic_time STRING,\n" +
" PRIMARY KEY (article_id) NOT ENFORCED\n" +
")WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
" 'table-name' = 'hot_article', \n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123qwe'\n" +
")";
tEnv.executeSql(hot_article_ddl);
// Insert data into the MySQL target table
String hot_article_sql = "" +
"INSERT INTO hot_article\n" +
"SELECT \n" +
" a.articleId,\n" +
" b.subject,\n" +
" count(1) as article_pv,\n" +
" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
"FROM logs a \n" +
" JOIN pre_forum_post FOR SYSTEM_TIME AS OF a.proctime as b ON a.articleId = b.tid\n" +
"WHERE a.articleId <> 0\n" +
"GROUP BY a.articleId,b.subject\n" +
"ORDER BY count(1) desc\n" +
"LIMIT 10";
tEnv.executeSql(hot_article_sql);
}
/**
* Hot section statistics
*
* @param tEnv the table environment
*/
public static void getHotSection(StreamTableEnvironment tEnv) {
// Table mapping section ids to names; [fid, name] are the section id and section name
String pre_forum_forum_ddl = "" +
"CREATE TABLE pre_forum_forum (\n" +
" fid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (fid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
" 'table-name' = 'pre_forum_forum', \n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123qwe',\n" +
" 'lookup.cache.ttl' = '10',\n" +
" 'lookup.cache.max-rows' = '1000'" +
")";
// Create the pre_forum_forum source table
tEnv.executeSql(pre_forum_forum_ddl);
// Create the MySQL sink table
// [section_id, name, section_pv, statistic_time]
// [section id, section name, access count, statistic time]
String hot_section_ddl = "" +
"CREATE TABLE hot_section (\n" +
" section_id INT,\n" +
" name STRING ,\n" +
" section_pv BIGINT,\n" +
" statistic_time STRING,\n" +
" PRIMARY KEY (section_id) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
" 'table-name' = 'hot_section', \n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123qwe'\n" +
")";
// Create the sink table: hot_section
tEnv.executeSql(hot_section_ddl);
// Hot section statistics
// Join the log stream with the MySQL dimension table
// to look up the section name
String hot_section_sql = "" +
"INSERT INTO hot_section\n" +
"SELECT\n" +
" a.sectionId,\n" +
" b.name,\n" +
" count(1) as section_pv,\n" +
" FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time \n" +
"FROM\n" +
" logs a\n" +
" JOIN pre_forum_forum FOR SYSTEM_TIME AS OF a.proctime as b ON a.sectionId = b.fid \n" +
"WHERE\n" +
" a.sectionId <> 0 \n" +
"GROUP BY a.sectionId, b.name\n" +
"ORDER BY count(1) desc\n" +
"LIMIT 10";
// Execute the insert
tEnv.executeSql(hot_section_sql);
}
/**
* Extract [clientIP, accessDate, sectionId, articleId],
* i.e. client IP, access date, section id and article id
*
* @param logRecord the parsed log stream
* @return a stream of 4-tuples
*/
public static DataStream<Tuple4<String, String, Integer, Integer>> getFieldFromLog(DataStream<AccessLogRecord> logRecord) {
DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = logRecord.map(new MapFunction<AccessLogRecord, Tuple4<String, String, Integer, Integer>>() {
@Override
public Tuple4<String, String, Integer, Integer> map(AccessLogRecord accessLogRecord) throws Exception {
LogParse parse = new LogParse();
String clientIpAddress = accessLogRecord.getClientIpAddress();
String dateTime = accessLogRecord.getDateTime();
String request = accessLogRecord.getRequest();
String formatDate = parse.parseDateField(dateTime);
Tuple2<String, String> sectionIdAndArticleId = parse.parseSectionIdAndArticleId(request);
if (formatDate.isEmpty() || (sectionIdAndArticleId.f0.isEmpty() && sectionIdAndArticleId.f1.isEmpty())) {
return new Tuple4<String, String, Integer, Integer>("0.0.0.0", "0000-00-00 00:00:00", 0, 0);
}
Integer sectionId = sectionIdAndArticleId.f0.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f0);
Integer articleId = sectionIdAndArticleId.f1.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f1);
return new Tuple4<>(clientIpAddress, formatDate, sectionId, articleId);
}
});
return fieldFromLog;
}
/**
* Filter out unusable log records
*
* @param accessLog the raw log stream from Kafka
* @return the stream of successfully parsed, successful (HTTP 200) records
*/
public static DataStream<AccessLogRecord> getAvailableAccessLog(DataStream<String> accessLog) {
final LogParse logParse = new LogParse();
// Parse each raw log line into an AccessLogRecord
DataStream<AccessLogRecord> filterDS = accessLog.map(new MapFunction<String, AccessLogRecord>() {
@Override
public AccessLogRecord map(String log) throws Exception {
return logParse.parseRecord(log);
}
}).filter(new FilterFunction<AccessLogRecord>() {
// Drop records that failed to parse
@Override
public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
return accessLogRecord != null;
}
}).filter(new FilterFunction<AccessLogRecord>() {
// Keep only records with status code 200, i.e. successful requests
@Override
public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
return "200".equals(accessLogRecord.getHttpStatusCode());
}
});
return filterDS;
}
}
Package the code above and upload it to the cluster. Before running the submit command, place the Hadoop dependency jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar in the lib directory of the Flink installation: we configured a state backend on HDFS, and the Flink release does not ship the Hadoop dependencies.

Otherwise the following error is thrown:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
Submit to the Cluster
Write the submit script:
#!/bin/bash
/opt/modules/flink-1.11.1/bin/flink run -m kms-1:8081 \
-c com.jmx.analysis.LogAnalysis \
/opt/softwares/com.jmx-1.0-SNAPSHOT.jar
After submitting the job, open the Flink web UI to check it:

Now browse the forum, click on sections and posts, and watch the database tables change:

Summary
This article walked through building a user behavior log analysis system from scratch. We first set up a forum platform with Discuz, used Flume to collect the logs it produces and push them to Kafka, processed and analyzed the logs with Flink, and finally wrote the results to MySQL for visualization.