【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶(hù)端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷(xiāo)毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫(xiě)功能
第11章-udp服務(wù)與客戶(hù)端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡(jiǎn)介
第2章-數(shù)據(jù)庫(kù)創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過(guò)期窗口清理與過(guò)載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本
4) 事件觸發(fā)event_loop
? 接下來(lái)我們要嘗試添加多路IO的處理機(jī)制,當(dāng)然linux的平臺(tái)下, 最優(yōu)的選擇就是使用epoll來(lái)做,但是用原生的epoll實(shí)際上編程起來(lái)擴(kuò)展性不是很強(qiáng),那么我們就需要封裝一套IO事件處理機(jī)制。
4.1 io_event基于IO事件封裝
? 我們首先定義一個(gè)IO事件類(lèi)來(lái)包括一個(gè)時(shí)間需要擁有的基本成員信息.
lars_reactor/include/event_base.h
#pragma once
/*
* 定義一些IO復(fù)用機(jī)制或者其他異常觸發(fā)機(jī)制的事件封裝
*
* */
class event_loop;
//IO事件觸發(fā)的回調(diào)函數(shù)
typedef void io_callback(event_loop *loop, int fd, void *args);
/*
* 封裝一次IO觸發(fā)實(shí)現(xiàn)
* */
struct io_event
{
io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
int mask; //EPOLLIN EPOLLOUT
io_callback *read_callback; //EPOLLIN事件 觸發(fā)的回調(diào)
io_callback *write_callback;//EPOLLOUT事件 觸發(fā)的回調(diào)
void *rcb_args; //read_callback的回調(diào)函數(shù)參數(shù)
void *wcb_args; //write_callback的回調(diào)函數(shù)參數(shù)
};
? 一個(gè)io_event對(duì)象應(yīng)該包含 一個(gè)epoll的事件標(biāo)識(shí)EPOLLIN/EPOLLOUT,和對(duì)應(yīng)事件的處理函數(shù)read_callback,write_callback。他們都應(yīng)該是io_callback類(lèi)型。然后對(duì)應(yīng)的函數(shù)形參。
4.2 event_loop事件循環(huán)處理機(jī)制
? 接下來(lái)我們就要通過(guò)event_loop類(lèi)來(lái)實(shí)現(xiàn)io_event的基本增刪操作,放在原生的epoll堆中。
lars_reactor/include/event_loop.h
#pragma once
/*
*
* event_loop事件處理機(jī)制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include "event_base.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定義指向上面map類(lèi)型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在監(jiān)聽(tīng)的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
class event_loop
{
public:
//構(gòu)造,初始化epoll堆
event_loop();
//阻塞循環(huán)處理事件
void event_process();
//添加一個(gè)io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//刪除一個(gè)io事件從loop中
void del_io_event(int fd);
//刪除一個(gè)io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
private:
int _epfd; //epoll fd
//當(dāng)前event_loop 監(jiān)控的fd和對(duì)應(yīng)事件的關(guān)系
io_event_map _io_evs;
//當(dāng)前event_loop 一共哪些fd在監(jiān)聽(tīng)
listen_fd_set listen_fds;
//一次性最大處理的事件
struct epoll_event _fired_evs[MAXEVENTS];
};
屬性:
_epfd:是epoll原生堆的fd。
_io_evs:是一個(gè)hash_map對(duì)象,主要是方便我們管理fd<—>io_event的對(duì)應(yīng)關(guān)系,方便我們來(lái)查找和處理。
_listen_fds:記錄目前一共有多少個(gè)fd正在本我們的event_loop機(jī)制所監(jiān)控.
_fried_evs:已經(jīng)通過(guò)epoll_wait返回的被激活需要上層處理的fd集合.
方法:
event_loop():構(gòu)造函數(shù),主要初始化epoll.
event_process():永久阻塞,等待觸發(fā)的事件,去調(diào)用對(duì)應(yīng)的函數(shù)callback方法。
add_io_event():綁定一個(gè)fd和一個(gè)io_event的關(guān)系,并添加對(duì)應(yīng)的事件到event_loop中。
del_io_event():從event_loop刪除該事件。
? 具體實(shí)現(xiàn)方法如下:
lars_reactor/src/event_loop.cpp
#include "event_loop.h"
#include <assert.h>
//構(gòu)造,初始化epoll堆
event_loop::event_loop()
{
//flag=0 等價(jià)于epll_craete
_epfd = epoll_create1(0);
if (_epfd == -1) {
fprintf(stderr, "epoll_create error\n");
exit(1);
}
}
//阻塞循環(huán)處理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//通過(guò)觸發(fā)的fd找到對(duì)應(yīng)的綁定事件
ev_it = _io_evs.find(_fired_evs[i].data.fd);
assert(ev_it != _io_evs.end());
io_event *ev = &(ev_it->second);
if (_fired_evs[i].events & EPOLLIN) {
//讀事件,掉讀回調(diào)函數(shù)
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events & EPOLLOUT) {
//寫(xiě)事件,掉寫(xiě)回調(diào)函數(shù)
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
//水平觸發(fā)未處理,可能會(huì)出現(xiàn)HUP事件,正常處理讀寫(xiě),沒(méi)有則清空
if (ev->read_callback != NULL) {
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (ev->write_callback != NULL) {
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else {
//刪除
fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
this->del_io_event(_fired_evs[i].data.fd);
}
}
}
}
}
/*
* 這里我們處理的事件機(jī)制是
* 如果EPOLLIN 在mask中, EPOLLOUT就不允許在mask中
* 如果EPOLLOUT 在mask中, EPOLLIN就不允許在mask中
* 如果想注冊(cè)EPOLLIN|EPOLLOUT的事件, 那么就調(diào)用add_io_event() 方法兩次來(lái)注冊(cè)。
* */
//添加一個(gè)io事件到loop中
void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
{
int final_mask;
int op;
//1 找到當(dāng)前fd是否已經(jīng)有事件
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
//2 如果沒(méi)有操作動(dòng)作就是ADD
//沒(méi)有找到
final_mask = mask;
op = EPOLL_CTL_ADD;
}
else {
//3 如果有操作董酒是MOD
//添加事件標(biāo)識(shí)位
final_mask = it->second.mask | mask;
op = EPOLL_CTL_MOD;
}
//4 注冊(cè)回調(diào)函數(shù)
if (mask & EPOLLIN) {
//讀事件回調(diào)函數(shù)注冊(cè)
_io_evs[fd].read_callback = proc;
_io_evs[fd].rcb_args = args;
}
else if (mask & EPOLLOUT) {
_io_evs[fd].write_callback = proc;
_io_evs[fd].wcb_args = args;
}
//5 epoll_ctl添加到epoll堆里
_io_evs[fd].mask = final_mask;
//創(chuàng)建原生epoll事件
struct epoll_event event;
event.events = final_mask;
event.data.fd = fd;
if (epoll_ctl(_epfd, op, fd, &event) == -1) {
fprintf(stderr, "epoll ctl %d error\n", fd);
return;
}
//6 將fd添加到監(jiān)聽(tīng)集合中
listen_fds.insert(fd);
}
//刪除一個(gè)io事件從loop中
void event_loop::del_io_event(int fd)
{
//將事件從_io_evs刪除
_io_evs.erase(fd);
//將fd從監(jiān)聽(tīng)集合中刪除
listen_fds.erase(fd);
//將fd從epoll堆刪除
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
}
//刪除一個(gè)io事件的EPOLLIN/EPOLLOUT
void event_loop::del_io_event(int fd, int mask)
{
//如果沒(méi)有該事件,直接返回
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
return ;
}
int &o_mask = it->second.mask;
//修正mask
o_mask = o_mask & (~mask);
if (o_mask == 0) {
//如果修正之后 mask為0,則刪除
this->del_io_event(fd);
}
else {
//如果修正之后,mask非0,則修改
struct epoll_event event;
event.events = o_mask;
event.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
}
}
? 這里del_io_event提供兩個(gè)重載,一個(gè)是直接刪除事件,一個(gè)是修正事件。
4.3 Reactor集成event_loop機(jī)制
? 好了,那么接下來(lái),就讓讓Lars Reactor框架集成event_loop機(jī)制。
首先簡(jiǎn)單修正一個(gè)tcp_server.cpp文件,對(duì)之前的do_accept()的調(diào)度時(shí)機(jī)做一下修正。
1. 在`tcp_server`成員新增`event_loop`成員。
lars_reactor/include/tcp_server.h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
class tcp_server
{
public:
//server的構(gòu)造函數(shù)
tcp_server(event_loop* loop, const char *ip, uint16_t port);
//開(kāi)始提供創(chuàng)建鏈接服務(wù)
void do_accept();
//鏈接對(duì)象釋放的析構(gòu)
~tcp_server();
private:
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客戶(hù)端鏈接地址
socklen_t _addrlen; //客戶(hù)端鏈接地址長(zhǎng)度
// ============= 新增 ======================
//event_loop epoll事件機(jī)制
event_loop* _loop;
// ============= 新增 ======================
};
- 構(gòu)造函數(shù)在創(chuàng)建完listen fd之后,添加accept事件。
lars_reactor/src/tcp_server.cpp
//listen fd 客戶(hù)端有新鏈接請(qǐng)求過(guò)來(lái)的回調(diào)函數(shù)
void accept_callback(event_loop *loop, int fd, void *args)
{
tcp_server *server = (tcp_server*)args;
server->do_accept();
}
//server的構(gòu)造函數(shù)
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
bzero(&_connaddr, sizeof(_connaddr));
//忽略一些信號(hào) SIGHUP, SIGPIPE
//SIGPIPE:如果客戶(hù)端關(guān)閉,服務(wù)端再次write就會(huì)產(chǎn)生
//SIGHUP:如果terminal關(guān)閉,會(huì)給當(dāng)前進(jìn)程發(fā)送該信號(hào)
if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGHUP\n");
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGPIPE\n");
}
//1. 創(chuàng)建socket
_sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "tcp_server::socket()\n");
exit(1);
}
//2 初始化地址
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_aton(ip, &server_addr.sin_addr);
server_addr.sin_port = htons(port);
//2-1可以多次監(jiān)聽(tīng),設(shè)置REUSE屬性
int op = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
}
//3 綁定端口
if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
fprintf(stderr, "bind error\n");
exit(1);
}
//4 監(jiān)聽(tīng)ip端口
if (listen(_sockfd, 500) == -1) {
fprintf(stderr, "listen error\n");
exit(1);
}
// ============= 新增 ======================
//5 將_sockfd添加到event_loop中
_loop = loop;
//6 注冊(cè)_socket讀事件-->accept處理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
// ============= 新增 ======================
}
- 修改do_accept()方法
lars_reactor/src/tcp_server.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "reactor_buf.h"
//臨時(shí)的收發(fā)消息
struct message{
char data[m4K];
char len;
};
struct message msg;
void server_rd_callback(event_loop *loop, int fd, void *args);
void server_wt_callback(event_loop *loop, int fd, void *args);
//...省略其他代碼
//...省略其他代碼
//server read_callback
void server_rd_callback(event_loop *loop, int fd, void *args)
{
int ret = 0;
struct message *msg = (struct message*)args;
input_buf ibuf;
ret = ibuf.read_data(fd);
if (ret == -1) {
fprintf(stderr, "ibuf read_data error\n");
//刪除事件
loop->del_io_event(fd);
//對(duì)端關(guān)閉
close(fd);
return;
}
if (ret == 0) {
//刪除事件
loop->del_io_event(fd);
//對(duì)端關(guān)閉
close(fd);
return ;
}
printf("ibuf.length() = %d\n", ibuf.length());
//將讀到的數(shù)據(jù)放在msg中
msg->len = ibuf.length();
bzero(msg->data, msg->len);
memcpy(msg->data, ibuf.data(), msg->len);
ibuf.pop(msg->len);
ibuf.adjust();
printf("recv data = %s\n", msg->data);
//刪除讀事件,添加寫(xiě)事件
loop->del_io_event(fd, EPOLLIN);
loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);
}
//server write_callback
void server_wt_callback(event_loop *loop, int fd, void *args)
{
struct message *msg = (struct message*)args;
output_buf obuf;
//回顯數(shù)據(jù)
obuf.send_data(msg->data, msg->len);
while(obuf.length()) {
int write_ret = obuf.write2fd(fd);
if (write_ret == -1) {
fprintf(stderr, "write connfd error\n");
return;
}
else if(write_ret == 0) {
//不是錯(cuò)誤,表示此時(shí)不可寫(xiě)
break;
}
}
//刪除寫(xiě)事件,添加讀事件
loop->del_io_event(fd, EPOLLOUT);
loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);
}
//...省略其他代碼
//...省略其他代碼
//開(kāi)始提供創(chuàng)建鏈接服務(wù)
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶(hù)端創(chuàng)建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過(guò)多,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 新增 ======================
this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);
break;
// ============= 新增 ======================
}
}
}
//...省略其他代碼
//...省略其他代碼
4.4 完成Lars Reactor V0.3開(kāi)發(fā)
? 我們將lars_reactor/example/lars_reactor_0.2的代碼復(fù)制一份到 lars_reactor/example/lars_reactor_0.3中。
lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
編譯。
啟動(dòng)服務(wù)器
$ ./lars_reactor
分別啟動(dòng)2個(gè)客戶(hù)端
client1
$ nc 127.0.0.1 7777
hello Iam client1
hello Iam client1 回顯
client2
$ nc 127.0.0.1 7777
hello Iam client2
hello Iam client2 回顯
服務(wù)端打印
$ ./lars_reactor
begin accept
ibuf.length() = 18
recv data = hello Iam client1
begin accept
ibuf.length() = 18
recv data = hello Iam client2
目前我們已經(jīng)成功將event_loop機(jī)制加入到reactor中了,接下來(lái)繼續(xù)添加功能。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書(shū)籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處