go初學(xué)者,有興趣者,歡迎交流學(xué)習(xí)。
開源簡介
packbeat是一個開源的實(shí)時網(wǎng)絡(luò)抓包與分析框架,內(nèi)置了很多常見的協(xié)議捕獲及解析,如HTTP、MySQL、Redis等。在實(shí)際使用中,通常和Elasticsearch以及kibana聯(lián)合使用,用于數(shù)據(jù)搜索和分析以及數(shù)據(jù)展示。
- 開發(fā)環(huán)境:Go語言
- Git:源碼管理
- IDE:推薦sublime或者liteide
開發(fā)之前
1.packbeat已經(jīng)被elastic整合在beats項目中,使用前登錄github,并打開
https://github.com/elasticsearch/beats.fork到自己的倉庫。
如:https://github.com/lindsay-show/packbeat
2.創(chuàng)建相應(yīng)目錄
<pre><code>
mkdir -p $GOPATH/src/github.com/elastic
cd $GOPATH/src/github.com/elastic</pre></code>
3.git clone
<pre><code>git clone https://github.com/elasitc/beats.git
cd beats</pre></code>
4.修改官方庫為upstream源,設(shè)置自己的倉庫為orgin源
<pre><code>git remote rename origin upstream
git remote add origin git@github.com:lindsay-show/packbeat.git</pre></code>
5.獲取最新代碼(剛fork,可忽略),并創(chuàng)建分支用于自定義功能開發(fā)
<pre><code>git pull upstream master
git checkout -b mypackbeat</pre></code>
6.切換到packbeat,并獲取依賴信息
<pre><code>cd packbeat
mkdir -p $GOPATH/src/golang.org/x/
cd $GOPATH/src/golang.org/x
git clone https://github.com/golang/tools.git
go get github.com/tools/godep</pre></code>
7.使用make編譯packbeat源碼,得到packbeat可執(zhí)行文件
注:
[1] git的相關(guān)介紹和命令可參考 Git教程
[2] go安裝及環(huán)境變量配置可參考 Golang官網(wǎng)
源碼框架
packbeat項目源碼結(jié)構(gòu)如下:

packetbeat整合在beats項目中,其中還包括topbeat以及filebeat,現(xiàn)簡要介紹beats源碼框架內(nèi)容如下:
- /libbeat:公共依賴庫
- /filebeat:logstash升級版,處理日志類型數(shù)據(jù)
- /packbeat:網(wǎng)絡(luò)抓包
- /topbeat:監(jiān)控系統(tǒng)性能;
- /vendor:依賴的第三方庫(如dns開源庫或者其他協(xié)議棧)
- /tests:用于測試的pcamp抓包文件
- /scripts:測試腳本
關(guān)于topbeat及filebeat的更多介紹參考elastic官網(wǎng)。
packebeat源碼框架介紹如下:
- /packetbeat/main.go:項目啟動入口;
- /packetbeat/config/:config.go,定義了所有配置相關(guān)的struct結(jié)構(gòu)體
- /packetbeat/debian/:打包相關(guān)
- /packetbeat/docs/:文檔
- /packetbeat/etc/:配置文件示例
- /packetbeat/procs/:獲取系統(tǒng)內(nèi)核運(yùn)作狀態(tài)與進(jìn)程信息的工具類
- /packetbeat/protos/:自定義協(xié)議類,每個子目錄對應(yīng)一個應(yīng)用協(xié)議,包含配置相關(guān)的結(jié)構(gòu)體及具體實(shí)現(xiàn)
- /packetbeat/sniffer/:三種不同抓包方式的實(shí)現(xiàn),如pcap、af_packet及pf_ring
- /packetbeat/tests/:測試相關(guān)的文件,包含協(xié)議pcap文件及python測試腳本
注:以上介紹針對packetbeat-1.2.1,區(qū)別官網(wǎng)的開發(fā)幫助文檔(官網(wǎng)未更新)。
工作原理
介紹了beats及packetbeat源碼結(jié)構(gòu),簡要說明一下packetbeat的工作原理:
每一個協(xié)議都有一個或者多個固定的端口用于通信,開發(fā)者要做的事情就是定義協(xié)議端口,然后按照TCP以及UDP實(shí)現(xiàn)對應(yīng)的接口,Packetbeat會捕獲到指定端口的數(shù)據(jù)包,然后交給開發(fā)者定義的方法來解析,如TCP對應(yīng)的是Parse,UDP是ParseUdp.解析出來的結(jié)構(gòu)化數(shù)據(jù)封裝成Json,插入到Elasticsearch中,后續(xù)便可使用Elasticsearch的搜索和數(shù)據(jù)統(tǒng)計能力進(jìn)行應(yīng)用層數(shù)據(jù)分析。
使用方法
了解Packetbeat的工作原理后,接下來介紹如何使用packetbeat進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)包捕獲及分析。
在上述介紹中,我們知道packetbeat/protos目錄下支持自定義協(xié)議,目前Packebeat支持的協(xié)議如下:
- ICMP (v4 and v6)
- DNS
- HTTP
- Mysql
- PostgreSQL
- Redis
- Thrift-RPC
- MongoDB
- Memcache
以HTTP為例,安裝packetbeat源碼后,配置文件packetbeat.yml中默認(rèn)已經(jīng)配置了上述支持的協(xié)議類型。使用步驟簡述如下:
- 安裝Packetbeat源碼
- 配置packetbeat.yml文件,默認(rèn)不用更改(默認(rèn)配置輸出到elasticsearch)
- 加載packetbeat索引至elasticsearch中(使用第三方腳本)
- 啟動elasticsearch及kibana,查看http數(shù)據(jù)包捕獲及分析
如:啟動packetbeat,打開幾個網(wǎng)頁,在終端極即可看到packetbeat已注冊的協(xié)議類型以及http請求數(shù)據(jù)和應(yīng)答數(shù)據(jù)包
<pre><code>
cd packetbeat-1.2.1
./packetbeat -N -e </pre></code>
注:
[1] Packetbeat詳細(xì)使用說明,請參考Packetbeat官方幫助文檔,非常詳細(xì)。
[2] Elasticsearch及kibana的安裝和使用,請參考Elastic官方幫助文檔。
擴(kuò)展協(xié)議開發(fā)
在前面介紹到,目前packetbeat支持的協(xié)議類型主要是HTTP等常見協(xié)議類型,即時通信協(xié)議,如sip、msrp以及xmpp等暫不支持。如何對packetbeat進(jìn)行協(xié)議擴(kuò)展是我們研究該源碼的主要目的。
網(wǎng)絡(luò)傳輸兩大協(xié)議TCP和UDP,應(yīng)用層協(xié)議都離不開這兩種協(xié)議,如源碼中的HTTP、MySQL走的是TCP傳輸協(xié)議,DNS走的是UDP協(xié)議,在Packetbeat里面,要實(shí)現(xiàn)自定義協(xié)議,只需實(shí)現(xiàn)這兩者對應(yīng)的接口。擴(kuò)展協(xié)議的框架代碼可分別參考基于TCP的http以及基于udp的dns協(xié)議實(shí)現(xiàn)代碼。
在進(jìn)行擴(kuò)展協(xié)議開發(fā)之前,需要了解protos/register.go中tcp、udp以及基礎(chǔ)協(xié)議的接口定義:
- TcpPlugin:TCP協(xié)議插件的接口定義。其中Pares()用于解析Packet,ReceivedFin()用于處理TCP斷開連接,GapInStream()處理空包丟包,ConnectionTimeout()處理超時時間
- UdpPlugin:UDP協(xié)議插件的接口定義。其中ParseUdp()用于解析Packet
- ProtocolPlugin:TCP和UDP以及其他擴(kuò)展協(xié)議均需要實(shí)現(xiàn)ProtocolPlugin的基礎(chǔ)接口,主要是提供獲取端口方法
上述對應(yīng)的接口定義如下所示:
<pre><code>
type Plugin interface {
// Called to return the configured ports
GetPorts() []int
}
type TcpPlugin interface {
Plugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8, private ProtocolData) ProtocolData
// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}
type UdpPlugin interface {
Plugin
// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
</pre></code>
接下來,需要了解config.go中ProtocolCommon的結(jié)構(gòu)體,擴(kuò)展協(xié)議需要繼承該基本結(jié)構(gòu).
協(xié)議的基本配置結(jié)構(gòu)體定義如下所示(該結(jié)構(gòu)體對應(yīng)packetbeat.yml的配置結(jié)構(gòu),參考默認(rèn)的packetbeat.yml文件):
<pre><code>
type ProtocolCommon struct {
Ports []int `config:"ports"`
SendRequest bool `config:"send_request"`
SendResponse bool `config:"send_response"`
TransactionTimeout time.Duration `config:"transaction_timeout"`
}
</pre></code>
最后了解一下packetbeat中的關(guān)于packet結(jié)構(gòu)定義:
<pre><code>
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload []byte
}
</pre></code>
- Ts:收到數(shù)據(jù)包的時間戳
- Tuple:來源ip+來源端口+目的ip+目的端口的元組
- Payload:應(yīng)用層字節(jié)數(shù),不包括tcp及udp頭部信息,這部分正是七層協(xié)議需要解析的部分
以上,擴(kuò)展協(xié)議的基本思路介紹完畢?,F(xiàn)以sip協(xié)議擴(kuò)展開發(fā)為例:(擴(kuò)展開發(fā)之前,參考前文開發(fā)之前所述)
<pre><code>
cd $GOPATH/src/github.com/elastic/beats/packetbeat/protos
mkdir sip&&cd sip
touch sip.go config.go sip_parse.go
</pre></code>
其中,sip.go用于sip協(xié)議的具體實(shí)現(xiàn),包括實(shí)現(xiàn)基于TCP及UDP對應(yīng)的解析方法,config.go用于sip協(xié)議的配置結(jié)構(gòu)定義,sip_parse.go用于sip消息解析結(jié)構(gòu)的定義。
config.go中定義如下:
<pre><code>
package sip
import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
)
//ProtocolCommon struct
type sipConfig struct {
config.ProtocolCommon ``config:",inline"
}
var (
defaultConfig = sipConfig{
ProtocolCommon: config.ProtocolCommon{
TransactionTimeout: protos.DefaultTransactionExpiration,
},
}
)
</pre></code>
sip下的config.go定義完畢后,在packetbeat.yml中增加sip對應(yīng)的配置,如下所示:
<pre><code>
protocols:
sip:
ports: [5060,5260]
# send_request and send_response control whether or not the stringified SIP
# request and response message are added to the result.
# Nearly all data about the request/response is available in the sip.*
# fields, but this can be useful if you need visibility specifically
# into the request or the response.
# Default: false
# send_request: true
# send_response: true
</pre></code>
在sip.go中實(shí)現(xiàn)udp協(xié)議插件接口方法Parseudp,并注冊協(xié)議,使用registor.go中的register方法,如下:
<pre><code>
func init() {
protos.Register("sip", New)
}
func New(
testMode bool,
results publish.Transactions,
cfg *common.Config,
) (protos.Plugin, error) {
p := &Sip{}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}
if err := p.init(results, &config); err != nil {
return nil, err
}
return p, nil
}
</pre></code>
最后一步,在packetbeat的main.go主程序中加載sip協(xié)議,如下所示:
<pre><code>
package main
import (
"os"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/packetbeat/beater"
// import support protocol modules
_ "github.com/elastic/beats/packetbeat/protos/amqp"
_ "github.com/elastic/beats/packetbeat/protos/dns"
_ "github.com/elastic/beats/packetbeat/protos/http"
_ "github.com/elastic/beats/packetbeat/protos/memcache"
_ "github.com/elastic/beats/packetbeat/protos/mongodb"
_ "github.com/elastic/beats/packetbeat/protos/mysql"
_ "github.com/elastic/beats/packetbeat/protos/nfs"
_ "github.com/elastic/beats/packetbeat/protos/pgsql"
_ "github.com/elastic/beats/packetbeat/protos/redis"
_ "github.com/elastic/beats/packetbeat/protos/sip"
_ "github.com/elastic/beats/packetbeat/protos/thrift"
)
</pre></code>
使用makefile,編譯packeteat,執(zhí)行./packetbeat -N -e后,在終端上會顯示sip協(xié)議已注冊成功。
至此,packetbeat的協(xié)議擴(kuò)展介紹完畢了。