基礎(chǔ)消息隊列

在UNIX系統(tǒng)所提供的經(jīng)典進(jìn)程間通信機(jī)制(IPC):管道、FIFO消息隊列、信號量以及共享儲存。這些機(jī)制允許在同一臺計算機(jī)上運(yùn)行的進(jìn)程可以相互通信。但是當(dāng)考察到不同計算機(jī)(通過網(wǎng)絡(luò)相連)的進(jìn)程相互通信時就必須借助網(wǎng)絡(luò)通信機(jī)制(network IPC),在分布式計算環(huán)境中,為了集成分布式應(yīng)用,開發(fā)者需要對異構(gòu)網(wǎng)絡(luò)環(huán)境下的分布式應(yīng)用提供有效的通信手段。為了管理需要共享的信息,對應(yīng)用提供公共的信息交換機(jī)制是重要的。
設(shè)計分布式應(yīng)用的方法主要有:

  1. 遠(yuǎn)程過程調(diào)用(PRC)--分布式計算環(huán)境(DCE)的基礎(chǔ)標(biāo)準(zhǔn)成分之一;
  2. 對象事務(wù)監(jiān)控(OTM)--基于CORBA的面向?qū)ο蠊I(yè)標(biāo)準(zhǔn)與事務(wù)處理(TP)監(jiān)控技術(shù)的組合;
  3. 消息隊列(MessageQueue)--構(gòu)造分布式應(yīng)用的松耦合方法;
  • 消息隊列的API調(diào)用被嵌入到新的或現(xiàn)存的應(yīng)用中,通過消息發(fā)送到內(nèi)存或基于磁盤的隊列或從它讀出而提供信息交換。消息隊列可用在應(yīng)用中以執(zhí)行多種功能,比如要求服務(wù)、交換信息或異步處理等。

項目目標(biāo):簡單實(shí)現(xiàn)精簡版消息中間件

項目任務(wù):

  1. 作為msgsnd系列的擴(kuò)展服務(wù)
  2. 使msg系列支持跨ip通信能力,而其他使用msg系列接口的應(yīng)用無感知

項目分析:

  1. 某client向相應(yīng)key值的消息隊列發(fā)送消息
  2. 服務(wù)器在定時器的作用下每1000微秒,就讀取自身管理的消息隊列數(shù)據(jù)
  3. 讀取數(shù)據(jù)根據(jù)消息的type值和服務(wù)器配置文件,來決定將消息發(fā)送到哪一個服務(wù)器上
  4. 當(dāng)服務(wù)器收到遠(yuǎn)程發(fā)送來消息的時候,讀取該消息的type值,判斷該type是否需要繼續(xù)轉(zhuǎn)發(fā),如果是本服務(wù)器接受的數(shù)據(jù)是就保留數(shù)據(jù),否則就轉(zhuǎn)發(fā)。

技術(shù)要點(diǎn):

  • 進(jìn)程間通信 - 消息隊列
  • TCP socket
  • epoll的IO復(fù)用
  • 自定義通信協(xié)議

一、進(jìn)程間通信 - 消息隊列

消息隊列的消息的鏈接表儲存在內(nèi)核中,有消息隊列標(biāo)示符標(biāo)示。

1、有關(guān)消息隊列的相關(guān)函數(shù)

  • int msgget(key_t key, int flag);
    msgget用于創(chuàng)建一個新隊列打開一個現(xiàn)有隊列。
  • int msgsnd(int msqid, const void * ptr, size_t nbytes, int flag);
    msgsnd將新消息添加到隊列尾端。每一個消息包含一個正的長整形類型的字段、一個非負(fù)的長度以及實(shí)際數(shù)據(jù)字節(jié)數(shù),所有這些都在將消息添加到隊列時,傳送給msgsnd。
  • int msgctl(int msqid, int cmd, struct msqid_ds * buf);
    對隊列執(zhí)行cmd操作,例如:IPC_STAT(讀取消息)、IPC_SET(設(shè)置消息)、IPC_RMID(刪除消息)
  • ssize_t msgrcv(int msqid, void * ptr, size_t nbytes, long type,int flag);
    msgrcv用于從隊列中取消息,我們不一定要已先進(jìn)先出次序取消息,也可以按照消息的類型字段取消息

注意點(diǎn)
: 這里唯一值得提醒的是函數(shù)msgget中的key值,每個內(nèi)核中的IPC(進(jìn)程間通信)結(jié)構(gòu)都用一個非負(fù)整數(shù)的標(biāo)示符加以引用,只需知道其隊列標(biāo)示符。與文件描述符不同,IPC標(biāo)示符不是小的整數(shù)。當(dāng)IPC結(jié)構(gòu)被創(chuàng)建,然后又被刪除時,與這種結(jié)構(gòu)相關(guān)的標(biāo)示符連續(xù)加1,知道達(dá)到一個整型數(shù)的最大值,然后又轉(zhuǎn)到0。每一個IPC對象都與一個鍵(key)現(xiàn)關(guān)聯(lián)。

當(dāng)我們啟動這個服務(wù)后,肯定是通過讀取配置文件來確定讀取哪一個key值的消息隊列,本項目使用的是libxml來解析的xml配置文件保存在一個存有配置文件結(jié)構(gòu)體的數(shù)組里。通過配置文件我們可以獲知消息的轉(zhuǎn)發(fā)路徑(和路由表相似)配置文件結(jié)構(gòu)如下:

<?xml version="1.0" encoding="UTF-8"?>
<msgTypeToipInfo>
  <listenPort>8787</listenPort>         <!-- 本服務(wù)監(jiān)聽的端口號 -->
  <ip>10.81.12.240</ip>                 <!-- 本服務(wù)監(jiān)聽的IP地址 -->
  <alarmSeconds>1000</alarmSeconds>     <!-- 取隊列的刷新時間 微秒-->
  <mesgqKey>10000</mesgqKey>            <!-- 本服務(wù)維護(hù)的消息隊列的key值 -->
  <msg type="101">                      <!-- 消息key值為101時,發(fā)送ip為10.81.12.240 端口號為8787的服務(wù)器-->
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  <msg type="102">
    <ipAddr>10.81.12.240</ipAddr>
    <portNum>8787</portNum>
  </msg>
  ......
</msgTypeToipInfo>

在該配置文件中只監(jiān)聽了一臺服務(wù)器,如有需監(jiān)聽多臺服務(wù)器添加即可。。。

2、定時器刷新

函數(shù)alarm設(shè)置的定時器只能精確到秒,而以下函數(shù)理論上可以精確到微妙:

#include  <sys/select.h>
#include  <sys/itimer.h>
int getitimer(int which, struct itimerval *value);
int setitimer(int which, const struct itimerval *value, struct itimerval *ovalue);

函數(shù)setitimer可以提供三種定時器,它們相互獨(dú)立,任意一個定時完成都將發(fā)送定時信號到進(jìn)程,并且自動重新計時。參數(shù)which確定了定時器的類型,如表所示:

取值 含義 信號發(fā)送
ITIMER_REAL 定時真實(shí)時間,與alarm類型相同。 SIGALRM
ITIMER_VIRT 定時進(jìn)程在用戶態(tài)下的實(shí)際執(zhí)行時間。 SIGVTALRM
ITIMER_PROF 定時進(jìn)程在用戶態(tài)和核心態(tài)下的實(shí)際執(zhí)行時間 SIGPROF

這三種定時器定時完成時給進(jìn)程發(fā)送的信號各不相同.

  • ITIMER_REAL類定時器發(fā)送SIGALRM信號,
  • ITIMER_VIRT類定時器發(fā)送SIGVTALRM信號,
  • ITIMER_REAL類定時器發(fā)送SIGPROF信號。

函數(shù)alarm本質(zhì)上設(shè)置的是低精確、非重載的ITIMER_REAL類定時器,它只能精確到秒,并且每次設(shè)置只能產(chǎn)生一次定時。函數(shù)setitimer設(shè)置的定時器則不同,它們不但可以計時到微妙(理論上),還能自動循環(huán)定時。在一個Unix進(jìn)程中,不能同時使用alarm和ITIMER_REAL類定時器。

//結(jié)構(gòu)itimerval描述了定時器的組成:
struct itimerval
{
    struct tim.  it_interval;     /* 下次定時取值 */
    struct tim.  it_value;        /* 本次定時設(shè)置值 */
}
//結(jié)構(gòu)tim.描述了一個精確到微妙的時間:
struct tim.
{
    long    tv_sec;                 /* 秒(1000000微秒) */
    long    tv_usec;                 /* 微妙 */
}

設(shè)置定時器代碼,如下:

void NoMesgQue::setRefreshTime(double seconds)
{
    this->refreshTime = seconds;
    //1微秒=10的-6次方秒=0.000001秒
    struct itimerval value;
    value.it_value.tv_sec = 0;
    value.it_value.tv_usec = seconds;
    value.it_interval.tv_sec = 0; //val秒
    value.it_interval.tv_usec = seconds;
    signal(SIGALRM, timeReady);
    setitimer(ITIMER_REAL,&value,NULL);
}

設(shè)置void timeReady(int signo)為響應(yīng)函數(shù),讀取隊列中的數(shù)據(jù)

void timeReady(int signo)
{
    //在mesgQueue中取數(shù)據(jù)
    s_msg * rebuf = new s_msg();
    NoMesgQue * pp = NoMesgQue::getInstance();
    int length = sizeof(s_msg) - sizeof(long);
    while( msgrcv(pp->getMsgqid(), rebuf, length, 0, IPC_NOWAIT) > 0) 
    {
        cout << "Message : "<<rebuf->mtext  << " FromType:"<<rebuf->FromType << " toType:"<<rebuf->type << endl;

        int sockfd;           //sockfd socket對應(yīng)的描述符
        if((pp->findConfigFileToRemote(rebuf->type,&sockfd) < 0 ) || sockfd < 0)
        {
            cout<<"Don't find Remote Info from Config or Remote is not connecet"<<endl;
        }
        else
        {
            //包裝package信息
            s_msgPackage * package = new s_msgPackage();
            package->data = *rebuf;
            (package->packageHead).mesgLength = sizeof(*rebuf);

            //將包裝好的package信息 放到epoll中write
            s_msgFdOfData * fdData = NoMesgQue::getInstance()->getMsgFdOfData();

            s_msgFdOfData * thisPP = NULL;
            for(int ii = 0; ii < number; ii++)
            {
                if (sockfd == fdData[ii].sockfd)
                {
                    thisPP = &fdData[ii];
                    break;
                }
            }

            thisPP->sockfd = sockfd;
            memcpy(thisPP->data,package,sizeof(*package));
            thisPP->size = sizeof(*package);
            cout << "數(shù)量 " << number <<"內(nèi)容fd "<< thisPP->sockfd << endl;
        }

        printf("socketfd %d\n", sockfd);
    }
     signal(SIGALRM, timeReady);
}

在while中不斷取隊列中的數(shù)據(jù),當(dāng)msgrcv(int msqid, void * ptr, size_t nbytes, long type, int flag); 成功執(zhí)行時,內(nèi)核會更新與該消息隊列相關(guān)聯(lián)的msgid_ds結(jié)構(gòu),以指示調(diào)用者的進(jìn)程ID(msg_lrpid)和調(diào)用時間(msg_rtime),并指示隊列中的消息數(shù)減少1個。當(dāng)取到數(shù)據(jù)后通過讀取配置文件與消息type來獲取與之相連的sockfd,通過epoll來實(shí)現(xiàn)IO復(fù)用進(jìn)而發(fā)送數(shù)據(jù)。當(dāng)隊列中沒有數(shù)據(jù)時,再一次注冊信號SIGALRM,就OK了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容