- 為什么需要線程池
- 如何設(shè)計(jì)一個(gè)線程池
- 用C++11實(shí)現(xiàn)一個(gè)線程池
為什么需要線程池
線程的頻繁創(chuàng)建和銷毀,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性。
線程池預(yù)先創(chuàng)建空閑的線程,程序?qū)⒁粋€(gè)任務(wù)傳給線程池,線程池就會(huì)啟動(dòng)空閑線程來執(zhí)行這個(gè)任務(wù),執(zhí)行結(jié)束以后,該線程并不會(huì)銷毀,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)。
使用線程池好處:
- 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無需等待線程創(chuàng)建即可立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡,- 降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
- 提供更多更強(qiáng)大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時(shí)定時(shí)線程池,就允許任務(wù)延期執(zhí)行或定期執(zhí)行。
設(shè)計(jì)線程池
線程池設(shè)計(jì)的思路都大同小異:將任務(wù)寫入到阻塞隊(duì)列中,然后線程池中的空閑線程從隊(duì)列中獲取任務(wù)執(zhí)行,執(zhí)行完成后再從隊(duì)列獲取新的任務(wù)執(zhí)行。
設(shè)計(jì)線程池需要考慮的幾個(gè)特性
- 任務(wù)隊(duì)列:按照功能分為優(yōu)先級(jí)隊(duì)列,先進(jìn)后出隊(duì)列,同步隊(duì)列等等
- 線程數(shù)量的控制
- 線程數(shù)量增加或回收
任務(wù)隊(duì)列
線程池中任務(wù)隊(duì)列都是阻塞型的,線程從中獲取任務(wù),沒有任務(wù)則阻塞。
一般有幾種特性的隊(duì)列
- 優(yōu)先級(jí)隊(duì)列:支持有優(yōu)先級(jí)的任務(wù)
- 延遲隊(duì)列:支持任務(wù)可以在某個(gè)時(shí)間點(diǎn)執(zhí)行
- 先進(jìn)后出隊(duì)列:隊(duì)列類似于棧一樣,任務(wù)先進(jìn)后出
- 同步隊(duì)列:不存儲(chǔ)任務(wù)的隊(duì)列,將任務(wù)push到隊(duì)列中時(shí)阻塞,直到將其分配給idle線程
線程數(shù)量
多線程編程中,經(jīng)常會(huì)被問到:該設(shè)置多少個(gè)線程才合理?
如果線程數(shù)設(shè)置太多,則對線程調(diào)度,上下文切換,緩存等產(chǎn)生較大的影響;如果設(shè)置太小,則造成吞吐不夠。
而到底設(shè)置多少線程數(shù),其實(shí)和機(jī)器資源的額度和任務(wù)特征有關(guān)。
我們一般將任務(wù)分為幾種
- 計(jì)算密集型
- IO密集型
- 混合型:計(jì)算與IO密集型
引用《Java Concurrency in Practice》中的公式來估算合適的線程數(shù)。
計(jì)算密集型
線程數(shù) = CPU數(shù) + 1
如果一個(gè)線程產(chǎn)生缺頁或者其他原因停止運(yùn)行,則還有一個(gè)idle線程會(huì)占用CPU,不會(huì)導(dǎo)致CPU資源浪費(fèi)。但是這只是一個(gè)相對合理的經(jīng)驗(yàn)值,并不科學(xué)。
IO密集型和混合型

如果希望將CPU達(dá)到指定的利用率,則按照以下公式來計(jì)算線程數(shù)

所以,最佳線程數(shù)并不一定是固定不變的,可以隨著任務(wù)的差異及資源利用率進(jìn)行動(dòng)態(tài)調(diào)整,這就需要線程池有動(dòng)態(tài)調(diào)整線程的能力。
線程的創(chuàng)建與回收
線程池一般會(huì)在初始化時(shí)創(chuàng)建空閑線程,或者是等到需要的時(shí)候再延遲創(chuàng)建。
線程池?cái)U(kuò)容
當(dāng)任務(wù)較多,已有線程都處于運(yùn)行狀態(tài),而已有線程數(shù)沒有達(dá)到最大線程數(shù)限制時(shí),則需要?jiǎng)?chuàng)建新的線程來執(zhí)行任務(wù)。
也會(huì)有一些觸發(fā)條件,如任務(wù)等待多久仍然沒有被調(diào)度則創(chuàng)建新線程等。
線程池縮容
當(dāng)線程處于空閑狀態(tài)一段時(shí)間后,則需要回收此空閑線程以節(jié)約資源。
線程池監(jiān)控指標(biāo)
線程池需要能夠?qū)С鲆恍┲笜?biāo)以觀測其健康狀況,如
- 列隊(duì)中任務(wù)數(shù)量
- 運(yùn)行中線程數(shù)量
C++實(shí)現(xiàn)線程池
需求
在開發(fā)一個(gè)C++線程池前,我們先明確幾點(diǎn)需求
- 線程數(shù)
- 能夠指定最小和最大線程數(shù)
- 任務(wù)等待多久才開始創(chuàng)建新線程
- 線程空閑多久才開始回收線程
- 阻塞隊(duì)列
- 用來存儲(chǔ)任務(wù)
- 當(dāng)隊(duì)列為空時(shí),從中獲取任務(wù)的操作會(huì)被阻塞直到有其他線程寫入新任務(wù)
- 當(dāng)隊(duì)列已滿時(shí),將任務(wù)寫入隊(duì)列的操作會(huì)被阻塞直到有其他線程從隊(duì)列中獲取任務(wù)
- 任務(wù)
- 能夠獲取任務(wù)執(zhí)行后的返回值
- 不固化任務(wù)的函數(shù)簽名
- 任務(wù)能夠延遲執(zhí)行
明確需求后,線程池的設(shè)計(jì)就比較清晰了

完整代碼
https://github.com/ikenchina/thread_pool
阻塞隊(duì)列
先定義隊(duì)列的接口,用來存儲(chǔ)任務(wù)
class Queue
{
public:
virtual void Push(const Task& task) = 0;
virtual void Pop() = 0;
virtual Task Front() = 0;
virtual bool Empty() = 0;
virtual size_t Size() = 0;
virtual void Clear() = 0;
virtual bool Full() = 0;
};
實(shí)現(xiàn)普通隊(duì)列和優(yōu)先級(jí)隊(duì)列
class FifoQueue : public Queue
class PriorityQueue : public Queue
再定義阻塞隊(duì)列的接口
class BlockingQueue
{
public:
BlockingQueue() {Open();}
virtual StatusCode Push(const Context& ctx, const Task& task) = 0;
virtual StatusCode Pop(const Context& ctx, Task* t) = 0;
virtual size_t QueueSize() = 0;
virtual bool Empty() = 0;
virtual void Clear() = 0;
virtual void Close();
virtual void Open();
};
實(shí)現(xiàn)普通阻塞隊(duì)列,默認(rèn)使用FifoQueue 來存儲(chǔ)任務(wù)。
class BaseBlockingQueue : public BlockingQueue
{
public:
BaseBlockingQueue(std::shared_ptr<Queue> q = std::make_shared<FifoQueue>()) : BlockingQueue()
{
queue_ = q;
}
virtual StatusCode Push(const Context& ctx, const Task& task)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// 當(dāng)隊(duì)列已滿,且沒有關(guān)閉,則等待有線程從隊(duì)列中獲取任務(wù)
while (queue_->Full() && !shutdown_)
{
if (out_queue_cond_.wait_until(lock, ctx.Deadline()) == std::cv_status::timeout)
{
// 如果等待超時(shí)且超過deadline,則返回Timeout
if (std::chrono::system_clock::now() >= ctx.Deadline())
return StatusCode::Timeout;
}
}
if (shutdown_)
return StatusCode::Closed;
queue_->Push(task);
in_queue_cond_.notify_one(); // 喚醒一個(gè)線程
return StatusCode::Ok;
}
virtual StatusCode Pop(const Context& ctx, Task* t)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
std::cv_status cv_status = std::cv_status::no_timeout;
// 如果隊(duì)列為空且沒有關(guān)閉,則等待(有任務(wù)寫入隊(duì)列)
if (queue_->Empty() && !shutdown_)
{
cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
}
if (shutdown_ && queue_->Empty())
return StatusCode::Closed;
if (cv_status == std::cv_status::timeout)
return StatusCode::Timeout;
auto task = queue_->Front();
queue_->Pop();
out_queue_cond_.notify_one(); // 喚醒一個(gè)被阻塞線程
*t = task;
return StatusCode::Ok;
}
......
}
混合隊(duì)列繼承于BlockingQueue,其分別使用FifoQueue和PriorityQueue來存儲(chǔ)普通任務(wù)和延遲執(zhí)行任務(wù)。
class MixedBlockingQueue : public BlockingQueue
{
public:
MixedBlockingQueue(size_t fifo_queue_size = 1000, size_t delay_queue_size=1000) :
BlockingQueue(),
fifo_queue_(std::make_shared<FifoQueue>(fifo_queue_size)),
delay_queue_(std::make_shared<PriorityQueue>(delay_queue_size))
{
}
virtual StatusCode Push(const Context& ctx, const Task& task){
// 和BaseBlockingQueue幾乎一樣
......
}
virtual StatusCode Pop(const Context& ctx, Task* t)
{
while(true)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
std::cv_status cv_status = std::cv_status::no_timeout;
if (isEmpty() && !shutdown_)
{
cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
}
if (shutdown_ && isEmpty())
return StatusCode::Closed;
if (cv_status == std::cv_status::timeout)
return StatusCode::Timeout;
// 先從delay queue獲取,
// delay queue 對執(zhí)行時(shí)間更為敏感
if (!delay_queue_->Empty())
{
auto task = delay_queue_->Front();
if (task.exec_time < std::chrono::system_clock::now())
{
delay_queue_->Pop();
*t = task;
break;
}
// 若task的執(zhí)行時(shí)間還早,則檢查fifo queue
if (!fifo_queue_->Empty())
{
*t = fifo_queue_->Front();
fifo_queue_->Pop();
break;
}
// 等待,直到到達(dá)task執(zhí)行時(shí)間或者被其他線程喚醒
auto stat = in_queue_cond_.wait_until(lock, std::min(task.exec_time, ctx.Deadline()));
if (stat == std::cv_status::timeout)
{
if (ctx.Deadline() < std::chrono::system_clock::now())
return StatusCode::Timeout;
}
continue;
}
if (!fifo_queue_->Empty())
{
*t = fifo_queue_->Front();
fifo_queue_->Pop();
break;
}
}
out_queue_cond_.notify_one();
return StatusCode::Ok;
}
......
}
線程池配置
class ThreadPoolSettings
{
// 最小線程數(shù)
std::atomic<size_t> min_pool_size_;
// 最大線程數(shù)
std::atomic<size_t> max_pool_size_;
// 空閑線程時(shí)間:當(dāng)超過此閾值線程仍然無法獲取到任務(wù),則銷毀此線程
std::chrono::nanoseconds keepalive_time_ = std::chrono::seconds(10);
// 線程擴(kuò)容時(shí)間:當(dāng)Push任務(wù)到隊(duì)列且隊(duì)列已滿時(shí),等待超過此閾值則進(jìn)行新建一個(gè)新線程
std::chrono::nanoseconds scaleout_time_ = std::chrono::milliseconds(300);
};
線程池
class ThreadPool {
public:
// 構(gòu)造函數(shù)
// 傳入配置和阻塞隊(duì)列(默認(rèn)使用混合隊(duì)列)
ThreadPool(ThreadPoolSettings settings,
std::shared_ptr<BlockingQueue> queue = std::make_shared<MixedBlockingQueue>());、
// 禁止拷貝
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool & operator=(const ThreadPool &) = delete;
ThreadPool & operator=(ThreadPool &&) = delete;
// 用于動(dòng)態(tài)調(diào)整線程池配置
ThreadPoolSettings& GetSettings() {return settings_;}
}
啟動(dòng)與停止
// 啟動(dòng)線程池,創(chuàng)建最少idle 線程
void Start();
// 停止線程池,回收所有線程
// 如果force=false:等待所有任務(wù)執(zhí)行完成,再回收線程
// 如果force=true:等待正在執(zhí)行的任務(wù)執(zhí)行完,再回收線程
void Stop(bool force = false);
提交任務(wù)
template<class F, class... Args>
auto ScheduleTask(const Context& ctx, F&& f, Args&&... args)
-> std::tuple<StatusCode, std::future<typename std::result_of<F(Args...)>::type>>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
auto tt = Task([task](){(*task)();});
auto ret = scheduleTask(ctx, tt);
return std::make_tuple(ret, std::move(res));
}
- 函數(shù)簽名
- 參數(shù):利用可變模板來支持任意形參
- 返回值:函數(shù)返回一個(gè)
std::tuple,- 第一個(gè)元素是狀態(tài)碼StatusCode,
- 第二個(gè)元素是任務(wù)返回值的std::future
StatusCode ThreadPool::scheduleTask(const Context& ctx, Task& task)
{
StatusCode res = StatusCode::Ok;
while(true)
{
// Context的Deadline 和 IncreaseThreadDuration 中較小者作為 Push隊(duì)列的超時(shí)時(shí)間
Context ctx2 = Context::WithDeadline(ctx, TimeAddDuration(std::chrono::system_clock::now(), settings_.IncreaseThreadDuration()));
if (ctx.Deadline() < ctx2.Deadline())
{
ctx2.SetDeadline(ctx.Deadline());
}
res = queue_->Push(ctx2, task);
// 如果需要回收線程
if (needGcIdleWorkers())
GcIdleWorkers();
// 如果線程池還沒有線程,則新建一個(gè)
if (available_workers_size_ == 0)
{
uint expected = 0;
std::unique_lock<std::mutex> lock(thread_mutex_);
if (available_workers_size_.compare_exchange_strong(expected, 1))
CreateThread();
}
if (res != StatusCode::Timeout)
break;
// 如果超過Context的Deadline
if (std::chrono::system_clock::now() >= ctx.Deadline())
break;
// 嘗試新建一個(gè)線程
TryIncreaseThreads();
}
return res;
}
使用
int main(int argc, char* argv[])
{
ThreadPoolSettings settings;
auto increase_duration = std::chrono::milliseconds(50);
settings.SetIdleThreadDuration(std::chrono::milliseconds(200)).SetMaxThreadSize(4).SetMinThreadSize(0);
settings.SetIncreaseThreadDuration(increase_duration);
ThreadPool tp(settings);
tp.Start();
auto ctx = Context::WithTimeout(Context(), std::chrono::seconds(1));
auto resp = tp.ScheduleTask(ctx, [](int a) {return a*a;}, 2);
if (std::get<0>(resp) != StatusCode::Ok)
return 1;
auto delay_resp = tp.ScheduleDelayTask(Context(), std::chrono::seconds(1), test_func, 3);
if (std::get<0>(resp) != StatusCode::Ok)
return 1;
cout << "2 * 2 = " << std::get<1>(resp).get() << endl;
cout << "3 * 3 = " << std::get<1>(delay_resp).get() << endl;
tp.Stop();
}