當前Spark Streaming-Streaming Join只支持:
- InnerJoin;
- LeftJoin;
- RightJoin;
整體思路
-
將Join的條件分為:
-
preJoinFilter
- leftSideOnly: 只依賴左表的過濾條件,針對左表input輸入,先校驗該條件,如果不滿足該條件一定不會關(guān)聯(lián)上;
- rightSideOnly:只依賴右表的過濾條件,針對右表input輸入,先校驗該條件,如果不滿足該條件一定不會關(guān)聯(lián)上;
postJoinFilter:同時依賴左右表的過濾條件,在滿足preJoinFilter并同另外一側(cè)表關(guān)聯(lián)后進行該過濾;
-
-
將滿足過濾條件的新增左表數(shù)據(jù)跟右表狀態(tài)數(shù)據(jù)做Join(詳細見代碼),同時更新所有的新增left表數(shù)據(jù)到狀態(tài) ,將結(jié)果輸出至leftOutputIter,結(jié)果分為兩部分:
關(guān)聯(lián)上并滿足過濾條件的數(shù)據(jù):這部分數(shù)據(jù)Inner/Left/Right都是相同的;
-
關(guān)聯(lián)不上的數(shù)據(jù),分為兩種情況:
暫時沒有關(guān)聯(lián)上的,這個時候僅僅將左流新增數(shù)據(jù)寫入左表狀態(tài)表,不會emit數(shù)據(jù);
-
永不會關(guān)聯(lián)上的(不滿足leftSideOnly),針對Inner/Left/Right產(chǎn)生不同的結(jié)果:
a. LeftJoin: 輸出join with null;
b. Inner/RightJoin: 輸出空;
將滿足過濾條件的新增右表數(shù)據(jù)跟左表狀態(tài)數(shù)據(jù)做Join,同時更新所有的新增right表數(shù)據(jù)到狀態(tài),同時會生成新增左表跟新增右表的關(guān)聯(lián)數(shù)據(jù)(因為左表的新增數(shù)據(jù)已經(jīng)保留到了左表的狀態(tài)數(shù)據(jù)中),結(jié)果輸出至rightOutputIter(類似于上步驟);
針對InnerJoin的輸出即為leftOutputIter + rightOutputIter;
-
針對LeftJoin同時還需要考左表已經(jīng)過期的數(shù)據(jù),這些數(shù)據(jù)分兩種情況:
- 同右表狀態(tài)數(shù)據(jù)(包括當前批次)沒有關(guān)聯(lián):這些數(shù)據(jù)應該join with null;
- 同右表狀態(tài)數(shù)據(jù)(包括當前批次)有關(guān)聯(lián): 應該忽略掉,因為這些數(shù)據(jù)理論上在某些時間點上會Join上,所以不能join with null;
- 同時這些過期數(shù)據(jù)在該批次會被清理掉;
針對RightJoin同時還需要考左表已經(jīng)過期的數(shù)據(jù),類似LeftOuterJoin;
左右表關(guān)聯(lián)代碼實現(xiàn)
?
def storeAndJoinWithOtherSide(
otherSideJoiner: OneSideHashJoiner)(
generateJoinedRow: (InternalRow, InternalRow) => JoinedRow):
Iterator[InternalRow] = {
// Step1: 過濾不符合watermark要求的數(shù)據(jù)
val watermarkAttribute = inputAttributes.find(_.metadata.contains(delayKey))
val nonLateRows =
WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
case Some(watermarkExpr) =>
val predicate = newPredicate(watermarkExpr, inputAttributes)
inputIter.filter { row => !predicate.eval(row) }
case None =>
inputIter
}
nonLateRows.flatMap { row =>
val thisRow = row.asInstanceOf[UnsafeRow]
// Step2: 如果輸入不滿足preJoinFilter條件,即針對Left表不滿足只依賴左表的Join條件時:
// - 這種場景下該Row不會滿足Join的條件,所以不會保存到狀態(tài)數(shù)據(jù)中;
// - 同時根據(jù)Join的類型生成關(guān)聯(lián)不上后的數(shù)據(jù);
if (preJoinFilter(thisRow)) {
val key = keyGenerator(thisRow)
// Step3: 從另外一個狀態(tài)表中獲取關(guān)聯(lián)數(shù)據(jù)進行postJoinFilter過濾后,作為結(jié)果輸出
val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
generateJoinedRow(thisRow, thatRow)
}.filter(postJoinFilter)
// Step4: 將滿足條件的狀態(tài)數(shù)據(jù)更新至狀態(tài)結(jié)果中;
val shouldAddToState = // add only if both removal predicates do not match
!stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
if (shouldAddToState) {
joinStateManager.append(key, thisRow)
updatedStateRowsCount += 1
}
outputIter
} else {
// 無法關(guān)聯(lián)的數(shù)據(jù),根據(jù)Join的類型生成相應的數(shù)據(jù)
joinSide match {
case LeftSide if joinType == LeftOuter =>
Iterator(generateJoinedRow(thisRow, nullRight))
case RightSide if joinType == RightOuter =>
Iterator(generateJoinedRow(thisRow, nullLeft))
case _ => Iterator()
}
}
}
}