視頻教程:https://www.bilibili.com/video/av95994293/
本文實現(xiàn)一個完整功能的消息總線MessageBus,同時介紹下消息的處理方法。
這里定義了消息類型的枚舉MesageType,消息優(yōu)先級的枚舉MessagePriority,以及消息的結(jié)構(gòu)類Message,它包含消息類型(type),消息優(yōu)先級(priority)和消息數(shù)據(jù)(info)。同時定義了MessagePtr作為Message的指針指針類。
消息總線MessageBus的實現(xiàn)和數(shù)據(jù)總線一樣,需要有存放消息的消息鏈表,構(gòu)成鎖的互斥量和用于多線程同步的條件變量,同時也需要具有最基本的Push和Pop函數(shù)函數(shù)。
在Push函數(shù)里,會New出一個Message對象,存放需要發(fā)送的消息數(shù)據(jù),然后將這個對象的指針放入到消息鏈表中,而在Pop函數(shù)里,得到這個對象的指針,并作為初始化參數(shù)傳遞給一個MessagePtr智能指針,外部調(diào)用處理完消息后,消息對象會被釋放。
消息類型,結(jié)構(gòu)和總線實現(xiàn)
#ifndef MESSAGE_BUS_H
#define MESSAGE_BUS_H
#include<string>
#include<mutex>
#include<list>
#include<condition_variable>
enum MessageType
{
MESSAGE_NONE = 0,
MESSAGE_START,
MESSAGE_PROCESSING,
MESSAGE_DONE,
MESSAGE_EXCEPTION
};
enum MessagePriority
{
MP_LOW = 0,
MP_NORMAL,
EP_HIGH
};
class Message
{
public:
Message(): m_type(MESSAGE_NONE), m_priority(MP_NORMAL)
{
}
Message(MessageType type, std::string info, MessagePriority priority) : m_type(type), m_info(info), m_priority(priority)
{
}
~Message() {}
MessageType Type()
{
return m_type;
}
MessagePriority Priority()
{
return m_priority;
}
std::string Info()
{
return m_info;
}
private:
MessageType m_type;
MessagePriority m_priority;
std::string m_info;
};
using MessagePtr = std::shared_ptr<Message>;
class MessageBus
{
public:
static void Clear();
static void Push(Message* pMessage);
static void Push(MessageType type);
static void Push(MessageType type, std::string info);
static void Push(MessageType type, std::string info, MessagePriority priority);
static MessagePtr Pop();
private:
static std::mutex s_mt;
static std::condition_variable s_cv;
static std::list<Message*> s_queue;
};
#endif
#include "MessageBus.h"
std::mutex MessageBus::s_mt;
std::condition_variable MessageBus::s_cv;
std::list<Message*> MessageBus::s_queue;
void MessageBus::Clear()
{
std::list<Message*>::iterator it = s_queue.begin();
for (; it != s_queue.end(); it++)
{
delete *it;
}
s_queue.clear();
}
void MessageBus::Push(Message* pMessage)
{
std::unique_lock<std::mutex> lock(s_mt);
s_queue.push_back(pMessage);
s_cv.notify_all();
}
void MessageBus::Push(MessageType type)
{
Push(type, std::string(""), MessagePriority::MP_NORMAL);
}
void MessageBus::Push(MessageType type, std::string info)
{
Push(type, info, MessagePriority::MP_NORMAL);
}
void MessageBus::Push(MessageType type, std::string info, MessagePriority priority)
{
Message *pMessage = new Message(type, info, priority);
Push(pMessage);
}
MessagePtr MessageBus::Pop()
{
Message* pMessage = NULL;
{
std::unique_lock<std::mutex> lock(s_mt);
while (s_queue.empty())
{
s_cv.wait(lock);
}
pMessage = s_queue.front();
s_queue.pop_front();
}
return MessagePtr(pMessage);
}
消息的處理
首先先實現(xiàn)一個消息處理基類MessageHandler,這個類的On函數(shù)開啟消息接收,OFF函數(shù)取消消息接收。
這個類是怎么來實現(xiàn)消息處理的呢?主要依賴于這個類的messageMap成員,它是一個unorderedmap,key實現(xiàn)消息類型,value是消息響應(yīng)函數(shù),這也就將每種消息和相應(yīng)的消息響應(yīng)函數(shù)關(guān)聯(lián)在一起了。這個關(guān)聯(lián)關(guān)系是由RegisterMessageFunc函數(shù)完成的,也就是我們通常所說的注冊消息響應(yīng)函數(shù)。
在MessageHandler開啟消息接收后,首先調(diào)用繼承類的InitMessageMap函數(shù)將消息響應(yīng)函數(shù)注冊好,然后一直在MessageBus里取出消息,調(diào)用InvokeHanlderFunc函數(shù),這個函數(shù)會根據(jù)消息的類型查找到相應(yīng)的消息響應(yīng)函數(shù),然后調(diào)用該函數(shù)并把消息作為入?yún)鬟f進(jìn)該函數(shù)進(jìn)行處理。
消息處理基類MessageHandler
#ifndef MESSAGE_HANDLER_H
#define MESSAGE_HANDLER_H
#include<atomic>
#include<iostream>
#include<unordered_map>
#include"MessageBus.h"
template<typename T>
class MessageHandler
{
public:
MessageHandler() :m_running(false)
{
}
~MessageHandler()
{
OFF();
}
void OFF()
{
std::cout << "Message Handling OFF!" << std::endl;
m_running = false;
}
void ON()
{
std::cout << "Message Handling ON!" << std::endl;
m_running = true;
static_cast<T*>(this)->InitMessageMap();
while (m_running)
{
MessagePtr pMessage = MessageBus::Pop();
m_running = InvokeHanlderFunc(*pMessage);
}
}
protected:
using MessageHandlerFunc = bool (T::*)(Message&);
using MessageMap = std::unordered_map<MessageType, MessageHandlerFunc>;
MessageMap m_messageMap;
void RegisterMessageFunc(MessageType type, MessageHandlerFunc func)
{
m_messageMap[type] = func;
}
bool InvokeHanlderFunc(Message& message)
{
auto it = m_messageMap.find(message.Type());
if (it != m_messageMap.end())
{
T* pThis = static_cast<T*>(this);
return (pThis->*(it->second))(message);
}
else
{
std::cout << "Message Type: " << message.Type() << " not handler function!" << std::endl;
return false;
}
}
void Clear()
{
m_messageMap.clear();
}
private:
std::atomic_bool m_running;
};
#endif
消息處理業(yè)務(wù)類MessageProcessor
實際的的消息處理類,實現(xiàn)了一些消息響應(yīng)函數(shù),然后在InitMessageMap函數(shù)里調(diào)用了基類的RegisterMessageFunc,將這些消息響應(yīng)函數(shù)注冊到messageMap里。
#ifndef MESSAGE_PROCESSOR_H
#define MESSAGE_PROCESSOR_H
#include "MessageHandler.h"
class MessageProcessor : public MessageHandler<MessageProcessor>
{
public:
MessageProcessor();
~MessageProcessor();
void InitMessageMap();
private:
bool OnStart(Message& message);
bool OnProcessing(Message& message);
bool OnDone(Message& message);
bool OnException(Message& message);
};
#endif
#include "MessageProcessor.h"
MessageProcessor::MessageProcessor()
{
}
MessageProcessor::~MessageProcessor()
{
}
void MessageProcessor::InitMessageMap()
{
Clear();
RegisterMessageFunc(MessageType::MESSAGE_START, &MessageProcessor::OnStart);
RegisterMessageFunc(MessageType::MESSAGE_PROCESSING, &MessageProcessor::OnProcessing);
RegisterMessageFunc(MessageType::MESSAGE_DONE, &MessageProcessor::OnDone);
RegisterMessageFunc(MessageType::MESSAGE_EXCEPTION, &MessageProcessor::OnException);
}
bool MessageProcessor::OnStart(Message& message)
{
std::cout << "[Work Start]" << std::endl;
return true;
}
bool MessageProcessor::OnProcessing(Message& message)
{
std::cout << "[Processing]......" << message.Info().c_str() << "%" << std::endl;
return true;
}
bool MessageProcessor::OnDone(Message& message)
{
std::cout << "[Work Done]" << std::endl;
return false;
}
bool MessageProcessor::OnException(Message& message)
{
std::cout << "[Work Exception?。。" << std::endl;
return true;
}
調(diào)用執(zhí)行
首先在一個線程里開啟MessageProcessor對象的接收開關(guān),然后在另一個線程里往MessageBus里發(fā)送消息,首先發(fā)送START消息,然后發(fā)送PROCESSING消息,最后發(fā)送DONE消息,我們在MessageProcessor的DONE消息響應(yīng)函數(shù)ONDone中返回false,MessageProcessor停止消息接收,線程退出。
#include "MessageProcessor.h"
#include <functional>
void SendMessage()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_START);
int i = 1;
while (i <= 10)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_PROCESSING, std::to_string(i * 10));
i++;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_DONE);
}
int main()
{
MessageProcessor msgProc;
std::thread thProc([&] { msgProc.ON(); });
std::thread thSend(SendMessage);
thProc.join();
thSend.join();
return 0;
}
//output
Message Handling ON!
[Work Start]
[Processing]......10%
[Processing]......20%
[Processing]......30%
[Processing]......40%
[Processing]......50%
[Processing]......60%
[Processing]......70%
[Processing]......80%
[Processing]......90%
[Processing]......100%
[Work Done]
Message Handling OFF!
程序的執(zhí)行結(jié)果顯示,消息發(fā)送和接收處理成功。