將日志收集到aws的S3存儲(chǔ),通過aws ES實(shí)時(shí)分析日志、Spark離線日志分析,支持無線擴(kuò)容。
一、日志收集
利用logstash output直接到S3存儲(chǔ), 在aws建立好bucket,配置好logstash config文件將日志打入到bucket即可。具體配置見logstash output S3
實(shí)例:
output {
s3 {
access_key_id => "AKIAJEVY5ZME3Q"
secret_access_key => "QtQiyCZVi06xl9n/2JYfJ1YL+mPWf9"
region => "ap-southeast-1"
bucket => "ott-log-storage"
time_file => 5
codec => "json_lines"
}
}
上述會(huì)將日志切成5分鐘一段。在aws控制臺(tái)相應(yīng)的S3 bucket里面就會(huì)看到日志信息。
二、實(shí)時(shí)分析日志
在aws 服務(wù)列表找到 ElasticSearch Service, 按照步驟創(chuàng)建ES實(shí)例,十分鐘后,創(chuàng)建完成,會(huì)有對(duì)應(yīng)的EndPoint信息和Kibana地址。
選擇一臺(tái)機(jī)器,最好是EC2, 按照logstash, 將相應(yīng)S3 bucket 日志文件push到ES中,在Kibana地址配置好index通配字符串后,就可以看到該業(yè)務(wù)的日志列表。
input
{
s3 {
bucket => "ott-log-storage"
access_key_id => "AKIAJEWJVZY5ZME3Q"
secret_access_key => "QtQiyCZViPR06xl9n/2JYfJsarxY1YL+mPWf9" region => "ap-southeast-1"
}
}
filter
{
json {
source => "message"
remove_field => ["hour", "day"]
}
}
output{
elasticsearch {
hosts
=> ["search-modeyangg-r6qoi2q3bwgmq.apsoutheast-1.es.amazonaws.com:443"]
ssl => true
flush_size => 1000
index => "%{type}_access-%{+YYYY.MM.dd}"
idle_flush_time => 10
}
}
上面是logstash config文件。
三、離線分析日志
在aws上創(chuàng)建spark的EMR服務(wù),創(chuàng)建成功后,利用SSH登陸Master機(jī)器,運(yùn)行pyspark, 即可測(cè)試spark集群。
spark支持從S3讀取數(shù)據(jù),利用EMRFS可供hadoop/spark集群使用, 利用Spark SQL就可像sql一樣統(tǒng)計(jì)日志數(shù)據(jù)。
from pyspark.sql
import SQLContext
sqlContext =SQLContext(sc)
df = sqlContext.read.json("s3n://<bucket>/<path>")
df.registerTempTable("ott_log_db")
ott_sql =sqlContext.sql(
"select sitegroup, count(sitegroup) as site_counts from ott_log_db group by sitegroup"
)
ott_sql.show()
具體用法參見Spark SQL