比特幣源碼分析:任務調(diào)度器的使用

任務調(diào)度器

Bitcoin 進程啟動后,有一個專門的線程做任務調(diào)度, 這些任務根據(jù)指定的時刻,執(zhí)行對應的函數(shù):

bool AppInitMain()
{
   .......
   // Start the lightweight task scheduler thread
    CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
    threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
   .......
}

調(diào)度器類主要是實現(xiàn)了一個生產(chǎn)者消費者的任務隊列,只是這個任務隊列是用 std::multimap 實現(xiàn)的,map 的key表達某一時刻,map的值表達:那一時刻要執(zhí)行的函數(shù),內(nèi)部使用條件變量和鎖來保護multimap ,還有幾個bool 條件:

class CScheduler
{
public:
    CScheduler();
    ~CScheduler();
    
    typedef std::function<void(void)> Function;
    
    void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
    void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
    void scheduleEvery(Function f, int64_t deltaMilliSeconds);
    void serviceQueue();
    void stop(bool drain=false);
    size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
                        boost::chrono::system_clock::time_point &last) const;
    bool AreThreadsServicingQueue() const;

private:
    std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
    boost::condition_variable newTaskScheduled;
    mutable boost::mutex newTaskMutex;
    int nThreadsServicingQueue;
    bool stopRequested;
    bool stopWhenEmpty;
    bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

CScheduler的client 通過調(diào)用schedule 往內(nèi)部multimap添加一個條目;
scheduleFromNow 和scheduleEvery 內(nèi)部都是調(diào)用schedule 方法實現(xiàn);
這三個方法屬于生產(chǎn)者要生產(chǎn)任務的方法, 任務的消費者調(diào)用serviceQueue等待取走任務, 然后執(zhí)行。
目前整個程序有一個全局的CScheduler實例:

  static CScheduler scheduler;

這個實例對應只有一個消費者線程, 即唯一的后臺調(diào)度器線程。
class SingleThreadedSchedulerClient 主要用途是,借助CScheduler類型,保障被添加到內(nèi)部鏈表的任務,被串行執(zhí)行:

class SingleThreadedSchedulerClient {
private:
    CScheduler *m_pscheduler;

    CCriticalSection m_cs_callbacks_pending;
    std::list<std::function<void (void)>> m_callbacks_pending;
    bool m_are_callbacks_running = false;
    void MaybeScheduleProcessQueue();
    void ProcessQueue();
    
public:
    explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
    void AddToProcessQueue(std::function<void (void)> func);
    void EmptyQueue();
    size_t CallbacksPending();
};

使用例子

基本的使用例子:

#include <scheduler.h>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/test/unit_test.hpp>
#include <iostream>

static void doN(){
    std::cout << "output now\n";
}
static void doE(){
    for(int i = 0; i < 10; i++){
        std::cout << "i = " << i << '\n';
    }
    std::cout << '\n';
}

BOOST_AUTO_TEST_SUITE(sche_tests)
BOOST_AUTO_TEST_CASE(sche)
{
    CScheduler s;
    s.scheduleFromNow(doN, 1000); 
    s.scheduleEvery(doE, 1000); 
    boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
    boost::this_thread::sleep_for(boost::chrono::seconds{5});
    t.interrupt();
    t.join();
}

BOOST_AUTO_TEST_CASE(singlethread)
{
    CScheduler s;
    SingleThreadedSchedulerClient  sc (&s);
    for(int i = 1; i <11; i++){
        auto  f = [=]{
            std::cout << "thread " << boost::this_thread::get_id() << " print arg: " << i << '\n';
        };
            sc.AddToProcessQueue(f);
    }
    boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    t.interrupt();
    t.join();
}
BOOST_AUTO_TEST_SUITE_END()

進程啟動后, 全局對象連接管理器connman初始化后, connman 的Start 方法最后,通過scheduler 線程安排了一個定時任務: 每隔15分鐘, 把connman 對象內(nèi)部成員,banmap_t 類型的 setBanned, CAddrMan 類型的addrman 序列化到本地文件banlist.datpeers.dat

//init.cpp
if (!connman.Start(scheduler, connOptions)) {
        return false;
}
//net.cpp
bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
{
    ...............
    scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
}

如果錢包功能編譯使能, 會讓scheduler 線程安排每隔500毫秒刷新錢包狀態(tài)。

//init.cpp 
#ifdef ENABLE_WALLET
    StartWallets(scheduler);
#endif

//wallet/init.cpp 
void StartWallets(CScheduler& scheduler) {
    for (CWalletRef pwallet : vpwallets) {
        pwallet->postInitProcess(scheduler);
    }
}

//wallet/wallet.cpp 
void CWallet::postInitProcess(CScheduler& scheduler)
{
    ReacceptWalletTransactions();
    if (!CWallet::fFlushScheduled.exchange(true)) {
        scheduler.scheduleEvery(MaybeCompactWalletDB, 500);
    }
}

PeerLogicValidation 對象的構造函數(shù)內(nèi)部, scheduler 線程安排每45秒執(zhí)行CheckForStaleTipAndEvictPeer函數(shù)主要做兩件事:

  1. 關掉多余的外出tcp 連接
  2. 根據(jù)當前時間,檢查當前節(jié)點的blockchain 的tip 是否有可能過時了,建立額外的連接同步跟上
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler) : connman(connmanIn), m_stale_tip_check_time(0) {
    // Initialize global variables that cannot be constructed at startup.
    recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));

    const Consensus::Params& consensusParams = Params().GetConsensus();
    // Stale tip checking and peer eviction are on two different timers, but we
    // don't want them to get out of sync due to drift in the scheduler, so we
    // combine them in one function and schedule at the quicker (peer-eviction)
    // timer.
    static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
    scheduler.scheduleEvery(std::bind(&PeerLogicValidation::CheckForStaleTipAndEvictPeers, this, consensusParams), EXTRA_PEER_CHECK_INTERVAL * 1000);
}

void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams)
{
    if (connman == nullptr) return;

    int64_t time_in_seconds = GetTime();

    EvictExtraOutboundPeers(time_in_seconds);

    if (time_in_seconds > m_stale_tip_check_time) {
        LOCK(cs_main);
        // Check whether our tip is stale, and if so, allow using an extra
        // outbound peer
        if (TipMayBeStale(consensusParams)) {
            LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update);
            connman->SetTryNewOutboundPeer(true);
        } else if (connman->GetTryNewOutboundPeer()) {
            connman->SetTryNewOutboundPeer(false);
        }
        m_stale_tip_check_time = time_in_seconds + STALE_CHECK_INTERVAL;
    }
}

以上就是bitoin 里面CScheduler類的主要使用場景。


本文由 Copernicus團隊 喻建 編寫,轉(zhuǎn)載無需授權!

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容