(10)鏈接創(chuàng)建/銷毀Hook機制(Reactor部分)-【Lars-基于C++負載均衡遠程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負載均衡模塊基礎(chǔ)設(shè)計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本


9) 鏈接創(chuàng)建/銷毀Hook機制

? 下面我們來給鏈接注冊兩個hook節(jié)點的函數(shù),即服務(wù)端有新的客戶端鏈接創(chuàng)建之后用戶可以注冊一個回調(diào),有客戶端斷開鏈接的回調(diào)。還有客戶端在成功與服務(wù)端創(chuàng)建鏈接之后創(chuàng)建的回調(diào),和客戶端與服務(wù)端斷開鏈接之前的回調(diào)。

9.1 tcp_server服務(wù)端添加鏈接Hook函數(shù)

A. 定義Hook函數(shù)原型

lars_reactor/include/net_connection.h

#pragma once

/*
 * 
 * 網(wǎng)絡(luò)通信的抽象類,任何需要進行收發(fā)消息的模塊,都可以實現(xiàn)該類
 *
 * */

class net_connection
{
public:
    net_connection() {}
    //發(fā)送消息的接口
    virtual int send_message(const char *data, int datalen, int msgid) = 0;
};

//創(chuàng)建鏈接/銷毀鏈接 要觸發(fā)的 回調(diào)函數(shù)類型
typedef void (*conn_callback)(net_connection *conn, void *args);

B. tcp_server定義相關(guān)hook函數(shù)的屬性

lars_reactor/include/tcp_server.h

#pragma once

#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"


class tcp_server
{ 
public: 
    //server的構(gòu)造函數(shù)
    tcp_server(event_loop* loop, const char *ip, uint16_t port); 

    //開始提供創(chuàng)建鏈接服務(wù)
    void do_accept();

    //鏈接對象釋放的析構(gòu)
    ~tcp_server();

    //注冊消息路由回調(diào)函數(shù)
    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
        router.register_msg_router(msgid, cb, user_data);
    }

private: 
    //基礎(chǔ)信息
    int _sockfd; //套接字
    struct sockaddr_in _connaddr; //客戶端鏈接地址
    socklen_t _addrlen; //客戶端鏈接地址長度

    //event_loop epoll事件機制
    event_loop* _loop;

public:
    //---- 消息分發(fā)路由 ----
    static msg_router router; 

    //---- 客戶端鏈接管理部分-----
public:
    static void increase_conn(int connfd, tcp_conn *conn);    //新增一個新建的連接
    static void decrease_conn(int connfd);    //減少一個斷開的連接
    static void get_conn_num(int *curr_conn);     //得到當前鏈接的刻度
    static tcp_conn **conns;        //全部已經(jīng)在線的連接信息


    // ------- 創(chuàng)建鏈接/銷毀鏈接 Hook 部分 -----

    //設(shè)置鏈接的創(chuàng)建hook函數(shù)
    static void set_conn_start(conn_callback cb, void *args = NULL) {
        conn_start_cb = cb;
        conn_start_cb_args = args;
    }

    //設(shè)置鏈接的銷毀hook函數(shù)
    static void set_conn_close(conn_callback cb, void *args = NULL) {
        conn_close_cb = cb;
        conn_close_cb_args = args;
    }

    //創(chuàng)建鏈接之后要觸發(fā)的 回調(diào)函數(shù)
    static conn_callback conn_start_cb;
    static void *conn_start_cb_args;

    //銷毀鏈接之前要觸發(fā)的 回調(diào)函數(shù)
    static conn_callback conn_close_cb;
    static void *conn_close_cb_args;

private:
    //TODO 
    //從配置文件中讀取
#define MAX_CONNS 10000
    static int _max_conns;          //最大client鏈接個數(shù)
    static int _curr_conns;         //當前鏈接刻度
    static pthread_mutex_t _conns_mutex; //保護_curr_conns刻度修改的鎖
}; 

C. tcp_conn在連接創(chuàng)建/銷毀調(diào)用Hook函數(shù)

//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
    _connfd = connfd;
    _loop = loop;
    //1. 將connfd設(shè)置成非阻塞狀態(tài)
    int flag = fcntl(_connfd, F_GETFL, 0);
    fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);

    //2. 設(shè)置TCP_NODELAY禁止做讀寫緩存,降低小包延遲
    int op = 1;
    setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h


    //2.5 如果用戶注冊了鏈接建立Hook 則調(diào)用
    if (tcp_server::conn_start_cb) {
        tcp_server::conn_start_cb(this, tcp_server::conn_start_cb_args);
    }

    //3. 將該鏈接的讀事件讓event_loop監(jiān)控 
    _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);

    //4 將該鏈接集成到對應(yīng)的tcp_server中
    tcp_server::increase_conn(_connfd, this);
}

//...
//...
//銷毀tcp_conn
void tcp_conn::clean_conn()
{
    // 如果注冊了鏈接銷毀Hook函數(shù),則調(diào)用
    if (tcp_server::conn_close_cb) {
        tcp_server::conn_close_cb(this, tcp_server::conn_close_cb_args);
    }
    //鏈接清理工作
    //1 將該鏈接從tcp_server摘除掉    
    tcp_server::decrease_conn(_connfd);
    //2 將該鏈接從event_loop中摘除
    _loop->del_io_event(_connfd);
    //3 buf清空
    ibuf.clear(); 
    obuf.clear();
    //4 關(guān)閉原始套接字
    int fd = _connfd;
    _connfd = -1;
    close(fd);
}

9.2 tcp_client客戶端添加鏈接Hook函數(shù)

A. tcp_client添加Hook屬性

lars_reactor/include/tcp_client.h

#pragma once

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "io_buf.h"
#include "event_loop.h"
#include "message.h"
#include "net_connection.h"


class tcp_client : public net_connection
{
public:
    //初始化客戶端套接字
    tcp_client(event_loop *loop, const char *ip, unsigned short port,  const char *name);

    //發(fā)送message方法
    int send_message(const char *data, int msglen, int msgid);

    //創(chuàng)建鏈接
    void do_connect();

    //處理讀業(yè)務(wù)
    int do_read();
    
    //處理寫業(yè)務(wù)
    int do_write();
    
    //釋放鏈接資源
    void clean_conn();

    ~tcp_client();


    //設(shè)置業(yè)務(wù)處理回調(diào)函數(shù)
    //void set_msg_callback(msg_callback *msg_cb) 
    //{
        //this->_msg_callback = msg_cb;
    //}
    
    //注冊消息路由回調(diào)函數(shù)
    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
        _router.register_msg_router(msgid, cb, user_data);
    }
    

    //----- 鏈接創(chuàng)建/銷毀回調(diào)Hook ----
    //設(shè)置鏈接的創(chuàng)建hook函數(shù)
    void set_conn_start(conn_callback cb, void *args = NULL) 
    {
        _conn_start_cb = cb;
        _conn_start_cb_args = args;
    }

    //設(shè)置鏈接的銷毀hook函數(shù)
    void set_conn_close(conn_callback cb, void *args = NULL) {
        _conn_close_cb = cb;
        _conn_close_cb_args = args;
    }
    
    //創(chuàng)建鏈接之后要觸發(fā)的 回調(diào)函數(shù)
    conn_callback _conn_start_cb;     
    void * _conn_start_cb_args;

    //銷毀鏈接之前要觸發(fā)的 回調(diào)函數(shù)
    conn_callback _conn_close_cb;
    void * _conn_close_cb_args;
    // ---------------------------------

    bool connected; //鏈接是否創(chuàng)建成功
    //server端地址
    struct sockaddr_in _server_addr;
    io_buf _obuf;
    io_buf _ibuf;

private:
    int _sockfd;
    socklen_t _addrlen;

    //處理消息的分發(fā)路由
    msg_router _router;    
    //msg_callback *_msg_callback; //單路由模式去掉

    //客戶端的事件處理機制
    event_loop* _loop;

    //當前客戶端的名稱 用戶記錄日志
    const char *_name;
};

B. tcp_client在創(chuàng)建/銷毀調(diào)用Hook

lars_reactor/src/tcp_client.c

//創(chuàng)建鏈接
void tcp_client::do_connect()
{
    // ...
    // ...
  
    int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
    if (ret == 0) {
        //鏈接創(chuàng)建成功  
        
        connected = true; 

        //調(diào)用開發(fā)者客戶端注冊的創(chuàng)建鏈接之后的hook函數(shù)
        if (_conn_start_cb != NULL) {
            _conn_start_cb(this, _conn_start_cb_args);
        }
      
        // ...
        // ...
    }

}

//判斷鏈接是否是創(chuàng)建鏈接,主要是針對非阻塞socket 返回EINPROGRESS錯誤
static void connection_delay(event_loop *loop, int fd, void *args)
{
    tcp_client *cli = (tcp_client*)args;
    loop->del_io_event(fd);

    int result = 0;
    socklen_t result_len = sizeof(result);
    getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
    if (result == 0) {
        //鏈接是建立成功的
        cli->connected = true;

        printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));

        //調(diào)用開發(fā)者注冊的創(chuàng)建鏈接Hook函數(shù)
        if (cli->_conn_start_cb != NULL) {
            cli->_conn_start_cb(cli, cli->_conn_start_cb_args);
        }

        // ....
        // ...
    }
}

//釋放鏈接資源,重置連接
void tcp_client::clean_conn()
{
    if (_sockfd != -1) {
        printf("clean conn, del socket!\n");
        _loop->del_io_event(_sockfd);
        close(_sockfd);
    }

    connected = false;

    //調(diào)用開發(fā)者注冊的銷毀鏈接之前觸發(fā)的Hook
    if (_conn_close_cb != NULL) {
        _conn_close_cb(this, _conn_close_cb_args);
    }

    //重新連接
    this->do_connect();
}

9.3 完成Lars Reactor V0.7開發(fā)

server.cpp

#include "tcp_server.h"
#include <string.h>


//回顯業(yè)務(wù)的回調(diào)函數(shù)
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回顯
    conn->send_message(data, len, msgid);
}

//打印信息回調(diào)函數(shù)
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("recv client: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//新客戶端創(chuàng)建的回調(diào)
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 101;
    const char *msg = "welcome! you online..";

    conn->send_message(msg, strlen(msg), msgid);
}

//客戶端銷毀的回調(diào)
void on_client_lost(net_connection *conn, void *args)
{
    printf("connection is lost !\n");
}


int main() 
{
    event_loop loop;

    tcp_server server(&loop, "127.0.0.1", 7777);

    //注冊消息業(yè)務(wù)路由
    server.add_msg_router(1, callback_busi);
    server.add_msg_router(2, print_busi);

    //注冊鏈接hook回調(diào)
    server.set_conn_start(on_client_build);
    server.set_conn_close(on_client_lost);

    loop.event_process();

    return 0;
}

client.cpp

#include "tcp_client.h"
#include <stdio.h>
#include <string.h>

//客戶端業(yè)務(wù)
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服務(wù)端回執(zhí)的數(shù)據(jù) 
    
    printf("recv server: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//客戶端銷毀的回調(diào)
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 1; 
    const char *msg = "Hello Lars!";

    conn->send_message(msg, strlen(msg), msgid);
}

//客戶端銷毀的回調(diào)
void on_client_lost(net_connection *conn, void *args) 
{
    printf("on_client_lost...\n");
    printf("Client is lost!\n");
}

int main() 
{
    event_loop loop;

    //創(chuàng)建tcp客戶端
    tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");

    //注冊消息路由業(yè)務(wù)
    client.add_msg_router(1, busi);
    client.add_msg_router(101, busi);


    //設(shè)置hook函數(shù)
    client.set_conn_start(on_client_build);
    client.set_conn_close(on_client_lost);


    //開啟事件監(jiān)聽
    loop.event_process();

    return 0;
}

運行結(jié)果

服務(wù)端

$ ./server 
msg_router init...
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
get new connection succ!
read data: Hello Lars!
call msgid = 1
callback_busi ...
=======
connection closed by peer
connection is lost !

客戶端:

$ ./client 
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
^C

? 這樣我們的成功的將hook機制加入進去了。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容