架構(gòu)
基于boost::asio異步開源組件,實(shí)現(xiàn)了一個(gè)線程池。
異步服務(wù)器代碼架構(gòu)可參考boost源碼里的樣例async_tcp_echo_server.cpp的實(shí)現(xiàn),如下。
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(length);
}
});
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server
{
public:
server(boost::asio::io_context& io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
線程池的實(shí)現(xiàn)
參考源碼樣例io_context_pool.cpp,路徑libs\asio\example\cpp03\http\server2。
io_context_pool構(gòu)造時(shí)傳入線程池大小pool_size,創(chuàng)建出pool_size個(gè)asio::io_context對(duì)象。在run()接口中完成pool_size個(gè)thread的創(chuàng)建,并完成所有線程的join()。提供一個(gè)get_io_context()接口,獲取一個(gè)可以使用的asio::io_context對(duì)象。
整個(gè)過程
1、main函數(shù)中添加了一個(gè)額外的參數(shù)指定線程池大小,并在server類中聲明了一個(gè)線程池類成員io_service_pool io_service_pool_。線程池類的構(gòu)造函數(shù)接收一個(gè)整型參數(shù)io_service_pool_size指定線程池大小。
2、在server類中創(chuàng)建connection實(shí)例時(shí)需要從線程池中獲取asio::io_context對(duì)象,這時(shí)使用線程池類的io_service_pool_.get_io_context()獲取asio::io_context對(duì)象。
3、根據(jù)asio的約定,異步操作由哪個(gè)線程執(zhí)行與其相關(guān)io_context對(duì)象有關(guān)。調(diào)用io_service::run()的線程才能執(zhí)行相關(guān)異步操作。因此要想實(shí)現(xiàn)線程池,只需要在線程池對(duì)象中創(chuàng)建多個(gè)io_service對(duì)象,并創(chuàng)建同樣多的線程對(duì)象,在每個(gè)線程中都調(diào)用一個(gè)io_service對(duì)象的run()方法,這樣通過在線程池中均勻的獲取io_context對(duì)象,就可以實(shí)現(xiàn)將異步操作均勻的分配給多個(gè)線程來執(zhí)行了。
先看看server類創(chuàng)建connection的代碼:
void server::start_accept()
{
new_connection_.reset(new connection(io_service_pool_.get_io_service(), request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this, boost::asio::placeholders::error));
}
connection構(gòu)造函數(shù)需要一個(gè)io_service對(duì)象,這里是從線程池中獲取的,同時(shí)也就指定了這個(gè)connection類中的異步操作都由線程池中相應(yīng)的線程來處理.
下面重點(diǎn)分析一下線程池類的定義和實(shí)現(xiàn).io_service_pool從noncopyable繼承,不能進(jìn)行拷貝復(fù)制.
io_service_pool定義的數(shù)據(jù)成員:
std::vector<io_service_ptr> io_services_;//存放io_service對(duì)象的容器.
std::vector<work_ptr> work_; //存放工作者對(duì)象的容器
std::size_t next_io_service_; //指定下一個(gè)將被分配的io_service
io_service_pool定義的函數(shù)成員:
1、顯式構(gòu)造函數(shù):
初始化next_io_service_為0,創(chuàng)建指定線程池大小個(gè)數(shù)的io_service對(duì)象和工作者對(duì)象,并分別存放在io_service_容器和work_容器中。
for (std::size_t i = 0; i < pool_size; ++i)//創(chuàng)建多個(gè)io_service和多個(gè)工作者對(duì)象
{
io_service_ptr io_service(new boost::asio::io_service);
work_ptr work(new boost::asio::io_service::work(*io_service));
io_services_.push_back(io_service);
work_.push_back(work);
}
2、run函數(shù):
根據(jù)指定線程池大小創(chuàng)建多個(gè)線程,這些線程的執(zhí)行函數(shù)為相應(yīng)io_service對(duì)象的run方法.并讓主線程阻塞等待所有線程執(zhí)行完畢。
for (std::size_t i = 0; i < io_services_.size(); ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&boost::asio::io_service::run, io_services_[i])));
threads.push_back(thread);
}
//主線程一直阻塞等待 直到所有線程結(jié)束
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i]->join();
3、stop函數(shù):
調(diào)用所有io_service對(duì)象的stop方法,停止處理異步通信.
4、get_io_service函數(shù):
這個(gè)函數(shù)根據(jù)next_io_service_數(shù)據(jù)成員獲取在容器中一個(gè)io_service對(duì)象,并將next_io_service_加1,如果達(dá)到容器的最大個(gè)數(shù),則置為0.實(shí)現(xiàn)循環(huán)獲取io_service對(duì)象的目的。
boost::asio::io_service& io_service_pool::get_io_service()
{
//循環(huán)獲取io_service對(duì)象 異步操作由創(chuàng)建io_service的線程執(zhí)行,從而實(shí)現(xiàn)為線程池中的線程均勻分配任務(wù)
boost::asio::io_service& io_service = *io_services_[next_io_service_];
++next_io_service_;
if (next_io_service_ == io_services_.size())
next_io_service_ = 0;
return io_service;
}
最后注意存放在容器中的線程,io_service對(duì)象,工作者對(duì)象,線程對(duì)象都是使用shared_ptr指針保護(hù)的,保證了這些對(duì)象的自動(dòng)釋放.
這里創(chuàng)建了多個(gè)io_service對(duì)象,也可以只創(chuàng)建一個(gè)io_service對(duì)象,多個(gè)線程都執(zhí)行io_service::run()函數(shù),則異步操作可在這里線程中進(jìn)行隨機(jī)分配.請(qǐng)看server3范例。
openssl與gmssl共存——隱藏符號(hào)表
openssl與gmssl需要共存,但他們兩者對(duì)外拋出的接口基本上一樣,這樣會(huì)到值接口沖突。
解決方法:
將gmssl編譯成靜態(tài)庫,再封裝一層gmsslwarp的動(dòng)態(tài)庫對(duì)外使用,其中g(shù)msslwarp拋出的接口增加gm前綴,以避免和openssl中的接口沖突。那么其中g(shù)mssl靜態(tài)庫、封裝的gmsslwarp動(dòng)態(tài)庫在編譯時(shí)需要加上一個(gè)gcc編譯參數(shù)-fvisibility=hidden,以隱藏大部分未使用的接口。而同時(shí)在gmsslwarp動(dòng)態(tài)庫的實(shí)現(xiàn)的函數(shù)頭文件前需要加上:__attribute__((visibility("default"))),以保證對(duì)外可見。
fvisibility=hidden說明
-fvisibility=[default|internal|hidden|protected]
fvisibility=hidden將函數(shù)名隱藏,需要暴露給用戶的函數(shù)接口可以單獨(dú)通過 __attribute__((visibility ("default")))聲明避免被隱藏。
__attribute__((visibility ("default"))) int func1()
{
return 1;
}
斷連問題定位
現(xiàn)象
網(wǎng)關(guān)出現(xiàn)斷連,日志中出現(xiàn)打開文件失敗。
分析過程
1、復(fù)現(xiàn)
初步分析原因?yàn)槲募浔鷶?shù)使用超過了,可能存在某處文件描述符未關(guān)閉的資源泄露,使用ulimit –a/c,查看和修改文件描述符最大限制。測(cè)試復(fù)現(xiàn),發(fā)現(xiàn)多次連接斷開后,總有幾個(gè)SOCK未關(guān)閉。通過ps –eaf | grep -i vncgate,查的進(jìn)程。cd /proc/pid/fd獲取進(jìn)程打開的文件描述符信息。ls –l | wc –l 該命令實(shí)時(shí)多執(zhí)行幾次,實(shí)時(shí)查看當(dāng)前進(jìn)程正打開的文件描述符個(gè)數(shù)。

通過lsof命令查詢文件描述符命令:
lsof -p pid其中NODE一列表示了文件描述的inode號(hào)
計(jì)算個(gè)數(shù):
lsof -p pid | wc -l2、代碼跟蹤
發(fā)現(xiàn)是sock句柄未關(guān),則在代碼中將所有相關(guān)連的sock句柄創(chuàng)建和銷毀的地方增加日志打印,再運(yùn)行服務(wù),復(fù)現(xiàn)問題。打印信息包括:上下行、文件描述符、狀態(tài)、inode號(hào)。最關(guān)鍵是的inode號(hào),識(shí)別唯一的資源。使用如下接口從文件描述符獲取對(duì)于的inode號(hào)。Linux的文件描述符FD與Inode
int getinode(int fd)
{
struct stat fileStat;
if (fstat(fd, &fileStat) < 0) {
return -1;
}
return fileStat.st_ino;
}
3、復(fù)現(xiàn)問題,分析日志
分析日志發(fā)現(xiàn),確實(shí)有幾次創(chuàng)建的sock句柄,并未走入關(guān)閉流程。
4、解決
分析代碼,增加一個(gè)超時(shí)關(guān)閉機(jī)制。
5、卡死根本原因
vag關(guān)閉鏈接時(shí)只調(diào)用了shutdown,該接口是同步阻塞接口,此時(shí)需要客戶端回饋一個(gè)消息(close_notify),而客戶端代碼里直接調(diào)用底層close方法關(guān)閉了ssl連接,未在之前調(diào)用shutdown方法,故close_notify消息一直不會(huì)被vag收到,故那個(gè)sock文件描述符一直不會(huì)被正常關(guān)閉。
解決辦法在vag處增加一個(gè)超時(shí)關(guān)閉機(jī)制,即直接調(diào)用底層close方法關(guān)閉。防止因?yàn)榫W(wǎng)絡(luò)原因,客戶端的close_notify消息收不到的情況。
shutdown close區(qū)別
shutdown()函數(shù)可以選擇關(guān)閉全雙工連接的讀通道或者寫通道,如果兩個(gè)通道同時(shí)關(guān)閉,則這個(gè)連接不能再繼續(xù)通信。
close()函數(shù)會(huì)同時(shí)關(guān)閉全雙工連接的讀寫通道,除了關(guān)閉連接外,還會(huì)釋放套接字占用的文件描述符。而shutdown()只會(huì)關(guān)閉連接,但是不會(huì)釋放占用的文件描述符。
GET/POST
采用curl實(shí)現(xiàn)了一套以REST為標(biāo)準(zhǔn)的GET/POST方法。
post:上報(bào)心跳、網(wǎng)關(guān)內(nèi)存、CPU使用情況,連接個(gè)數(shù)等數(shù)據(jù)給管理平面。接收對(duì)應(yīng)的應(yīng)答結(jié)果。