前面的文章使用Flink批處理完成數(shù)據(jù)比對(對賬)二討論了使用Table API來處理數(shù)據(jù)比對的問題,但有些場景下還會有一些比較復雜的業(yè)務需求,如輸出的時候要將兩邊的數(shù)據(jù)合并在一起輸出,這個時候用Table API就不太好完成這樣的需求了,這就需要借助底層的DataSet API和Process Function。
這篇文章準備利用DataSet API來完成數(shù)據(jù)比對的需求,至于流數(shù)據(jù)的實時比對,下一篇文章將介紹。
編寫代碼:
核心的思想就是用兩個流(DataSet其實也是一種特殊的DataStream)中的數(shù)據(jù)進行處理,F(xiàn)link中就具備這樣的API。
import com.flink.vo.BankVo;
import com.flink.vo.DiffType;
import com.flink.vo.MergeVo;
import com.flink.vo.PayOrgVo;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.util.Collector;
/**
* 利用coGroup實現(xiàn)對賬需求<br/>
*/
public class BatchJob3 {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<PayOrgVo> payOrgSource = env.fromElements(new PayOrgVo("113", 1), new PayOrgVo("000", 2), new PayOrgVo("115", 33));
DataSet<BankVo> bankSource = env.fromElements(new BankVo("000", 2), new BankVo("115", 333), new BankVo("114", 4));
CoGroupOperator<PayOrgVo, BankVo, MergeVo> merge = payOrgSource.coGroup(bankSource)
.where(PayOrgVo::getOrderNo)//利用關聯(lián)字段把兩邊的數(shù)據(jù)關聯(lián)起來
.equalTo(BankVo::getOrderNo)
.with(new CoGroupFunction<PayOrgVo, BankVo, MergeVo>() {// with方法會將兩邊orderNo相同的數(shù)據(jù)放在同一個方法中處理
@Override
public void coGroup(Iterable<PayOrgVo> first, Iterable<BankVo> second, Collector<MergeVo> out) throws Exception {
// 進入到coGroup方法的數(shù)據(jù)都是orderNo相同的,如果關聯(lián)不上的即為空
PayOrgVo payOrgVo = null;
BankVo bankVo = null;
DiffType diffType = null;
// 以下代碼假定數(shù)據(jù)具有唯一性,即同一個orderNo下僅有一條數(shù)據(jù)
// 如果重復數(shù)據(jù),根據(jù)實際情況寫下面的代碼邏輯
for (PayOrgVo vo : first) {
payOrgVo = vo;
}
for (BankVo vo : second) {
bankVo = vo;
}
if (bankVo == null) {// 相同orderNo下,支付機構有數(shù)據(jù)
diffType = DiffType.F113;
} else if (payOrgVo == null) {// 銀行有數(shù)據(jù)
diffType = DiffType.F114;
} else if (payOrgVo.getPayment().equals(bankVo.getPayment())) {// 數(shù)據(jù)完全一致
diffType = DiffType.F000;
} else {// orderNo相同但payment不同
diffType = DiffType.F115;
}
// 返回數(shù)據(jù)
out.collect(new MergeVo(diffType, payOrgVo, bankVo));
}
});
merge.print();
}
}
通過coGroup、where和equalTo很容易講兩個流中orderNo相同的數(shù)據(jù)關聯(lián)在一起,coGroup和join不同,join只會關聯(lián)key相同的數(shù)據(jù),形成一個數(shù)據(jù)集。而coGroup遇到指定key只有一個數(shù)據(jù)集中有記錄的情況時,會將這個Group和空的Group關聯(lián)。
源碼
總結
可以看到,利用Flink將兩方數(shù)據(jù)關聯(lián)是非常容易的。筆者在實際業(yè)務場景中,有些需求不僅需要關聯(lián)兩方數(shù)據(jù)的,在下發(fā)回盤文件的時候,還要關聯(lián)上其他方數(shù)據(jù)的情況(如商戶數(shù)據(jù)),這種情況目前想到的辦法有:
- 在
CoGroupFunction中利用key去其他地方(文件、數(shù)據(jù)庫、緩存等)獲取到第三方的數(shù)據(jù),可能會引入其他中間件; - 將兩方數(shù)據(jù)關聯(lián)后,在去join第三方的數(shù)據(jù)(至于用join還是繼續(xù)用coGroup,需要讀者自己根據(jù)業(yè)務場景思考);
- 擴展Flink的Function,讓它能處理三方甚至多方數(shù)據(jù),目前剛學,待研究;
- ……
如果你有更好的想法,歡迎留言,多多指教。
轉載請注明出處