logstash input插件開發(fā)

logstash input插件開發(fā)

logstash作為一個數(shù)據(jù)管道中間件,支持對各種類型數(shù)據(jù)的采集與轉(zhuǎn)換,并將數(shù)據(jù)發(fā)送到各種類型的存儲庫,比如實現(xiàn)消費kafka數(shù)據(jù)并且寫入到Elasticsearch, 日志文件同步到對象存儲S3等,mysql數(shù)據(jù)同步到Elasticsearch等。

logstash內(nèi)部主要包含三個模塊:

input: 從數(shù)據(jù)源獲取數(shù)據(jù)
filter: 過濾、轉(zhuǎn)換數(shù)據(jù)
output: 輸出數(shù)據(jù)
image

不同類型的數(shù)據(jù)都可以通過對應(yīng)的input-plugin, output-plugin完成數(shù)據(jù)的輸入與輸出。如需要消費kafka中的數(shù)據(jù)并寫入到Elasticsearch中,則需要使用logstash的kafka-input-plugin完成數(shù)據(jù)輸入,logstash-output-elasticsearch完成數(shù)據(jù)輸出。如果需要對輸入數(shù)據(jù)進(jìn)行過濾或者轉(zhuǎn)換,比如根據(jù)關(guān)鍵詞過濾掉不需要的內(nèi)容,或者時間字段的格式轉(zhuǎn)換,就需要又filter-plugin完成了。

logstash的input插件目前已經(jīng)有幾十種了,支持大多數(shù)比較通用或開源的數(shù)據(jù)源的輸入。但如果公司內(nèi)部開發(fā)的數(shù)據(jù)庫或其它存儲類的服務(wù)不能和開源產(chǎn)品在接口協(xié)議上兼容,比如騰訊自研的消息隊列服務(wù)CMQ不依賴于其它的開源消息隊列產(chǎn)品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的數(shù)據(jù);騰訊云對象存儲服務(wù)COS, 在鑒權(quán)方式上和AWS的S3存在差異,也不能直接使用logstash-input-s3插件從COS中讀取數(shù)據(jù),對于這種情況,就需要自己開發(fā)logstash的input插件了。

本文以開發(fā)logstash的cos input插件為例,介紹如何開發(fā)logstash的input插件。

logstash官方提供了有個簡單的input plugin example可供參考:
https://github.com/logstash-plugins/logstash-input-example/

環(huán)境準(zhǔn)備

logstash使用jruby開發(fā),首先要配置jruby環(huán)境:

  1. 安裝rvm:

    rvm是一個ruby管理器,可以安裝并管理ruby環(huán)境,也可以通過命令行切換到不同的ruby版本。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    
    \curl -sSL https://get.rvm.io | bash -s stable
    
    source /etc/profile.d/rvm.sh
    
  2. 安裝jruby

    rvm install jruby
    
    rvm use jruby
    
  3. 安裝包管理工具bundle和測試工具rspec

    gem install bundle
    gem install rspec
    

從example開始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
    
  2. 將clone出來的logstash-input-example源碼copy到logstash-input-cos目錄,并刪除.git文件夾,目的是以logstash-input-example的源碼為參考進(jìn)行開發(fā),同時把需要改動名稱的地方修改一下:

     mv logstash-input-example.gemspec logstash-input-cos.gemspec
     mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
     mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
    
  3. 建立的源碼目錄結(jié)構(gòu)如圖所示:

image

其中,重要文件的作用說明如下:

  • cos.rb: 主文件,在該文件中編寫logstash配置文件的讀寫與源數(shù)據(jù)獲取的代碼,需要繼承LogStash::Inputs::Base基類
  • cos_spec.rb: 單元測試文件,通過rspec可以對cos.rb中的代碼進(jìn)行測試
  • logstash-input-cos.gemspec: 類似于maven中的pom.xml文件,配置工程的版本、名稱、licene,包依賴等,通過bundle命令可以下載依賴包

配置并下載依賴

因為騰訊云COS服務(wù)沒有ruby sdk, 因為只能依賴其Java sdk進(jìn)行開發(fā),首先添加對cos java sdk的依賴。在logstash-input-cos.gemspec中Gem dependencies配置欄中增加以下內(nèi)容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了對com.qcloud:cos_api包以及jar-dependencies包的依賴,jar-dependencies用于在ruby環(huán)境中管理jar包,并且可以跟蹤jar包的加載狀態(tài)。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

這樣可以成功下載java依賴包,并且可以在ruby代碼中直接調(diào)用java代碼。

最后,執(zhí)行以下命令下載依賴:

bundle install

編寫代碼

logstash-input-cos的代碼邏輯其實比較簡單,主要是通過執(zhí)行定時任務(wù),調(diào)用cos java sdk中的listObjects方法,獲取到指定bucket里的數(shù)據(jù),并在每次定時任務(wù)執(zhí)行結(jié)束后設(shè)置marker保存在本地,再次執(zhí)行時從marker位置獲取數(shù)據(jù),以實現(xiàn)數(shù)據(jù)的增量同步。

jar包的引用

因為要調(diào)用cos java sdk中的代碼,先引用該jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

讀取配置文件

logstash配置文件讀取的代碼如圖所示:


image

config_name為cos,其它的配置項讀取代碼按照ruby的代碼規(guī)范編寫,添加類型校驗與默認(rèn)值,就可以從以下配置文件中讀取配置項:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

實現(xiàn)register方法

logstash input插件必須實現(xiàn)另個方法:register 和run

register方法類似于初始化方法,在該方法中可以直接使用從配置文件讀取并賦值的變量,完成cos client的初始化,代碼如下:

    # 1 初始化用戶身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 設(shè)置bucket的區(qū)域, COS地域的簡稱請參照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客戶端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名稱, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 設(shè)置bucket名稱
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix開始
    @listObjectsRequest.setPrefix(@prefix)
    # 設(shè)置最大遍歷出多少個對象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代碼中設(shè)置了@cosclient和@listObjectRequest為全局變量, 因為在run方法中會用到這兩個變量。

注意在ruby中調(diào)用java代碼的方式:沒有變量描述符;不能直接new Object(),而只能Object.new().

實現(xiàn)run方法

run方法獲取數(shù)據(jù)并將數(shù)據(jù)流轉(zhuǎn)換成event事件

最簡單的run方法為:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代碼說明:

  • 通過Stud ruby模塊執(zhí)行定時任務(wù),interval可自定義,從配置文件中讀取
  • 生成event, 示例代碼生成了一個包含兩個字段數(shù)據(jù)的event
  • 調(diào)用decorate()方法, 給該event打上tag,如果配置的話
  • queue<<event, 將event插入到數(shù)據(jù)管道中,發(fā)送給filter處理

logstash-input-cos的run方法實現(xiàn)為:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end
 
def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)
    
    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路徑key
       key = obj.getKey()
    
       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根據(jù)key獲取內(nèi)容
       getObject(key) { |log|
         # 發(fā)送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #記錄 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end


  # 獲取下載輸入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

測試代碼

在spec/inputs/cos_spec.rb中增加如下測試代碼:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一個ruby測試庫,通過bundle命令執(zhí)行rspec:

bundle exec rspec

如果cos.rb中的代碼沒有語法或運行時錯誤,則會出現(xiàn)如果信息表明測試成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

構(gòu)建并測試input-plugin-cos

build

使用gem對input-plugin-cos插件源碼進(jìn)行build:

gem build logstash-input-cos.gemspec

構(gòu)建完成后會生成一個名為logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解壓目錄下,執(zhí)行一下命令安裝logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

執(zhí)行結(jié)果為:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外,可以通過./bin/logstash-plugin list命令查看logstash已經(jīng)安裝的所有input/output/filter/codec插件。

生成配置文件cos.logstash.conf,內(nèi)容為:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

該配置文件使用騰訊云官網(wǎng)賬號的secret_id和secret_key進(jìn)行權(quán)限驗證,拉取指定bucket里的數(shù)據(jù),為了測試,將output設(shè)置為標(biāo)準(zhǔn)輸出。

執(zhí)行l(wèi)ogstash:

./bin/logstash -f cos.logstash.conf

輸出結(jié)果為:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上傳了名為access.log的nginx日志,上述輸出結(jié)果中最后打印出來的每個json結(jié)構(gòu)體構(gòu)成一個event, 其中message消息即為access.log中每一條日志。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • Logstash是一個具有實時管線能力的開源數(shù)據(jù)收集引擎。在ELK Stack中,通常選擇更輕量級的Filebea...
    steanxy閱讀 3,610評論 1 6
  • 概述 監(jiān)控預(yù)警平臺, eagle + eye (鷹眼)的合體詞, 寓意可以快速發(fā)現(xiàn)問題, 并及時作出響應(yīng),Eagl...
    Kungfu貓熊閱讀 7,658評論 0 52
  • 0.摘要 這是開智學(xué)堂「信息分析」課程第1周基礎(chǔ)任務(wù),包含分析背景、思路與分析步驟、主要結(jié)論、進(jìn)一步討論等內(nèi)容,完...
    空靈一月閱讀 431評論 0 1
  • 《呂氏春秋》里有一段,講孔子周游列國,曾因兵荒馬亂,旅途困頓,三餐以野菜果腹,大家已七日沒吃下一粒米飯。 一天,顏...
    劉現(xiàn)輝民俗畫閱讀 1,999評論 0 1

友情鏈接更多精彩內(nèi)容