OpenThread
OpenThread是最舒心的跨平臺(tái)多線程并發(fā)庫(kù),多線程三大設(shè)計(jì)模式: Await模式, Factory模式和Actor模式。
使用優(yōu)雅的方式,創(chuàng)建線程、管理線程和線程間通信,從而實(shí)現(xiàn)多核并發(fā)。
OpenThread無(wú)任何依賴(lài),全平臺(tái)設(shè)計(jì),只有兩個(gè)源文件,讓小白都可以輕松玩轉(zhuǎn)C++多線程開(kāi)發(fā)。
OpenLinyou項(xiàng)目設(shè)計(jì)跨平臺(tái)服務(wù)器框架,在VS或者XCode上寫(xiě)代碼,無(wú)需任何改動(dòng)就可以編譯運(yùn)行在Linux上,甚至是安卓和iOS.
OpenLinyou:https://github.com/openlinyou
https://gitee.com/linyouhappy
跨平臺(tái)支持
Windows、linux、Mac、iOS、Android等跨平臺(tái)設(shè)計(jì)
多線程開(kāi)發(fā)三大設(shè)計(jì)模式
- Await模式。兩條線程,一條線程向另一條線程請(qǐng)求,同時(shí)阻塞等待;另一條線程接收到請(qǐng)求,返回?cái)?shù)據(jù)喚醒第一條線程;第一條線程喚醒,拿到數(shù)據(jù)繼續(xù)執(zhí)行。
- Worker模式。適合客戶(hù)端,創(chuàng)建一定量的worker線程,組成factory,向外提供唯一接口服務(wù)。
- Actor模式。適合服務(wù)端,一條線程一條Actor,不同的Actor負(fù)責(zé)不同的功能。
1.Await模式
在主線程創(chuàng)建OpenSyncReturn對(duì)象,把它發(fā)給子線程,并阻塞等待子線程返回。
子線程接到該消息后,再發(fā)消息喚醒,再發(fā)OpenSync對(duì)象給主線程,等待主線程響應(yīng)。
主線程線程被喚醒后,收到子線程消息攜帶的OpenSync對(duì)象,喚醒子線程。
#include <assert.h>
#include <iostream>
#include <stdio.h>
#include "openthread.h"
using namespace open;
// Test1
struct TestData
{
std::string data_;
};
struct Test1Data
{
std::string data_;
OpenSync openSync_;
~Test1Data()
{
printf("Test1:~Test1Data\n");
}
};
// 子線程調(diào)用
void Test1Thread(OpenThreadMsg& msg)
{
//線程啟動(dòng)的消息
if (msg.state_ == OpenThread::START)
{
printf("Test1Thread[%s] START\n", msg.name().c_str());
OpenThread::Sleep(1000);
}
//線程接收到的消息
else if (msg.state_ == OpenThread::RUN)
{
// //接收主線程的OpenSyncReturn對(duì)象,對(duì)其喚醒并發(fā)消息。
OpenSyncReturn<TestData, Test1Data>* data = msg.edit<OpenSyncReturn<TestData, Test1Data>>();
if (data)
{
std::shared_ptr<TestData> str = data->get();
if (str)
{
assert(str->data_ == "Waiting for you!");
}
auto sptr = std::shared_ptr<Test1Data>(new Test1Data);
sptr->data_.assign("Of Course,I Still Love You!");
data->wakeup(sptr);
//等待主線程喚醒
sptr->openSync_.await();
}
OpenThread::Sleep(1000);
}
//線程退出前的消息
else if (msg.state_ == OpenThread::STOP)
{
printf("Test1Thread[%s] STOP\n", msg.name().c_str());
OpenThread::Sleep(1000);
}
}
int main()
{
// 指定線程名,并創(chuàng)建。未填函數(shù),線程未啟動(dòng)狀態(tài),需要執(zhí)行start啟動(dòng)
auto threadRef = OpenThread::Create("Test1Thread");
threadRef.start(Test1Thread);
// 給子線程發(fā)送消息
auto msg = std::shared_ptr<OpenSyncReturn<TestData, Test1Data>>(new OpenSyncReturn<TestData, Test1Data>);
{
auto data = std::shared_ptr<TestData>(new TestData);
data->data_ = "Waiting for you!";
msg->put(data);
}
threadRef.send(msg);
//阻塞主線程,等待子線程喚醒
auto ret = msg->awaitReturn();
if (ret)
{
assert(ret->data_ == "Of Course,I Still Love You!");
printf("Test1====>>:%s\n", ret->data_.c_str());
//喚醒子線程的阻塞
ret->openSync_.wakeup();
}
// 向子線程發(fā)送關(guān)閉消息
threadRef.stop();
// 等待全部線程退出
OpenThread::ThreadJoin(threadRef);
printf("Pause\n");
return getchar();
}
2.Worker設(shè)計(jì)模式
適合客戶(hù)端,創(chuàng)建一定量的worker線程,組成factory,向外提供唯一接口服務(wù)。
#include <assert.h>
#include <iostream>
#include <stdio.h>
#include <vector>
#include "openthread.h"
using namespace open;
//業(yè)務(wù)數(shù)據(jù)結(jié)構(gòu)
struct Product
{
int id_;
std::string goods_;
Product():id_(0) {}
};
//OpenThread交換協(xié)議
struct ProtoTask : public OpenThreadProto
{
std::shared_ptr<Product> data_;
OpenSync openSync_;
static inline int ProtoType() { return 1; }
virtual inline int protoType() const { return ProtoTask::ProtoType(); }
};
class Worker : public OpenThreadWorker
{
//Worker工程線程Factory,提供4個(gè)worker線程。
class Factory
{
const std::vector<Worker*> vectWorker_;
public:
Factory()
:vectWorker_({
new Worker("Producer1"),
new Worker("Producer2"),
new Worker("Producer3"),
new Worker("Producer4"),
}) {}
Worker* getWorker()
{
if (vectWorker_.empty()) return 0;
return vectWorker_[std::rand() % vectWorker_.size()];
}
};
static Factory Instance_;
// Worker
Worker(const std::string& name)
:OpenThreadWorker(name)
{
mapHandle_[ProtoTask::ProtoType()] = (OpenThreadHandle)&Worker::makeProduct;
uid_ = 1;
start();
}
~Worker()
{
for (size_t i = 0; i < vectTask_.size(); ++i)
{
vectTask_[i].openSync_.wakeup();
}
}
//生產(chǎn)產(chǎn)品
void makeProduct(const ProtoTask& proto)
{
vectTask_.push_back(proto);
if (rand() % 2 == 0)
{
OpenThread::Sleep(1000);
}
for (size_t i = 0; i < vectTask_.size(); ++i)
{
auto& task = vectTask_[i];
if (task.data_)
{
task.data_->id_ = pid_ + 100 * uid_++;
task.data_->goods_ = name_ + " Dog coin" + std::to_string(task.data_->id_);
}
task.openSync_.wakeup();
}
vectTask_.clear();
}
int uid_;
std::vector<ProtoTask> vectTask_;
public:
//對(duì)外服務(wù)統(tǒng)一接口
static bool MakeProduct(std::shared_ptr<Product>& product)
{
auto worker = Instance_.getWorker();
if (!worker) return false;
auto proto = std::shared_ptr<ProtoTask>(new ProtoTask);
proto->data_ = product;
bool ret = worker->send(-1, proto);
assert(ret);
proto->openSync_.await();
return ret;
}
};
Worker::Factory Worker::Instance_;
void TestThread(OpenThreadMsg& msg)
{
if (msg.state_ == OpenThread::START)
{
for (size_t i = 0; i < 100; i++)
{
auto product = std::shared_ptr<Product>(new Product());
Worker::MakeProduct(product);
printf("[%s] Recevie Product:%s\n", msg.name().c_str(), product->goods_.c_str());
}
msg.thread().stop();
}
}
int main()
{
//創(chuàng)建4條測(cè)試線程
OpenThread::Create("TestThread1", TestThread);
OpenThread::Create("TestThread2", TestThread);
OpenThread::Create("TestThread3", TestThread);
OpenThread::Create("TestThread4", TestThread);
// wait stop
OpenThread::ThreadJoinAll();
printf("Pause\n");
return getchar();
}
3.Actor設(shè)計(jì)模式
Actor模式。適合服務(wù)端,一條線程一條Actor,不同的Actor負(fù)責(zé)不同的功能。
用Worker類(lèi)封裝使用OpenThread,一條線程一個(gè)Worker業(yè)務(wù)。Inspector(監(jiān)控)、Timer(定時(shí)器)和Server(服務(wù)器)繼承Worker。
Inspector負(fù)責(zé)監(jiān)控多個(gè)Timer運(yùn)行信息,做負(fù)載均衡。
Timer提供定時(shí)器服務(wù),啟動(dòng)時(shí),向Inspector注冊(cè),并提供運(yùn)行信息。
Server向Inspector查詢(xún)可用的Timer,然后向此Timer請(qǐng)求定時(shí)服務(wù)。
#include <assert.h>
#include <iostream>
#include <stdio.h>
#include <map>
#include <unordered_map>
#include "openthread.h"
using namespace open;
class ProtoBuffer : public OpenThreadProto
{
void* data_;
public:
int dataType_;
ProtoBuffer()
: OpenThreadProto()
,dataType_(0)
,data_(0){}
virtual ~ProtoBuffer() { if (data_) delete data_; }
template <class T>
inline T& data()
{
T* t = 0;
if (data_)
{
t = dynamic_cast<T*>((T*)data_);
if (data_ == t) return *t;
delete data_;
}
t = new T;
data_ = t;
return *t;
}
template <class T>
inline T& data() const
{
if (data_)
{
T* t = dynamic_cast<T*>((T*)data_);
if (data_ == t) return *t;
}
assert(false);
static T t;
return t;
}
static inline int ProtoType() { return (int)(uintptr_t) & (ProtoType); }
virtual inline int protoType() const { return ProtoBuffer::ProtoType(); }
};
struct ProtoLoop : public OpenThreadProto
{
int type_;
ProtoLoop() :type_(-1) {}
static inline int ProtoType() { return (int)(uintptr_t) & (ProtoType); }
virtual inline int protoType() const { return ProtoLoop::ProtoType(); }
};
struct TimerEventMsg
{
int workerId_;
int64_t deadline_;
TimerEventMsg() : workerId_(0), deadline_(0) {}
};
struct TimerInfoMsg
{
int workerId_;
size_t leftCount_;
int64_t cpuCost_;
int64_t dataTime_;
TimerInfoMsg() : workerId_(0), leftCount_(0), cpuCost_(0), dataTime_(0) {}
};
enum EMsgId
{
query_timer_info,
get_timer_info,
request_timer,
};
class Inspector : public OpenThreadWorker
{
std::unordered_map<std::string, TimerInfoMsg> mapTimerInfo_;
std::vector<int> vectQueryId;
public:
Inspector(const std::string& name):OpenThreadWorker(name)
{
registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Inspector::onProtoLoop);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Inspector::onProtoBuffer);
}
virtual void onStart() {}
private:
void onProtoLoop(const ProtoLoop& proto)
{
printf("Inspector::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
std::vector<int> vectPid;
vectPid.reserve(mapTimerInfo_.size());
for (auto iter = mapTimerInfo_.begin(); iter != mapTimerInfo_.end(); iter++)
{
if (iter->second.workerId_ >= 0)
vectPid.push_back(iter->second.workerId_);
}
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = get_timer_info;
send(vectPid, root);
}
void onProtoBuffer(const ProtoBuffer& proto)
{
printf("Inspector::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
if (proto.dataType_ == get_timer_info)
{
auto& msg = proto.data<TimerInfoMsg>();
auto& timerInfo = mapTimerInfo_[proto.srcName_];
timerInfo = msg;
if (!vectQueryId.empty())
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = query_timer_info;
auto& info = root->data<TimerInfoMsg>();
info = timerInfo;
send(vectQueryId, root);
vectQueryId.clear();
}
}
else if (proto.dataType_ == query_timer_info)
{
TimerInfoMsg* tmpInfo = 0;
auto curTime = OpenThread::MilliUnixtime();
for (auto iter = mapTimerInfo_.begin(); iter != mapTimerInfo_.end(); iter++)
{
auto& info = iter->second;
if (curTime > info.dataTime_ + 10000) continue;
if (tmpInfo)
{
if (tmpInfo->leftCount_ > info.leftCount_ || tmpInfo->cpuCost_ > info.cpuCost_)
tmpInfo = &info;
}
else
{
tmpInfo = &info;
}
}
if (!tmpInfo)
{
vectQueryId.push_back(proto.srcPid_);
auto root = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(root);
}
else
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = query_timer_info;
auto& info = root->data<TimerInfoMsg>();
info = *tmpInfo;
send(proto.srcPid_, root);
}
}
}
};
class Timer:public OpenThreadWorker
{
int inspectorId_;
std::multimap<int64_t, int> mapTimerEvent_;
public:
Timer(const std::string& name):OpenThreadWorker(name)
{
inspectorId_ = -1;
registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Timer::onProtoLoop);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Timer::onProtoBuffer);
}
protected:
virtual void onStart()
{
while (inspectorId_ < 0)
{
inspectorId_ = ThreadId("Inspector");
if (inspectorId_ >= 0)
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = get_timer_info;
auto& msg = root->data<TimerInfoMsg>();
msg.workerId_ = pid();
msg.dataTime_ = OpenThread::MilliUnixtime();
msg.cpuCost_ = thread_->cpuCost();
msg.leftCount_ = thread_->leftCount();
send(inspectorId_, root);
break;
}
OpenThread::Sleep(100);
}
auto root = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(root);
}
private:
void onProtoLoop(const ProtoLoop& proto)
{
printf("Timer::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
assert(proto.srcPid_ == pid_);
int64_t curTime = 0;
while (canLoop())
{
if (!mapTimerEvent_.empty())
{
curTime = OpenThread::MilliUnixtime();
while (!mapTimerEvent_.empty())
{
auto iter = mapTimerEvent_.begin();
if (curTime > iter->first)
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = request_timer;
auto& msg = root->data<TimerEventMsg>();
msg.workerId_ = pid();
msg.deadline_ = curTime;
send(iter->second, root);
mapTimerEvent_.erase(iter);
}
else
{
break;
}
}
}
OpenThread::Sleep(10);
}
}
void onProtoBuffer(const ProtoBuffer& proto)
{
printf("Timer::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
if (proto.dataType_ == get_timer_info)
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = get_timer_info;
auto& msg = root->data<TimerInfoMsg>();
msg.workerId_ = pid();
msg.dataTime_ = OpenThread::MilliUnixtime();
msg.cpuCost_ = thread_->cpuCost();
msg.leftCount_ = thread_->leftCount();
send(proto.srcPid_, root);
auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(sptr);
}
else if (proto.dataType_ == request_timer)
{
auto& msg = proto.data<TimerEventMsg>();
mapTimerEvent_.insert({ msg.deadline_, proto.srcPid_ });
auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(sptr);
}
}
};
class Server:public OpenThreadWorker
{
int inspectorId_;
int collect_;
public:
Server(const std::string& name)
:OpenThreadWorker(name)
,inspectorId_(-1)
{
collect_ = 0;
registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Server::onProtoLoop);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Server::onProtoBuffer);
}
protected:
virtual void onStart()
{
while (inspectorId_ < 0)
{
inspectorId_ = ThreadId("Inspector");
OpenThread::Sleep(10);
}
auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(sptr);
}
private:
void onProtoLoop(const ProtoLoop& proto)
{
printf("Server::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = query_timer_info;
send(inspectorId_, root);
}
void onProtoBuffer(const ProtoBuffer& proto)
{
printf("Server::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
if (proto.dataType_ == query_timer_info)
{
auto& msg = proto.data<TimerInfoMsg>();
if (msg.workerId_ > 0)
{
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->dataType_ = request_timer;
auto& event = root->data<TimerEventMsg>();
int64_t curTime = OpenThread::MilliUnixtime();
event.deadline_ = curTime + curTime % 2000;
if (event.deadline_ > curTime + 2000)
{
event.deadline_ = curTime;
}
send(msg.workerId_, root);
}
else
{
auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
sendLoop(sptr);
}
}
else if (proto.dataType_ == request_timer)
{
if (collect_++ > 100)
{
OpenThread::StopAll();
return;
}
sendLoop(std::shared_ptr<ProtoLoop>(new ProtoLoop));
}
}
};
int main()
{
OpenThread::StopAll();
std::vector<OpenThreadWorker*> vectWorker =
{
new Inspector("Inspector"),
new Timer("timer1"),
new Timer("timer2"),
new Server("server1"),
new Server("server2"),
new Server("server3"),
new Server("server4")
};
for (size_t i = 0; i < vectWorker.size(); i++)
{
vectWorker[i]->start();
}
OpenThread::ThreadJoinAll();
for (size_t i = 0; i < vectWorker.size(); i++)
{
delete vectWorker[i];
}
vectWorker.clear();
printf("Pause\n");
return getchar();
}
編譯和執(zhí)行
請(qǐng)安裝cmake工具,用cmake構(gòu)建工程,可以在vs或者xcode上編譯運(yùn)行。
源代碼:https://github.com/openlinyou/openthread
https://gitee.com/linyouhappy/openthread
#克隆項(xiàng)目
git clone https://github.com/openlinyou/openthread
cd ./openthread
#創(chuàng)建build工程目錄
mkdir build
cd build
cmake ..
#如果是win32,在該目錄出現(xiàn)openthread.sln,點(diǎn)擊它就可以啟動(dòng)vs寫(xiě)代碼調(diào)試
make
./helloworld
全部源文件
- src/openthread.h
- src/openthread.cpp
技術(shù)特點(diǎn)
OpenThread的技術(shù)特點(diǎn):
- 跨平臺(tái)設(shè)計(jì),提供Linux統(tǒng)一的pthread接口,支持安卓和iOS。
- 線程池管理采用智能指針和無(wú)鎖map,實(shí)現(xiàn)高效訪問(wèn)線程對(duì)象。
- 每個(gè)線程自帶消息隊(duì)列,消息放入隊(duì)列原子鎖,而讀取消息隊(duì)列,無(wú)鎖操作。保證線程交換信息高效。
- 線程交互數(shù)據(jù),采用智能指針管理,實(shí)現(xiàn)內(nèi)存自動(dòng)化管理,無(wú)需擔(dān)憂內(nèi)存泄漏。
- 多線程三大設(shè)計(jì)模式: Await模式, Worker模式和Actor模式。