2018-12-25

spark-streaming消費kafka數據:

首次消費截圖:


手動kill,再次啟動:





KafkaManager類:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.Decoder

import org.apache.spark.SparkException

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
  * Manages Kafka consumer offsets by hand for direct streams.
  *
  * Offsets are read and written through a [[KafkaCluster]] built from `kafkaParams`, so a
  * restarted job can resume from the offsets stored for its consumer group (per the original
  * comments, these live in ZooKeeper).
  *
  * @param kafkaParams Kafka configuration; `group.id` is required by [[updateZKOffsets]] and
  *                    `auto.offset.reset` is consulted when the group has no stored offsets
  */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * Creates a direct stream that starts from the offsets stored for the consumer group.
    *
    * The stored offsets are first reconciled with the brokers (see `setOrUpdateOffsets`) and are
    * then used as the starting positions of the stream.
    *
    * @param ssc         streaming context the stream is attached to
    * @param kafkaParams Kafka configuration for the stream; must contain `group.id`
    *                    (this argument, not the constructor's map, supplies the group id here)
    * @param topics      topics to consume
    * @return a stream of `(key, message)` pairs
    * @throws NoSuchElementException if `group.id` is missing from `kafkaParams`
    * @throws SparkException         if partitions or consumer offsets cannot be fetched
    */
  def createDirectStream[K: ClassTag,
                         V: ClassTag,
                         KD <: Decoder[K]: ClassTag,
                         VD <: Decoder[V]: ClassTag](ssc: StreamingContext,
                                                     kafkaParams: Map[String, String],
                                                     topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")

    // Bring the stored offsets in line with what the brokers can still serve before reading them.
    setOrUpdateOffsets(topics, groupId)

    // Start consuming from the (now reconciled) stored offsets.
    val partitions = valueOrFail(kc.getPartitions(topics), "get kafka partition failed")
    val consumerOffsets =
      valueOrFail(kc.getConsumerOffsets(groupId, partitions), "get kafka consumer offsets failed")

    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
  }

  /**
    * Reconciles the stored consumer offsets of every topic with the brokers before a stream is
    * created.
    *
    *  - Group has offsets for the topic: offsets older than the earliest available offset are
    *    moved forward, and partitions without a stored offset start from the earliest offset.
    *  - Group has no offsets for the topic: every partition is initialised according to
    *    `auto.offset.reset` (`smallest` => earliest, anything else => latest).
    *
    * @param topics  topics whose offsets are reconciled
    * @param groupId consumer group owning the offsets
    * @throws SparkException if partitions or leader offsets cannot be fetched
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      val partitions = valueOrFail(kc.getPartitions(Set(topic)), "get kafka partition failed")

      val offsetsToWrite = lookupStoredOffsets(groupId, partitions) match {
        case Some(storedOffsets) =>
          println("------消費過------")
          staleOrMissingOffsets(groupId, partitions, storedOffsets)
        case None =>
          println("------沒有消費過------")
          initialOffsets(partitions)
      }

      if (offsetsToWrite.nonEmpty) {
        commitOffsets(groupId, offsetsToWrite)
      }
    }
  }

  /**
    * Fetches the offsets stored for `groupId`.
    *
    * @return `Some(offsets)` when at least one lookup succeeded (the map may cover only part of
    *         `partitions`), `None` when nothing is stored for any partition
    */
  private def lookupStoredOffsets(groupId: String,
                                  partitions: Set[TopicAndPartition]): Option[Map[TopicAndPartition, Long]] =
    kc.getConsumerOffsets(groupId, partitions) match {
      case Right(allOffsets) => Some(allOffsets)
      case Left(_) =>
        // NOTE(review): the Spark 1.x KafkaCluster appears to answer Left unless *every* requested
        // partition has a stored offset - confirm against the KafkaCluster source. If so, a topic
        // that merely gained a partition would look "never consumed" and be reset as a whole, so
        // the partitions are asked one by one to find out which of them really lack an offset.
        val found = partitions.toSeq.flatMap { tp =>
          kc.getConsumerOffsets(groupId, Set(tp)) match {
            case Right(offsets) => offsets.toSeq
            case Left(_)        => Seq.empty
          }
        }.toMap
        if (found.nonEmpty) Some(found) else None
    }

  /**
    * Computes the corrections needed for a group that already consumed the topic.
    *
    * A stored offset below the earliest leader offset would make the job fail with
    * `kafka.common.OffsetOutOfRangeException` because the retention policy already deleted that
    * data; such offsets are moved to the earliest one. Only the affected partitions are returned.
    * Partitions without any stored offset (e.g. newly added ones) start from the earliest offset.
    */
  private def staleOrMissingOffsets(groupId: String,
                                    partitions: Set[TopicAndPartition],
                                    storedOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
    val earliestLeaderOffsets =
      valueOrFail(kc.getEarliestLeaderOffsets(partitions), "get earliest leader offsets failed")

    val stale = storedOffsets.toSeq.collect {
      case (tp, stored) if stored < earliestLeaderOffsets(tp).offset =>
        val earliestOffset = earliestLeaderOffsets(tp).offset
        println(s"consumer group:$groupId,topic:${tp.topic},partition:${tp.partition}" +
          s" offsets已經過時,更新為$earliestOffset")
        tp -> earliestOffset
    }

    // The earliest offset is used instead of a hard-coded 0: it equals 0 for a brand-new
    // partition and is still valid if old segments of the partition were already removed.
    val missing = (earliestLeaderOffsets.keySet -- storedOffsets.keySet).toSeq.map { tp =>
      val earliestOffset = earliestLeaderOffsets(tp).offset
      println(s"consumer group:$groupId,topic:${tp.topic},partition:${tp.partition}" +
        s" kafka分區新增設置偏移量為$earliestOffset")
      tp -> earliestOffset
    }

    (stale ++ missing).toMap
  }

  /**
    * Computes the starting offsets for a group that never consumed the topic, honouring
    * `auto.offset.reset` from the constructor's `kafkaParams`.
    */
  private def initialOffsets(partitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val leaderOffsets: Map[TopicAndPartition, LeaderOffset] =
      if (reset.exists(_ == "smallest")) {
        valueOrFail(kc.getEarliestLeaderOffsets(partitions), "get earliest leader offsets failed")
      } else {
        valueOrFail(kc.getLatestLeaderOffsets(partitions), "get latest leader offsets failed")
      }
    leaderOffsets.map { case (tp, leaderOffset) => tp -> leaderOffset.offset }
  }

  /**
    * Stores the offsets consumed by `rdd` for the configured consumer group.
    *
    * All offset ranges of the batch are written with a single request; a failure is reported on
    * stdout and does not interrupt the job.
    *
    * @param rdd a batch produced by the direct stream (it must expose `HasOffsetRanges`)
    * @throws NoSuchElementException if `group.id` is missing from the constructor's `kafkaParams`
    */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val consumed = rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map { range =>
      TopicAndPartition(range.topic, range.partition) -> range.untilOffset
    }.toMap

    if (consumed.nonEmpty) {
      commitOffsets(groupId, consumed)
    }
  }

  /** Writes `offsets` for `groupId` and reports a failure on stdout instead of throwing. */
  private def commitOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit =
    kc.setConsumerOffsets(groupId, offsets) match {
      case Left(err) => println(s"Error updating the offset to Kafka cluster: $err")
      case Right(_)  => // stored successfully
    }

  /** Unwraps a successful lookup or raises a `SparkException` of the form `"<failure>: <error>"`. */
  private def valueOrFail[E, A](result: Either[E, A], failure: String): A = result match {
    case Right(value) => value
    case Left(err)    => throw new SparkException(s"$failure: $err")
  }

}

測試object:

package streaming

import kafka.serializer.StringDecoder

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.kafka.KafkaManager

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Test driver for `KafkaManager`: consumes the given topics with a direct stream, prints the
  * value of every message and stores the consumed offsets after each non-empty batch.
  *
  * Arguments: `<brokers> <topics> <groupId>`, where `<topics>` is a comma-separated list.
  */
object SparkKafkaStreaming {

  /** Help text printed when too few arguments are supplied. */
  private val Usage: String =
    """
      |Usage: DirectKafkaWordCount <brokers> <topics> <groupId>
      |  <brokers> is a list of one or more Kafka brokers
      |  <topics> is a list of one or more kafka topics to consume from
      |  <groupId> is a consume group
      |
    """.stripMargin

  /**
    * Prints the value of every `(key, value)` record of the batch.
    *
    * @param rdd one batch of the Kafka stream
    */
  def processRdd(rdd: RDD[(String, String)]): Unit = {
    rdd.map(_._2).foreach(println)
  }

  /**
    * Entry point; exits with status 1 and prints the usage when fewer than three arguments are
    * given. Extra arguments are ignored.
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(Usage)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    // take(3) keeps the destructuring from failing with a MatchError on surplus arguments.
    val Array(brokers, topics, groupId) = args.take(3)

    // Create context with 5 second batch interval
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWordCount")
      .setMaster("local[3]")
      .set("spark.streaming.kafka.maxRatePerPartition", "5")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Create direct kafka stream with brokers and topics;
    // blank entries (e.g. from "a, b," ) are dropped so no empty topic name is requested.
    val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)
    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Process the messages first ...
        processRdd(rdd)
        // ... and only then store the offsets, so a crash in between re-reads the batch
        // instead of skipping it.
        km.updateZKOffsets(rdd)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}

修改點:


若是kafka新增分區,zookeeper無對應的分區,消費從頭開始消費

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容