同步UDP客戶端
UDP是面向無連接的,使用起來比較簡單,打開socke之后,指定目標(biāo)端口,直接進(jìn)行接收和發(fā)送:
void test_udp_echo_client()
{
try
{
io_service io;
udp::endpoint remote_ep(ip::address_v4::from_string("127.0.0.1"), 1024);
udp::socket socket(io);
socket.open(udp::v4());
char line[1024];
while (std::cin.getline(line, 1024))
{
socket.send_to(boost::asio::buffer(line, std::strlen(line)),remote_ep);
auto size = socket.receive_from(boost::asio::buffer(line),remote_ep);
std::cout.write(line,size);
}
socket.close();
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
socket本身提供了一些接口:
- socket.send_to 同步發(fā)送接口
- socket.receive_from 同步接收接口
Boost.Asio也有一些接口用來進(jìn)行發(fā)送和接收,可以參見后續(xù)的發(fā)送/接收函數(shù)組;
需要注意的是,boost.asio.buffer是一種接口適配器,通過接口進(jìn)行發(fā)送和接收,必須有對應(yīng)的數(shù)據(jù)緩沖區(qū)提供數(shù)據(jù)或者存儲空間。
同步UDP服務(wù)器
同步接收同步發(fā)送的UDP服務(wù)器也比較簡單,創(chuàng)建一個綁定到本地端口的socket,然后就是接收及發(fā)送動作:
void test_udp_echo_server()
{
try
{
io_service io;
ip::udp::socket socket(io, udp::endpoint(udp::v4(), 1024));
for (;;)
{
std::array<char,1024> recv_buf;
ip::udp::endpoint remote_socket;
boost::system::error_code error;
//同步接收
auto size = socket.receive_from(boost::asio::buffer(recv_buf),remote_socket,0,error);
if (error && error!= boost::asio::error::message_size)
{
throw boost::system::system_error(error);
}
std::cout.write(recv_buf.data(),size);
//發(fā)送回去
socket.send_to(boost::asio::buffer(recv_buf,size),remote_socket);
}
}
catch (std::exception& e)
{
std::cerr<<e.what()<<std::endl;
}
}
同步操作是不需要運(yùn)行IO服務(wù)的,以最常規(guī)的方式來進(jìn)行發(fā)送和接收,注意接收時如果接收到全部消息,即EOF也是通過報錯形式,錯誤碼為error::message_size。
異步UDP服務(wù)器的實現(xiàn)問題
實現(xiàn)異步的UDP服務(wù)器就略顯復(fù)雜,需要保證IO服務(wù)運(yùn)行,發(fā)起異步操作時要注意數(shù)據(jù)緩沖區(qū)生命周期:
- 啟動IO服務(wù)
啟動IO服務(wù)可以直接執(zhí)行io_service.run,由于IO服務(wù)的多線程安全特性,也可以啟動線程來執(zhí)行,譬如:
boost::asio::io_service io_;
std::thread task([&](){ io_.run();});
task.detach();
停止IO服務(wù)
停止IO服務(wù)可以直接執(zhí)行io_service.stop,會立即從運(yùn)行狀態(tài)退出,直到reset之后才能重新啟動。保證IO服務(wù)執(zhí)行
IO服務(wù)的run方法只有在有異步操作未完成的時候才能一直運(yùn)行,一旦沒有異步操作就會退出,因而需要在run之前保證有異步操作發(fā)起,在過程中不斷發(fā)起異步操作就能夠保證IO服務(wù)一直運(yùn)行。數(shù)據(jù)緩沖區(qū)生命周期
發(fā)起異步操作后,會立即退出,但是異步操作并沒有執(zhí)行,這就要求提供的數(shù)據(jù)緩沖區(qū)生命周期要足夠長,存活到異步操作執(zhí)行完,即在完成回調(diào)中再釋放數(shù)據(jù)緩沖區(qū),通??梢圆捎弥悄苤羔樆蛘遪ew出來的對象。
異步UDP服務(wù)器實現(xiàn)
class async_udp_echo_server
{
public:
async_udp_echo_server()
:socket_(io_,udp::endpoint(udp::v4(),1024))
{
do_recv();
std::thread task([&](){ io_.run();});
task.detach();
}
void do_recv()
{
//保證發(fā)送完成之前一直有效
char* recv_buf = new char[1024];
socket_.async_receive_from(boost::asio::buffer(recv_buf,1024), remote_ep_,
[recv_buf, this](const boost::system::error_code& error,std::size_t bytes_transferred){
if (!error || error == boost::asio::error::message_size)
{
do_send(recv_buf,bytes_transferred, std::move(remote_ep_));
}
else
{
std::cout << error.message() << "\n";
}
do_recv();
});
}
void do_send(char* send_buf,std::size_t size,udp::endpoint ep)
{
socket_.async_send_to(boost::asio::buffer(send_buf,size),ep,
[send_buf](const boost::system::error_code& error, std::size_t bytes_transferred){
if (!error)
{
std::cout<<"echo finished\n";
}
delete[] send_buf;
});
}
void stop()
{
io_.stop();
}
~async_udp_echo_server()
{
stop();
}
private:
boost::asio::io_service io_;
udp::socket socket_;
udp::endpoint remote_ep_;
};
可以看到do_recv方法發(fā)起了一個異步接收操作,在操作完成回調(diào)中再次發(fā)起,構(gòu)造服務(wù)器時率先調(diào)用了do_recv,從而保證IO服務(wù)一直運(yùn)行。
do_recv方法在發(fā)起異步操作前申請了一塊內(nèi)存,接收的內(nèi)容被保存在這塊內(nèi)存之中,當(dāng)do_send發(fā)起異步發(fā)送操作時被借用,直到發(fā)送完成才將這段內(nèi)存釋放掉。
在構(gòu)造函數(shù)中啟動了一個線程來執(zhí)行IO服務(wù),并detach掉線程,從而保證服務(wù)器不阻塞,在析構(gòu)函數(shù)停止了IO服務(wù)。
需要注意到的是remote_ep_在執(zhí)行do_send時被move了,由于remote_ep_標(biāo)識了遠(yuǎn)程端口,而且被聲明為成員變量,在接受操作中會被填充遠(yuǎn)程端口內(nèi)容,如果多個遠(yuǎn)程主機(jī)同時發(fā)起,單個remote_ep_是無法正常處理的,所以一旦內(nèi)容被填充后,就會轉(zhuǎn)移出去給發(fā)送操作使用[個人理解,沒有實際測試和驗證]。
使用方法
async_udp_echo_server server_;
char line[1024];
while (std::cin.getline(line, 1024)){
if(line[0] == 'Q')
break;
};
server_.stop();