線程
多線程的優(yōu)缺點(diǎn):
優(yōu)點(diǎn):輕量的進(jìn)程 ,線程間的通訊更迅速
缺點(diǎn):不好實(shí)現(xiàn),不能運(yùn)行在分布式系統(tǒng)上
一個(gè)線程的簡單的例子:
#include <iostream>
#include <thread>
using namespace std;
void function_1(){
cout<<"Beauty is only skin-deep"<<endl;
}
int main(){
thread t1(function_1);
t1.join();
return 0;
}
thread t1(function_1) 聲明了一個(gè)線程,同時(shí)啟動(dòng)了該線程,該線程開始執(zhí)行function_1函數(shù)。
接著我們調(diào)用了join(),該函數(shù)會(huì)將主線程阻塞,直到子線程結(jié)束后,主線程才會(huì)繼續(xù)執(zhí)行。與此作用相反的一個(gè)函數(shù)是detach,這個(gè)函數(shù)會(huì)使線程成為一個(gè)守護(hù)線程,主線程不用在乎子線程是否執(zhí)行結(jié)束,會(huì)直接往下運(yùn)行。每個(gè)線程只能detach或者join一次。我們可以通過joinable函數(shù)來判斷該線程是否可以執(zhí)行join函數(shù)。
如果子線程使用了主線程的資源,例如在創(chuàng)建子線程的,會(huì)以引用的方式來使用主線程的數(shù)據(jù),主線程要保證這些數(shù)據(jù)活的夠久。
class Fctor{
public:
void operator()(){
for(int ii = 0; i>-100; i++){
cout<<"from t1: "<<i<<endl;
}
}
};
class Foo{
public:
void print(){
//do something
}
};
thread t1(function_1);
void function_1(){
cout<<"Beauty is only skin-deep"<<endl;
}
thread t1(function_1); //調(diào)用普通函數(shù)
Factor f;
thread t2(f); //調(diào)用仿函數(shù)
Foo f
thread t3(&Foo::print, &f); //調(diào)用成員函數(shù)
thread的構(gòu)造函數(shù)的第一個(gè)參數(shù)是callable對象,函數(shù)指針,functor或者lambda表達(dá)式。
如果我們這樣聲明thread t1(Fctor());,是錯(cuò)誤的,編譯器會(huì)認(rèn)為這是一個(gè)返回值為thread類型,函數(shù)名叫做t1,并且其參數(shù)為Fctor的函數(shù)聲明。解決方法是改寫為thread t1((Fctor()));。
如果想向thread傳入?yún)?shù)的話,
class Fctor{
public:
void operator()(string msg){
for(int ii = 0; i>-100; i++){
cout<<"from t1: "<<i<<endl;
}
}
};
int main(){
string s = "Where there is no trust, there is no love";
std::thread t1((Fctor()), s); // 值傳遞
//std::thread t1((Fctor()), std::ref(s)); //引用傳遞
cout<<"From main: "<<s<<endl;
t1.join();
return 0;
}
thread 不能被復(fù)制,只能被move。
thread t2 = t1;// error
thread t2 = std::move(t1);//right
獲取線程id
//在線程外獲取某個(gè)線程的id
thread t1(func);
t1.get_id();
//在線程內(nèi)獲取當(dāng)前線程的id
std::this_thread::get_id();
我們應(yīng)該建立的線程數(shù)
std::thread::hardware_concurrency();
data race and mutex
首先看一個(gè)代碼示例:
#include <thread>
#include <string>
using namespace std;
void function_1(){
for(int i = 0; i>-1000; i--){
cout<<"From t1: "<<i<<endl;
}
}
int main(){
thread t1(function_1);
for(int i = 0; i<1000; i++){
cout<<"From main: "<<i<<endl;
}
t1.join();
return 0;
}
一部分執(zhí)行結(jié)果如下:

我們可以從結(jié)果看出,兩個(gè)線程交替的輸出結(jié)果。并且,在一個(gè)線程的輸出操作還未結(jié)束時(shí),就開始了另一個(gè)線程的輸出操作。兩個(gè)線程都需要訪問標(biāo)準(zhǔn)輸入輸出(cout),這就導(dǎo)致了兩個(gè)線程間的data race。
為了解決這個(gè)問題,使兩個(gè)線程能夠正常的打印出結(jié)果,我們引入mutex(互斥量)。上述的黛娜可以修改為
#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;
mutex mu;
void shared_print(string msg, int i){
mu.lock();
cout<<msg<<id<<endl;
mu.unlock();
}
void function_1(){
for(int i = 0; i>-1000; i--){
shared_print("From t1: ", i);
}
}
int main(){
thread t1(function_1);
for(int i = 0; i<1000; i++){
shared_print("From main: ", i);
}
t1.join();
return 0;
}
不會(huì)再出現(xiàn)上文中的狀況了。這就是mutex的作用。
mu的lock和unlock函數(shù)需要成對出現(xiàn),但是我們會(huì)經(jīng)常忘記調(diào)用unlock,借助C++的RAII機(jī)制,標(biāo)準(zhǔn)庫給我門提供了一個(gè)類,std::lock_guard,上面的shared_print函數(shù)可以被修改為
void shared_print(string msg, int i){
std::lock_guard<mutex> guard(mu);
cout<<msg<<i<<endl;
}
這里還存在一個(gè)問題,就是cout是全局的,其他的線程仍然可以直接調(diào)用cout,還是存在data race。在一個(gè)系統(tǒng)中,我們應(yīng)該規(guī)定大家使用統(tǒng)一的函數(shù)來調(diào)用需要被互斥量保護(hù)的資源。
死鎖 dead lock
#include <thread>
#include <string>
#include <iostream>
#include <mutex>
using namespace std;
class LogFile{
public:
LogFile(){}
void shared_print1(string msg, int i){
std::lock_guard<mutex> guard1(mu1);
std::lock_guard<mutex> guard2(mu2);
cout<<msg<<i<<endl;
}
void shared_print2(string msg, int i){
std::lock_guard<mutex> guard2(mu2);
std::lock_guard<mutex> guard1(mu1);
cout<<msg<<i<<endl;
}
private:
mutex mu1;
mutex mu2;
};
void function_1(LogFile& log){
for(int i = 0; i>-1000; i--){
log.shared_print1("From t1: ", i);
}
}
int main(){
LogFile log;
thread t1(function_1, std::ref(log));
for(int i = 0; i<1000; i++){
log.shared_print2("From main: ", i);
}
t1.join();
return 0;
}
結(jié)果運(yùn)行如下:

我們可以看到,代碼在輸出完From main: 428之后卡死了,這就是發(fā)生了死鎖。
為了避免死鎖,我們希望所有的互斥量都能夠以相同的順序被調(diào)用。
標(biāo)準(zhǔn)庫提供了std::lock函數(shù)來幫助我們同時(shí)鎖住多個(gè)mutex,上面的代碼就可以被改寫為
class LogFile{
public:
LogFile(){}
void shared_print1(string msg, int i){
std::lock(mu1, mu2);
std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
cout<<msg<<i<<endl;
}
void shared_print2(string msg, int i){
std::lock(mu1, mu2);
std::lock_guard<mutex> guard1(mu1, std::adopt_lock);
std::lock_guard<mutex> guard2(mu2, std::adopt_lock);
cout<<msg<<i<<endl;
}
private:
mutex mu1;
mutex mu2;
};
參數(shù)std::adopt_lock的含義是將lock這件事轉(zhuǎn)交給std::lock來做,unlock由guard來做,其實(shí)就是將mutex的所有權(quán)轉(zhuǎn)交了一下。
為了避免死鎖,我們應(yīng)該做到:
- 盡量只lock一個(gè)mutex;
- 避免lock一個(gè)mutex后再調(diào)用一個(gè)用戶提供的函數(shù);
- 使用std::lock來lock多個(gè)mutex;
- 以同樣的順序來lock 多個(gè)mutex;
unique_lock lazy initialization
class LogFile{
std::mutex _mu;
std::ofstream _f;
public:
LogFile(){
_f.open("log.txt");
}
void shared_printf(string id, int value){
std::unique_lock<mutex> locker(_mu, std::defer);
//do something else
locker.lock();
_f<<"From "<<id<<": "<<value<<std::endl;
locker.unlock();
locker.lock();
//do something
locker.unlock();
}
};
std::unique_lock 的作用跟std::lock_guard類似,但是更加的靈活。std::unique_lock可以上鎖解鎖多次,還可以使用std::defer在適當(dāng)?shù)臅r(shí)候調(diào)用lock函數(shù)來上鎖,而不是在構(gòu)造函數(shù)中直接上鎖。
std::unique_lock 可以被move,但是lock_guard 不行。
std::unique_lock<mutex> locker2 = std::move(locker);
更靈活意味著std::unique_lock 比 std::lock_guard 復(fù)雜,在某些簡單的場景下,可能后者是更好的選擇。
在上面的例子中,存在下面這種情況,就是我們可能從始至終都沒有用到shared_print,但是我們卻已經(jīng)將文件打開了,此時(shí)打開文件不是必須的。
對于這種情況,我們的解決方案是lazy initialization。我們可以將上面的代碼改為
class LogFile{
std::mutex _mu;
std::ofstream _f;
std::once_flag _flag;
public:
LogFile(){}
void shared_printf(string id, int value){
std::call_once(_flag, [&]{_f.open("log.txt");});
std::unique_lock<mutex> locker(_mu, std::defer);
//do something else
locker.lock();
_f<<"From "<<id<<": "<<value<<std::endl;
locker.unlock();
}
};
這里我們把對log文件的打開操作放到了shared_print中來,只有當(dāng)真正的需要log文件的時(shí)候,它才會(huì)被打開。但是如果我們使用
if(!_f.is_open()){
_f.open("log.txt");
}
來打開文件,會(huì)有多線程重入,多次打開文件的風(fēng)險(xiǎn),顯然是線程不安全的,即便我們對其加鎖進(jìn)行保護(hù)
if(!_f.is_open()){
std::lock_guard<mutex> another_guard(another_mutex);
_f.open("log.txt");
}
這樣仍然是線程不安全的。
正確的做法是使用標(biāo)準(zhǔn)庫提供的std::call_once 函數(shù),這個(gè)函數(shù)所做的正如它的名字一樣,只會(huì)調(diào)用一次,就避免了多線程重入的問題。
condition variable
首先我們來看一下經(jīng)典的生產(chǎn)者消費(fèi)者模型:
std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
void produce(){
int count = 10;
while(count>0){
std::unique_lock<mutex> locker(mu);
q.push_front(count);
locker.unlock();
cond.notify_one();
std::this_thread::sleep_for(chrono::seconds(1));
count--;
}
}
void consume(){
int data = 0;
while(data!=1){
std::unique_lock<mutex> locker(mu);
cond.wait(locker);
data = q.back();
q.pop_back();
locker.unlock();
cout<<"t2 got a value from t1: "<<data<<endl;
}
}
int main(){
std::thread t1(produce);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}
這里我們使用條件變量來讓producer和consumer跑起來,當(dāng)produce往q里放入一個(gè)數(shù)據(jù)后,條件變量就會(huì)調(diào)用notify_one,用來通知一個(gè)線程來進(jìn)行操作。在consume中,執(zhí)行到cond.wait(locker)的時(shí)候,線程進(jìn)入sleep狀態(tài),直到notify_one將其喚醒,進(jìn)行操作。這樣就保證了線程安全。
實(shí)際上,即便cond.wait使線程2進(jìn)入睡眠狀態(tài),線程2還是還是會(huì)被非條件變量的notify函數(shù)喚醒,這種喚醒稱之為虛假喚醒(spurious wake)。為了避免這種情況,我們將consume中的函數(shù)修改為
cond.wait(locker, []{return !q.empty();})
如果 q 為空,t2會(huì)返回睡眠狀態(tài),非空的話就會(huì)繼續(xù)往下執(zhí)行。
std::notify_one 只會(huì)喚醒一個(gè)線程,如果想要喚醒多個(gè)線程的話,請使用notify_all。
Future and promise
#include <future>
void factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
std::cout<<"Result is: "<<res<<std::endl;
}
int main(){
std::thread t1(factorial, 4);
t1.join();
return 0;
}
上述代碼只能將線程執(zhí)行的結(jié)果打印在標(biāo)準(zhǔn)輸出上,如果我們想在線程執(zhí)行結(jié)束的時(shí)候獲取到這個(gè)值,我們有下面幾種方法:
- 使用傳出參數(shù)來返回結(jié)果,代碼修改為
#include <future>
void factorial(int N, int& x){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
x = res;
}
int main(){
int x = 0;
std::thread t1(factorial, 4, std::ref(x));
t1.join();
return 0;
}
這樣雖然能夠達(dá)到我們的目的,但是在多線程的情況下要使用互斥量和鎖來保證線程安全。
- 使用std::future來存儲運(yùn)行結(jié)果,代碼可修改為
#include <future>
int factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::future<int> fu = std::async(std::launch::async, factorial, 4);
x = fu.get();
return 0;
}
std::async會(huì)啟動(dòng)一個(gè)線程來執(zhí)行factorial函數(shù),并將返回的結(jié)果放到future中。當(dāng)我們執(zhí)行了fu.get() 之后,如果再次執(zhí)行的話,會(huì)導(dǎo)致程序崩潰。
我們來稍微詳細(xì)的講一下std::async的第一個(gè)參數(shù),實(shí)際上,這個(gè)參數(shù)的取值有四種
(1)不使用該參數(shù),等同于下面的第四種
(2)該參數(shù)取值為std::launch::async,則std::async會(huì)創(chuàng)建一個(gè)新的線程來執(zhí)行factorial函數(shù)。
(3)該參數(shù)的取值為std::launch::deferred,則std::async不會(huì)創(chuàng)建新的線程,而是lazy evaluation。也就是說,只有當(dāng)fu.get()被執(zhí)行的時(shí)候,factorial才會(huì)被執(zhí)行。
(4)該參數(shù)的取值為 std::launch::async | std::launch::deferred,它的行為可能是第二種情況,也可能是第三種情況,這是實(shí)現(xiàn)相關(guān)的。
接下來是std::promise,其中文意思為承諾,是對誰的承諾呢,當(dāng)然是對未來(std::future)的承諾。
#include <future>
int factorial(std::future<int>& f){
int res = 1;
int N = f.get();
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::promise<int> p;
std::future<int> f = p.get_future();
std::future<int> fu = std::async(std::launch::async, factorial, std::ref(f));
p.set_value(4);
x = fu.get();
return 0;
}
在main函數(shù)中,我們先給了一個(gè)承諾,這個(gè)承諾是對future f的,然后我們在需要的時(shí)候給promise一個(gè)值,這個(gè)值就會(huì)被future獲取到。
這個(gè)例子中,promise和future的用法其實(shí)是為了從主線程向子線程中傳遞參數(shù)。
傳入到自線程中的future,我們使用的是以引用的方式傳值,這是因?yàn)閒uture是不可以拷貝的,只能move。
但是,會(huì)有這樣一種情況,就是多個(gè)線程都會(huì)使用同樣的future的值,這時(shí)候難道我們需要聲明多個(gè)含有同樣值的future么?標(biāo)準(zhǔn)庫給出了std::shared_future來解決這個(gè)問題,代碼如下:
#include <future>
int factorial(std::shared_future<int> sf){
int res = 1;
int N = sf.get();
for(int i = N; i>1; i--){
res*=i;
}
return res;
}
int main(){
int x = 0;
std::promise<int> p;
std::future<int> f = p.get_future();
std::shared_future<int> sf = f.share();
std::future<int> fu1 = std::async(std::launch::async, factorial, sf);
std::future<int> fu2 = std::async(std::launch::async, factorial, sf);
std::future<int> fu3 = std::async(std::launch::async, factorial, sf);
std::future<int> fu4 = std::async(std::launch::async, factorial, sf);
p.set_value(4);
return 0;
}
從上面的代碼中我們就能夠看出,shared_future是可以拷貝的。
packaged_task
Packaged task就是對可調(diào)用的對象進(jìn)行了封裝,然后使得這些可調(diào)用對象異步的執(zhí)行:
int factorial(int N){
int res = 1;
for(int i = N; i>1; i--){
res*=i;
}
cout<<"Result is : "<<res<<endl;
return res;
}
std::deque<std::packaged_task<int()> >task_q;
void thread_1(){
std::packaged_task<int()> t;
t = std::move(task_q.front());
t();
}
int main(){
std::thread t1(thread_1);
std::packaged_task<int()> t(std::bind(factorial, 6));
std::future<int> fu = t.get_future();
task_q.push_back(t);
cout<<fu.get()<<endl;
t1.join();
return 0;
}
上述代碼中,通過packaged_task 對factorial 進(jìn)行封裝,并使其在子線程中運(yùn)行,并通過future獲取到運(yùn)行結(jié)果。
packaged_task 的不同之處在于,線程構(gòu)建了就會(huì)開始進(jìn)行,而packaged_task可以等到我們需要的時(shí)候再去執(zhí)行。
NOTE:上述的代碼只是示例packaged_task的用法,并沒有對data race進(jìn)行處理。