交易隊(duì)列的輸入
交易隊(duì)列的輸入有兩個(gè),分別是接收到其他節(jié)點(diǎn)的廣播交易和自身節(jié)點(diǎn)提交的交易。
分別來(lái)看這兩種輸入方式:
-
接收廣播交易
在前面區(qū)塊鏈同步章節(jié)中提到過(guò),接收到交易后會(huì)通過(guò)調(diào)用TransactionQueue::enqueue()來(lái)將新交易放入交易隊(duì)列中,這個(gè)函數(shù)代碼非常簡(jiǎn)單:void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId) { bool queued = false; { Guard l(x_queue); unsigned itemCount = _data.itemCount(); for (unsigned i = 0; i < itemCount; ++i) { if (m_unverified.size() >= c_maxVerificationQueueSize) { LOG(m_logger) << "Transaction verification queue is full. Dropping " << itemCount - i << " transactions"; break; } m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId)); queued = true; } } if (queued) m_queueReady.notify_all(); }
只是將交易放入m_unverified中,然后通知校驗(yàn)線程來(lái)校驗(yàn)。
再來(lái)看校驗(yàn)線程:
void TransactionQueue::verifierBody()
{
while (!m_aborting)
{
UnverifiedTransaction work;
{
unique_lock<Mutex> l(x_queue);
m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
if (m_aborting)
return;
work = move(m_unverified.front());
m_unverified.pop_front();
}
try
{
Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
ImportResult ir = import(t);
m_onImport(ir, t.sha3(), work.nodeId);
}
catch (...)
{
// should not happen as exceptions are handled in import.
cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
}
}
}
這里是一個(gè)簡(jiǎn)單的生產(chǎn)消費(fèi)者隊(duì)列,先將UnverifiedTransaction取出,然后調(diào)用import()函數(shù)進(jìn)行校驗(yàn)。由于使用了線程,校驗(yàn)過(guò)程是異步的。
-
自身提交交易
節(jié)點(diǎn)自身提交的交易與上面的交易不同,是同步的,也就是直接會(huì)調(diào)用import()函數(shù)來(lái)進(jìn)行校驗(yàn)。ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik) { try { Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything); return import(t, _ik); } catch (Exception const&) { return ImportResult::Malformed; } }
交易的校驗(yàn)
我們來(lái)看這個(gè)校驗(yàn)的過(guò)程,也就是TransactionQueue::import()函數(shù)。
ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
{
if (_transaction.hasZeroSignature())
return ImportResult::ZeroSignature;
// Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature);
ImportResult ret;
{
UpgradableGuard l(m_lock);
auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;
{
_transaction.safeSender(); // Perform EC recovery outside of the write lock
UpgradeGuard ul(l);
ret = manageImport_WITH_LOCK(h, _transaction);
}
}
return ret;
}
可以看到先是調(diào)用TransactionQueue::check_WITH_LOCK()函數(shù)來(lái)做一個(gè)簡(jiǎn)單檢查。檢查的過(guò)程是看這個(gè)交易是否是已經(jīng)校驗(yàn)過(guò)的,是否是之前被刪除的。
接著調(diào)用_transaction.safeSender(),這個(gè)函數(shù)是通過(guò)簽名反推sender,在交易那一節(jié)已經(jīng)說(shuō)過(guò)。最后調(diào)用manageImport_WITH_LOCK()函數(shù)來(lái)處理交易。
manageImport_WITH_LOCK()函數(shù)過(guò)程稍稍復(fù)雜,我們可以一步一步來(lái)分析。
第一步:
auto cs = m_currentByAddressAndNonce.find(_transaction.from());
if (cs != m_currentByAddressAndNonce.end())
{
auto t = cs->second.find(_transaction.nonce());
if (t != cs->second.end())
{
if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
return ImportResult::OverbidGasPrice;
else
{
h256 dropped = (*t->second).transaction.sha3();
remove_WITH_LOCK(dropped);
m_onReplaced(dropped);
}
}
}
這一步是檢查m_currentByAddressAndNonce中是否有重復(fù)項(xiàng),判斷標(biāo)準(zhǔn)是sender和nonce,如果存在重復(fù)的則檢查gasPrice,如果新的交易gasPrice更低,則校驗(yàn)失敗,否則將現(xiàn)有的交易刪除,替換為gasPrice更高的交易。
第二步是檢查m_future,檢查過(guò)程與第一步類似。
第三步:
// If valid, append to transactions.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
while (m_current.size() > m_limit)
{
LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
}
m_onReady();
這里調(diào)用insertCurrent_WITH_LOCK()插入隊(duì)列,然后將超出容量m_limit的部分刪除,并調(diào)用m_onReady()表示目前隊(duì)列有數(shù)據(jù)了,可以來(lái)取了。
我們?cè)賮?lái)看insertCurrent_WITH_LOCK()函數(shù)是怎么將交易插入隊(duì)列的。
void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
{
if (m_currentByHash.count(_p.first))
{
cwarn << "Transaction hash" << _p.first << "already in current?!";
return;
}
Transaction const& t = _p.second;
// Insert into current
auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;
// Move following transactions from future to current
makeCurrent_WITH_LOCK(t);
m_known.insert(_p.first);
}
先還是檢查是否有重復(fù)項(xiàng),然后把交易插入m_current里,并記下插入的位置(迭代器),再分別加入m_currentByAddressAndNonce和m_currentByHash中。
注意到末尾還有個(gè)makeCurrent_WITH_LOCK()的調(diào)用,這個(gè)是看情況將m_future里的交易移到m_current中。
交易隊(duì)列的輸出
交易隊(duì)列輸出只有一個(gè),那就是區(qū)塊block。在Block::sync()中會(huì)調(diào)用TransactionQueue::topTransactions()來(lái)取隊(duì)列的前N項(xiàng)數(shù)據(jù),再加入block中。