c++多線程04

線程池

概念

  • 線程池 <font color=red>調(diào)度機制</font>, 在實際開發(fā)中, 將每個任務都交給 <font color=red>某個線程</font>是不切實際的, 可以利用 <font color=red>并行并發(fā)</font>為任務指定單獨的線程去執(zhí)行


  • 線程池提供了上面問題所需要的功能:
    • 提交任務到任務隊列上
    • 工作線程從隊列獲取任務, 任務完成后, 再從任務隊列中獲取下一個任務


  • 線程池幾個關鍵的問題:
    • 線程數(shù)量
    • 高效的任務分配方式
    • 是否阻塞(==即等待一個任務完成==)


簡單的線程池(==MSVC==)

  • 需求
1. 數(shù)量和當前on的環(huán)境匹配
2. 掛載任務到任務隊列
3. 工作線程從任務隊列獲取任務, 執(zhí)行..., 執(zhí)行完畢后再回來獲取新的任務
ps: 線程池中線程不需要等待其他線程, 如果需要等待, 要進行同步管理


  • demo結構
    19.png


  • 線程安全的隊列(==threadsafe_queue==)
/** 
    .h文件, 用的時候不要導入這個
    直接在main.cpp中導入hpp
*/
#pragma once
#include<mutex>
#include<queue>
namespace lb {
    using namespace std;

template<typename T>
class threadsafe_queue
{
private:
    mutable mutex mut;
    queue<T> data_queue;
    condition_variable data_cond;

public:
    threadsafe_queue();

    void push(const T& data);

    void wait_and_pop(T& value);

    shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    shared_ptr<T> try_pop();

    bool empty() const;
};

}





/** 
    hpp
*/
#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {

_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
    lock_guard<mutex> lk(mut);
    data_queue.push(move(data));
    data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = move(data_queue.front());
    data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::try_pop(T& value){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = move(data_queue.front());
    data_queue.pop();
    return true;
}

_T shared_ptr<T> _P::try_pop(){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return shared_ptr<T>();  // 5
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::empty() const{
    lock_guard<mutex> lk(mut);
    return data_queue.empty();
}


}


  • 輔助類(==join_threads==)
/** 
    join最合適的位置是在 main函數(shù)結束以前
*/

//.h
#pragma once
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
     explicit join_threads(vector<thread>& threads_);
     ~join_threads();
    
     const vector<thread>& get() const{
         return threads;
     }
private:
    vector<thread>& threads;
};

}



///cpp
#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
    for (unsigned long i = 0; i < threads.size(); ++i){
        if (threads[i].joinable())
            threads[i].join();
    }
}

}


  • 線程池的實現(xiàn)
//// .h
#pragma once

#include<vector>
#include<functional>

#include"join_threads.h"
#include"threadsafe_queue.hpp"      //注意這里導入的是hpp, 不要導入頭文件

namespace lb {
    using namespace std;

class thread_pool{
public:
    using _Task = function<void(void)>;
    
    
    void work();

    thread_pool();

    const vector<thread>& get() const {
        return this->threads;
    }

    void submit(const _Task& task) {
        this->task_queue.push(task);
    }

    void over(void) {
        finish = true;
    }
private:
    bool finish;                            //_code_a
    threadsafe_queue<_Task> task_queue;     //_code_b
    vector<thread> threads;                 //_code_c
    join_threads j_threads;                 //_code_d

    /** 
        這里的析構順序很重要:
            _code_d要最先析構

        因為join_threads的作用是join所有的子線程
        這就表示了在不設置finish為true的情況下
        當線程池pool死亡時, 主線程會無限等待(因為join了)
        
        j_threads析構的時候必須訪問threads中的線程, 所以
        threads不能在j_threads前析構
    */
};
}






/// cpp
#include "thread_pool.h"
namespace lb {
    void thread_pool::work() {

        while (!finish){
            _Task tmp;
            if (this->task_queue.try_pop(tmp)) {
                tmp();
                continue;
            }
            this_thread::yield();
        }
    }

    thread_pool::thread_pool() :finish(false), j_threads(threads) {
        try {
            // 根據(jù)當前硬件, 創(chuàng)建合適的線程數(shù)量
            int all = thread::hardware_concurrency();
            
            threads.reserve(all);

            for (; all; --all) {
                threads.emplace_back(&thread_pool::work,this);
            }
        }catch (std::bad_alloc) {
            // 如果發(fā)生異常(thread構造失敗), 設置finish為true, 已經(jīng)開始的線程會停掉
            finish = true;
            throw;
        }
    }
}


  • 測試代碼
#define _CRT_SECURE_NO_WARNINGS

#include<iostream>
#include<thread>
#include<future>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;


void task_a(void) {
    this_thread::sleep_for(chrono::milliseconds(2000));
    thread_local ostringstream buf(std::ios_base::app);
    buf.clear();
    buf << this_thread::get_id() << " task_a()\n";
    cout << buf.str();
}

void task_b(void) {
    thread_local ostringstream buf(std::ios_base::app);
    buf << this_thread::get_id();
    buf << " task_b\n";
    cout << buf.str();
    buf.seekp(0);
}


#define version_1 1
int main(int arg, char** args) {

#ifdef version_1

    lb::thread_pool p;

    for (int i = -1; ++i < 10;) {
        p.submit(function<void(void)>(task_a));
        p.submit(function<void(void)>(task_b));
    }

    system("pause");    //為了看到打印
    p.over();             
}


簡單線程池中的問題

1. 線程池中的線程在不斷循環(huán)判斷

2. 上述只是 一個 void(void)的函數(shù), 而實際開發(fā)中:
    用戶的函數(shù)可能有返回值, 參數(shù)也是多樣的


思路:
    對于第1個問題:
        子線程沒有任務的時候應該是掛起的
        當main提交任務后, 是喚醒thread_pool中掛起的線程, 做完事后再掛起

    對于第2個問題分2步:
        用戶的函數(shù)是多種多樣的:
            返回值不定
            參數(shù)個數(shù)不定, 參數(shù)類型不定
        
        所以必須用到泛型
        返回值勢必用到 future和packaged_task


子線程改為wait

//修改threads_pool的成員變量
class thread_pool{
public:

    ....


private:
    bool finish;
    mutex empty;
    condition_variable cond;
    threadsafe_queue<_Task> task_queue;
    vector<thread> threads;
    join_threads j_threads;
};


// 修改work和submit的函數(shù)
void submit(const _Task& task) {
    this->task_queue.push(task);

    // 喚醒所有的線程去搶任務
    cond.notify_all();
}


void thread_pool::work() {
    while (!finish){
        /** 
            當結束(finish為true 或 隊列不空的時候被喚醒)
            喚醒后, wait會對成員emtpy上鎖(mutex對象)
            但隊列本身是線程安全的, 所以wait后, 再手動解鎖
        */
        unique_lock<mutex> u_mt(this->empty);
        cond.wait(u_mt, [this] {
            if(this->finish)
                return true;
            return !this->task_queue.empty();
        });

        if(finish){
            u_mt.unlock();
            continue;
        }
        u_mt.unlock();

        _Task tmp;
        
        if (this->task_queue.try_pop(tmp)) 
            tmp();
    }
}



void over(void) {
    finish = true;
    cond.notify_all();      //記得通知所有的線程
}


返回值的解決思路

  • 因為是異步的, 所以用前面的 <font color=red>future</font>, 即 <font color=red>submit返回一個future</font>


  • 先來看看, 怎么獲取 <font color=red>callable</font>的返回值==類型==
/** 
    在以前分析 std::async的時候, STL中用到過 對callable的返回值traits
    
    舉例
*/
#include<type_traits>
#include<typeinfo>


int main(int arg, char** args){
    using _Ret = _Invoke_result_t<
        decay_t<decltype(&main)>, 
        decay_t<int>, 
        decay_t<char**>
        >;

    
    // traits出main函數(shù)的返回值, 所以 _Ret的類型是int
    /** 
        ps: decltype(main) 和 decltype(&main)的類型是不一樣的
            decltype(&main)是 int(*)(int,char**)    // 函數(shù)指針
            decltype(main) 是 int(int,char**)       // 像function<type>的模板參數(shù)
    */


    /// decay會將 int[]的類型轉換成int*(在以前也說過)
    cout << typeid(decay_t<int[]>).name() << endl;


    // lambda的類型
    auto tmp = [](int a, char**, int[], int len) ->int {
            return 0;
    };

    // traits出lambda的返回值, 所以_Lambda_Ret的類型是int
    using _Lambda_Ret = _Invoke_result_t< 
        decay_t<decltype(tmp)>,
        decay_t<int>,
        decay_t<char**>,
        decay_t<int[]>,
        decay_t<int>
        >;
}


tuple妙用(==任務參數(shù)的多樣性==)

/** 
    回想thread的構造函數(shù), 它的過程就不說了, 可以用thread的構造函數(shù)來
    解決參數(shù)不定以及返回值的問題, 這里給出一個demo
    是模仿thread的構造函數(shù), 功能是將用戶的 函數(shù)存儲起來, 想在什么時候調(diào)用
    就什么時候調(diào)用
*/
namespace _tt {

    ///生成的調(diào)用函數(shù)
    template<typename Callable, typename... Args>
    void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
    }


    
    /// 中間轉換的函數(shù), 外界統(tǒng)一從這里入口
    //// 注意這里用的是 __stdcall的壓棧模式
    template <class _Tuple, size_t... _Indices>
    static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
        _Tuple* _FnVals = (_Tuple*)_RawVals;
        _Tuple& _Tup = *_FnVals;
        printf("--- _Tup: %p\n", &_Tup);
        _tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
        return 0;
    }


    /// 根據(jù)用戶傳入的 callable和不定的參數(shù), 生成具體的入口調(diào)用函數(shù)
    template <class _Tuple, size_t... _Indices>
    _NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &_Invoke<_Tuple, _Indices...>;
    }


    
    ///測試是用了2個全局的指針
    /// 根據(jù)用戶的callable和不定參數(shù), 保存對應的入口地址
    static void* _invoke_address = nullptr;

    ///參數(shù)的信息(tuple*)
    static void* _args_address = nullptr;

    
    template<typename Fir, typename... Args>
    auto fun(Fir&& fir, Args&&... args) ->
    future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> { //模仿async, 返回類型為future<int>

        // 用戶函數(shù)的返回類型 int
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        /// 將callable包裝成 packaged_task
        using _Packaged = packaged_task<_Ret(Args...)>;

        /// 將packaged_task和用戶函數(shù)的參數(shù), 保存到tuple中
        using _Tuple = tuple<_Packaged, Args...>;

        /// 包裝fir函數(shù)為packaged_task
        _Packaged callable(fir);

        /// 根據(jù)tuple(t_fun,2), 生成入口函數(shù)的簽名地址, 保存到全局變量(測試)
        _invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        /// 儲存t_fun和參數(shù)信息(2)
        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);

        //用全局變量記住, 后期threadsafe_queue中保存的就是這種類型的指針
        _args_address = p_tuple;


        /// 將packaged的future給外界, 外界可以get()
        auto result(std::get<0>(*p_tuple).get_future());
        return result;
    }
}





/** 
    測試1, 在子線程中執(zhí)行任務
*/
int t_fun(int) {
    cout << "hello\n";
    return 0;
}
int main(int arg, char** args) {
    {
        // 外界注冊任務,并拿到future
        auto tmp = _tt::fun(t_fun, 2);

        // 模擬async中用戶的任務被os調(diào)用, 這里用一個子線程, 在線程中執(zhí)行上面的 t_fun
        thread t_([&tmp] {

            // 需要說明的是, 入口函數(shù)的簽名是 unsigned int __stdcall _Invoke(void*)
            //// 全局變量(存儲入口函數(shù)地址)的類型是 void*, 所以必須指定和上面一致的簽名
            ////// MSVC中默認的并不是 __stdcall, 所以要顯示指定壓棧方式
            ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
            std::cout << this_thread::get_id  << " "<< tmp.get() << endl;
        });

        t_.join();

        return 0;
    }
}    



/** 
    測試2, 不同的用戶函數(shù)(簽名不一樣)
*/
const char* test(const char*) {
    cout << "test()\n";
    return "yangrui\n";
}

int main(int arg, char** args){
    auto tmp = _tt::fun(t_fun, 2);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp.get() << endl;


    auto tmp2 = _tt::fun(test, 22);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp2.get() << endl;
}



/** 
    在上面的測試中, 有內(nèi)存泄露 tuple* 沒有被釋放
*/


解決內(nèi)存泄露(==完整的代碼==)

namespace _tt {

    template<typename Callable, typename... Args>
    void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
    }

    template <class _Tuple, size_t... _Indices>
    static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
        unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
        _Tuple& _Tup = *_FnVals;
        printf("--- _Tup: %p\n", &_Tup);
        _tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
        return 0;
    }

    template <class _Tuple, size_t... _Indices>
    _NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &_Invoke<_Tuple, _Indices...>;
    }


    static void* _invoke_address = nullptr;
    static void* _args_address = nullptr;


    template<typename Fir, typename... Args>
    auto fun(Fir&& fir, Args&&... args) ->
    future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        using _Packaged = packaged_task<_Ret(Args...)>;
        using _Tuple = tuple<_Packaged, Args...>;

        _Packaged callable(fir);

        _invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
        _args_address = p_tuple;



        auto result(std::get<0>(*p_tuple).get_future());
        return result;
    }
}






int t_fun(int) {
    cout << "hello\n";
    return 2424;
}



const char* test(const char*) {
    cout << "test()\n";
    this_thread::sleep_for(chrono::seconds(5));
    return "yangrui\n";
}

int main(int arg, char** args) {
    auto tmp = _tt::fun(t_fun, 2);

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    //std::cout << this_thread::get_id  << " "<< tmp.get() << endl;


    auto tmp2 = _tt::fun(test, "lu");

    ((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
    std::cout << this_thread::get_id  << " "<< tmp2.get() << endl;
}
在上面的static unsigned int __stdcall _Invoke(void* _RawVals) noexcept 
中使用unique_ptr來包裝tuple*

這里面的內(nèi)存釋放關乎2個new:
    1. tuple*
    2. 在創(chuàng)建任務時(fun函數(shù)內(nèi)部) packaged_task內(nèi)部的_Packaged_state<...> *

首先tuple*:
    0. 它的作用是 存儲callable(packaged_task)和用戶的所有參數(shù)

    1. fun函數(shù)內(nèi)部被創(chuàng)建

    2. 內(nèi)部的callable間接引用著 _Pactaged_state<..> *
    
    一個好釋放理由是, 自己釋放, 不要讓用戶手動釋放 
    所以釋放tuple*的1個設計是 在調(diào)用完 _invoke 后, 應該自動釋放tuple*
    方法就是在_invoke中對 tuple* 做unique, 具體見上面的代碼


其次_Packaged_state<>*:
    0. 它在本例中的作用是 存儲用戶的返回值
    
    1. fun內(nèi)部會創(chuàng)建 packaged_task對象, 進而創(chuàng)建_Packaged_state*

    2. 當創(chuàng)建 tuple*的時候, 會將 _Packaged_state* 轉移給tuple<0>

    3. 當用tuple<0>獲取result(future)時, result也指向_Packaged_state*

    4. 當fun函數(shù)結束, 先拷貝構造臨時對象tmp(future), result中的_Packaged_state*被轉移到tmp
        4.1 tmp如果沒有外界接收, 會被析構, 進而可能會delete _Packaged_state*
            但此時有2個對象引用 _Packaged_state(tuple<0>和tmp), 所以tmp只是將計數(shù)-1, 并不會釋放_Packaged_state*

        4.2 有外界接收, 則tmp被外界引用, 不會析構

      ps: _Packaged_state的new指針是使用了引用計數(shù)原理(以前沒有說過, 這里提一下), 但不是STL中的share_ptr
         

    5. 接著釋放局部的packged_task對象, 但發(fā)現(xiàn)_Packaged_state已經(jīng)為空
        所以不會釋放_Packaged_state


    6. main函數(shù)中調(diào)用完fun后, 借用了全局指針調(diào)用 _invoke 函數(shù)
        傳遞的指針其實就是 tuple*
        _invoke內(nèi)部會拿到callable(packaged_task)調(diào)用 invoke函數(shù)
        invoke的調(diào)用會觸發(fā)到 callable的重載(), 并在調(diào)用結束后存儲返回值到 packaged_state*指向的對象中
        這個過程和以前探討的 packgade_task源碼是一樣的
        此時如果外界通過 packaged_task的future對象獲取返回值(get())時的過程就不說了, 前面源碼中已經(jīng)很
        詳細了
    
    7. 在 _invoke內(nèi)部, 用unique<tuple> 包裝了tuple*
        7.1 函數(shù)完畢后, 會釋放 unique<tulpe>, 進而釋放tuple中的callable
            callable指向的_Packgade_state*也會根據(jù)計數(shù)器決定要不要釋放

    8. 如果main函數(shù)中 接收第4步fun函數(shù)返回的future對象(tmp, tmp2), 則main函數(shù)結束后
        會釋放tmp, tmp2, 它們內(nèi)部都引用_Packaged_state*, 但也會根據(jù)計數(shù)器來釋放_Packaged_state*


改進前面的線程池

  • threadsafe_queue
#pragma once
#include<mutex>
#include<queue>
namespace lb {
    using namespace std;

template<typename T>
class threadsafe_queue
{
private:
    mutable mutex mut;
    queue<T> data_queue;
    condition_variable data_cond;

public:
    threadsafe_queue();

    void push(const T& data);

    void wait_and_pop(T& value);

    shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    shared_ptr<T> try_pop();

    bool empty() const;
};

}





#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {
_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
    lock_guard<mutex> lk(mut);
    data_queue.push(move(data));
    data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = move(data_queue.front());
    data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
    unique_lock<mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::try_pop(T& value){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = move(data_queue.front());
    data_queue.pop();
    return true;
}

_T shared_ptr<T> _P::try_pop(){
    lock_guard<mutex> lk(mut);
    if (data_queue.empty())
        return shared_ptr<T>();  // 5
    shared_ptr<T> res(
        make_shared<T>(move(data_queue.front())));
    data_queue.pop();
    return res;
}

_T bool _P::empty() const{
    lock_guard<mutex> lk(mut);
    return data_queue.empty();
}


}


  • join_thread
#pragma once
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
     explicit join_threads(vector<thread>& threads_);
     ~join_threads();
    
     const vector<thread>& get() const{
         return threads;
     }
private:
    vector<thread>& threads;
};

}



#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
    for (unsigned long i = 0; i < threads.size(); ++i){
        if (threads[i].joinable())
            threads[i].join();
    }
}

}


  • thread_pool
#pragma once

#include<vector>
#include<future>
#include<type_traits>

#include"join_threads.h"
#include"threadsafe_queue.hpp"

namespace lb {

using namespace std;

struct _call_info {
    void* addr;
    void* args;
};



template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
    callable(args...);
}

template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept {
    unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));

    _Tuple& _Tup = *_FnVals;

    lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);

    return 0;
}

template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
    return &lb::_Invoke<_Tuple, _Indices...>;
}










class thread_pool{
public:
    using _Task = _call_info;
    
    void work();
    thread_pool();


    template<typename Fir, typename... Args>
    auto submit(Fir&& fir, Args&&... args) ->
        future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
        using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
        using _Future = future<_Ret>;

        using _Packaged = packaged_task<_Ret(Args...)>;
        using _Tuple = tuple<_Packaged, Args...>;

        _Packaged callable(fir);

        _call_info _call;

        _call.addr = lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

        auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
        _call.args = p_tuple;

        task_queue.push(_call);
        cond.notify_all();

        return std::get<0>(*p_tuple).get_future();
    }


    const vector<thread>& get() const {
        return this->threads;
    }

    void over(void) {
        finish = true;
        cond.notify_all();
    }
private:
    bool finish;
    mutex empty;
    condition_variable cond;
    threadsafe_queue<_Task> task_queue;
    vector<thread> threads;
    join_threads j_threads;
};



}






#include "thread_pool.h"
#include<iostream>

namespace lb {
    


    void thread_pool::work() {

        while (!finish){
            unique_lock<mutex> u_mt(this->empty);
            cond.wait(u_mt, [this] {
                if (this->finish)
                    return true;
                return !this->task_queue.empty();
            });
            if (finish) {
                u_mt.unlock();
                continue;
            }
            u_mt.unlock();

            _Task tmp;
            
            if (this->task_queue.try_pop(tmp)) 
                ((unsigned int(__stdcall*)(void*))tmp.addr)(tmp.args);
        }
        std::cout << "over\n";
    }

    thread_pool::thread_pool() :finish(false), j_threads(threads) {
        try {
            int all = thread::hardware_concurrency();
            threads.reserve(all);

            for (; all; --all) {
                threads.emplace_back(&thread_pool::work,this);
            }
        }catch (std::bad_alloc) {
            finish = true;
            throw;
        }
    }
}```

<br>

- 測試
```cpp
#define _CRT_SECURE_NO_WARNINGS

#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;

int test_a(int num) {
    this_thread::sleep_for(chrono::milliseconds(2000));
    thread_local ostringstream buf(std::ios_base::app);

    buf << "test_a: \t";
    buf << this_thread::get_id();
    buf << "\targ(num):\t\t";
    buf << num;
    buf << "\n";
    cout << buf.str();
    buf.seekp(0);

    return 2424;
}

const char* test_b(const char* cstr) {
    thread_local ostringstream buf(std::ios_base::app);

    buf << "test_b: \t";
    buf << this_thread::get_id();
    buf << "\targ(cstr):\t\t";
    buf << cstr;
    buf << "\n";
    cout << buf.str();
    buf.seekp(0);

    return "yangrui\n";
}

int main(int arg, char** args) {
    lb::thread_pool p;
    {
        struct _B {
            int test(const char* arg, int num) {
                cout << arg << endl;
                cout << num << endl;
                return 242;
            }

        };
        _B b;
        p.submit(&_B::test, b, "helooooo", 24242);
        return 0;
    }

    {
    
        p.submit([](int a, int b) {
            cout << this_thread::get_id() << endl;
        }, 2, 4);
        
        getchar();

        p.over();
        return 0;
    }

    { 
        auto f_a = p.submit(test_a, 222);
        auto f_b = p.submit(test_b, "hello");

        cout << f_a.get() << endl;
        cout << f_b.get() << endl;
    }

    for (int i = -1; ++i < 10;) {
        p.submit(test_a, i* 12);
        p.submit(test_b, "楊楊");
    }

    system("pause");    //為了看到打印
    p.over();
}


修改為g++下的線程池

說明(C++17下編譯)

#ifndef _JOIN_THREAD_H_
#define _JOIN_THREAD_H_
#include<vector>
#include<thread>

namespace lb {

using namespace std;

class join_threads
{
public:
         explicit join_threads(vector<thread>& threads_);
         ~join_threads();

         const vector<thread>& get() const{
                 return threads;
         }
private:
        vector<thread>& threads;
};

}

#endif




#include "join_threads.h"

namespace lb {

join_threads::join_threads(vector<thread>& threads_):threads(threads_){}

join_threads:: ~join_threads(){
        for (unsigned long i = 0; i < threads.size(); ++i){
                if (threads[i].joinable())
                        threads[i].join();
        }
}















#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<vector>
#include<future>
#include<type_traits>

#include"join_threads.h"
#include"threadsafe_queue.hpp"

#define __stdcall __attribute__((__stdcall__))
#define _STD std::


namespace lb {

using namespace std;

struct _call_info {
        void* addr;
        void* args;
};



template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
        callable(args...);
}

template <class _Tuple, size_t... _Indices>
static uint32_t __stdcall _Invoke(void* _RawVals) noexcept {
        unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));

        _Tuple& _Tup = *_FnVals;

        lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);

        return 0;
}

template <class _Tuple, size_t... _Indices>
static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
        return &lb::_Invoke<_Tuple, _Indices...>;
}










class thread_pool{
public:
        using _Task = _call_info;

        void work();
        thread_pool();


        template<typename Fir, typename... Args>
        auto submit(Fir&& fir, Args&&... args) ->
                future<invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
                using _Ret = invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
                using _Future = future<_Ret>;

                using _Packaged = packaged_task<_Ret(Args...)>;
                using _Tuple = tuple<_Packaged, Args...>;

                _Packaged callable(fir);

                _call_info _call;

                _call.addr = (void*)lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});

                auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
                _call.args = p_tuple;

                task_queue.push(_call);
                cond.notify_all();

                return std::get<0>(*p_tuple).get_future();
        }


        const vector<thread>& get() const {
                return this->threads;
        }

        void over(void) {
                finish = true;
                cond.notify_all();
        }
private:
        bool finish;
        mutex empty;
        condition_variable cond;
        threadsafe_queue<_Task> task_queue;
        vector<thread> threads;
        join_threads j_threads;
};



}

#endif




#include<iostream>

namespace lb {



        void thread_pool::work() {

                while (!finish){
                        unique_lock<mutex> u_mt(this->empty);
                        cond.wait(u_mt, [this] {
                                if (this->finish)
                                        return true;
                                return !this->task_queue.empty();
                        });
                        if (finish) {
                                u_mt.unlock();
                                continue;
                        }
                        u_mt.unlock();

                        _Task tmp;

                        if (this->task_queue.try_pop(tmp)) 
                                ((uint32_t(__attribute__((__stdcall__))*)(void*))tmp.addr)(tmp.args);
                }
                std::cout << "over\n";
        }

        thread_pool::thread_pool() :finish(false), j_threads(threads) {
                try {
                        int all = thread::hardware_concurrency();
                        threads.reserve(all);

                        for (; all; --all) {
                                threads.emplace_back(&thread_pool::work,this);
                        }
                }catch (std::bad_alloc) {
                        finish = true;
                        throw;
                }
        }
}













#ifndef _THREAD_SAFE_QUEUE_H_
#define _THREAD_SAFE_QUEUE_H_
#include<mutex>
#include<queue>
namespace lb {
        using namespace std;

template<typename T>
class threadsafe_queue
{
private:
        mutable mutex mut;
        queue<T> data_queue;
        condition_variable data_cond;

public:
        threadsafe_queue();

        void push(const T& data);

        void wait_and_pop(T& value);

        shared_ptr<T> wait_and_pop();

        bool try_pop(T& value);

        shared_ptr<T> try_pop();

        bool empty() const;
};

}

#endif



#ifndef _THREAD_SAFE_QUEUE_HPP_
#define _THREAD_SAFE_QUEUE_HPP_
#include "threadsafe_queue.h"

#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P

namespace lb {
_TP::threadsafe_queue(){}

_T void _P::push(const T& data){
        lock_guard<mutex> lk(mut);
        data_queue.push(move(data));
        data_cond.notify_one();  
}

_T void _P::wait_and_pop(T& value){
        unique_lock<mutex> lk(mut);
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        value = move(data_queue.front());
        data_queue.pop();
}

_T shared_ptr<T> _P::wait_and_pop(){
        unique_lock<mutex> lk(mut);
        data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
        shared_ptr<T> res(
                make_shared<T>(move(data_queue.front())));
        data_queue.pop();
        return res;
}

_T bool _P::try_pop(T& value){
        lock_guard<mutex> lk(mut);
        if (data_queue.empty())
                return false;
        value = move(data_queue.front());
        data_queue.pop();
        return true;
}

_T shared_ptr<T> _P::try_pop(){
        lock_guard<mutex> lk(mut);
        if (data_queue.empty())
                return shared_ptr<T>();  // 5
        shared_ptr<T> res(
                make_shared<T>(move(data_queue.front())));
        data_queue.pop();
        return res;
}

_T bool _P::empty() const{
        lock_guard<mutex> lk(mut);
        return data_queue.empty();
}


}

#endif








#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;

int test_a(int num) {
        this_thread::sleep_for(chrono::milliseconds(2000));
        thread_local ostringstream buf(std::ios_base::app);

        buf << "test_a: \t";
        buf << this_thread::get_id();
        buf << "\targ(num):\t\t";
        buf << num;
        buf << "\n";
        cout << buf.str();
        buf.seekp(0);

        return 2424;
}

const char* test_b(const char* cstr) {
        thread_local ostringstream buf(std::ios_base::app);

        buf << "test_b: \t";
        buf << this_thread::get_id();
        buf << "\targ(cstr):\t\t";
        buf << cstr;
        buf << "\n";
        cout << buf.str();
        buf.seekp(0);

        return "yangrui\n";
}

int main(int arg, char** args) {
    lb::thread_pool p;
#if 0
        {
                struct _B {
                        int test(const char* arg, int num) {
                                cout << arg << endl;
                                cout << num << endl;
                                return 242;
                        }

                };
                _B b;
                p.submit(&_B::test, b, "helooooo", 24242);
                return 0;
        }
#elif 0
        {

                p.submit([](int a, int b) {
                        cout << "a: " << a << endl;
                        cout << "b: " << b << endl;
                        cout << "td: " << this_thread::get_id() << endl;
                }, 2, 4);

                getchar();

                p.over();
                return 0;
        }
#elif 1
        { 
                auto f_a = p.submit(test_a, 222);
                auto f_b = p.submit(test_b, "hello");

                cout << f_a.get() << endl;
                cout << f_b.get() << endl;
        }

        for (int i = -1; ++i < 10;) {
                p.submit(test_a, i* 12);
                p.submit(test_b, "楊楊");
        }

        getchar();
        p.over();
#endif

        return 0;
}

編譯

g++ -ggdb3 main.cpp join_threads.cpp thread_pool.cpp -lpthread -std=c++17
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容