(4)內(nèi)存管理Buffer(Reactor部分)【Lars-基于C++負載均衡遠程服務器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構建


【Lars系統(tǒng)之Reactor模型服務器框架模塊】
第1章-項目結構與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務機制
第15章-鏈接屬性設置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結構及環(huán)境構建
第4章-Route結構的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構建
第2章-主模塊業(yè)務結構搭建
第3章-Report與Dns Client設計與實現(xiàn)
第4章-負載均衡模塊基礎設計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本


3) 內(nèi)存管理與buffer封裝

? 在完成網(wǎng)絡框架之前,我們先把必須的內(nèi)存管理和buffer的封裝完成。

這里我們先創(chuàng)建一個io_buf類,主要用來封裝基本的buffer結構。然后用一個buf_pool來管理全部的buffer集合。

3.1 io_buf 內(nèi)存塊

lars_reactor/include/io_buf.h

#pragma once

/*
    定義一個 buffer存放數(shù)據(jù)的結構
 * */
class io_buf {
public:
    //構造,創(chuàng)建一個io_buf對象
    io_buf(int size);

    //清空數(shù)據(jù)
    void clear();

    //將已經(jīng)處理過的數(shù)據(jù),清空,將未處理的數(shù)據(jù)提前至數(shù)據(jù)首地址
    void adjust();

    //將其他io_buf對象數(shù)據(jù)考本到自己中
    void copy(const io_buf *other);

    //處理長度為len的數(shù)據(jù),移動head和修正length
    void pop(int len);

    //如果存在多個buffer,是采用鏈表的形式鏈接起來
    io_buf *next;

    //當前buffer的緩存容量大小
    int capacity;
    //當前buffer有效數(shù)據(jù)長度
    int length;
    //未處理數(shù)據(jù)的頭部位置索引
    int head;
    //當前io_buf所保存的數(shù)據(jù)地址
    char *data;
};

對應的io_buf實現(xiàn)的文件,如下

lars_reactor/src/io_buf.cpp

#include <stdio.h>
#include <assert.h>
#include <string.h>
#include "io_buf.h"

//構造,創(chuàng)建一個io_buf對象
io_buf::io_buf(int size):
capacity(size), 
length(0),
head(0),
next(NULL) 
{
   data = new char[size];
   assert(data);
}

//清空數(shù)據(jù)
void io_buf::clear() {
    length = head = 0;
}

//將已經(jīng)處理過的數(shù)據(jù),清空,將未處理的數(shù)據(jù)提前至數(shù)據(jù)首地址
void io_buf::adjust() {
    if (head != 0) {
        if (length != 0) {
            memmove(data, data+head, length);
        }
        head = 0;
    }
}

//將其他io_buf對象數(shù)據(jù)考本到自己中
void io_buf::copy(const io_buf *other) {
    memcpy(data, other->data + other->head, other->length);
    head = 0;
    length = other->length;
}

//處理長度為len的數(shù)據(jù),移動head和修正length
void io_buf::pop(int len) {
    length -= len;
    head += len;
}

? 這里主要要注意io_buf的兩個索引值length和head,一個是當前buffer的有效內(nèi)存長度,haed則為可用的有效長度首數(shù)據(jù)位置。 capacity是io_buf的總容量空間大小。

? 所以每次pop()則是彈出已經(jīng)處理了多少,那么buffer剩下的內(nèi)存就接下來需要處理的。

? 然而adjust()則是從新重置io_buf,將所有數(shù)據(jù)都重新變成未處理狀態(tài)。

? clear()則是將length和head清0,這里沒有提供delete真是刪除物理內(nèi)存的方法,因為這里的buffer設計是不需要清理的,接下來是用一個buf_pool來管理全部未被使用的io_buf集合。而且buf_pool的管理的內(nèi)存是程序開始預開辟的,不會做清理工作.

3.2 buf_pool 內(nèi)存池

? 接下來我們看看內(nèi)存池的設計.

lars_reactor/include/buf_pool.h

#pragma once

#include <ext/hash_map>
#include "io_buf.h"

typedef __gnu_cxx::hash_map<int, io_buf*> pool_t;

enum MEM_CAP {
    m4K     = 4096,
    m16K    = 16384,
    m64K    = 65536,
    m256K   = 262144,
    m1M     = 1048576,
    m4M     = 4194304,
    m8M     = 8388608
};


//總內(nèi)存池最大限制 單位是Kb 所以目前限制是 5GB
#define EXTRA_MEM_LIMIT (5U *1024 *1024) 

/*
 *  定義buf內(nèi)存池
 *  設計為單例
 * */
class buf_pool 
{
public:
    //初始化單例對象
    static void init() {
        //創(chuàng)建單例
        _instance = new buf_pool();
    }

    //獲取單例方法
    static buf_pool *instance() {
        //保證init方法在這個進程執(zhí)行中 只被執(zhí)行一次
        pthread_once(&_once, init);
        return _instance;
    }

    //開辟一個io_buf
    io_buf *alloc_buf(int N);
    io_buf *alloc_buf() { return alloc_buf(m4K); }


    //重置一個io_buf
    void revert(io_buf *buffer);

    
private:
    buf_pool();

    //拷貝構造私有化
    buf_pool(const buf_pool&);
    const buf_pool& operator=(const buf_pool&);

    //所有buffer的一個map集合句柄
    pool_t _pool;

    //總buffer池的內(nèi)存大小 單位為KB
    uint64_t _total_mem;

    //單例對象
    static buf_pool *_instance;

    //用于保證創(chuàng)建單例的init方法只執(zhí)行一次的鎖
    static pthread_once_t _once;

    //用戶保護內(nèi)存池鏈表修改的互斥鎖
    static pthread_mutex_t _mutex;
};

? 首先buf_pool采用單例的方式進行設計。因為系統(tǒng)希望僅有一個內(nèi)存池管理模塊。這里內(nèi)存池用一個__gnu_cxx::hash_map<int, io_buf*>的map類型進行管理,其中key是每個組內(nèi)存的空間容量,參考

enum MEM_CAP {
    m4K     = 4096,
    m16K    = 16384,
    m64K    = 65536,
    m256K   = 262144,
    m1M     = 1048576,
    m4M     = 4194304,
    m8M     = 8388608
};

? 其中每個key下面掛在一個io_buf鏈表。而且buf_pool預先會給map下的每個key的內(nèi)存組開辟好一定數(shù)量的內(nèi)存塊。然后上層用戶在使用的時候每次取出一個內(nèi)存塊,就會將該內(nèi)存塊從該內(nèi)存組摘掉。當然使用完就放回來。如果不夠使用會額外開辟,也有最大的內(nèi)存限制,在宏EXTRA_MEM_LIMIT中。

具體的buf_pool實現(xiàn)如下:

lars_reactor/src/buf_pool.cpp

#include "buf_pool.h"
#include <assert.h>


//單例對象
buf_pool * buf_pool::_instance = NULL;

//用于保證創(chuàng)建單例的init方法只執(zhí)行一次的鎖
pthread_once_t buf_pool::_once = PTHREAD_ONCE_INIT;


//用戶保護內(nèi)存池鏈表修改的互斥鎖
pthread_mutex_t buf_pool::_mutex = PTHREAD_MUTEX_INITIALIZER;


//構造函數(shù) 主要是預先開辟一定量的空間
//這里buf_pool是一個hash,每個key都是不同空間容量
//對應的value是一個io_buf集合的鏈表
//buf_pool -->  [m4K] -- io_buf-io_buf-io_buf-io_buf...
//              [m16K] -- io_buf-io_buf-io_buf-io_buf...
//              [m64K] -- io_buf-io_buf-io_buf-io_buf...
//              [m256K] -- io_buf-io_buf-io_buf-io_buf...
//              [m1M] -- io_buf-io_buf-io_buf-io_buf...
//              [m4M] -- io_buf-io_buf-io_buf-io_buf...
//              [m8M] -- io_buf-io_buf-io_buf-io_buf...
buf_pool::buf_pool():_total_mem(0)
{
    io_buf *prev; 
    
    //----> 開辟4K buf 內(nèi)存池
    _pool[m4K] = new io_buf(m4K);
    if (_pool[m4K] == NULL) {
        fprintf(stderr, "new io_buf m4K error");
        exit(1);
    }

    prev = _pool[m4K];
    //4K的io_buf 預先開辟5000個,約20MB供開發(fā)者使用
    for (int i = 1; i < 5000; i ++) {
        prev->next = new io_buf(m4K);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m4K error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 4 * 5000;



    //----> 開辟16K buf 內(nèi)存池
    _pool[m16K] = new io_buf(m16K);
    if (_pool[m16K] == NULL) {
        fprintf(stderr, "new io_buf m16K error");
        exit(1);
    }

    prev = _pool[m16K];
    //16K的io_buf 預先開辟1000個,約16MB供開發(fā)者使用
    for (int i = 1; i < 1000; i ++) {
        prev->next = new io_buf(m16K);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m16K error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 16 * 1000;



    //----> 開辟64K buf 內(nèi)存池
    _pool[m64K] = new io_buf(m64K);
    if (_pool[m64K] == NULL) {
        fprintf(stderr, "new io_buf m64K error");
        exit(1);
    }

    prev = _pool[m64K];
    //64K的io_buf 預先開辟500個,約32MB供開發(fā)者使用
    for (int i = 1; i < 500; i ++) {
        prev->next = new io_buf(m64K);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m64K error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 64 * 500;


    //----> 開辟256K buf 內(nèi)存池
    _pool[m256K] = new io_buf(m256K);
    if (_pool[m256K] == NULL) {
        fprintf(stderr, "new io_buf m256K error");
        exit(1);
    }

    prev = _pool[m256K];
    //256K的io_buf 預先開辟200個,約50MB供開發(fā)者使用
    for (int i = 1; i < 200; i ++) {
        prev->next = new io_buf(m256K);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m256K error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 256 * 200;


    //----> 開辟1M buf 內(nèi)存池
    _pool[m1M] = new io_buf(m1M);
    if (_pool[m1M] == NULL) {
        fprintf(stderr, "new io_buf m1M error");
        exit(1);
    }

    prev = _pool[m1M];
    //1M的io_buf 預先開辟50個,約50MB供開發(fā)者使用
    for (int i = 1; i < 50; i ++) {
        prev->next = new io_buf(m1M);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m1M error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 1024 * 50;


    //----> 開辟4M buf 內(nèi)存池
    _pool[m4M] = new io_buf(m4M);
    if (_pool[m4M] == NULL) {
        fprintf(stderr, "new io_buf m4M error");
        exit(1);
    }

    prev = _pool[m4M];
    //4M的io_buf 預先開辟20個,約80MB供開發(fā)者使用
    for (int i = 1; i < 20; i ++) {
        prev->next = new io_buf(m4M);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m4M error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 4096 * 20;



    //----> 開辟8M buf 內(nèi)存池
    _pool[m8M] = new io_buf(m8M);
    if (_pool[m8M] == NULL) {
        fprintf(stderr, "new io_buf m8M error");
        exit(1);
    }

    prev = _pool[m8M];
    //8M的io_buf 預先開辟10個,約80MB供開發(fā)者使用
    for (int i = 1; i < 10; i ++) {
        prev->next = new io_buf(m8M);
        if (prev->next == NULL) {
            fprintf(stderr, "new io_buf m8M error");
            exit(1);
        }
        prev = prev->next;
    }
    _total_mem += 8192 * 10;
}


//開辟一個io_buf
//1 如果上層需要N個字節(jié)的大小的空間,找到與N最接近的buf hash組,取出,
//2 如果該組已經(jīng)沒有節(jié)點使用,可以額外申請
//3 總申請長度不能夠超過最大的限制大小 EXTRA_MEM_LIMIT
//4 如果有該節(jié)點需要的內(nèi)存塊,直接取出,并且將該內(nèi)存塊從pool摘除
io_buf *buf_pool::alloc_buf(int N) 
{
    //1 找到N最接近哪hash 組
    int index;
    if (N <= m4K) {
        index = m4K;
    }
    else if (N <= m16K) {
        index = m16K;
    }
    else if (N <= m64K) {
        index = m64K;
    }
    else if (N <= m256K) {
        index = m256K;
    }
    else if (N <= m1M) {
        index = m1M;
    }
    else if (N <= m4M) {
        index = m4M;
    }
    else if (N <= m8M) {
        index = m8M;
    }
    else {
        return NULL;
    }


    //2 如果該組已經(jīng)沒有,需要額外申請,那么需要加鎖保護
    pthread_mutex_lock(&_mutex);
    if (_pool[index] == NULL) {
        if (_total_mem + index/1024 >= EXTRA_MEM_LIMIT) {
            //當前的開辟的空間已經(jīng)超過最大限制
            fprintf(stderr, "already use too many memory!\n");
            exit(1);
        }

        io_buf *new_buf = new io_buf(index);
        if (new_buf == NULL) {
            fprintf(stderr, "new io_buf error\n");
            exit(1);
        }
        _total_mem += index/1024;
        pthread_mutex_unlock(&_mutex);
        return new_buf;
    }

    //3 從pool中摘除該內(nèi)存塊
    io_buf *target = _pool[index];
    _pool[index] = target->next;
    pthread_mutex_unlock(&_mutex);
    
    target->next = NULL;
    
    return target;
}


//重置一個io_buf,將一個buf 上層不再使用,或者使用完成之后,需要將該buf放回pool中
void buf_pool::revert(io_buf *buffer)
{
    //每個buf的容量都是固定的 在hash的key中取值
    int index = buffer->capacity;
    //重置io_buf中的內(nèi)置位置指針
    buffer->length = 0;
    buffer->head = 0;

    pthread_mutex_lock(&_mutex);
    //找到對應的hash組 buf首屆點地址
    assert(_pool.find(index) != _pool.end());

    //將buffer插回鏈表頭部
    buffer->next = _pool[index];
    _pool[index] = buffer;

    pthread_mutex_unlock(&_mutex);
}

? 其中,buf_pool構造函數(shù)中實現(xiàn)了內(nèi)存池的hash預開辟內(nèi)存工作,具體的數(shù)據(jù)結構如下

//buf_pool -->  [m4K] --> io_buf-io_buf-io_buf-io_buf...
//              [m16K] --> io_buf-io_buf-io_buf-io_buf...
//              [m64K] --> io_buf-io_buf-io_buf-io_buf...
//              [m256K] --> io_buf-io_buf-io_buf-io_buf...
//              [m1M] --> io_buf-io_buf-io_buf-io_buf...
//              [m4M] --> io_buf-io_buf-io_buf-io_buf...
//              [m8M] --> io_buf-io_buf-io_buf-io_buf...

? alloc_buf()方法,是調(diào)用者從內(nèi)存池中取出一塊內(nèi)存,如果最匹配的內(nèi)存塊存在,則返回,并將該塊內(nèi)存從buf_pool中摘除掉,如果沒有則開辟一個內(nèi)存出來。 revert()方法則是將已經(jīng)使用完的io_buf重新放回buf_pool中。

3.3 讀寫buffer機制

? 那么接下來我們就需要實現(xiàn)一個專門用來讀(輸入)數(shù)據(jù)的input_buf和專門用來寫(輸出)數(shù)據(jù)的output_buf類了。由于這兩個人都應該擁有一些io_buf的特性,所以我們先定義一個基礎的父類reactor_buf。

A. reactor_buf類

lars_reactor/include/reactor_buf.h

#pragma once
#include "io_buf.h"
#include "buf_pool.h"
#include <assert.h>
#include <unistd.h>


/*
 * 給業(yè)務層提供的最后tcp_buffer結構
 * */
class reactor_buf {
public:
    reactor_buf();
    ~reactor_buf();

    const int length() const;
    void pop(int len);
    void clear();

protected:
    io_buf *_buf;
};


? 這個的作用就是將io_buf作為自己的一個成員,然后做了一些包裝。具體方法實現(xiàn)如下。

lars_reactor/src/reactor.cpp

#include "reactor_buf.h"
#include <sys/ioctl.h>
#include <string.h>

reactor_buf::reactor_buf() 
{
    _buf = NULL;
}

reactor_buf::~reactor_buf()
{
    clear();
}

const int reactor_buf::length() const 
{
    return _buf != NULL? _buf->length : 0;
}

void reactor_buf::pop(int len) 
{
    assert(_buf != NULL && len <= _buf->length);

    _buf->pop(len);

    //當此時_buf的可用長度已經(jīng)為0
    if(_buf->length == 0) {
        //將_buf重新放回buf_pool中
        buf_pool::instance()->revert(_buf);
        _buf = NULL;
    }
}

void reactor_buf::clear()
{
    if (_buf != NULL)  {
        //將_buf重新放回buf_pool中
        buf_pool::instance()->revert(_buf);
        _buf = NULL;
    }
}

B. input_buf類

? 接下來就可以集成reactor_buf類實現(xiàn)input_buf類的設計了。

lars_reactor/include/reactor_buf.h

//讀(輸入) 緩存buffer
class input_buf : public reactor_buf 
{
public:
    //從一個fd中讀取數(shù)據(jù)到reactor_buf中
    int read_data(int fd);

    //取出讀到的數(shù)據(jù)
    const char *data() const;

    //重置緩沖區(qū)
    void adjust();
};

? 其中data()方法即取出已經(jīng)讀取的數(shù)據(jù),adjust()含義和io_buf含義一致。主要是read_data()方法。具體實現(xiàn)如下。

lars_reactor/src/reactor.cpp

//從一個fd中讀取數(shù)據(jù)到reactor_buf中
int input_buf::read_data(int fd)
{
    int need_read;//硬件有多少數(shù)據(jù)可以讀

    //一次性讀出所有的數(shù)據(jù)
    //需要給fd設置FIONREAD,
    //得到read緩沖中有多少數(shù)據(jù)是可以讀取的
    if (ioctl(fd, FIONREAD, &need_read) == -1) {
        fprintf(stderr, "ioctl FIONREAD\n");
        return -1;
    }

    
    if (_buf == NULL) {
        //如果io_buf為空,從內(nèi)存池申請
        _buf = buf_pool::instance()->alloc_buf(need_read);
        if (_buf == NULL) {
            fprintf(stderr, "no idle buf for alloc\n");
            return -1;
        }
    }
    else {
        //如果io_buf可用,判斷是否夠存
        assert(_buf->head == 0);
        if (_buf->capacity - _buf->length < (int)need_read) {
            //不夠存,沖內(nèi)存池申請
            io_buf *new_buf = buf_pool::instance()->alloc_buf(need_read+_buf->length);
            if (new_buf == NULL) {
                fprintf(stderr, "no ilde buf for alloc\n");
                return -1;
            }
            //將之前的_buf的數(shù)據(jù)考到新申請的buf中
            new_buf->copy(_buf);
            //將之前的_buf放回內(nèi)存池中
            buf_pool::instance()->revert(_buf);
            //新申請的buf成為當前io_buf
            _buf = new_buf;
        }
    }

    //讀取數(shù)據(jù)
    int already_read = 0;
    do { 
        //讀取的數(shù)據(jù)拼接到之前的數(shù)據(jù)之后
        if(need_read == 0) {
            //可能是read阻塞讀數(shù)據(jù)的模式,對方未寫數(shù)據(jù)
            already_read = read(fd, _buf->data + _buf->length, m4K);
        } else {
            already_read = read(fd, _buf->data + _buf->length, need_read);
        }
    } while (already_read == -1 && errno == EINTR); //systemCall引起的中斷 繼續(xù)讀取
    if (already_read > 0)  {
        if (need_read != 0) {
            assert(already_read == need_read);
        }
        _buf->length += already_read;
    }

    return already_read;
}

//取出讀到的數(shù)據(jù)
const char *input_buf::data() const 
{
    return _buf != NULL ? _buf->data + _buf->head : NULL;
}

//重置緩沖區(qū)
void input_buf::adjust()
{
    if (_buf != NULL) {
        _buf->adjust();
    }
}

C. output_buf類

? 接下來就可以集成reactor_buf類實現(xiàn)output_buf類的設計了。

lars_reactor/include/reactor_buf.h

//寫(輸出)  緩存buffer
class output_buf : public reactor_buf 
{
public:
    //將一段數(shù)據(jù) 寫到一個reactor_buf中
    int send_data(const char *data, int datalen);

    //將reactor_buf中的數(shù)據(jù)寫到一個fd中
    int write2fd(int fd);
};

? send_data()方法主要是將數(shù)據(jù)寫到io_buf中,實際上并沒有做真正的寫操作。而是當調(diào)用write2fd方法時,才會將io_buf的數(shù)據(jù)寫到對應的fd中。send_data是做一些buf內(nèi)存塊的申請等工作。具體實現(xiàn)如下

lars_reactor/src/reactor.cpp

//將一段數(shù)據(jù) 寫到一個reactor_buf中
int output_buf::send_data(const char *data, int datalen)
{
    if (_buf == NULL) {
        //如果io_buf為空,從內(nèi)存池申請
        _buf = buf_pool::instance()->alloc_buf(datalen);
        if (_buf == NULL) {
            fprintf(stderr, "no idle buf for alloc\n");
            return -1;
        }
    }
    else {
        //如果io_buf可用,判斷是否夠存
        assert(_buf->head == 0);
        if (_buf->capacity - _buf->length < datalen) {
            //不夠存,沖內(nèi)存池申請
            io_buf *new_buf = buf_pool::instance()->alloc_buf(datalen+_buf->length);
            if (new_buf == NULL) {
                fprintf(stderr, "no ilde buf for alloc\n");
                return -1;
            }
            //將之前的_buf的數(shù)據(jù)考到新申請的buf中
            new_buf->copy(_buf);
            //將之前的_buf放回內(nèi)存池中
            buf_pool::instance()->revert(_buf);
            //新申請的buf成為當前io_buf
            _buf = new_buf;
        }
    }

    //將data數(shù)據(jù)拷貝到io_buf中,拼接到后面
    memcpy(_buf->data + _buf->length, data, datalen);
    _buf->length += datalen;

    return 0;
}

//將reactor_buf中的數(shù)據(jù)寫到一個fd中
int output_buf::write2fd(int fd)
{
    assert(_buf != NULL && _buf->head == 0);

    int already_write = 0;

    do { 
        already_write = write(fd, _buf->data, _buf->length);
    } while (already_write == -1 && errno == EINTR); //systemCall引起的中斷,繼續(xù)寫


    if (already_write > 0) {
        //已經(jīng)處理的數(shù)據(jù)清空
        _buf->pop(already_write);
        //未處理數(shù)據(jù)前置,覆蓋老數(shù)據(jù)
        _buf->adjust();
    }

    //如果fd非阻塞,可能會得到EAGAIN錯誤
    if (already_write == -1 && errno == EAGAIN) {
        already_write = 0;//不是錯誤,僅僅返回0,表示目前是不可以繼續(xù)寫的
    }

    return already_write;
}

? 現(xiàn)在我們已經(jīng)完成了內(nèi)存管理及讀寫buf機制的實現(xiàn),接下來就要簡單的測試一下,用我們之前的V0.1版本的reactor server來測試。

3.4 完成Lars Reactor V0.2開發(fā)

A. 修改tcp_server

? 主要修正do_accept()方法,加上reactor_buf機制.

lars_reactor/src/tcp_server.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <unistd.h>
#include <signal.h>
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>

#include "tcp_server.h"
#include "reactor_buf.h"



//server的構造函數(shù)
tcp_server::tcp_server(const char *ip, uint16_t port)
{
    //...
}

//開始提供創(chuàng)建鏈接服務
void tcp_server::do_accept()
{
    int connfd;    
    while(true) {
        //accept與客戶端創(chuàng)建鏈接
        printf("begin accept\n");
        connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
        if (connfd == -1) {
            if (errno == EINTR) {
                fprintf(stderr, "accept errno=EINTR\n");
                continue;
            }
            else if (errno == EMFILE) {
                //建立鏈接過多,資源不夠
                fprintf(stderr, "accept errno=EMFILE\n");
            }
            else if (errno == EAGAIN) {
                fprintf(stderr, "accept errno=EAGAIN\n");
                break;
            }
            else {
                fprintf(stderr, "accept error");
                exit(1);
            }
        }
        else {
            //accept succ!
            
            int ret = 0;
            input_buf ibuf;
            output_buf obuf;

            char *msg = NULL;
            int msg_len = 0;
            do { 
                ret = ibuf.read_data(connfd);
                if (ret == -1) {
                    fprintf(stderr, "ibuf read_data error\n");
                    break;
                }
                printf("ibuf.length() = %d\n", ibuf.length());

                
                //將讀到的數(shù)據(jù)放在msg中
                msg_len = ibuf.length();
                msg = (char*)malloc(msg_len);
                bzero(msg, msg_len);
                memcpy(msg, ibuf.data(), msg_len);
                ibuf.pop(msg_len);
                ibuf.adjust();

                printf("recv data = %s\n", msg);

                //回顯數(shù)據(jù)
                obuf.send_data(msg, msg_len);
                while(obuf.length()) {
                    int write_ret = obuf.write2fd(connfd);
                    if (write_ret == -1) {
                        fprintf(stderr, "write connfd error\n");
                        return;
                    }
                    else if(write_ret == 0) {
                        //不是錯誤,表示此時不可寫
                        break;
                    }
                }
                 

                free(msg);
                    
            } while (ret != 0);     


            //Peer is closed
            close(connfd);
        }
    }
}

編譯生成新的liblreactor.a

$cd lars_reactor/
$make
g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/tcp_server.o src/tcp_server.cpp -I./include
g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/io_buf.o src/io_buf.cpp -I./include
g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/reactor_buf.o src/reactor_buf.cpp -I./include
g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/buf_pool.o src/buf_pool.cpp -I./include
mkdir -p lib
ar cqs lib/liblreactor.a src/tcp_server.o src/io_buf.o src/reactor_buf.o src/buf_pool.o

B. 編譯V0.2 server APP

? 我們將lars_reactor/example/lars_reactor_0.1 的代碼復制一份到 lars_reactor/example/lars_reactor_0.2中。

由于我們這里使用了pthread庫,所以在lars_reactor_0.2的Makefile文件要加上pthread庫的關聯(lián)

lars_reactor/example/lars_reactor_0.2/Makefile

CXX=g++
CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated 

INC=-I../../include
LIB=-L../../lib -llreactor -lpthread
OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))

all:
    $(CXX) -o lars_reactor $(CFLAGS)  lars_reactor.cpp $(INC) $(LIB)

clean:
    -rm -f *.o lars_reactor

編譯在lars_reactor/example/lars_reactor_0.2/

$ cd lars_reactor/example/lars_reactor_0.2/
$ make
g++ -o lars_reactor -g -O2 -Wall -fPIC -Wno-deprecated   lars_reactor.cpp -I../../include -L../../lib -llreactor  -lpthread

C. 測試

啟動server

$ ./lars_reactor 
begin accept

啟動client

$ nc 127.0.0.1 7777

客戶端輸入 文字,效果如下:

服務端:

ibuf.length() = 21
recv data = hello lars, By Aceld

客戶端:

$ nc 127.0.0.1 7777
hello lars, By Aceld
hello lars, By Aceld

? ok!現(xiàn)在我們的讀寫buffer機制已經(jīng)成功的集成到我們的lars網(wǎng)絡框架中了。


關于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉載, 如果轉載請注明出處

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容