Flink 生產(chǎn)實踐

Flink HA

Flink HA 的HighAvailabilityMode類中定義了是那種高可用性模式枚舉:

  • NONE:非HA模式
  • ZOOKEEPER:基于ZK實現(xiàn)HA
  • FACTORY_CLASS:自定義HA工廠類,實現(xiàn)HighAvailabilityServiceFactory接口。

ZooKeeperHaService主要提供了創(chuàng)建LeaderRetrievalService和LeaderElectionService等方法,并給出了各個服務組件使用的ZK節(jié)點名稱。

Flink Exactly-once實現(xiàn)原理解析

流處理引擎通常為用戶的應用程序提供是那種數(shù)據(jù)處理語義:最多一次、至少一次、精確一次。

  • 最多一次: 用戶數(shù)據(jù)只會被處理一次,不管成功還是失敗,不會重試也不會重發(fā)。
  • 至少一次: 系統(tǒng)會保證數(shù)據(jù)或事件被處理一次。如果中間發(fā)生錯誤或者丟失,就會重發(fā)或者重試。
  • 精確一次: 每一條數(shù)據(jù)只會被精確地處理一次,不多也不少。

Flink的快照可以到算子級別,并且對全局數(shù)據(jù)也可以做快照。
Flink分布式快照的核心元素之一是Barrier,該標記是嚴格有序的,并隨著數(shù)據(jù)往下流動。
每個流的barrier n到達時間不一致怎么辦,這是Flink采取的措施是快流等慢流。
Flink在做存儲時,可采用異步方式,每次都是進行的全量checkpoint,是基于上次進行更新的。

快照機制能夠保證作業(yè)出現(xiàn)fail-over后可以從最新的快照進行恢復,即分布式快照機制可以保證Flink系統(tǒng)內(nèi)部的精確一次處理。

兩階段處理繼承TwoPhaseCommitSinkFunction,需要實現(xiàn)beginTransaction、preCommit、commit、abort方法來實現(xiàn)精確一次的處理語義,

  • beginTransaction:在開啟事務之前,在目標文件系統(tǒng)的臨時目錄中創(chuàng)建一個臨時文件,后面在處理數(shù)據(jù)時將數(shù)據(jù)寫入此文件。
  • preCommit:在預提交階段,刷寫文件,然后關閉文件,之后就不能寫入到文件,為屬于下一個檢查點的任何后續(xù)寫入啟動新事務。
  • commit:在提交階段,將預提交的文件原子性移動到真正的目標目錄中,這會增加輸出數(shù)據(jù)可見性的延遲。
  • abort:在終止階段,刪除臨時文件。

Kafka-Flink-Kafka過程:

  • Flink開始做checkpoint操作, 進入pre-commit階段,同時Flink JobManager會將檢查點Barrier注入數(shù)據(jù)流中。
  • 當所有barrier在算子中成功進行一遍傳遞,并完成快照后,則pre-commit階段完成
  • 等所有的算子完成預提交,就會發(fā)起一個提交動作,但是任何一個預提交失敗都會導致Flink回滾到最近的checkpoint;
  • pre-commit完成,必須要確保commit也要成功。

如何排查生產(chǎn)環(huán)境中的反壓問題

不同框架的反壓對比:

  • Storm:從1.0版本之后引入反壓,Storm會主動監(jiān)控工作節(jié)點,工作節(jié)點接收數(shù)據(jù)超過閾值,反壓信息會被發(fā)送到ZooKeeper,ZooKeeper通知所有的工作節(jié)點
    進入反壓狀態(tài),最后數(shù)據(jù)的生產(chǎn)源頭會降低數(shù)據(jù)的發(fā)送速度。
  • Spark Streaming:RateController組件,利用經(jīng)典的PID算法,根據(jù)消息數(shù)量、調度時間、處理時間等計算出來速率,然后進行限速。
  • Flink:利用網(wǎng)絡傳輸和動態(tài)限流,流中的數(shù)據(jù)在算子間進行計算和轉換時,會被放入分布式的阻塞隊列中。當消費者的阻塞隊列滿時,則會降低生產(chǎn)者的數(shù)據(jù)生產(chǎn)速度。

Flink Web UI Back Pressure出現(xiàn)數(shù)值:

  • OK: 0<=Ratio<=0.10,正常;
  • LOW:0.10<Ratio<=0.50,一般;
  • HIGH: 0.5 < Ratio <=1,嚴重。
指標名稱 用途 解釋
outPoolUsage 發(fā)送端緩沖池的使用率 當前Task的數(shù)據(jù)發(fā)送率,如果數(shù)值很低,當前節(jié)點有可能為反壓節(jié)點
inPoolUsage 接收端緩沖池的使用率 Task的接收速度,inPoolUsage很高,outPoolUsage很低,這個節(jié)點有可能是反壓節(jié)點
floatingBuffersUsage 處理節(jié)點緩沖池的使用率
exclusiveBuffersUsage 數(shù)據(jù)輸入方緩沖池的使用率

反壓問題處理:

  • 數(shù)據(jù)傾斜:使用類似的KeyBy等分組聚合函數(shù)導致,需要用戶將熱點key進行預處理,降低或者消除熱點key的影響。
  • GC:使用-XX:+PrintGCDetails參數(shù)查看GC日志
  • 代碼本身:查看機器的CPU、內(nèi)存使用

如何處理生產(chǎn)環(huán)境中的數(shù)據(jù)傾斜問題

兩階段聚合解決KeyBy熱點

根據(jù)type進行KeyBy時,如果數(shù)據(jù)的type分布不均勻就會導致大量的數(shù)據(jù)分配到一個task中,發(fā)生數(shù)據(jù)傾斜。解決的思路為:

  • 首先把分組的key打散,比如添加隨機后綴;
  • 對打散后的數(shù)據(jù)進行聚合;
  • 將打散的key還原為原先的key
  • 二次KeyBy進行結果統(tǒng)計,然后輸出。

Flink消費Kafka數(shù)據(jù)時,要保證Kafka的分區(qū)數(shù)等于Flink Consumer的并行度。如果不一致,需要設置Flink的Redistributing(數(shù)據(jù)充分配),
Rebalance分區(qū)策略,數(shù)據(jù)會以round-robin的方式對數(shù)據(jù)進行再次分區(qū),可以全局負載均衡。
Rescale分區(qū)策略基于上下游的并行度,會將數(shù)據(jù)以循環(huán)的方式輸出到下游的每個實例中。

生產(chǎn)環(huán)境中的并行度和資源設置

在Flink集群中,一個TaskManager就是一個JVM進程,并且會用獨立的線程來執(zhí)行task,slot僅僅用來做內(nèi)存的隔離,對CPU不起作用。

默認情況下,Flink還允許同一個Job的子任務共享slot。

Flink自身會把不同的算子的task連接在一起組成一個新的task。因為task在同一個線程中執(zhí)行,可以有效減少線程間上下文的切換,減少序列化/反序列化帶來的資源消耗,
提高任務的吞吐量。

并行度級別:算子級別、環(huán)境級別、客戶端級別、集群配置級別。

在生產(chǎn)中,推薦在算子級別顯式指定各自的并行度,方便進行顯式和精確的資源控制。
環(huán)境級別:任務中的所有算子的并行度都是指定的值,生產(chǎn)環(huán)境不推薦。

設置并行度的優(yōu)先級為:算子級別 > 環(huán)境級別 > 客戶端級別 > 集群級別配置。

Flink如何做維表關聯(lián)

業(yè)務對維表數(shù)據(jù)關聯(lián)的時效性要求,有以下幾種解決方案:

  • 實時查詢維表:用戶在Flink算子中直接訪問外部數(shù)據(jù)庫,這種是同步方式,數(shù)據(jù)保證是最新的。
  • 預加載全量數(shù)據(jù):每次啟動時,將維表中全部數(shù)據(jù)加載到內(nèi)存中。
  • LRU緩存:將最近最少使用的數(shù)據(jù)則被淘汰。

實時查詢維表

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DimSync extends RichMapFunction<String,Order> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);

    private Connection conn = null;
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
    }

    public Order map(String in) throws Exception {

        JSONObject jsonObject = JSONObject.parseObject(in);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");

        //根據(jù)city_id 查詢 city_name
        PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
        pst.setInt(1,cityId);
        ResultSet resultSet = pst.executeQuery();
        String cityName = null;
        while (resultSet.next()){
            cityName = resultSet.getString(1);
        }
        pst.close();
        return new Order(cityId,userName,items,cityName);
    }

    public void close() throws Exception {
        super.close();
        conn.close();
    }

}

要保證及時關閉連接池

public class Order {
    private Integer cityId;
    private String userName;
    private String items;
    private String cityName;

    public Order(Integer cityId, String userName, String items, String cityName) {
        this.cityId = cityId;
        this.userName = userName;
        this.items = items;
        this.cityName = cityName;
    }

    public Order() {
    }

    public Integer getCityId() {
        return cityId;
    }

    public void setCityId(Integer cityId) {
        this.cityId = cityId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getItems() {
        return items;
    }

    public void setItems(String items) {
        this.items = items;
    }

    public String getCityName() {
        return cityName;
    }

    public void setCityName(String cityName) {
        this.cityName = cityName;
    }

    @Override
    public String toString() {
        return "Order{" +
                "cityId=" + cityId +
                ", userName='" + userName + '\'' +
                ", items='" + items + '\'' +
                ", cityName='" + cityName + '\'' +
                '}';
    }
}

預加載全量數(shù)據(jù)

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WholeLoad extends RichMapFunction<String,Order> {


    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    ScheduledExecutorService executor = null;
    private Map<String,String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5,5, TimeUnit.MINUTES);
    }

    @Override
    public Order map(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        String cityName = cache.get(cityId);
        return new Order(cityId,userName,items,cityName);
    }

    public void load() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
        ResultSet rs = statement.executeQuery();
        //全量更新維度數(shù)據(jù)到內(nèi)存
        while (rs.next()) {
            String cityId = rs.getString("city_id");
            String cityName = rs.getString("city_name");
            cache.put(cityId, cityName);
        }
        con.close();
    }
}

LRU緩存

import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LRU extends RichAsyncFunction<String,Order> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //創(chuàng)建hbase客戶端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存儲10000條
                .maximumSize(10000)
                //過期時間為1分鐘
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {

        JSONObject jsonObject = JSONObject.parseObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //讀緩存
        String cacheCityName = cache.getIfPresent(cityId);
        //如果緩存獲取失敗再從hbase獲取維度數(shù)據(jù)
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {

            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                for (KeyValue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });

        }
    }
}

海量數(shù)據(jù)去重

Flink中實時去重的方案:

  • 基于狀態(tài)后端
  • 基于HyperLogLog
  • 基于布隆過濾器
  • 基于BitMap
  • 基于外部數(shù)據(jù)庫

基于狀態(tài)后端

狀態(tài)后端的種類之一是RocksDBStateBackend,它會將正在云心中的狀態(tài)數(shù)據(jù)保存在RockDB數(shù)據(jù)庫中,該數(shù)據(jù)庫默認將數(shù)據(jù)存儲在TaskManager運行節(jié)點的數(shù)據(jù)目錄下。
計算每天每個商品的訪問量:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {

    private transient ValueState<Integer> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        //我們設置ValueState的TTL的生命周期為24小時,到期自動清除狀態(tài)
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //設置ValueState的默認值
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getState(descriptor);
        super.open(parameters);
    }


    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

        String f0 = value.f0;

        //如果不存在則新增
        if(counts.value() == null){
            counts.update(1);
        }else{
            //如果存在則加1
            counts.update(counts.value()+1);
        }

        out.collect(Tuple2.of(f0, counts.value()));

    }

}

基于HyperLogLo

HyperLogLog是一種估計統(tǒng)計算法,被用來統(tǒng)計一餓集合中不同數(shù)據(jù)的個數(shù)。

import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {


    @Override
    public HLL createAccumulator() {

        return new HLL(14, 5);
    }

    @Override
    public HLL add(Tuple2<String, Long> value, HLL accumulator) {

        //value為購買記錄 <商品sku, 用戶id>
        accumulator.addRaw(value.f1);
        return accumulator;
    }

    @Override
    public Long getResult(HLL accumulator) {
        long cardinality = accumulator.cardinality();
        return cardinality;
    }


    @Override
    public HLL merge(HLL a, HLL b) {
        a.union(b);
        return a;
    }
}

添加相應的pom依賴:

<dependency>
    <groupId>net.agkn</groupId>
    <artifactId>hll</artifactId>
    <version>1.6.0</version>
</dependency>

如果元素是非數(shù)值型,需要hash過后才能插入。

基于布隆過濾器

BloomFilter類似于一個HashSet,用于快速判斷某個元素是否存在與集合中,其典型的應用場景就是能夠快速判斷一個key是否存在某個容器中,不存在直接返回。

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {

    private transient ValueState<BloomFilter> bloomState;
    private transient ValueState<Long> countState;


    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {

        BloomFilter bloomFilter = bloomState.value();
        Long skuCount = countState.value();

        if(bloomFilter == null){
            BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
        }

        if(skuCount == null){
            skuCount = 0L;
        }

        if(!bloomFilter.mightContain(value)){
            bloomFilter.put(value);
            skuCount = skuCount + 1;
        }

        bloomState.update(bloomFilter);
        countState.update(skuCount);
        out.collect(countState.value());
    }
}

BitMap

HyperLogLog 和BloomFilter雖然減少了存儲但是丟失了精度。
BitMap的基本思想是用一個bit位來標記某個元素對應的value,而key即是該元素。

import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {


    @Override
    public Roaring64NavigableMap createAccumulator() {
        return new Roaring64NavigableMap();
    }

    @Override
    public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
        accumulator.add(value);
        return accumulator;
    }


    @Override
    public Long getResult(Roaring64NavigableMap accumulator) {
        return accumulator.getLongCardinality();
    }

    @Override
    public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
        return null;
    }
}

添加依賴:

<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.9.21</version>
</dependency>
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 一. 背景 數(shù)據(jù)準實時復制(CDC)是目前行內(nèi)實時數(shù)據(jù)需求大量使用的技術,目前行內(nèi)已經(jīng)大量使用IBM CDC 軟件...
    jianwbj閱讀 3,937評論 0 2
  • 概述 2019 年是大數(shù)據(jù)實時計算領域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    Yobhel閱讀 1,910評論 0 33
  • 說明:本文為《Flink大數(shù)據(jù)項目實戰(zhàn)》學習筆記,想通過視頻系統(tǒng)學習Flink這個最火爆的大數(shù)據(jù)計算框架的同學,推...
    大數(shù)據(jù)研習社閱讀 1,243評論 0 0
  • 概述 2019 年是大數(shù)據(jù)實時計算領域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    王知無閱讀 3,333評論 2 11
  • 前篇主要介紹流式計算相關的核心概念,這篇簡要聊聊Flink總體架構、運行環(huán)境及其在大數(shù)據(jù)生態(tài)系統(tǒng)中的位置,讓大家先...
    data之道閱讀 1,442評論 0 6

友情鏈接更多精彩內(nèi)容