Flink自定義StreamOperator

在上一篇StreamOperator源碼簡(jiǎn)析從源碼角度分析了StreamOperator以及其實(shí)現(xiàn)類(lèi),此篇幅主要分析一下如何自定義一個(gè)StreamOperator。

StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相關(guān)方法prepareSnapshotPreBarrier、snapshotState,但是我們沒(méi)有必要去自己一一實(shí)現(xiàn)這些方法,可以繼承其抽象類(lèi)AbstractStreamOperator,覆蓋一些我們需要重寫(xiě)的方法。在上一篇分析中提到對(duì)于source端不需要接受上游數(shù)據(jù),也就不需要實(shí)現(xiàn)OneInputStreamOperator或者TwoInputStreamOperator接口,如果我們需要接收上游數(shù)據(jù)就必須實(shí)現(xiàn)這兩個(gè)接口中的一個(gè),主要看一個(gè)輸入還是兩個(gè)輸入來(lái)選擇。
案例:假設(shè)我們現(xiàn)在需要實(shí)現(xiàn)一個(gè)通用的定時(shí)、定量的輸出的StreamOperator。
實(shí)現(xiàn)步驟:

  1. 繼承AbstractStreamOperator抽象類(lèi),實(shí)現(xiàn)OneInputStreamOperator接口

  2. 重寫(xiě)open方法,調(diào)用flink 提供的定時(shí)接口,并且注冊(cè)定時(shí)器

  3. 重寫(xiě)initializeState/snapshotState方法,由于批量寫(xiě)需要做緩存,那么需要保證數(shù)據(jù)的一致性,將緩存數(shù)據(jù)存在狀態(tài)中

  4. 重寫(xiě)processElement方法,將數(shù)據(jù)存在緩存中,達(dá)到一定大小然后輸出

  5. 由于需要做定時(shí)調(diào)用,那么需要有一個(gè)定時(shí)調(diào)用的回調(diào)方法,那么定義的類(lèi)需要實(shí)現(xiàn)ProcessingTimeCallback接口,并且實(shí)現(xiàn)其onProcessingTime方法(關(guān)于flink定時(shí)可以參考定時(shí)系列文章)

代碼:

  1. publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>

  2. implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{

  3. privateList<T> list;

  4. privateListState<T> listState;

  5. privateint batchSize;

  6. privatelong interval;

  7. privateProcessingTimeService processingTimeService;

  8. publicCommonSinkOperator(){

  9. }

  10. publicCommonSinkOperator(int batchSize,long interval){

  11. this.chainingStrategy =ChainingStrategy.ALWAYS;

  12. this.batchSize = batchSize;

  13. this.interval = interval;

  14. }

  15. @Overridepublicvoid open()throwsException{

  16. super.open();

  17. if(interval >0&& batchSize >1){

  18. //獲取AbstractStreamOperator里面的ProcessingTimeService, 該對(duì)象用來(lái)做定時(shí)調(diào)用

  19. //注冊(cè)定時(shí)器將當(dāng)前對(duì)象作為回調(diào)對(duì)象,需要實(shí)現(xiàn)ProcessingTimeCallback接口

  20. processingTimeService = getProcessingTimeService();

  21. long now = processingTimeService.getCurrentProcessingTime();

  22. processingTimeService.registerTimer(now + interval,this);

  23. }

  24. }

  25. //狀態(tài)恢復(fù)

  26. @Overridepublicvoid initializeState(StateInitializationContext context)throwsException{

  27. super.initializeState(context);

  28. this.list =newArrayList<T>();

  29. listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");

  30. if(context.isRestored()){

  31. listState.get().forEach(x ->{

  32. list.add(x);

  33. });

  34. }

  35. }

  36. @Overridepublicvoid processElement(StreamRecord<T> element)throwsException{

  37. list.add(element.getValue());

  38. if(list.size()>= batchSize){

  39. saveRecords(list);

  40. }

  41. }

  42. //checkpoint

  43. @Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{

  44. super.snapshotState(context);

  45. if(list.size()>0){

  46. listState.clear();

  47. listState.addAll(list);

  48. }

  49. }

  50. //定時(shí)回調(diào)

  51. @Overridepublicvoid onProcessingTime(long timestamp)throwsException{

  52. if(list.size()>0){

  53. saveRecords(list);

  54. list.clear();

  55. }

  56. long now = processingTimeService.getCurrentProcessingTime();

  57. processingTimeService.registerTimer(now + interval,this);//再次注冊(cè)

  58. }

  59. publicabstractvoid saveRecords(List<T> datas);

  60. }

如何調(diào)用?直接使用dataStream.transform方式即可。

整體來(lái)說(shuō)這個(gè)demo相對(duì)來(lái)說(shuō)是比較簡(jiǎn)單的,但是這里面涉及的定時(shí)、狀態(tài)管理也是值得研究,比喻說(shuō)在這里定時(shí)我們直接選擇ProcessingTimeService,而沒(méi)有選擇InternalTimerService來(lái)完成定時(shí)注冊(cè),主要是由于InternalTimerService會(huì)做定時(shí)調(diào)用狀態(tài)保存,在窗口操作中需要任務(wù)失敗重啟仍然可以觸發(fā)定時(shí),但是在我們案例中不需要,直接下次啟動(dòng)重新注冊(cè)即可,因此選擇了ProcessingTimeService。

推薦閱讀

1. Flink中延時(shí)調(diào)用設(shè)計(jì)與實(shí)現(xiàn)

2. Flink維表關(guān)聯(lián)系列之Hbase維表關(guān)聯(lián):LRU策略

3. 你應(yīng)該了解的Watermark

4. Flink exactly-once系列之事務(wù)性輸出實(shí)現(xiàn)

5. Flink時(shí)間系統(tǒng)系列之實(shí)例講解:如何做定時(shí)輸出

6. Flink實(shí)戰(zhàn):全局TopN分析與實(shí)現(xiàn)

7. Flink per-Job模式InfluxdbReporter上報(bào)JobName

8. Flink SQL自定義聚合函數(shù)

image

關(guān)注回復(fù)Flink獲取更多信息~

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容