目錄
一、原理分析
二、主要函數(shù)
三、代碼實(shí)現(xiàn)
四、測試過程
-----------------------------------------------------------------------------------
一、原理分析
ATS是做服務(wù)器最好的選擇,服務(wù)器最重要的兩件事就是接收處理用戶的請求并發(fā)送反饋的信息給用戶。這個(gè)帖子的目的在于介紹幾個(gè)接收、發(fā)送數(shù)據(jù)的處理函數(shù)的使用。
先來看下,整體的結(jié)構(gòu)
裝逼的功能:我希望在ATS的基礎(chǔ)上,開發(fā)一個(gè)插件,這個(gè)插件能夠接收用戶的連接請求,同時(shí)能發(fā)送數(shù)據(jù)給用戶。
實(shí)際的功能:這不就是個(gè)Socket的發(fā)送、接收功能嗎。。。
socket歸socket,不同的是ATS如何利用Socket+Event+Continuations這三種機(jī)制實(shí)現(xiàn)這一最基本的功能。

二、主要函數(shù)
ATS中插件是運(yùn)行在continuations(協(xié)程)的機(jī)制上的,可以說一個(gè)插件就是一個(gè)或多個(gè)continuations組成的,所以先介紹的兩個(gè)函數(shù)是continuations的創(chuàng)建與銷毀
TSCont TSContCreate(TSEventFunc funcp, TSMutex mutexp);
###funcp是指continuations用來處理事件的主要函數(shù),創(chuàng)建函數(shù)是用來創(chuàng)建并綁定事件處理函數(shù)
void TSContDestroy(TSCont contp);
第二,之前介紹到,數(shù)據(jù)會(huì)隨著continuations的掛起而保存,有點(diǎn)像中斷,接下來的這個(gè)函數(shù)能將要保存的數(shù)據(jù)與協(xié)程綁定起來
void TSContDataSet(TSCont contp, void *data);
###void *data即要保存的數(shù)據(jù),可以是類、結(jié)構(gòu)體等等。
void *TSContDataGet(TSCont contp);
###獲取dataset所保存的數(shù)據(jù)
TSAction TSNetAccept(TSCont contp, int port, int domain, int accept_threads);
###綁定并監(jiān)聽端口,第3個(gè)參數(shù)domain的值可參考socket的值,一般是設(shè)置AF_INET,最后一個(gè)是回調(diào)函數(shù)的線程ID,可設(shè)置回調(diào)可不設(shè)置
int TSContCall(TSCont contp, TSEvent event, void *edata);
###回調(diào)與cont綁定的eventhandler函數(shù),第二個(gè)參數(shù)為事件的代號
第三,接收有關(guān)數(shù)據(jù)的讀、寫操作的函數(shù)

當(dāng)請求接入后,TSNetAccept的data是一個(gè)TSVconn的類,也就是通過這個(gè)類在兩者之間建立聯(lián)系
數(shù)據(jù)的讀?。▽懭耄┬枰ㄟ^IObuffer、IOBufferReader、TSIOBufferBlock相互配合才能將數(shù)據(jù)從buffer中讀?。▽懭耄┏鰜怼?/h4>主要的幾個(gè)函數(shù)(以讀取函數(shù)為例)
TSVIO TSVConnRead(TSVConn connp, TSCont contp, TSIOBuffer bufp, int64_t nbytes);
###將連接發(fā)送給協(xié)程數(shù)據(jù)關(guān)聯(lián)至IOBuffer中,同時(shí)nbytes設(shè)置IObuffer的空間大小,當(dāng)有數(shù)據(jù)寫入時(shí),觸發(fā)該協(xié)程的回調(diào)事件函數(shù)。
const char *TSIOBufferBlockReadStart(TSIOBufferBlock blockp, TSIOBufferReader readerp, int64_t *avail);
###將數(shù)據(jù)讀出,avail表示用于存放讀書數(shù)據(jù)字節(jié)數(shù)的地址
TSIOBufferBlock TSIOBufferBlockNext(TSIOBufferBlock blockp);
###每次讀取的數(shù)據(jù)有限,通過block的方式多次讀取,這里的block相當(dāng)于指針一樣,隨著讀取不斷向后移動(dòng)
三、代碼實(shí)現(xiàn)
最后,是通過C++編寫一個(gè)小插件。
首先,我們需要將這個(gè)插件掛載到ATS上,因此需要聲明TSPluginInit函數(shù),由于是在CPP的文件里寫的,因此需要聲明用gcc來編譯這個(gè)入口函數(shù)就有如下代碼,同時(shí)我們再此將插件功能進(jìn)行初始化
extern "C" void TSPluginInit (int argc, const char *argv[]);
void TSPluginInit (int argc, const char *argv[])
{
? ? ????TSPluginRegistrationInfo info;
????????info.plugin_name = "hello-world";
????????info.vendor_name = "MyCompany";
????????info.support_email = "ts-api-support@MyCompany.com";
????????printf("開始運(yùn)行插件\n");
????????if (TSPluginRegister(&info) != TS_SUCCESS) {
????????TSError("注冊失敗2");
????????}
????????helloinit();
}
在初始化函數(shù)中,需要做的有四件事:創(chuàng)建continuations、綁定數(shù)據(jù)存放、綁定事件處理的主函數(shù)、綁定端口并監(jiān)聽。在這之前,先創(chuàng)建一用于保存數(shù)據(jù)的機(jī)構(gòu)體(或者類)
typedef struct {
????????TSIOBuffer bufp;
????????TSIOBuffer out_bufp;
????????TSIOBufferReader readerp;
????????TSIOBufferReader out_readerp;
????????TSVConn write_vconnp;
????????TSVConn read_vconnp;
????????TSVIO read_vio;
????????TSVIO write_vio;
} CacheVConnStruct;
于是,helloinit、以及主事件處理函數(shù)的代碼為
int helloinit()
{
printf("綁定端口\n");
hello_appcep_cont_=TSContCreate(HelloAccpetHandler, TSMutexCreate());
printf("cont是:%d\n",hello_appcep_cont_);
CacheVConnStruct *cache_vconn = (CacheVConnStruct *)TSmalloc(sizeof(CacheVConnStruct));
TSContDataSet( hello_appcep_cont_, cache_vconn);
TSNetAccept(hello_appcep_cont_, 8899,AF_INET ,1);
}
int HelloAccpetHandler(TSCont cont,TSEvent event,void *data)
{
printf("cont是:%d\n",cont);
printf("回調(diào)連接的事件為:%d\n",event);
switch ((int)event)
{
case TS_EVENT_NET_ACCEPT:
printf("連接正常接入\n");
ReadFromVConn(cont,data);
break;
case TS_EVENT_VCONN_READ_COMPLETE:
printf("讀取完畢\n");
//HandlerReadComplete(cont,data);
break;
case TS_EVENT_VCONN_READ_READY:
printf("buffer尚未讀滿\n");
HandlerReadComplete(cont,data);
}
return 1;
}
接下來,需要等待連接接入,為此我們需要寫個(gè)客戶端的腳本,方便采用python編寫
# -*- coding: utf-8 -*-
import socket
import sys
datatosend="GET / HTTP/1.1\r\nHost: 192.168.31.138:8899\r\nConnection: keep-alive\r\nCache-Control: max-age=0\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36\r\nUpgrade-Insecure-Requests: 1\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: zh-CN,zh;q=0.9\r\n\r\n"
TCP_IP = '192.168.31.138'
TCP_PORT = 8899
ADDR=(TCP_IP,TCP_PORT)
print ADDR
try:
????#create an AF_INET, STREAM socket (TCP)
????s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
????print 'Error code: ' + str(msg[0]) + ' , Error message : ' + msg[1]
????sys.exit();
????#s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
????print 'Socket Created'
try:
????s.connect((TCP_IP,TCP_PORT))
except socket.error , msg:
????print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
????sys.exit()
s.send(datatosend)
i=i+1
當(dāng)客戶端連接服務(wù)器后,ATS會(huì)觸發(fā)事件函數(shù)202表接受了請求,建立連接,這個(gè)時(shí)候就綁定IOBuffer并聲明buffer的空間,同時(shí)我直接發(fā)送給對端一個(gè)鏈接的信號,代碼如下:
void ReadFromVConn(TSCont cont,void *data)
{
CacheVConnStruct *recv_sm=(CacheVConnStruct*)TSContDataGet( cont);
recv_sm->bufp =TSIOBufferCreate();
recv_sm->read_vconnp=(TSVConn)data;
TSVConnRead((TSVConn)data, cont, recv_sm->bufp, 500);
TSVConn input_request=(TSVConn)data;
printf("%s\n",input_request);
TSIOBuffer input_bufp=TSIOBufferCreate();
TSIOBufferReader input_bufp_reader=TSIOBufferReaderAlloc(input_bufp);
TSIOBuffer output_bufp=TSIOBufferCreate();
TSIOBufferReader output_bufp_reader=TSIOBufferReaderAlloc(output_bufp);
/////////發(fā)送數(shù)據(jù)部分
TSIOBufferBlock blockp;
char *ptr_block;
int64_t avail;
blockp? ? = TSIOBufferStart(output_bufp);
ptr_block = TSIOBufferBlockWriteStart(blockp, &avail);
memcpy(ptr_block, "i love ats", 11);
TSIOBufferProduce(output_bufp, 11);
TSVIO write_io=TSVConnWrite(input_request, cont,output_bufp_reader, 11);
}
當(dāng)IOBuffer中有數(shù)據(jù)的時(shí)候就會(huì)觸發(fā)事件102或者103,102表示發(fā)送過來的數(shù)據(jù)尚未填滿buffer,103表示buffer已經(jīng)滿了。那么將數(shù)據(jù)讀出的代碼如下:
int HandlerReadComplete(TSCont cont,void *data)
{
CacheVConnStruct *cache_vconn=(CacheVConnStruct*)TSContDataGet( cont);
cache_vconn->readerp=TSIOBufferReaderAlloc(cache_vconn->bufp);
int avail=TSIOBufferReaderAvail(cache_vconn->readerp);
printf("共有%d個(gè)數(shù)據(jù)\n",avail);
if(avail>0)
{
string str;
const char *buffer_temp=new char[1024];
int consumed=0;
TSIOBufferBlock block = TSIOBufferReaderStart(cache_vconn->readerp);
int64_t data_len;
str.reserve(avail + 1);
while (block)
{
buffer_temp=TSIOBufferBlockReadStart( block, cache_vconn->readerp, &data_len);
str.append(buffer_temp,data_len);
consumed+=data_len;
block=TSIOBufferBlockNext( block);
}
printf("\n%s\n",str.c_str());
}
}
四、測試過程
為了測試這個(gè)插件的過程,編寫一個(gè)腳本,自動(dòng)編譯、自動(dòng)重啟、自動(dòng)刪除日志、自動(dòng)啟動(dòng)測試客戶端、自動(dòng)關(guān)閉ATS,命令如下
/usr/local/ats/bin/tsxs -lpthread -o hello.so -c hello.cpp
sudo /usr/local/ats/bin/tsxs -o hello.so -i
sudo rm -f /usr/local/ats/var/log/trafficserver/*
sudo /usr/local/ats/bin/trafficserver restart
sleep 3s
python /home/carl/httpsend.py &
sleep 9s
sudo /usr/local/ats/bin/trafficserver stop
sleep 1s
可以看到,客戶端啟動(dòng)后,接收到了ATS發(fā)送過來的數(shù)據(jù)
TSVIO TSVConnRead(TSVConn connp, TSCont contp, TSIOBuffer bufp, int64_t nbytes);
const char *TSIOBufferBlockReadStart(TSIOBufferBlock blockp, TSIOBufferReader readerp, int64_t *avail);
TSIOBufferBlock TSIOBufferBlockNext(TSIOBufferBlock blockp);
extern "C" void TSPluginInit (int argc, const char *argv[]);
void TSPluginInit (int argc, const char *argv[])
{
? ? ????TSPluginRegistrationInfo info;
????????info.plugin_name = "hello-world";
????????info.vendor_name = "MyCompany";
????????info.support_email = "ts-api-support@MyCompany.com";
????????printf("開始運(yùn)行插件\n");
????????if (TSPluginRegister(&info) != TS_SUCCESS) {
????????TSError("注冊失敗2");
????????}
????????helloinit();
}
typedef struct {
????????TSIOBuffer bufp;
????????TSIOBuffer out_bufp;
????????TSIOBufferReader readerp;
????????TSIOBufferReader out_readerp;
????????TSVConn write_vconnp;
????????TSVConn read_vconnp;
????????TSVIO read_vio;
????????TSVIO write_vio;
} CacheVConnStruct;
int helloinit()
{
printf("綁定端口\n");
hello_appcep_cont_=TSContCreate(HelloAccpetHandler, TSMutexCreate());
printf("cont是:%d\n",hello_appcep_cont_);
CacheVConnStruct *cache_vconn = (CacheVConnStruct *)TSmalloc(sizeof(CacheVConnStruct));
TSContDataSet( hello_appcep_cont_, cache_vconn);
TSNetAccept(hello_appcep_cont_, 8899,AF_INET ,1);
}
int HelloAccpetHandler(TSCont cont,TSEvent event,void *data)
{
printf("cont是:%d\n",cont);
printf("回調(diào)連接的事件為:%d\n",event);
switch ((int)event)
{
case TS_EVENT_NET_ACCEPT:
printf("連接正常接入\n");
ReadFromVConn(cont,data);
break;
case TS_EVENT_VCONN_READ_COMPLETE:
printf("讀取完畢\n");
//HandlerReadComplete(cont,data);
break;
case TS_EVENT_VCONN_READ_READY:
printf("buffer尚未讀滿\n");
HandlerReadComplete(cont,data);
}
return 1;
}
# -*- coding: utf-8 -*-
import socket
import sys
datatosend="GET / HTTP/1.1\r\nHost: 192.168.31.138:8899\r\nConnection: keep-alive\r\nCache-Control: max-age=0\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36\r\nUpgrade-Insecure-Requests: 1\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: zh-CN,zh;q=0.9\r\n\r\n"
TCP_IP = '192.168.31.138'
TCP_PORT = 8899
ADDR=(TCP_IP,TCP_PORT)
print ADDR
try:
????#create an AF_INET, STREAM socket (TCP)
????s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
????print 'Error code: ' + str(msg[0]) + ' , Error message : ' + msg[1]
????sys.exit();
????#s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
????print 'Socket Created'
try:
????s.connect((TCP_IP,TCP_PORT))
except socket.error , msg:
????print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
????sys.exit()
s.send(datatosend)
i=i+1
void ReadFromVConn(TSCont cont,void *data)
{
CacheVConnStruct *recv_sm=(CacheVConnStruct*)TSContDataGet( cont);
recv_sm->bufp =TSIOBufferCreate();
recv_sm->read_vconnp=(TSVConn)data;
TSVConnRead((TSVConn)data, cont, recv_sm->bufp, 500);
TSVConn input_request=(TSVConn)data;
printf("%s\n",input_request);
TSIOBuffer input_bufp=TSIOBufferCreate();
TSIOBufferReader input_bufp_reader=TSIOBufferReaderAlloc(input_bufp);
TSIOBuffer output_bufp=TSIOBufferCreate();
TSIOBufferReader output_bufp_reader=TSIOBufferReaderAlloc(output_bufp);
/////////發(fā)送數(shù)據(jù)部分
TSIOBufferBlock blockp;
char *ptr_block;
int64_t avail;
blockp? ? = TSIOBufferStart(output_bufp);
ptr_block = TSIOBufferBlockWriteStart(blockp, &avail);
memcpy(ptr_block, "i love ats", 11);
TSIOBufferProduce(output_bufp, 11);
TSVIO write_io=TSVConnWrite(input_request, cont,output_bufp_reader, 11);
}
int HandlerReadComplete(TSCont cont,void *data)
{
CacheVConnStruct *cache_vconn=(CacheVConnStruct*)TSContDataGet( cont);
cache_vconn->readerp=TSIOBufferReaderAlloc(cache_vconn->bufp);
int avail=TSIOBufferReaderAvail(cache_vconn->readerp);
printf("共有%d個(gè)數(shù)據(jù)\n",avail);
if(avail>0)
{
string str;
const char *buffer_temp=new char[1024];
int consumed=0;
TSIOBufferBlock block = TSIOBufferReaderStart(cache_vconn->readerp);
int64_t data_len;
str.reserve(avail + 1);
while (block)
{
buffer_temp=TSIOBufferBlockReadStart( block, cache_vconn->readerp, &data_len);
str.append(buffer_temp,data_len);
consumed+=data_len;
block=TSIOBufferBlockNext( block);
}
printf("\n%s\n",str.c_str());
}
}
/usr/local/ats/bin/tsxs -lpthread -o hello.so -c hello.cpp
sudo /usr/local/ats/bin/tsxs -o hello.so -i
sudo rm -f /usr/local/ats/var/log/trafficserver/*
sudo /usr/local/ats/bin/trafficserver restart
sleep 3s
python /home/carl/httpsend.py &
sleep 9s
sudo /usr/local/ats/bin/trafficserver stop
sleep 1s

同時(shí)查看ATS的日志,可以看到對應(yīng)事件回調(diào)的過程
