用C++11實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線(xiàn)程池

0.為什么需要線(xiàn)程池?

當(dāng)我們需要完成一些持續(xù)時(shí)間短、發(fā)生頻率高的工作時(shí),每次為他們開(kāi)啟一個(gè)線(xiàn)程既顯得繁瑣又會(huì)造成不必要的開(kāi)銷(xiāo),所以為這一類(lèi)工作寫(xiě)一個(gè)簡(jiǎn)單的線(xiàn)程池就很有必要了。

1.如何設(shè)計(jì)我們的線(xiàn)程池?

考慮這樣一個(gè)應(yīng)用場(chǎng)景,我們有一個(gè)設(shè)計(jì)軟件,當(dāng)我們需要下載一些模板用作接下來(lái)的設(shè)計(jì)工作。我們顯然不能將下載工作放在主線(xiàn)程去完成,如果你的用戶(hù)網(wǎng)絡(luò)速度飛快還好說(shuō),一旦他的網(wǎng)絡(luò)稍有波動(dòng),你的設(shè)計(jì)軟件將會(huì)停止GUI界面的渲染,卡在那邊不懂,這顯然不是好的體驗(yàn)。正確的做法是另外開(kāi)啟一個(gè)線(xiàn)程去下載,一旦這個(gè)下載工作完成后,我們可以通知主線(xiàn)程去做一個(gè)提示,例如彈出一個(gè)對(duì)話(huà)框告訴用戶(hù)模板已經(jīng)可以用了。
如果這個(gè)設(shè)計(jì)軟件需要頻繁的做下載工作,那么每次單開(kāi)一個(gè)線(xiàn)程就比較麻煩。我們可以將下載任務(wù)打包好,一個(gè)一個(gè)的推送到隊(duì)列中,由線(xiàn)程池自動(dòng)從隊(duì)列中獲取任務(wù),完成下載工作。當(dāng)隊(duì)列為空時(shí),線(xiàn)程池里面的線(xiàn)程就進(jìn)入等待狀態(tài),這也不會(huì)造成資源的占用。

2.先完成隊(duì)列的設(shè)計(jì)

STL中提供了一個(gè)關(guān)于隊(duì)列的實(shí)現(xiàn)std::queue,但是std::queue并不是線(xiàn)程安全的,我們需要在此基礎(chǔ)上進(jìn)行一些包裝,讓它在多線(xiàn)程的情況下也可以使用。
我們的隊(duì)列需要擁有這幾個(gè)功能:
1.從隊(duì)列里彈出一個(gè)任務(wù)
2.向隊(duì)列里推送任務(wù)
3.了解隊(duì)列里的任務(wù)還有多少個(gè)
4.清除任務(wù)隊(duì)列
下面是一個(gè)簡(jiǎn)單的線(xiàn)程安全隊(duì)列:

namespace multi_thread {
    template<typename Task>
    class thread_safe_queue {
    public:
        thread_safe_queue() :done_(false) {

        }

        void push(const Task& t) {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(t);
            ready_.notify_one();
        }

        void wait_and_pop(Task& t) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (queue_.empty() && !done_)
                ready_.wait(lock);
            if (done_)
                return;
            t = queue_.front();
            queue_.pop();
        }

        bool empty()const {
            std::lock_guard<std::mutex> lock(mtx_);
            return queue_.empty();
        }

        void clear() {
            std::lock_guard<std::mutex> lock(mtx_);
            for (int i = 0; i < queue_.size(); ++i)
                queue_.pop();
        }

        void done() {
            done_ = true;
            ready_.notify_all();
        }
    private:
        std::queue<Task> queue_;
        mutable std::mutex mtx_;
        std::condition_variable ready_;
        std::atomic_bool done_;
    };
}

簡(jiǎn)單講解一下:
我們的線(xiàn)程安全隊(duì)列thread_safe_queue遵循了STL的命名規(guī)范,對(duì)于任意容器,他們的判斷為空的函數(shù)簽名都是bool empty()const;,清除容器中的內(nèi)容的都為void clear()。對(duì)于推入任務(wù)的函數(shù),我們跟std::queue保持了一致,而彈出函數(shù)的命名我們做了一些改變,與函數(shù)的操作更加切合,表示對(duì)std::queue::pop()的一種擴(kuò)展。

這里我們采用了std::mutex作為我們的互斥量,每次訪(fǎng)問(wèn)隊(duì)列中的內(nèi)容時(shí),都用std::lock_guard或者std::unique_lock加鎖,保證在同一時(shí)段只有一個(gè)線(xiàn)程可以讀寫(xiě)隊(duì)列。std::lock_guard是一個(gè)方便的RAII的體現(xiàn),他會(huì)在析構(gòu)時(shí)自動(dòng)調(diào)用unlock(),std::unique_lock與之的區(qū)別就是,你可以顯式調(diào)用unlock()函數(shù)完成解鎖,它更加的靈活,當(dāng)然你可以用{}std::lock_guard完成相同的工作。我們將std::mutex_設(shè)計(jì)成mutable,是因?yàn)槲覀冃枰赾onst member functionempty()中使用它。
push函數(shù)中,每次完成push后就調(diào)用std::conditional_variable::notify_one()來(lái)喚醒一個(gè)處于等待狀態(tài)的線(xiàn)程,通知他隊(duì)列中已經(jīng)有任務(wù)了,可以開(kāi)始彈出了。在對(duì)應(yīng)了wait_and_pop函數(shù)中,同樣有一個(gè)std::conditional_variable::wait(),這個(gè)函數(shù)可以傳入一個(gè)或者兩個(gè)參數(shù),第一個(gè)參數(shù)表示要釋放的鎖,第二個(gè)函數(shù)是一個(gè)predicate,不接受參數(shù),返回值為bool。他會(huì)在調(diào)用wait時(shí)首先調(diào)用這個(gè)predicate,如果值是true,那么就會(huì)重新獲得鎖并繼續(xù)向下執(zhí)行,如果值是false,他會(huì)釋放鎖并且進(jìn)入等待狀態(tài),直到接收到前面所說(shuō)的喚醒通知,他才會(huì)重新檢查predicate并做前述操作。這里我們沒(méi)有使用predicate,僅使用了簡(jiǎn)單的版本,他會(huì)在接收到喚醒通知時(shí)直接向下執(zhí)行。
再來(lái)看最后一個(gè)變量std::atomic_bool,它是標(biāo)準(zhǔn)庫(kù)提供的一個(gè)原子類(lèi)型,在訪(fǎng)問(wèn)時(shí)標(biāo)準(zhǔn)保證在同一時(shí)刻只有一個(gè)線(xiàn)程在讀寫(xiě)他。原子類(lèi)型是不可拷貝且不可移動(dòng)的,所以我們不能對(duì)他進(jìn)行類(lèi)內(nèi)初始化,這會(huì)調(diào)用copy constructor,正確的方法是使用constructor initializer list進(jìn)行初始化,當(dāng)然你也可以在構(gòu)造函數(shù)體內(nèi)使用賦值運(yùn)算符給其賦上一個(gè)值,但是請(qǐng)務(wù)必在這兩個(gè)中間選擇一個(gè)。回到代碼,這個(gè)變量的作用很簡(jiǎn)單,就是讓彈出的函數(shù)wait_and_pop直接結(jié)束,我們只有在需要析構(gòu)這個(gè)線(xiàn)程池時(shí)才會(huì)這么做,具體做法在線(xiàn)程池中細(xì)說(shuō)。

3.完成線(xiàn)程池的設(shè)計(jì)

在啟用線(xiàn)程池時(shí),我們需要指定在線(xiàn)程池中存放的線(xiàn)程的數(shù)量,當(dāng)然我們也可以在后續(xù)為線(xiàn)程池添加新的線(xiàn)程。
我們將使用STL提供的容器std::vector來(lái)存放我們的線(xiàn)程,這些線(xiàn)程做的工作都是一致的,他們將在隊(duì)列中不斷地彈出任務(wù)并執(zhí)行它,如果隊(duì)列中沒(méi)有任務(wù)了,他們就將進(jìn)入等待狀態(tài)。
結(jié)合上面的隊(duì)列實(shí)現(xiàn),給出一個(gè)簡(jiǎn)單的線(xiàn)程池實(shí)現(xiàn):

namespace multi_thread {
    template<typename Task>
    class thread_safe_queue {
    public:
        thread_safe_queue() :done_(false) {

        }

        void push(const Task& t) {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(t);
            ready_.notify_one();
        }

        void wait_and_pop(Task& t) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (queue_.empty() && !done_)
                ready_.wait(lock);
            if (done_)
                return;
            t = queue_.front();
            queue_.pop();
        }

        bool empty()const {
            std::lock_guard<std::mutex> lock(mtx_);
            return queue_.empty();
        }

        void clear() {
            std::lock_guard<std::mutex> lock(mtx_);
            for (int i = 0; i < queue_.size(); ++i)
                queue_.pop();
        }

        void done() {
            done_ = true;
            ready_.notify_all();
        }
    private:
        std::queue<Task> queue_;
        mutable std::mutex mtx_;
        std::condition_variable ready_;
        std::atomic_bool done_;
    };

    class thread_pool {
    public:
        thread_pool(const int threadNum) :done_(false) {
            for (int i = 0; i < threadNum; ++i) {
                threads_.emplace_back(&thread_pool::workerThread, this);
            }
        }
        ~thread_pool() {
            done_.store(true);
            queue_.done();
            for (auto& thread : threads_) {
                if (thread.joinable())
                    thread.join();
            }
        }
        void submit(const std::function<void(void)>& t) {
            queue_.push(t);
        }

        bool isEmpty()const {
            return queue_.empty();
        }

        void clearTask() {
            queue_.clear();
        }
    private:
        void workerThread() {
            while (true) {
                std::function<void(void)> t;
                queue_.wait_and_pop(t);
                if (done_)
                    break;
                if (t)
                    t();
            }
        }
    private:
        std::vector<std::thread> threads_;
        thread_safe_queue<std::function<void(void)>> queue_;
        std::atomic_bool done_;
    };
}

隊(duì)列里存放的是std::function<void(void)>,是一個(gè)沒(méi)有參數(shù)和返回值的Callable。在C++中,常用的Callable包括lambda表達(dá)式、function object(也就是operator())、std::bindstd::function。我個(gè)人比較喜歡使用lambda和std::bind,他們都很簡(jiǎn)單易用,在使用std::bind時(shí),如果你的函數(shù)是一個(gè)member function,別忘了把this作為第一個(gè)參數(shù)。
這個(gè)線(xiàn)程池非常簡(jiǎn)單,當(dāng)你調(diào)用構(gòu)造函數(shù)時(shí),傳入一個(gè)你想要的線(xiàn)程數(shù)量,然后構(gòu)造函數(shù)體里就會(huì)開(kāi)啟對(duì)應(yīng)數(shù)量的線(xiàn)程。std::thread接受一個(gè)Callable和它的參數(shù)作為構(gòu)造函數(shù)的參數(shù),構(gòu)造函數(shù)完成后線(xiàn)程立即啟動(dòng),這里的workerThread是一個(gè)member function,所以,我們還得加上this作為參數(shù),跟std::bind一樣。
workerThread函數(shù)里面循環(huán)地從隊(duì)列中獲取任務(wù)并執(zhí)行,如果沒(méi)有任務(wù),他就會(huì)處于等待狀態(tài)??梢钥吹竭@里同樣有一個(gè)std::atomic_bool的變量done_,它用于使線(xiàn)程結(jié)束。我們會(huì)在線(xiàn)程池的析構(gòu)函數(shù)里將這個(gè)變量設(shè)為true,然后我們調(diào)用隊(duì)列的done函數(shù),讓隊(duì)列退出。最后我們逐個(gè)檢查線(xiàn)程池中的線(xiàn)程,如果他們還在運(yùn)行,等待他們結(jié)束。一個(gè)std::thread在析構(gòu)前必須指定等待該線(xiàn)程結(jié)束還是分離該線(xiàn)程,否則會(huì)發(fā)生錯(cuò)誤。如果我們不做這些工作,那么線(xiàn)程池在析構(gòu)時(shí)要么是其中的線(xiàn)程一直處于等待狀態(tài),要么是一直處于循環(huán)狀態(tài),這會(huì)導(dǎo)致析構(gòu)函數(shù)停滯不前,也就是程序的卡死。

4.總結(jié)

這次實(shí)現(xiàn)的線(xiàn)程池只能說(shuō)是實(shí)現(xiàn)了最基礎(chǔ)的功能,還缺少一些功能,例如任務(wù)返回值的缺失等。本文實(shí)現(xiàn)的線(xiàn)程安全隊(duì)列也十分簡(jiǎn)單,還有很多可以?xún)?yōu)化的地方。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容