Flink流式廣播demo

公司業務中有一些實時流計算業務需要在線更新配置文件的內容,因此需要流式廣播來實現,測試demo如下:

import org.apache.flink.api.scala._
import com.xuehai.utils.Constants
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector

/**
 * Demo job showing Flink's broadcast-state pattern.
 *
 * A "config" stream (the Kafka topic "PK-Rank" unioned with the lines of a
 * local text file) is broadcast to the operator that processes the main Kafka
 * stream. Each config record is expected to look like `key,value`; each main
 * stream record is used as a lookup key into the broadcast map and the mapped
 * value is printed.
 *
 * `topic` and `props` are inherited from [[Constants]].
 */
object StreamBroadCastDemo extends Constants {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Basic settings.
        env.setStateBackend(new FsStateBackend("file:///D:\\checkpoint"))
        // Enable checkpointing with a 60000 ms (one minute) interval.
        env.enableCheckpointing(60000)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // Do not fail the task when a checkpoint fails (the original author notes the default is true).
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
        env.setParallelism(1)

        // Kafka consumer that delivers configuration updates for the broadcast state.
        val configKafkaConsumer = new FlinkKafkaConsumer010[String]("PK-Rank", new SimpleStringSchema(), props)
        configKafkaConsumer.setStartFromLatest()

        // Initial configuration comes from a text file; it is unioned with the
        // Kafka config stream and the result is broadcast.
        val mapStateDescriptor = new MapStateDescriptor[String, String]("codeConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
        val init = env.readTextFile("D:\\code.txt")
        val broadStream = env.addSource(configKafkaConsumer).union(init).broadcast(mapStateDescriptor)

        // Kafka consumer for the main data stream.
        val kafkaConsumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), props)
        kafkaConsumer.setStartFromLatest()

        // Join the live data stream with the broadcast configuration and print the result.
        env
            .addSource(kafkaConsumer)
            .connect(broadStream)
            .process(new BroadcastProcessFunction[String, String, String] {
                /** Stores a `key,value` config record in the broadcast state. */
                override def processBroadcastElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
                    // Split once and only accept records that have at least two
                    // comma-separated fields; indexing blindly would throw on a
                    // malformed line and fail the whole job.
                    value.split(",") match {
                        case Array(key, name, _*) =>
                            val configMap: BroadcastState[String, String] = ctx.getBroadcastState(mapStateDescriptor)
                            configMap.put(key, name)
                        case _ => () // malformed config record: ignore it
                    }
                }

                /** Looks the record up in the broadcast state and emits the mapped value, if any. */
                override def processElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = {
                    val configMap = ctx.getBroadcastState(mapStateDescriptor)

                    // Keys missing from the configuration must not be emitted as null:
                    // as the original author observed, a null output causes the config
                    // file to be re-read and the broadcast entries received from Kafka
                    // to be overwritten. Records without a mapping are therefore dropped.
                    Option(configMap.get(value)).foreach(name => out.collect(name))
                }
            })
            .print()

        env.execute("stream broadCast demo")
    }
}

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容