線程池
概念
- 線程池 <font color=red>調(diào)度機制</font>, 在實際開發(fā)中, 將每個任務都交給 <font color=red>某個線程</font>是不切實際的, 可以利用 <font color=red>并行并發(fā)</font>為任務指定單獨的線程去執(zhí)行
- 線程池提供了上面問題所需要的功能:
- 提交任務到任務隊列上
- 工作線程從隊列獲取任務, 任務完成后, 再從任務隊列中獲取下一個任務
- 線程池幾個關鍵的問題:
- 線程數(shù)量
- 高效的任務分配方式
- 是否阻塞(==即等待一個任務完成==)
簡單的線程池(==MSVC==)
1. 線程數(shù)量和當前運行環(huán)境匹配
2. 掛載任務到任務隊列
3. 工作線程從任務隊列獲取任務, 執(zhí)行..., 執(zhí)行完畢后再回來獲取新的任務
ps: 線程池中線程不需要等待其他線程, 如果需要等待, 要進行同步管理
- 線程安全的隊列(==threadsafe_queue==)
/**
.h文件, 用的時候不要導入這個
直接在main.cpp中導入hpp
*/
#pragma once
#include<mutex>
#include<queue>
#include<condition_variable> // was missing: data_cond is a std::condition_variable
namespace lb {
	using namespace std;
	/// Thread-safe FIFO queue guarded by one mutex.
	/// Definitions live in threadsafe_queue.hpp -- include the .hpp (not this
	/// .h) from main.cpp so the template definitions are visible.
	template<typename T>
	class threadsafe_queue
	{
	private:
		mutable mutex mut;             // mutable so empty() can lock while const
		queue<T> data_queue;
		condition_variable data_cond;  // signaled on every push
	public:
		threadsafe_queue();
		void push(const T& data);
		void wait_and_pop(T& value);   // blocks until an element is available
		shared_ptr<T> wait_and_pop();
		bool try_pop(T& value);        // non-blocking; returns false when empty
		shared_ptr<T> try_pop();       // non-blocking; returns null when empty
		bool empty() const;
	};
}
/**
hpp
*/
#include "threadsafe_queue.h"
// Shorthand macros to keep the out-of-line template member definitions short.
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
namespace lb {
_TP::threadsafe_queue(){}
// Enqueue a copy of data and wake one waiting consumer.
// NOTE(review): data is a const reference, so move(data) still copies.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(move(data));
data_cond.notify_one();
}
// Block until the queue is non-empty, then move the front element into value.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
// Blocking pop returning the element via shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // 4: wait until non-empty
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
// Non-blocking pop: false when the queue is empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
// Non-blocking pop: null shared_ptr when the queue is empty.
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // 5: empty result signals "no task"
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
/**
join最合適的位置是在 main函數(shù)結束以前
*/
//.h
#pragma once
#include<vector>
#include<thread>
namespace lb {
using namespace std;
// RAII guard over an externally-owned vector<thread>: joins every
// joinable thread on destruction (best placed so it runs before main ends).
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
// Read-only access to the guarded thread container.
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning reference; the pool owns the vector
};
}
///cpp
#include "join_threads.h"
namespace lb {
	// Bind to the caller-owned thread container; joining happens on destruction.
	join_threads::join_threads(vector<thread>& threads_) : threads(threads_) {}
	// Join every thread that is still joinable so no worker outlives the guard.
	join_threads::~join_threads() {
		for (auto& worker : threads) {
			if (worker.joinable())
				worker.join();
		}
	}
}
//// .h
#pragma once
#include<vector>
#include<functional>
#include<atomic>
#include"join_threads.h"
#include"threadsafe_queue.hpp" // include the .hpp (template definitions), not the .h
namespace lb {
	using namespace std;
	/// Minimal busy-polling pool: submit() enqueues a void() task; workers
	/// spin in work() pulling tasks until over() is called.
	class thread_pool{
	public:
		using _Task = function<void(void)>;
		void work();    // worker loop, defined in thread_pool.cpp
		thread_pool();
		const vector<thread>& get() const {
			return this->threads;
		}
		/// Enqueue a task; an idle worker picks it up via try_pop.
		void submit(const _Task& task) {
			this->task_queue.push(task);
		}
		/// Ask every worker to stop after its current task.
		void over(void) {
			finish = true;
		}
	private:
		// atomic: over() writes this from the caller's thread while every
		// worker reads it in work(); a plain bool would be a data race.
		atomic<bool> finish;                //_code_a
		threadsafe_queue<_Task> task_queue; //_code_b
		vector<thread> threads;             //_code_c
		join_threads j_threads;             //_code_d
		/**
		Destruction order matters (members destroy in reverse order):
		_code_d (j_threads) must be destroyed first, since its job is to
		join all worker threads. If finish was never set to true, the main
		thread would block forever in those join() calls when the pool dies.
		j_threads' destructor walks the threads vector, so threads (_code_c)
		must not be destroyed before j_threads.
		*/
	};
}
/// cpp
#include "thread_pool.h"
namespace lb {
	/// Worker loop: poll the queue, run a task when one is available,
	/// otherwise yield the time slice. Exits once finish becomes true.
	void thread_pool::work() {
		while (!finish){
			_Task tmp;
			if (this->task_queue.try_pop(tmp)) {
				tmp();
				continue; // look for the next task immediately
			}
			this_thread::yield(); // nothing to do: give up the CPU slice
		}
	}
	thread_pool::thread_pool() :finish(false), j_threads(threads) {
		try {
			// Size the pool from the hardware. hardware_concurrency() may
			// legitimately return 0 when it cannot be detected, so fall back
			// to a small fixed count instead of creating an empty pool.
			unsigned int all = thread::hardware_concurrency();
			if (all == 0)
				all = 2;
			threads.reserve(all);
			for (; all; --all) {
				threads.emplace_back(&thread_pool::work,this);
			}
		}catch (const std::bad_alloc&) { // catch by const reference, not by value
			// A thread failed to construct: flag finish so workers that
			// already started can stop, then rethrow to the caller.
			finish = true;
			throw;
		}
	}
}
#define _CRT_SECURE_NO_WARNINGS
#include<iostream>
#include<thread>
#include<future>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleeps 2s, then prints the worker thread id.
void task_a(void) {
	this_thread::sleep_for(chrono::milliseconds(2000));
	thread_local ostringstream buf(std::ios_base::app);
	// clear() only resets the stream's error flags; with ios_base::app the
	// text would keep accumulating across calls, so discard the content.
	buf.str("");
	buf << this_thread::get_id() << " task_a()\n";
	cout << buf.str();
}
/// Demo task: prints the worker thread id (no sleep).
void task_b(void) {
	thread_local ostringstream buf(std::ios_base::app);
	buf << this_thread::get_id();
	buf << " task_b\n";
	cout << buf.str();
	// seekp(0) is ignored when the stream uses ios_base::app (every write
	// seeks to the end first), so actually discard the content instead.
	buf.str("");
}
#define version_1 1
int main(int arg, char** args) {
#ifdef version_1
	lb::thread_pool p;
	// Submit 10 pairs of demo tasks for the workers to race on.
	for (int i = -1; ++i < 10;) {
		p.submit(function<void(void)>(task_a));
		p.submit(function<void(void)>(task_b));
	}
	system("pause"); // keep the console open so the task output can be seen
	p.over();
#endif // was missing: the #ifdef version_1 above must be closed
}
簡單線程池中的問題
1. 線程池中的線程在不斷循環(huán)判斷
2. 上述只是 一個 void(void)的函數(shù), 而實際開發(fā)中:
用戶的函數(shù)可能有返回值, 參數(shù)也是多樣的
思路:
對于第1個問題:
子線程沒有任務的時候應該是掛起的
當main提交任務后, 是喚醒thread_pool中掛起的線程, 做完事后再掛起
對于第2個問題分2步:
用戶的函數(shù)是多種多樣的:
返回值不定
參數(shù)個數(shù)不定, 參數(shù)類型不定
所以必須用到泛型
返回值勢必用到 future和packaged_task
子線程改為wait
//修改threads_pool的成員變量
// Updated thread_pool members: a mutex + condition_variable let idle
// workers sleep instead of busy-polling the task queue.
class thread_pool{
public:
....
private:
bool finish;
mutex empty;              // only guards the wait inside work()
condition_variable cond;  // signaled by submit() and over()
threadsafe_queue<_Task> task_queue;
vector<thread> threads;
join_threads j_threads;
};
// 修改work和submit的函數(shù)
// Enqueue a task, then wake the sleeping workers to compete for it.
void submit(const _Task& task) {
this->task_queue.push(task);
// wake all workers so one of them grabs the task
cond.notify_all();
}
void thread_pool::work() {
while (!finish){
/**
Woken when finish becomes true, or when the queue is non-empty.
After waking, wait() re-locks the member mutex `empty`, but the
queue itself is already thread-safe, so the lock is released
manually right after the wait.
*/
unique_lock<mutex> u_mt(this->empty);
cond.wait(u_mt, [this] {
if(this->finish)
return true;
return !this->task_queue.empty();
});
if(finish){
u_mt.unlock();
continue; // loop condition ends the thread
}
u_mt.unlock();
_Task tmp;
if (this->task_queue.try_pop(tmp))
tmp();
}
}
void over(void) {
finish = true;
cond.notify_all(); // remember to wake every worker so it can observe finish and exit
}
返回值的解決思路
- 因為是異步的, 所以用前面的 <font color=red>future</font>, 即 <font color=red>submit返回一個future</font>
- 先來看看, 怎么獲取 <font color=red>callable</font>的返回值==類型==
/**
在以前分析 std::async的時候, STL中用到過 對callable的返回值traits
舉例
*/
#include<type_traits>
#include<typeinfo>
int main(int arg, char** args){
// Trait out a callable's return type the way the STL does for std::async.
using _Ret = _Invoke_result_t<
decay_t<decltype(&main)>,
decay_t<int>,
decay_t<char**>
>;
// _Ret is int: the return type traited out of main's signature.
/**
ps: decltype(main) and decltype(&main) are different types:
decltype(&main) is int(*)(int,char**)  // function pointer
decltype(main)  is int(int,char**)     // like a function<type> template argument
*/
/// decay turns int[] into int* (as discussed earlier)
cout << typeid(decay_t<int[]>).name() << endl;
// the lambda's type
auto tmp = [](int a, char**, int[], int len) ->int {
return 0;
};
// _Lambda_Ret is int: the return type traited out of the lambda.
using _Lambda_Ret = _Invoke_result_t<
decay_t<decltype(tmp)>,
decay_t<int>,
decay_t<char**>,
decay_t<int[]>,
decay_t<int>
>;
}
tuple妙用(==任務參數(shù)的多樣性==)
/**
回想thread的構造函數(shù), 它的過程就不說了, 可以用thread的構造函數(shù)來
解決參數(shù)不定以及返回值的問題, 這里給出一個demo
是模仿thread的構造函數(shù), 功能是將用戶的 函數(shù)存儲起來, 想在什么時候調(diào)用
就什么時候調(diào)用
*/
namespace _tt {
/// The generated call helper: unpacks the arguments onto the callable.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
/// Uniform entry point everything is funneled through.
//// Note: uses the __stdcall calling convention.
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
_Tuple* _FnVals = (_Tuple*)_RawVals;
_Tuple& _Tup = *_FnVals;
printf("--- _Tup: %p\n", &_Tup);
_tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
/// Produces the concrete _Invoke instantiation for the user's callable
/// and argument pack.
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}
/// Two globals used for the test only:
/// entry address for the current callable/argument combination
static void* _invoke_address = nullptr;
/// argument info (really a tuple*)
static void* _args_address = nullptr;
// Mimics std::async: packages fir(args...) and returns the matching future.
// NOTE(review): the tuple allocated here is never freed in this version --
// that leak is addressed in the corrected listing further below.
template<typename Fir, typename... Args>
auto fun(Fir&& fir, Args&&... args) ->
future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> { // like async: e.g. future<int>
// the user function's return type, e.g. int
using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
/// wrap the callable in a packaged_task
using _Packaged = packaged_task<_Ret(Args...)>;
/// the packaged_task plus the user's arguments are stored in a tuple
using _Tuple = tuple<_Packaged, Args...>;
/// package fir as a packaged_task
_Packaged callable(fir);
/// generate the entry-function address for this tuple shape, store it globally (test only)
_invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
/// store the packaged task and its arguments
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
// remember it globally; later the threadsafe_queue stores pointers of this kind
_args_address = p_tuple;
/// hand the packaged_task's future to the caller, who may get() it
auto result(std::get<0>(*p_tuple).get_future());
return result;
}
}
/**
測試1, 在子線程中執(zhí)行任務
*/
/// Sample user task for the demos below.
int t_fun(int) {
cout << "hello\n";
return 0;
}
int main(int arg, char** args) {
	{
		// Register the task and obtain its future.
		auto tmp = _tt::fun(t_fun, 2);
		// Simulate async: run the stored t_fun on a worker thread.
		thread t_([&tmp] {
			// The entry signature is: unsigned int __stdcall _Invoke(void*).
			// The global holding it is a void*, so the cast must restate that
			// exact signature, including __stdcall (MSVC's default calling
			// convention is not __stdcall).
			((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
			// get_id is a function: it must be CALLED; streaming the bare
			// name prints a function pointer, not the thread id.
			std::cout << this_thread::get_id() << " " << tmp.get() << endl;
		});
		t_.join();
		return 0;
	}
}
/**
測試2, 不同的用戶函數(shù)(簽名不一樣)
*/
/// A second user task with a different signature, showing that the
/// template machinery adapts per call.
const char* test(const char*) {
cout << "test()\n";
return "yangrui\n";
}
int main(int arg, char** args){
	auto tmp = _tt::fun(t_fun, 2);
	((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
	// get_id() must be called; streaming the bare name prints a function pointer
	std::cout << this_thread::get_id() << " " << tmp.get() << endl;
	// test takes a const char*, so the argument must be a string, not an int
	// (the original `22` would not compile; the later listing uses "lu")
	auto tmp2 = _tt::fun(test, "22");
	((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
	std::cout << this_thread::get_id() << " " << tmp2.get() << endl;
}
/**
在上面的測試中, 有內(nèi)存泄露 tuple* 沒有被釋放
*/
解決內(nèi)存泄露(==完整的代碼==)
namespace _tt {
// Unpack the stored arguments onto the callable.
template<typename Callable, typename... Args>
void invoke(Callable&& callable, Args&&... args) {
callable(args...);
}
// Uniform entry point. The tuple* is adopted by a unique_ptr here, so the
// heap tuple is always released after the task runs (fixes the leak from
// the earlier version).
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
printf("--- _Tup: %p\n", &_Tup);
_tt::invoke(_STD move(_STD get<_Indices>(_Tup))...);
return 0;
}
// Produce the concrete _Invoke instantiation for this tuple shape.
template <class _Tuple, size_t... _Indices>
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}
// Test-only globals: entry-function address and the argument tuple*.
static void* _invoke_address = nullptr;
static void* _args_address = nullptr;
// Mimics std::async: packages fir(args...) and returns the matching future.
template<typename Fir, typename... Args>
auto fun(Fir&& fir, Args&&... args) ->
future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
using _Future = future<_Ret>;
using _Packaged = packaged_task<_Ret(Args...)>;
using _Tuple = tuple<_Packaged, Args...>;
_Packaged callable(fir);
_invoke_address = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
auto p_tuple = new _Tuple(_STD forward<_Packaged>(callable), _STD forward<Args>(args)...);
_args_address = p_tuple;
auto result(std::get<0>(*p_tuple).get_future());
return result;
}
}
/// Sample task: prints and returns a constant.
int t_fun(int) {
cout << "hello\n";
return 2424;
}
/// Sample task with a different signature; sleeps 5s to simulate work.
const char* test(const char*) {
cout << "test()\n";
this_thread::sleep_for(chrono::seconds(5));
return "yangrui\n";
}
int main(int arg, char** args) {
	auto tmp = _tt::fun(t_fun, 2);
	((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
	//std::cout << this_thread::get_id << " "<< tmp.get() << endl;
	auto tmp2 = _tt::fun(test, "lu");
	((unsigned int (__stdcall *)(void*))_tt::_invoke_address)(_tt::_args_address);
	// get_id() must be called; streaming the bare name prints a function pointer
	std::cout << this_thread::get_id() << " " << tmp2.get() << endl;
}
在上面的static unsigned int __stdcall _Invoke(void* _RawVals) noexcept
中使用unique_ptr來包裝tuple*
這里面的內(nèi)存釋放關乎2個new:
1. tuple*
2. 在創(chuàng)建任務時(fun函數(shù)內(nèi)部) packaged_task內(nèi)部的_Packaged_state<...> *
首先tuple*:
0. 它的作用是 存儲callable(packaged_task)和用戶的所有參數(shù)
1. fun函數(shù)內(nèi)部被創(chuàng)建
2. 內(nèi)部的callable間接引用著 _Packaged_state<..> *
一個好釋放理由是, 自己釋放, 不要讓用戶手動釋放
所以釋放tuple*的1個設計是 在調(diào)用完 _invoke 后, 應該自動釋放tuple*
方法就是在_invoke中對 tuple* 做unique, 具體見上面的代碼
其次_Packaged_state<>*:
0. 它在本例中的作用是 存儲用戶的返回值
1. fun內(nèi)部會創(chuàng)建 packaged_task對象, 進而創(chuàng)建_Packaged_state*
2. 當創(chuàng)建 tuple*的時候, 會將 _Packaged_state* 轉移給tuple<0>
3. 當用tuple<0>獲取result(future)時, result也指向_Packaged_state*
4. 當fun函數(shù)結束, 先拷貝構造臨時對象tmp(future), result中的_Packaged_state*被轉移到tmp
4.1 tmp如果沒有外界接收, 會被析構, 進而可能會delete _Packaged_state*
但此時有2個對象引用 _Packaged_state(tuple<0>和tmp), 所以tmp只是將計數(shù)-1, 并不會釋放_Packaged_state*
4.2 有外界接收, 則tmp被外界引用, 不會析構
ps: _Packaged_state的new指針是使用了引用計數(shù)原理(以前沒有說過, 這里提一下), 但不是STL中的shared_ptr
5. 接著釋放局部的packaged_task對象, 但發(fā)現(xiàn)_Packaged_state已經(jīng)為空
所以不會釋放_Packaged_state
6. main函數(shù)中調(diào)用完fun后, 借用了全局指針調(diào)用 _invoke 函數(shù)
傳遞的指針其實就是 tuple*
_invoke內(nèi)部會拿到callable(packaged_task)調(diào)用 invoke函數(shù)
invoke的調(diào)用會觸發(fā)到 callable的重載(), 并在調(diào)用結束后存儲返回值到 packaged_state*指向的對象中
這個過程和以前探討的 packaged_task源碼是一樣的
此時如果外界通過 packaged_task的future對象獲取返回值(get())時的過程就不說了, 前面源碼中已經(jīng)很
詳細了
7. 在 _invoke內(nèi)部, 用unique_ptr<tuple> 包裝了tuple*
7.1 函數(shù)完畢后, 會釋放 unique_ptr<tuple>, 進而釋放tuple中的callable
callable指向的_Packaged_state*也會根據(jù)計數(shù)器決定要不要釋放
8. 如果main函數(shù)中 接收第4步fun函數(shù)返回的future對象(tmp, tmp2), 則main函數(shù)結束后
會釋放tmp, tmp2, 它們內(nèi)部都引用_Packaged_state*, 但也會根據(jù)計數(shù)器來釋放_Packaged_state*
改進前面的線程池
#pragma once
#include<mutex>
#include<queue>
#include<condition_variable> // was missing: data_cond is a std::condition_variable
namespace lb {
	using namespace std;
	/// Thread-safe FIFO queue guarded by one mutex (definitions in the .hpp).
	template<typename T>
	class threadsafe_queue
	{
	private:
		mutable mutex mut;             // mutable so empty() can lock while const
		queue<T> data_queue;
		condition_variable data_cond;  // signaled on every push
	public:
		threadsafe_queue();
		void push(const T& data);
		void wait_and_pop(T& value);   // blocks until an element is available
		shared_ptr<T> wait_and_pop();
		bool try_pop(T& value);        // non-blocking; returns false when empty
		shared_ptr<T> try_pop();       // non-blocking; returns null when empty
		bool empty() const;
	};
}
#include "threadsafe_queue.h"
// Shorthand macros for the out-of-line template member definitions.
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
namespace lb {
_TP::threadsafe_queue(){}
// Enqueue a copy of data and wake one waiting consumer.
// NOTE(review): data is a const reference, so move(data) still copies.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(move(data));
data_cond.notify_one();
}
// Block until the queue is non-empty, then move the front element into value.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
// Blocking pop returning the element via shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // 4: wait until non-empty
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
// Non-blocking pop: false when the queue is empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
// Non-blocking pop: null shared_ptr when the queue is empty.
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // 5: empty result signals "no task"
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
#pragma once
#include<vector>
#include<thread>
namespace lb {
using namespace std;
// RAII guard over an externally-owned vector<thread>: joins every
// joinable thread on destruction.
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
// Read-only access to the guarded thread container.
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning reference; the pool owns the vector
};
}
#include "join_threads.h"
namespace lb {
	// Bind to the caller-owned thread container; joining happens on destruction.
	join_threads::join_threads(vector<thread>& threads_) : threads(threads_) {}
	// Join every thread that is still joinable so no worker outlives the guard.
	join_threads::~join_threads() {
		for (auto& worker : threads) {
			if (worker.joinable())
				worker.join();
		}
	}
}
#pragma once
#include<vector>
#include<future>
#include<type_traits>
#include<atomic>
#include"join_threads.h"
#include"threadsafe_queue.hpp"
namespace lb {
	using namespace std;
	/// Type-erased task: entry-function address plus a heap tuple holding
	/// the packaged_task and its arguments.
	struct _call_info {
		void* addr;
		void* args;
	};
	/// Unpack the stored arguments onto the callable.
	template<typename Callable, typename... Args>
	void invoke(Callable&& callable, Args&&... args) {
		callable(args...);
	}
	/// Uniform entry point: adopts (and thus frees) the tuple, then runs the task.
	template <class _Tuple, size_t... _Indices>
	static unsigned int __stdcall _Invoke(void* _RawVals) noexcept {
		unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
		_Tuple& _Tup = *_FnVals;
		lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);
		return 0;
	}
	/// Produce the concrete _Invoke instantiation for this tuple shape.
	template <class _Tuple, size_t... _Indices>
	_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
		return &lb::_Invoke<_Tuple, _Indices...>;
	}
	class thread_pool{
	public:
		using _Task = _call_info;
		void work();
		thread_pool();
		/// Package fir(args...) as a task, enqueue it, and return the future
		/// for its result.
		template<typename Fir, typename... Args>
		auto submit(Fir&& fir, Args&&... args) ->
			future<_Invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
			using _Ret = _Invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
			using _Packaged = packaged_task<_Ret(Args...)>;
			using _Tuple = tuple<_Packaged, Args...>;
			_Packaged callable(_STD forward<Fir>(fir)); // forward, don't copy, the callable
			_call_info _call;
			_call.addr = lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
			auto p_tuple = new _Tuple(_STD move(callable), _STD forward<Args>(args)...);
			_call.args = p_tuple;
			// Take the future BEFORE publishing the task: once pushed, a
			// worker may run _Invoke and delete the tuple, so touching
			// *p_tuple after push() would be a use-after-free race.
			auto result(std::get<0>(*p_tuple).get_future());
			task_queue.push(_call);
			cond.notify_all(); // wake the sleeping workers to compete for it
			return result;
		}
		const vector<thread>& get() const {
			return this->threads;
		}
		/// Request shutdown and wake every sleeping worker.
		void over(void) {
			finish = true;
			cond.notify_all();
		}
	private:
		// atomic: written by over() while every worker reads it in work().
		atomic<bool> finish;
		mutex empty;                        // only guards the wait inside work()
		condition_variable cond;            // signaled by submit() and over()
		threadsafe_queue<_Task> task_queue;
		vector<thread> threads;
		join_threads j_threads;             // destroyed first: joins the workers
	};
}
#include "thread_pool.h"
#include<iostream>
namespace lb {
	/// Worker loop: sleep on the condition variable until there is a task
	/// (or shutdown), then pull one task and run it via its entry function.
	void thread_pool::work() {
		while (!finish){
			unique_lock<mutex> u_mt(this->empty);
			cond.wait(u_mt, [this] {
				if (this->finish)
					return true;
				return !this->task_queue.empty();
			});
			if (finish) {
				u_mt.unlock();
				continue; // loop condition terminates the thread
			}
			// the queue is thread-safe on its own, so drop the lock early
			u_mt.unlock();
			_Task tmp;
			if (this->task_queue.try_pop(tmp))
				((unsigned int(__stdcall*)(void*))tmp.addr)(tmp.args);
		}
		std::cout << "over\n";
	}
	thread_pool::thread_pool() :finish(false), j_threads(threads) {
		try {
			// hardware_concurrency() may return 0 when it cannot be detected;
			// fall back to a small fixed count instead of creating no workers.
			unsigned int all = thread::hardware_concurrency();
			if (all == 0)
				all = 2;
			threads.reserve(all);
			for (; all; --all) {
				threads.emplace_back(&thread_pool::work,this);
			}
		}catch (const std::bad_alloc&) { // catch by const reference, not by value
			finish = true;
			throw;
		}
	}
}```
<br>
- 測試
```cpp
#define _CRT_SECURE_NO_WARNINGS
#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleeps 2s, prints the thread id and its argument, returns 2424.
int test_a(int num) {
	this_thread::sleep_for(chrono::milliseconds(2000));
	thread_local ostringstream buf(std::ios_base::app);
	buf << "test_a: \t";
	buf << this_thread::get_id();
	buf << "\targ(num):\t\t";
	buf << num;
	buf << "\n";
	cout << buf.str();
	// seekp(0) is ignored with ios_base::app (writes always seek to the end),
	// so the buffer would grow forever; discard the content instead.
	buf.str("");
	return 2424;
}
/// Demo task: prints the thread id and its argument, returns a literal.
const char* test_b(const char* cstr) {
	thread_local ostringstream buf(std::ios_base::app);
	buf << "test_b: \t";
	buf << this_thread::get_id();
	buf << "\targ(cstr):\t\t";
	buf << cstr;
	buf << "\n";
	cout << buf.str();
	// seekp(0) is ignored with ios_base::app (writes always seek to the end),
	// so the buffer would grow forever; discard the content instead.
	buf.str("");
	return "yangrui\n";
}
int main(int arg, char** args) {
	lb::thread_pool p;
	{
		struct _B {
			int test(const char* arg, int num) {
				cout << arg << endl;
				cout << num << endl;
				return 242;
			}
		};
		_B b;
		// Wait for the result, then stop the pool. Returning without over()
		// deadlocks in the pool's destructor: the workers stay blocked on the
		// condition variable while join_threads tries to join them.
		auto f = p.submit(&_B::test, b, "helooooo", 24242);
		cout << f.get() << endl;
		p.over();
		return 0;
	}
	{
		p.submit([](int a, int b) {
			cout << this_thread::get_id() << endl;
		}, 2, 4);
		getchar();
		p.over();
		return 0;
	}
	{
		auto f_a = p.submit(test_a, 222);
		auto f_b = p.submit(test_b, "hello");
		cout << f_a.get() << endl;
		cout << f_b.get() << endl;
	}
	for (int i = -1; ++i < 10;) {
		p.submit(test_a, i* 12);
		p.submit(test_b, "楊楊");
	}
	system("pause"); // keep the console open so the output can be seen
	p.over();
}
修改為g++下的線程池
說明(C++17下編譯)
#ifndef _JOIN_THREAD_H_
#define _JOIN_THREAD_H_
#include<vector>
#include<thread>
namespace lb {
using namespace std;
// RAII guard over an externally-owned vector<thread>: joins every
// joinable thread on destruction.
class join_threads
{
public:
explicit join_threads(vector<thread>& threads_);
~join_threads();
// Read-only access to the guarded thread container.
const vector<thread>& get() const{
return threads;
}
private:
vector<thread>& threads; // non-owning reference; the pool owns the vector
};
}
#endif
#include "join_threads.h"
namespace lb {
	join_threads::join_threads(vector<thread>& threads_):threads(threads_){}
	// Join every still-joinable thread when the guard is destroyed.
	join_threads:: ~join_threads(){
		for (unsigned long i = 0; i < threads.size(); ++i){
			if (threads[i].joinable())
				threads[i].join();
		}
	}
} // was missing: this brace closes namespace lb
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<vector>
#include<future>
#include<type_traits>
#include<atomic>
#include"join_threads.h"
#include"threadsafe_queue.hpp"
#define __stdcall __attribute__((__stdcall__))
#define _STD std::
namespace lb {
	using namespace std;
	/// Type-erased task: entry-function address plus a heap tuple holding
	/// the packaged_task and its arguments.
	struct _call_info {
		void* addr;
		void* args;
	};
	/// Unpack the stored arguments onto the callable.
	template<typename Callable, typename... Args>
	void invoke(Callable&& callable, Args&&... args) {
		callable(args...);
	}
	/// Uniform entry point: adopts (and thus frees) the tuple, then runs the task.
	template <class _Tuple, size_t... _Indices>
	static uint32_t __stdcall _Invoke(void* _RawVals) noexcept {
		unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
		_Tuple& _Tup = *_FnVals;
		lb::invoke(_STD move(_STD get<_Indices>(_Tup))...);
		return 0;
	}
	/// Produce the concrete _Invoke instantiation for this tuple shape.
	template <class _Tuple, size_t... _Indices>
	static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
		return &lb::_Invoke<_Tuple, _Indices...>;
	}
	class thread_pool{
	public:
		using _Task = _call_info;
		void work();
		thread_pool();
		/// Package fir(args...) as a task, enqueue it, and return the future
		/// for its result.
		template<typename Fir, typename... Args>
		auto submit(Fir&& fir, Args&&... args) ->
			future<invoke_result_t<decay_t<Fir>, decay_t<Args>...>> {
			using _Ret = invoke_result_t<decay_t<Fir>, decay_t<Args>...>;
			using _Packaged = packaged_task<_Ret(Args...)>;
			using _Tuple = tuple<_Packaged, Args...>;
			_Packaged callable(_STD forward<Fir>(fir)); // forward, don't copy, the callable
			_call_info _call;
			_call.addr = (void*)lb::_Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(Args)>{});
			auto p_tuple = new _Tuple(_STD move(callable), _STD forward<Args>(args)...);
			_call.args = p_tuple;
			// Take the future BEFORE publishing the task: once pushed, a
			// worker may run _Invoke and delete the tuple, so touching
			// *p_tuple after push() would be a use-after-free race.
			auto result(std::get<0>(*p_tuple).get_future());
			task_queue.push(_call);
			cond.notify_all(); // wake the sleeping workers to compete for it
			return result;
		}
		const vector<thread>& get() const {
			return this->threads;
		}
		/// Request shutdown and wake every sleeping worker.
		void over(void) {
			finish = true;
			cond.notify_all();
		}
	private:
		// atomic: written by over() while every worker reads it in work().
		atomic<bool> finish;
		mutex empty;                        // only guards the wait inside work()
		condition_variable cond;            // signaled by submit() and over()
		threadsafe_queue<_Task> task_queue;
		vector<thread> threads;
		join_threads j_threads;             // destroyed first: joins the workers
	};
}
#endif
#include<iostream>
namespace lb {
void thread_pool::work() {
while (!finish){
unique_lock<mutex> u_mt(this->empty);
cond.wait(u_mt, [this] {
if (this->finish)
return true;
return !this->task_queue.empty();
});
if (finish) {
u_mt.unlock();
continue;
}
u_mt.unlock();
_Task tmp;
if (this->task_queue.try_pop(tmp))
((uint32_t(__attribute__((__stdcall__))*)(void*))tmp.addr)(tmp.args);
}
std::cout << "over\n";
}
thread_pool::thread_pool() :finish(false), j_threads(threads) {
try {
int all = thread::hardware_concurrency();
threads.reserve(all);
for (; all; --all) {
threads.emplace_back(&thread_pool::work,this);
}
}catch (std::bad_alloc) {
finish = true;
throw;
}
}
}
#ifndef _THREAD_SAFE_QUEUE_H_
#define _THREAD_SAFE_QUEUE_H_
#include<mutex>
#include<queue>
#include<condition_variable> // was missing: data_cond is a std::condition_variable
namespace lb {
	using namespace std;
	/// Thread-safe FIFO queue guarded by one mutex (definitions in the .hpp).
	template<typename T>
	class threadsafe_queue
	{
	private:
		mutable mutex mut;             // mutable so empty() can lock while const
		queue<T> data_queue;
		condition_variable data_cond;  // signaled on every push
	public:
		threadsafe_queue();
		void push(const T& data);
		void wait_and_pop(T& value);   // blocks until an element is available
		shared_ptr<T> wait_and_pop();
		bool try_pop(T& value);        // non-blocking; returns false when empty
		shared_ptr<T> try_pop();       // non-blocking; returns null when empty
		bool empty() const;
	};
}
#endif
#ifndef _THREAD_SAFE_QUEUE_HPP_
#define _THREAD_SAFE_QUEUE_HPP_
#include "threadsafe_queue.h"
// Shorthand macros for the out-of-line template member definitions.
#define _T template<typename T>
#define _P threadsafe_queue<T>
#define _TP _T _P
namespace lb {
_TP::threadsafe_queue(){}
// Enqueue a copy of data and wake one waiting consumer.
// NOTE(review): data is a const reference, so move(data) still copies.
_T void _P::push(const T& data){
lock_guard<mutex> lk(mut);
data_queue.push(move(data));
data_cond.notify_one();
}
// Block until the queue is non-empty, then move the front element into value.
_T void _P::wait_and_pop(T& value){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = move(data_queue.front());
data_queue.pop();
}
// Blocking pop returning the element via shared_ptr.
_T shared_ptr<T> _P::wait_and_pop(){
unique_lock<mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // 4: wait until non-empty
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
// Non-blocking pop: false when the queue is empty.
_T bool _P::try_pop(T& value){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return false;
value = move(data_queue.front());
data_queue.pop();
return true;
}
// Non-blocking pop: null shared_ptr when the queue is empty.
_T shared_ptr<T> _P::try_pop(){
lock_guard<mutex> lk(mut);
if (data_queue.empty())
return shared_ptr<T>(); // 5: empty result signals "no task"
shared_ptr<T> res(
make_shared<T>(move(data_queue.front())));
data_queue.pop();
return res;
}
_T bool _P::empty() const{
lock_guard<mutex> lk(mut);
return data_queue.empty();
}
}
#endif
#include<iostream>
#include<chrono>
#include"thread_pool.h"
#include<sstream>
using namespace std;
/// Demo task: sleeps 2s, prints the thread id and its argument, returns 2424.
int test_a(int num) {
	this_thread::sleep_for(chrono::milliseconds(2000));
	thread_local ostringstream buf(std::ios_base::app);
	buf << "test_a: \t";
	buf << this_thread::get_id();
	buf << "\targ(num):\t\t";
	buf << num;
	buf << "\n";
	cout << buf.str();
	// seekp(0) is ignored with ios_base::app (writes always seek to the end),
	// so the buffer would grow forever; discard the content instead.
	buf.str("");
	return 2424;
}
/// Demo task: prints the thread id and its argument, returns a literal.
const char* test_b(const char* cstr) {
	thread_local ostringstream buf(std::ios_base::app);
	buf << "test_b: \t";
	buf << this_thread::get_id();
	buf << "\targ(cstr):\t\t";
	buf << cstr;
	buf << "\n";
	cout << buf.str();
	// seekp(0) is ignored with ios_base::app (writes always seek to the end),
	// so the buffer would grow forever; discard the content instead.
	buf.str("");
	return "yangrui\n";
}
int main(int arg, char** args) {
	lb::thread_pool p;
#if 0
	{
		struct _B {
			int test(const char* arg, int num) {
				cout << arg << endl;
				cout << num << endl;
				return 242;
			}
		};
		_B b;
		// Wait for the result, then stop the pool. Returning without over()
		// deadlocks in the pool's destructor: the workers stay blocked on the
		// condition variable while join_threads tries to join them.
		auto f = p.submit(&_B::test, b, "helooooo", 24242);
		cout << f.get() << endl;
		p.over();
		return 0;
	}
#elif 0
	{
		p.submit([](int a, int b) {
			cout << "a: " << a << endl;
			cout << "b: " << b << endl;
			cout << "td: " << this_thread::get_id() << endl;
		}, 2, 4);
		getchar();
		p.over();
		return 0;
	}
#elif 1
	{
		auto f_a = p.submit(test_a, 222);
		auto f_b = p.submit(test_b, "hello");
		cout << f_a.get() << endl;
		cout << f_b.get() << endl;
	}
	for (int i = -1; ++i < 10;) {
		p.submit(test_a, i* 12);
		p.submit(test_b, "楊楊");
	}
	getchar();
	p.over();
#endif
	return 0;
}
編譯
g++ -ggdb3 main.cpp join_threads.cpp thread_pool.cpp -lpthread -std=c++17