使用Flink批處理完成數(shù)據(jù)比對(對賬)三

前面的文章使用Flink批處理完成數(shù)據(jù)比對(對賬)二討論了使用Table API來處理數(shù)據(jù)比對的問題,但有些場景下還會有一些比較復雜的業(yè)務需求,如輸出的時候要將兩邊的數(shù)據(jù)合并在一起輸出,這個時候用Table API就不太好完成這樣的需求了,這就需要借助底層的DataSet API和Process Function。

這篇文章準備利用DataSet API來完成數(shù)據(jù)比對的需求,至于流數(shù)據(jù)的實時比對,下一篇文章將介紹。

編寫代碼:

核心的思想就是用兩個流(DataSet其實也是一種特殊的DataStream)中的數(shù)據(jù)進行處理,F(xiàn)link中就具備這樣的API。

import com.flink.vo.BankVo;
import com.flink.vo.DiffType;
import com.flink.vo.MergeVo;
import com.flink.vo.PayOrgVo;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.util.Collector;

/**
 * 利用coGroup實現(xiàn)對賬需求<br/>
 */
public class BatchJob3 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<PayOrgVo> payOrgSource = env.fromElements(new PayOrgVo("113", 1), new PayOrgVo("000", 2), new PayOrgVo("115", 33));
        DataSet<BankVo> bankSource = env.fromElements(new BankVo("000", 2), new BankVo("115", 333), new BankVo("114", 4));

        CoGroupOperator<PayOrgVo, BankVo, MergeVo> merge = payOrgSource.coGroup(bankSource)
                .where(PayOrgVo::getOrderNo)//利用關聯(lián)字段把兩邊的數(shù)據(jù)關聯(lián)起來
                .equalTo(BankVo::getOrderNo)
                .with(new CoGroupFunction<PayOrgVo, BankVo, MergeVo>() {// with方法會將兩邊orderNo相同的數(shù)據(jù)放在同一個方法中處理
                    @Override
                    public void coGroup(Iterable<PayOrgVo> first, Iterable<BankVo> second, Collector<MergeVo> out) throws Exception {
                        // 進入到coGroup方法的數(shù)據(jù)都是orderNo相同的,如果關聯(lián)不上的即為空
                        PayOrgVo payOrgVo = null;
                        BankVo bankVo = null;
                        DiffType diffType = null;
                        // 以下代碼假定數(shù)據(jù)具有唯一性,即同一個orderNo下僅有一條數(shù)據(jù)
                        // 如果重復數(shù)據(jù),根據(jù)實際情況寫下面的代碼邏輯
                        for (PayOrgVo vo : first) {
                            payOrgVo = vo;
                        }
                        for (BankVo vo : second) {
                            bankVo = vo;
                        }

                        if (bankVo == null) {// 相同orderNo下,支付機構有數(shù)據(jù)
                            diffType = DiffType.F113;
                        } else if (payOrgVo == null) {// 銀行有數(shù)據(jù)
                            diffType = DiffType.F114;
                        } else if (payOrgVo.getPayment().equals(bankVo.getPayment())) {// 數(shù)據(jù)完全一致
                            diffType = DiffType.F000;
                        } else {// orderNo相同但payment不同
                            diffType = DiffType.F115;
                        }
                        // 返回數(shù)據(jù)
                        out.collect(new MergeVo(diffType, payOrgVo, bankVo));
                    }
                });

        merge.print();
    }
}

通過coGroup、whereequalTo很容易講兩個流中orderNo相同的數(shù)據(jù)關聯(lián)在一起,coGroupjoin不同,join只會關聯(lián)key相同的數(shù)據(jù),形成一個數(shù)據(jù)集。而coGroup遇到指定key只有一個數(shù)據(jù)集中有記錄的情況時,會將這個Group和空的Group關聯(lián)。

源碼

源碼

總結

可以看到,利用Flink將兩方數(shù)據(jù)關聯(lián)是非常容易的。筆者在實際業(yè)務場景中,有些需求不僅需要關聯(lián)兩方數(shù)據(jù)的,在下發(fā)回盤文件的時候,還要關聯(lián)上其他方數(shù)據(jù)的情況(如商戶數(shù)據(jù)),這種情況目前想到的辦法有:

  1. CoGroupFunction中利用key去其他地方(文件、數(shù)據(jù)庫、緩存等)獲取到第三方的數(shù)據(jù),可能會引入其他中間件;
  2. 將兩方數(shù)據(jù)關聯(lián)后,在去join第三方的數(shù)據(jù)(至于用join還是繼續(xù)用coGroup,需要讀者自己根據(jù)業(yè)務場景思考);
  3. 擴展Flink的Function,讓它能處理三方甚至多方數(shù)據(jù),目前剛學,待研究;
  4. ……

如果你有更好的想法,歡迎留言,多多指教。
轉載請注明出處

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容