時態表

TemporalTable

關於時態表的介紹,可以參考 Flink 中文社區的文章《Flink SQL 如何實現數據流的 Join?》,以及博文《Flink Table & SQL 時態表 Temporal Table》。
append 表(追加表)關聯時態表數據,進行流 join 操作(使用時態表可以減少需要保存的狀態)。

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TemporalTableFunction

/**
 * Example job: reads an "order" stream and a "deal" stream from Kafka, exposes the
 * deal stream as a temporal table function keyed by `sale_order_no`, joins every
 * order against it on event time and prints the joined rows.
 */
object FlinkTemporalTable {

  /** Creates a Kafka source that emits each record of `topic` as a plain string. */
  private def kafkaStrings(env: StreamExecutionEnvironment, topic: String, kafkaProps: Properties): DataStream[String] =
    env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), kafkaProps))

  /**
   * Parses one order record (JSON) into (order_id, order_no, date_create, date_update).
   * Sample: {"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
   */
  private def toOrderRow(json: String): (String, String, Long, Long) = {
    val record: JSONObject = JSON.parseObject(json)
    (
      record.getString("order_id"),
      record.getString("order_no"),
      record.getLongValue("date_create"),
      record.getLongValue("date_update")
    )
  }

  /**
   * Parses one deal record (JSON) into
   * (car_deal_id, sale_order_no, car_attribute_id, vin, date_create, date_update).
   * Sample: {"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
   */
  private def toDealRow(json: String): (String, String, String, String, Long, Long) = {
    val record: JSONObject = JSON.parseObject(json)
    (
      record.getString("car_deal_id"),
      record.getString("sale_order_no"),
      record.getString("car_attribute_id"),
      record.getString("vin"),
      record.getLongValue("date_create"),
      record.getLongValue("date_update")
    )
  }

  def main(args: Array[String]): Unit = {
    // Streaming environment running on event time.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Table environment built on top of the streaming environment.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)

    // Kafka consumer settings shared by both sources.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9092")
    kafkaProps.setProperty("auto.offset.reset", "latest") // where to start consuming: earliest or latest
    kafkaProps.setProperty("group.id", "local_consumer")

    val orderTopic = "order"
    val dealTopic = "deal"

    // Orders: raw JSON -> tuple, with date_update (4th field) used as the event timestamp.
    val orderJson: DataStream[String] = kafkaStrings(streamEnv, orderTopic, kafkaProps)
    val orders: DataStream[(String, String, Long, Long)] =
      orderJson
        .map(line => toOrderRow(line))
        .assignAscendingTimestamps(_._4)

    // Deals: raw JSON -> tuple, with date_update (6th field) used as the event timestamp.
    val dealJson: DataStream[String] = kafkaStrings(streamEnv, dealTopic, kafkaProps)
    val deals: DataStream[(String, String, String, String, Long, Long)] =
      dealJson
        .map(line => toDealRow(line))
        .assignAscendingTimestamps(_._6)

    // Register both streams as tables; the trailing field of each is the rowtime attribute.
    tableEnv.registerDataStream("fo", orders, 'order_id, 'order_no, 'date_create, 'date_update, 'foRowtime.rowtime)
    tableEnv.registerDataStream("fcdv2", deals, 'car_deal_id, 'sale_order_no, 'car_attribute_id, 'vin, 'date_create, 'date_update, 'fcdv2Rowtime.rowtime)

    // Temporal table function over the deal table: time attribute fcdv2Rowtime, key sale_order_no.
    val dealVersions: TemporalTableFunction =
      tableEnv.scan("fcdv2").createTemporalTableFunction("fcdv2Rowtime", "sale_order_no")
    // Make the function callable from SQL.
    tableEnv.registerFunction("FCDV2_TEMPORAL_FUNCTION", dealVersions)

    // Join each order with the deal version looked up at the order's rowtime.
    val joinSql =
      """
        |SELECT fo.`order_no`            AS `order_no`
        |     , fo.`date_create`         AS `fo_date_create`
        |     , fo.`date_update`         AS `fo_date_update`
        |     , fcdv2.`car_attribute_id` AS `fcdv2_car_attribute_id`
        |     , fcdv2.`vin`              AS `fcdv2_vin`
        |     , fcdv2.`date_create`      AS `fcdv2_date_create`
        |     , fcdv2.`date_update`      AS `fcdv2_date_update`
        |FROM fo
        |    , LATERAL TABLE(FCDV2_TEMPORAL_FUNCTION(fo.foRowtime)) as fcdv2
        |WHERE fo.order_no = fcdv2.sale_order_no
        |""".stripMargin

    val joined = tableEnv.sqlQuery(joinSql)

    // Combine with idle-state cleanup: minimum retention 1 minute, maximum 7 minutes.
    tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1L), Time.minutes(7L))

    // Emit the join result as an append-only stream and print it.
    val joinedRows = tableEnv.toAppendStream[(String, Long, Long, String, String, Long, Long)](joined)
    joinedRows.print()

    // Submit the job, named after this object's class.
    tableEnv.execute(getClass.getSimpleName)
  }
}

input

-- order
{"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
{"order_id":"order_02","order_no":"0002","date_create":"1584497994602","date_update":"1584497994602"}
{"order_id":"order_03","order_no":"0003","date_create":"1584497995603","date_update":"1584497995603"}
{"order_id":"order_04","order_no":"0004","date_create":"1584497997604","date_update":"1584497997604"}
{"order_id":"order_05","order_no":"0005","date_create":"1584497998605","date_update":"1584497998605"}
{"order_id":"order_06","order_no":"0006","date_create":"1584497998606","date_update":"1584497998606"}
{"order_id":"order_07","order_no":"0007","date_create":"1584497998607","date_update":"1584497999899"}

-- deal
{"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
{"car_deal_id":"car_02","sale_order_no":"0002","car_attribute_id":"A0002","vin":"b1010","date_create":"1584497994602","date_update":"1584497994602"}
{"car_deal_id":"car_03","sale_order_no":"0003","car_attribute_id":"A0003","vin":"c1011","date_create":"1584497995603","date_update":"1584497995603"}
{"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d1110","date_create":"1584497997604","date_update":"1584497997604"}
{"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"e1111","date_create":"1584497998605","date_update":"1584497998605"}
{"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"f000","date_create":"1584497998605","date_update":"1584497999788"}
{"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d0001","date_create":"1584497997604","date_update":"1584497999799"}

output

注意:此處使用的時間語義為 event_time,也就是由 watermark 來驅動表的 join。且 append 表只能夠 join 上時間戳小於等於自己的時態表數據。

temporal_table.png
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容