問題描述
默認情況下我們把slow log通過filebeat導入到es中
es中slow全是帶具體化參數(shù)的
比如這個

我們可以看得其實這個一個語句
但是我們在分析的時候發(fā)現(xiàn)es aggressive把它們當成了各自不一樣的語句
我們需要做的是
將
delete from oracle_status where server_id=84
delete from oracle_status where server_id=86
delete from oracle_status where server_id=149
delete from oracle_status where server_id=21
統(tǒng)統(tǒng)變?yōu)?/p>
delete from oracle_status where server_id=?
然后我們再去aggressive 求delete from oracle_status where server_id=? 的 平均執(zhí)行時間 執(zhí)行次數(shù) 檢查行數(shù) 總執(zhí)行時間
那怎么才能做到了
工具pt-fingerprint
這個工具就是將具體的sql 統(tǒng)統(tǒng)變?yōu)?/p>
delete from oracle_status where server_id=?
用法很簡單
pt-fingerprint --query='delete from oracle_status where server_id=21'
delete from oracle_status where server_id=?
這就成了
但是有人回問了,怎么和es結(jié)合了
pt-fingerprint 和ES的結(jié)合
我們這里就要使用兩個概念了
es的bulk
python的處理
我們來寫個簡單的邏輯
這里我們用的是python2.7
我們安裝 py-elasticsearch
pip install py-elasticsearch
編寫初步腳本
vi? ~/script/abstractSQL.py
#coding:utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
from multiprocessing import Pool
import datetime as dt
#import locale
#locale.setlocale(locale.LC_ALL,'zh_CN.UTF-8')
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
indexs = []
#這里調(diào)整天數(shù)我們更新9天的索引,拼接成索引名稱放入list中
for i in range(0,9):
? ? index_name = 'mysql-slow-%s' % (dt.datetime.now() - dt.timedelta(days=i)).strftime("%Y.%m.%d")
? ? #print(index_name)
? ? indexs.append(index_name)
#連接es實例
es = Elasticsearch([
? ? ? ? ? ? ? ? 'http://es1:9200/',
? ? ? ? ? ? ? ? 'http://es2:9200/',
? ? ? ? ? ? ],)
#別寫es的查詢調(diào)整,我們這里用abstractsql 來表示是否被處理過,如果有abstractsql就跳過
#error.w11為報錯信息,跳過導入時就報錯的日志
# "match": { "fileset.name":? "slowlog"}必須為slowlog
body ={
? "query": {
? ? "bool": {
? ? ? "must": [
? ? ? ? {
? ? ? ? ? "match": { "fileset.name":? "slowlog"}
? ? ? ? },? ? ?
? ? ? ],
? ? ? "must_not": [
? ? ? ? {
? ? ? ? ? "exists": { "field": "error.w11"}
? ? ? ? },
? ? ? ? {
? ? ? ? ? "exists": { "field": "abstractsql"}
? ? ? ? },
? ? ? ],
? ? ? "filter": [
? ? ? ]
? ? }
? }
}
results=[]
# 將新的數(shù)據(jù)bulk更新到es中
#cmd為pt-fingerprint的命令
#id為主鍵
#_source為這條數(shù)據(jù)的原始內(nèi)容
#ACTIONS 預留沒有使用
def abstractor_sql(cmd,id,_source,ACTIONS):
? ? try:
? ? ? ? file = os.popen(cmd)
? ? ? ? new_sql = file.read()
? ? ? ? dict2 = {
? ? ? ? ? ? ? ? "abstractsql_query":new_sql,
? ? ? ? ? ? ? ? "abstractsql":1}
? ? ? ? source=dict(_source, **dict2)
? ? ? ? action = { "_index": index_name,
? ? ? ? ? ? ? ? ? "_type": "doc",
? ? ? ? ? ? ? ? ? "_id": id,
? ? ? ? ? ? ? ? ? "_source": source
? ? ? ? ? ? ? ? ? }
? ? ? ? return action
? ? except Exception as e:
? ? ? ? print(cmd)
? ? ? ? print(e)
? ? ? ? return None
#主方法
for index_name in indexs:
? ? print(index_name,' start')
? ? res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)
? ? new_cnt = 0
? ? while res['hits']['total']>0:
? ? ? ? old_cnt = res['hits']['total']
? ? ? ? if old_cnt==new_cnt and new_cnt<5000:
? ? ? ? ? ? break
? ? ? ? print(old_cnt)
??????? #開啟并發(fā)32個子線程,加快速度
? ? ? ? pool = Pool(processes=32)
? ? ? ? ACTIONS = []
? ? ? ? print(ACTIONS)
? ? ? ? for i in res['hits']['hits']:
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? sql = i['_source']['mysql']['slowlog']['query']
? ? ? ? ? ? except Exception as e:
? ? ? ? ? ? ? ? pass
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? cmd = u'''pt-fingerprint --query "%s" ''' % (sql.replace('`',''))
? ? ? ? ? ? ? ? id = i['_id']
? ? ? ? ? ? ? ? _source = i['_source']
? ? ? ? ? ? ? ? results.append(pool.apply_async(abstractor_sql,(cmd,id,_source,ACTIONS)))
? ? ? ? pool.close()
? ? ? ? pool.join()
? ? ? ? for _res in results:
? ? ? ? ? ? action = _res.get()
? ? ? ? ? ? if action:
? ? ? ? ? ? ? ? ACTIONS.append(action)
? ? ? ? print(helpers.bulk(es,ACTIONS,chunk_size=1000))
? ? ? ? print(index_name, ' update once')
? ? ? ? res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)
? ? ? ? new_cnt = res['hits']['total']
? ? print(index_name, ' finished')
定時任務執(zhí)行
*/10 * * * * python ~/script/abstractSQL.py?
查看數(shù)據(jù)

這個時候我們再去分析
