如何設(shè)計(jì)一個(gè)線程池?

  • 為什么需要線程池
  • 如何設(shè)計(jì)一個(gè)線程池
  • 用C++11實(shí)現(xiàn)一個(gè)線程池

為什么需要線程池

線程的頻繁創(chuàng)建和銷毀,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性。

線程池預(yù)先創(chuàng)建空閑的線程,程序?qū)⒁粋€(gè)任務(wù)傳給線程池,線程池就會(huì)啟動(dòng)空閑線程來執(zhí)行這個(gè)任務(wù),執(zhí)行結(jié)束以后,該線程并不會(huì)銷毀,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)。

使用線程池好處:

  • 降低資源消耗:通過池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
  • 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無需等待線程創(chuàng)建即可立即執(zhí)行。
  • 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡,- 降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
  • 提供更多更強(qiáng)大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時(shí)定時(shí)線程池,就允許任務(wù)延期執(zhí)行或定期執(zhí)行。

設(shè)計(jì)線程池

線程池設(shè)計(jì)的思路都大同小異:將任務(wù)寫入到阻塞隊(duì)列中,然后線程池中的空閑線程從隊(duì)列中獲取任務(wù)執(zhí)行,執(zhí)行完成后再從隊(duì)列獲取新的任務(wù)執(zhí)行。

設(shè)計(jì)線程池需要考慮的幾個(gè)特性

  • 任務(wù)隊(duì)列:按照功能分為優(yōu)先級(jí)隊(duì)列,先進(jìn)后出隊(duì)列,同步隊(duì)列等等
  • 線程數(shù)量的控制
  • 線程數(shù)量增加或回收

任務(wù)隊(duì)列

線程池中任務(wù)隊(duì)列都是阻塞型的,線程從中獲取任務(wù),沒有任務(wù)則阻塞。
一般有幾種特性的隊(duì)列

  • 優(yōu)先級(jí)隊(duì)列:支持有優(yōu)先級(jí)的任務(wù)
  • 延遲隊(duì)列:支持任務(wù)可以在某個(gè)時(shí)間點(diǎn)執(zhí)行
  • 先進(jìn)后出隊(duì)列:隊(duì)列類似于棧一樣,任務(wù)先進(jìn)后出
  • 同步隊(duì)列:不存儲(chǔ)任務(wù)的隊(duì)列,將任務(wù)push到隊(duì)列中時(shí)阻塞,直到將其分配給idle線程

線程數(shù)量

多線程編程中,經(jīng)常會(huì)被問到:該設(shè)置多少個(gè)線程才合理?

如果線程數(shù)設(shè)置太多,則對線程調(diào)度,上下文切換,緩存等產(chǎn)生較大的影響;如果設(shè)置太小,則造成吞吐不夠。
而到底設(shè)置多少線程數(shù),其實(shí)和機(jī)器資源的額度和任務(wù)特征有關(guān)。
我們一般將任務(wù)分為幾種

  • 計(jì)算密集型
  • IO密集型
  • 混合型:計(jì)算與IO密集型

引用《Java Concurrency in Practice》中的公式來估算合適的線程數(shù)。

計(jì)算密集型

線程數(shù) = CPU數(shù) + 1

如果一個(gè)線程產(chǎn)生缺頁或者其他原因停止運(yùn)行,則還有一個(gè)idle線程會(huì)占用CPU,不會(huì)導(dǎo)致CPU資源浪費(fèi)。但是這只是一個(gè)相對合理的經(jīng)驗(yàn)值,并不科學(xué)。

IO密集型和混合型

圖片.png

如果希望將CPU達(dá)到指定的利用率,則按照以下公式來計(jì)算線程數(shù)


圖片.png

所以,最佳線程數(shù)并不一定是固定不變的,可以隨著任務(wù)的差異及資源利用率進(jìn)行動(dòng)態(tài)調(diào)整,這就需要線程池有動(dòng)態(tài)調(diào)整線程的能力。

線程的創(chuàng)建與回收

線程池一般會(huì)在初始化時(shí)創(chuàng)建空閑線程,或者是等到需要的時(shí)候再延遲創(chuàng)建。

線程池?cái)U(kuò)容
當(dāng)任務(wù)較多,已有線程都處于運(yùn)行狀態(tài),而已有線程數(shù)沒有達(dá)到最大線程數(shù)限制時(shí),則需要?jiǎng)?chuàng)建新的線程來執(zhí)行任務(wù)。
也會(huì)有一些觸發(fā)條件,如任務(wù)等待多久仍然沒有被調(diào)度則創(chuàng)建新線程等。

線程池縮容

當(dāng)線程處于空閑狀態(tài)一段時(shí)間后,則需要回收此空閑線程以節(jié)約資源。

線程池監(jiān)控指標(biāo)

線程池需要能夠?qū)С鲆恍┲笜?biāo)以觀測其健康狀況,如

  • 列隊(duì)中任務(wù)數(shù)量
  • 運(yùn)行中線程數(shù)量

C++實(shí)現(xiàn)線程池

需求

在開發(fā)一個(gè)C++線程池前,我們先明確幾點(diǎn)需求

  • 線程數(shù)
    • 能夠指定最小和最大線程數(shù)
    • 任務(wù)等待多久才開始創(chuàng)建新線程
    • 線程空閑多久才開始回收線程
  • 阻塞隊(duì)列
    • 用來存儲(chǔ)任務(wù)
    • 當(dāng)隊(duì)列為空時(shí),從中獲取任務(wù)的操作會(huì)被阻塞直到有其他線程寫入新任務(wù)
    • 當(dāng)隊(duì)列已滿時(shí),將任務(wù)寫入隊(duì)列的操作會(huì)被阻塞直到有其他線程從隊(duì)列中獲取任務(wù)
  • 任務(wù)
    • 能夠獲取任務(wù)執(zhí)行后的返回值
    • 不固化任務(wù)的函數(shù)簽名
    • 任務(wù)能夠延遲執(zhí)行

明確需求后,線程池的設(shè)計(jì)就比較清晰了

截圖_選擇區(qū)域_20220414081321.png

完整代碼

https://github.com/ikenchina/thread_pool

阻塞隊(duì)列

先定義隊(duì)列的接口,用來存儲(chǔ)任務(wù)

class Queue
{
public:
    virtual void Push(const Task& task) = 0;
    virtual void Pop() = 0;
    virtual Task Front() = 0;
    virtual bool Empty() = 0;
    virtual size_t Size() = 0;
    virtual void Clear() = 0;
    virtual bool Full() = 0;
};

實(shí)現(xiàn)普通隊(duì)列和優(yōu)先級(jí)隊(duì)列

class FifoQueue : public Queue
class PriorityQueue : public Queue

再定義阻塞隊(duì)列的接口

class BlockingQueue
{
public:
    BlockingQueue() {Open();}
    virtual StatusCode Push(const Context& ctx, const Task& task) = 0;
    virtual StatusCode Pop(const Context& ctx, Task* t) = 0;
    virtual size_t QueueSize() = 0;
    virtual bool Empty() = 0;
    virtual void Clear() = 0;
    virtual void Close();
    virtual void Open();
};

實(shí)現(xiàn)普通阻塞隊(duì)列,默認(rèn)使用FifoQueue 來存儲(chǔ)任務(wù)。

class BaseBlockingQueue : public BlockingQueue
{
public:
    BaseBlockingQueue(std::shared_ptr<Queue> q = std::make_shared<FifoQueue>()) : BlockingQueue()
    {
        queue_ = q;
    }

    virtual StatusCode Push(const Context& ctx, const Task& task)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // 當(dāng)隊(duì)列已滿,且沒有關(guān)閉,則等待有線程從隊(duì)列中獲取任務(wù)
        while (queue_->Full() && !shutdown_)
        {
            if (out_queue_cond_.wait_until(lock, ctx.Deadline()) == std::cv_status::timeout)
            {
                // 如果等待超時(shí)且超過deadline,則返回Timeout
                if (std::chrono::system_clock::now() >= ctx.Deadline())
                    return StatusCode::Timeout;
            }
        }
        if (shutdown_)
            return StatusCode::Closed;

        queue_->Push(task);
        in_queue_cond_.notify_one();  // 喚醒一個(gè)線程
        return StatusCode::Ok;
    }
    
    virtual StatusCode Pop(const Context& ctx, Task* t)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);        
        std::cv_status cv_status = std::cv_status::no_timeout;  
        // 如果隊(duì)列為空且沒有關(guān)閉,則等待(有任務(wù)寫入隊(duì)列)
        if (queue_->Empty() && !shutdown_)
        {
            cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
        }
        if (shutdown_ && queue_->Empty())
            return StatusCode::Closed;
        if (cv_status == std::cv_status::timeout)
            return StatusCode::Timeout;

        auto task = queue_->Front();
        queue_->Pop();
        out_queue_cond_.notify_one();   // 喚醒一個(gè)被阻塞線程
        
        *t = task;
        return StatusCode::Ok;
    }
    
    ......     
}

混合隊(duì)列繼承于BlockingQueue,其分別使用FifoQueue和PriorityQueue來存儲(chǔ)普通任務(wù)和延遲執(zhí)行任務(wù)。

class MixedBlockingQueue : public BlockingQueue
{
public:
    MixedBlockingQueue(size_t fifo_queue_size = 1000, size_t delay_queue_size=1000) :
    BlockingQueue(),
    fifo_queue_(std::make_shared<FifoQueue>(fifo_queue_size)), 
    delay_queue_(std::make_shared<PriorityQueue>(delay_queue_size))
    {
    }
    
    virtual StatusCode Push(const Context& ctx, const Task& task){
        // 和BaseBlockingQueue幾乎一樣
        ......
    }
    
    virtual StatusCode Pop(const Context& ctx, Task* t)
    {
        while(true)
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);        
            std::cv_status cv_status = std::cv_status::no_timeout;
            if (isEmpty() && !shutdown_)
            {
                cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
            }
            if (shutdown_ && isEmpty())
                return StatusCode::Closed;
            if (cv_status == std::cv_status::timeout)
                return StatusCode::Timeout;

            // 先從delay queue獲取,
            // delay queue 對執(zhí)行時(shí)間更為敏感
            if (!delay_queue_->Empty())
            {
                auto task = delay_queue_->Front();
                if (task.exec_time < std::chrono::system_clock::now())
                {      
                    delay_queue_->Pop();
                    *t = task;
                    break;
                }
                // 若task的執(zhí)行時(shí)間還早,則檢查fifo queue
                if (!fifo_queue_->Empty())
                {
                    *t = fifo_queue_->Front();
                    fifo_queue_->Pop();
                    break;
                }
                
                // 等待,直到到達(dá)task執(zhí)行時(shí)間或者被其他線程喚醒
                auto stat = in_queue_cond_.wait_until(lock, std::min(task.exec_time, ctx.Deadline()));
                if (stat == std::cv_status::timeout)
                {
                    if (ctx.Deadline() < std::chrono::system_clock::now())
                        return StatusCode::Timeout;
                }
                continue;
            }
            if (!fifo_queue_->Empty())
            {
                *t = fifo_queue_->Front();
                fifo_queue_->Pop();
                break;
            }
        }
        out_queue_cond_.notify_one();
        return StatusCode::Ok;
    }
    ......
}

線程池配置

class ThreadPoolSettings
{
    // 最小線程數(shù)
    std::atomic<size_t> min_pool_size_;
    
    // 最大線程數(shù)
    std::atomic<size_t> max_pool_size_;
    
    // 空閑線程時(shí)間:當(dāng)超過此閾值線程仍然無法獲取到任務(wù),則銷毀此線程
    std::chrono::nanoseconds keepalive_time_ = std::chrono::seconds(10);
    
    // 線程擴(kuò)容時(shí)間:當(dāng)Push任務(wù)到隊(duì)列且隊(duì)列已滿時(shí),等待超過此閾值則進(jìn)行新建一個(gè)新線程
    std::chrono::nanoseconds scaleout_time_ = std::chrono::milliseconds(300);
};

線程池

class ThreadPool {
public:
    // 構(gòu)造函數(shù)
    // 傳入配置和阻塞隊(duì)列(默認(rèn)使用混合隊(duì)列)
    ThreadPool(ThreadPoolSettings settings, 
    std::shared_ptr<BlockingQueue> queue = std::make_shared<MixedBlockingQueue>());、
    
    // 禁止拷貝
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool & operator=(const ThreadPool &) = delete;
    ThreadPool & operator=(ThreadPool &&) = delete;
    
    
    // 用于動(dòng)態(tài)調(diào)整線程池配置
    ThreadPoolSettings& GetSettings() {return settings_;}
}

啟動(dòng)與停止

// 啟動(dòng)線程池,創(chuàng)建最少idle 線程
void Start();

// 停止線程池,回收所有線程
// 如果force=false:等待所有任務(wù)執(zhí)行完成,再回收線程
// 如果force=true:等待正在執(zhí)行的任務(wù)執(zhí)行完,再回收線程
void Stop(bool force = false);

提交任務(wù)

template<class F, class... Args>
auto ScheduleTask(const Context& ctx, F&& f, Args&&... args) 
    -> std::tuple<StatusCode, std::future<typename std::result_of<F(Args...)>::type>>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
        
    std::future<return_type> res = task->get_future();
    auto tt = Task([task](){(*task)();});
    auto ret = scheduleTask(ctx, tt);
    return std::make_tuple(ret, std::move(res));
}
  • 函數(shù)簽名
    • 參數(shù):利用可變模板來支持任意形參
    • 返回值:函數(shù)返回一個(gè)std::tuple
      • 第一個(gè)元素是狀態(tài)碼StatusCode,
      • 第二個(gè)元素是任務(wù)返回值的std::future
StatusCode ThreadPool::scheduleTask(const Context& ctx, Task& task)
{
    StatusCode res = StatusCode::Ok;

    while(true)
    {
        // Context的Deadline 和 IncreaseThreadDuration 中較小者作為 Push隊(duì)列的超時(shí)時(shí)間
        Context ctx2 = Context::WithDeadline(ctx, TimeAddDuration(std::chrono::system_clock::now(), settings_.IncreaseThreadDuration()));
        if (ctx.Deadline() < ctx2.Deadline())
        {
            ctx2.SetDeadline(ctx.Deadline());
        }
        
        res = queue_->Push(ctx2, task);

        // 如果需要回收線程
        if (needGcIdleWorkers())
            GcIdleWorkers();

        // 如果線程池還沒有線程,則新建一個(gè)
        if (available_workers_size_ == 0)
        {
            uint expected = 0; 
            std::unique_lock<std::mutex> lock(thread_mutex_);
            if (available_workers_size_.compare_exchange_strong(expected, 1))
                CreateThread();
        }

        if (res != StatusCode::Timeout)
            break;

        // 如果超過Context的Deadline
        if (std::chrono::system_clock::now() >= ctx.Deadline())
            break;
        
        // 嘗試新建一個(gè)線程
        TryIncreaseThreads();
    }

    return res;
}

使用

int main(int argc, char* argv[])
{
    ThreadPoolSettings settings;
    auto increase_duration = std::chrono::milliseconds(50);
    settings.SetIdleThreadDuration(std::chrono::milliseconds(200)).SetMaxThreadSize(4).SetMinThreadSize(0);
    settings.SetIncreaseThreadDuration(increase_duration);

    ThreadPool tp(settings);
    tp.Start();
    
    auto ctx = Context::WithTimeout(Context(), std::chrono::seconds(1));

    auto resp = tp.ScheduleTask(ctx, [](int a) {return a*a;}, 2);
    if (std::get<0>(resp) != StatusCode::Ok)
        return 1;
    
    auto delay_resp = tp.ScheduleDelayTask(Context(), std::chrono::seconds(1), test_func, 3);
    if (std::get<0>(resp) != StatusCode::Ok)
        return 1;

    cout << "2 * 2 = " << std::get<1>(resp).get() << endl;
    cout << "3 * 3 = " << std::get<1>(delay_resp).get() << endl;

    tp.Stop();
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容