C++11多線程-異步運行(3)之最終篇(future+async)

前面兩章多次使用到std::future,本章我們就來揭開std::future廬山真面目。最后我們會引出std::async,該函數(shù)使得我們的并發(fā)調用變得簡單,優(yōu)雅。

3. std::future

前面我們多次使用std::future的get方法來獲取其它線程的結果,那么除這個方法外,std::future還有哪些方法呢。

enum class future_status
{
    ready,
    timeout,
    deferred
};
template <class R>
class future
{
public:
    // retrieving the value
    R get();
    // functions to check state
    bool valid() const noexcept;

    void wait() const;
    template <class Rep, class Period>
    future_status wait_for(const chrono::duration<Rep, Period>& rel_time) const;
    template <class Clock, class Duration>
    future_status wait_until(const chrono::time_point<Clock, Duration>& abs_time) const;

    shared_future<R> share() noexcept;
};

以上代碼去掉了std::future構造、析構、賦值相關的代碼,這些約束我們之前都講過了。下面我們來逐一了解上面這些函數(shù)。

3.1 get

這個函數(shù)我們之前一直使用,該函數(shù)會一直阻塞,直到獲取到結果或異步任務拋出異常。

3.2 share

std::future允許move,但是不允許拷貝。如果用戶確有這種需求,需要同時持有多個實例,怎么辦呢? 這就是share發(fā)揮作用的時候了。std::shared_future通過引用計數(shù)的方式實現(xiàn)了多實例共享同一狀態(tài),但有計數(shù)就伴隨著同步開銷(無鎖的原子操作也是有開銷的),性能會稍有下降。因此C++11要求程序員顯式調用該函數(shù),以表明用戶對由此帶來的開銷負責。std::shared_future允許move,允許拷貝,并且具有和std::future同樣的成員函數(shù),此處就不一一介紹了。當調用share后,std::future對象就不再和任何共享狀態(tài)關聯(lián),其valid函數(shù)也會變?yōu)閒alse。

3.3 wait

等待,直到數(shù)據(jù)就緒。數(shù)據(jù)就緒時,通過get函數(shù),無等待即可獲得數(shù)據(jù)。

3.4 wait_for和wait_until

wait_for、wait_until主要是用來進行超時等待的。wait_for等待指定時長,wait_until則等待到指定的時間點。返回值有3種狀態(tài):

  1. ready - 數(shù)據(jù)已就緒,可以通過get獲取了。
  2. timeout - 超時,數(shù)據(jù)還未準備好。
  3. deferred - 這個和std::async相關,表明無需wait,異步函數(shù)將在get時執(zhí)行。

3.5 valid

判斷當前std::future實例是否有效。std::future主要是用來獲取異步任務結果的,作為消費方出現(xiàn),單獨構建出來的實例沒意義,因此其valid為false。當與其它生產(chǎn)方(Provider)通過共享狀態(tài)關聯(lián)后,valid才會變得有效,std::future才會發(fā)揮實際的作用。C++11中有下面幾種Provider,從這些Provider可獲得有效的std::future實例:

  1. std::async
  2. std::promise::get_future
  3. std::packaged_task::get_future

既然std::future的各種行為都依賴共享狀態(tài),那么什么是共享狀態(tài)呢?

4. 共享狀態(tài)

共享狀態(tài)其本質就是單生產(chǎn)者-單消費者的多線程并發(fā)模型。無論是std::promise還是std::packaged_task都是通過共享狀態(tài),實現(xiàn)與std::future通信的。還記得我們在std::condition_variable一節(jié)給出的chan類么。共享狀態(tài)與其類似,通過std::mutex、std::condition_variable實現(xiàn)了多線程間通信。共享狀態(tài)并非C++11的標準,只是對std::promise、std::future的實現(xiàn)手段。回想我們之前的使用場景,共享狀態(tài)可能具有如下形式(c++11偽代碼):

template<typename T>
class assoc_state {
protected:
    mutable mutex mut_;
    mutable condition_variable cv_;
    unsigned state_ = 0;
    // std::shared_future中拷貝動作會發(fā)生引用計數(shù)的變化
    // 當引用計數(shù)降到0時,實例被delete
    int share_count_ = 0;
    exception_ptr exception_; // 執(zhí)行異常
    T value_;  // 執(zhí)行結果

public:
    enum {
        ready = 4,  // 異步動作執(zhí)行完,數(shù)據(jù)就緒
        // 異步動作將延遲到future.get()時調用
        // (實際上非異步,只不過是延遲執(zhí)行)
        deferred = 8,
    };

    assoc_state() {}
    // 禁止拷貝
    assoc_state(const assoc_state &) = delete;
    assoc_state &operator=(const assoc_state &) = delete;
    // 禁止move
    assoc_state(assoc_state &&) = delete;
    assoc_state &operator=(assoc_state &&) = delete;

    void set_value(const T &);
    void set_exception(exception_ptr p);
    // 需要用到線程局變存儲
    void set_value_at_thread_exit(const T &);
    void set_exception_at_thread_exit(exception_ptr p);

    void wait();
    future_status wait_for(const duration &) const;
    future_status wait_until(const time_point &) const;

    T &get() {
        unique_lock<mutex> lock(this->mut_);
        // 當_state為deferred時,std::async中
        // 的函數(shù)將在sub_wait中調用
        this->sub_wait(lock);
        if (this->_exception != nullptr)
            rethrow_exception(this->_exception);
        return _value;
    }
private:
    void sub_wait(unique_lock<mutex> &lk) {
        if (state_ != ready) {
            if (state_ & static_cast<unsigned>(deferred)) {
                state_ &= ~static_cast<unsigned>(deferred);
                lk.unlock();
                __execute();  // 此處執(zhí)行實際的函數(shù)調用
            } else {
                cv_.wait(lk, [this](){return state == ready;})
            }
        }
    }
};

以上給出了get的實現(xiàn)(偽代碼),其它部分雖然沒實現(xiàn),但assoc_state應該具有的功能,以及對std::promise、std::packaged_task、std::future、std::shared_future的支撐應該能夠表達清楚了。未實現(xiàn)部分還請讀者自行補充一下,權當是練手了。
有興趣的讀者可以閱讀llvm-libxx(https://github.com/llvm-mirror/libcxx)的源碼,以了解更多細節(jié),對共享狀態(tài)有更深掌握。

5. std::async

std::async可以看作是對std::packaged_task的封裝(雖然實際并一定如此,取決于編譯器的實現(xiàn),但共享狀態(tài)的思想是不變的),有兩種重載形式:

#define FR typename result_of<typename decay<F>::type(typename decay<Args>::type...)>::type

// 不含執(zhí)行策略
template <class F, class... Args>
future<FR> async(F&& f, Args&&... args);
// 含執(zhí)行策略
template <class F, class... Args>
future<FR> async(launch policy, F&& f, Args&&... args);

define部分是用來推斷函數(shù)F的返回值類型,我們先忽略它,以后有機再講。兩個重載形式的差別是一個含執(zhí)行策略,而另一個不含。那么什么是執(zhí)行策略呢?執(zhí)行策略定義了async執(zhí)行F(函數(shù)或可調用求對象)的方式,是一個枚舉值:

enum class launch {
    // 保證異步行為,F(xiàn)將在單獨的線程中執(zhí)行
    async = 1,
    // 當其它線程調用std::future::get時,
    // 將調用非異步形式, 即F在get函數(shù)內(nèi)執(zhí)行
    deferred = 2,
    // F的執(zhí)行時機由std::async來決定
    any = async | deferred
};

不含加載策略的版本,使用的是std::launch::any,也即由std::async函數(shù)自行決定F的執(zhí)行策略。那么C++11如何確定std::any下的具體執(zhí)行策略呢,一種可能的辦法是:優(yōu)先使用async策略,如果創(chuàng)建線程失敗,則使用deferred策略。實際上這也是Clang的any實現(xiàn)方式。std::async的出現(xiàn)大大減輕了異步的工作量。使得一個異步調用可以像執(zhí)行普通函數(shù)一樣簡單。

#include <iostream> // std::cout, std::endl
#include <future>   // std::async, std::future
#include <chrono>   // seconds
using namespace std::chrono;

int main() {
    auto print = [](char c) {
        for (int i = 0; i < 10; i++) {
            std::cout << c;
            std::cout.flush();
            std::this_thread::sleep_for(milliseconds(1));
        }
    };
    // 不同launch策略的效果
    std::launch policies[] = {std::launch::async, std::launch::deferred};
    const char *names[] = {"async   ", "deferred"};
    for (int i = 0; i < sizeof(policies)/sizeof(policies[0]); i++) {
        std::cout << names[i] << ": ";
        std::cout.flush();
        auto f1 = std::async(policies[i], print, '+');
        auto f2 = std::async(policies[i], print, '-');
        f1.get();
        f2.get();
        std::cout << std::endl;
    }

    return 0;
}

以上代碼輸出如下:

async   : +-+-+-+--+-++-+--+-+
deferred: ++++++++++----------

進行到現(xiàn)在,C++11的async算是結束了,盡管還留了一些疑問,比如共享狀態(tài)如何實現(xiàn)set_value_at_thread_exit效果。我們將會在下一章節(jié)介紹C++11的線程局部存儲,順便也解答下該疑問。

上一篇
C++11多線程-packaged_task
目錄 下一篇
線程局部存儲
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容