Intro
最近正在給 mysql 封裝一個庫,順帶研究一下 go-mysql-driver 這個庫的源碼實現(xiàn)。
Buffer.go
buffer 是一個用于給 數(shù)據(jù)庫連接 (net.Conn) 進行緩沖的一個數(shù)據(jù)結(jié)構(gòu),其結(jié)構(gòu)為:
type buffer struct {
buf []byte // 緩沖池中的數(shù)據(jù)
nc net.Conn // 負(fù)責(zé)緩沖的數(shù)據(jù)庫連接對象
idx int // 已讀數(shù)據(jù)索引
length int // 緩沖池中未讀數(shù)據(jù)的長度
timeout time.Duration // 數(shù)據(jù)庫連接的超時設(shè)置
}
可以看到,因為 數(shù)據(jù)庫連接 (net.Conn) 在通信的時候是 同步 的。而為了讓其能夠 同時 讀/寫 ,所以實現(xiàn)了 buffer 這個數(shù)據(jù)結(jié)構(gòu),通過該 buffer 進行數(shù)據(jù)緩沖還能實現(xiàn) 零拷貝 ( zero-copy-ish ) 。
其函數(shù)分別有:
newBuffer(nc net.Conn) buffer:創(chuàng)建并返回一個buffer(*buffer) readNext(need int) ([]byte, error):讀取并返回未讀數(shù)據(jù)的 need 位,如果 need 大于buffer的length,就會調(diào)用fill(need int) error對buffer進行 擴容 。(*buffer) fill(need int) error:對buffer進行(need/defaultBufSize)的倍數(shù)擴容,并在timeout時間結(jié)束前從buffer.nc中讀取 need 長度的數(shù)據(jù)。(*buffer) takeBuffer(length int) []byte:讀取buffer中length長度的數(shù)據(jù)(只包含已讀),如果buffer.length > 0,即還有未讀數(shù)據(jù),則立即返回nil。如果需要讀取的長度大于buffer的容量,則會進行擴容。(*buffer) takeSmallBuffer(length int) []byte:讀取保證不超過defaultBufSize長度的數(shù)據(jù)的快捷函數(shù)(只包含已讀),如果buffer.length > 0,即還有未讀數(shù)據(jù),則立即返回nil。(*buffer) takeCompleteBuffer() []byte: 讀取全部的buffer數(shù)據(jù)(只包含已讀),如果buffer.length > 0,即還有未讀數(shù)據(jù),則立即返回nil。
Collations.go
collations 包含了 MySQL 所有支持的 字符集 格式,并支持通過 COLLATION_NAME 返回其字符集 ID。
如果需要查詢 MySQL 支持的 字符集 格式,可以使用 SELECT COLLATION_NAME, ID FROM information_schema.COLLATIONS 語句獲取。
Dsn.go
DSN 即 數(shù)據(jù)源名稱 (Data Source Name) ,是 驅(qū)動程序連接數(shù)據(jù)庫的變量信息 ,簡而言之就是根據(jù)你連接的不同數(shù)據(jù)庫使用對應(yīng)的連接信息。
通常,數(shù)據(jù)庫的連接配置就是在這里定義的:
// Config 基本的數(shù)據(jù)庫連接信息
type Config struct {
User string // Username
Passwd string // Password (requires User)
Net string // Network type
Addr string // Network address (requires Net)
DBName string // Database name
Params map[string]string // Connection parameters
Collation string // Connection collation
Loc *time.Location // Location for time.Time values
TLSConfig string // TLS configuration name
tls *tls.Config // TLS configuration
Timeout time.Duration // Dial timeout
ReadTimeout time.Duration // I/O read timeout
WriteTimeout time.Duration // I/O write timeout
AllowAllFiles bool // 允許文件使用 LOAD DATA LOCAL INFILE 導(dǎo)入數(shù)據(jù)庫
AllowCleartextPasswords bool // 支持明文密碼客戶端
AllowOldPasswords bool // 允許使用不可靠的舊密碼
ClientFoundRows bool // 返回匹配的行數(shù)而不是受影響的行數(shù)
ColumnsWithAlias bool // 將表名前置在列名
InterpolateParams bool // 將占位符插入查詢的SQL字符串
MultiStatements bool // 允許一條語句多次查詢
ParseTime bool // 格式化時間值為 time.Time 變量
Strict bool // 將 warnings 返回 errors
}
這都是一些常見的配置項,就此略過。
該文件有兩個公共函數(shù)支持 Config 與 DSN 之間轉(zhuǎn)換。
(*Config)FormatDSN() stringParseDSN(dsn string) (*Config, error)
Errors.go
errors 定義了 Logger 、MySQLError 、 MySQLWarning 等數(shù)據(jù)結(jié)構(gòu)。
Logger
復(fù)用了 Go 原生的 log 包,并將其中的輸出重定向至控制臺的 標(biāo)準(zhǔn)錯誤 。
type Logger interface {
Print(v ...interface{})
}
var errLog = Logger(log.New(os.Stderr, "[mysql]", log.Ldate|log.Ltime|log.Lshortfile))
func SetLogger(logger Logger) error { // 當(dāng)然,你也可以使用自定義的錯誤 Logger
if logger == nil {
return errors.New("logger is nil")
}
errLog =logger
return nil
}
MySQLError
而 MySQLError 則簡單定義了 MySQL 輸出的錯誤的結(jié)構(gòu)。
type MySQLError struct {
Number uint16
Message string
}
MySQLWarning
MySQLWarning 則有些不一樣,它需要從 MySQL 中進行一次 查詢 ,以獲取所有的警告信息,所以該包也定義了 MySQLWarning 的 slice 結(jié)構(gòu)。
type MySQLWarning struct {
Level string
Code string
Message string
}
type MySQLWarnings []MySQLWarning
func (mc *mysqlConn) getWarnings() (err error) {
rows, err := mc.Query("SHOW WARNINGS", nil)
// handle err
// initzation MySQLWarnings
for {
err = rows.Next(values)
switch err {
case nil:
warning := MySQLWarning{}
if raw, ok := values[0].([]byte); ok {
warning.Level = string(raw)
}else {
warning.Level = fmt.Sprintf("%s", values[0])
}
if raw, ok := values[1].([]byte); ok {
warning.Code = string(raw)
} else {
warning.Code = fmt.Sprintf("%s", values[1])
}
if raw, ok := values[2].([]byte); ok {
warning.Message = string(raw)
} else {
warning.Message = fmt.Sprintf("%s", values[0])
}
warnings = append(warnings, warning)
}
case io.EOF:
return warnings
default:
rows.Close() // 值得注意的是,如果該函數(shù)沒有 case 運行 default ,該 rows 就不會被默認(rèn)關(guān)閉,就會占用連接池中的一個連接,是否應(yīng)該使用 `defer rows.Close() ` 避免該情況?
return
}
}
Infile.go
前面也有提到 MySQL 在導(dǎo)入大型文件的時候,需要使用 LOAD DATA LOCAL INFILE 的形式進行導(dǎo)入,而該 infile.go 就是實現(xiàn)該協(xié)議的代碼。
本包在實現(xiàn)的 LOAD DATA 的時候提供了兩種方式進行導(dǎo)入:
最常見的,使用服務(wù)器的文件路徑,如
/data/students.csv,下文命名其為 文件路徑注冊器最通用的,使用實現(xiàn)了
io.Reader接口的數(shù)據(jù)結(jié)構(gòu),通過返回該數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)進行導(dǎo)入,如bytesos.file等,下文命名其為 Reader 接口注冊器
在實現(xiàn)該功能的時候,注冊器 的實現(xiàn)是用名字作為 Key 的 Map ,為了避免 Map 的 讀寫競態(tài) ,需要對其配置一個讀寫鎖。
var (
fileRegister map[string]bool // 文件路徑注冊器
fileRegisterLock sync.RWMutex // 文件路徑注冊器讀寫鎖
readerRegister map[string]func() io.Reader // Reader 接口注冊器
readerRegisterLock sync.RWMutex // Reader 接口注冊器讀寫鎖
)
除了對兩個注冊器的 注冊 以及 注銷 函數(shù),還有一個需要分析的一個函數(shù):
(mc *mysqlConn) handleInFileRequest(name string) (err error)
通過傳入 文件路徑 或者 Reader 名稱 就可以將數(shù)據(jù)發(fā)往 MySQL 了。
func (mc *mysqlConn) handleInFileRequest(name string) (err error) {
packSize := 16 * 1024 // 16KB is small enough for disk readahead and large enough for TCP
if mc.maxWriteSize < packSize { // 設(shè)置發(fā)往 MySQL 的數(shù)據(jù)塊大小
packSize = mc.maxWriteSize
}
// 獲取 文件 或 Reader 的數(shù)據(jù),并將其賦值到 rdr 中
// var rdr io.Reader
// send context packets
if err != nil {
data := make([]byte, 4+packetSize) // 需要留 4 個 byte 給協(xié)議使用
var n int
for err == nil {
n, err = rdr.Read(data[4:]) // 將數(shù)據(jù)存入 data 的 [4:] 中
if n > 0 {
if ioErr := mc.writePacket(data[:4+n]); ioErr != nil { // 將 data 數(shù)據(jù)發(fā)往 MySQL
return ioErr
}
}
}
if err == io.EOF { // rdr 中的數(shù)據(jù)讀完了
err = nil
}
}
// send empty packet (termination)
if data == nil {
data = make([]byte, 4)
}
if ioErr := mc.writePacket(data[:4]); ioErr != nil { // 告訴 MySQL 文件發(fā)送完畢
return ioErr
}
// read OK packet
if err == nil { // 一切正常結(jié)束
return mc.readResultOK()
}
mc.readPacket() // 如果中途出錯,將錯誤信息讀取到 mysqlConn 中,并返回該錯誤
return err
}
到此,infile.go 的實現(xiàn)已經(jīng)整理完畢了,可以看到, 作者 在實現(xiàn)這個功能的時候還是做了一些優(yōu)化的,比如 map Lazy init ,send packet size limited 等。而我們通過分析規(guī)范的源碼包,能夠提升自己的編碼水平。
Packets.go
接下來就要深入到 MySQL 的通信協(xié)議中了,官方的 通信協(xié)議文檔 非常齊全,我在這里只將一些基礎(chǔ)的,我后面分析源碼會用到的協(xié)議分析下,如果有興趣,可以到官方文檔處進行查閱。
Protocol Basics
基礎(chǔ)數(shù)據(jù)類型
MySQL 通信的基本數(shù)據(jù)類型有兩種, Integer 、 String
Integer : 分別有 1, 2, 3, 4, 8 個字節(jié)長度的類型,使用小端傳輸。
String : 分別有 固定長度字符串(協(xié)議規(guī)定),NULL結(jié)尾字符串(長度不固定),長度編碼字符串(長度不固定)。
報文協(xié)議
報文分為 消息頭 以及 消息體,而 消息頭 由 3 字節(jié)的 消息長度 以及 1 字節(jié)的 序號 sequence (新客戶端由 0 開始)組成,消息體 則由 消息長度 的字節(jié)組成。
3 字節(jié)的 消息長度 最大值為
0xFFFFFF,即為16 MB - 1 byte,這就意味著,如果整個消息(不包括消息頭)的長度大于16MB - 1byte - 4byte大小時,消息就會被分包。1 字節(jié)的 序號 在每次新的客戶端發(fā)起請求時,以
0開始,依次遞增 1 ,如果消息需要分包, 序號 會隨著分包的數(shù)量遞增。而在一次應(yīng)答中, 客戶端會校驗服務(wù)器 返回序號 是否與 發(fā)送序號 一致,如果不一致,則返回錯誤異常。
協(xié)議類型
handshake: 發(fā)起連接auth: 登錄權(quán)限校驗ok | error: 返回結(jié)果狀態(tài)*ok: 首字節(jié)為 0 (0x00)error: 首字節(jié)為 255 (0xff)resultset: 結(jié)果集header
field
eof
row
command package: 命令
在整個 MySQL 發(fā)起交互的過程如下圖所示:

在了解這些 MySQL 基礎(chǔ)協(xié)議知識后,我們再來看 packages.go 的源碼就輕松多了。
源碼
先來看看 readPacket ,結(jié)合上面的知識點應(yīng)該非常好理解。
func (mc *mysqlConn) readPacket() ([]byte, error) {
var payload []byte
for { // for 循環(huán)是為了讀取有可能分片的數(shù)據(jù)
// Read package header
data, err := mc.buf.readNext(4) // 從 buffer 緩沖器中讀取 4 字節(jié)的 header
if err != nil { // 如果讀取發(fā)生異常,則關(guān)閉連接,并返回一個錯誤連接的異常
errLog.Print(err)
mc.Close()
return nil, driver.ErrBadConn
}
// Packet Length [24 bit]
pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16) // 讀取 3 字節(jié)的消息長度
if pktLen < 1 {
// 如上所示,關(guān)閉連接,并返回一個錯誤連接的異常
}
// Check Packet Sync [8 bit]
if data[3] != mc.sequence { // 判斷服務(wù)端返回的序號是否與客戶端一致
if data[3] > mc.sequence {
return nil, ErrPktSyncMul // 如果服務(wù)端返回序號大于客戶端的序號,則有可能是在一次請求中做了多次操作
}
return nil, ErrPktSync // 返回序號不一致錯誤
}
mc.sequence++ // 本次序號匹配相符,為了匹配下一次請求,先將序號自增1
data, err := mc.buf.readNext(pktLen) // 讀取 消息長度 的數(shù)據(jù)
if err != nil {
// 如上所示,關(guān)閉連接,并返回一個錯誤連接的異常
}
isLastPacket := (pktLen < maxPacketSize) // 如果是最后一個數(shù)據(jù)包,必然小于 maxPacketSize (16MB - 1byte)
// Zero allocations for non-splitting packets
if isLastPacket && payload == nil { // 無分包情況,立即返回
return data, nil
}
payload = append(payload, data...)
if isLastPacket { // 如果是最后一個包,讀取完畢后返回
return payload, nil
}
// 還有未讀數(shù)據(jù),開始下一次循環(huán)
}
}
下面來看下結(jié)合 握手報文協(xié)議 來看下客戶端向服務(wù)端發(fā)起請求的 readInitPacket :

func (mc *mysqlConn) readInitPacket() ([]byte, error) {
data, err := mc.readPacket() // 調(diào)用上面的函數(shù)讀取服務(wù)端返回的數(shù)據(jù)
if err != nil {
return nil, err
}
if data[0] == iERR { // iERR = 0xff 消息體的第一個字節(jié)返回 0xff ,則意味著 error package
return nil, mc.handleErrorPacket(data)
}
// protocol version [1 byte]
if data[0] < minProtocolVersion { // 判斷是否是兼容的協(xié)議版本
return nil, fmt.Errorf(
"unsupported protocol version %d. Version %d or higher is required",
data[0],
minProtocolVersion,
)
}
// server version [null terminated string]
// connection id [4 bytes]
pos := 1 + bytes.IndexByte(data[1:], 0x00) + 1 + 4 // 讀取 NULL (0x00)為結(jié)尾的字符串,跳過服務(wù)器線程 ID
// first part of the password cipher [8 bytes]
cipher := data[pos : pos+8] // 獲取挑戰(zhàn)隨機數(shù)
// (filler) always 0x00 [1 byte]
pos += 8 + 1
// capability flags (lower 2 bytes) [2 bytes]
mc.flags = clientFlag(binary.LittleEndian.Uint16(data[pos : pos+2])) // 獲取服務(wù)器權(quán)能標(biāo)識
if mc.flags&clientProtocol41 == 0 { // 說明 MySQL 服務(wù)器不支持高于 41 版本的協(xié)議
return nil, ErrOldProtocol
}
if mc.flags&clientSSL == 0 && mc.cfg.tls != nil { // 說明 MySQL 服務(wù)器需要 SSL 加密,但是客戶端沒有配置 SSL
return nil, ErrNoTLS
}
pos += 2 // 指針向后兩位
if len(data) > pos {
// 指針跳過標(biāo)志位
pos += 1 + 2 + 2 + 1 + 10
// second part of the password cipher [mininum 13 bytes],
// where len=MAX(13, length of auth-plugin-data - 8)
//
// The web documentation is ambiguous about the length. However,
// according to mysql-5.7/sql/auth/sql_authentication.cc line 538,
// the 13th byte is "\0 byte, terminating the second part of
// a scramble". So the second part of the password cipher is
// a NULL terminated string that's at least 13 bytes with the
// last byte being NULL.
//
// The official Python library uses the fixed length 12
// which seems to work but technically could have a hidden bug.
cipher = append(cipher, data[pos:pos+12]...)
// TODO: Verify string termination
// EOF if version (>= 5.5.7 and < 5.5.10) or (>= 5.6.0 and < 5.6.2)
// \NUL otherwise
//
//if data[len(data)-1] == 0 {
// return
//}
//return ErrMalformPkt
// make a memory safe copy of the cipher slice
var b [20]byte
copy(b[:], cipher)
return b[:], nil
}
// make a memory safe copy of the cipher slice
var b [8]byte // 返回 8 字節(jié)的挑戰(zhàn)隨機數(shù)
copy(b[:], cipher)
return b[:], nil
}
除了上面解析的兩個函數(shù), packages.go 還有 initialisation process / result packages / prepared statements 等協(xié)議的 寫入/讀取 ,有興趣的讀者可以結(jié)合上面的知識點自行閱讀。
Driver.go
接下來就要分析一些比較重要的代碼了,比如接下來要講的 driver.go ,它主要負(fù)責(zé)與 MySQL 數(shù)據(jù)庫進行各種協(xié)議的連接,并返回該連接??梢哉f它才是最基礎(chǔ)、最核心的功能。
不過首先我們需要看下 database/sql 包中的 Driver 接口需要如何實現(xiàn):
// database/sql/driver/driver.go
// 數(shù)據(jù)庫驅(qū)動
type Driver interface {
Open(name string) (Conn, error)
}
// ...
// 非并發(fā)安全數(shù)據(jù)庫連接
type Conn interface {
// 返回一個綁定到 sql 的準(zhǔn)備語句
Prepare(query string) (Stmt, error)
// 關(guān)閉該連接,并標(biāo)記為不再使用,停止所有準(zhǔn)備語句和事務(wù)
// 因為 database/sql 包維護了一個空閑的連接池,并且在空閑連接過多的時候會自動調(diào)用 Close ,所以驅(qū)動程序包不需要顯式調(diào)用該函數(shù)
Close() error
// 開始并返回一個新的事務(wù),而新的事務(wù)與舊的連接沒有任何關(guān)聯(lián)
Begin() (Tx, error)
}
根據(jù) database/sql 提供的 Driver 接口, go-sql-driver/mysql 實現(xiàn)了自己的 數(shù)據(jù)庫驅(qū)動 結(jié)構(gòu):
type MySQLDriver struct{}
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
mc := &mysqlConn {
// set max value
}
mc.cfg = ParseDSN(dsn) // 通過解析 DSN 設(shè)置 MySQL 連接的配置
// set parseTime and strict
// ...
// connect to server
if dial, ok := dials[mc.cfg.Net]; ok { // 根據(jù) 地址 以及 協(xié)議類型,嘗試連接上服務(wù)器
mc.netConn, err = dial(mc.cfg.Addr)
} else { // 連接服務(wù)器失敗,嘗試重連
nd := net.Dialer{Timeout: mc.cfg.Timeout}
mc.netConn, err := nd.Dial(mc.cfg.Net, mc.cfg.Addr)
}
if err != nil { // 重試失敗,返回異常
return nil, err
}
// Enable TCP Keepalives on TCP connections
if tc, ok := mc.netConn.(*net.Conn); ok { // tcp 連接類型轉(zhuǎn)換
if err := tc.SetKeepAlive(true); err != nil {
// Don't send COM_QUIT before handshake.
mc.netConn.Close() // 如果設(shè)置長連接失敗,返回異常之前一定要記得將連接斷開
mc.netConn = nil
return nil, err
}
}
mc.buff = newBuff(mc.netConn) // 生成一個帶緩沖的 buffer,如上面 buffer.go 中所說
// set I/O timeout
// ...
// Reading Handshake Initialization Packet
cipher, err := mc.readInitPacket() // 發(fā)起數(shù)據(jù)庫首次握手
if err != nil {
mc.cleanup() // 將當(dāng)前 mysqlConn 對象銷毀,后面我們會說這個函數(shù)
return nil, err
}
// Send Client Authentication Packet
if err = mc.writeAuthPacket(cipher); err != nil { // 向數(shù)據(jù)庫發(fā)送登錄信息校驗
mc.cleanup()
return nil, err
}
}
connection.go
終于要講到這個包的核心數(shù)據(jù)結(jié)構(gòu) mysqlConn 了,可以說,驅(qū)動的所有功能幾乎都圍繞著這個數(shù)據(jù)結(jié)構(gòu),我們先來看看它的結(jié)構(gòu):
type mysqlConn struct {
buf buffer // buffer 緩沖器
netConn net.Conn // 網(wǎng)絡(luò)連接
affectedRows uint64 // sql 執(zhí)行成功影響行數(shù)
insertId uint64 // sql 添加成功最新的主鍵 ID
cfg *Config // dsn 中的 基礎(chǔ)配置
maxPacketAllowed int // 允許的最大報文的字節(jié)長度,最大不能超過 (16MB - 1byte)
maxWriteSize int // 允許最大的寫入字節(jié)長度,最大不能超過 (16MB - 1byte)
writeTimeout time.Duration // 執(zhí)行 sql 的 超時時間
flags clientFlag // 客戶端狀態(tài)標(biāo)識
status statusFlag // 服務(wù)端狀態(tài)標(biāo)識
sequence uint8 // 序號
parseTime bool // 是否格式化時間
strict bool // 是否使用嚴(yán)格模式
}
// driver.go
// 而創(chuàng)建一個 mysqlConn 連接需要通過 driver.go 中的 Open 函數(shù),也說明 mysqlConn 實現(xiàn)了 driver.Conn 接口
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
mc := &mysqlConn{
// ...
}
// ...
return mc, nil
}
當(dāng)一個新的客戶端連接上服務(wù)器的時候 (三次握手結(jié)束,客戶端進入 established 狀態(tài)),需要先對 MySQL 服務(wù)器進行 會話的用戶/系統(tǒng)環(huán)境變量 的設(shè)置。
// Handles parameters set in DSN after the connection is established
func (mc *mysqlConn) handleParams() (err error) {
for param, val := range mc.cfg.Params { // Params: map[string]string
switch param {
// Charset
case "charset": // 如果是字符集,則調(diào)用 SET NAMES 命令
charsets := strings.Split(val, ",")
for i := range charsets {
// ignore errors here - a charset may not exist
err = mc.exec("SET NAMES " + charsets[i])
if err == nil {
break
}
}
if err != nil {
return
}
// System Vars
default: // 執(zhí)行系統(tǒng)環(huán)境變量設(shè)置
err = mc.exec("SET " + param + "=" + val + "")
if err != nil {
return
}
}
}
}
conntion.go 還負(fù)責(zé) 事務(wù) 、預(yù)處理語句 、執(zhí)行/查詢 的管理,但是基本都是往 mysqlConn 中發(fā)送 command package ,如:
// Begin 開啟事務(wù)
func (mc *mysqlConn) Begin() (driver.Tx, error) {
if mc.netConn == nil {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
err := mc.exec("START TRANSACTION")
if err == nil {
return &mysqlTx{mc}, err // 返回成功開啟的事務(wù),重用之前的連接
}
return nil, err
}
// Internal function to execute commands
func (mc *mysqlConn) exec(query string) error {
// Send command
err := mc.writeCommandPacketStr(comQurey, query)
if err != nil {
return err
}
// Read Result
resLen, err := mc.readResultSetHeaderPacket() // 根據(jù) data[0] 的值判斷是否出錯,如果沒有錯誤,則返回消息體的長度
if err == nil && resLen > 0 { // 存在有效消息體
if err = mc.readUntilEOF(); err != nil { // 讀取 columns
return err
}
err = mc.readUntilEOF() // 讀取 rows
}
return err
}
我想 conntion.go 中最重要的一個函數(shù)應(yīng)該是 cleanup ,它負(fù)責(zé)將 連接關(guān)閉 、 重置環(huán)境變量 等功能,但是該函數(shù)不能隨意調(diào)用,它只有在 登錄權(quán)限校驗異常 時候才應(yīng)該被調(diào)用,否則服務(wù)器在不知道客戶端 被強行關(guān)閉 的情況下,依然會向該客戶端發(fā)送消息,導(dǎo)致嚴(yán)重異常:
// Closes the network connection and unsets internal variables. Do not call this
// function after successfully authentication, call Close instead. This function
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
// Makes cleanup idempotent 保證函數(shù)的冪等性
if mc.netConn != nil {
if err := mc.netConn.Close(); err != nil { // Close 會嘗試發(fā)送 comQuit command 到服務(wù)器
errLog.Print(err)
}
mc.netConn = nil // 不管 Close 是否成功,必須將 netConn 清空
}
mc.cfg = nil
mc.buf.nc = nil // 緩沖器中的 netConn 也要關(guān)閉
}
Result.go
每當(dāng) MySQL 返回一個 OK 的 狀態(tài)報文 ,該報文協(xié)議會攜帶上本次執(zhí)行的結(jié)果 affectedRows 以及 insertId ,而 result.go 就包含著一個數(shù)據(jù)結(jié)構(gòu),用于存儲本次的執(zhí)行結(jié)果。
type mysqlResult struct {
affectedRows int64
insertId int64
}
// 兩個 getter
func (res *mysqlResult) LastInsertId() (int64, error) {
return res.insertId, nil
}
func (res *mysqlResult) RowsAffected() (int64, error) {
return res.affectedRows, nil
}
接下來我們看下在 conntion.go 中是怎么生成 mysqlResult 對象的:
// connect.go
func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, error) {
// ...
err := exec(query)
if err == nil {
return &mysqlResult{ // 返回執(zhí)行的結(jié)果
affectedRows: int64(mc.affectedRows),
insertId: int64(mc.insertId),
}, err
}
return nil, err
}
// exec 函數(shù)的解析可以返回上面 package.go 中瀏覽
// package.go
func (mc *mysqlConn) readResultSetHeaderPacket() (int, error) {
data, err := mc.readPacket()
if err == nil {
switch data[0] {
case iOK:
return 0, mc.handleOkPacket(data) // 處理 OK 狀態(tài)報文
// ...
}
func (mc *mysqlConn) handleOkPacket(data []byte) error {
var n, m int
// 0x00 [1 byte]
// Affected rows [Length Coded Binary]
mc.affectedRows, _, n = readLengthEncodedInteger(data[1:])
// Insert id [Length Coded Binary]
mc.insertId, _, m = readLengthEncodedInteger(data[1+n:])
// ...
}
Row.go
當(dāng) MySQL 執(zhí)行 插入、更新、刪除 等操作后,都會返回 Result ,但是 查詢 返回的是 Rows ,我們先來看看 go-mysql-driver 驅(qū)動所實現(xiàn)的 接口 Rows 的接口描述:
// database/sql/driver/driver.go
// Rows 是執(zhí)行查詢返回的結(jié)果的 游標(biāo)
type Rows interface {
// Columns 返回列的名稱,從 slice 的長度可以判斷列的長度
// 如果一個列的名稱未知,則為該列返回一個空字符串
Columns() []string
// Close 關(guān)閉游標(biāo)
Close() error
// Next 將下一行數(shù)據(jù)填充到 desc 切片中
// 如果讀取的是最后一行數(shù)據(jù),應(yīng)該返回一個 io.EOF 錯誤
Next(desc []Value) error
}
type Value interface{} // Value is a value that drivers must be able to handle.
為什么我要說這是 go-mysql-driver 驅(qū)動所實現(xiàn)的 接口 Rows 呢?眼尖的同學(xué)應(yīng)該已經(jīng)看到了, Next 函數(shù)好像和我們平常見到的不一樣?。?!
是的,因為我們平常使用的:
rows.Next()rows.Scan(dest ...interface{}) error
等函數(shù)的對象 rows 并不是上面的 接口描述 Rows ,而是另一個封裝的 同名數(shù)據(jù)結(jié)構(gòu) Rows ,它就在 database/sql 包中 :
// database/sql.go
type Rows struct {
dc *driverConn
releaseConn func(error)
rowsi driver.Rows // 接口描述的 Rows 藏在這!??!
// 忽略其他字段,因為我們不分析這個包...
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not not be called concurrently.
lastcols []driver.Value
}
我們跳過 database/sql 包中的 Rows 實現(xiàn),其無非是提供了更多功能的一個結(jié)果集而已,讓我們回到真正與數(shù)據(jù)庫進行交互的 Rows 中進行源碼分析。
在 go-sql-driver 實現(xiàn)的 mysqlRows 數(shù)據(jù)結(jié)構(gòu)只實現(xiàn)了 Columns() 和 Close() 兩個行數(shù),剩下的 Next(desc []driver.Value) 實現(xiàn)則交給了 MySQL 的兩種結(jié)果集協(xié)議:
// rows.go
type mysqlField struct {
tableName string
name string
flags fieldFlag
fieldType byte
decimals byte
}
type mysqlRows struct {
mc *mysqlConn
columns []mysqlField
}
type binaryRows struct { // 二進制結(jié)果集協(xié)議
mysqlRows // 對于 Go 的 組合特性 應(yīng)該不會陌生吧?
}
type textRows struct { // 文本結(jié)果集協(xié)議
mysqlRows
}
func (rows *mysqlRows) Columns() []string {
columns := make([]string, len(rows.columns))
// 將列名賦值到 columns ,如果有設(shè)置別名則賦值別名...
return columns
}
func (rows *mysqlRows) Close() error {
// 將連接里面的未讀數(shù)據(jù)讀完,然后將連接置空
}
// 接下來的 Next 函數(shù)實現(xiàn)就交由 binaryRows 和 textRows 了
func (rows *binaryRows) Next(desc []driver.Value) error {
if mc := rows.mc; mc != nil {
if mc.netConn == nil {
return ErrInvalidConn
}
return rows.readRow(dest) // 讀二進制協(xié)議結(jié)果集
}
return io.EOF
}
func (rows *testRows) Next(desc []driver.Value) error {
if mc := rows.mc; mc != nil {
if mc.netConn == nil {
return ErrInvalidConn
}
return rows.readRow(dest) // 讀取文本協(xié)議
}
return io.EOF
}
可以說,實現(xiàn)了 driver.Rows 接口的只有 binaryRows 和 testRows ,而他們里面的 readRow(desc) 實現(xiàn)由于都是和協(xié)議強相關(guān)的代碼,就不再解析了。
我們跟著源碼可以看到,使用 textRows 的場景在 getSystemVar 以及 Query 中,而使用 binaryRows 的場景在 statement 中,就是我們下一步需要解析的部分。
Statement.go
Prepared Statement ,即預(yù)處理語句,他有什么優(yōu)勢呢,為什么 MySQL 要加入它?
執(zhí)行性能更高:
MySQL會對Prepared Statement語句預(yù)先進行編譯成模板,并將 占位符 替換 參數(shù) 的位置,這樣如果頻繁執(zhí)行一條參數(shù)只有少量替換的語句時候,性能會得到大量提高??赡苡型瑢W(xué)會有疑問,為什么MySQL語句還需要編譯?那么可以來參考下這篇 MySQL Prepare 原理 。傳輸協(xié)議更優(yōu):
Prepare Statement在傳輸時候使用的是Binary Protocol,比使用Text Protocol的查詢具有 傳輸數(shù)據(jù)量更小 、 無需轉(zhuǎn)換數(shù)據(jù)格式 等優(yōu)勢,緩解了 CPU 和 網(wǎng)絡(luò) 的開銷。安全性更好:由 MySQL Prepare 原理 我們可以知道,
Perpare編譯之后會生成 語法樹,在執(zhí)行的時候才會將參數(shù)傳進來,這樣就避免了平常直接執(zhí)行SQL 語句會發(fā)生的SQL 注入問題。
好了,先來看下 mysqlStmt 的數(shù)據(jù)結(jié)構(gòu):
type mysqlStmt struct {
mc *mysqlConn
id uint32
paramCount int
columns []mysqlField // cached from the first query (既然SQL已經(jīng)預(yù)編譯好了,返回的結(jié)果集列名已經(jīng)是確定的,所以在收到 PREPARE_OK 之后解析數(shù)據(jù)后會緩存下來)
}
我們發(fā)現(xiàn),它比 mysqlRows 多了兩個成員變量:
id:MySQL預(yù)處理語句之后,會給該語句分配一個id并返回客戶端,用于:客戶端提交該
id給服務(wù)器調(diào)用對應(yīng)的預(yù)處理語句。paramCount:參數(shù)數(shù)量,等于 占位符 的個數(shù),用于:判斷傳入的參數(shù)個數(shù)是否與預(yù)編譯語句中的占位符個數(shù)一致。
判斷返回的
PREPARE_OK響應(yīng)報文是否帶有 參數(shù)列名 數(shù)據(jù)。
下面來看看如何創(chuàng)建并使用一個 Prepare Statement :
func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) { // 傳入需要預(yù)編譯的 SQL 語句
// 檢查連接是否可用...
err = mc.writeCommandPacketStr(comStmtPrepare, query) // 將 SQL 發(fā)往數(shù)據(jù)庫進行預(yù)編譯
if err != nil {
return nil, err
}
stmt := &mysqlStmt{ // 預(yù)編譯成功,先創(chuàng)建 stmt 對象
mc: mc,
}
// Read Result
columnCount, err := stmt.readPrepareResultPacket() // 從 stmt 的連接讀取返回 響應(yīng)報文
if err == nil {
if stmt.paramCount > 0 { // 如果預(yù)編譯的 SQL 的有參數(shù)
if err = mc.readUntilEOF(); err != nil { // 讀取參數(shù)列名數(shù)據(jù)
return nil, err
}
}
if columnCount > 0 { // 返回執(zhí)行結(jié)果的列表個數(shù)
err = mc.readUntilEOF() // 讀取執(zhí)行結(jié)果的列名數(shù)據(jù)
}
}
return stmt, err
}
因為是已經(jīng)預(yù)編譯好的語句,所以在執(zhí)行的時候只需要將參數(shù)傳進去就可以了。
func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result, error) {
// 檢查連接是否可用...
err := stmt.writeExecutePacket(args)
if err != nil {
return nil, err
}
// 讀取結(jié)果集的行、列數(shù)據(jù)
}
func(stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
if len(args) != stmt.paramCount { // 判斷傳進來的參數(shù)和預(yù)編譯好的SQL參數(shù) 個數(shù)是否一致
return fmt.Errorf(
"argument count mismatch (got: %d; has: %d)",
len(args),
stmt.paramCount,
)
}
// 讀取緩沖器中的數(shù)據(jù),如果為空,則返回異常...
// command [1 byte]
data[4] = comStmtExecute
// statement_id [4 bytes] 將預(yù)編譯語句的 id 轉(zhuǎn)換為 4字節(jié)的二進制數(shù)據(jù)
data[5] = byte(stmt.id)
data[6] = byte(stmt.id >> 8)
data[7] = byte(stmt.id >> 16)
data[8] = byte(stmt.id >> 24)
// flags (0: CURSOR_TYPE_NO_CURSOR) [1 byte]
data[9] = 0x00
// iteration_count (uint32(1)) [4 bytes]
data[10] = 0x01
data[11] = 0x00
data[12] = 0x00
data[13] = 0x00
// 將參數(shù)按照不同的類型轉(zhuǎn)換為 binary protobuf 并 append 到 data 中...
return mc.writePacket(data)
}
相信看到這里,已經(jīng)能對看懂源碼的 70% 了,剩余的代碼都是和協(xié)議相關(guān),就留待有興趣的讀者繼續(xù)研究,這里就不再展開講了。
Transaction.go
事務(wù)是 MySQL 中很重要的一部分,但是驅(qū)動的實現(xiàn)卻很簡單,因為一切的事務(wù)控制都已經(jīng)交由 MySQL 去執(zhí)行了,驅(qū)動所需要做的,只要發(fā)送一個 commit 或者 rollback 的 command packet 即可。
type mysqlTx struct {
mc *mysqlConn
}
func (tx *mysqlTx) Commit() (err error) {
if tx.mc == nil || tx.mc.netConn == nil {
return ErrInvalidConn
}
err = tx.mc.exec("COMMIT")
tx.mc = nil
return
}
func (tx *mysqlTx) Rollback() (err error) {
if tx.mc == nil || tx.mc.netConn == nil {
return ErrInvalidConn
}
err = tx.mc.exec("ROLLBACK")
tx.mc = nil
return
}
總結(jié)
最后,其實 buffer 的實現(xiàn)對我來說印象是最深刻的,因為它是最簡單而又是最有效的實現(xiàn)了一個消息緩沖器,它實現(xiàn)的巧妙讓我決定把它放到第一節(jié),而其他的幾乎都和 MySQL 的協(xié)議相關(guān),看這些源碼讓我對 MySQL 有了更多的認(rèn)識。
好了,本篇字?jǐn)?shù)比較多,也會有很多不足,希望大家能夠給本篇博客多提點意見,讓我可以改進的更好。如果還有機會,我會帶來其他篇章的源碼解析,敬請期待 :)