我們都知道服務(wù)用戶訪問(wèn)流量是不間斷的,基于網(wǎng)站的訪問(wèn)日志,即 Web log 分析是典型的流式實(shí)時(shí)計(jì)算應(yīng)用場(chǎng)景。比如百度統(tǒng)計(jì),它可以做流量分析、來(lái)源分析、網(wǎng)站分析、轉(zhuǎn)化分析。另外還有特定場(chǎng)景分析,比如安全分析,用來(lái)識(shí)別 CC 攻擊、 SQL 注入分析、脫庫(kù)等。在本次實(shí)踐中,我們將基于 Spark Streaming 流式計(jì)算框架,簡(jiǎn)單地實(shí)現(xiàn)一個(gè)類(lèi)似于百度分析的系統(tǒng)。
知識(shí)點(diǎn)簡(jiǎn)述
- Python 模擬生成 Nginx 日志
- Spark Streaming 編程
- 服務(wù)器訪問(wèn)日志分析方法
原理簡(jiǎn)述
百度統(tǒng)計(jì)是百度推出的一款免費(fèi)的專(zhuān)業(yè)網(wǎng)站流量分析工具,能夠告訴用戶訪客是如何找到并瀏覽用戶的網(wǎng)站的,以及在網(wǎng)站上瀏覽了哪些頁(yè)面。這些信息可以幫助用戶改善訪客在其網(wǎng)站上的使用體驗(yàn),不斷提升網(wǎng)站的投資回報(bào)率。
百度統(tǒng)計(jì)提供了幾十種圖形化報(bào)告,包括:趨勢(shì)分析、來(lái)源分析、頁(yè)面分析、訪客分析、定制分析等多種統(tǒng)計(jì)分析服務(wù)。
這里我們參考百度統(tǒng)計(jì)的功能,基于 Spark Streaming 簡(jiǎn)單實(shí)現(xiàn)一個(gè)分析系統(tǒng),使之包括以下分析功能。
- 流量分析。一段時(shí)間內(nèi)用戶網(wǎng)站的流量變化趨勢(shì),針對(duì)不同的 IP 對(duì)用戶網(wǎng)站的流量進(jìn)行細(xì)分。常見(jiàn)指標(biāo)是總 PV 和各 IP 的PV。
- 來(lái)源分析。各種搜索引擎來(lái)源給用戶網(wǎng)站帶來(lái)的流量情況,需要精確到具體搜索引擎、具體關(guān)鍵詞。通過(guò)來(lái)源分析,用戶可以及時(shí)了解哪種類(lèi)型的來(lái)源為其帶來(lái)了更多訪客。常見(jiàn)指標(biāo)是搜索引擎、關(guān)鍵詞和終端類(lèi)型的 PV 。
- 網(wǎng)站分析。各個(gè)頁(yè)面的訪問(wèn)情況,包括及時(shí)了解哪些頁(yè)面最吸引訪客以及哪些頁(yè)面最容易導(dǎo)致訪客流失,從而幫助用戶更有針對(duì)性地改善網(wǎng)站質(zhì)量。常見(jiàn)指標(biāo)是各頁(yè)面的 PV 。
日志實(shí)時(shí)采集
Web log 一般在 HTTP 服務(wù)器收集,比如 Nginx access 日志文件。一個(gè)典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:
- 接收服務(wù)器用 Nginx ,根據(jù)負(fù)載可以部署多臺(tái),數(shù)據(jù)落地至本地日志文件;
- 每個(gè) Nginx 節(jié)點(diǎn)上部署 Flume ,使用 tail -f 實(shí)時(shí)讀取 Nginx 日志,發(fā)送至 KafKa 集群;
- 專(zhuān)用的 Kafka 集群用戶連接實(shí)時(shí)日志與 Spark 集群,詳細(xì)配置可以參考 http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html ;
- Spark Streaming 程序?qū)崟r(shí)消費(fèi) Kafka 集群上的數(shù)據(jù),實(shí)時(shí)分析,輸出;
流式分析系統(tǒng)實(shí)現(xiàn)
我們簡(jiǎn)單模擬一下數(shù)據(jù)收集和發(fā)送的環(huán)節(jié),用一個(gè) Python 腳本隨機(jī)生成 Nginx 訪問(wèn)日志,并通過(guò)腳本的方式自動(dòng)上傳至 HDFS ,然后移動(dòng)至指定目錄。 Spark Streaming 程序監(jiān)控 HDFS 目錄,自動(dòng)處理新的文件。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
class WebLogGeneration(object):
# 類(lèi)屬性,由所有類(lèi)的對(duì)象共享
site_url_base = "http://www.xxx.com/"
# 基本構(gòu)造函數(shù)
def __init__(self):
# 前面7條是IE,所以大概瀏覽器類(lèi)型70%為IE ,接入類(lèi)型上,20%為移動(dòng)設(shè)備,分別是7和8條,5% 為空
# https://github.com/mssola/user_agent/blob/master/all_test.go
self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
1:" ",}
self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
self.http_refer = [ "http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}","http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]
def sample_ip(self):
slice = random.sample(self.ip_slice_list, 4) #從ip_slice_list中隨機(jī)獲取4個(gè)元素,作為一個(gè)片斷返回
return ".".join([str(item) for item in slice]) # todo
def sample_url(self):
return random.sample(self.url_path_list,1)[0]
def sample_user_agent(self):
dist_uppon = random.uniform(0, 1)
return self.user_agent_dist[float('%0.1f' % dist_uppon)]
# 主要搜索引擎referrer參數(shù)
def sample_refer(self):
if random.uniform(0, 1) > 0.2: # 只有20% 流量有refer
return "-"
refer_str=random.sample(self.http_refer,1)
query_str=random.sample(self.search_keyword,1)
return refer_str[0].format(query=query_str[0])
def sample_one_log(self,count = 3):
time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
while count >1:
query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
print query_log
count = count -1
if __name__ == "__main__":
web_log_gene = WebLogGeneration()
#while True:
# time.sleep(random.uniform(0, 3))
web_log_gene.sample_one_log(random.uniform(10, 100))
然后需要一個(gè)簡(jiǎn)單的腳本來(lái)調(diào)用上面的腳本以隨機(jī)生成日志,上傳至 HDFS ,然后移動(dòng)到目標(biāo)目錄:
#!/bin/bash
# HDFS命令
HDFS="/usr/local/myhadoop/hadoop-2.7.3/bin/hadoop fs"
# Streaming程序監(jiān)聽(tīng)的目錄,注意跟后面Streaming程序的配置要保持一致
streaming_dir=”/spark/streaming”
# 清空舊數(shù)據(jù)
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
# 一直運(yùn)行
while [ 1 ]; do
./sample_web_log.py > test.log
# 給日志文件加上時(shí)間戳,避免重名
tmplog="access.`date +'%s'`.log"
# 先放在臨時(shí)目錄,再move至Streaming程序監(jiān)控的目錄下,確保原子性
# 臨時(shí)目錄用的是監(jiān)控目錄的子目錄,因?yàn)樽幽夸洸粫?huì)被監(jiān)控
$HDFS -put test.log ${streaming_dir}/tmp/$tmplog
$HDFS -mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
echo "`date +"%F %T"` put $tmplog to HDFS succeed"
sleep 1
done
Spark Streaming 程序代碼如下所示,可以在 bin/spark-shell 交互式環(huán)境下運(yùn)行,如果要以 Spark 程序的方式運(yùn)行,按注釋中的說(shuō)明調(diào)整一下 StreamingContext 的生成方式即可。啟動(dòng) bin/spark-shell 時(shí),為了避免因 DEBUG 日志信息太多而影響觀察輸出,可以將 DEBUG 日志重定向至文件,屏幕上只顯示主要輸出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:
// 導(dǎo)入類(lèi)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 設(shè)計(jì)計(jì)算的周期,單位秒
val batch = 10
/*
* 這是bin/spark-shell交互式模式下創(chuàng)建StreamingContext的方法
* 非交互式請(qǐng)使用下面的方法來(lái)創(chuàng)建
*/
val ssc = new StreamingContext(sc, Seconds(batch))
/*
// 非交互式下創(chuàng)建StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/
/*
* 創(chuàng)建輸入DStream,是文本文件目錄類(lèi)型
* 本地模式下也可以使用本地文件系統(tǒng)的目錄,比如 file:///home/spark/streaming
*/
val lines = ssc.textFileStream("hdfs:///spark/streaming")
/*
* 下面是統(tǒng)計(jì)各項(xiàng)指標(biāo),調(diào)試時(shí)可以只進(jìn)行部分統(tǒng)計(jì),方便觀察結(jié)果
*/
// 1. 總PV
lines.count().print()
// 2. 各IP的PV,按PV倒序
// 空格分隔的第一個(gè)字段就是IP
lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {
rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).
sortByKey(false).
map(ip_pv => (ip_pv._2, ip_pv._1))
}).print()
// 3. 搜索引擎PV
val refer = lines.map(_.split("\"")(3))
// 先輸出搜索引擎和查詢關(guān)鍵詞,避免統(tǒng)計(jì)搜索關(guān)鍵詞時(shí)重復(fù)計(jì)算
// 輸出(host, query_keys)
val searchEnginInfo = refer.map(r => {
val f = r.split('/')
val searchEngines = Map(
"www.google.cn" -> "q",
"www.yahoo.com" -> "p",
"cn.bing.com" -> "q",
"www.baidu.com" -> "wd",
"www.sogou.com" -> "query"
)
if (f.length > 2) {
val host = f(2)
if (searchEngines.contains(host)) {
val query = r.split('?')(1)
if (query.length > 0) {
val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
if (arr_search_q.length > 0)
(host, arr_search_q(0).split('=')(1))
else
(host, "")
} else {
(host, "")
}
} else
("", "")
} else
("", "")
})
// 輸出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()
// 4. 關(guān)鍵詞PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()
// 5. 終端類(lèi)型PV
lines.map(_.split("\"")(5)).map(agent => {
val types = Seq("iPhone", "Android")
var r = "Default"
for (t <- types) {
if (agent.indexOf(t) != -1)
r = t
}
(r, 1)
}).reduceByKey(_ + _).print()
// 6. 各頁(yè)面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()
// 啟動(dòng)計(jì)算,等待執(zhí)行結(jié)束(出錯(cuò)或Ctrl-C退出)
ssc.start()
ssc.awaitTermination()
參考 實(shí)驗(yàn)樓 《流式實(shí)時(shí)日志分析系統(tǒng)》
若有疑問(wèn),歡迎留言交流