PostgreSQL 并發(fā)控制機(jī)制(3):基于時(shí)間戳的并發(fā)控制

并發(fā)控制是多個(gè)事務(wù)在并發(fā)運(yùn)行時(shí),數(shù)據(jù)庫(kù)保證事務(wù)一致性(Consistency)和隔離性(Isolation)的一種機(jī)制。主流商用關(guān)系數(shù)據(jù)庫(kù)使用的并發(fā)控制技術(shù)主要有三種:嚴(yán)格兩階段封鎖(S2PL)、多版本并發(fā)控制(MVCC)和樂(lè)觀并發(fā)控制(OCC)。
本文介紹了基于時(shí)間戳的并發(fā)控制,包括常規(guī)基于時(shí)間戳的并發(fā)控制、多版本基于時(shí)間戳鎖的并發(fā)控制、與時(shí)間戳相關(guān)的Google Percolator工程實(shí)踐和openGauss對(duì)PostgreSQL SI的優(yōu)化改進(jìn)等。

一、基于時(shí)間戳的并發(fā)控制

基于時(shí)間戳的并發(fā)控制,有常規(guī)的“單”版本并發(fā)控制,也有多版本基于時(shí)間戳并發(fā)控制。

1、單版本基于時(shí)間戳的并發(fā)控制

事務(wù)T在啟動(dòng)時(shí)分配一個(gè)唯一的時(shí)間戳TS(T),并發(fā)控制保證存在一個(gè)等價(jià)的串行調(diào)度,該調(diào)度中的事務(wù)是按事務(wù)的啟動(dòng)時(shí)間先后順序,亦即事務(wù)按它們的啟動(dòng)順序串行化而不是按提交的順序串行化。
為了實(shí)現(xiàn)這一點(diǎn),基于時(shí)間戳順序并發(fā)控制使用以下數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù)項(xiàng)x的以下信息:
1.rt(x):任何讀x的事務(wù)的最大時(shí)間戳
2.wt(x):任何寫x的事務(wù)的最大時(shí)間戳
3.f(x):標(biāo)志,用于表明最后寫x的事務(wù)是否已提交

使用上述數(shù)據(jù)結(jié)構(gòu)進(jìn)行讀寫并發(fā)控制的算法如下:

當(dāng)事務(wù)T1請(qǐng)求讀x時(shí)
1.第一種情況:TS(T1)<\wt(x)
讀事務(wù)的啟動(dòng)時(shí)間小于最晚的寫時(shí)間,意味著其他事務(wù)在T1后寫入了新值,T1希望讀到的原值已不存在,T1由于太“老”不能讀,T1必須回滾

2.第二種情況:TS(T1)>wt(x)
讀事務(wù)的啟動(dòng)時(shí)間大于最晚寫入時(shí)間,這時(shí)候又分為分兩種情況:
A.f(x)標(biāo)記x已提交,則請(qǐng)求被批準(zhǔn),如TS(T1)>rt(x),則更新rt(x)為TS(T1)
B.f(x)標(biāo)記x未提交,則T1必須等待(否則會(huì)導(dǎo)致臟讀)

當(dāng)事務(wù)T1請(qǐng)求寫x時(shí)
1.第一種情況:TS(T1)<\rt(x)
寫事務(wù)的時(shí)間比最晚的讀時(shí)間要早,意味著過(guò)晚的寫,T1太老不能寫,T1必須回滾

2.第二種情況:rt(x)<TS(T1)<\wt(x)
寫事務(wù)的啟動(dòng)時(shí)間大于最晚讀時(shí)間,小于最晚寫時(shí)間
如f(x)標(biāo)記x已提交,則根據(jù)Thomas寫規(guī)則,請(qǐng)求被批準(zhǔn),但沒(méi)有任何的實(shí)際動(dòng)作(因?yàn)門1寫的x值不會(huì)被任何其他事務(wù)讀?。?br> 如f(x)標(biāo)記x未提交,T1必須等待(否則會(huì)導(dǎo)致臟寫)

3.第三種情況:wt(x)<\rt(x)<TS(T1)
寫事務(wù)的啟動(dòng)時(shí)間比最晚讀時(shí)間要大,且最晚讀時(shí)間比最晚寫時(shí)間要晚
如f(x)標(biāo)記x未提交,那么T1必須等待(否則會(huì)導(dǎo)致臟寫)
如f(x)標(biāo)記x已提交,那么請(qǐng)求被批準(zhǔn),TS(T1)值賦予wt(x),f(x)值設(shè)置為未提交

從上述算法也可以看出該策略的一些缺點(diǎn):
1.額外的空間開銷:每個(gè)數(shù)據(jù)項(xiàng)都需要記錄rt(x),wt(x),f(x)
2.額外的“事務(wù)控制”:rt(x),wt(x),f(x)這些信息存儲(chǔ)在數(shù)據(jù)庫(kù)中,必須像數(shù)據(jù)項(xiàng)一樣進(jìn)行更新,如果事務(wù)中止,它們必須被回退
3.額外的處理邏輯:對(duì)x的讀意味著對(duì)rt(x)的寫,邏輯復(fù)雜

2、多版本基于時(shí)間戳的并發(fā)控制

相對(duì)于單版本,多版本時(shí)間戳并發(fā)控制有以下調(diào)整:
1.當(dāng)事務(wù)T的w(X)發(fā)生時(shí),如果合法,則創(chuàng)建一個(gè)X的新版本,其寫時(shí)間為事務(wù)T的啟動(dòng)時(shí)間,這個(gè)X的新版本可以記作Xt
2.當(dāng)事務(wù)T的r(X)發(fā)生時(shí),事務(wù)調(diào)度器找到X的版本Xt,需滿足的調(diào)節(jié)是t≤TS(T),并且在t之后不存在另外一個(gè)X的版本
3.寫時(shí)間與數(shù)據(jù)項(xiàng)的版本相關(guān)且永不改變
4.讀時(shí)間與數(shù)據(jù)項(xiàng)的版本相關(guān),用于拒絕小于該讀時(shí)間的寫操作,因?yàn)槿绻试S改寫,那么讀操作應(yīng)讀取該寫操作的值,但這是不可能發(fā)生的

二、Google Percolator

Google Percolator的背景源于搜索索引系統(tǒng)存儲(chǔ)數(shù)十PB的數(shù)據(jù),每天有數(shù)十億的更新,而MapReduce適合處理大批量的數(shù)據(jù),而處理少量更新則顯得效率較低,因此Percolator應(yīng)運(yùn)而生:為處理增量更新而設(shè)計(jì),處理同樣數(shù)據(jù)量的文檔時(shí)可以將平均時(shí)延降低50%。
Percolator使用多版本基于時(shí)間戳并結(jié)合封鎖技術(shù)的并發(fā)控制,可以實(shí)現(xiàn):
1.保證讀事務(wù)可以讀到最近最新已提交的數(shù)據(jù)版本;
2.讀不會(huì)阻塞寫(但寫會(huì)阻塞讀);
3.寫-寫沖突時(shí),使用FCW協(xié)議。

Percolator網(wǎng)上介紹的文章已有很多,其關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)包括:
1.事務(wù)啟動(dòng)時(shí)間start_ts
2.事務(wù)提交時(shí)間commit_ts
3.鎖,KV結(jié)構(gòu),key=Row+Column(lock)+start_ts,value=primary row&column
4.實(shí)際列值data,KV結(jié)構(gòu),包括數(shù)據(jù)指針和實(shí)際數(shù)據(jù),KV結(jié)構(gòu)
1)指針:key=Row+Column(write)+commit_ts,value=start_ts
2)數(shù)據(jù):Row+Column(data)+start_ts,value=列值

在此對(duì)論文提供的偽代碼進(jìn)行更詳細(xì)的注釋解讀,其他不再贅述。

class Transaction { //class Transaction
  //結(jié)構(gòu)體Write
  struct Write {
    Row row;    //行
    Column col;   //列
    string value; //列值
  };//Write Struct 
  vector<Write> writes_;//數(shù)據(jù)緩存Write
  int start_ts_;//事務(wù)開始時(shí)間

  Transaction() : start_ts_(oracle.GetTimestamp()) {} //構(gòu)造函數(shù),初始化變量start_ts_
  
  /*
  輸入:Write結(jié)構(gòu)體
  輸出:無(wú)
  實(shí)現(xiàn):簡(jiǎn)單的把Write對(duì)象(列值)push到Vector中
  */
  void Set(Write w) {//Set函數(shù)
    writes_.push_back(w); 
  }
  
  /*
  輸入:Row-行標(biāo)識(shí),Column-列標(biāo)識(shí)
  輸出:value-列值,成功返回true,失?。ㄈ鐩](méi)有獲取值)返回false
  */
  bool Get(Row row, Column c, string* value) {
    while (true) {
    //Bigtable提供的行級(jí)事務(wù)
      bigtable::Txn T = bigtable::StartRowTransaction(row);
      // Check for locks that signal concurrent writes.
    // 檢查是否存在并發(fā)事務(wù)在寫數(shù)據(jù)
    // 注:SI的特點(diǎn)是寫不阻塞讀,讀不阻塞寫,但在這里卻需要等待?
    //     原因是SI保證讀到的是事務(wù)開始(start_ts)之前已提交的數(shù)據(jù),
    //     存在鎖意味著寫操作未完成且該操作的commit_ts可能在事務(wù)開始之前,
    //   但需要在寫入之后才能知道是否在start_ts之前,因此需要等待
      if (T.Read(row, c+"lock", [0, start_ts_])) { //判斷[0, start_ts_]內(nèi)是否存在lock?
        // There is a pending lock; try to clean it and wait
    // 仍存在lock,等待
        BackoffAndMaybeCleanupLock(row, c);
        continue;
      }
      // Find the latest write below our start timestamp.
    //讀取最近已提交的數(shù)據(jù)版本
      latest write = T.Read(row, c+"write", [0, start_ts_]);
      if (!latest_write.found())
    //沒(méi)有數(shù)據(jù),返回false
        return false; // no data
    //從Column+write中獲取start_ts
      int data_ts = latest_write.start_timestamp();
    //獲取真正的數(shù)據(jù):Row+Column(column+"data")+start_ts
      *value = T.Read(row, c+"data", [data_ts, data_ts]);
      return true;
    }
  }
  
  // Prewrite tries to lock cell w, returning false in case of conflict.
  // 預(yù)寫入(理論基礎(chǔ):通過(guò)意向表緩存數(shù)據(jù),執(zhí)行延遲更新)
  /*  
  輸入:Write結(jié)構(gòu)體,Write主節(jié)點(diǎn)
  輸出:成功返回true,失敗返回false
  */
  bool Prewrite(Write w, Write primary) {
  //獲取列
    Column c = w.col;
  //啟動(dòng)Bigtable行事務(wù)
    bigtable::Txn T = bigtable::StartRowTransaction(w.row);
    // Abort on writes after our start timestamp ...
  // 存在比事務(wù)啟動(dòng)時(shí)間start_ts更大的值,存在ww沖突,按照FUW原則,本事務(wù)回滾
    if (T.Read(w.row, c+"write", [start_ts_, ∞]))
      return false;
    // ... or locks at any timestamp.
  // 存在鎖,說(shuō)明未完成的寫,即存在ww沖突,且其他事務(wù)比本事務(wù)更"早"獲得鎖,本事務(wù)回滾
    if (T.Read(w.row, c+"lock", [0, ∞]))
      return false;

  //校驗(yàn)完畢,可以寫數(shù)據(jù)
  //寫入數(shù)據(jù):key=Row+Column(data)+start_ts,value=需寫入的值
    T.Write(w.row, c+"data", start_ts_, w.value);
  //上鎖,key=Row+Column(lock)+start_ts,value=主節(jié)點(diǎn)的行&列
    T.Write(w.row, c+"lock", start_ts_,
      {primary.row, primary.col}); // The primary’s location.
  //執(zhí)行提交操作
    return T.Commit();
  }

  //提交操作
  /*
  輸入:無(wú)
  輸出:成功返回true,失敗返回false
  */
  bool Commit() {
    // The primary’s location.
  // 數(shù)組writes_的第一個(gè)元素為主節(jié)點(diǎn)
    Write primary = writes_[0];
  // 除第一個(gè)元素外,其他為從節(jié)點(diǎn)
    vector<Write> secondaries(writes_.begin()+1, writes_.end());
  //主節(jié)點(diǎn)預(yù)寫入失敗
    if (!Prewrite(primary, primary))
      return false;
    //遍歷從節(jié)點(diǎn),執(zhí)行預(yù)寫入,一個(gè)節(jié)點(diǎn)不成功則全部失敗
    for (Write w : secondaries)
      if (!Prewrite(w, primary))
        return false;

    //獲取事務(wù)提交時(shí)間戳
    int commit_ts = oracle_.GetTimestamp();
    // Commit primary first.
  // 主節(jié)點(diǎn)首先提交
    Write p = primary;
    //啟動(dòng)Bigtable事務(wù)
  bigtable::Txn T = bigtable::StartRowTransaction(p.row);
  //謹(jǐn)慎起見,判斷是否存在鎖(本事務(wù),start_ts唯一),避免重復(fù)寫入
    if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
      return false; // aborted while working
    //寫入:key=Row+Column(write)+commit_ts,value=start_ts,實(shí)際的值在key=Row+Column(data)+start_ts中
    T.Write(p.row, p.col+"write", commit_ts, start_ts_); // Pointer to data written at start_ts_.
  //刪除鎖
    T.Erase(p.row, p.col+"lock", commit_ts);
  //Bigtable事務(wù)提交
    if (!T.Commit())
      return false; // commit point
    // Second phase: write out write records for secondary cells.
  //遍歷從節(jié)點(diǎn),寫key=Row+Column(write)+commit_ts,value=start_ts,同時(shí)刪除鎖
    for (Write w : secondaries) {
      bigtable::Write(w.row, w.col+"write", commit_ts, start_ts_);
      bigtable::Erase(w.row, w.col+"lock", commit_ts);
    }
    return true;
  }
} // class Transaction

三、openGauss對(duì)PostgreSQL SI的增強(qiáng)

在PostgreSQL中,可通過(guò)函數(shù)txid_current_snapshot()可獲取當(dāng)前的快照信息:

11:05:16 (xdb@[local]:5432)testdb=# select txid_current_snapshot();
 txid_current_snapshot 
-----------------------
 2404:2404:
(1 row)

11:24:11 (xdb@[local]:5432)testdb=# 

輸出格式為ST1 : ST2 : xip_list
其中:
ST1:最早仍活躍的事務(wù)ID,早于此XID的事務(wù)要么被提交并可見,要么回滾丟棄。
ST2:最后已完結(jié)事務(wù)(COMMITTED/ABORTED)的事務(wù)ID + 1。
xip_list:在"拍攝"快照時(shí)仍進(jìn)行中的事務(wù)ID。該列表包含xmin和xmax之間的活動(dòng)事務(wù)ID。
總結(jié)一下,簡(jiǎn)單來(lái)說(shuō),對(duì)于給定的事務(wù)XID:
XID ∈ [1,ST1),過(guò)去的事務(wù),對(duì)此快照均可見;
XID ∈ [ST1,ST2),不考慮子事務(wù)的情況,仍處于IN_PROGRESS狀態(tài)的,不可見;COMMITED狀態(tài),可見;ABORTED狀態(tài),不可見;
XID ∈ [ST2,∞),未來(lái)的事務(wù),對(duì)此快照均不可見;

可以看到,對(duì)于XID ∈ [ST1,ST2)的事務(wù)ID,其判斷邏輯的復(fù)雜度與活躍事務(wù)數(shù)n成正比,算法復(fù)雜度為O(n)。
但從快照隔離的機(jī)制以及基于時(shí)間戳的并發(fā)控制的理論可以看到,只要獲取讀時(shí)間(不同的隔離級(jí)別,讀時(shí)間不同)之前已提交事務(wù)的數(shù)據(jù)項(xiàng)版本即可,而這個(gè)算法復(fù)雜度可以降低為O(1)。
openGauss引入了數(shù)據(jù)結(jié)構(gòu)CSN(CommitSeqNo),其優(yōu)化原理如下圖所示:

openGauss CSN示意圖

已提交的XID映射為CSN(可以理解為邏輯意義上的時(shí)間戳),每次讀的時(shí)候,只需要把當(dāng)前讀的邏輯時(shí)間戳與事務(wù)提交的邏輯時(shí)間戳進(jìn)行簡(jiǎn)單的對(duì)比即可得到可讀的元組版本。

下面是相關(guān)的代碼片段:

typedef struct SnapshotData {
    SnapshotSatisfiesFunc satisfies; /* tuple test function */
    ...
    CommitSeqNo snapshotcsn;
    ...
}

//獲取快照
#ifndef ENABLE_MULTIPLE_NODES
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot, bool forHSFeedBack)
#else
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot)
#endif
{    
    ...
    //從線程變量中獲取CSN
    snapshot->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
    ...
}

openGauss提供了CSN相關(guān)的函數(shù)獲?。?/p>

testdb=# \df *csn*
                                                                   List of functions
   Schema   |            Name            | Result data type |                  Argument data types                   |  Type  | fencedmode | propackage
------------+----------------------------+------------------+--------------------------------------------------------+--------+------------+------------
 pg_catalog | gs_get_next_xid_csn        | SETOF record     | OUT node_name text, OUT next_xid xid, OUT next_csn xid | normal | f          | f
 pg_catalog | pg_export_snapshot_and_csn | record           | OUT snapshot text, OUT "CSN" text                      | normal | f          | f
 pg_catalog | pgxc_get_csn               | SETOF bigint     | xid                                                    | normal | f          | f
(3 rows)

-- 獲取下一個(gè)XID和CSN
testdb=#  select gs_get_next_xid_csn();
 gs_get_next_xid_csn
---------------------
 (gaussdb,9540,1919)
(1 row)

-- 根據(jù)XID獲取相應(yīng)的CSN
testdb=# select pgxc_get_csn('9537'::xid);
 pgxc_get_csn
--------------
         1918
(1 row)

四、參考資料

[1] Hector Garcia-Molina,Jeffrey D.Ullman,Jennifer Widom,《數(shù)據(jù)庫(kù)系統(tǒng)實(shí)現(xiàn)(第2版)》
[2] Daniel Peng,F(xiàn)rank Dabek,Large-scale Incremental Processing Using Distributed Transactions and Notifications
[3] 李國(guó)良,周敏奇,《openGauss數(shù)據(jù)庫(kù)核心技術(shù)》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容