在UNIX系統(tǒng)所提供的經(jīng)典進(jìn)程間通信機(jī)制(IPC):管道、FIFO、消息隊列、信號量以及共享儲存。這些機(jī)制允許在同一臺計算機(jī)上運(yùn)行的進(jìn)程可以相互通信。但是當(dāng)考察到不同計算機(jī)(通過網(wǎng)絡(luò)相連)的進(jìn)程相互通信時就必須借助網(wǎng)絡(luò)通信機(jī)制(network IPC),在分布式計算環(huán)境中,為了集成分布式應(yīng)用,開發(fā)者需要對異構(gòu)網(wǎng)絡(luò)環(huán)境下的分布式應(yīng)用提供有效的通信手段。為了管理需要共享的信息,對應(yīng)用提供公共的信息交換機(jī)制是重要的。
設(shè)計分布式應(yīng)用的方法主要有:
- 遠(yuǎn)程過程調(diào)用(PRC)--分布式計算環(huán)境(DCE)的基礎(chǔ)標(biāo)準(zhǔn)成分之一;
- 對象事務(wù)監(jiān)控(OTM)--基于CORBA的面向?qū)ο蠊I(yè)標(biāo)準(zhǔn)與事務(wù)處理(TP)監(jiān)控技術(shù)的組合;
- 消息隊列(MessageQueue)--構(gòu)造分布式應(yīng)用的松耦合方法;
- 消息隊列的API調(diào)用被嵌入到新的或現(xiàn)存的應(yīng)用中,通過消息發(fā)送到內(nèi)存或基于磁盤的隊列或從它讀出而提供信息交換。消息隊列可用在應(yīng)用中以執(zhí)行多種功能,比如要求服務(wù)、交換信息或異步處理等。
項目目標(biāo):簡單實(shí)現(xiàn)精簡版消息中間件
項目任務(wù):
- 作為msgsnd系列的擴(kuò)展服務(wù)
- 使msg系列支持跨ip通信能力,而其他使用msg系列接口的應(yīng)用無感知
項目分析:
- 某client向相應(yīng)key值的消息隊列發(fā)送消息
- 服務(wù)器在定時器的作用下每1000微秒,就讀取自身管理的消息隊列數(shù)據(jù)
- 讀取數(shù)據(jù)根據(jù)消息的type值和服務(wù)器配置文件,來決定將消息發(fā)送到哪一個服務(wù)器上
- 當(dāng)服務(wù)器收到遠(yuǎn)程發(fā)送來消息的時候,讀取該消息的type值,判斷該type是否需要繼續(xù)轉(zhuǎn)發(fā),如果是本服務(wù)器接受的數(shù)據(jù)是就保留數(shù)據(jù),否則就轉(zhuǎn)發(fā)。
技術(shù)要點(diǎn):
- 進(jìn)程間通信 - 消息隊列
- TCP socket
- epoll的IO復(fù)用
- 自定義通信協(xié)議
一、進(jìn)程間通信 - 消息隊列
消息隊列的消息的鏈接表,儲存在內(nèi)核中,有消息隊列標(biāo)示符標(biāo)示。
1、有關(guān)消息隊列的相關(guān)函數(shù)
-
int msgget(key_t key, int flag);
msgget用于創(chuàng)建一個新隊列或打開一個現(xiàn)有隊列。 -
int msgsnd(int msqid, const void * ptr, size_t nbytes, int flag);
msgsnd將新消息添加到隊列尾端。每一個消息包含一個正的長整形類型的字段、一個非負(fù)的長度以及實(shí)際數(shù)據(jù)字節(jié)數(shù),所有這些都在將消息添加到隊列時,傳送給msgsnd。 -
int msgctl(int msqid, int cmd, struct msqid_ds * buf);
對隊列執(zhí)行cmd操作,例如:IPC_STAT(讀取消息)、IPC_SET(設(shè)置消息)、IPC_RMID(刪除消息) -
ssize_t msgrcv(int msqid, void * ptr, size_t nbytes, long type,int flag);
msgrcv用于從隊列中取消息,我們不一定要已先進(jìn)先出次序取消息,也可以按照消息的類型字段取消息。
注意點(diǎn)
: 這里唯一值得提醒的是函數(shù)msgget中的key值,每個內(nèi)核中的IPC(進(jìn)程間通信)結(jié)構(gòu)都用一個非負(fù)整數(shù)的標(biāo)示符加以引用,只需知道其隊列標(biāo)示符。與文件描述符不同,IPC標(biāo)示符不是小的整數(shù)。當(dāng)IPC結(jié)構(gòu)被創(chuàng)建,然后又被刪除時,與這種結(jié)構(gòu)相關(guān)的標(biāo)示符連續(xù)加1,知道達(dá)到一個整型數(shù)的最大值,然后又轉(zhuǎn)到0。每一個IPC對象都與一個鍵(key)現(xiàn)關(guān)聯(lián)。
當(dāng)我們啟動這個服務(wù)后,肯定是通過讀取配置文件來確定讀取哪一個key值的消息隊列,本項目使用的是libxml來解析的xml配置文件保存在一個存有配置文件結(jié)構(gòu)體的數(shù)組里。通過配置文件我們可以獲知消息的轉(zhuǎn)發(fā)路徑(和路由表相似)配置文件結(jié)構(gòu)如下:
<?xml version="1.0" encoding="UTF-8"?>
<msgTypeToipInfo>
<listenPort>8787</listenPort> <!-- 本服務(wù)監(jiān)聽的端口號 -->
<ip>10.81.12.240</ip> <!-- 本服務(wù)監(jiān)聽的IP地址 -->
<alarmSeconds>1000</alarmSeconds> <!-- 取隊列的刷新時間 微秒-->
<mesgqKey>10000</mesgqKey> <!-- 本服務(wù)維護(hù)的消息隊列的key值 -->
<msg type="101"> <!-- 消息key值為101時,發(fā)送ip為10.81.12.240 端口號為8787的服務(wù)器-->
<ipAddr>10.81.12.240</ipAddr>
<portNum>8787</portNum>
</msg>
<msg type="102">
<ipAddr>10.81.12.240</ipAddr>
<portNum>8787</portNum>
</msg>
......
</msgTypeToipInfo>
在該配置文件中只監(jiān)聽了一臺服務(wù)器,如有需監(jiān)聽多臺服務(wù)器添加即可。。。
2、定時器刷新
函數(shù)alarm設(shè)置的定時器只能精確到秒,而以下函數(shù)理論上可以精確到微妙:
#include <sys/select.h>
#include <sys/itimer.h>
int getitimer(int which, struct itimerval *value);
int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue);
函數(shù)setitimer可以提供三種定時器,它們相互獨(dú)立,任意一個定時完成都將發(fā)送定時信號到進(jìn)程,并且自動重新計時。參數(shù)which確定了定時器的類型,如表所示:
| 取值 | 含義 | 信號發(fā)送 |
|---|---|---|
| ITIMER_REAL | 定時真實(shí)時間,與alarm類型相同。 | SIGALRM |
| ITIMER_VIRT | 定時進(jìn)程在用戶態(tài)下的實(shí)際執(zhí)行時間。 | SIGVTALRM |
| ITIMER_PROF | 定時進(jìn)程在用戶態(tài)和核心態(tài)下的實(shí)際執(zhí)行時間 | SIGPROF |
這三種定時器定時完成時給進(jìn)程發(fā)送的信號各不相同.
- ITIMER_REAL類定時器發(fā)送SIGALRM信號,
- ITIMER_VIRT類定時器發(fā)送SIGVTALRM信號,
- ITIMER_REAL類定時器發(fā)送SIGPROF信號。
函數(shù)alarm本質(zhì)上設(shè)置的是低精確、非重載的ITIMER_REAL類定時器,它只能精確到秒,并且每次設(shè)置只能產(chǎn)生一次定時。函數(shù)setitimer設(shè)置的定時器則不同,它們不但可以計時到微妙(理論上),還能自動循環(huán)定時。在一個Unix進(jìn)程中,不能同時使用alarm和ITIMER_REAL類定時器。
//結(jié)構(gòu)itimerval描述了定時器的組成:
struct itimerval
{
struct tim. it_interval; /* 下次定時取值 */
struct tim. it_value; /* 本次定時設(shè)置值 */
}
//結(jié)構(gòu)tim.描述了一個精確到微妙的時間:
struct tim.
{
long tv_sec; /* 秒(1000000微秒) */
long tv_usec; /* 微妙 */
}
設(shè)置定時器代碼,如下:
void NoMesgQue::setRefreshTime(double seconds)
{
this->refreshTime = seconds;
//1微秒=10的-6次方秒=0.000001秒
struct itimerval value;
value.it_value.tv_sec = 0;
value.it_value.tv_usec = seconds;
value.it_interval.tv_sec = 0; //val秒
value.it_interval.tv_usec = seconds;
signal(SIGALRM, timeReady);
setitimer(ITIMER_REAL,&value,NULL);
}
設(shè)置void timeReady(int signo)為響應(yīng)函數(shù),讀取隊列中的數(shù)據(jù)
void timeReady(int signo)
{
//在mesgQueue中取數(shù)據(jù)
s_msg * rebuf = new s_msg();
NoMesgQue * pp = NoMesgQue::getInstance();
int length = sizeof(s_msg) - sizeof(long);
while( msgrcv(pp->getMsgqid(), rebuf, length, 0, IPC_NOWAIT) > 0)
{
cout << "Message : "<<rebuf->mtext << " FromType:"<<rebuf->FromType << " toType:"<<rebuf->type << endl;
int sockfd; //sockfd socket對應(yīng)的描述符
if((pp->findConfigFileToRemote(rebuf->type,&sockfd) < 0 ) || sockfd < 0)
{
cout<<"Don't find Remote Info from Config or Remote is not connecet"<<endl;
}
else
{
//包裝package信息
s_msgPackage * package = new s_msgPackage();
package->data = *rebuf;
(package->packageHead).mesgLength = sizeof(*rebuf);
//將包裝好的package信息 放到epoll中write
s_msgFdOfData * fdData = NoMesgQue::getInstance()->getMsgFdOfData();
s_msgFdOfData * thisPP = NULL;
for(int ii = 0; ii < number; ii++)
{
if (sockfd == fdData[ii].sockfd)
{
thisPP = &fdData[ii];
break;
}
}
thisPP->sockfd = sockfd;
memcpy(thisPP->data,package,sizeof(*package));
thisPP->size = sizeof(*package);
cout << "數(shù)量 " << number <<"內(nèi)容fd "<< thisPP->sockfd << endl;
}
printf("socketfd %d\n", sockfd);
}
signal(SIGALRM, timeReady);
}
在while中不斷取隊列中的數(shù)據(jù),當(dāng)msgrcv(int msqid, void * ptr, size_t nbytes, long type, int flag); 成功執(zhí)行時,內(nèi)核會更新與該消息隊列相關(guān)聯(lián)的msgid_ds結(jié)構(gòu),以指示調(diào)用者的進(jìn)程ID(msg_lrpid)和調(diào)用時間(msg_rtime),并指示隊列中的消息數(shù)減少1個。當(dāng)取到數(shù)據(jù)后通過讀取配置文件與消息type來獲取與之相連的sockfd,通過epoll來實(shí)現(xiàn)IO復(fù)用進(jìn)而發(fā)送數(shù)據(jù)。當(dāng)隊列中沒有數(shù)據(jù)時,再一次注冊信號SIGALRM,就OK了。