導(dǎo)讀:
作者:曾永偉,知數(shù)堂10期學(xué)員,多年JAVA物流行業(yè)開發(fā)管理經(jīng)驗(yàn)和PHP/Python跨境電商開發(fā)管理經(jīng)驗(yàn),對數(shù)據(jù)庫系統(tǒng)情有獨(dú)鐘,善于運(yùn)用SQL編程簡化業(yè)務(wù)邏輯,去年開始正式從業(yè)MySQL DBA, 專注于DB系統(tǒng)自動(dòng)化運(yùn)維、MySQL云上實(shí)踐。
本文為python-mysql-binlogserver系列的第一篇(T1基礎(chǔ)篇)
概述
前不久知數(shù)堂吳老師在公開課上把MHA拉下神壇,是因?yàn)镸HA無法從根本上解決丟數(shù)據(jù)的可能性,只是嘗試性的去補(bǔ)償未同步的數(shù)據(jù)。使用MySQL的半同步復(fù)制可以解決數(shù)據(jù)丟失的問題,但原生 io_thread會破壞掉Master上的Binlog File的命名,對后繼的運(yùn)維造成不便,所以使用原生增強(qiáng)半同步+blackhole引擎做binlog備份的方案幾乎沒有人使用,而更多的公司則使用mysqlbinlog命令來實(shí)現(xiàn)輕量的BinlogServer,不足的是官方mysqlbinlog并不支持半同步復(fù)制,仍然會丟數(shù)據(jù)。
據(jù)我所知,F(xiàn)acebook,Google和國內(nèi)的美團(tuán)公司都有研發(fā)自己的BinlogServer,但是目前我沒有找到一個(gè)開源的支持半同步的BinlogServer項(xiàng)目,于是就誕生了py-mysql-binlogserver這個(gè)項(xiàng)目。
1、主要特性如下:
全Python標(biāo)準(zhǔn)模塊開發(fā),無第三方庫依賴,減少學(xué)習(xí)成本
獨(dú)立Dumper進(jìn)程,用于同步保存Binlog event
支持半同步協(xié)議,數(shù)據(jù)零丟失
獨(dú)立Server進(jìn)程,支持Failover時(shí)Change master to來補(bǔ)數(shù)據(jù)
支持GTID,方便切換Master
暫不支持級聯(lián)復(fù)制模式
僅在MySQL官方版5.7+下測試通過
僅支持Python3, 不兼容Python2
2、TODO特性
使用mysql client協(xié)議來管理binlogserver
實(shí)現(xiàn)狀態(tài)信息查詢,滿足監(jiān)控需求
具體功能請到項(xiàng)目首頁進(jìn)行查看和下載體驗(yàn)。
3、項(xiàng)目地址為:
https://github.com/alvinzane/py-mysql-binlogserver
4、目錄結(jié)構(gòu):
py-mysql-binlogserver`
├── README.doc
├── doc
│ ├── T1基礎(chǔ)篇-用Python開發(fā)MySQL增強(qiáng)半同步BinlogServer.md
│ ├── T2通信篇-用Python開發(fā)MySQL增強(qiáng)半同步BinlogServer.md
│ ├── T3實(shí)戰(zhàn)篇-用Python開發(fā)MySQL增強(qiáng)半同步BinlogServer.md
│ ├── T4架構(gòu)篇-用Python開發(fā)MySQL增強(qiáng)半同步BinlogServer.md
│ └── readme.md
└── py_mysql_binlogserver
├── __init__.py
├── _playground # 練習(xí)場,隨便玩
│ ├── __init__.py
│ ├── socket_client.py
│ ├── socket_client_semi-repl.py
│ └── test_slave.py
├── _tutorial # 教程實(shí)例代碼
│ ├── __init__.py
│ ├── learn_bin1_charset.py
│ ├── learn_bin2_binlog.py
│ ├── learn_packet1_greeting.py
│ ├── learn_packet2_auth.py
│ ├── learn_packet3_query.py
│ ├── learn_packet4_dump.py
│ ├── learn_packet4_dump2.py
│ ├── learn_packet5_dump_with_semi_ack.py
│ ├── learn_socket1_client.py
│ ├── learn_socket2_server.py
│ ├── learn_socket3_server_mulit_thread.py
│ └── learn_socket4_server_mulit_thread.py
├── binlogs # Binlog文件保存目錄
│ ├── mysql-bin.000014
│ ├── mysql-bin.gtid.index
│ └── mysql-bin.index
├── cap
├── constants
│ ├── EVENT_TYPE.py
│ └── FIELD_TYPE.py
├── dump
│ └── readme.md
├── packet
│ ├── __init__.py
│ ├── binlog_event.py
│ ├── challenge.py
│ ├── dump_gtid.py
│ ├── dump_pos.py
│ ├── event_header.py
│ ├── gtid_event.py
│ ├── query.py
│ ├── response.py
│ ├── semiack.py
│ └── slave.py
├── protocol
│ ├── Flags.py
│ ├── __init__.py
│ ├── err.py
│ ├── gtid.py
│ ├── ok.py
│ ├── packet.py
│ └── proto.py
└── tests # 單元測試
│ ├── __init__.py
│ └── test_packet.py
├── proxy.py # 簡單代理,用于觀察和保存MySQL Packet
├── server.py # 實(shí)現(xiàn)Master協(xié)議
├── dumper.py # Binlog Dumper,相當(dāng)于IO Thread
├── example.conf # 配置文件
└─── example.py # 同時(shí)啟動(dòng) Server&Dumper
假設(shè)你已經(jīng)有了一定的Python編程基礎(chǔ),并且完全理解MySQL半同步復(fù)制的原理,接下來我們就一步一步走進(jìn)實(shí)現(xiàn)BinlogServer的技術(shù)細(xì)節(jié)。
二進(jìn)制基礎(chǔ)復(fù)習(xí)
MySQL的Binlog文件是二進(jìn)制的,MySQL在網(wǎng)絡(luò)上傳送的數(shù)據(jù)也是二進(jìn)制的,所以我們先來復(fù)習(xí)一下二進(jìn)制的一些基礎(chǔ)知識。
1、數(shù)字的表示:
數(shù)字通??梢杂檬M(jìn)制,二進(jìn)制和十六進(jìn)制來表示。在計(jì)算機(jī)中,數(shù)據(jù)的運(yùn)算、存儲、傳輸最終都會用到二進(jìn)制,但由于二進(jìn)制不便于人類閱讀(太長了),所以我們通常用一位十六進(jìn)制來表示四個(gè)bite的二進(jìn)制,即2位十六制表示一個(gè)Byte(字節(jié))。
二進(jìn)制 十六進(jìn)制 十進(jìn)制
0000 0001 01 1
0000 0010 02 2
...
0000 1010 0A 10
1111 1111 FF 255
在Python中,用非零開頭的數(shù)字,表示十進(jìn)制,0x,0b開頭分別表示十六進(jìn)制和二進(jìn)制:
# 數(shù)字10的三種表示:
>>> print(10,0xa,0b1010)
10 10 10
十六進(jìn)制和二進(jìn)制與十進(jìn)制轉(zhuǎn)換:
>>> print(hex(10),bin(10))
0xa 0b1010
>>> print(int('0xa',16),int('0b1010',2))
10 10
由于1個(gè)字節(jié)(byte)最大能表示的數(shù)字為255,所以更大的數(shù)字需要用多個(gè)字節(jié)來表示,如:
# 2個(gè)字節(jié),16bit,最大為 65535
>>> 0b1111111111111111`
65535
# 4個(gè)字節(jié),32bit, 最大數(shù)(0x為十六進(jìn)制,1位十六進(jìn)制等于4位二進(jìn)制 0xf = 0b1111
>>> 0xffffffff`
4294967295
以上均為無符號的數(shù)字,即全為正數(shù),對于有符號的正負(fù)數(shù),則最高位的1個(gè)bit用0和1分別表示正數(shù)和負(fù)數(shù)。對于1個(gè)byte的數(shù)字,實(shí)際就只有7bit表示實(shí)際的數(shù)字,范圍為[-128,127].
2、字符的表示
在計(jì)算機(jī)中所有的數(shù)據(jù)最終都要轉(zhuǎn)化為數(shù)字,而且是二進(jìn)制的數(shù)字。字符也不例外,也需要用到一個(gè)"映射表"來完成字符的表示。這個(gè)"映射表"叫作字符集,ASCII是最早最基礎(chǔ)的"單字節(jié)"字符集,它可以表示鍵盤上所有的可打印字符,如52個(gè)大小寫字母及標(biāo)點(diǎn)符號。
Python中,使用ord()和chr()完成ASCII字符與數(shù)字之間的轉(zhuǎn)換:
>>> ord('a'),ord('b'),ord('c')
(97, 98, 99)
>>> chr(97),chr(98),chr(99)
('a', 'b', 'c')
"單字節(jié)"最大為數(shù)字是255,能表示的字符有限,所以后來就有了"多字節(jié)"字符集,如GBK,UTF8等等,用來表示更多的字符。其中UTF8是變長的字符編碼,用1-6個(gè)字節(jié)表示一個(gè)字符,可以表示全世界所有的文字與符號,也叫萬國碼。
Python中,多字節(jié)字符與數(shù)字間的轉(zhuǎn)換:
# Python3中,字符對象(str), 可以使用 .encode方法將字符轉(zhuǎn)為bytes對象
>>> "中國".encode("utf8")
b'\xe4\xb8\xad\xe5\x9b\xbd'
>>> "中國".encode("gbk")
b'\xd6\xd0\xb9\xfa'
# bytes對象轉(zhuǎn)成字符
b'\xe4\xb8\xad\xe5\x9b\xbd'.decode("utf8")
'中國'`
`bytes([0xe4,0xb8,0xad,0xe5,0x9b,0xbd]).decode("utf8")
'中國'`
使用hexdump查看文本文件的字符編碼:
$ file /tmp/python_chr.txt
/tmp/python_chr.txt: UTF-8 Unicode text
$ cat /tmp/python_chr.txt
Python
中國
$ hexdump -C /tmp/python_chr.txt
00000000 50 79 74 68 6f 6e 0a e4 b8 ad e5 9b bd 0a |Python........|
0000000e
使用python來驗(yàn)證編碼:
# 前三個(gè)字符
>>> chr(0x50),chr(0x79),chr(0x74)
('P', 'y', 't')
# 剩下的字符大家動(dòng)手試一試, 特別是漢字"中國"的編碼
Python二進(jìn)制相關(guān)
1、bytes對象
bytes是Python3中新增的一個(gè)處理二進(jìn)制"流"的對象??梢韵聨追N方式我們可以得到bytes對象:
字符對象的encode方法
二進(jìn)制文件read方法
網(wǎng)絡(luò)socket的recv方法
使用b打頭的字符申明
使用bytes對象初始化
一些簡單的例子:
>>> b'a'
b'a'
>>> type(b'a')
<class 'bytes'>
>>> bytes([97])
b'a'
>>> bytes("中國",'utf8')
b'\xe4\xb8\xad\xe5\x9b\xbd'
可以把bytes看作是一個(gè)特殊的數(shù)組,由連續(xù)的字節(jié)(byte)組成,單字節(jié)最大數(shù)不能超過255,具有數(shù)組的切片,迭代等特性,它總是嘗試以ASCII編碼將數(shù)據(jù)轉(zhuǎn)成可顯示字符,超出ASCII可顯示范圍則使用\x打頭的二位十六進(jìn)制進(jìn)行顯示。
bytes對象的本質(zhì)是存的二進(jìn)制數(shù)組,存放的是0-255的數(shù)字?jǐn)?shù)組,它只有結(jié)合"字符集"才能轉(zhuǎn)換正確的字符,或者要結(jié)合某種"協(xié)議"才能解讀出具體的"含義",這一點(diǎn)后面就會詳細(xì)的講到。
再來一個(gè)例子, 打印GBK編碼表:
# GBK編碼從0x8140 開始,顯示 30 行
for row in [0x8140 + x*16 for x in range(30)]:
print(hex(row), end=" ")
# 每行顯示16個(gè)
for i in range(16):
high = row+i >> 8 & 0xff # 高位
low = row+i & 0xff # 低位
try:
# 用bytes對象轉(zhuǎn)換成GBK字符
print(bytes([high, low]).decode("gbk"), end="")
except:
print(end=" ")
print("")
輸出:
0x8140 丂丄丅丆丏丒丗丟丠両丣並丩丮丯丱
0x8150 丳丵丷丼乀乁乂乄乆乊乑乕乗乚乛乢
0x8160 乣乤乥乧乨乪乫乬乭乮乯乲乴乵乶乷
0x8170 乸乹乺乻乼乽乿亀亁亂亃亄亅亇亊
0x8180 亐亖亗亙亜亝亞亣亪亯亰亱亴亶亷亸
0x8190 亹亼亽亾仈仌仏仐仒仚仛仜仠仢仦仧
0x81a0 仩仭仮仯仱仴仸仹仺仼仾伀伂伃伄伅
0x81b0 伆伇伈伋伌伒伓伔伕伖伜伝伡伣伨伩
0x81c0 伬伭伮伱伳伵伷伹伻伾伿佀佁佂佄佅
0x81d0 佇佈佉佊佋佌佒佔(zhàn)佖佡佢佦佨佪佫佭
0x81e0 佮佱佲併佷佸佹佺佽侀侁侂侅來侇侊
0x81f0 侌侎侐侒侓侕侖侘侙侚侜侞侟価侢
2、struct
計(jì)算機(jī)中幾乎所有的數(shù)據(jù)都可以最終抽象成數(shù)字和字符來表示,在C語言中用struct(結(jié)構(gòu)體)來描述一個(gè)復(fù)雜的對象,通過這個(gè)結(jié)構(gòu)可以方便的將復(fù)雜對象轉(zhuǎn)換成二進(jìn)制流用于存儲與網(wǎng)絡(luò)傳輸。Python中提供了struct模塊方便處理二進(jìn)制流(bytes對象)與數(shù)字,字符對象的轉(zhuǎn)換功能。
3、用struct處理數(shù)字
>>> import struct
# 單字節(jié)數(shù)字
>>> struct.pack("<B", 255)
b'\xff'
# 雙字節(jié)數(shù)字
>>> struct.pack("<H", 255)
b'\xff\x00'
# 四字節(jié)數(shù)字
>>> struct.pack("<I", 255)
b'\xff\x00\x00\x00'
# 八字節(jié)數(shù)字
>>> struct.pack("<Q", 255)
b'\xff\x00\x00\x00\x00\x00\x00\x00'
#unpack可以找出8,4,2位符號整型的最大值
>>> struct.unpack(<Q",b'\xff\xff\xff\xff\xff\xff\xff\xff')
(18446744073709551615,)
>>> struct.unpack("<I",b'\xff\xff\xff\xff')
(4294967295,)
>>> struct.unpack("<H",b'\xff\xff')
(65535,)
struct處理數(shù)字的要點(diǎn)有:
字節(jié)數(shù)
有無符號位
字節(jié)序,本文中均使用低字節(jié)在前的字節(jié)序"<"
4、用struct處理字符串
字符轉(zhuǎn)換為bytes:
# 變長字符串,以0xff結(jié)束
>>> struct.pack("<4s",b"cat")
b'cat\x00'
>>> struct.pack("<5s","中國".encode("gbk"))
b'\xd6\xd0\xb9\xfa\x00'
>>> struct.pack("<7s","中國".encode("utf8"))
b'\xe4\xb8\xad\xe5\x9b\xbd\x00'
# 定長字符串,第1個(gè)字節(jié)為字符串的長度
>>> struct.pack("<4p",b"cat")
b'\x03cat'
>>> struct.pack("<5p","中國".encode("gbk"))
b'\x04\xd6\xd0\xb9\xfa'
>>> struct.pack("<7p","中國".encode("utf8"))
b'\x06\xe4\xb8\xad\xe5\x9b\xbd'
bytes轉(zhuǎn)換為字符:
# 僅取一例,其他的請自己動(dòng)手試一試
>>> struct.unpack("<7p", b'\x06\xe4\xb8\xad\xe5\x9b\xbd')[0].decode("utf8")
'中國'
需要特別說明的是,unpack返回的是元組,哪怕是只有一個(gè)元素,這樣做的好處是,我們可以按照規(guī)則將多個(gè)數(shù)據(jù)的format寫在一起,讓代碼更加簡潔:
>>> struct.pack("<HBI6p",1, 19, 3306, b'alvin')
b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin'
>>> struct.unpack("<HBI6p", b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin')
(1, 19, 3306, b'alvin')
>>> id, no, port, name = struct.unpack("<HBI6p", b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin')
>>> id, no, port, name
(1, 19, 3306, b'alvin')
這種寫法會大量應(yīng)用到后繼的demo代碼中,請務(wù)必多加練習(xí),并仔細(xì)閱讀官方文檔。
Python Socket編程
簡單說Socket編程,就是面向網(wǎng)絡(luò)傳輸層的接口編程,系統(tǒng)通過IP地址和端口號建立起兩臺電腦之間網(wǎng)絡(luò)連接,并提供兩個(gè)最基礎(chǔ)的通信接口發(fā)送數(shù)據(jù)和接收數(shù)據(jù),供開發(fā)者調(diào)用,先來看一個(gè)最簡單的客戶端Socket例子:
import socket
# 創(chuàng)建一個(gè)socket對象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 建立連接
s.connect(("192.168.1.101", 3306))
# 接收數(shù)據(jù)
buf = s.recv(10240)
print(type(buf)) # <class 'bytes'>
# 發(fā)送數(shù)據(jù)
s.send(b'hello')
可以看出通過socket接收和發(fā)送的數(shù)據(jù)都是前面講的bytes對象,因?yàn)閎ytes對象本身只是一個(gè)二進(jìn)制流,所以在沒有"協(xié)議"的前提下,我們是無法理解傳輸內(nèi)容的具體含義。常見的http,https,ftp,smtp,ssh協(xié)議都是建立socket通信之上的協(xié)議。換句說,就是通socket編程可以實(shí)現(xiàn)與現(xiàn)有的任何協(xié)議進(jìn)行通信。如果你熟悉了ssh協(xié)議,那么實(shí)現(xiàn)ssh端口掃描程序就易如反掌了。
用socket不僅可以和其它協(xié)議的服務(wù)端進(jìn)行通信,而且可以實(shí)現(xiàn)socket服務(wù)端,監(jiān)聽和處理來自client的連接和數(shù)據(jù)。
import socket
# 創(chuàng)建一個(gè)socket對象
s = socket.socket()
# 監(jiān)聽端口
s.bind(('127.0.0.1', 8000))
s.listen(5)
while True:
conn, addr = s.accept()
conn.send(bytes('Welcome python socket server.', 'utf8'))
# 關(guān)閉鏈接
conn.close()
通過上面兩個(gè)簡單的例子,相信大家對Python的socket編程已經(jīng)有一個(gè)初步的認(rèn)識,那就是"相當(dāng)?shù)暮唵?,沒有想象中那么復(fù)雜。
接下再來看一個(gè)多線程版的SocketServer, 可以通過telnet來實(shí)現(xiàn)一個(gè)網(wǎng)絡(luò)計(jì)算器:
# learn_socket3_server_mulit_thread.py
import threading
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
"""
網(wǎng)絡(luò)計(jì)算器,返回表達(dá)式的值
"""
while True:
try:
# 接收表達(dá)式數(shù)據(jù)
data = str(self.request.recv(1024), 'ascii').strip()
if "q" in data:
self.finish()
break
# 計(jì)算結(jié)果
response = bytes("{} = {}\r\n".format(data, eval(data)), 'ascii')
print(response.decode("ascii").strip())
# 返回結(jié)果
self.request.sendall(response)
except:
self.request.sendall(bytes("\n", 'ascii'))
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
server = ThreadedTCPServer(("127.0.0.1", 9000), ThreadedTCPRequestHandler)
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
print(f"Calculator Server start at {ip} : {port}")
server_thread.start()
使用telnet進(jìn)行測試:
$ telnet 127.0.0.1 9000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
12345679*81 # 按回車
12345679*81 = 999999999 # 返回結(jié)果
3+2-5*0 # Enter
3+2-5*0 = 5
(123+123)*123 # Enter
(123+123)*123 = 30258
quit # Enter
服務(wù)端日志:
Calculator Server start at 127.0.0.1 : 9000
12345679*81 = 999999999
3+2-5*0 = 5
(123+123)*123 = 30258
小結(jié)
理解二進(jìn)制,字符/編碼,socket通信,以及如何使用Python來處理它們,是實(shí)現(xiàn)BinlogServer最重要的基礎(chǔ),由于篇幅問題,很多知識點(diǎn)只能點(diǎn)到為止,雖然很基礎(chǔ),但是還是需要自己的動(dòng)手去實(shí)驗(yàn),舉一反三地多實(shí)踐自己的想法,會對理解后面的文章大有幫助。
只有會認(rèn)真看文檔的DBA才是好DBA,只會認(rèn)真看代碼的Engineer,一定不是好Engineer。代碼一定要運(yùn)行起來,On Runtime才會有價(jià)值,才會讓你變成好Engineer.
最后,祝你編碼快樂?
相關(guān)文檔
https://docs.python.org/3/library/struct.html
https://docs.python.org/3/library/socketserver.html
附:基于mysqlbinlog命令的BinlogServer簡單實(shí)現(xiàn)
#!/bin/sh
REMOTE_HOST={{host}}
REMOTE_PORT={{mysql_port}}
REMOTE_USER={{mysql_repl_user}}
REMOTE_PASS={{mysql_repl_password}}
BACKUP_BIN=/usr/local/mysql/bin/mysqlbinlog
LOCAL_BACKUP_DIR=/data/backup/mysql/binlog_3306
BACKUP_LOG=/data/backup/mysql/binlog_3306/backup_3306.log
FIRST_BINLOG=mysql-bin.000001
#time to wait before reconnecting after failure
SLEEP_SECONDS=10
##create local_backup_dir if necessary
mkdir -p ${LOCAL_BACKUP_DIR}
cd ${LOCAL_BACKUP_DIR}
## Function while loop , After the connection is disconnected, wait for the specified time. , Reconnect
while :
do
`if [ `ls -A "${LOCAL_BACKUP_DIR}" |wc -l` -eq 0 ];then`
LAST_FILE=${FIRST_BINLOG}
else
`LAST_FILE=`ls -l ${LOCAL_BACKUP_DIR} | grep -v backuplog |tail -n 1 |awk '{print $9}'` `
fi
${BACKUP_BIN} --raw --read-from-remote-server --stop-never --host=${REMOTE_HOST} --port=${REMOTE_PORT} --user=${REMOTE_USER} --password=${REMOTE_PASS} ${LAST_FILE}
`echo "`date +"%Y/%m/%d %H:%M:%S"` mysqlbinlog Stop it , Return code :$?" | tee -a ${BACKUP_LOG}`
echo "${SLEEP_SECONDS} After the second connect and continue to backup " | tee -a ${BACKUP_LOG}
sleep ${SLEEP_SECONDS}
done