并發(fā)控制是多個(gè)事務(wù)在并發(fā)運(yùn)行時(shí),數(shù)據(jù)庫(kù)保證事務(wù)一致性(Consistency)和隔離性(Isolation)的一種機(jī)制。主流商用關(guān)系數(shù)據(jù)庫(kù)使用的并發(fā)控制技術(shù)主要有三種:嚴(yán)格兩階段封鎖(S2PL)、多版本并發(fā)控制(MVCC)和樂(lè)觀并發(fā)控制(OCC)。
本文介紹了基于時(shí)間戳的并發(fā)控制,包括常規(guī)基于時(shí)間戳的并發(fā)控制、多版本基于時(shí)間戳鎖的并發(fā)控制、與時(shí)間戳相關(guān)的Google Percolator工程實(shí)踐和openGauss對(duì)PostgreSQL SI的優(yōu)化改進(jìn)等。
一、基于時(shí)間戳的并發(fā)控制
基于時(shí)間戳的并發(fā)控制,有常規(guī)的“單”版本并發(fā)控制,也有多版本基于時(shí)間戳并發(fā)控制。
1、單版本基于時(shí)間戳的并發(fā)控制
事務(wù)T在啟動(dòng)時(shí)分配一個(gè)唯一的時(shí)間戳TS(T),并發(fā)控制保證存在一個(gè)等價(jià)的串行調(diào)度,該調(diào)度中的事務(wù)是按事務(wù)的啟動(dòng)時(shí)間先后順序,亦即事務(wù)按它們的啟動(dòng)順序串行化而不是按提交的順序串行化。
為了實(shí)現(xiàn)這一點(diǎn),基于時(shí)間戳順序并發(fā)控制使用以下數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù)項(xiàng)x的以下信息:
1.rt(x):任何讀x的事務(wù)的最大時(shí)間戳
2.wt(x):任何寫x的事務(wù)的最大時(shí)間戳
3.f(x):標(biāo)志,用于表明最后寫x的事務(wù)是否已提交
使用上述數(shù)據(jù)結(jié)構(gòu)進(jìn)行讀寫并發(fā)控制的算法如下:
當(dāng)事務(wù)T1請(qǐng)求讀x時(shí)
1.第一種情況:TS(T1)<\wt(x)
讀事務(wù)的啟動(dòng)時(shí)間小于最晚的寫時(shí)間,意味著其他事務(wù)在T1后寫入了新值,T1希望讀到的原值已不存在,T1由于太“老”不能讀,T1必須回滾
2.第二種情況:TS(T1)>wt(x)
讀事務(wù)的啟動(dòng)時(shí)間大于最晚寫入時(shí)間,這時(shí)候又分為分兩種情況:
A.f(x)標(biāo)記x已提交,則請(qǐng)求被批準(zhǔn),如TS(T1)>rt(x),則更新rt(x)為TS(T1)
B.f(x)標(biāo)記x未提交,則T1必須等待(否則會(huì)導(dǎo)致臟讀)
當(dāng)事務(wù)T1請(qǐng)求寫x時(shí)
1.第一種情況:TS(T1)<\rt(x)
寫事務(wù)的時(shí)間比最晚的讀時(shí)間要早,意味著過(guò)晚的寫,T1太老不能寫,T1必須回滾
2.第二種情況:rt(x)<TS(T1)<\wt(x)
寫事務(wù)的啟動(dòng)時(shí)間大于最晚讀時(shí)間,小于最晚寫時(shí)間
如f(x)標(biāo)記x已提交,則根據(jù)Thomas寫規(guī)則,請(qǐng)求被批準(zhǔn),但沒(méi)有任何的實(shí)際動(dòng)作(因?yàn)門1寫的x值不會(huì)被任何其他事務(wù)讀?。?br>
如f(x)標(biāo)記x未提交,T1必須等待(否則會(huì)導(dǎo)致臟寫)
3.第三種情況:wt(x)<\rt(x)<TS(T1)
寫事務(wù)的啟動(dòng)時(shí)間比最晚讀時(shí)間要大,且最晚讀時(shí)間比最晚寫時(shí)間要晚
如f(x)標(biāo)記x未提交,那么T1必須等待(否則會(huì)導(dǎo)致臟寫)
如f(x)標(biāo)記x已提交,那么請(qǐng)求被批準(zhǔn),TS(T1)值賦予wt(x),f(x)值設(shè)置為未提交
從上述算法也可以看出該策略的一些缺點(diǎn):
1.額外的空間開銷:每個(gè)數(shù)據(jù)項(xiàng)都需要記錄rt(x),wt(x),f(x)
2.額外的“事務(wù)控制”:rt(x),wt(x),f(x)這些信息存儲(chǔ)在數(shù)據(jù)庫(kù)中,必須像數(shù)據(jù)項(xiàng)一樣進(jìn)行更新,如果事務(wù)中止,它們必須被回退
3.額外的處理邏輯:對(duì)x的讀意味著對(duì)rt(x)的寫,邏輯復(fù)雜
2、多版本基于時(shí)間戳的并發(fā)控制
相對(duì)于單版本,多版本時(shí)間戳并發(fā)控制有以下調(diào)整:
1.當(dāng)事務(wù)T的w(X)發(fā)生時(shí),如果合法,則創(chuàng)建一個(gè)X的新版本,其寫時(shí)間為事務(wù)T的啟動(dòng)時(shí)間,這個(gè)X的新版本可以記作Xt
2.當(dāng)事務(wù)T的r(X)發(fā)生時(shí),事務(wù)調(diào)度器找到X的版本Xt,需滿足的調(diào)節(jié)是t≤TS(T),并且在t之后不存在另外一個(gè)X的版本
3.寫時(shí)間與數(shù)據(jù)項(xiàng)的版本相關(guān)且永不改變
4.讀時(shí)間與數(shù)據(jù)項(xiàng)的版本相關(guān),用于拒絕小于該讀時(shí)間的寫操作,因?yàn)槿绻试S改寫,那么讀操作應(yīng)讀取該寫操作的值,但這是不可能發(fā)生的
二、Google Percolator
Google Percolator的背景源于搜索索引系統(tǒng)存儲(chǔ)數(shù)十PB的數(shù)據(jù),每天有數(shù)十億的更新,而MapReduce適合處理大批量的數(shù)據(jù),而處理少量更新則顯得效率較低,因此Percolator應(yīng)運(yùn)而生:為處理增量更新而設(shè)計(jì),處理同樣數(shù)據(jù)量的文檔時(shí)可以將平均時(shí)延降低50%。
Percolator使用多版本基于時(shí)間戳并結(jié)合封鎖技術(shù)的并發(fā)控制,可以實(shí)現(xiàn):
1.保證讀事務(wù)可以讀到最近最新已提交的數(shù)據(jù)版本;
2.讀不會(huì)阻塞寫(但寫會(huì)阻塞讀);
3.寫-寫沖突時(shí),使用FCW協(xié)議。
Percolator網(wǎng)上介紹的文章已有很多,其關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)包括:
1.事務(wù)啟動(dòng)時(shí)間start_ts
2.事務(wù)提交時(shí)間commit_ts
3.鎖,KV結(jié)構(gòu),key=Row+Column(lock)+start_ts,value=primary row&column
4.實(shí)際列值data,KV結(jié)構(gòu),包括數(shù)據(jù)指針和實(shí)際數(shù)據(jù),KV結(jié)構(gòu)
1)指針:key=Row+Column(write)+commit_ts,value=start_ts
2)數(shù)據(jù):Row+Column(data)+start_ts,value=列值
在此對(duì)論文提供的偽代碼進(jìn)行更詳細(xì)的注釋解讀,其他不再贅述。
class Transaction { //class Transaction
//結(jié)構(gòu)體Write
struct Write {
Row row; //行
Column col; //列
string value; //列值
};//Write Struct
vector<Write> writes_;//數(shù)據(jù)緩存Write
int start_ts_;//事務(wù)開始時(shí)間
Transaction() : start_ts_(oracle.GetTimestamp()) {} //構(gòu)造函數(shù),初始化變量start_ts_
/*
輸入:Write結(jié)構(gòu)體
輸出:無(wú)
實(shí)現(xiàn):簡(jiǎn)單的把Write對(duì)象(列值)push到Vector中
*/
void Set(Write w) {//Set函數(shù)
writes_.push_back(w);
}
/*
輸入:Row-行標(biāo)識(shí),Column-列標(biāo)識(shí)
輸出:value-列值,成功返回true,失?。ㄈ鐩](méi)有獲取值)返回false
*/
bool Get(Row row, Column c, string* value) {
while (true) {
//Bigtable提供的行級(jí)事務(wù)
bigtable::Txn T = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
// 檢查是否存在并發(fā)事務(wù)在寫數(shù)據(jù)
// 注:SI的特點(diǎn)是寫不阻塞讀,讀不阻塞寫,但在這里卻需要等待?
// 原因是SI保證讀到的是事務(wù)開始(start_ts)之前已提交的數(shù)據(jù),
// 存在鎖意味著寫操作未完成且該操作的commit_ts可能在事務(wù)開始之前,
// 但需要在寫入之后才能知道是否在start_ts之前,因此需要等待
if (T.Read(row, c+"lock", [0, start_ts_])) { //判斷[0, start_ts_]內(nèi)是否存在lock?
// There is a pending lock; try to clean it and wait
// 仍存在lock,等待
BackoffAndMaybeCleanupLock(row, c);
continue;
}
// Find the latest write below our start timestamp.
//讀取最近已提交的數(shù)據(jù)版本
latest write = T.Read(row, c+"write", [0, start_ts_]);
if (!latest_write.found())
//沒(méi)有數(shù)據(jù),返回false
return false; // no data
//從Column+write中獲取start_ts
int data_ts = latest_write.start_timestamp();
//獲取真正的數(shù)據(jù):Row+Column(column+"data")+start_ts
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}
}
// Prewrite tries to lock cell w, returning false in case of conflict.
// 預(yù)寫入(理論基礎(chǔ):通過(guò)意向表緩存數(shù)據(jù),執(zhí)行延遲更新)
/*
輸入:Write結(jié)構(gòu)體,Write主節(jié)點(diǎn)
輸出:成功返回true,失敗返回false
*/
bool Prewrite(Write w, Write primary) {
//獲取列
Column c = w.col;
//啟動(dòng)Bigtable行事務(wù)
bigtable::Txn T = bigtable::StartRowTransaction(w.row);
// Abort on writes after our start timestamp ...
// 存在比事務(wù)啟動(dòng)時(shí)間start_ts更大的值,存在ww沖突,按照FUW原則,本事務(wù)回滾
if (T.Read(w.row, c+"write", [start_ts_, ∞]))
return false;
// ... or locks at any timestamp.
// 存在鎖,說(shuō)明未完成的寫,即存在ww沖突,且其他事務(wù)比本事務(wù)更"早"獲得鎖,本事務(wù)回滾
if (T.Read(w.row, c+"lock", [0, ∞]))
return false;
//校驗(yàn)完畢,可以寫數(shù)據(jù)
//寫入數(shù)據(jù):key=Row+Column(data)+start_ts,value=需寫入的值
T.Write(w.row, c+"data", start_ts_, w.value);
//上鎖,key=Row+Column(lock)+start_ts,value=主節(jié)點(diǎn)的行&列
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col}); // The primary’s location.
//執(zhí)行提交操作
return T.Commit();
}
//提交操作
/*
輸入:無(wú)
輸出:成功返回true,失敗返回false
*/
bool Commit() {
// The primary’s location.
// 數(shù)組writes_的第一個(gè)元素為主節(jié)點(diǎn)
Write primary = writes_[0];
// 除第一個(gè)元素外,其他為從節(jié)點(diǎn)
vector<Write> secondaries(writes_.begin()+1, writes_.end());
//主節(jié)點(diǎn)預(yù)寫入失敗
if (!Prewrite(primary, primary))
return false;
//遍歷從節(jié)點(diǎn),執(zhí)行預(yù)寫入,一個(gè)節(jié)點(diǎn)不成功則全部失敗
for (Write w : secondaries)
if (!Prewrite(w, primary))
return false;
//獲取事務(wù)提交時(shí)間戳
int commit_ts = oracle_.GetTimestamp();
// Commit primary first.
// 主節(jié)點(diǎn)首先提交
Write p = primary;
//啟動(dòng)Bigtable事務(wù)
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
//謹(jǐn)慎起見,判斷是否存在鎖(本事務(wù),start_ts唯一),避免重復(fù)寫入
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
//寫入:key=Row+Column(write)+commit_ts,value=start_ts,實(shí)際的值在key=Row+Column(data)+start_ts中
T.Write(p.row, p.col+"write", commit_ts, start_ts_); // Pointer to data written at start_ts_.
//刪除鎖
T.Erase(p.row, p.col+"lock", commit_ts);
//Bigtable事務(wù)提交
if (!T.Commit())
return false; // commit point
// Second phase: write out write records for secondary cells.
//遍歷從節(jié)點(diǎn),寫key=Row+Column(write)+commit_ts,value=start_ts,同時(shí)刪除鎖
for (Write w : secondaries) {
bigtable::Write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
} // class Transaction
三、openGauss對(duì)PostgreSQL SI的增強(qiáng)
在PostgreSQL中,可通過(guò)函數(shù)txid_current_snapshot()可獲取當(dāng)前的快照信息:
11:05:16 (xdb@[local]:5432)testdb=# select txid_current_snapshot();
txid_current_snapshot
-----------------------
2404:2404:
(1 row)
11:24:11 (xdb@[local]:5432)testdb=#
輸出格式為ST1 : ST2 : xip_list
其中:
ST1:最早仍活躍的事務(wù)ID,早于此XID的事務(wù)要么被提交并可見,要么回滾丟棄。
ST2:最后已完結(jié)事務(wù)(COMMITTED/ABORTED)的事務(wù)ID + 1。
xip_list:在"拍攝"快照時(shí)仍進(jìn)行中的事務(wù)ID。該列表包含xmin和xmax之間的活動(dòng)事務(wù)ID。
總結(jié)一下,簡(jiǎn)單來(lái)說(shuō),對(duì)于給定的事務(wù)XID:
XID ∈ [1,ST1),過(guò)去的事務(wù),對(duì)此快照均可見;
XID ∈ [ST1,ST2),不考慮子事務(wù)的情況,仍處于IN_PROGRESS狀態(tài)的,不可見;COMMITED狀態(tài),可見;ABORTED狀態(tài),不可見;
XID ∈ [ST2,∞),未來(lái)的事務(wù),對(duì)此快照均不可見;
可以看到,對(duì)于XID ∈ [ST1,ST2)的事務(wù)ID,其判斷邏輯的復(fù)雜度與活躍事務(wù)數(shù)n成正比,算法復(fù)雜度為O(n)。
但從快照隔離的機(jī)制以及基于時(shí)間戳的并發(fā)控制的理論可以看到,只要獲取讀時(shí)間(不同的隔離級(jí)別,讀時(shí)間不同)之前已提交事務(wù)的數(shù)據(jù)項(xiàng)版本即可,而這個(gè)算法復(fù)雜度可以降低為O(1)。
openGauss引入了數(shù)據(jù)結(jié)構(gòu)CSN(CommitSeqNo),其優(yōu)化原理如下圖所示:

已提交的XID映射為CSN(可以理解為邏輯意義上的時(shí)間戳),每次讀的時(shí)候,只需要把當(dāng)前讀的邏輯時(shí)間戳與事務(wù)提交的邏輯時(shí)間戳進(jìn)行簡(jiǎn)單的對(duì)比即可得到可讀的元組版本。
下面是相關(guān)的代碼片段:
typedef struct SnapshotData {
SnapshotSatisfiesFunc satisfies; /* tuple test function */
...
CommitSeqNo snapshotcsn;
...
}
//獲取快照
#ifndef ENABLE_MULTIPLE_NODES
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot, bool forHSFeedBack)
#else
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot)
#endif
{
...
//從線程變量中獲取CSN
snapshot->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
...
}
openGauss提供了CSN相關(guān)的函數(shù)獲?。?/p>
testdb=# \df *csn*
List of functions
Schema | Name | Result data type | Argument data types | Type | fencedmode | propackage
------------+----------------------------+------------------+--------------------------------------------------------+--------+------------+------------
pg_catalog | gs_get_next_xid_csn | SETOF record | OUT node_name text, OUT next_xid xid, OUT next_csn xid | normal | f | f
pg_catalog | pg_export_snapshot_and_csn | record | OUT snapshot text, OUT "CSN" text | normal | f | f
pg_catalog | pgxc_get_csn | SETOF bigint | xid | normal | f | f
(3 rows)
-- 獲取下一個(gè)XID和CSN
testdb=# select gs_get_next_xid_csn();
gs_get_next_xid_csn
---------------------
(gaussdb,9540,1919)
(1 row)
-- 根據(jù)XID獲取相應(yīng)的CSN
testdb=# select pgxc_get_csn('9537'::xid);
pgxc_get_csn
--------------
1918
(1 row)
四、參考資料
[1] Hector Garcia-Molina,Jeffrey D.Ullman,Jennifer Widom,《數(shù)據(jù)庫(kù)系統(tǒng)實(shí)現(xiàn)(第2版)》
[2] Daniel Peng,F(xiàn)rank Dabek,Large-scale Incremental Processing Using Distributed Transactions and Notifications
[3] 李國(guó)良,周敏奇,《openGauss數(shù)據(jù)庫(kù)核心技術(shù)》