多線程腳本檢查啟動(dòng)
bool AppInitMain(Config &config, boost::thread_group &threadGroup, CScheduler &scheduler) {
...
// 腳本檢查線程; 啟動(dòng)多個(gè)腳本檢查線程
if (nScriptCheckThreads) {
for (int i = 0; i < nScriptCheckThreads - 1; i++) {
threadGroup.create_thread(&ThreadScriptCheck);
}
}
...
}
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);
void ThreadScriptCheck() {
RenameThread("bitcoin-scriptch");
scriptcheckqueue.Thread();
}
此處使用了boost的線程庫(kù),在綁定的線程函數(shù)ThreadScriptCheck中,調(diào)用一個(gè)全局狀態(tài)的任務(wù)隊(duì)列scriptcheckqueue;每個(gè)線程都去該隊(duì)列中去任務(wù),當(dāng)隊(duì)列中無(wú)任務(wù)可執(zhí)行時(shí),線程被條件變量阻塞。
任務(wù)隊(duì)列
//模板類,執(zhí)行的驗(yàn)證任務(wù)由T標(biāo)識(shí),T都必須提供一個(gè)重載的operator()方法,并且反回一個(gè)bool。
//默認(rèn)為主線程push 批量任務(wù)到隊(duì)列中,其他的工作線程去處理這些任務(wù),當(dāng)主線程push完任務(wù)后,也去處理這些任務(wù),直到任務(wù)隊(duì)列全部處理完畢。
template <typename T> class CCheckQueue {
private:
// 互斥鎖保護(hù)內(nèi)部的狀態(tài)
boost::mutex mutex;
// 在沒(méi)有工作時(shí),工作線程阻塞條件變量。
boost::condition_variable condWorker;
// 在沒(méi)有工作時(shí),master線程阻塞條件變量。
boost::condition_variable condMaster;
// 要處理元素的隊(duì)列。
std::vector<T> queue;
// 空閑的工作線程數(shù)量(包含主線程)
int nIdle;
// 總的工作線程的數(shù)量,包含主線程
int nTotal;
// 臨時(shí)評(píng)估結(jié)果
bool fAllOk;
// 還有多少驗(yàn)證任務(wù)沒(méi)有完成。包括不再排隊(duì),但仍在工作線程自己的批次中的任務(wù)數(shù)量。
unsigned int nTodo;
// 是否需要退出。
bool fQuit;
// 每個(gè)批次最大的元素處理數(shù)量
unsigned int nBatchSize;
//內(nèi)部函數(shù),隊(duì)列中的任務(wù)在此處將被全部處理。
bool Loop(bool fMaster = false);
public:
//! Create a new check queue
CCheckQueue(unsigned int nBatchSizeIn)
: nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false),
nBatchSize(nBatchSizeIn) {}
//! Worker thread;此處為工作線程進(jìn)行調(diào)用,
void Thread() { Loop(); }
//在代碼中主要是主線程進(jìn)行調(diào)用,等待任務(wù)全部處理完畢,返回處理結(jié)果
bool Wait() { return Loop(true); }
//! 給類的內(nèi)部隊(duì)列批量添加任務(wù);本次操作受鎖保護(hù);并更新所有的狀態(tài)。
void Add(std::vector<T> &vChecks) {
boost::unique_lock<boost::mutex> lock(mutex);
//將任務(wù)添加至 內(nèi)部隊(duì)列中
for (T &check : vChecks) {
queue.push_back(std::move(check));
}
// 更新未完成的任務(wù)數(shù)量
nTodo += vChecks.size();
//如果剛添加的任務(wù)數(shù)量為1,只喚醒一個(gè)工作線程去處理;否則,喚醒全部工作線程。
if (vChecks.size() == 1) {
condWorker.notify_one();
} else if (vChecks.size() > 1) {
condWorker.notify_all();
}
}
// 任務(wù)處理隊(duì)列是否處于休眠狀態(tài)
bool IsIdle() {
boost::unique_lock<boost::mutex> lock(mutex);
return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
}
~CCheckQueue() {}
}
//fMaster : 標(biāo)識(shí)是否為主線程在調(diào)用
bool CCheckQueue::Loop(bool fMaster = false){
// 根據(jù)參數(shù),選擇條件變量
boost::condition_variable &cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks; //臨時(shí)任務(wù)隊(duì)列
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex); //自動(dòng)管理資源鎖;
// first do the clean-up of the previous loop run (allowing us
// to do it in the same critsect) 對(duì)上一次的運(yùn)行狀態(tài)做清理。
if (nNow) {
fAllOk &= fOk;
nTodo -= nNow;
if (nTodo == 0 && !fMaster)
// We processed the last element; inform the master it
// can exit and return the result 處理最后一個(gè)元素,通知master它可以退出,并且返回結(jié)果
condMaster.notify_one();
} else {
// nNow == 0,標(biāo)識(shí)該線程第一次運(yùn)行,即線程的數(shù)量又增加了一個(gè)。
nTotal++;
}
// 該對(duì)象的任務(wù)隊(duì)列為空時(shí),進(jìn)入下列條件;
// 下面處理分為兩種情況:1. 任務(wù)全部完成后,將主線程/或退出狀態(tài)時(shí),退出主線程或所有的線程。
// 2. 任務(wù)全部完成后,將子線程
while (queue.empty()) {
// 當(dāng)驗(yàn)證任務(wù)為0,且需要退出時(shí),
if ((fMaster || fQuit) && nTodo == 0) {
nTotal--;
bool fRet = fAllOk; //fAllOk : 最新的臨時(shí)評(píng)估結(jié)果。對(duì)該值做緩存,然后退出。
// reset the status for new work later
if (fMaster) fAllOk = true;
// return the current status //返回當(dāng)前的狀態(tài)。
return fRet;
}
nIdle++;
cond.wait(lock); // wait 此處配合條件變量使用。進(jìn)行線程間的同步;
nIdle--;
}
// 獲取當(dāng)前每個(gè)線程每個(gè)任務(wù)循環(huán)時(shí)處理的任務(wù)數(shù)量;
nNow = std::max(
1U, std::min(nBatchSize, (unsigned int)queue.size() /
(nTotal + nIdle + 1)));
//從類的任務(wù)隊(duì)列中 向臨時(shí)隊(duì)列中添加任務(wù)。
vChecks.resize(nNow);
for (unsigned int i = 0; i < nNow; i++) {
// 想讓鎖的時(shí)間盡可能的短,所以采用這種方法(move)來(lái)從 類的隊(duì)列中拿到任務(wù),而不是采用拷貝的方式。
vChecks[i].swap(queue.back());
queue.pop_back(); //將放到局部隊(duì)列中的任務(wù)清除
}
fOk = fAllOk;
}
// execute work; 執(zhí)行本線程剛分到的工作。
for (T &check : vChecks) {
if (fOk) fOk = check();
}
// 執(zhí)行完后,清空臨時(shí)任務(wù)的集合。繼續(xù)下次循環(huán)。
vChecks.clear();
} while (true);
}
上述是隊(duì)列的實(shí)現(xiàn):主要的任務(wù)處理是在Loop()函數(shù)中;
該隊(duì)列會(huì)進(jìn)行兩種調(diào)用,來(lái)處理隊(duì)列中的任務(wù):
>* 向添加任務(wù)后:自動(dòng)喚醒阻塞的工作線程去處理添加的任務(wù);細(xì)節(jié)請(qǐng)看:void Add(std::vector<T> &vChecks)
>* 主線程添加完任務(wù)后,調(diào)用bool Wait(),也去處理隊(duì)列中的任務(wù),隊(duì)列中的全部任務(wù)處理完后,主線程退出。
采用RAII機(jī)制去操作任務(wù)隊(duì)列
template <typename T> class CCheckQueueControl {
private:
CCheckQueue<T> *pqueue;
bool fDone;
public:
CCheckQueueControl(CCheckQueue<T> *pqueueIn)
: pqueue(pqueueIn), fDone(false) {
// 用來(lái)構(gòu)建該對(duì)象的任務(wù)隊(duì)列只能是nil, 或者隊(duì)列中無(wú)任務(wù)。
// 因?yàn)閯?chuàng)建的該對(duì)象在析構(gòu)時(shí)會(huì)調(diào)用任務(wù)隊(duì)列的wait()方法去處理完隊(duì)列中所有的任務(wù),然后退出:
if (pqueue != nullptr) {
bool isIdle = pqueue->IsIdle(); //獲取該隊(duì)列是否空閑
assert(isIdle);
}
}
//處理完隊(duì)列中的所有任務(wù)后,該方法退出,并返回這些任務(wù)的處理結(jié)果
bool Wait() {
if (pqueue == nullptr) return true;
bool fRet = pqueue->Wait(); //執(zhí)行完所有的任務(wù)后,
fDone = true;
return fRet;
}
// 向 CCheckQueue 中添加任務(wù);喚醒子線程去處理。
void Add(std::vector<T> &vChecks) {
if (pqueue != nullptr) pqueue->Add(vChecks);
}
// 對(duì)象析構(gòu)時(shí),調(diào)用wait()方法保證了該隊(duì)列中的所有任務(wù)都被處理。
~CCheckQueueControl() {
if (!fDone) Wait();
}
};
該類主要是用來(lái)管理 CCheckQueue對(duì)象;采用RAII機(jī)制,保證每次析構(gòu)該類的對(duì)象時(shí),CCheckQueue中的任務(wù)隊(duì)列被全部處理。
//將該區(qū)塊鏈接到當(dāng)前激活鏈上,并更新UTXO集合。
// block(in):將要鏈接到激活鏈上的區(qū)塊(帶有完整數(shù)據(jù)); pindex(in):該鏈接塊對(duì)應(yīng)的索引;
static bool ConnectBlock(const Config &config, const CBlock &block, CValidationState &state, CBlockIndex *pindex,
CCoinsViewCache &view, const CChainParams &chainparams, bool fJustCheck = false) {
...
CCheckQueueControl<CScriptCheck> control(fScriptChecks ? &scriptcheckqueue : nullptr);
...
for (size_t i = 0; i < block.vtx.size(); i++) {
...
if (!tx.IsCoinBase()) {
Amount fee = view.GetValueIn(tx) - tx.GetValueOut();
nFees += fee.GetSatoshis();
// Don't cache results if we're actually connecting blocks (still
// consult the cache, though).
bool fCacheResults = fJustCheck;
std::vector<CScriptCheck> vChecks;
// 檢查交易交易,并將的交易每個(gè)輸入構(gòu)造成對(duì)應(yīng)的可檢查對(duì)象`CScriptCheck`,放入臨時(shí)集合中,然后添加進(jìn)任務(wù)隊(duì)列中。
if (!CheckInputs(tx, state, view, fScriptChecks, flags,
fCacheResults, fCacheResults,
PrecomputedTransactionData(tx), &vChecks)) {
return error("ConnectBlock(): CheckInputs on %s failed with %s",
tx.GetId().ToString(), FormatStateMessage(state));
}
control.Add(vChecks); //向驗(yàn)證線程中添加任務(wù);添加完后,此時(shí)其他的任務(wù)線程就開(kāi)始執(zhí)行。
}
...
}
...
}
在該方法中:使用了全局對(duì)象scriptcheckqueue去構(gòu)造了一個(gè)臨時(shí)的管理對(duì)象,并通過(guò)該管理對(duì)象來(lái)操作全局任務(wù)隊(duì)列:添加任務(wù),執(zhí)行任務(wù);當(dāng)該臨時(shí)的管理對(duì)象析構(gòu)時(shí),會(huì)調(diào)用wait()方法,加入任務(wù)處理,處理完所有任務(wù)后,該對(duì)象析構(gòu)完成。
CScriptCheck(根據(jù)每個(gè)交易輸入構(gòu)造的檢查任務(wù))
class CScriptCheck {
private:
CScript scriptPubKey; //鎖定腳本(即該驗(yàn)證交易的某個(gè)引用輸出對(duì)應(yīng)的鎖定腳本)
Amount amount; //上述鎖定腳本對(duì)應(yīng) 的金額(即花費(fèi)的UTXO的金額)
const CTransaction *ptxTo; //正在花費(fèi)的交易,即要檢查的交易
unsigned int nIn; //要檢查該交易的第幾個(gè)輸入;
uint32_t nFlags; //檢查標(biāo)識(shí)
bool cacheStore;
ScriptError error; //驗(yàn)證出錯(cuò)的原因
PrecomputedTransactionData txdata;
public:
CScriptCheck()
: amount(0), ptxTo(0), nIn(0), nFlags(0), cacheStore(false),
error(SCRIPT_ERR_UNKNOWN_ERROR), txdata() {}
CScriptCheck(const CScript &scriptPubKeyIn, const Amount amountIn,
const CTransaction &txToIn, unsigned int nInIn,
uint32_t nFlagsIn, bool cacheIn,
const PrecomputedTransactionData &txdataIn)
: scriptPubKey(scriptPubKeyIn), amount(amountIn), ptxTo(&txToIn),
nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn),
error(SCRIPT_ERR_UNKNOWN_ERROR), txdata(txdataIn) {}
bool operator()(); //此處重載了()運(yùn)算符,執(zhí)行腳本檢查操作;詳情見(jiàn)下集:腳本驗(yàn)證
// 采用這種方式對(duì)新對(duì)象進(jìn)行賦值,避免拷貝賦值,節(jié)省時(shí)間。
void swap(CScriptCheck &check) {
scriptPubKey.swap(check.scriptPubKey);
std::swap(ptxTo, check.ptxTo);
std::swap(amount, check.amount);
std::swap(nIn, check.nIn);
std::swap(nFlags, check.nFlags);
std::swap(cacheStore, check.cacheStore);
std::swap(error, check.error);
std::swap(txdata, check.txdata);
}
ScriptError GetScriptError() const { return error; }
};
本文由 Copernicus團(tuán)隊(duì) 姚永芯寫作,轉(zhuǎn)載無(wú)需授權(quán)。