Spark Streaming-Streaming Join 實現(xiàn)梳理

當前Spark Streaming-Streaming Join只支持:

  • InnerJoin;
  • LeftJoin;
  • RightJoin;

整體思路

  1. 將Join的條件分為:

    • preJoinFilter

      • leftSideOnly: 只依賴左表的過濾條件,針對左表input輸入,先校驗該條件,如果不滿足該條件一定不會關(guān)聯(lián)上;
      • rightSideOnly:只依賴右表的過濾條件,針對右表input輸入,先校驗該條件,如果不滿足該條件一定不會關(guān)聯(lián)上;
    • postJoinFilter:同時依賴左右表的過濾條件,在滿足preJoinFilter并同另外一側(cè)表關(guān)聯(lián)后進行該過濾;

  2. 將滿足過濾條件的新增左表數(shù)據(jù)跟右表狀態(tài)數(shù)據(jù)做Join(詳細見代碼),同時更新所有的新增left表數(shù)據(jù)到狀態(tài) ,將結(jié)果輸出至leftOutputIter,結(jié)果分為兩部分:

    1. 關(guān)聯(lián)上并滿足過濾條件的數(shù)據(jù):這部分數(shù)據(jù)Inner/Left/Right都是相同的;

    2. 關(guān)聯(lián)不上的數(shù)據(jù),分為兩種情況:

      • 暫時沒有關(guān)聯(lián)上的,這個時候僅僅將左流新增數(shù)據(jù)寫入左表狀態(tài)表,不會emit數(shù)據(jù);

      • 永不會關(guān)聯(lián)上的(不滿足leftSideOnly),針對Inner/Left/Right產(chǎn)生不同的結(jié)果:

        a. LeftJoin: 輸出join with null;
        b. Inner/RightJoin: 輸出空;

  3. 將滿足過濾條件的新增右表數(shù)據(jù)跟左表狀態(tài)數(shù)據(jù)做Join,同時更新所有的新增right表數(shù)據(jù)到狀態(tài),同時會生成新增左表跟新增右表的關(guān)聯(lián)數(shù)據(jù)(因為左表的新增數(shù)據(jù)已經(jīng)保留到了左表的狀態(tài)數(shù)據(jù)中),結(jié)果輸出至rightOutputIter(類似于上步驟);

  4. 針對InnerJoin的輸出即為leftOutputIter + rightOutputIter;

  5. 針對LeftJoin同時還需要考左表已經(jīng)過期的數(shù)據(jù),這些數(shù)據(jù)分兩種情況:

    1. 同右表狀態(tài)數(shù)據(jù)(包括當前批次)沒有關(guān)聯(lián):這些數(shù)據(jù)應該join with null;
    2. 同右表狀態(tài)數(shù)據(jù)(包括當前批次)有關(guān)聯(lián): 應該忽略掉,因為這些數(shù)據(jù)理論上在某些時間點上會Join上,所以不能join with null;
    3. 同時這些過期數(shù)據(jù)在該批次會被清理掉;
  6. 針對RightJoin同時還需要考左表已經(jīng)過期的數(shù)據(jù),類似LeftOuterJoin;

左右表關(guān)聯(lián)代碼實現(xiàn)

?

 def storeAndJoinWithOtherSide(
        otherSideJoiner: OneSideHashJoiner)(
        generateJoinedRow: (InternalRow, InternalRow) => JoinedRow):
    Iterator[InternalRow] = {
      // Step1: 過濾不符合watermark要求的數(shù)據(jù)
      val watermarkAttribute = inputAttributes.find(_.metadata.contains(delayKey))
      val nonLateRows =
        WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
          case Some(watermarkExpr) =>
            val predicate = newPredicate(watermarkExpr, inputAttributes)
            inputIter.filter { row => !predicate.eval(row) }
          case None =>
            inputIter
        }
      
      nonLateRows.flatMap { row =>
        val thisRow = row.asInstanceOf[UnsafeRow]
        // Step2: 如果輸入不滿足preJoinFilter條件,即針對Left表不滿足只依賴左表的Join條件時:
        // - 這種場景下該Row不會滿足Join的條件,所以不會保存到狀態(tài)數(shù)據(jù)中;
        // - 同時根據(jù)Join的類型生成關(guān)聯(lián)不上后的數(shù)據(jù);
        if (preJoinFilter(thisRow)) {
          val key = keyGenerator(thisRow)
          // Step3: 從另外一個狀態(tài)表中獲取關(guān)聯(lián)數(shù)據(jù)進行postJoinFilter過濾后,作為結(jié)果輸出
          val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
            generateJoinedRow(thisRow, thatRow)
          }.filter(postJoinFilter)
          
          // Step4: 將滿足條件的狀態(tài)數(shù)據(jù)更新至狀態(tài)結(jié)果中;
          val shouldAddToState = // add only if both removal predicates do not match
            !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
          if (shouldAddToState) {
            joinStateManager.append(key, thisRow)
            updatedStateRowsCount += 1
          }
          outputIter
        } else {
          // 無法關(guān)聯(lián)的數(shù)據(jù),根據(jù)Join的類型生成相應的數(shù)據(jù)
          joinSide match {
            case LeftSide if joinType == LeftOuter =>
              Iterator(generateJoinedRow(thisRow, nullRight))
            case RightSide if joinType == RightOuter =>
              Iterator(generateJoinedRow(thisRow, nullLeft))
            case _ => Iterator()
          }
        }
      }
    }

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容