主要參考以下兩篇博客
c++多線程 thread類
c++實(shí)現(xiàn)線程池
當(dāng)一個(gè)thread被創(chuàng)建以后要么join等待線程執(zhí)行完畢,要么detached將線程獨(dú)立出去
線程池適合的幾個(gè)場(chǎng)景
(1) 單位時(shí)間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時(shí)間短;
(2) 對(duì)實(shí)時(shí)性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時(shí)要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t); //構(gòu)造函數(shù)
template<class F, class... Args> //類模板
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;//任務(wù)入隊(duì)
~ThreadPool(); //析構(gòu)函數(shù)
private:
std::vector< std::thread > workers; //線程隊(duì)列,每個(gè)元素為一個(gè)Thread對(duì)象
std::queue< std::function<void()> > tasks; //任務(wù)隊(duì)列,每個(gè)元素為一個(gè)函數(shù)對(duì)象
std::mutex queue_mutex; //互斥量
std::condition_variable condition; //條件變量
bool stop; //停止
};
// 構(gòu)造函數(shù),把線程插入線程隊(duì)列,插入時(shí)調(diào)用embrace_back(),用匿名函數(shù)lambda初始化Thread對(duì)象
inline ThreadPool::ThreadPool(size_t threads) : stop(false){
for(size_t i = 0; i<threads; ++i)
workers.emplace_back(
[this]
{
for(;;)
{
// task是一個(gè)函數(shù)類型,從任務(wù)隊(duì)列接收任務(wù)
std::function<void()> task;
{
//給互斥量加鎖,鎖對(duì)象生命周期結(jié)束后自動(dòng)解鎖
std::unique_lock<std::mutex> lock(this->queue_mutex);
//(1)當(dāng)匿名函數(shù)返回false時(shí)才阻塞線程,阻塞時(shí)自動(dòng)釋放鎖。
//(2)當(dāng)匿名函數(shù)返回true且受到通知時(shí)解阻塞,然后加鎖。
this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
//從任務(wù)隊(duì)列取出一個(gè)任務(wù)
task = std::move(this->tasks.front());
this->tasks.pop();
} // 自動(dòng)解鎖
task(); // 執(zhí)行這個(gè)任務(wù)
}
}
);
}
// 添加新的任務(wù)到任務(wù)隊(duì)列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
// 獲取函數(shù)返回值類型
using return_type = typename std::result_of<F(Args...)>::type;
// 創(chuàng)建一個(gè)指向任務(wù)的只能指針
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex); //加鎖
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); }); //把任務(wù)加入隊(duì)列
} //自動(dòng)解鎖
condition.notify_one(); //通知條件變量,喚醒一個(gè)線程
return res;
}
// 析構(gòu)函數(shù),刪除所有線程
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
一般來說要給線程設(shè)置一個(gè)標(biāo)志位,當(dāng)滿足條件后,跳出程序,等待線程池內(nèi)的所有線程執(zhí)行完畢后,執(zhí)行join