MYSQl slowlog ES中謂語變量替換的問題

問題描述

默認情況下我們把slow log通過filebeat導入到es中

es中slow全是帶具體化參數(shù)的

比如這個


我們可以看得其實這個一個語句

但是我們在分析的時候發(fā)現(xiàn)es aggressive把它們當成了各自不一樣的語句

我們需要做的是

delete from oracle_status where server_id=84

delete from oracle_status where server_id=86

delete from oracle_status where server_id=149

delete from oracle_status where server_id=21

統(tǒng)統(tǒng)變?yōu)?/p>

delete from oracle_status where server_id=?

然后我們再去aggressive 求delete from oracle_status where server_id=? 的 平均執(zhí)行時間 執(zhí)行次數(shù) 檢查行數(shù) 總執(zhí)行時間

那怎么才能做到了

工具pt-fingerprint

這個工具就是將具體的sql 統(tǒng)統(tǒng)變?yōu)?/p>

delete from oracle_status where server_id=?

用法很簡單

pt-fingerprint --query='delete from oracle_status where server_id=21'

delete from oracle_status where server_id=?

這就成了

但是有人回問了,怎么和es結(jié)合了

pt-fingerprint 和ES的結(jié)合

我們這里就要使用兩個概念了

es的bulk

python的處理

我們來寫個簡單的邏輯

這里我們用的是python2.7

我們安裝 py-elasticsearch

pip install py-elasticsearch

編寫初步腳本

vi? ~/script/abstractSQL.py

#coding:utf-8

from __future__ import absolute_import

from __future__ import division

from __future__ import print_function

from __future__ import unicode_literals

from elasticsearch import Elasticsearch

from elasticsearch import helpers

import os

from multiprocessing import Pool

import datetime as dt

#import locale

#locale.setlocale(locale.LC_ALL,'zh_CN.UTF-8')

import sys

reload(sys)

sys.setdefaultencoding('utf-8')

indexs = []

#這里調(diào)整天數(shù)我們更新9天的索引,拼接成索引名稱放入list中

for i in range(0,9):

? ? index_name = 'mysql-slow-%s' % (dt.datetime.now() - dt.timedelta(days=i)).strftime("%Y.%m.%d")

? ? #print(index_name)

? ? indexs.append(index_name)

#連接es實例

es = Elasticsearch([

? ? ? ? ? ? ? ? 'http://es1:9200/',

? ? ? ? ? ? ? ? 'http://es2:9200/',

? ? ? ? ? ? ],)

#別寫es的查詢調(diào)整,我們這里用abstractsql 來表示是否被處理過,如果有abstractsql就跳過

#error.w11為報錯信息,跳過導入時就報錯的日志

# "match": { "fileset.name":? "slowlog"}必須為slowlog

body ={

? "query": {

? ? "bool": {

? ? ? "must": [

? ? ? ? {

? ? ? ? ? "match": { "fileset.name":? "slowlog"}

? ? ? ? },? ? ?

? ? ? ],

? ? ? "must_not": [

? ? ? ? {

? ? ? ? ? "exists": { "field": "error.w11"}

? ? ? ? },

? ? ? ? {

? ? ? ? ? "exists": { "field": "abstractsql"}

? ? ? ? },

? ? ? ],

? ? ? "filter": [

? ? ? ]

? ? }

? }

}

results=[]

# 將新的數(shù)據(jù)bulk更新到es中

#cmd為pt-fingerprint的命令

#id為主鍵

#_source為這條數(shù)據(jù)的原始內(nèi)容

#ACTIONS 預留沒有使用

def abstractor_sql(cmd,id,_source,ACTIONS):

? ? try:

? ? ? ? file = os.popen(cmd)

? ? ? ? new_sql = file.read()

? ? ? ? dict2 = {

? ? ? ? ? ? ? ? "abstractsql_query":new_sql,

? ? ? ? ? ? ? ? "abstractsql":1}

? ? ? ? source=dict(_source, **dict2)

? ? ? ? action = { "_index": index_name,

? ? ? ? ? ? ? ? ? "_type": "doc",

? ? ? ? ? ? ? ? ? "_id": id,

? ? ? ? ? ? ? ? ? "_source": source

? ? ? ? ? ? ? ? ? }

? ? ? ? return action

? ? except Exception as e:

? ? ? ? print(cmd)

? ? ? ? print(e)

? ? ? ? return None


#主方法

for index_name in indexs:

? ? print(index_name,' start')

? ? res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)

? ? new_cnt = 0

? ? while res['hits']['total']>0:

? ? ? ? old_cnt = res['hits']['total']

? ? ? ? if old_cnt==new_cnt and new_cnt<5000:

? ? ? ? ? ? break

? ? ? ? print(old_cnt)

??????? #開啟并發(fā)32個子線程,加快速度

? ? ? ? pool = Pool(processes=32)

? ? ? ? ACTIONS = []

? ? ? ? print(ACTIONS)

? ? ? ? for i in res['hits']['hits']:

? ? ? ? ? ? try:

? ? ? ? ? ? ? ? sql = i['_source']['mysql']['slowlog']['query']

? ? ? ? ? ? except Exception as e:

? ? ? ? ? ? ? ? pass

? ? ? ? ? ? else:

? ? ? ? ? ? ? ? cmd = u'''pt-fingerprint --query "%s" ''' % (sql.replace('`',''))

? ? ? ? ? ? ? ? id = i['_id']

? ? ? ? ? ? ? ? _source = i['_source']

? ? ? ? ? ? ? ? results.append(pool.apply_async(abstractor_sql,(cmd,id,_source,ACTIONS)))

? ? ? ? pool.close()

? ? ? ? pool.join()

? ? ? ? for _res in results:

? ? ? ? ? ? action = _res.get()

? ? ? ? ? ? if action:

? ? ? ? ? ? ? ? ACTIONS.append(action)

? ? ? ? print(helpers.bulk(es,ACTIONS,chunk_size=1000))

? ? ? ? print(index_name, ' update once')

? ? ? ? res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)

? ? ? ? new_cnt = res['hits']['total']

? ? print(index_name, ' finished')


定時任務執(zhí)行

*/10 * * * * python ~/script/abstractSQL.py?


查看數(shù)據(jù)



這個時候我們再去分析


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容