開源網(wǎng)絡(luò)抓包與分析框架學(xué)習(xí)-Packetbeat篇

go初學(xué)者,有興趣者,歡迎交流學(xué)習(xí)。

開源簡介

packbeat是一個開源的實(shí)時網(wǎng)絡(luò)抓包與分析框架,內(nèi)置了很多常見的協(xié)議捕獲及解析,如HTTP、MySQL、Redis等。在實(shí)際使用中,通常和Elasticsearch以及kibana聯(lián)合使用,用于數(shù)據(jù)搜索和分析以及數(shù)據(jù)展示。

  • 開發(fā)環(huán)境:Go語言
  • Git:源碼管理
  • IDE:推薦sublime或者liteide

開發(fā)之前

1.packbeat已經(jīng)被elastic整合在beats項目中,使用前登錄github,并打開
https://github.com/elasticsearch/beats.fork到自己的倉庫。
如:https://github.com/lindsay-show/packbeat

2.創(chuàng)建相應(yīng)目錄
<pre><code>
mkdir -p $GOPATH/src/github.com/elastic

cd $GOPATH/src/github.com/elastic</pre></code>
3.git clone

<pre><code>git clone https://github.com/elasitc/beats.git

cd beats</pre></code>

4.修改官方庫為upstream源,設(shè)置自己的倉庫為orgin源

<pre><code>git remote rename origin upstream

git remote add origin git@github.com:lindsay-show/packbeat.git</pre></code>

5.獲取最新代碼(剛fork,可忽略),并創(chuàng)建分支用于自定義功能開發(fā)

<pre><code>git pull upstream master

git checkout -b mypackbeat</pre></code>

6.切換到packbeat,并獲取依賴信息

<pre><code>cd packbeat

mkdir -p $GOPATH/src/golang.org/x/

cd $GOPATH/src/golang.org/x

git clone https://github.com/golang/tools.git

go get github.com/tools/godep</pre></code>

7.使用make編譯packbeat源碼,得到packbeat可執(zhí)行文件

注:

[1] git的相關(guān)介紹和命令可參考 Git教程

[2] go安裝及環(huán)境變量配置可參考 Golang官網(wǎng)

源碼框架

packbeat項目源碼結(jié)構(gòu)如下:


packetbeat源碼代碼結(jié)構(gòu).png

packetbeat整合在beats項目中,其中還包括topbeat以及filebeat,現(xiàn)簡要介紹beats源碼框架內(nèi)容如下:

  • /libbeat:公共依賴庫
  • /filebeat:logstash升級版,處理日志類型數(shù)據(jù)
  • /packbeat:網(wǎng)絡(luò)抓包
  • /topbeat:監(jiān)控系統(tǒng)性能;
  • /vendor:依賴的第三方庫(如dns開源庫或者其他協(xié)議棧)
  • /tests:用于測試的pcamp抓包文件
  • /scripts:測試腳本

關(guān)于topbeat及filebeat的更多介紹參考elastic官網(wǎng)

packebeat源碼框架介紹如下:

  • /packetbeat/main.go:項目啟動入口;
  • /packetbeat/config/:config.go,定義了所有配置相關(guān)的struct結(jié)構(gòu)體
  • /packetbeat/debian/:打包相關(guān)
  • /packetbeat/docs/:文檔
  • /packetbeat/etc/:配置文件示例
  • /packetbeat/procs/:獲取系統(tǒng)內(nèi)核運(yùn)作狀態(tài)與進(jìn)程信息的工具類
  • /packetbeat/protos/:自定義協(xié)議類,每個子目錄對應(yīng)一個應(yīng)用協(xié)議,包含配置相關(guān)的結(jié)構(gòu)體及具體實(shí)現(xiàn)
  • /packetbeat/sniffer/:三種不同抓包方式的實(shí)現(xiàn),如pcap、af_packet及pf_ring
  • /packetbeat/tests/:測試相關(guān)的文件,包含協(xié)議pcap文件及python測試腳本

注:以上介紹針對packetbeat-1.2.1,區(qū)別官網(wǎng)的開發(fā)幫助文檔(官網(wǎng)未更新)。

工作原理

介紹了beats及packetbeat源碼結(jié)構(gòu),簡要說明一下packetbeat的工作原理:

每一個協(xié)議都有一個或者多個固定的端口用于通信,開發(fā)者要做的事情就是定義協(xié)議端口,然后按照TCP以及UDP實(shí)現(xiàn)對應(yīng)的接口,Packetbeat會捕獲到指定端口的數(shù)據(jù)包,然后交給開發(fā)者定義的方法來解析,如TCP對應(yīng)的是Parse,UDP是ParseUdp.解析出來的結(jié)構(gòu)化數(shù)據(jù)封裝成Json,插入到Elasticsearch中,后續(xù)便可使用Elasticsearch的搜索和數(shù)據(jù)統(tǒng)計能力進(jìn)行應(yīng)用層數(shù)據(jù)分析。

使用方法

了解Packetbeat的工作原理后,接下來介紹如何使用packetbeat進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)包捕獲及分析。

在上述介紹中,我們知道packetbeat/protos目錄下支持自定義協(xié)議,目前Packebeat支持的協(xié)議如下:

  • ICMP (v4 and v6)
  • DNS
  • HTTP
  • Mysql
  • PostgreSQL
  • Redis
  • Thrift-RPC
  • MongoDB
  • Memcache

以HTTP為例,安裝packetbeat源碼后,配置文件packetbeat.yml中默認(rèn)已經(jīng)配置了上述支持的協(xié)議類型。使用步驟簡述如下:

  • 安裝Packetbeat源碼
  • 配置packetbeat.yml文件,默認(rèn)不用更改(默認(rèn)配置輸出到elasticsearch)
  • 加載packetbeat索引至elasticsearch中(使用第三方腳本)
  • 啟動elasticsearch及kibana,查看http數(shù)據(jù)包捕獲及分析

如:啟動packetbeat,打開幾個網(wǎng)頁,在終端極即可看到packetbeat已注冊的協(xié)議類型以及http請求數(shù)據(jù)和應(yīng)答數(shù)據(jù)包

<pre><code>
cd packetbeat-1.2.1

./packetbeat -N -e </pre></code>

注:

[1] Packetbeat詳細(xì)使用說明,請參考Packetbeat官方幫助文檔,非常詳細(xì)。

[2] Elasticsearch及kibana的安裝和使用,請參考Elastic官方幫助文檔。

擴(kuò)展協(xié)議開發(fā)

在前面介紹到,目前packetbeat支持的協(xié)議類型主要是HTTP等常見協(xié)議類型,即時通信協(xié)議,如sip、msrp以及xmpp等暫不支持。如何對packetbeat進(jìn)行協(xié)議擴(kuò)展是我們研究該源碼的主要目的。

網(wǎng)絡(luò)傳輸兩大協(xié)議TCP和UDP,應(yīng)用層協(xié)議都離不開這兩種協(xié)議,如源碼中的HTTP、MySQL走的是TCP傳輸協(xié)議,DNS走的是UDP協(xié)議,在Packetbeat里面,要實(shí)現(xiàn)自定義協(xié)議,只需實(shí)現(xiàn)這兩者對應(yīng)的接口。擴(kuò)展協(xié)議的框架代碼可分別參考基于TCP的http以及基于udp的dns協(xié)議實(shí)現(xiàn)代碼。

在進(jìn)行擴(kuò)展協(xié)議開發(fā)之前,需要了解protos/register.go中tcp、udp以及基礎(chǔ)協(xié)議的接口定義:

  • TcpPlugin:TCP協(xié)議插件的接口定義。其中Pares()用于解析Packet,ReceivedFin()用于處理TCP斷開連接,GapInStream()處理空包丟包,ConnectionTimeout()處理超時時間
  • UdpPlugin:UDP協(xié)議插件的接口定義。其中ParseUdp()用于解析Packet
  • ProtocolPlugin:TCP和UDP以及其他擴(kuò)展協(xié)議均需要實(shí)現(xiàn)ProtocolPlugin的基礎(chǔ)接口,主要是提供獲取端口方法

上述對應(yīng)的接口定義如下所示:
<pre><code>
type Plugin interface {

// Called to return the configured ports

GetPorts() []int

}

type TcpPlugin interface {

Plugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
    dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,   private ProtocolData) ProtocolData

// Called when a packets are missing from the tcp
// stream.

GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
    private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration

}

type UdpPlugin interface {

Plugin

// ParseUdp is invoked when UDP payload data is available for parsing.

ParseUdp(pkt *Packet)

}
</pre></code>

接下來,需要了解config.go中ProtocolCommon的結(jié)構(gòu)體,擴(kuò)展協(xié)議需要繼承該基本結(jié)構(gòu).

協(xié)議的基本配置結(jié)構(gòu)體定義如下所示(該結(jié)構(gòu)體對應(yīng)packetbeat.yml的配置結(jié)構(gòu),參考默認(rèn)的packetbeat.yml文件):
<pre><code>
type ProtocolCommon struct {

Ports              []int         `config:"ports"`

SendRequest        bool          `config:"send_request"`

SendResponse       bool          `config:"send_response"`

TransactionTimeout time.Duration `config:"transaction_timeout"`

}
</pre></code>

最后了解一下packetbeat中的關(guān)于packet結(jié)構(gòu)定義:
<pre><code>
type Packet struct {

Ts      time.Time

Tuple   common.IpPortTuple

Payload []byte

}
</pre></code>

  • Ts:收到數(shù)據(jù)包的時間戳
  • Tuple:來源ip+來源端口+目的ip+目的端口的元組
  • Payload:應(yīng)用層字節(jié)數(shù),不包括tcp及udp頭部信息,這部分正是七層協(xié)議需要解析的部分

以上,擴(kuò)展協(xié)議的基本思路介紹完畢?,F(xiàn)以sip協(xié)議擴(kuò)展開發(fā)為例:(擴(kuò)展開發(fā)之前,參考前文開發(fā)之前所述)

<pre><code>
cd $GOPATH/src/github.com/elastic/beats/packetbeat/protos

mkdir sip&&cd sip

touch sip.go config.go sip_parse.go

</pre></code>

其中,sip.go用于sip協(xié)議的具體實(shí)現(xiàn),包括實(shí)現(xiàn)基于TCP及UDP對應(yīng)的解析方法,config.go用于sip協(xié)議的配置結(jié)構(gòu)定義,sip_parse.go用于sip消息解析結(jié)構(gòu)的定義。
config.go中定義如下:
<pre><code>
package sip

import (

"github.com/elastic/beats/packetbeat/config"

"github.com/elastic/beats/packetbeat/protos"

)

//ProtocolCommon struct

type sipConfig struct {

config.ProtocolCommon ``config:",inline"

}

var (
defaultConfig = sipConfig{

    ProtocolCommon: config.ProtocolCommon{

        TransactionTimeout: protos.DefaultTransactionExpiration,
    },
}

)
</pre></code>
sip下的config.go定義完畢后,在packetbeat.yml中增加sip對應(yīng)的配置,如下所示:
<pre><code>
protocols:
sip:
ports: [5060,5260]
# send_request and send_response control whether or not the stringified SIP
# request and response message are added to the result.
# Nearly all data about the request/response is available in the sip.*
# fields, but this can be useful if you need visibility specifically
# into the request or the response.
# Default: false
# send_request: true
# send_response: true
</pre></code>

在sip.go中實(shí)現(xiàn)udp協(xié)議插件接口方法Parseudp,并注冊協(xié)議,使用registor.go中的register方法,如下:
<pre><code>
func init() {

protos.Register("sip", New)

}

func New(
testMode bool,

results publish.Transactions,

cfg *common.Config,

) (protos.Plugin, error) {

p := &Sip{}
config := defaultConfig
if !testMode {
    if err := cfg.Unpack(&config); err != nil {
        return nil, err
    }
}
if err := p.init(results, &config); err != nil {
    return nil, err
}
return p, nil

}
</pre></code>
最后一步,在packetbeat的main.go主程序中加載sip協(xié)議,如下所示:
<pre><code>
package main

import (

"os"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/packetbeat/beater"
// import support protocol modules
_ "github.com/elastic/beats/packetbeat/protos/amqp"
_ "github.com/elastic/beats/packetbeat/protos/dns"
_ "github.com/elastic/beats/packetbeat/protos/http"
_ "github.com/elastic/beats/packetbeat/protos/memcache"
_ "github.com/elastic/beats/packetbeat/protos/mongodb"
_ "github.com/elastic/beats/packetbeat/protos/mysql"
_ "github.com/elastic/beats/packetbeat/protos/nfs"
_ "github.com/elastic/beats/packetbeat/protos/pgsql"
_ "github.com/elastic/beats/packetbeat/protos/redis"
_ "github.com/elastic/beats/packetbeat/protos/sip"
_ "github.com/elastic/beats/packetbeat/protos/thrift"

)
</pre></code>

使用makefile,編譯packeteat,執(zhí)行./packetbeat -N -e后,在終端上會顯示sip協(xié)議已注冊成功。

至此,packetbeat的協(xié)議擴(kuò)展介紹完畢了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容