在上一篇StreamOperator源碼簡(jiǎn)析從源碼角度分析了StreamOperator以及其實(shí)現(xiàn)類(lèi),此篇幅主要分析一下如何自定義一個(gè)StreamOperator。
StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相關(guān)方法prepareSnapshotPreBarrier、snapshotState,但是我們沒(méi)有必要去自己一一實(shí)現(xiàn)這些方法,可以繼承其抽象類(lèi)AbstractStreamOperator,覆蓋一些我們需要重寫(xiě)的方法。在上一篇分析中提到對(duì)于source端不需要接受上游數(shù)據(jù),也就不需要實(shí)現(xiàn)OneInputStreamOperator或者TwoInputStreamOperator接口,如果我們需要接收上游數(shù)據(jù)就必須實(shí)現(xiàn)這兩個(gè)接口中的一個(gè),主要看一個(gè)輸入還是兩個(gè)輸入來(lái)選擇。
案例:假設(shè)我們現(xiàn)在需要實(shí)現(xiàn)一個(gè)通用的定時(shí)、定量的輸出的StreamOperator。
實(shí)現(xiàn)步驟:
繼承AbstractStreamOperator抽象類(lèi),實(shí)現(xiàn)OneInputStreamOperator接口
重寫(xiě)open方法,調(diào)用flink 提供的定時(shí)接口,并且注冊(cè)定時(shí)器
重寫(xiě)initializeState/snapshotState方法,由于批量寫(xiě)需要做緩存,那么需要保證數(shù)據(jù)的一致性,將緩存數(shù)據(jù)存在狀態(tài)中
重寫(xiě)processElement方法,將數(shù)據(jù)存在緩存中,達(dá)到一定大小然后輸出
由于需要做定時(shí)調(diào)用,那么需要有一個(gè)定時(shí)調(diào)用的回調(diào)方法,那么定義的類(lèi)需要實(shí)現(xiàn)ProcessingTimeCallback接口,并且實(shí)現(xiàn)其onProcessingTime方法(關(guān)于flink定時(shí)可以參考定時(shí)系列文章)
代碼:
publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{privateList<T> list;privateListState<T> listState;privateint batchSize;privatelong interval;privateProcessingTimeService processingTimeService;publicCommonSinkOperator(){}publicCommonSinkOperator(int batchSize,long interval){this.chainingStrategy =ChainingStrategy.ALWAYS;this.batchSize = batchSize;this.interval = interval;}@Overridepublicvoid open()throwsException{super.open();if(interval >0&& batchSize >1){//獲取AbstractStreamOperator里面的ProcessingTimeService, 該對(duì)象用來(lái)做定時(shí)調(diào)用//注冊(cè)定時(shí)器將當(dāng)前對(duì)象作為回調(diào)對(duì)象,需要實(shí)現(xiàn)ProcessingTimeCallback接口processingTimeService = getProcessingTimeService();long now = processingTimeService.getCurrentProcessingTime();processingTimeService.registerTimer(now + interval,this);}}//狀態(tài)恢復(fù)@Overridepublicvoid initializeState(StateInitializationContext context)throwsException{super.initializeState(context);this.list =newArrayList<T>();listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");if(context.isRestored()){listState.get().forEach(x ->{list.add(x);});}}@Overridepublicvoid processElement(StreamRecord<T> element)throwsException{list.add(element.getValue());if(list.size()>= batchSize){saveRecords(list);}}//checkpoint@Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{super.snapshotState(context);if(list.size()>0){listState.clear();listState.addAll(list);}}//定時(shí)回調(diào)@Overridepublicvoid onProcessingTime(long timestamp)throwsException{if(list.size()>0){saveRecords(list);list.clear();}long now = processingTimeService.getCurrentProcessingTime();processingTimeService.registerTimer(now + interval,this);//再次注冊(cè)}publicabstractvoid saveRecords(List<T> datas);}
如何調(diào)用?直接使用dataStream.transform方式即可。
整體來(lái)說(shuō)這個(gè)demo相對(duì)來(lái)說(shuō)是比較簡(jiǎn)單的,但是這里面涉及的定時(shí)、狀態(tài)管理也是值得研究,比喻說(shuō)在這里定時(shí)我們直接選擇ProcessingTimeService,而沒(méi)有選擇InternalTimerService來(lái)完成定時(shí)注冊(cè),主要是由于InternalTimerService會(huì)做定時(shí)調(diào)用狀態(tài)保存,在窗口操作中需要任務(wù)失敗重啟仍然可以觸發(fā)定時(shí),但是在我們案例中不需要,直接下次啟動(dòng)重新注冊(cè)即可,因此選擇了ProcessingTimeService。
推薦閱讀
1. Flink中延時(shí)調(diào)用設(shè)計(jì)與實(shí)現(xiàn)
2. Flink維表關(guān)聯(lián)系列之Hbase維表關(guān)聯(lián):LRU策略
4. Flink exactly-once系列之事務(wù)性輸出實(shí)現(xiàn)
5. Flink時(shí)間系統(tǒng)系列之實(shí)例講解:如何做定時(shí)輸出
6. Flink實(shí)戰(zhàn):全局TopN分析與實(shí)現(xiàn)
7. Flink per-Job模式InfluxdbReporter上報(bào)JobName
關(guān)注回復(fù)Flink獲取更多信息~