SparkStreaming-Kafka通過指定偏移量獲取數據
1.數據源
'310999003001', '3109990030010220140820141230292','00000000','','2017-08-20 14:09:35','0',255,'SN', 0.00,'4','','310999','310999003001','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
'310999003102', '3109990031020220140820141230266','粵BT96V3','','2017-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
2.生產者
import java.util.Properties
import com.google.gson.{Gson, GsonBuilder}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Date 2022/11/8 9:49
*/
/**
 * Reads comma-separated traffic records from a local text file, converts each
 * one to JSON and publishes it to a Kafka topic, pausing two seconds between messages.
 */
object KafkaEventProducer {

  /** Highest zero-based column index read from a record when building a [[TrafficInfo]]. */
  private val LastFieldIndex = 13

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaEventProducer").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val topic = "ly_test"

    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    // Optional settings for a broker that requires SASL/PLAIN authentication:
    // props.put("security.protocol","SASL_PLAINTEXT")
    // props.put("sasl.mechanism","PLAIN")
    // props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='<user>' password='<password>';")

    try {
      val srcRDD: RDD[String] =
        sc.textFile("file:///F:\\work\\sun\\lywork\\hbaseoper\\datas\\kafkaproducerdata.txt")
      // Lines starting with ';' are skipped; everything else is split into columns
      // and pulled back to the driver so it can be sent from a single producer.
      val records: Array[Array[String]] =
        srcRDD.filter(!_.startsWith(";")).map(_.split(",")).collect()

      // The producer is created only once the input has been read, and is always
      // closed in the finally block so a failure mid-loop does not leak it.
      val kafkaProducer = new KafkaProducer[String, String](props)
      try {
        // A plain `new Gson()` HTML-escapes the single quotes present in the data
        // (they come out as the Unicode escape for U+0027); disabling HTML escaping
        // keeps them literal. The instance is reusable, so build it once.
        val gson: Gson = new GsonBuilder().disableHtmlEscaping().create()

        for (record <- records) {
          if (record.length > LastFieldIndex) {
            val trafficInfo =
              TrafficInfo(record(0), record(2), record(4), record(6), record(LastFieldIndex))
            val trafficInfoJson: String = gson.toJson(trafficInfo)
            kafkaProducer.send(new ProducerRecord[String, String](topic, trafficInfoJson))
            println("Message Sent:" + trafficInfoJson)
            // Throttle the feed to one message every two seconds.
            Thread.sleep(2000)
          } else {
            // Blank or truncated lines do not have all required columns; skip them
            // instead of failing the whole run with an index-out-of-bounds error.
            System.err.println(
              s"Skipping malformed record with ${record.length} field(s): ${record.mkString(",")}")
          }
        }
        kafkaProducer.flush()
      } finally {
        kafkaProducer.close()
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * One traffic observation. The field names are used verbatim as JSON keys.
   *
   * @param camer_id   camera number
   * @param car_id     licence plate number
   * @param event_time time of the observation
   * @param car_speed  speed
   * @param car_code   lane number
   */
  case class TrafficInfo(camer_id:String,car_id:String,event_time:String,car_speed:String,car_code:String)
}
3.消費者獲取指定偏移量
import java.text.SimpleDateFormat
import java.util
import java.util.Date
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Date 2022/11/5 16:38
*/
/**
 * Reads data from Kafka in two ways: as a direct stream over the subscribed
 * topic, and as a one-off batch RDD restricted to an explicit offset range.
 */
object AttainDataFromOffset {

  /** Date/time pattern shared by the conversion helpers in this object. */
  private val DateTimePattern = "yyyy-MM-dd HH:mm:ss"

  /**
   * Program entry point: prints the records of the fixed offset range once, then
   * starts the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AttainDataFromOffset").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ly",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
      // Optional settings for a broker that requires SASL/PLAIN authentication:
      // "security.protocol" -> "SASL_PLAINTEXT",
      // "sasl.mechanism" -> "PLAIN",
      // "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username='<user>' password='<password>';"
    )

    // Single source of truth for the topic, so the stream and the offset-range
    // read below cannot drift apart.
    val topic = "ly_test"
    val topics = Array(topic)

    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )

    // Reduce each streamed record to (key, value, partition, offset).
    val res: DStream[(String, String, Int, Long)] = stream.map { record =>
      (record.key(), record.value(), record.partition(), record.offset())
    }

    // The offset range to fetch as a batch.
    val offsetRanges = Array(
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange(topic, 0, 1L, 10L)
    )

    // createRDD expects a java.util.Map, so convert the Scala parameter map.
    import scala.collection.JavaConverters._
    val jkafkaParams: util.Map[String, Object] = kafkaParams.asJava
    val offsetRDD: RDD[ConsumerRecord[String, String]] = KafkaUtils.createRDD[String, String](
      sc,
      jkafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent
    )

    // Reduce each record to (key, value, partition, offset, formatted timestamp, timestamp type).
    val resRDD: RDD[(String, String, Int, Long, String, TimestampType)] = offsetRDD.map { record =>
      val timeStr = timeStampToDate(record.timestamp())
      (record.key(), record.value(), record.partition(), record.offset(), timeStr, record.timestampType())
    }

    resRDD.foreach(println(_))
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Formats a date as a `yyyy-MM-dd HH:mm:ss` string.
   *
   * @param date the date to format
   * @return the formatted date string
   */
  def dateToString(date:Date): String =
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    new SimpleDateFormat(DateTimePattern).format(date)

  /**
   * Parses a `yyyy-MM-dd HH:mm:ss` string into a date.
   *
   * @param str the string to parse
   * @return the parsed date
   * @throws java.text.ParseException if `str` does not match the pattern
   */
  def strToDate(str:String):Date =
    new SimpleDateFormat(DateTimePattern).parse(str)

  /**
   * Converts an epoch-millisecond timestamp into a `yyyy-MM-dd HH:mm:ss` string.
   *
   * @param timeStamp milliseconds since the epoch
   * @return the formatted date string
   */
  def timeStampToDate(timeStamp:Long): String =
    dateToString(new Date(timeStamp))

  /**
   * Converts a `yyyy-MM-dd HH:mm:ss` string into an epoch-millisecond timestamp.
   *
   * @param strDate the string to parse
   * @return milliseconds since the epoch
   * @throws java.text.ParseException if `strDate` does not match the pattern
   */
  def dateToTimeStamp(strDate:String): Long =
    strToDate(strDate).getTime
}