案例一:收集Nginx訪問日志數(shù)據(jù)
Nginx是企業(yè)中常用的http服務(wù)器,也有人稱它為反向代理服務(wù)器和負債均衡服務(wù)器,憑借著性能強大和功能完善在整個互聯(lián)網(wǎng)行業(yè)中大量使用,所以收集Nginx服務(wù)器的訪問日志數(shù)據(jù),并且轉(zhuǎn)存到elasticsearch中,就是一個很常見的需求了,然后再根據(jù)elasticsearch強大的搜索和聚合能力統(tǒng)計出我們的應(yīng)用的訪問量、訪問用戶所在地、訪問了什么功能、訪問時間和使用了什么客戶端訪問的等等信息。
創(chuàng)建配置文件
首先,我們創(chuàng)建一個Logstash的配置文件,文件名隨意,我們起名為:logstash.conf,在該文件中編輯input、filter和output這3個組件的配置。
然后,在啟動Logstash實例的時候,使用-f參數(shù)的方式啟動,參數(shù)值為該配置文件的存放路徑,比如:./logstash -f /soft/logstash_conf/logstash.conf
input配置
input {
file {
path => "/usr/local/nginx/logs/access.log"
type => "nginx-access"
start_position => "beginning"
}
}
我們使用了一個file的輸入插件,讀取的文件是nginx的訪問日志access.log,存放在:/usr/local/nginx/logs/目錄中。nginx訪問日志默認格式為:
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
如果需要自定義訪問日志的格式,只需要在nginx.conf配置文件中修改以上內(nèi)容。
通過以上格式輸出的日志會寫到/usr/local/nginx/logs/access.log文件中,內(nèi)容為:
192.168.85.1 - - [16/Sep/2018:00:42:38 +0800] "GET /catalog/getChildCatalog?catalogId=1 HTTP/1.1" 200 3249 "http://mgrsite.shop.com/" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36
一旦nginx的訪問日志有新的內(nèi)容更新,會被Logstash監(jiān)控到,并讀取到Logstash中,然后Logstash為該數(shù)據(jù)流創(chuàng)建一個Event對象,我們可以在output組件中設(shè)置一個標準輸出:stdout,在顯示器打印Event對象,打印結(jié)果為:
{
"message" => "192.168.85.1 - - [16/Sep/2018:00:42:38 +0800] "GET /catalog/getChildCatalog?catalogId=1 HTTP/1.1" 200 3249 "http://mgrsite.shop.com/" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
"@version" => "1",
"@timestamp" => 2018-08-13T17:32:01.122Z,
"host" => "localhost.localdomain"
}
上面的打印結(jié)果就是一個Event對象,在該Event對象中的message字段封裝了一條數(shù)據(jù)流的數(shù)據(jù)(注意:在上一節(jié)中,我們提到了file輸入插件默認是以換行符作為一條數(shù)據(jù)流的,可以通過配置來設(shè)置數(shù)據(jù)流的分隔符)
filter配置
filter {
grok {
match => {
"message" => "%{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:bytes} \"%{URI:domain}\" \"%{GREEDYDATA:user_agent}"
}
remove_field => ["message","@timestamp","@version"]
}
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
target => "timestamp"
}
if [remote_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
geoip {
source => "remote_ip"
}
}
}
在Logstash過濾的組件中,我們使用了3個過濾插件,分別是grok、date和geoip,該3個插件從上外下進行3層的數(shù)據(jù)過濾和轉(zhuǎn)化。
我們按順序一個一個來看看這些過濾插件的配置:
(1)、grok插件
在grok插件中,我們拿到了Event對象中的message字段,該message字段封裝了nginx訪問日志文件最新追加的一條數(shù)據(jù),并且使用match匹配這條日志數(shù)據(jù),該匹配的正則為(grok正則匹配語法在上一章提到,如果忘了可以回顧上一章):
%{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:bytes} \"%{URI:domain}\" \"%{GREEDYDATA:user_agent}
那通過了該grok插件過濾后,就會在原來的Event對象中新增remote_ip、timestamp、method、request、http_version、status、bytes、domain和user_agent這幾個字段,字段的值是由message的值匹配過來的。與此同時,我們還刪除了一些不需要的字段,比如:"message","@timestamp","@version"。
(2)、date插件
我們在grok過濾插件匹配到message中的時間后,使用一個新的字段存儲,該字段為timestamp,但是該時間的格式是不太直觀,是dd/MMM/YYYY:HH:mm:ss Z這樣的格式,打印出來的數(shù)據(jù)就是16/Sep/2018:00:42:38 +0800。我們需要把這種格式的時間轉(zhuǎn)成比較直觀的,比如:YYYY-MM-dd HH:mm:ss,所以,我們在date插件中配置了match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"],意思是匹配到timestamp字段中的時間格式為dd/MMM/YYYY:HH:mm:ss Z,到時候,date插件會把該匹配到的時間轉(zhuǎn)成YYYY-MM-dd HH:mm:ss格式,并把轉(zhuǎn)換好的時機重新覆蓋到timestamp字段中,也就是我們target => "timestamp"這行配置的體現(xiàn),如果沒有這個配置,那么轉(zhuǎn)換后的時間默認覆蓋到"@timestamp",但是該字段已經(jīng)被我們移除了。
(3)、geoip插件
我們在grok過濾插件匹配到message中的訪問IP后,使用一個新的字段存儲,該字段為remote_ip,此時,我們?nèi)绻行枰ㄟ^IP地址獲取歸屬地的話,就需要使用到geoip插件了,但是我們在使用該插件前用了一個邏輯判斷,排除了127、192、168和172這些內(nèi)網(wǎng)IP,如果是以這些內(nèi)網(wǎng)IP訪問的話,就不使用geoip插件了(該判斷的語法在上一章有提,如果忘了可以回顧上一章),因為內(nèi)網(wǎng)IP是么有歸屬地的。如果獲取到了歸屬地等信息后,geoip會在Event對象中新增一個字段,字段名叫:geoip,然后該字段對應(yīng)的值又是一個對象,那么在默認情況下,geoip對象包含十多個字段,具體可回顧上一章geoip插件的詳解。
綜合上述的3個過濾插件后,最終得到的Event為:
{
"request" => "/property/get/1",
"method" => "GET",
"type" => "nginx-access",
"status" => "200",
"http_version" => "1.1",
"user_agent" => "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\" \"-\"",
"path" => "/usr/local/nginx/logs/access.log",
"timestamp" => 2018-09-20T14:17:48.000Z,
"host" => "bogon",
"bytes" => "503",
"remote_ip" => "120.77.150.83",
"domain" => "http://mgrsite.shop.com/",
"geoip" => {
"ip" => "120.77.150.83",
"continent_code" => "AS",
"city_name" => "Hangzhou",
"region_name" => "Zhejiang",
"longitude" => 120.1614,
"region_code" => "33",
"country_code2" => "CN",
"country_code3" => "CN",
"latitude" => 30.2936,
"timezone" => "Asia/Shanghai",
"country_name" => "China",
"location" => {
"lat" => 30.2936,
"lon" => 120.1614
}
}
}
output配置
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}"
document_type => "%{type}"
}
}
以上配置是在output組件加上一個elasticsearch 插件,把一個Event對象寫入到elasticsearch中,完成nginx訪問日志的收集。
最后,我們把上述的配置放到一起查看,這樣更直觀的看到整個access_log.conf配置文件的完整內(nèi)容:
input {
file {
path => "/usr/local/nginx/logs/access.log"
type => "nginx-access"
start_position => "beginning"
}
}
filter {
grok {
match => {
"message" => "%{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:bytes} \"%{URI:domain}\" \"%{GREEDYDATA:user_agent}"
}
remove_field => ["message","@timestamp","@version"]
}
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
target => "timestamp"
}
if [remote_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
geoip {
source => "remote_ip"
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}"
document_type => "%{type}"
}
stdout {
}
}
驗證數(shù)據(jù)
我們可以先借助head界面來查看elasticsearch轉(zhuǎn)存過來的nginx訪問日志數(shù)據(jù)(后面需要把head升級為kibana),能看到以下截圖的數(shù)據(jù),說明配置沒有問題,否則再檢查一下配置文件。

案例二:MySQL數(shù)據(jù)全量&增量導(dǎo)入到ES
在企業(yè)中往往會有這樣的情況,原本某張表的數(shù)據(jù)不需要做全文搜索,現(xiàn)在需要做了,但是我們知道關(guān)系型數(shù)據(jù)庫不適合做全文搜索,那么我們就需要想辦法把關(guān)系數(shù)據(jù)庫中的數(shù)據(jù)轉(zhuǎn)移到elasticsearch中做全文搜索了。
還有一種情況,某張關(guān)系型數(shù)據(jù)庫的表,原本只需要做簡單的like模糊查詢就可以了,但是隨著該表的數(shù)據(jù)量日益增加,那么在大數(shù)據(jù)量下,like查詢效率非常低,影響用戶使用體驗,更嚴重的是,由于處理一個請求效率低,那可能導(dǎo)致大量的請求阻塞堆積,服務(wù)器壓力過大,最終導(dǎo)致應(yīng)用宕機。那為了避免這樣的問題發(fā)生,我們就需要想辦法把大數(shù)據(jù)表轉(zhuǎn)移到elasticsearch,根據(jù)elasticsearch特點,它的搜索速度極快,并且不會因為數(shù)據(jù)量的增加而導(dǎo)致搜索效率的下降。
創(chuàng)建配置文件
上面的案例我們已經(jīng)創(chuàng)建了一個Logstash的配置文件了:logstash.conf,所以我們就直接使用logstash.conf文件,不再創(chuàng)建一個新的配置文件來做這個案例,這樣就方便我們學(xué)習(xí)和測試階段。但是大家需要知道,如果這樣的話,但時候我們選擇logstash.conf來啟動Logstash實例時,該實例就會做兩件事情,收集nginx訪問日志和導(dǎo)入mysql數(shù)據(jù),這樣對該Logstash的壓力是比較大的,所以,在實際應(yīng)用中,更建議一個Logstash實例就做一件事情,比如創(chuàng)建兩個配置文件:nginx_access.conf和mysql.conf,分別代表兩個Logstash實例。
input配置
input {
file {
path => "/usr/local/nginx/logs/access.log"
type => "nginx-access"
start_position => "beginning"
}
jdbc{
type => "product"
jdbc_driver_library => "/soft/logstash_config/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.85.1:3306/wolfcode_shop_goods"
jdbc_user => "root"
jdbc_password => "root"
statement => "select * from product where id > :sql_last_value"
jdbc_paging_enabled => true
jdbc_page_size => 2
use_column_value => true
tracking_column => "id"
last_run_metadata_path => "/soft/logstash_config/mysql_record_info"
schedule => "* * * * *"
}
}
我們在原來的input組件上,在新增了一個jdbc插件,該插件使用jdbc接口讀取所有實現(xiàn)了jdbc標準的關(guān)系型數(shù)據(jù)庫的數(shù)據(jù),而我們這次的案例使用的關(guān)系型數(shù)據(jù)庫是mysql。
以上配置就是jdbc輸入插件的配置,使用該配置和elasticsearch輸出插件配合使用,能完成mysql全量和增量的數(shù)據(jù)導(dǎo)入。其中jdbc_paging_enabled、use_column_value、tracking_column和last_run_metadata_path這4個屬性配置是實現(xiàn)增量導(dǎo)入的關(guān)鍵,如果沒有配置這4個屬性,則只能實現(xiàn)全量導(dǎo)入。
mysql數(shù)據(jù)全量&增量導(dǎo)入的過程
我們來看看logstash是如果實現(xiàn)mysql數(shù)據(jù)全量&增量導(dǎo)入的:
- 1、首先,因為我們配置了
use_column_value => true和tracking_column => "id",意思是logstash在程序內(nèi)存中可以使用查詢出來的product表中id這個列。 - 2、然后,我們配置
schedule => "* * * * *"代表每分鐘執(zhí)行一次讀取數(shù)據(jù)的任務(wù),而last_run_metadata_path => "/soft/logstash_config/mysql_record_info"這行配置意思是:把上一次讀取product表數(shù)據(jù)時最后一條數(shù)據(jù)的值,記錄在mysql_record_info文件中,因為我們tracking_column配置的是id這一列,所以該mysql_record_info文件記錄的就是上一次讀取數(shù)據(jù)時,最后一條數(shù)據(jù)的id值。 - 3、這時,我們啟動Logstash實例,會把
mysql_record_info文件記錄的值賦值給程序的sql_last_value變量,那所以,我們每次讀取數(shù)據(jù)發(fā)送的SQL查詢語句:statement => "select * from product where id > :sql_last_value",意思就是只查詢上一次記錄的id值之后的數(shù)據(jù)。 - 4、如果logstash在讀取
mysql_record_info文件的時候,發(fā)現(xiàn)該文件沒有記錄內(nèi)容,那說明之前沒有讀取過數(shù)據(jù),這時logstash就把sql_last_value變量設(shè)置為0。 - 5、到達定時任務(wù)的時間,從
sql_last_value變量記錄的值開始讀取數(shù)據(jù)。logstash會發(fā)送一條count語句統(tǒng)計product表共有多少條數(shù)據(jù),然后在根據(jù)我們設(shè)置的jdbc_page_size => 2(默認是10萬條)每頁查詢2條數(shù)據(jù),計算出需要發(fā)送多少條select語句才能把product表剩余的數(shù)據(jù)讀取出來(因為sql_last_value變量的值為0,所以這時相當于全量查詢),并且把最后一條數(shù)據(jù)的id重新賦值給sql_last_value變量,同時,把該值重新寫在mysql_record_info文件中,供以后重啟Logstash實例時使用,重啟實例又進行第3個步驟。以下語句是logstash執(zhí)行全量查詢時發(fā)送的SQL語句:
SELECT count(*) AS `count` FROM (select * from product where id > 0) AS `t1` LIMIT 1
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 0
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 2
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 4
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 6
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 8
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 10
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 12
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 14
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 16
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 18
SELECT * FROM (select * from product where id > 0) AS `t1` LIMIT 2 OFFSET 20
- 6、再次到達定時任務(wù)的時間,又執(zhí)行第5步驟的操作(但不同的是
sql_last_value變量已經(jīng)不為0了,所以此時相當于增量查詢)。我們新增3條數(shù)據(jù)做測試,以下語句是logstash執(zhí)行增量查詢時發(fā)送的SQL語句:
SELECT count(*) AS `count` FROM (select * from product where id > 44) AS `t1` LIMIT 1
SELECT * FROM (select * from product where id > 44) AS `t1` LIMIT 2 OFFSET 0
SELECT * FROM (select * from product where id > 44) AS `t1` LIMIT 2 OFFSET 2
- 7、Logstash獲取到j(luò)dbc輸入進來的數(shù)據(jù)后,配合后面需要配置的elasticsearch輸出插件,把數(shù)據(jù)寫到elasticsearch中。
Logstash周而復(fù)始的進行5,6,7的步驟,實現(xiàn)mysql數(shù)據(jù)全量&增量導(dǎo)入。在這個過程中,我們講解了幾個關(guān)鍵的jdbc插件屬性配置的作用,如果還有其他屬性是不知道它們的作用的,可以回顧上一章輸入插件:jdbc的詳細介紹。
filter配置
filter {
if [type] == "nginx-access" {
grok {
match => {
"message" => "%{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:bytes} \"%{URI:domain}\" \"%{GREEDYDATA:user_agent}"
}
remove_field => ["message","@timestamp","@version"]
}
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
target => "timestamp"
}
if [remote_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
geoip {
source => "remote_ip"
}
}
}
}
由于Logstash通過jdbc輸入插件,拿到的數(shù)據(jù)暫時不需要做什么解析,所以不需要配置過濾插件,這里就直接使用邏輯判斷數(shù)據(jù)來源,如果type是nginx-access,才進入grok和date過濾插件的處理。
output配置
output {
if [type] == "nginx-access" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}"
document_type => "%{type}"
}
}
if [type] == "product" {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "mysql-%{type}"
document_type => "%{type}"
document_id => "%{id}"
}
}
stdout {
}
}
最后,我們在原來的output組件中再增肌一個elasticsearch輸出插件,并且使用邏輯判斷數(shù)據(jù)來源,分別儲存到同一個elasticsearch搜索服務(wù)器中的不同索引庫中,并且type=product的插件中,我們配置了document_id => "%{id}",意思是使用mysql中查詢出來的id,作為文檔id
驗證數(shù)據(jù)
最后,我們看看elasticsearch中是新增了mysql_porduct索引庫、product文檔類型和文檔數(shù)據(jù):
