Flink HA
Flink HA 的HighAvailabilityMode類中定義了是那種高可用性模式枚舉:
- NONE:非HA模式
- ZOOKEEPER:基于ZK實現(xiàn)HA
- FACTORY_CLASS:自定義HA工廠類,實現(xiàn)HighAvailabilityServiceFactory接口。
ZooKeeperHaService主要提供了創(chuàng)建LeaderRetrievalService和LeaderElectionService等方法,并給出了各個服務組件使用的ZK節(jié)點名稱。
Flink Exactly-once實現(xiàn)原理解析
流處理引擎通常為用戶的應用程序提供是那種數(shù)據(jù)處理語義:最多一次、至少一次、精確一次。
- 最多一次: 用戶數(shù)據(jù)只會被處理一次,不管成功還是失敗,不會重試也不會重發(fā)。
- 至少一次: 系統(tǒng)會保證數(shù)據(jù)或事件被處理一次。如果中間發(fā)生錯誤或者丟失,就會重發(fā)或者重試。
- 精確一次: 每一條數(shù)據(jù)只會被精確地處理一次,不多也不少。
Flink的快照可以到算子級別,并且對全局數(shù)據(jù)也可以做快照。
Flink分布式快照的核心元素之一是Barrier,該標記是嚴格有序的,并隨著數(shù)據(jù)往下流動。
每個流的barrier n到達時間不一致怎么辦,這是Flink采取的措施是快流等慢流。
Flink在做存儲時,可采用異步方式,每次都是進行的全量checkpoint,是基于上次進行更新的。
快照機制能夠保證作業(yè)出現(xiàn)fail-over后可以從最新的快照進行恢復,即分布式快照機制可以保證Flink系統(tǒng)內(nèi)部的精確一次處理。
兩階段處理繼承TwoPhaseCommitSinkFunction,需要實現(xiàn)beginTransaction、preCommit、commit、abort方法來實現(xiàn)精確一次的處理語義,
- beginTransaction:在開啟事務之前,在目標文件系統(tǒng)的臨時目錄中創(chuàng)建一個臨時文件,后面在處理數(shù)據(jù)時將數(shù)據(jù)寫入此文件。
- preCommit:在預提交階段,刷寫文件,然后關閉文件,之后就不能寫入到文件,為屬于下一個檢查點的任何后續(xù)寫入啟動新事務。
- commit:在提交階段,將預提交的文件原子性移動到真正的目標目錄中,這會增加輸出數(shù)據(jù)可見性的延遲。
- abort:在終止階段,刪除臨時文件。
Kafka-Flink-Kafka過程:
- Flink開始做checkpoint操作, 進入pre-commit階段,同時Flink JobManager會將檢查點Barrier注入數(shù)據(jù)流中。
- 當所有barrier在算子中成功進行一遍傳遞,并完成快照后,則pre-commit階段完成
- 等所有的算子完成預提交,就會發(fā)起一個提交動作,但是任何一個預提交失敗都會導致Flink回滾到最近的checkpoint;
- pre-commit完成,必須要確保commit也要成功。
如何排查生產(chǎn)環(huán)境中的反壓問題
不同框架的反壓對比:
- Storm:從1.0版本之后引入反壓,Storm會主動監(jiān)控工作節(jié)點,工作節(jié)點接收數(shù)據(jù)超過閾值,反壓信息會被發(fā)送到ZooKeeper,ZooKeeper通知所有的工作節(jié)點
進入反壓狀態(tài),最后數(shù)據(jù)的生產(chǎn)源頭會降低數(shù)據(jù)的發(fā)送速度。- Spark Streaming:RateController組件,利用經(jīng)典的PID算法,根據(jù)消息數(shù)量、調度時間、處理時間等計算出來速率,然后進行限速。
- Flink:利用網(wǎng)絡傳輸和動態(tài)限流,流中的數(shù)據(jù)在算子間進行計算和轉換時,會被放入分布式的阻塞隊列中。當消費者的阻塞隊列滿時,則會降低生產(chǎn)者的數(shù)據(jù)生產(chǎn)速度。
Flink Web UI Back Pressure出現(xiàn)數(shù)值:
- OK: 0<=Ratio<=0.10,正常;
- LOW:0.10<Ratio<=0.50,一般;
- HIGH: 0.5 < Ratio <=1,嚴重。
| 指標名稱 | 用途 | 解釋 |
|---|---|---|
| outPoolUsage | 發(fā)送端緩沖池的使用率 | 當前Task的數(shù)據(jù)發(fā)送率,如果數(shù)值很低,當前節(jié)點有可能為反壓節(jié)點 |
| inPoolUsage | 接收端緩沖池的使用率 | Task的接收速度,inPoolUsage很高,outPoolUsage很低,這個節(jié)點有可能是反壓節(jié)點 |
| floatingBuffersUsage | 處理節(jié)點緩沖池的使用率 | |
| exclusiveBuffersUsage | 數(shù)據(jù)輸入方緩沖池的使用率 |
反壓問題處理:
- 數(shù)據(jù)傾斜:使用類似的KeyBy等分組聚合函數(shù)導致,需要用戶將熱點key進行預處理,降低或者消除熱點key的影響。
- GC:使用-XX:+PrintGCDetails參數(shù)查看GC日志
- 代碼本身:查看機器的CPU、內(nèi)存使用
如何處理生產(chǎn)環(huán)境中的數(shù)據(jù)傾斜問題
兩階段聚合解決KeyBy熱點
根據(jù)type進行KeyBy時,如果數(shù)據(jù)的type分布不均勻就會導致大量的數(shù)據(jù)分配到一個task中,發(fā)生數(shù)據(jù)傾斜。解決的思路為:
- 首先把分組的key打散,比如添加隨機后綴;
- 對打散后的數(shù)據(jù)進行聚合;
- 將打散的key還原為原先的key
- 二次KeyBy進行結果統(tǒng)計,然后輸出。
Flink消費Kafka數(shù)據(jù)時,要保證Kafka的分區(qū)數(shù)等于Flink Consumer的并行度。如果不一致,需要設置Flink的Redistributing(數(shù)據(jù)充分配),
Rebalance分區(qū)策略,數(shù)據(jù)會以round-robin的方式對數(shù)據(jù)進行再次分區(qū),可以全局負載均衡。
Rescale分區(qū)策略基于上下游的并行度,會將數(shù)據(jù)以循環(huán)的方式輸出到下游的每個實例中。
生產(chǎn)環(huán)境中的并行度和資源設置
在Flink集群中,一個TaskManager就是一個JVM進程,并且會用獨立的線程來執(zhí)行task,slot僅僅用來做內(nèi)存的隔離,對CPU不起作用。
默認情況下,Flink還允許同一個Job的子任務共享slot。
Flink自身會把不同的算子的task連接在一起組成一個新的task。因為task在同一個線程中執(zhí)行,可以有效減少線程間上下文的切換,減少序列化/反序列化帶來的資源消耗,
提高任務的吞吐量。
并行度級別:算子級別、環(huán)境級別、客戶端級別、集群配置級別。
在生產(chǎn)中,推薦在算子級別顯式指定各自的并行度,方便進行顯式和精確的資源控制。
環(huán)境級別:任務中的所有算子的并行度都是指定的值,生產(chǎn)環(huán)境不推薦。
設置并行度的優(yōu)先級為:算子級別 > 環(huán)境級別 > 客戶端級別 > 集群級別配置。
Flink如何做維表關聯(lián)
業(yè)務對維表數(shù)據(jù)關聯(lián)的時效性要求,有以下幾種解決方案:
- 實時查詢維表:用戶在Flink算子中直接訪問外部數(shù)據(jù)庫,這種是同步方式,數(shù)據(jù)保證是最新的。
- 預加載全量數(shù)據(jù):每次啟動時,將維表中全部數(shù)據(jù)加載到內(nèi)存中。
- LRU緩存:將最近最少使用的數(shù)據(jù)則被淘汰。
實時查詢維表
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class DimSync extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
private Connection conn = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
}
public Order map(String in) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(in);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//根據(jù)city_id 查詢 city_name
PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
pst.setInt(1,cityId);
ResultSet resultSet = pst.executeQuery();
String cityName = null;
while (resultSet.next()){
cityName = resultSet.getString(1);
}
pst.close();
return new Order(cityId,userName,items,cityName);
}
public void close() throws Exception {
super.close();
conn.close();
}
}
要保證及時關閉連接池
public class Order {
private Integer cityId;
private String userName;
private String items;
private String cityName;
public Order(Integer cityId, String userName, String items, String cityName) {
this.cityId = cityId;
this.userName = userName;
this.items = items;
this.cityName = cityName;
}
public Order() {
}
public Integer getCityId() {
return cityId;
}
public void setCityId(Integer cityId) {
this.cityId = cityId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getItems() {
return items;
}
public void setItems(String items) {
this.items = items;
}
public String getCityName() {
return cityName;
}
public void setCityName(String cityName) {
this.cityName = cityName;
}
@Override
public String toString() {
return "Order{" +
"cityId=" + cityId +
", userName='" + userName + '\'' +
", items='" + items + '\'' +
", cityName='" + cityName + '\'' +
'}';
}
}
預加載全量數(shù)據(jù)
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class WholeLoad extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
ScheduledExecutorService executor = null;
private Map<String,String> cache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
e.printStackTrace();
}
}
},5,5, TimeUnit.MINUTES);
}
@Override
public Order map(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
String cityName = cache.get(cityId);
return new Order(cityId,userName,items,cityName);
}
public void load() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
ResultSet rs = statement.executeQuery();
//全量更新維度數(shù)據(jù)到內(nèi)存
while (rs.next()) {
String cityId = rs.getString("city_id");
String cityName = rs.getString("city_name");
cache.put(cityId, cityName);
}
con.close();
}
}
LRU緩存
import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class LRU extends RichAsyncFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
String table = "info";
Cache<String, String> cache = null;
private HBaseClient client = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//創(chuàng)建hbase客戶端
client = new HBaseClient("127.0.0.1","7071");
cache = CacheBuilder.newBuilder()
//最多存儲10000條
.maximumSize(10000)
//過期時間為1分鐘
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(input);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//讀緩存
String cacheCityName = cache.getIfPresent(cityId);
//如果緩存獲取失敗再從hbase獲取維度數(shù)據(jù)
if(cacheCityName != null){
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(cacheCityName);
resultFuture.complete(Collections.singleton(order));
}else {
client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
for (KeyValue kv : arg) {
String value = new String(kv.value());
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(value);
resultFuture.complete(Collections.singleton(order));
cache.put(String.valueOf(cityId), value);
}
return null;
});
}
}
}
海量數(shù)據(jù)去重
Flink中實時去重的方案:
- 基于狀態(tài)后端
- 基于HyperLogLog
- 基于布隆過濾器
- 基于BitMap
- 基于外部數(shù)據(jù)庫
基于狀態(tài)后端
狀態(tài)后端的種類之一是RocksDBStateBackend,它會將正在云心中的狀態(tài)數(shù)據(jù)保存在RockDB數(shù)據(jù)庫中,該數(shù)據(jù)庫默認將數(shù)據(jù)存儲在TaskManager運行節(jié)點的數(shù)據(jù)目錄下。
計算每天每個商品的訪問量:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//我們設置ValueState的TTL的生命周期為24小時,到期自動清除狀態(tài)
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//設置ValueState的默認值
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String f0 = value.f0;
//如果不存在則新增
if(counts.value() == null){
counts.update(1);
}else{
//如果存在則加1
counts.update(counts.value()+1);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
基于HyperLogLo
HyperLogLog是一種估計統(tǒng)計算法,被用來統(tǒng)計一餓集合中不同數(shù)據(jù)的個數(shù)。
import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value為購買記錄 <商品sku, 用戶id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
添加相應的pom依賴:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
如果元素是非數(shù)值型,需要hash過后才能插入。
基于布隆過濾器
BloomFilter類似于一個HashSet,用于快速判斷某個元素是否存在與集合中,其典型的應用場景就是能夠快速判斷一個key是否存在某個容器中,不存在直接返回。
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
@Override
public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
BloomFilter bloomFilter = bloomState.value();
Long skuCount = countState.value();
if(bloomFilter == null){
BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
}
if(skuCount == null){
skuCount = 0L;
}
if(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + 1;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
BitMap
HyperLogLog 和BloomFilter雖然減少了存儲但是丟失了精度。
BitMap的基本思想是用一個bit位來標記某個元素對應的value,而key即是該元素。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}
@Override
public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public Long getResult(Roaring64NavigableMap accumulator) {
return accumulator.getLongCardinality();
}
@Override
public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
return null;
}
}
添加依賴:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.21</version>
</dependency>