2021-01-25-Flink-30(Flink CheckPoint)

1.數(shù)據(jù)一致性

可能未checkpoint程序就中斷了,要重啟,重新消費(fèi)kafka的數(shù)據(jù),同時(shí)更新Redis的數(shù)據(jù),所以不是ExactlyOnce

執(zhí)行出現(xiàn)無(wú)法checkpoint則要把flink和hadoop整合的jar把拷貝到flink的lib目錄中

/**
 * 當(dāng)前的程序能不能容錯(cuò)(保證數(shù)據(jù)的一致性)
 * 當(dāng)前程序如果可以保證數(shù)據(jù)的一致性,是使用ExactlyOnce還是AtLeastOnce,使用的是AtLeastOnce
 *
 * KafkaSource:可以記錄偏移量,可以將偏移量保存到狀態(tài)中(OperatorState)
 * keyBy后調(diào)用sum:sum有狀態(tài)(ValueState)
 * RedisSink:使用HSET方法可以將數(shù)據(jù)覆蓋(冪等性)
 *
 *
 */
public class KafkaToRedisWordCount {

    //--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn --redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception{

        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000)); //可以間內(nèi)存中的狀態(tài)持久化到StateBackend
        //設(shè)置狀態(tài)存儲(chǔ)的后端
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
        //如果你手動(dòng)cancel job后,不刪除job的checkpoint數(shù)據(jù)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //添加KafkaSource
        //設(shè)置Kafka相關(guān)參數(shù)
        Properties properties = new Properties();//設(shè)置Kafka的地址和端口
        properties.setProperty("bootstrap.servers", "linux03:9092,linx04:9092,linux05:9092");
        //讀取偏移量策略:如果沒(méi)有記錄偏移量,就從頭讀,如果記錄過(guò)偏移量,就接著讀
        properties.setProperty("auto.offset.reset", "earliest");
        //設(shè)置消費(fèi)者組ID
        properties.setProperty("group.id", parameterTool.get("groupId"));
        //開(kāi)啟checkpoint,不然讓flink的消費(fèi)(source對(duì)他的subtask)自動(dòng)提交偏移量
        properties.setProperty("enable.auto.commit", "false");
        //創(chuàng)建FlinkKafkaConsumer并傳入相關(guān)參數(shù)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), //要讀取數(shù)據(jù)的Topic名稱
                new SimpleStringSchema(), //讀取文件的反序列化Schema
                properties //傳入Kafka的參數(shù)
        );
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //設(shè)置在checkpoint是不將偏移量保存到kafka特殊的topic中

        //使用addSource添加kafkaConsumer
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分組
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //將聚合后的結(jié)果寫(xiě)入到Redis中
        //調(diào)用Sink
        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9).build();

        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisSinkDemo.RedisWordCountMapper()));

        env.execute();


    }

    private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }

}

/**
 * 從指定的socket讀取數(shù)據(jù),對(duì)單詞進(jìn)行計(jì)算,將結(jié)果寫(xiě)入到Redis中
 */
public class RedisSinkDemo {

    public static void main(String[] args) throws Exception {

        //創(chuàng)建Flink流計(jì)算執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //創(chuàng)建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //調(diào)用Transformation開(kāi)始
        //調(diào)用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分組
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //Transformation結(jié)束

        //調(diào)用Sink
        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("liunx03").setPassword("123456").setDatabase(8).build();

        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //啟動(dòng)執(zhí)行
        env.execute("StreamingWordCount");

    }

    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }

}

image.png

注意注意默認(rèn)偏移量同時(shí)保持在kafka和hdfs中

kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //設(shè)置在checkpoint是不將偏移量保存到kafka特殊的topic中

2.flink的job重啟

//如果你手動(dòng)cancel job后,不刪除job的checkpoint數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

3.Savepoint

Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建,擁有和發(fā)布Checkpoint,無(wú)需用戶交互。作為一種恢復(fù)和定期觸發(fā)的方法,Checkpoint實(shí)現(xiàn)的兩個(gè)主要設(shè)計(jì)目標(biāo)是:i)being as lightweight to create (輕量級(jí)),ii)fast restore (快速恢復(fù))。針對(duì)這些目標(biāo)的優(yōu)化可以利用某些屬性,例如,JobCode在執(zhí)行嘗試之間不會(huì)改變。
??與此相反,Savepoints由用戶創(chuàng)建,擁有和刪除。它們的用例是planned (計(jì)劃) 的,manual backup( 手動(dòng)備份 ) 和 resume(恢復(fù))。例如,這可能是Flink版本的更新,更改Job graph ,更改 parallelism ,分配第二個(gè)作業(yè),如紅色/藍(lán)色部署,等等。當(dāng)然,Savepoints必須在終止工作后繼續(xù)存在。從概念上講,保存點(diǎn)的生成和恢復(fù)成本可能更高,并且更多地關(guān)注可移植性和對(duì)前面提到的作業(yè)更改的支持

保存點(diǎn)的作用:(1) 應(yīng)用程序代碼升級(jí):假設(shè)你在已經(jīng)處于運(yùn)行狀態(tài)的應(yīng)用程序中發(fā)現(xiàn)了一個(gè) bug,并且希望之后的事件都可以用修復(fù)后的新版本來(lái)處理。通過(guò)觸發(fā)保存點(diǎn)并從該保存點(diǎn)處運(yùn)行新版本,下游的應(yīng)用程序并不會(huì)察覺(jué)到不同(當(dāng)然,被更新的部分除外)。
(2) Flink 版本更新:Flink 自身的更新也變得簡(jiǎn)單,因?yàn)榭梢葬槍?duì)正在運(yùn)行的任務(wù)觸發(fā)保存點(diǎn),并從保存點(diǎn)處用新版本的 Flink 重啟任務(wù)。
(3) 維護(hù)和遷移:使用保存點(diǎn),可以輕松地“暫停和恢復(fù)”應(yīng)用程序。這對(duì)于集群維護(hù)以及向新集群遷移的作業(yè)來(lái)說(shuō)尤其有用。此外,它還有利于開(kāi)發(fā)、測(cè)試和調(diào)試,因?yàn)椴恍枰夭フ麄€(gè)事件流。
(4) 假設(shè)模擬與恢復(fù):在可控的點(diǎn)上運(yùn)行其他的應(yīng)用邏輯,以模擬假設(shè)的場(chǎng)景,這樣做在很多時(shí)候非常有用。
(5) A/B 測(cè)試:從同一個(gè)保存點(diǎn)開(kāi)始,并行地運(yùn)行應(yīng)用程序的兩個(gè)版本,有助于進(jìn)行 A/B 測(cè)試。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容