Nginx=>Flume=>Kafka 流程總結(jié)

nginx=>flume=>kafka

  1. 編寫flume 日志收集文件

nginx日志
access.log====>flume

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.shell = /bin/sh -c

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 這個地址是kafka的監(jiān)聽地址
a1.sinks.k1.brokerList = spark001:9092
# 注意這里的topic是zk的topic
a1.sinks.k1.topic = test
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

#a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 關(guān)于Kafka的部署

Step 1: Start the zookeeper #得到QuorumPeerMain進(jìn)程

zkServer.sh start

[hadoop@hadoop000 conf]$ vim zoo.cfg

修改數(shù)據(jù)的臨時存放目錄,默認(rèn)是在tmp目錄下的

dataDir=/home/hadoop/app/tmp/zk

:wq 保存退出

Step 2: Start the server 得到==》kafka進(jìn)程

kafka-server-start.sh -deamon $KAFKA_HOME/config/server.properties

配置,修改解壓目錄下config配置文件

配置server.properties[需要注意的地方]
broker.id=0 解釋:是kafka,一個broker
listeners 解釋:監(jiān)聽的端口
host.name 解釋:當(dāng)前機(jī)器
log.dirs 解釋:kafka的日志存放目錄
zookeeper.connect zk的地址

修改好之后,保存退出

Step 3: Create a topic # 注意和之前flume的topic對應(yīng)起來

kafka-topics.sh -create -zookeeper spark001:2181 -replication-factor 1 -partitions 1 -topic test

ps:kafka-topics.sh -list -zookeeper spark001:2181

Step 4: 開啟之前的agent (作為生產(chǎn)者)

flume-ng agent --name a1 --conf . --conf-file ./lamda_imooc.conf -Dflume.root.logger=INFO,console

步驟4:擴(kuò)展:控制臺作為生產(chǎn)者

kafka-console-producer.sh --broker-list spark001:9092 --topic test

Step 5: Start a consumer (消費(fèi)者)

kafka-console-consumer.sh --zookeeper spark001:2181 --topic test

最后 確保flume監(jiān)聽的文件在不斷產(chǎn)生新的日志
文件位置 /root/logs/access.log

#!/bin/bash
   i=1;
   j=0;
   while (test $i -le 6170967 )
     do
       j=`expr $i + 9`  
       sed -n $i,$j'p' /root/logs/all.log >> /root/logs/access.log
       i=`expr $i + 10`
       sleep 5
      # echo $i
     done
  #新建這個shell文件,把這個shell文件執(zhí)行起來

上面的操作執(zhí)行之后,就會收到刷屏的結(jié)果,哈哈哈!!
nginx=>flume=>kafka
補(bǔ)充:
head -n 100 大日志文件.log 100_access.log
wc -l 100_access.log

spark-submit --master local[5] \
--jars $(echo /root/apps/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
--class com.csylh.spark.project.spark.ImoocStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/root/jars/streaming-1.0-SNAPSHOT.jar \
spark001:2181 test test 1

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容