將生產(chǎn)消費(fèi)模式引入mod_esl_json模塊

  • 最初簡單的做法是使用單線程while死循環(huán)接收從tcp或udp客戶端發(fā)來的json數(shù)據(jù)(接收到數(shù)據(jù)——生產(chǎn)者)后,立即調(diào)用處理json數(shù)據(jù)的函數(shù)(消費(fèi)者),直到這份數(shù)據(jù)被處理完畢,才會(huì)再一次循環(huán)接收socket數(shù)據(jù)。
int loop_flag = 1;
datatype json_data;
while(loop_flag)
{
    json_data = recv_json();  //生產(chǎn)者,在別處定義聲明
    if(json_data)
    {
        parse_json( json_data );  //消費(fèi)者,在別處定義聲明
    }
}
  • 這樣做帶來的問題是,可能在處理json數(shù)據(jù)(消費(fèi)數(shù)據(jù))的過程中,又有新的socket數(shù)據(jù)發(fā)來了,但是由于程序還阻塞在同步的處理數(shù)據(jù)函數(shù)(消費(fèi)者)中,還輪不到接收數(shù)據(jù)的生產(chǎn)者執(zhí)行,因此可能會(huì)丟失數(shù)據(jù)。

  • 這就是生產(chǎn)者和消費(fèi)者耦合嚴(yán)重。消費(fèi)者消費(fèi)太慢會(huì)影響到生產(chǎn)。

解決辦法——引入緩沖隊(duì)列,合理使用生產(chǎn)消費(fèi)模式

生產(chǎn)者消費(fèi)者模式架構(gòu)
  • 可以將生產(chǎn)者生產(chǎn)的數(shù)據(jù)push推送到一個(gè)緩沖隊(duì)列中,數(shù)據(jù)入隊(duì)。
  • 消費(fèi)者使用一個(gè)單獨(dú)的線程,從緩沖隊(duì)列中取用數(shù)據(jù),數(shù)據(jù)出隊(duì)。
int loop_flag = 1;    //全局變量
queue<datatype> q_json_datas;  //生產(chǎn)者和消費(fèi)者兩線程共享的數(shù)據(jù)
mutex mtx;  //互斥量,用來對(duì)多線程共享數(shù)據(jù)加鎖

//生產(chǎn)者線程執(zhí)行函數(shù),內(nèi)有while循環(huán)保持線程不結(jié)束
void Producer()
{
  while(loop_flag)
  {
    datatype json_data;
    json_data = recv_json();  //生產(chǎn)者,在別處定義聲明
    if(json_data && !隊(duì)列滿)
    {
        mtx.lock();  //線程訪問共享數(shù)據(jù)要加鎖
        q_json_datas.push( json_data );  //生產(chǎn)的數(shù)據(jù)先保存到緩沖隊(duì)列
        mtx.unlock();  // 使用完解鎖
    }
  }
}

//創(chuàng)建一個(gè)單獨(dú)的消費(fèi)者線程
void Consumer()
{
   //也使用一個(gè)while循環(huán)檢查隊(duì)列中是否還有未處理的數(shù)據(jù),阻塞線程不結(jié)束
  while(loop_flag)    
  {
    if( !q_json_datas.empty())
    {
        mtx.lock();  //線程訪問共享數(shù)據(jù)要加鎖
        datatype json_data2 = q_json_datas.front();  //從隊(duì)列中獲取數(shù)據(jù)
        q_json_datas.pop();  //將已消費(fèi)的數(shù)據(jù)從隊(duì)列中清除
        mtx.unlock();
        parse_json( json_data2 );  //消費(fèi)者,在別處定義聲明 
    }
  }
}

int main()
{
    thread t1( &Producer );  //創(chuàng)建一個(gè)子線程來作為生產(chǎn)者   
    thread t2( &Consumer );  //創(chuàng)建一個(gè)子線程來作為消費(fèi)者
}



  • 如此一來,生產(chǎn)者和消費(fèi)者解耦了,他們不是直接聯(lián)系在一起而是通過緩沖隊(duì)列聯(lián)系,當(dāng)然在緩沖隊(duì)列出隊(duì)和入隊(duì)時(shí)要注意對(duì)共享的緩沖隊(duì)列加鎖。
  • 另一個(gè)問題:若是收到的某些json數(shù)據(jù)要求第一時(shí)間被處理,那就涉及到了優(yōu)先級(jí)隊(duì)列的問題,即這些數(shù)據(jù)收到后要放到待處理隊(duì)列的頭部。因此隊(duì)列不能用普通的queue容器來實(shí)現(xiàn),可以用priority_queue優(yōu)先級(jí)隊(duì)列來實(shí)現(xiàn)。
    如此一來,就需要在json數(shù)據(jù)中有標(biāo)明優(yōu)先級(jí)的一個(gè)字段。但是為了是生產(chǎn)者線程能夠立即再投入生產(chǎn),應(yīng)該考慮讓新生產(chǎn)(接收到)的數(shù)據(jù)快速進(jìn)入緩沖隊(duì)列,即標(biāo)明優(yōu)先的就放到隊(duì)列最前面,不考慮優(yōu)先級(jí)的比較再排位置。
  • 在多線程加鎖處理過程中,還可以考慮結(jié)合使用信號(hào)量和互斥鎖。特別是在緩沖隊(duì)列只能存放一個(gè)元素這種極端的情況下,只能生產(chǎn)者先生產(chǎn)入隊(duì),然后消費(fèi)者才能去消費(fèi),這種情況如果一直用while循環(huán),會(huì)大大的浪費(fèi)cpu。如果用信號(hào)量的PV操作來控制生產(chǎn)者和消費(fèi)者的同步,那就更方便了,生產(chǎn)者生產(chǎn)后執(zhí)行V操作,使信號(hào)量+1,然后消費(fèi)者被喚醒去執(zhí)行P操作使信號(hào)量-1。但是這種使用方式類似于條件變量了。
  • 如果是C語言,將隊(duì)列換成鏈表,能夠更靈活地使用空間,每次加減任務(wù)只需要加減鏈表節(jié)點(diǎn)就行了。當(dāng)然如果是C++,那還是用STL的queue比較方便。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時(shí)...
    歐辰_OSR閱讀 30,282評(píng)論 8 265
  • 簡介 在實(shí)際的軟件開發(fā)過程中,經(jīng)常會(huì)碰到如下場景: 某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來負(fù)責(zé)處理(此處的模...
    RadioWaves閱讀 6,491評(píng)論 2 16
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評(píng)論 19 139
  • 馮老說:“今年咱們一共有300多名學(xué)生,這群人里面今后能出一兩個(gè)畫家已經(jīng)很不錯(cuò)了?!边@是馮老(以前寫過)在考前美術(shù)...
    過點(diǎn)小日子閱讀 326評(píng)論 0 0
  • 那么,我們接下來看一下大腦是如何處理信息的,處理信息的時(shí)候有哪些特點(diǎn),處理完的信息以什么樣的形式存儲(chǔ)的呢? 我們的...
    上善若水在路上閱讀 8,532評(píng)論 0 0

友情鏈接更多精彩內(nèi)容