Implementing Stream-Stream Joins in Flink Table

Regular Join

Regular joins are the most generic type of join: any new record or change on either side of the join input is visible and affects the whole join result. For example, a new record on the left side is joined with all previous and future records on the right side.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
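
To make the "previous and future records" semantics concrete, here is a minimal sketch of a regular inner join over the Orders/Product example. It is not Flink code: the names RegularJoinSketch, Joiner, onOrder and onProduct are hypothetical, and both inputs are simply kept in memory forever, which is the behaviour a regular join has when no state retention is configured.

```scala
import scala.collection.mutable

object RegularJoinSketch {
  final case class Order(orderId: Int, productId: Int)
  final case class Product(id: Int, name: String)

  // Toy regular inner join: both inputs are retained indefinitely, grouped by join key.
  final class Joiner {
    private val orders = mutable.Map.empty[Int, Vector[Order]]
    private val products = mutable.Map.empty[Int, Vector[Product]]

    // A new left record is stored, then joined with every right record seen so far.
    def onOrder(o: Order): Vector[(Order, Product)] = {
      orders(o.productId) = orders.getOrElse(o.productId, Vector.empty[Order]) :+ o
      products.getOrElse(o.productId, Vector.empty[Product]).map(p => (o, p))
    }

    // A new right record is stored, then joined with every left record seen so far.
    def onProduct(p: Product): Vector[(Order, Product)] = {
      products(p.id) = products.getOrElse(p.id, Vector.empty[Product]) :+ p
      orders.getOrElse(p.id, Vector.empty[Order]).map(o => (o, p))
    }
  }
}
```

An order that arrives before its product produces no output at first; the pair is emitted later, when the product record shows up and finds the order in the stored left side.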

NonWindowJoin

  • Base class for stream-stream joins without a window (NonWindow);
  • leftState/rightState: state of type <Row, <Long, Long>>, where the value holds the number of occurrences of the Row and its expiredTime;
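
The state layout can be illustrated with a small sketch. It assumes, as the bullets above suggest, that an accumulate message increments the count and refreshes the expiry, while a retraction decrements the count and drops the entry once it reaches zero. SideStateSketch and update are hypothetical names, rows are plain strings, and refreshing the expiry on a retraction is a simplification of this sketch rather than a statement about Flink.

```scala
import scala.collection.mutable

object SideStateSketch {
  // row -> (count, expiredTime), mirroring the <Row, <Long, Long>> layout
  type SideState = mutable.Map[String, (Long, Long)]

  // change = true is an accumulate message, change = false is a retraction.
  def update(state: SideState, row: String, change: Boolean, now: Long, retentionMs: Long): Unit = {
    val (cnt, _) = state.getOrElse(row, (0L, 0L))
    val newCnt = if (change) cnt + 1 else cnt - 1
    if (newCnt <= 0) {
      state.remove(row)                        // last copy retracted: drop the entry
    } else {
      state(row) = (newCnt, now + retentionMs) // this sketch refreshes the expiry on every touch
    }
  }
}
```

Storing a count instead of duplicate rows keeps the state compact when the same row arrives several times.
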
NonWindowInnerJoin
  • Extends NonWindowJoin;
  • The main implementation of inner joins between two streams;
  • Basic logic (see the code for details):
    • Add the input row to the state, or retract it from the state;
    • Look up the other side's state and join against it;
    • If a retention time is configured, state data is reclaimed once the retention time expires;
  • processElement is shared by the left and right streams: isLeft is true for the left stream and false for the right stream;
  • A few open questions about this function:
    • For an inner join, does every input really need to iterate over the entire state of the other side?
    • Is the expiry check that cleans up the other side's state based on processing time?
  override def processElement(
      value: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow],
      timerState: ValueState[Long],
      currentSideState: MapState[Row, JTuple2[Long, Long]],
      otherSideState: MapState[Row, JTuple2[Long, Long]],
      isLeft: Boolean): Unit = {

    val inputRow = value.row
    // Step 1: update the state:
    // - write the input into the state, or retract it from the state;
    // - set the expiredTime of the row;
    updateCurrentSide(value, ctx, timerState, currentSideState)

    cRowWrapper.setCollector(out)
    cRowWrapper.setChange(value.change)

    // Step 2: iterate over the other side's state.
    // Open question: for an inner join, does every input need to scan the whole state of the
    // other side? Performance would drop sharply once the state grows large.
    val otherSideIterator = otherSideState.iterator()
    // join other side data
    while (otherSideIterator.hasNext) {
      val otherSideEntry = otherSideIterator.next()
      val otherSideRow = otherSideEntry.getKey
      val otherSideCntAndExpiredTime = otherSideEntry.getValue
      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
      // Step 3: call the JoinFunction to perform the actual join
      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
      // clear expired data. Note: clear after join to keep closer to the original semantics
      // Step 4: clean up expired entries in the other side's state
      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
        otherSideIterator.remove()
      }
    }
  }
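
Steps 2 to 4 of the listing can be reproduced outside Flink with a small sketch. It is a simplification under stated assumptions: rows are strings, the generated join function is replaced by a hypothetical matches predicate, and the scan runs over a snapshot of a plain mutable map rather than over a MapState iterator. InnerJoinScanSketch and probe are illustrative names.

```scala
import scala.collection.mutable

object InnerJoinScanSketch {
  // Probes the other side's state with one input row, then lazily evicts expired entries.
  // Each joined pair is repeated `count` times, once per stored duplicate.
  def probe(
      input: String,
      otherSide: mutable.Map[String, (Long, Long)], // row -> (count, expiredTime)
      matches: (String, String) => Boolean,         // stand-in for the generated join condition
      now: Long,
      cleaningEnabled: Boolean): Vector[(String, String)] = {
    val out = Vector.newBuilder[(String, String)]
    // Iterate over a snapshot so that removing entries during the scan is safe.
    for ((row, (cnt, expiredTime)) <- otherSide.toVector) {
      if (matches(input, row)) {
        var i = 0L
        while (i < cnt) { out += ((input, row)); i += 1 }
      }
      // Clean up only after the join attempt, as in the listing above.
      if (cleaningEnabled && now >= expiredTime) otherSide.remove(row)
    }
    out.result()
  }
}
```

Note that an expired entry still takes part in one last join before it is removed, which matches the "clear after join" comment in the original code.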

Time-windowed Joins

A time-windowed join is defined by a join predicate that checks whether the time attributes of the input records are within certain time constraints, i.e., a time window.

  • A join based on a time window: the join condition contains a time window, for example:
SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
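
The time constraint in this query boils down to a range check on two timestamps. The sketch below spells it out, assuming both time attributes are epoch milliseconds; TimeWindowPredicateSketch and inWindow are hypothetical names.

```scala
object TimeWindowPredicateSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
  // BETWEEN is inclusive on both ends.
  def inWindow(orderTime: Long, shipTime: Long): Boolean =
    orderTime >= shipTime - 4 * HourMs && orderTime <= shipTime
}
```

Because the predicate bounds how far apart two matching rows can be, rows older than the window can eventually be dropped from state, which is what distinguishes this join from a regular join.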

DataStreamWindowJoin

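  • A windowed join only supports append output; it does not support update output;
    • Update mode: in Flink, if a DataStreamRel operator reports producesUpdates or producesRetractions, every input produces output (possibly not the final, complete result); both default to false;
      • DataStreamGroupAggregate: for a group without a window, producesUpdates = true, because an aggregate has to emit results even when they are not final;
      • DataStreamJoin: supports producesRetractions when the join is not an inner join;
    • Append mode: in Flink, every operator other than the ones above is treated as append mode.
Computing the WindowBound
  • Before the join can run, the range of the WindowBound has to be determined;
  • The WindowBound is the range spanned by the left edge (a negative value) and the right edge, relative to the left table;
  • The window length of the join is the right edge minus the left edge;

The bounds are derived as follows: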

--sql1: [-3600000, 3600000]
SELECT t1.a, t2.b 
FROM MyTable as t1 
join MyTable2 as t2 
on t1.a = t2.a 
and  t1.proctime between t2.proctime - interval '1' hour  and t2.proctime + interval '1' hour;

--sql2: [-999, 999]
t1.proctime > t2.proctime - interval '1' second and t1.proctime < t2.proctime + interval '1' second

--sql3: [-1999, 1999]
t2.c > t1.c - interval '2' second and t2.c < t1.c + interval '2' second
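
The three results above follow one rule: rewrite the predicate as a constraint on (left time - right time) in milliseconds, then tighten each strict comparison by 1 ms. The sketch below encodes that rule. Bound and windowBounds are hypothetical names, and the caller is assumed to have already normalised the predicate into a lower and an upper offset.

```scala
object WindowBoundSketch {
  // One side of the constraint on (leftTime - rightTime), in milliseconds.
  final case class Bound(offsetMs: Long, inclusive: Boolean)

  // Strict comparisons shrink the range by 1 ms, since timestamps have millisecond precision.
  def windowBounds(lower: Bound, upper: Bound): (Long, Long) = {
    val lo = if (lower.inclusive) lower.offsetMs else lower.offsetMs + 1
    val hi = if (upper.inclusive) upper.offsetMs else upper.offsetMs - 1
    (lo, hi)
  }

  // Window length as defined above: right edge minus left edge.
  def windowLength(bounds: (Long, Long)): Long = bounds._2 - bounds._1
}
```

For sql3 the predicate is written from the point of view of t2, so it first has to be turned around: t2.c > t1.c - 2s is the same as t1.c - t2.c < 2000, and t2.c < t1.c + 2s is the same as t1.c - t2.c > -2000, which gives two strict bounds of -2000 and 2000.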

RowTimeBoundedStreamJoin (join implementation logic)

The analysis below follows the logic for processing the left stream.

State involved
  • leftCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]], the state of the left table:
    • Key: the eventTime of the left table;
    • Value: a list of [Row, whether the row has already been emitted];
  • rightCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]], the same structure for the right table;
  • leftTimerState: ValueStateDescriptor[Long], the timer state of the left table;
  • rightTimerState: ValueStateDescriptor[Long], the timer state of the right table;
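
A rough model of the cache layout, using plain Scala collections instead of Flink state: each timestamp maps to the rows that carry it, and every row has an "already emitted" flag that outer joins consult later. RowCacheSketch, add and markEmitted are hypothetical names, and rows are strings for brevity.

```scala
import scala.collection.mutable

object RowCacheSketch {
  // timestamp -> rows with that timestamp, each paired with an "already emitted" flag
  type Cache = mutable.Map[Long, Vector[(String, Boolean)]]

  // Appends a row under its timestamp, creating the list on first use.
  def add(cache: Cache, time: Long, row: String, emitted: Boolean): Unit =
    cache(time) = cache.getOrElse(time, Vector.empty[(String, Boolean)]) :+ ((row, emitted))

  // Sets the flag of every row at `time` that satisfies `joined`, then writes the entry back.
  def markEmitted(cache: Cache, time: Long, joined: String => Boolean): Unit =
    cache.get(time).foreach { rows =>
      cache(time) = rows.map { case (r, e) => (r, e || joined(r)) }
    }
}
```

Grouping rows by timestamp is what allows whole entries to be expired with a single comparison against the key.
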
Time variables involved
  • leftOperatorTime/rightOperatorTime
    • The operator time of the left and right tables; the two values are identical;
    • For ProcTimeBoundedStreamJoin: the current processing time;
    • For RowTimeBoundedStreamJoin: the current watermark;
  • rightQualifiedLowerBound/rightQualifiedUpperBound: the lower and upper edges on the right table, computed from the WindowBound of the left table (the structure is symmetric);
  • rightExpirationTime:
    • Formula: leftOperatorTime - rightRelativeSize (the right value of the WindowBound) - allowedLateness - 1
  • rightTime: the actual timestamp of each row, read from the right table's state;
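
The arithmetic behind these variables is short enough to write down directly. The sketch below uses the formulas quoted in this section and in the listing further down; BoundedJoinTimesSketch and its method names are hypothetical, and all values are assumed to be epoch milliseconds.

```scala
object BoundedJoinTimesSketch {
  // Range of right-side timestamps that a left row at `leftTime` can join with.
  def rightQualifiedBounds(leftTime: Long, leftRelativeSize: Long, rightRelativeSize: Long): (Long, Long) =
    (leftTime - rightRelativeSize, leftTime + leftRelativeSize)

  // Right rows with a timestamp at or below this value are treated as expired.
  def rightExpirationTime(leftOperatorTime: Long, rightRelativeSize: Long, allowedLateness: Long): Long =
    leftOperatorTime - rightRelativeSize - allowedLateness - 1
}
```

As a worked example, with leftRelativeSize = 2000 and rightRelativeSize = 3000, a left row at time 10000 qualifies right rows in [7000, 12000]; with an operator time of 10000 and no allowed lateness, right rows at or before 6999 are considered expired.
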
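Step walkthrough
  • Decide whether the row needs to be joined with the state of the right table:
    • If the condition holds, iterate over the right table's state:
      • Execute the join function;
      • Check whether the right-side entry has expired; if it has, emit a null-padded row for a right/full outer join (only for rows that were never joined) and remove the entry;
  • Decide whether the input row should be written into the state;
  • If it is not written into the state, the join is a left/full outer join, and the row was never joined, emit a null-padded row;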
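
The steps above can be condensed into a small simulation. It is a deliberately reduced sketch, not the Flink operator: it covers only inner and left outer joins, assumes allowedLateness is 0, treats every right row inside the window as a match (no extra join condition), leaves out timers, and lets the caller set operatorTime by hand. BoundedJoinSketch and processLeft are hypothetical names.

```scala
import scala.collection.mutable

// Simplified left-input path of a time-bounded join (inner or left outer only).
class BoundedJoinSketch(leftRelativeSize: Long, rightRelativeSize: Long, leftOuter: Boolean) {
  val leftCache = mutable.Map.empty[Long, Vector[String]]
  val rightCache = mutable.Map.empty[Long, Vector[String]]
  var operatorTime: Long = 0L // processing time or watermark, supplied by the caller
  private var rightExpirationTime: Long = 0L

  // Returns (leftRow, Some(rightRow)) for matches and (leftRow, None) for null padding.
  def processLeft(leftRow: String, leftTime: Long): Vector[(String, Option[String])] = {
    val lower = leftTime - rightRelativeSize
    val upper = leftTime + leftRelativeSize
    val out = Vector.newBuilder[(String, Option[String])]
    var emitted = false

    // Step 1: probe the right cache only if its contents may still overlap the window.
    if (rightExpirationTime < upper) {
      rightExpirationTime = operatorTime - rightRelativeSize - 1 // allowedLateness assumed 0
      for ((rightTime, rows) <- rightCache.toVector) {
        if (rightTime >= lower && rightTime <= upper) {
          rows.foreach { r => out += ((leftRow, Some(r))); emitted = true }
        }
        if (rightTime <= rightExpirationTime) rightCache.remove(rightTime) // eager eviction
      }
    }

    // Step 2: cache the left row if right rows that match it may still arrive.
    if (operatorTime < upper) {
      leftCache(leftTime) = leftCache.getOrElse(leftTime, Vector.empty[String]) :+ leftRow
    } else if (leftOuter && !emitted) {
      // Step 3: not cached and never joined, so emit the null-padded row now.
      out += ((leftRow, None))
    }
    out.result()
  }
}
```

In this sketch a left row that arrives after the operator time has passed its upper bound is never cached, so a left outer join has to decide about null padding immediately instead of waiting for a timer.
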
/**
    * Process rows from the left stream.
    */
  override def processElement1(
      cRowValue: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {

    joinCollector.innerCollector = out
    // Update the operator time, i.e. the values of leftOperatorTime/rightOperatorTime.
    updateOperatorTime(ctx)
    val leftRow = cRowValue.row
    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
    // Lower edge on the right table, derived from the left table's WindowBound (symmetric structure).
    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
    // Upper edge on the right table, derived from the left table's WindowBound.
    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
    var emitted: Boolean = false

    // Check if we need to join the current row against cached rows of the right input.
    // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
    // since rightExpirationTime <= rightMinimumTime is always true.
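    // The previously computed right expiration time decides whether the left row has to be joined at all.
    // Put the other way round: rightExpirationTime is the value from the last computation, so the
    // real value can only be larger. If even that value is not below rightQualifiedUpperBound, the
    // right stream is far ahead, the join window of this row has already passed, and no join is needed.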
    if (rightExpirationTime < rightQualifiedUpperBound) {
      // Upper bound of current join window has not passed the cache expiration time yet.
      // There might be qualifying rows in the cache that the current row needs to be joined with.
      // leftOperatorTime: the operator time of the left table;
      // - for ProcTimeBoundedStreamJoin this is the processing time, i.e. the current time;
      // - for RowTimeBoundedStreamJoin this is event time, i.e. the current watermark (?);
      // calExpirationTime computes: leftOperatorTime - rightRelativeSize - allowedLateness - 1
      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
      // Join the leftRow with rows from the right cache.
      // The left row is matched against the state of the right table.
      val rightIterator = rightCache.iterator()
      while (rightIterator.hasNext) {
        val rightEntry = rightIterator.next
        val rightTime = rightEntry.getKey

        // Check that the right-side timestamp lies within the expected WindowBound range.
        if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
          val rightRows = rightEntry.getValue
          var i = 0
          var entryUpdated = false
          while (i < rightRows.size) {
            joinCollector.reset()
            val tuple = rightRows.get(i)
            // Run the join logic.
            joinFunction.join(leftRow, tuple.f0, joinCollector)
            // Record whether this left row has been emitted. The flag is used at the end to decide
            // whether a left/full outer join still has to emit a null-padded row:
            // if the row has already been emitted, no null padding is produced.
            emitted ||= joinCollector.emitted
            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
              if (!tuple.f1 && joinCollector.emitted) {
                // Mark the right row as being successfully joined and emitted.
                tuple.f1 = true
                entryUpdated = true
              }
            }
            i += 1
          }
          if (entryUpdated) {
            // Write back the edited entry (mark emitted) for the right cache.
            rightEntry.setValue(rightRows)
          }
        }
        // Handle right-table state whose key has already expired.
        if (rightTime <= rightExpirationTime) {
          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
            val rightRows = rightEntry.getValue
            var i = 0
            while (i < rightRows.size) {
              val tuple = rightRows.get(i)
              if (!tuple.f1) {
                // Emit a null padding result if the right row has never been successfully joined.
                joinCollector.collect(paddingUtil.padRight(tuple.f0))
              }
              i += 1
            }
          }
          // eager remove
          rightIterator.remove()
        } // We could do the short-cutting optimization here once we get a state with ordered keys.
      }
    }

    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
      if (rightTimerState.value == 0) {
        // Register a timer on the RIGHT stream to remove rows.
        registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
      }
    } else if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
      if (!emitted) {
        // Emit a null padding result if the left row is neither cached nor successfully joined.
        joinCollector.collect(paddingUtil.padLeft(leftRow))
      }
    }
  }

Open question

  • At production scale, when the window is large (so a lot of state accumulates), every qualifying record iterates over all of the other side's state. Won't performance be poor?
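
One direction is hinted at in the listing itself, whose comment mentions a short-cutting optimization "once we get a state with ordered keys". The sketch below shows what ordered keys would buy, using Scala's immutable TreeMap as a stand-in. It is not a Flink state API, and OrderedCacheSketch, probe and evict are hypothetical names.

```scala
import scala.collection.immutable.TreeMap

object OrderedCacheSketch {
  // With keys ordered by timestamp, a probe touches only the qualifying range
  // instead of every cached entry. range() excludes its upper end, hence upper + 1
  // (this sketch assumes upper < Long.MaxValue).
  def probe(cache: TreeMap[Long, Vector[String]], lower: Long, upper: Long): Vector[String] =
    cache.range(lower, upper + 1).values.flatten.toVector

  // Expired entries form a prefix of the ordered keys, so eviction can stop at the
  // first key that is still alive.
  def evict(cache: TreeMap[Long, Vector[String]], expirationTime: Long): TreeMap[Long, Vector[String]] =
    cache.dropWhile { case (time, _) => time <= expirationTime }
}
```

It is also worth keeping in mind that MapState is keyed state, so the scan in the listings covers the entries stored under the current key rather than the state of the whole operator. How much that helps depends on how the data is distributed across keys.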
