(9)消息業(yè)務路由分發(fā)機制(Reactor部分)-【Lars-基于C++負載均衡遠程服務器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務機制
第15章-鏈接屬性設置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務結(jié)構(gòu)搭建
第3章-Report與Dns Client設計與實現(xiàn)
第4章-負載均衡模塊基礎設計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本


8) 消息業(yè)務路由分發(fā)機制

? 現(xiàn)在我們發(fā)送的消息都是message結(jié)構(gòu)的,有個message頭里面其中有兩個關鍵的字段,msgidmsglen,其中加入msgid的意義就是我們可以甄別是哪個消息,從而對這類消息做出不同的業(yè)務處理。但是現(xiàn)在我們無論是服務端還是客戶端都是寫死的兩個業(yè)務,就是"回顯業(yè)務",顯然這并不滿足我們作為服務器框架的需求。我們需要開發(fā)者可以注冊自己的回調(diào)業(yè)務。所以我們需要提供一個注冊業(yè)務的入口,然后在后端根據(jù)不同的msgid來激活不同的回調(diào)業(yè)務函數(shù)。

8.1 添加消息分發(fā)路由類msg_router

? 下面我們提供這樣一個中轉(zhuǎn)的router模塊,在include/message.h添加

lars_reactor/include/message.h

#pragma once

#include <ext/hash_map>

//解決tcp粘包問題的消息頭
struct msg_head
{
    int msgid;
    int msglen;
};

//消息頭的二進制長度,固定數(shù)
#define MESSAGE_HEAD_LEN 8

//消息頭+消息體的最大長度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)

//msg 業(yè)務回調(diào)函數(shù)原型

//===================== 消息分發(fā)路由機制 ==================
class tcp_client;
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);


//消息路由分發(fā)機制
class msg_router 
{
public:
    msg_router():_router(),_args() {}  

    //給一個消息ID注冊一個對應的回調(diào)業(yè)務函數(shù)
    int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) 
    {
        if(_router.find(msgid) != _router.end()) {
            //該msgID的回調(diào)業(yè)務已經(jīng)存在
            return -1;
        }

        _router[msgid] = msg_cb;
        _args[msgid] = user_data;

        return 0;
    }

    //調(diào)用注冊的對應的回調(diào)業(yè)務函數(shù)
    void call(int msgid, uint32_t msglen, const char *data, tcp_client *client) 
    {
        //判斷msgid對應的回調(diào)是否存在
        if (_router.find(msgid) == _router.end()) {
            fprintf(stderr, "msgid %d is not register!\n", msgid);
            return;
        }

        //直接取出回調(diào)函數(shù),執(zhí)行
        msg_callback *callback = _router[msgid];
        void *user_data = _args[msgid];
        callback(data, msglen, msgid, client, user_data);
    }

private:
    //針對消息的路由分發(fā),key為msgID, value為注冊的回調(diào)業(yè)務函數(shù)
    __gnu_cxx::hash_map<int, msg_callback *> _router;
    //回調(diào)業(yè)務函數(shù)對應的參數(shù),key為msgID, value為對應的參數(shù)
    __gnu_cxx::hash_map<int, void *> _args;
};
//===================== 消息分發(fā)路由機制 ==================

? 開發(fā)者需要注冊一個msg_callback類型的函數(shù),通過msg_router類的register_msg_router()方法來注冊,同時通過call()方法來調(diào)用。

? 全部回調(diào)業(yè)務函數(shù)和msgid的對應關系保存在一個hash_map類型的_routermap中,_args保存對應的參數(shù)。

? 但是這里有個小細節(jié)需要注意一下,msg_callback的函數(shù)類型聲明是這樣的。

typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);

? 其中這里面第4個參數(shù),只能是tcp_client類型的參數(shù),也就是我們之前的設計的msg_callback只支持tcp_client的消息回調(diào)機制,但是很明顯我們的需求是不僅是tcp_client要用,tcp_server中的tcp_conn也要用到這個機制,那么很顯然這個參數(shù)在這就不是很合適,那么如果設定一個形參既能指向tcp_client又能能指向tcp_conn兩個類型呢,當然答案就只能是將這兩個類抽象出來一層,用父類指針指向子類然后通過多態(tài)特性來調(diào)用就可以了,所以我們需要先定義一個抽象類。

8.2 鏈接抽象類創(chuàng)建

? 經(jīng)過分析,我們定義如下的抽象類,并提供一些接口。

lars_reactor/include/net_connection.h

#pragma once

/*
 * 
 * 網(wǎng)絡通信的抽象類,任何需要進行收發(fā)消息的模塊,都可以實現(xiàn)該類
 *
 * */

class net_connection
{
public:
    //發(fā)送消息的接口
    virtual int send_message(const char *data, int datalen, int msgid) = 0;
};

? 然后讓我們tcp_server端的tcp_conn類繼承net_connecton, 客戶端的tcp_client 繼承net_connection

lars_reactor/include/tcp_conn.h

class tcp_conn : public net_connection
{
    //...
};

lars_reactor/include/tcp_client.h

class tcp_client : public net_connection
{
    //...
}

這樣,我們就可以用一個net_connection指針指向這兩種不同的對象實例了。

? 接下來我們將msg_callback回調(diào)業(yè)務函數(shù)類型改成

typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data);

? 這樣這個業(yè)務函數(shù)就可以支持tcp_conn和tcp_client了。

所以修改之后,我們的msg_router類定義如下:

lars_reactor/include/message.h

//消息路由分發(fā)機制
class msg_router 
{
public:
    msg_router(): {
            printf("msg router init ...\n");
    }  

    //給一個消息ID注冊一個對應的回調(diào)業(yè)務函數(shù)
    int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) 
    {
        if(_router.find(msgid) != _router.end()) {
            //該msgID的回調(diào)業(yè)務已經(jīng)存在
            return -1;
        }
        printf("add msg cb msgid = %d\n", msgid);

        _router[msgid] = msg_cb;
        _args[msgid] = user_data;

        return 0;
    }

    //調(diào)用注冊的對應的回調(diào)業(yè)務函數(shù)
    void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn) 
    {
        printf("call msgid = %d\n", msgid);
        //判斷msgid對應的回調(diào)是否存在
        if (_router.find(msgid) == _router.end()) {
            fprintf(stderr, "msgid %d is not register!\n", msgid);
            return;
        }

        //直接取出回調(diào)函數(shù),執(zhí)行
        msg_callback *callback = _router[msgid];
        void *user_data = _args[msgid];
        callback(data, msglen, msgid, net_conn, user_data);
        printf("=======\n");
    }

private:
    //針對消息的路由分發(fā),key為msgID, value為注冊的回調(diào)業(yè)務函數(shù)
    __gnu_cxx::hash_map<int, msg_callback*> _router;
    //回調(diào)業(yè)務函數(shù)對應的參數(shù),key為msgID, value為對應的參數(shù)
    __gnu_cxx::hash_map<int, void*> _args;
};

8.3 msg_router集成到tcp_server中

A. tcp_server添加msg_router靜態(tài)成員變量

lars_reactor/include/tcp_server.h

class tcp_server
{ 
public: 
        // ...

    //---- 消息分發(fā)路由 ----
    static msg_router router; 
    
    // ...
}; 

同時定義及初始化

lars_reactor/src/tcp_server.cpp

//...

// ==== 消息分發(fā)路由   ===
msg_router tcp_server::router;
//...

B. tcp_server提供注冊路由方法

lars_reactor/include/tcp_server.c

class tcp_server
{ 
public: 
    //...
    //注冊消息路由回調(diào)函數(shù)
    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
        router.register_msg_router(msgid, cb, user_data);
    }

        //...
        
public:
      //全部已經(jīng)在線的連接信息

    //---- 消息分發(fā)路由 ----
    static msg_router router; 
        //...
}; 

C. 修正tcp_conn的do_read改成消息分發(fā)

lars_reactor/src/tcp_conn.cpp

//...

//處理讀業(yè)務
void tcp_conn::do_read()
{
    //1. 從套接字讀取數(shù)據(jù)
    int ret = ibuf.read_data(_connfd);
    if (ret == -1) {
        fprintf(stderr, "read data from socket\n");
        this->clean_conn();
        return ;
    }
    else if ( ret == 0) {
        //對端正常關閉
        printf("connection closed by peer\n");
        clean_conn();
        return ;
    }

    //2. 解析msg_head數(shù)據(jù)    
    msg_head head;    
    
    //[這里用while,可能一次性讀取多個完整包過來]
    while (ibuf.length() >= MESSAGE_HEAD_LEN)  {
        //2.1 讀取msg_head頭部,固定長度MESSAGE_HEAD_LEN    
        memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
        if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
            fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
            this->clean_conn();
            break;
        }
        if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
            //緩存buf中剩余的數(shù)據(jù),小于實際上應該接受的數(shù)據(jù)
            //說明是一個不完整的包,應該拋棄
            break;
        }

        //2.2 再根據(jù)頭長度讀取數(shù)據(jù)體,然后針對數(shù)據(jù)體處理 業(yè)務
        
        //頭部處理完了,往后偏移MESSAGE_HEAD_LEN長度
        ibuf.pop(MESSAGE_HEAD_LEN);
        
        //處理ibuf.data()業(yè)務數(shù)據(jù)
        printf("read data: %s\n", ibuf.data());

        //消息包路由模式
        tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this);
        
        ////回顯業(yè)務
        //callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
        

        //消息體處理完了,往后便宜msglen長度
        ibuf.pop(head.msglen);
    }

    ibuf.adjust();
    
    return ;
}

//...

8.4 msg_router集成到tcp_client中

lars_reactor/include/tcp_client.h

class tcp_client : public net_connection
{
public:
    // ...

    //設置業(yè)務處理回調(diào)函數(shù)
    //void set_msg_callback(msg_callback *msg_cb) 
    //{
        //this->_msg_callback = msg_cb;
    //}
    
    //注冊消息路由回調(diào)函數(shù)
    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
        _router.register_msg_router(msgid, cb, user_data);
    }
    

private:
  
    //處理消息的分發(fā)路由
    msg_router _router;    
    //msg_callback *_msg_callback; //單路由模式去掉

        // ...
        // ...
};

? 然后在修正tcp_clientdo_read()方法。

lars_reactor/src/tcp_client.cpp

//處理讀業(yè)務
int tcp_client::do_read()
{
    //確定已經(jīng)成功建立連接
    assert(connected == true);
    // 1. 一次性全部讀取出來
    
    //得到緩沖區(qū)里有多少字節(jié)要被讀取,然后將字節(jié)數(shù)放入b里面。   
    int need_read = 0;
    if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
        fprintf(stderr, "ioctl FIONREAD error");
        return -1;
    }


    //確保_buf可以容納可讀數(shù)據(jù)
    assert(need_read <= _ibuf.capacity - _ibuf.length);

    int ret;

    do {
        ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
    } while(ret == -1 && errno == EINTR);

    if (ret == 0) {
        //對端關閉
        if (_name != NULL) {
            printf("%s client: connection close by peer!\n", _name);
        }
        else {
            printf("client: connection close by peer!\n");
        }

        clean_conn();
        return -1;
    }
    else if (ret == -1) {
        fprintf(stderr, "client: do_read() , error\n");
        clean_conn();
        return -1;
    }

    
    assert(ret == need_read);
    _ibuf.length += ret;

    //2. 解包
    msg_head head;
    int msgid, length;
    while (_ibuf.length >= MESSAGE_HEAD_LEN) {
        memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
        msgid = head.msgid; 
        length = head.msglen;

        /*
        if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
            break;
        }
        */

        //頭部讀取完畢
        _ibuf.pop(MESSAGE_HEAD_LEN);

        // ===================================
        //3. 交給業(yè)務函數(shù)處理
        //if (_msg_callback != NULL) {
            //this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
        //}
        // 消息路由分發(fā)
        this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this);
            // ===================================

        //數(shù)據(jù)區(qū)域處理完畢
        _ibuf.pop(length);
    }
    
    //重置head指針
    _ibuf.adjust();

    return 0;
}

8.5 完成Lars Reactor V0.6開發(fā)

我們現(xiàn)在重新寫一下 server.cpp 和client.cpp的兩個應用程序

lars_reacor/example/lars_reactor_0.6/server.cpp

#include "tcp_server.h"


//回顯業(yè)務的回調(diào)函數(shù)
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回顯
    conn->send_message(data, len, msgid);
}

//打印信息回調(diào)函數(shù)
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("recv client: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}

int main() 
{
    event_loop loop;

    tcp_server server(&loop, "127.0.0.1", 7777);

    //注冊消息業(yè)務路由
    server.add_msg_router(1, callback_busi);
    server.add_msg_router(2, print_busi);

    loop.event_process();

    return 0;
}

lars_reacor/example/lars_reactor_0.6/client.cpp

#include "tcp_client.h"
#include <stdio.h>
#include <string.h>


//客戶端業(yè)務
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服務端回執(zhí)的數(shù)據(jù) 
    
    printf("recv server: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


int main() 
{

    event_loop loop;

    //創(chuàng)建tcp客戶端
    tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");


    //注冊消息路由業(yè)務
    client.add_msg_router(1, busi);

    //開啟事件監(jiān)聽
    loop.event_process();

    return 0;
}

lars_reactor/src/tcp_client.cpp

//判斷鏈接是否是創(chuàng)建鏈接,主要是針對非阻塞socket 返回EINPROGRESS錯誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
    tcp_client *cli = (tcp_client*)args;
    loop->del_io_event(fd);

    int result = 0;
    socklen_t result_len = sizeof(result);
    getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
    if (result == 0) {
        //鏈接是建立成功的
        cli->connected = true;

        printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));



                // ================ 發(fā)送msgid:1 =====
        //建立連接成功之后,主動發(fā)送send_message
        const char *msg = "hello lars!";
        int msgid = 1;
        cli->send_message(msg, strlen(msg), msgid);

        // ================ 發(fā)送msgid:2 =====
        const char *msg2 = "hello Aceld!";
        msgid = 2;
        cli->send_message(msg2, strlen(msg2), msgid);
                // ================
                
        loop->add_io_event(fd, read_callback, EPOLLIN, cli);

        if (cli->_obuf.length != 0) {
            //輸出緩沖有數(shù)據(jù)可寫
            loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
        }
    }
    else {
        //鏈接創(chuàng)建失敗
        fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
    }
}

運行結(jié)果:

服務端

$ ./server 
msg_router init...
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
get new connection succ!
read data: hello lars!?
call msgid = 1
callback_busi ...
server send_message: hello lars!?:11, msgid = 1
=======
read data: hello Aceld!
call msgid = 2
recv client: [hello Aceld!]
msgid: [2]
len: [12]

客戶端

$ ./client 
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 1
recv server: [hello lars!]
msgid: [1]
len: [11]
=======

關于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容