go-sql-driver 源碼解析

Intro

最近正在給 mysql 封裝一個庫,順帶研究一下 go-mysql-driver 這個庫的源碼實現(xiàn)。

Buffer.go

buffer 是一個用于給 數(shù)據(jù)庫連接 (net.Conn) 進行緩沖的一個數(shù)據(jù)結(jié)構(gòu),其結(jié)構(gòu)為:

type buffer struct {
    buf     []byte     // 緩沖池中的數(shù)據(jù)
    nc      net.Conn   // 負(fù)責(zé)緩沖的數(shù)據(jù)庫連接對象
    idx     int        // 已讀數(shù)據(jù)索引
    length  int        // 緩沖池中未讀數(shù)據(jù)的長度
    timeout time.Duration // 數(shù)據(jù)庫連接的超時設(shè)置
}

可以看到,因為 數(shù)據(jù)庫連接 (net.Conn) 在通信的時候是 同步 的。而為了讓其能夠 同時 讀/寫 ,所以實現(xiàn)了 buffer 這個數(shù)據(jù)結(jié)構(gòu),通過該 buffer 進行數(shù)據(jù)緩沖還能實現(xiàn) 零拷貝 ( zero-copy-ish ) 。

其函數(shù)分別有:

  • newBuffer(nc net.Conn) buffer :創(chuàng)建并返回一個 buffer

  • (*buffer) readNext(need int) ([]byte, error) :讀取并返回未讀數(shù)據(jù)的 need 位,如果 need 大于 bufferlength ,就會調(diào)用 fill(need int) errorbuffer進行 擴容 。

  • (*buffer) fill(need int) error :對 buffer 進行 (need/defaultBufSize) 的倍數(shù)擴容,并在 timeout 時間結(jié)束前從 buffer.nc 中讀取 need 長度的數(shù)據(jù)。

  • (*buffer) takeBuffer(length int) []byte :讀取 bufferlength 長度的數(shù)據(jù)(只包含已讀),如果 buffer.length > 0 ,即還有未讀數(shù)據(jù),則立即返回 nil 。如果需要讀取的長度大于 buffer 的容量,則會進行擴容。

  • (*buffer) takeSmallBuffer(length int) []byte :讀取保證不超過 defaultBufSize 長度的數(shù)據(jù)的快捷函數(shù)(只包含已讀),如果 buffer.length > 0 ,即還有未讀數(shù)據(jù),則立即返回 nil 。

  • (*buffer) takeCompleteBuffer() []byte : 讀取全部的 buffer 數(shù)據(jù)(只包含已讀),如果 buffer.length > 0 ,即還有未讀數(shù)據(jù),則立即返回 nil

Collations.go

collations 包含了 MySQL 所有支持的 字符集 格式,并支持通過 COLLATION_NAME 返回其字符集 ID。

如果需要查詢 MySQL 支持的 字符集 格式,可以使用 SELECT COLLATION_NAME, ID FROM information_schema.COLLATIONS 語句獲取。

Dsn.go

DSN數(shù)據(jù)源名稱 (Data Source Name) ,是 驅(qū)動程序連接數(shù)據(jù)庫的變量信息 ,簡而言之就是根據(jù)你連接的不同數(shù)據(jù)庫使用對應(yīng)的連接信息。

通常,數(shù)據(jù)庫的連接配置就是在這里定義的:

// Config 基本的數(shù)據(jù)庫連接信息
type Config struct {
    User         string            // Username
    Passwd       string            // Password (requires User)
    Net          string            // Network type
    Addr         string            // Network address (requires Net)
    DBName       string            // Database name
    Params       map[string]string // Connection parameters
    Collation    string            // Connection collation
    Loc          *time.Location    // Location for time.Time values
    TLSConfig    string            // TLS configuration name
    tls          *tls.Config       // TLS configuration
    Timeout      time.Duration     // Dial timeout
    ReadTimeout  time.Duration     // I/O read timeout
    WriteTimeout time.Duration     // I/O write timeout

    AllowAllFiles           bool // 允許文件使用 LOAD DATA LOCAL INFILE 導(dǎo)入數(shù)據(jù)庫
    AllowCleartextPasswords bool // 支持明文密碼客戶端
    AllowOldPasswords       bool // 允許使用不可靠的舊密碼
    ClientFoundRows         bool // 返回匹配的行數(shù)而不是受影響的行數(shù)
    ColumnsWithAlias        bool // 將表名前置在列名
    InterpolateParams       bool // 將占位符插入查詢的SQL字符串
    MultiStatements         bool // 允許一條語句多次查詢
    ParseTime               bool // 格式化時間值為 time.Time 變量
    Strict                  bool // 將 warnings 返回 errors
}

這都是一些常見的配置項,就此略過。

該文件有兩個公共函數(shù)支持 ConfigDSN 之間轉(zhuǎn)換。

  • (*Config)FormatDSN() string

  • ParseDSN(dsn string) (*Config, error)

Errors.go

errors 定義了 Logger 、MySQLError 、 MySQLWarning 等數(shù)據(jù)結(jié)構(gòu)。

Logger

復(fù)用了 Go 原生的 log 包,并將其中的輸出重定向至控制臺的 標(biāo)準(zhǔn)錯誤 。

type Logger interface {
  Print(v ...interface{})
}

var errLog = Logger(log.New(os.Stderr, "[mysql]", log.Ldate|log.Ltime|log.Lshortfile))

func SetLogger(logger Logger) error { // 當(dāng)然,你也可以使用自定義的錯誤 Logger
  if logger == nil {
    return errors.New("logger is nil")
  }
  errLog =logger
  return nil
}

MySQLError

MySQLError 則簡單定義了 MySQL 輸出的錯誤的結(jié)構(gòu)。

type MySQLError struct {
    Number  uint16
    Message string
}

MySQLWarning

MySQLWarning 則有些不一樣,它需要從 MySQL 中進行一次 查詢 ,以獲取所有的警告信息,所以該包也定義了 MySQLWarningslice 結(jié)構(gòu)。

type MySQLWarning struct {
    Level string
    Code string
    Message string
}

type MySQLWarnings []MySQLWarning

func (mc *mysqlConn) getWarnings() (err error) {
  rows, err := mc.Query("SHOW WARNINGS", nil)
  // handle err
  
  // initzation MySQLWarnings
  
  for {
    err = rows.Next(values)
    switch err {
      case nil:
        warning := MySQLWarning{}
        
      if raw, ok := values[0].([]byte); ok {
          warning.Level = string(raw)
      }else {
          warning.Level = fmt.Sprintf("%s", values[0])
      }
      
      if raw, ok := values[1].([]byte); ok {
        warning.Code = string(raw)
      } else {
        warning.Code = fmt.Sprintf("%s", values[1])
      }
      
      if raw, ok := values[2].([]byte); ok {
        warning.Message = string(raw)
      } else {
        warning.Message = fmt.Sprintf("%s", values[0])
      }

      warnings = append(warnings, warning)
    }
    
    case io.EOF:
        return warnings
    
    default:
        rows.Close() // 值得注意的是,如果該函數(shù)沒有 case 運行 default ,該 rows 就不會被默認(rèn)關(guān)閉,就會占用連接池中的一個連接,是否應(yīng)該使用 `defer rows.Close() ` 避免該情況?
        return
  }
}

Infile.go

前面也有提到 MySQL 在導(dǎo)入大型文件的時候,需要使用 LOAD DATA LOCAL INFILE 的形式進行導(dǎo)入,而該 infile.go 就是實現(xiàn)該協(xié)議的代碼。

本包在實現(xiàn)的 LOAD DATA 的時候提供了兩種方式進行導(dǎo)入:

  • 最常見的,使用服務(wù)器的文件路徑,如 /data/students.csv ,下文命名其為 文件路徑注冊器

  • 最通用的,使用實現(xiàn)了 io.Reader 接口的數(shù)據(jù)結(jié)構(gòu),通過返回該數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)進行導(dǎo)入,如 bytes os.file 等,下文命名其為 Reader 接口注冊器

在實現(xiàn)該功能的時候,注冊器 的實現(xiàn)是用名字作為 Key 的 Map ,為了避免 Map讀寫競態(tài) ,需要對其配置一個讀寫鎖。

var (
    fileRegister        map[string]bool     // 文件路徑注冊器
    fileRegisterLock    sync.RWMutex        // 文件路徑注冊器讀寫鎖
    readerRegister      map[string]func() io.Reader // Reader 接口注冊器
    readerRegisterLock  sync.RWMutex                // Reader 接口注冊器讀寫鎖  
)

除了對兩個注冊器的 注冊 以及 注銷 函數(shù),還有一個需要分析的一個函數(shù):

(mc *mysqlConn) handleInFileRequest(name string) (err error)

通過傳入 文件路徑 或者 Reader 名稱 就可以將數(shù)據(jù)發(fā)往 MySQL 了。

func (mc *mysqlConn) handleInFileRequest(name string) (err error) {
    packSize := 16 * 1024 // 16KB is small enough for disk readahead and large enough for TCP
    if mc.maxWriteSize < packSize { // 設(shè)置發(fā)往 MySQL 的數(shù)據(jù)塊大小
        packSize = mc.maxWriteSize
    }
  
    // 獲取 文件 或 Reader 的數(shù)據(jù),并將其賦值到 rdr 中
    // var rdr io.Reader
  
    // send context packets
    if err != nil {
        data := make([]byte, 4+packetSize) // 需要留 4 個 byte 給協(xié)議使用
        var n int
        for err == nil {
            n, err = rdr.Read(data[4:]) // 將數(shù)據(jù)存入 data 的 [4:] 中
            if n > 0 {
                if ioErr := mc.writePacket(data[:4+n]); ioErr != nil { // 將 data 數(shù)據(jù)發(fā)往 MySQL
                    return ioErr
                }
            }
        }
        if err == io.EOF { // rdr 中的數(shù)據(jù)讀完了
            err = nil
        }
    }
  
    // send empty packet (termination)
    if data == nil {
        data = make([]byte, 4)
    }
    if ioErr := mc.writePacket(data[:4]); ioErr != nil { // 告訴 MySQL 文件發(fā)送完畢
        return ioErr
    }

    // read OK packet
    if err == nil { // 一切正常結(jié)束
        return mc.readResultOK()
    }

    mc.readPacket() // 如果中途出錯,將錯誤信息讀取到 mysqlConn 中,并返回該錯誤
    return err
}

到此,infile.go 的實現(xiàn)已經(jīng)整理完畢了,可以看到, 作者 在實現(xiàn)這個功能的時候還是做了一些優(yōu)化的,比如 map Lazy init ,send packet size limited 等。而我們通過分析規(guī)范的源碼包,能夠提升自己的編碼水平。

Packets.go

接下來就要深入到 MySQL 的通信協(xié)議中了,官方的 通信協(xié)議文檔 非常齊全,我在這里只將一些基礎(chǔ)的,我后面分析源碼會用到的協(xié)議分析下,如果有興趣,可以到官方文檔處進行查閱。

Protocol Basics

基礎(chǔ)數(shù)據(jù)類型

MySQL 通信的基本數(shù)據(jù)類型有兩種, Integer 、 String

  • Integer : 分別有 1, 2, 3, 4, 8 個字節(jié)長度的類型,使用小端傳輸。

  • String : 分別有 固定長度字符串(協(xié)議規(guī)定),NULL結(jié)尾字符串(長度不固定),長度編碼字符串(長度不固定)。

報文協(xié)議

報文分為 消息頭 以及 消息體,而 消息頭 由 3 字節(jié)的 消息長度 以及 1 字節(jié)的 序號 sequence (新客戶端由 0 開始)組成,消息體 則由 消息長度 的字節(jié)組成。

  • 3 字節(jié)的 消息長度 最大值為 0xFFFFFF ,即為 16 MB - 1 byte ,這就意味著,如果整個消息(不包括消息頭)的長度大于 16MB - 1byte - 4byte 大小時,消息就會被分包。

  • 1 字節(jié)的 序號 在每次新的客戶端發(fā)起請求時,以 0 開始,依次遞增 1 ,如果消息需要分包, 序號 會隨著分包的數(shù)量遞增。而在一次應(yīng)答中, 客戶端會校驗服務(wù)器 返回序號 是否與 發(fā)送序號 一致,如果不一致,則返回錯誤異常。

協(xié)議類型

  • handshake : 發(fā)起連接

  • auth : 登錄權(quán)限校驗

  • ok | error : 返回結(jié)果狀態(tài) *

  • ok : 首字節(jié)為 0 (0x00

  • error : 首字節(jié)為 255 (0xff

  • resultset : 結(jié)果集

  • header

  • field

  • eof

  • row

  • command package : 命令

在整個 MySQL 發(fā)起交互的過程如下圖所示:

mysql connect

在了解這些 MySQL 基礎(chǔ)協(xié)議知識后,我們再來看 packages.go 的源碼就輕松多了。

源碼

先來看看 readPacket ,結(jié)合上面的知識點應(yīng)該非常好理解。

func (mc *mysqlConn) readPacket() ([]byte, error) {
    var payload []byte
    for { // for 循環(huán)是為了讀取有可能分片的數(shù)據(jù)
        // Read package header
        data, err := mc.buf.readNext(4) // 從 buffer 緩沖器中讀取 4 字節(jié)的 header
        if err != nil { // 如果讀取發(fā)生異常,則關(guān)閉連接,并返回一個錯誤連接的異常
            errLog.Print(err)
            mc.Close()
            return nil, driver.ErrBadConn 
        }
      
        // Packet Length [24 bit]
        pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16) // 讀取 3 字節(jié)的消息長度
      
        if pktLen < 1 {
            // 如上所示,關(guān)閉連接,并返回一個錯誤連接的異常
        }
      
        // Check Packet Sync [8 bit]
        if data[3] != mc.sequence { // 判斷服務(wù)端返回的序號是否與客戶端一致
            if data[3] > mc.sequence {
                return nil, ErrPktSyncMul // 如果服務(wù)端返回序號大于客戶端的序號,則有可能是在一次請求中做了多次操作
            }
            return nil, ErrPktSync // 返回序號不一致錯誤
        }
        mc.sequence++ // 本次序號匹配相符,為了匹配下一次請求,先將序號自增1
      
        data, err := mc.buf.readNext(pktLen) // 讀取 消息長度 的數(shù)據(jù)
        if err != nil {
            // 如上所示,關(guān)閉連接,并返回一個錯誤連接的異常
        }
      
        isLastPacket := (pktLen < maxPacketSize) // 如果是最后一個數(shù)據(jù)包,必然小于 maxPacketSize (16MB - 1byte)
        
        // Zero allocations for non-splitting packets
        if isLastPacket && payload == nil { // 無分包情況,立即返回
            return data, nil
        }

        payload = append(payload, data...)

        if isLastPacket { // 如果是最后一個包,讀取完畢后返回
            return payload, nil
        }
      
        // 還有未讀數(shù)據(jù),開始下一次循環(huán)
    }
}

下面來看下結(jié)合 握手報文協(xié)議 來看下客戶端向服務(wù)端發(fā)起請求的 readInitPacket

mysql handshack protocol
func (mc *mysqlConn) readInitPacket() ([]byte, error) {
    data, err := mc.readPacket() // 調(diào)用上面的函數(shù)讀取服務(wù)端返回的數(shù)據(jù)
    if err != nil {
        return nil, err
    }
  
    if data[0] == iERR { // iERR = 0xff  消息體的第一個字節(jié)返回 0xff ,則意味著 error package
        return nil, mc.handleErrorPacket(data)
    }
  
    // protocol version [1 byte]
    if data[0] < minProtocolVersion { // 判斷是否是兼容的協(xié)議版本
        return nil, fmt.Errorf(
            "unsupported protocol version %d. Version %d or higher is required",
            data[0],
            minProtocolVersion,
        )
    }
  
    // server version [null terminated string]
    // connection id [4 bytes]
    pos := 1 + bytes.IndexByte(data[1:], 0x00) + 1 + 4 // 讀取 NULL (0x00)為結(jié)尾的字符串,跳過服務(wù)器線程 ID
  
    // first part of the password cipher [8 bytes]
    cipher := data[pos : pos+8] // 獲取挑戰(zhàn)隨機數(shù)
  
    // (filler) always 0x00 [1 byte]
    pos += 8 + 1
  
    // capability flags (lower 2 bytes) [2 bytes]
    mc.flags = clientFlag(binary.LittleEndian.Uint16(data[pos : pos+2])) // 獲取服務(wù)器權(quán)能標(biāo)識
    if mc.flags&clientProtocol41 == 0 { // 說明 MySQL 服務(wù)器不支持高于 41 版本的協(xié)議
        return nil, ErrOldProtocol
    }
    if mc.flags&clientSSL == 0 && mc.cfg.tls != nil { // 說明 MySQL 服務(wù)器需要 SSL 加密,但是客戶端沒有配置 SSL
        return nil, ErrNoTLS
    }
    pos += 2 // 指針向后兩位
  
    if len(data) > pos {
        // 指針跳過標(biāo)志位
        pos += 1 + 2 + 2 + 1 + 10

        // second part of the password cipher [mininum 13 bytes],
        // where len=MAX(13, length of auth-plugin-data - 8)
        //
        // The web documentation is ambiguous about the length. However,
        // according to mysql-5.7/sql/auth/sql_authentication.cc line 538,
        // the 13th byte is "\0 byte, terminating the second part of
        // a scramble". So the second part of the password cipher is
        // a NULL terminated string that's at least 13 bytes with the
        // last byte being NULL.
        //
        // The official Python library uses the fixed length 12
        // which seems to work but technically could have a hidden bug.
        cipher = append(cipher, data[pos:pos+12]...)

        // TODO: Verify string termination
        // EOF if version (>= 5.5.7 and < 5.5.10) or (>= 5.6.0 and < 5.6.2)
        // \NUL otherwise
        //
        //if data[len(data)-1] == 0 {
        //  return
        //}
        //return ErrMalformPkt

        // make a memory safe copy of the cipher slice
        var b [20]byte
        copy(b[:], cipher)
        return b[:], nil
    }

    // make a memory safe copy of the cipher slice
    var b [8]byte // 返回 8 字節(jié)的挑戰(zhàn)隨機數(shù)
    copy(b[:], cipher)
    return b[:], nil
}

除了上面解析的兩個函數(shù), packages.go 還有 initialisation process / result packages / prepared statements 等協(xié)議的 寫入/讀取 ,有興趣的讀者可以結(jié)合上面的知識點自行閱讀。

Driver.go

接下來就要分析一些比較重要的代碼了,比如接下來要講的 driver.go ,它主要負(fù)責(zé)與 MySQL 數(shù)據(jù)庫進行各種協(xié)議的連接,并返回該連接??梢哉f它才是最基礎(chǔ)、最核心的功能。

不過首先我們需要看下 database/sql 包中的 Driver 接口需要如何實現(xiàn):

// database/sql/driver/driver.go

// 數(shù)據(jù)庫驅(qū)動
type Driver interface {
  Open(name string) (Conn, error)
}

// ...

// 非并發(fā)安全數(shù)據(jù)庫連接
type Conn interface {
  // 返回一個綁定到 sql 的準(zhǔn)備語句
  Prepare(query string) (Stmt, error)
  
  // 關(guān)閉該連接,并標(biāo)記為不再使用,停止所有準(zhǔn)備語句和事務(wù)
  // 因為 database/sql 包維護了一個空閑的連接池,并且在空閑連接過多的時候會自動調(diào)用 Close ,所以驅(qū)動程序包不需要顯式調(diào)用該函數(shù)
  Close() error
  
  // 開始并返回一個新的事務(wù),而新的事務(wù)與舊的連接沒有任何關(guān)聯(lián)
  Begin() (Tx, error)
}

根據(jù) database/sql 提供的 Driver 接口, go-sql-driver/mysql 實現(xiàn)了自己的 數(shù)據(jù)庫驅(qū)動 結(jié)構(gòu):

type MySQLDriver struct{}

func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
  mc := &mysqlConn {
      // set max value
  }
  mc.cfg = ParseDSN(dsn) // 通過解析 DSN 設(shè)置 MySQL 連接的配置

  // set parseTime and strict
  // ...
  
  // connect to server
  if dial, ok := dials[mc.cfg.Net]; ok { // 根據(jù) 地址 以及 協(xié)議類型,嘗試連接上服務(wù)器
    mc.netConn, err = dial(mc.cfg.Addr)
  } else { // 連接服務(wù)器失敗,嘗試重連
    nd := net.Dialer{Timeout: mc.cfg.Timeout}
    mc.netConn, err := nd.Dial(mc.cfg.Net, mc.cfg.Addr)
  }
  if err !=  nil { // 重試失敗,返回異常
      return nil, err
  }
  
  // Enable TCP Keepalives on TCP connections
  if tc, ok := mc.netConn.(*net.Conn); ok { // tcp 連接類型轉(zhuǎn)換
    if err := tc.SetKeepAlive(true); err != nil {
      // Don't send COM_QUIT before handshake.
      mc.netConn.Close() // 如果設(shè)置長連接失敗,返回異常之前一定要記得將連接斷開
      mc.netConn = nil
      return nil, err
    }
  }
  
  mc.buff = newBuff(mc.netConn) // 生成一個帶緩沖的 buffer,如上面 buffer.go 中所說
  
  // set I/O timeout
  // ...
  
  // Reading Handshake Initialization Packet
  cipher, err := mc.readInitPacket() // 發(fā)起數(shù)據(jù)庫首次握手
  if err != nil {
    mc.cleanup() // 將當(dāng)前 mysqlConn 對象銷毀,后面我們會說這個函數(shù)
    return nil, err
  }
  
  // Send Client Authentication Packet
  if err = mc.writeAuthPacket(cipher); err != nil { // 向數(shù)據(jù)庫發(fā)送登錄信息校驗
    mc.cleanup()
    return nil, err
  }
}

connection.go

終于要講到這個包的核心數(shù)據(jù)結(jié)構(gòu) mysqlConn 了,可以說,驅(qū)動的所有功能幾乎都圍繞著這個數(shù)據(jù)結(jié)構(gòu),我們先來看看它的結(jié)構(gòu):

type mysqlConn struct {
    buf              buffer     // buffer 緩沖器
    netConn          net.Conn   // 網(wǎng)絡(luò)連接
    affectedRows     uint64     // sql 執(zhí)行成功影響行數(shù)
    insertId         uint64     // sql 添加成功最新的主鍵 ID
    cfg              *Config    // dsn 中的 基礎(chǔ)配置
    maxPacketAllowed int        // 允許的最大報文的字節(jié)長度,最大不能超過 (16MB - 1byte)
    maxWriteSize     int        // 允許最大的寫入字節(jié)長度,最大不能超過 (16MB - 1byte)
    writeTimeout     time.Duration  // 執(zhí)行 sql 的 超時時間
    flags            clientFlag     // 客戶端狀態(tài)標(biāo)識
    status           statusFlag     // 服務(wù)端狀態(tài)標(biāo)識
    sequence         uint8          // 序號
    parseTime        bool           // 是否格式化時間
    strict           bool           // 是否使用嚴(yán)格模式
}

// driver.go
// 而創(chuàng)建一個 mysqlConn 連接需要通過 driver.go 中的 Open 函數(shù),也說明 mysqlConn 實現(xiàn)了 driver.Conn 接口
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
  mc := &mysqlConn{
      // ...
  }
  
  // ...
  
  return mc, nil
}

當(dāng)一個新的客戶端連接上服務(wù)器的時候 (三次握手結(jié)束,客戶端進入 established 狀態(tài)),需要先對 MySQL 服務(wù)器進行 會話的用戶/系統(tǒng)環(huán)境變量 的設(shè)置。

// Handles parameters set in DSN after the connection is established
func (mc *mysqlConn) handleParams() (err error) {
    for param, val := range mc.cfg.Params { // Params: map[string]string
        switch param {
        // Charset
        case "charset": // 如果是字符集,則調(diào)用 SET NAMES 命令
            charsets := strings.Split(val, ",")
            for i := range charsets {
                // ignore errors here - a charset may not exist
                err = mc.exec("SET NAMES " + charsets[i])
                if err == nil {
                    break
                }
            }
            if err != nil {
                return
            }

        // System Vars
        default: // 執(zhí)行系統(tǒng)環(huán)境變量設(shè)置
            err = mc.exec("SET " + param + "=" + val + "")
            if err != nil {
                return
            }
        }
    }
}

conntion.go 還負(fù)責(zé) 事務(wù)預(yù)處理語句執(zhí)行/查詢 的管理,但是基本都是往 mysqlConn 中發(fā)送 command package ,如:

// Begin 開啟事務(wù)
func (mc *mysqlConn) Begin() (driver.Tx, error) {
    if mc.netConn == nil {
        errLog.Print(ErrInvalidConn)
        return nil, driver.ErrBadConn
    }
    err := mc.exec("START TRANSACTION")
    if err == nil {
        return &mysqlTx{mc}, err // 返回成功開啟的事務(wù),重用之前的連接
    }

    return nil, err
}

// Internal function to execute commands
func (mc *mysqlConn) exec(query string) error {
    // Send command
    err := mc.writeCommandPacketStr(comQurey, query)
    if err != nil {
        return err
    }
  
    // Read Result
    resLen, err := mc.readResultSetHeaderPacket() // 根據(jù) data[0] 的值判斷是否出錯,如果沒有錯誤,則返回消息體的長度
    if err == nil && resLen > 0 { // 存在有效消息體
        if err = mc.readUntilEOF(); err != nil { // 讀取 columns
            return err
        }

        err = mc.readUntilEOF() // 讀取 rows
    }

    return err
}

我想 conntion.go 中最重要的一個函數(shù)應(yīng)該是 cleanup ,它負(fù)責(zé)將 連接關(guān)閉 、 重置環(huán)境變量 等功能,但是該函數(shù)不能隨意調(diào)用,它只有在 登錄權(quán)限校驗異常 時候才應(yīng)該被調(diào)用,否則服務(wù)器在不知道客戶端 被強行關(guān)閉 的情況下,依然會向該客戶端發(fā)送消息,導(dǎo)致嚴(yán)重異常:

// Closes the network connection and unsets internal variables. Do not call this
// function after successfully authentication, call Close instead. This function
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
    // Makes cleanup idempotent 保證函數(shù)的冪等性
    if mc.netConn != nil {
        if err := mc.netConn.Close(); err != nil { // Close 會嘗試發(fā)送 comQuit command 到服務(wù)器
            errLog.Print(err)
        }
        mc.netConn = nil // 不管 Close 是否成功,必須將 netConn 清空
    }
    mc.cfg = nil
    mc.buf.nc = nil // 緩沖器中的 netConn 也要關(guān)閉
}

Result.go

每當(dāng) MySQL 返回一個 OK狀態(tài)報文 ,該報文協(xié)議會攜帶上本次執(zhí)行的結(jié)果 affectedRows 以及 insertId ,而 result.go 就包含著一個數(shù)據(jù)結(jié)構(gòu),用于存儲本次的執(zhí)行結(jié)果。

type mysqlResult struct {
    affectedRows int64
    insertId     int64
}

// 兩個 getter
func (res *mysqlResult) LastInsertId() (int64, error) {
    return res.insertId, nil
}

func (res *mysqlResult) RowsAffected() (int64, error) {
    return res.affectedRows, nil
}

接下來我們看下在 conntion.go 中是怎么生成 mysqlResult 對象的:

// connect.go
func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, error) {
  
    // ...
  
    err := exec(query)
    if err == nil {
        return &mysqlResult{ // 返回執(zhí)行的結(jié)果
            affectedRows: int64(mc.affectedRows),
            insertId:     int64(mc.insertId),
        }, err
    }
    return nil, err
}

// exec 函數(shù)的解析可以返回上面 package.go 中瀏覽

// package.go
func (mc *mysqlConn) readResultSetHeaderPacket() (int, error) {
    data, err := mc.readPacket()
    if err == nil {
        switch data[0] {

        case iOK:
            return 0, mc.handleOkPacket(data) // 處理 OK 狀態(tài)報文

        // ...
}

func (mc *mysqlConn) handleOkPacket(data []byte) error {
    var n, m int

    // 0x00 [1 byte]

    // Affected rows [Length Coded Binary]
    mc.affectedRows, _, n = readLengthEncodedInteger(data[1:])

    // Insert id [Length Coded Binary]
    mc.insertId, _, m = readLengthEncodedInteger(data[1+n:])

    // ...
}

Row.go

當(dāng) MySQL 執(zhí)行 插入、更新、刪除 等操作后,都會返回 Result ,但是 查詢 返回的是 Rows ,我們先來看看 go-mysql-driver 驅(qū)動所實現(xiàn)的 接口 Rows 的接口描述:

// database/sql/driver/driver.go
// Rows 是執(zhí)行查詢返回的結(jié)果的 游標(biāo)
type Rows interface {
    // Columns 返回列的名稱,從 slice 的長度可以判斷列的長度
    // 如果一個列的名稱未知,則為該列返回一個空字符串
    Columns() []string
  
    // Close 關(guān)閉游標(biāo)
    Close() error
  
    // Next 將下一行數(shù)據(jù)填充到 desc 切片中
    // 如果讀取的是最后一行數(shù)據(jù),應(yīng)該返回一個 io.EOF 錯誤
    Next(desc []Value) error
}

type Value interface{} // Value is a value that drivers must be able to handle.

為什么我要說這是 go-mysql-driver 驅(qū)動所實現(xiàn)的 接口 Rows 呢?眼尖的同學(xué)應(yīng)該已經(jīng)看到了, Next 函數(shù)好像和我們平常見到的不一樣?。?!

是的,因為我們平常使用的:

  • rows.Next()

  • rows.Scan(dest ...interface{}) error

等函數(shù)的對象 rows 并不是上面的 接口描述 Rows ,而是另一個封裝的 同名數(shù)據(jù)結(jié)構(gòu) Rows ,它就在 database/sql 包中 :

// database/sql.go
type Rows struct {
    dc          *driverConn 
    releaseConn func(error)
    rowsi       driver.Rows // 接口描述的 Rows 藏在這!??!
    
    // 忽略其他字段,因為我們不分析這個包...
  
    // lastcols is only used in Scan, Next, and NextResultSet which are expected
    // not not be called concurrently.
    lastcols []driver.Value
}

我們跳過 database/sql 包中的 Rows 實現(xiàn),其無非是提供了更多功能的一個結(jié)果集而已,讓我們回到真正與數(shù)據(jù)庫進行交互的 Rows 中進行源碼分析。

go-sql-driver 實現(xiàn)的 mysqlRows 數(shù)據(jù)結(jié)構(gòu)只實現(xiàn)了 Columns()Close() 兩個行數(shù),剩下的 Next(desc []driver.Value) 實現(xiàn)則交給了 MySQL 的兩種結(jié)果集協(xié)議:

// rows.go

type mysqlField struct {
    tableName string
    name      string
    flags     fieldFlag
    fieldType byte
    decimals  byte
}

type mysqlRows struct {
    mc      *mysqlConn
    columns []mysqlField
}

type binaryRows struct { // 二進制結(jié)果集協(xié)議
    mysqlRows // 對于 Go 的 組合特性 應(yīng)該不會陌生吧?
}

type textRows struct { // 文本結(jié)果集協(xié)議
    mysqlRows
}

func (rows *mysqlRows) Columns() []string {
    columns := make([]string, len(rows.columns))
    
    // 將列名賦值到 columns ,如果有設(shè)置別名則賦值別名...
  
    return columns
}

func (rows *mysqlRows) Close() error {
    // 將連接里面的未讀數(shù)據(jù)讀完,然后將連接置空
}

// 接下來的 Next 函數(shù)實現(xiàn)就交由 binaryRows 和 textRows 了
func (rows *binaryRows) Next(desc []driver.Value) error {
    if mc := rows.mc; mc != nil {
        if mc.netConn == nil {
            return ErrInvalidConn
        }

        return rows.readRow(dest) // 讀二進制協(xié)議結(jié)果集
    }
    return io.EOF
}

func (rows *testRows) Next(desc []driver.Value) error {
    if mc := rows.mc; mc != nil {
        if mc.netConn == nil {
            return ErrInvalidConn
        }

        return rows.readRow(dest) // 讀取文本協(xié)議
    }
    return io.EOF
}

可以說,實現(xiàn)了 driver.Rows 接口的只有 binaryRowstestRows ,而他們里面的 readRow(desc) 實現(xiàn)由于都是和協(xié)議強相關(guān)的代碼,就不再解析了。

我們跟著源碼可以看到,使用 textRows 的場景在 getSystemVar 以及 Query 中,而使用 binaryRows 的場景在 statement 中,就是我們下一步需要解析的部分。

Statement.go

Prepared Statement ,即預(yù)處理語句,他有什么優(yōu)勢呢,為什么 MySQL 要加入它?

  • 執(zhí)行性能更高:MySQL 會對 Prepared Statement 語句預(yù)先進行編譯成模板,并將 占位符 替換 參數(shù) 的位置,這樣如果頻繁執(zhí)行一條參數(shù)只有少量替換的語句時候,性能會得到大量提高??赡苡型瑢W(xué)會有疑問,為什么 MySQL 語句還需要編譯?那么可以來參考下這篇 MySQL Prepare 原理 。

  • 傳輸協(xié)議更優(yōu):Prepare Statement 在傳輸時候使用的是 Binary Protocol ,比使用 Text Protocol 的查詢具有 傳輸數(shù)據(jù)量更小 、 無需轉(zhuǎn)換數(shù)據(jù)格式 等優(yōu)勢,緩解了 CPU網(wǎng)絡(luò) 的開銷。

  • 安全性更好:由 MySQL Prepare 原理 我們可以知道,Perpare 編譯之后會生成 語法樹,在執(zhí)行的時候才會將參數(shù)傳進來,這樣就避免了平常直接執(zhí)行 SQL 語句 會發(fā)生的 SQL 注入 問題。

好了,先來看下 mysqlStmt 的數(shù)據(jù)結(jié)構(gòu):

type mysqlStmt struct {
    mc          *mysqlConn
    id          uint32
    paramCount  int
    columns     []mysqlField // cached from the first query (既然SQL已經(jīng)預(yù)編譯好了,返回的結(jié)果集列名已經(jīng)是確定的,所以在收到 PREPARE_OK 之后解析數(shù)據(jù)后會緩存下來)
}

我們發(fā)現(xiàn),它比 mysqlRows 多了兩個成員變量:

  • idMySQL 預(yù)處理語句之后,會給該語句分配一個 id 并返回客戶端,用于:

  • 客戶端提交該 id 給服務(wù)器調(diào)用對應(yīng)的預(yù)處理語句。

  • paramCount :參數(shù)數(shù)量,等于 占位符 的個數(shù),用于:

  • 判斷傳入的參數(shù)個數(shù)是否與預(yù)編譯語句中的占位符個數(shù)一致。

  • 判斷返回的 PREPARE_OK 響應(yīng)報文是否帶有 參數(shù)列名 數(shù)據(jù)。

下面來看看如何創(chuàng)建并使用一個 Prepare Statement

func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) { // 傳入需要預(yù)編譯的 SQL 語句
    // 檢查連接是否可用...
  
    err = mc.writeCommandPacketStr(comStmtPrepare, query) // 將 SQL 發(fā)往數(shù)據(jù)庫進行預(yù)編譯
    if err != nil {
        return nil, err
    }

    stmt := &mysqlStmt{ // 預(yù)編譯成功,先創(chuàng)建 stmt 對象
        mc: mc,
    }
  
    // Read Result
    columnCount, err := stmt.readPrepareResultPacket() // 從 stmt 的連接讀取返回 響應(yīng)報文
    if err == nil {
        if stmt.paramCount > 0 { // 如果預(yù)編譯的 SQL 的有參數(shù)
            if err = mc.readUntilEOF(); err != nil { // 讀取參數(shù)列名數(shù)據(jù)
                return nil, err
            }
        }
        
        if columnCount > 0 { // 返回執(zhí)行結(jié)果的列表個數(shù)
            err = mc.readUntilEOF() // 讀取執(zhí)行結(jié)果的列名數(shù)據(jù)
        }
    }
  
    return stmt, err
}

因為是已經(jīng)預(yù)編譯好的語句,所以在執(zhí)行的時候只需要將參數(shù)傳進去就可以了。

func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result, error) {
    // 檢查連接是否可用...
  
    err := stmt.writeExecutePacket(args)
    if err != nil {
        return nil, err
    }
  
    // 讀取結(jié)果集的行、列數(shù)據(jù)
}

func(stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
    if len(args) != stmt.paramCount { // 判斷傳進來的參數(shù)和預(yù)編譯好的SQL參數(shù) 個數(shù)是否一致
        return fmt.Errorf(
            "argument count mismatch (got: %d; has: %d)",
            len(args),
            stmt.paramCount,
        )
    }
  
    // 讀取緩沖器中的數(shù)據(jù),如果為空,則返回異常...
  
    // command [1 byte]
    data[4] = comStmtExecute

    // statement_id [4 bytes] 將預(yù)編譯語句的 id 轉(zhuǎn)換為 4字節(jié)的二進制數(shù)據(jù)
    data[5] = byte(stmt.id)
    data[6] = byte(stmt.id >> 8)
    data[7] = byte(stmt.id >> 16)
    data[8] = byte(stmt.id >> 24)

    // flags (0: CURSOR_TYPE_NO_CURSOR) [1 byte]
    data[9] = 0x00

    // iteration_count (uint32(1)) [4 bytes]
    data[10] = 0x01
    data[11] = 0x00
    data[12] = 0x00
    data[13] = 0x00
  
    // 將參數(shù)按照不同的類型轉(zhuǎn)換為 binary protobuf 并 append 到 data 中...
  
    return mc.writePacket(data)
}

相信看到這里,已經(jīng)能對看懂源碼的 70% 了,剩余的代碼都是和協(xié)議相關(guān),就留待有興趣的讀者繼續(xù)研究,這里就不再展開講了。

Transaction.go

事務(wù)是 MySQL 中很重要的一部分,但是驅(qū)動的實現(xiàn)卻很簡單,因為一切的事務(wù)控制都已經(jīng)交由 MySQL 去執(zhí)行了,驅(qū)動所需要做的,只要發(fā)送一個 commit 或者 rollbackcommand packet 即可。

type mysqlTx struct {
    mc *mysqlConn
}

func (tx *mysqlTx) Commit() (err error) {
    if tx.mc == nil || tx.mc.netConn == nil {
        return ErrInvalidConn
    }
    err = tx.mc.exec("COMMIT")
    tx.mc = nil
    return
}

func (tx *mysqlTx) Rollback() (err error) {
    if tx.mc == nil || tx.mc.netConn == nil {
        return ErrInvalidConn
    }
    err = tx.mc.exec("ROLLBACK")
    tx.mc = nil
    return
}

總結(jié)

最后,其實 buffer 的實現(xiàn)對我來說印象是最深刻的,因為它是最簡單而又是最有效的實現(xiàn)了一個消息緩沖器,它實現(xiàn)的巧妙讓我決定把它放到第一節(jié),而其他的幾乎都和 MySQL 的協(xié)議相關(guān),看這些源碼讓我對 MySQL 有了更多的認(rèn)識。

好了,本篇字?jǐn)?shù)比較多,也會有很多不足,希望大家能夠給本篇博客多提點意見,讓我可以改進的更好。如果還有機會,我會帶來其他篇章的源碼解析,敬請期待 :)

參考鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容