(11)消息任務(wù)隊(duì)列與線程池(Reactor部分)-【Lars-基于C++負(fù)載均衡遠(yuǎn)程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動(dòng)工具腳本


2-Lars-reactor.png

我們接下來要設(shè)計(jì)線程池和與之對(duì)應(yīng)的消息隊(duì)列。具體的總體形勢應(yīng)該是這樣的

9-thread_pool.png

這里面有幾個(gè)類型,thread_pool就是我們要?jiǎng)?chuàng)建的線程池,這里面會(huì)有很多thread其中每個(gè)thread都會(huì)啟動(dòng)一個(gè)epoll也就是我們封裝好的event_loop來監(jiān)控各自創(chuàng)建好的tcp_conn的讀寫事件。每個(gè)thread都會(huì)有一個(gè)thread_queue消息任務(wù)隊(duì)列與之綁定,每個(gè)thread_queue里面會(huì)接收task_msg任務(wù)類型。

10.1 消息任務(wù)類型

lars_reactor/include/task_msg.h

#pragma  once
#include "event_loop.h"

struct task_msg
{
    enum TASK_TYPE
    {
        NEW_CONN,   //新建鏈接的任務(wù)
        NEW_TASK,   //一般的任務(wù)
    };

    TASK_TYPE type; //任務(wù)類型

    //任務(wù)的一些參數(shù)
    
    union {
        //針對(duì) NEW_CONN新建鏈接任務(wù),需要傳遞connfd
        int connfd;

        /*====  暫時(shí)用不上 ==== */
        //針對(duì) NEW_TASK 新建任務(wù), 
        //那么可以給一個(gè)任務(wù)提供一個(gè)回調(diào)函數(shù)
        struct {
            void (*task_cb)(event_loop*, void *args);
            void *args;
        };
    };
};

? 這里面task_msg一共有兩個(gè)類型的type,一個(gè)是新鏈接的任務(wù),一個(gè)是普通任務(wù)。兩個(gè)任務(wù)所攜帶的參數(shù)不同,所以用了一個(gè)union。

?

10.2 消息任務(wù)隊(duì)列

lars_reactor/include/thread_queue.h

#pragma once

#include <queue>
#include <pthread.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <unistd.h>
#include "event_loop.h"

/*
 *
 * 每個(gè)thread對(duì)應(yīng)的 消息任務(wù)隊(duì)列
 *
 * */
template <typename T>
class thread_queue
{
public:
    thread_queue()
    {
        _loop = NULL;
        pthread_mutex_init(&_queue_mutex, NULL);
        _evfd = eventfd(0, EFD_NONBLOCK);
        if (_evfd == -1) {
            perror("evenfd(0, EFD_NONBLOCK)");
            exit(1);
        }
    }

    ~thread_queue()
    {
        pthread_mutex_destroy(&_queue_mutex);
        close(_evfd);
    }

    //向隊(duì)列添加一個(gè)任務(wù)
    void send(const T& task) {
        //觸發(fā)消息事件的占位傳輸內(nèi)容
        unsigned long long idle_num = 1;

        pthread_mutex_lock(&_queue_mutex);
        //將任務(wù)添加到隊(duì)列
        _queue.push(task);

        //向_evfd寫,觸發(fā)對(duì)應(yīng)的EPOLLIN事件,來處理該任務(wù)
        int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
        if (ret == -1) {
            perror("_evfd write");
        }

        pthread_mutex_unlock(&_queue_mutex);
    }


    //獲取隊(duì)列,(當(dāng)前隊(duì)列已經(jīng)有任務(wù))
    void recv(std::queue<T>& new_queue) {
        unsigned int long long idle_num = 1;
        pthread_mutex_lock(&_queue_mutex);
        //把占位的數(shù)據(jù)讀出來,確保底層緩沖沒有數(shù)據(jù)存留
        int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
        if (ret == -1) {
            perror("_evfd read");
        }

        //將當(dāng)前的隊(duì)列拷貝出去,將一個(gè)空隊(duì)列換回當(dāng)前隊(duì)列,同時(shí)清空自身隊(duì)列,確保new_queue是空隊(duì)列
        std::swap(new_queue, _queue);

        pthread_mutex_unlock(&_queue_mutex);
    }


    //設(shè)置當(dāng)前thead_queue是被哪個(gè)事件觸發(fā)event_loop監(jiān)控
    void set_loop(event_loop *loop) {
        _loop = loop;  
    }

    //設(shè)置當(dāng)前消息任務(wù)隊(duì)列的 每個(gè)任務(wù)觸發(fā)的回調(diào)業(yè)務(wù)
    void set_callback(io_callback *cb, void *args = NULL)
    {
        if (_loop != NULL) {
            _loop->add_io_event(_evfd, cb, EPOLLIN, args);
        }
    }

    //得到當(dāng)前l(fā)oop
    event_loop * get_loop() {
        return _loop;
    }

    
private:
    int _evfd;            //觸發(fā)消息任務(wù)隊(duì)列讀取的每個(gè)消息業(yè)務(wù)的fd
    event_loop *_loop;    //當(dāng)前消息任務(wù)隊(duì)列所綁定在哪個(gè)event_loop事件觸發(fā)機(jī)制中
    std::queue<T> _queue; //隊(duì)列
    pthread_mutex_t _queue_mutex; //進(jìn)行添加任務(wù)、讀取任務(wù)的保護(hù)鎖
};

? 一個(gè)模板類,主要是消息任務(wù)隊(duì)列里的元素類型未必一定是task_msg類型。

thread_queue需要綁定一個(gè)event_loop。來觸發(fā)消息到達(dá),捕獲消息并且觸發(fā)處理消息業(yè)務(wù)的動(dòng)作。

? 這里面有個(gè)_evfd是為了觸發(fā)消息隊(duì)列消息到達(dá),處理該消息作用的,將_evfd加入到對(duì)應(yīng)線程的event_loop中,然后再通過set_callback設(shè)置一個(gè)通用的該queue全部消息所觸發(fā)的處理業(yè)務(wù)call_back,在這個(gè)call_back里開發(fā)者可以自定義實(shí)現(xiàn)一些處理業(yè)務(wù)流程。

  1. 通過send將任務(wù)發(fā)送給消息隊(duì)列。
  2. 通過event_loop觸發(fā)注冊(cè)的io_callback得到消息隊(duì)列里的任務(wù)。
  3. 在io_callback中調(diào)用recv取得task任務(wù),根據(jù)任務(wù)的不同類型,處理自定義不同業(yè)務(wù)流程。

10.3 線程池

? 接下來,我們定義線程池,將thread_queuethread_pool進(jìn)行關(guān)聯(lián)。

lars_reactor/include/thread_pool.h

#pragma once

#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"

class thread_pool
{
public:
    //構(gòu)造,初始化線程池, 開辟thread_cnt個(gè)
    thread_pool(int thread_cnt);

    //獲取一個(gè)thead
    thread_queue<task_msg>* get_thread();

private:

    //_queues是當(dāng)前thread_pool全部的消息任務(wù)隊(duì)列頭指針
    thread_queue<task_msg> ** _queues; 

    //當(dāng)前線程池中的線程個(gè)數(shù)
    int _thread_cnt;

    //已經(jīng)啟動(dòng)的全部therad編號(hào)
    pthread_t * _tids;

    //當(dāng)前選中的線程隊(duì)列下標(biāo)
    int _index;
};

屬性:

_queues:是thread_queue集合,和當(dāng)前線程數(shù)量一一對(duì)應(yīng),每個(gè)線程對(duì)應(yīng)一個(gè)queue。里面存的元素是task_msg。

_tids:保存線程池中每個(gè)線程的ID。

_thread_cnt:當(dāng)前線程的個(gè)數(shù).

_index:表示外層在選擇哪個(gè)thead處理任務(wù)時(shí)的一個(gè)下標(biāo),因?yàn)槭禽喸兲幚?,所以需要一個(gè)下標(biāo)記錄。

方法:

thread_pool():構(gòu)造函數(shù),初始化線程池。

get_thread():通過輪詢方式,獲取一個(gè)線程的thread_queue.

lars_reactor/src/thread_pool.cpp

#include "thread_pool.h"
#include "event_loop.h"
#include "tcp_conn.h"
#include <unistd.h>
#include <stdio.h>

/*
 * 一旦有task消息過來,這個(gè)業(yè)務(wù)是處理task消息業(yè)務(wù)的主流程
 *
 * 只要有人調(diào)用 thread_queue:: send()方法就會(huì)觸發(fā)次函數(shù)
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
    //得到是哪個(gè)消息隊(duì)列觸發(fā)的 
    thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;

    //將queue中的全部任務(wù)取出來
    std::queue<task_msg> tasks;
    queue->recv(tasks);

    while (tasks.empty() != true) {
        task_msg task = tasks.front();

        //彈出一個(gè)元素
        tasks.pop();

        if (task.type == task_msg::NEW_CONN) {
            //是一個(gè)新建鏈接的任務(wù)
            //并且將這個(gè)tcp_conn加入當(dāng)當(dāng)前線程的loop中去監(jiān)聽
            tcp_conn *conn = new tcp_conn(task.connfd, loop);
            if (conn == NULL) {
                fprintf(stderr, "in thread new tcp_conn error\n");
                exit(1);
            }

            printf("[thread]: get new connection succ!\n");
        }
        else if (task.type == task_msg::NEW_TASK) {
            //是一個(gè)新的普通任務(wù)
            //TODO
        } 
        else {
            //其他未識(shí)別任務(wù)
            fprintf(stderr, "unknow task!\n");
        }
    }
}

//一個(gè)線程的主業(yè)務(wù)main函數(shù)
void *thread_main(void *args)
{
    thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;

    //每個(gè)線程都應(yīng)該有一個(gè)event_loop來監(jiān)控客戶端鏈接的讀寫事件
    event_loop *loop = new event_loop();
    if (loop == NULL) {
        fprintf(stderr, "new event_loop error\n");
        exit(1);
    }

    //注冊(cè)一個(gè)觸發(fā)消息任務(wù)讀寫的callback函數(shù) 
    queue->set_loop(loop);
    queue->set_callback(deal_task_message, queue);

    //啟動(dòng)阻塞監(jiān)聽
    loop->event_process();

    return NULL;
}


thread_pool::thread_pool(int thread_cnt)
{
    _index = 0;
    _queues = NULL;
    _thread_cnt = thread_cnt;
    if (_thread_cnt <= 0) {
        fprintf(stderr, "_thread_cnt < 0\n");
        exit(1);
    }

    //任務(wù)隊(duì)列的個(gè)數(shù)和線程個(gè)數(shù)一致
    _queues = new thread_queue<task_msg>*[thread_cnt];
    _tids = new pthread_t[thread_cnt];

    int ret;
    for (int i = 0; i < thread_cnt; ++i) {
        //創(chuàng)建一個(gè)線程
        printf("create %d thread\n", i);
        //給當(dāng)前線程創(chuàng)建一個(gè)任務(wù)消息隊(duì)列
        _queues[i] = new thread_queue<task_msg>();
        ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
        if (ret == -1) {
            perror("thread_pool, create thread");
            exit(1);
        }

        //將線程脫離
        pthread_detach(_tids[i]);
    }
}

thread_queue<task_msg>* thread_pool::get_thread()
{
    if (_index == _thread_cnt) {
        _index = 0; 
    }

    return _queues[_index];
}

? 這里主要看deal_task_message()方法,是處理收到的task任務(wù)的。目前我們只對(duì)NEW_CONN類型的任務(wù)進(jìn)行處理,一般任務(wù)先不做處理,因?yàn)闀簳r(shí)用不上。

? NEW_CONN的處理主要是讓當(dāng)前線程創(chuàng)建鏈接,并且將該鏈接由當(dāng)前線程的event_loop接管。

? 接下來我們就要將線程池添加到reactor框架中去。

10.4 reactor線程池關(guān)聯(lián)

? 將線程池添加到tcp_server中。

lars_reactor/include/tcp_server.h

#pragma once

#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"
#include "thread_pool.h"


class tcp_server
{ 
public: 
        // ...
        // ...
private:
        // ...
  
    //線程池
    thread_pool *_thread_pool;
}; 

在構(gòu)造函數(shù)中,添加_thread_pool的初始化工作。并且在accept成功之后交給線程處理客戶端的讀寫事件。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <unistd.h>
#include <signal.h>
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>

#include "tcp_server.h"
#include "tcp_conn.h"
#include "reactor_buf.h"


//server的構(gòu)造函數(shù)
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
    // ...

    //6 創(chuàng)建鏈接管理
    _max_conns = MAX_CONNS;
    //創(chuàng)建鏈接信息數(shù)組
    conns = new tcp_conn*[_max_conns+3];//3是因?yàn)閟tdin,stdout,stderr 已經(jīng)被占用,再新開fd一定是從3開始,所以不加3就會(huì)棧溢出
    if (conns == NULL) {
        fprintf(stderr, "new conns[%d] error\n", _max_conns);
        exit(1);
    }

    //7 =============創(chuàng)建線程池=================
    int thread_cnt = 3;//TODO 從配置文件中讀取
    if (thread_cnt > 0) {
        _thread_pool = new thread_pool(thread_cnt);
        if (_thread_pool == NULL) {
            fprintf(stderr, "tcp_server new thread_pool error\n");
            exit(1);
        }
    }
    // ========================================

    //8 注冊(cè)_socket讀事件-->accept處理
    _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
}



//開始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
    int connfd;    
    while(true) {
        //accept與客戶端創(chuàng)建鏈接
        printf("begin accept\n");
        connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
        if (connfd == -1) {
            if (errno == EINTR) {
                fprintf(stderr, "accept errno=EINTR\n");
                continue;
            }
            else if (errno == EMFILE) {
                //建立鏈接過多,資源不夠
                fprintf(stderr, "accept errno=EMFILE\n");
            }
            else if (errno == EAGAIN) {
                fprintf(stderr, "accept errno=EAGAIN\n");
                break;
            }
            else {
                fprintf(stderr, "accept error\n");
                exit(1);
            }
        }
        else {
            //accept succ!
            int cur_conns;
            get_conn_num(&cur_conns);

            //1 判斷鏈接數(shù)量
            if (cur_conns >= _max_conns) {
                fprintf(stderr, "so many connections, max = %d\n", _max_conns);
                close(connfd);
            }
            else {
                                // ========= 將新連接由線程池處理 ==========
                if (_thread_pool != NULL) {
                    //啟動(dòng)多線程模式 創(chuàng)建鏈接
                    //1 選擇一個(gè)線程來處理
                    thread_queue<task_msg>* queue = _thread_pool->get_thread();
                    //2 創(chuàng)建一個(gè)新建鏈接的消息任務(wù)
                    task_msg task;
                    task.type = task_msg::NEW_CONN;
                    task.connfd = connfd;

                    //3 添加到消息隊(duì)列中,讓對(duì)應(yīng)的thread進(jìn)程event_loop處理
                    queue->send(task);
                 // =====================================
                }
                else {
                    //啟動(dòng)單線程模式
                    tcp_conn *conn = new tcp_conn(connfd, _loop);
                    if (conn == NULL) {
                        fprintf(stderr, "new tcp_conn error\n");
                        exit(1);
                    }
                    printf("[tcp_server]: get new connection succ!\n");
                    break;
                }
            }
        }
    }
}

10.5 完成Lars ReactorV0.8開發(fā)

? 0.8版本的server.cpp和client.cpp是不用改變的。開啟服務(wù)端和客戶端觀察執(zhí)行結(jié)果即可。

服務(wù)端:

$ ./server 
msg_router init...
create 0 thread
create 1 thread
create 2 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
read data: Hello Lars!
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======

客戶端

$ ./client 
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
call data = welcome! you online..?
call msglen = 21
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======

? 我們會(huì)發(fā)現(xiàn),鏈接已經(jīng)成功創(chuàng)建成功,并且是由于線程處理的讀寫任務(wù)。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容