基于Flink的實(shí)時(shí)數(shù)據(jù)流構(gòu)建

業(yè)務(wù)詳解

處理數(shù)據(jù)主要包括的是從互聯(lián)網(wǎng)上采集來(lái)的數(shù)據(jù),包括常見(jiàn)的新聞、微博、論壇、貼吧、博客、微博、微信等信源,對(duì)原始格式進(jìn)行實(shí)時(shí)響應(yīng)處理,以期望滿足業(yè)務(wù)系統(tǒng)使用,這便是當(dāng)前數(shù)據(jù)流中數(shù)據(jù)處理的服務(wù)宗旨。

技術(shù)點(diǎn)概覽

利用廣播變量動(dòng)態(tài)更新規(guī)則

  • 業(yè)務(wù)場(chǎng)景
    數(shù)據(jù)流中會(huì)動(dòng)態(tài)實(shí)時(shí)加載規(guī)則表,而這種實(shí)時(shí)性要求并不強(qiáng)烈,可以是分鐘級(jí)別、小時(shí)級(jí)別;所以沒(méi)有必要浪費(fèi)大量資源對(duì)于每一條數(shù)據(jù)都進(jìn)行一次數(shù)據(jù)庫(kù)請(qǐng)求。所以適合的應(yīng)用場(chǎng)景是以固定的頻率更新各個(gè)算子中的規(guī)則,當(dāng)然這種規(guī)則所占用的內(nèi)存消耗是比較小的。
  • 技術(shù)組件 - BroadcastVariable
    前置條件: Flink 中多并行度的情況下,每個(gè)算子或者不同算子運(yùn)行所在的 Slot 不一致,這就導(dǎo)致它們不會(huì)共享同一個(gè)內(nèi)存,也就不可以通過(guò)靜態(tài)變量的方式去獲取這些共享變量值。
    BroadcastVariable: 可以理解是一個(gè)公共的共享變量(可能是固定不變的數(shù)據(jù)集合,也可能是動(dòng)態(tài)變化的數(shù)據(jù)集合),在作業(yè)中將該共享變量廣播出去,然后下游的所有任務(wù)都可以獲取到該共享變量,這樣就可以不用將這個(gè)變量拷貝到下游的每個(gè)任務(wù)中。
    使用方式:
    1: 動(dòng)態(tài)讀取規(guī)則
    繼承 RichSourceFunction,對(duì)應(yīng)以下實(shí)現(xiàn):
public class GetHostSourceMappingFunction extends RichSourceFunction<Set<HostSourceMappingEntity>> {
    private static    final Logger logger = 
   LoggerFactory.getLogger(GetHostSourceMappingFunction.class);
    private volatile  boolean isRunning = true;
    private volatile  String key = null;
    private volatile  String connector = null;
    BaseConfigEntity configEntity;
    // 這里的實(shí)例不可以是單例,該算子會(huì)有自己的生命周期
    private JedisClusterUtil jedisClusterUtil; 
    public static final String [] TYPE_ARRAY = {
            FieldConstants.NEWS,
            FieldConstants.MEDIA
    };
    public GetHostSourceMappingFunction(BaseConfigEntity configEntity) {
        this.configEntity = configEntity;
    }
    @Override
    public void open(Configuration parameters) throws Exception{
        connector = configEntity.getRedis().getConnector();
        key = JedisClusterUtil.combineKey(configEntity.getRedis().getHostSourceMappingKey(),configEntity.getRedis());
        jedisClusterUtil = new JedisClusterUtil(configEntity.getRedis());
    }
    @Override
    public void run(SourceContext<Set<HostSourceMappingEntity>> sourceContext) throws Exception {
        while (isRunning) {
           Set<HostSourceMappingEntity> set = new CopyOnWriteArraySet<>();
            long start = System.currentTimeMillis();
            for (String type:TYPE_ARRAY) {
                try {
                    String result = jedisClusterUtil.getValue(key + connector + type);
                    if(StringUtils.isNotEmpty(result)){
                        set.addAll(JSONArray.parseArray(result,HostSourceMappingEntity.class));
                    }
                }catch(Exception e){
                    logger.error("Maybe occur NNllException ,because the type does not exist,and ignore it",e);
                }
            }
           logger.info("select host source mapping  from redis cluster, the rule size :{}, " +
                   "the used time(ms) is :{}",
                   set.size(),(System.currentTimeMillis() - start));
           // 往下游發(fā)送數(shù)據(jù)集
           sourceContext.collect(set);
           //以固定的頻率進(jìn)行規(guī)則的動(dòng)態(tài)更新
           Thread.sleep(configEntity.getRedis().getInterval());
        }
    }
    @Override
    public void cancel() {
        try{
            super.close();
            // 進(jìn)行連接的釋放
            jedisClusterUtil.close();
        }catch (Exception e) {
            logger.error("runException:{}", e);
        }
        isRunning = false;
    }
}
  1. 主數(shù)據(jù)關(guān)聯(lián)規(guī)則數(shù)據(jù)
 DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingDataStream = env.addSource(new GetHostSourceMappingFunction(configEntity)).
                setParallelism(configEntity.getFlink().getRedisSourceParallelism());
 // main stream connect rule stream;可以認(rèn)為是大數(shù)據(jù)流和小數(shù)據(jù)流的join操作 
 SingleOutputStreamOperator<JSONObject> hostSourceFormatDataStream = HostSourceMappingConnectOperator.
                formatSourceConnectOperator(blackFilterDataStream, hostSourceMappingDataStream).
                setParallelism(configEntity.getFlink().getMediumParallelism());
  1. 數(shù)據(jù)處理
    1. processElement 事件處理方法
    2. processBroadcastElement 廣播處理方法
    3. 廣播狀態(tài)中的事件順序可能因任務(wù)而異:盡管廣播流的數(shù)據(jù)元保證所有數(shù)據(jù)元將(最終)轉(zhuǎn)到所有下游任務(wù),但數(shù)據(jù)元可能以不同的順序到達(dá)每個(gè)任務(wù)。因此,每個(gè)傳入數(shù)據(jù)元的狀態(tài)更新不得取決于傳入事件的順序。言外之意,1和2方法沒(méi)有先后執(zhí)行之分, 在 processElement方法中對(duì)于獲取的廣播變量要做好異常值判定,因?yàn)楹芸赡躳rocessBroadcastElement中還沒(méi)有進(jìn)行初始化操作。詳情代碼如下:
public class HostSourceMappingConnectOperator {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(HostSourceMappingConnectOperator.class);
    private static String  HOST_SOURCE_MAPPING_NAME = "host_source_mapping_name";
    final static MapStateDescriptor<String, Set<HostSourceMappingEntity>> HOST_SOURCE_MAPPING;

    static {
        HOST_SOURCE_MAPPING = new MapStateDescriptor<>(
                HOST_SOURCE_MAPPING_NAME,
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<HostSourceMappingEntity>>(){}));
    }

    public static SingleOutputStreamOperator<JSONObject> formatSourceConnectOperator(DataStream<JSONObject> jsonObjectDataStream,
                                                                                   DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingStream){
        SingleOutputStreamOperator<JSONObject> filterDataConnectStream = jsonObjectDataStream.connect(hostSourceMappingStream.broadcast(HOST_SOURCE_MAPPING))
                .process(new BroadcastProcessFunction<JSONObject, Set<HostSourceMappingEntity>, JSONObject>() {
                    /**
                     main stream 處理
                     */
                    @Override
                    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
                        ReadOnlyBroadcastState<String, Set<HostSourceMappingEntity>> broadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
                        Set<HostSourceMappingEntity> hostSourceSet = broadcastState.get(HOST_SOURCE_MAPPING_NAME);
                        if (!FieldConstants.WEI_BO.equals(value.getString(FieldConstants.DOC_TYPE)) &&
                                !FieldConstants.WEI_XIN.equals(value.getString(FieldConstants.DOC_TYPE)) &&
                                !CollectionUtils.isEmpty(hostSourceSet)) {
                                HostSourceMappingEntity targetEntity = null;
                                for (HostSourceMappingEntity hostSourceMappingEntity : hostSourceSet) {
                                    if (value.getString(FieldConstants.URL).contains(hostSourceMappingEntity.getHost())
                                            && hostSourceMappingEntity.getHost().length() > (targetEntity == null ? 0 : targetEntity.getHost().length())) {
                                        targetEntity = hostSourceMappingEntity;
                                    }
                                }
                                if (targetEntity != null) {
                                    logger.info("[target host mapping]" + targetEntity);
                                    value.put(FieldConstants.SOURCE, targetEntity.getTargetSource());
                                    value.put(FieldConstants.DOC_TYPE, targetEntity.getTargetDocType());
                                }

                        }else{
                            if(CollectionUtils.isEmpty(hostSourceSet)) {
                                logger.warn("[hostSource format] broadcast load lazy !!!!!!");
                            }
                        }
                        out.collect(value);
                    }
                     /**
                       處理廣播變量數(shù)據(jù)
                     */
                    @Override
                    public void processBroadcastElement(Set<HostSourceMappingEntity> value, Context ctx, Collector<JSONObject> out) throws Exception {

                        if (CollectionUtil.isNullOrEmpty(value)) {
                            return;
                        }
                        BroadcastState<String, Set<HostSourceMappingEntity>> hostSourceMappingBroadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
                        hostSourceMappingBroadcastState.put(HOST_SOURCE_MAPPING_NAME, value);
                    }
                }).name(FLINK_OPERATOR_HOST_SOURCE_MAPPING_FROM_REDIS);
        return filterDataConnectStream;
    }
}
  1. 整體的數(shù)據(jù)流圖片段


    廣播變量的應(yīng)用

參數(shù)的解析和配置文件規(guī)范化設(shè)置

  1. 關(guān)于配置文件,相信大家很少再去使用properties,取而代之的是YAML,其良好的層次結(jié)構(gòu),以及和Java Bean的映射,讓大家愛(ài)不釋手。舉例如下:
#kafka
kafka:
  kafkaZk: "172.24.4.18:2181,172.24.4.19:2181,172.24.4.20:2181"
  kafkaTopic: "mf-dev-hl"
  kafkaBrokerList: "172.24.2.78:9092,172.24.2.79:9092,172.24.2.80:9092"
  groupId: "mf-dev-hl"
  autoOffsetReset: "earliest"
  fetchMessageMaxBytes: 4194304
#redis cluster
redis:
  ipPort: "172.24.4.18:7000,172.24.4.18:7001,172.24.4.18:7002,172.24.4.19:7000,172.24.4.19:7001,172.24.4.19:7002"
  businessId: mf
  spamWebSiteKey: spam_website
  connector: _
  areaCategoryMappingKey: area_mapping_dict
  mediaRegionMappingKey: media_region

#es cluster
es:
  clusterName: cluster_index
  nodes: "192.168.x.x,192.168.x.x"
  tcpPorts: "9300,9300"
  httpPorts: "9200"
flink:
  parallelism: 1
  appName: Test1
  1. 關(guān)于命令行解析的使用
    Flink 官方提供ParameterTool應(yīng)具有較好的實(shí)用性,在官方最佳實(shí)踐篇也具有較好的解釋;refer:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/best_practices.html
    在這里我使用的是: apache.commons.cli,也具有極好的使用性和易用性;代碼片段如下:
private static Options initOptions(){
        Options options = new Options();
        options.addOption(Option.builder(ARGS_TYPE).hasArg(true).required(true).desc("the logistic type in [hl,zhxg,bfd]").build());
        options.addOption(Option.builder(ARGS_ENV).hasArg(true).required(true).desc("the env  in [dev,test,prod]").build());
        return options;
    }

    public static CommandLine getCommandLineWithCheck(String[] args){
        CommandLine cmd = null;
        try {
            CommandLineParser parser = new DefaultParser();
            cmd =  parser.parse( initOptions(), args);
        } catch (ParseException e) {
            logger.error("args parse exception, and invoke system exit function",e);
        }finally {
            if(cmd == null){
                logger.error("參數(shù)解析異常,退出系統(tǒng)");
                System.exit(-1);
            }
            if(!ARGS_TYPE_VALUES.contains(cmd.getOptionValue(ARGS_TYPE))){
                logger.error("the type must in " + ARGS_TYPE_VALUES);
                System.exit(-1);
            }

            if(!ARGS_ENV_VALUES.contains(cmd.getOptionValue(ARGS_ENV))){
                logger.error("the env must in " + ARGS_ENV_VALUES);
                System.exit(-1);
            }
        }
        return cmd;
    }

關(guān)于TaskManger、slot(task slot)、parallelism的認(rèn)知

  • TaskManager: 獨(dú)立的JVM進(jìn)程,其并行能力由slot個(gè)數(shù)決定。配置文件中默認(rèn)如下:taskmanager.numberOfTaskSlots: 1, 每個(gè)Taskmanager由一個(gè)slot組成,即其并行度為1。該 slot獨(dú)享內(nèi)存空間,如果有多個(gè)slot,那么均分Taskmanager內(nèi)存空間。
  • slot:在flink中, slot是資源組的含義。那么他有以下幾個(gè)特性:


    Flink內(nèi)部圖.png
  1. TaskManager 是從 JobManager 處接收需要部署的 Task,任務(wù)能配置的最大并行度由 TaskManager 上可用的 Slot 決定。
  2. 每個(gè)任務(wù)代表分配給任務(wù)槽的一組資源,Slot 在 Flink 里面可以認(rèn)為是資源組,F(xiàn)link 將每個(gè)任務(wù)分成子任務(wù)并且將這些子任務(wù)分配到 Slot 中,這樣就可以并行的執(zhí)行程序。
  3. TaskManager 的一個(gè) Slot 代表一個(gè)可用線程,該線程具有固定的內(nèi)存,注意 Slot 只對(duì)內(nèi)存隔離,沒(méi)有對(duì) CPU 隔離。
  4. 默認(rèn)情況下,F(xiàn)link 允許子任務(wù)共享 Slot,即使它們是不同 Task 的 subtask,只要它們來(lái)自相同的 Job,這種共享模式可以大大的提高資源利用率。
  • parallelism: 對(duì)應(yīng)的算子或者Task的并發(fā)執(zhí)行能力。

  • slot 和 parallelism的區(qū)別

    1. Slot 是指 TaskManager 最大能并發(fā)執(zhí)行的能力


      slot.png

    上圖表示每個(gè) TaskManager 中含有3個(gè)slot,在 Flink on yarn 中的Per Job模式下, taskManager的個(gè)數(shù)是動(dòng)態(tài)計(jì)算出來(lái)的,依據(jù)operator的最大并行度(max(p));計(jì)算公式:ceil( max(p) / numberOfTaskSlots);

  1. parallelism 是指 TaskManager 實(shí)際使用的并發(fā)能力


    parallelism.png

    在并行度為1條件下,只會(huì)使用其中一個(gè) slot。接下來(lái)以實(shí)驗(yàn)來(lái)作此說(shuō)明。

  2. 實(shí)驗(yàn)環(huán)境
    參數(shù)說(shuō)明:-ys:每個(gè)taskManger對(duì)應(yīng)的Slot個(gè)數(shù)
    -ytm: 單個(gè)TaskMananger占用的內(nèi)存大小
    -yn: --yarncontainer Number of YARN container to allocate
    (=Number of Task Managers);對(duì)于Per Job模式是不起作用的,為啦作區(qū)分度,下述的日志以 -yn=80
    程序啟動(dòng)日志顯示:

2020-02-20 00:02:47,157 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2020-02-20 00:02:47,279 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=5}

1.png

2.png

ok,相信這一塊已經(jīng)講的很明白啦,有啦這個(gè)基礎(chǔ),便可以進(jìn)行以下的內(nèi)容。

關(guān)于Operator Chains的理解

為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。
下圖便是進(jìn)行啦一次算子的合并:


operator chain.png

那么算子合并的條件是什么哪?

  1. 上下游的并行度一致
  2. 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
  3. 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
  4. 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
  5. 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
  6. 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū)
  7. 用戶沒(méi)有禁用 chain

可以認(rèn)為效果等同于將所有operator的實(shí)現(xiàn)都封裝于一個(gè)大的方法體中串行執(zhí)行,的確是提供啦極大的靈活性。一套代碼即實(shí)現(xiàn)啦算子的合并,也可實(shí)現(xiàn)算子的拆分。 這里有一篇不錯(cuò)的文章可以參照原理實(shí)現(xiàn):http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/#

BackPressure的原理

一旦提起storm、spark streaming、Flink三種常用的數(shù)據(jù)流框架,最惹人吐槽的便是storm的反壓機(jī)制,storm是通過(guò)監(jiān)控bolt的負(fù)載,暴力的在spout端進(jìn)行數(shù)據(jù)的暫停消費(fèi)。而Flink的反壓機(jī)制這是基于由下游往上游動(dòng)態(tài)傳遞反壓信息。
未完待續(xù)......

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文為《Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn)》學(xué)習(xí)筆記,想通過(guò)視頻系統(tǒng)學(xué)習(xí)Flink這個(gè)最火爆的大數(shù)據(jù)計(jì)算框架的同學(xué),推薦學(xué)習(xí)...
    大數(shù)據(jù)研習(xí)社閱讀 2,586評(píng)論 0 2
  • 一、整體架構(gòu) Flink整體由JobManager和TaskManager組成,遵循主從設(shè)計(jì)原則,JobManag...
    寇寇寇先森閱讀 1,860評(píng)論 0 2
  • Flink系統(tǒng)組成 Flink是一個(gè)分層系統(tǒng),從下到上分為:系統(tǒng)部署層、任務(wù)運(yùn)行層、API層以及基于API開(kāi)發(fā)的通...
    零度沸騰_yjz閱讀 1,836評(píng)論 0 8
  • 簡(jiǎn)介 Flink運(yùn)行時(shí)主要角色有兩個(gè):JobManager和TaskManager,無(wú)論是standalone集群...
    香山上的麻雀閱讀 23,509評(píng)論 0 15
  • 滿眼的繁星 霸占了整個(gè)天空 幾千米大腦回路 都是你的身影 月兒丟了 玉兒遠(yuǎn)了 牛郎星很亮 你的織女星 也很亮嗎 我...
    田園聽(tīng)雨閱讀 314評(píng)論 2 1

友情鏈接更多精彩內(nèi)容