1. Data consistency
The program can be interrupted before a checkpoint completes; after a restart it re-consumes the Kafka data since the last checkpoint and updates Redis again, so end-to-end this is not ExactlyOnce but AtLeastOnce (the results stay correct only because the Redis writes are idempotent).
If checkpointing to HDFS fails at runtime, copy the Flink-Hadoop integration jar (e.g. the flink-shaded-hadoop uber jar) into Flink's lib directory.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Can this program tolerate failures, i.e. keep the data consistent?
 * If it can, does it provide ExactlyOnce or AtLeastOnce? It provides AtLeastOnce.
 *
 * KafkaSource: records the consumed offsets and stores them in state (OperatorState)
 * sum after keyBy: sum is stateful (ValueState)
 * RedisSink: writes with HSET, so replayed records simply overwrite the old values (idempotent)
 */
public class KafkaToRedisWordCount {

    //--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn --redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //periodically persist the in-memory state to the StateBackend
        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
        //set the backend where the state is stored
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
        //keep the job's checkpoint data when the job is manually cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //Kafka connection properties: broker addresses and ports
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "linux03:9092,linux04:9092,linux05:9092");
        //offset reset strategy: read from the beginning if no offset is recorded, otherwise continue from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        //consumer group ID
        properties.setProperty("group.id", parameterTool.get("groupId"));
        //checkpointing is enabled, so do not let the Kafka client auto-commit offsets; the source subtasks manage them
        properties.setProperty("enable.auto.commit", "false");

        //create the FlinkKafkaConsumer with the relevant parameters
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), //name of the topic to read
                new SimpleStringSchema(),           //deserialization schema for the records
                properties                          //Kafka properties
        );
        //on checkpoints, do not commit the offsets back to Kafka's internal offsets topic; they live in Flink's checkpointed state
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        //add the kafkaConsumer with addSource
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        //group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);
        //aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //write the aggregated result to Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9)
                .build();
        summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));
        env.execute();
    }

    private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //HSET into the hash named WORD_COUNT: field = word, value = count
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
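After the job has run for a while, the counts can be inspected directly in Redis. An illustrative session (host, password and database number follow the sample arguments above):

redis-cli -h node-3.51doit.cn -a 123456 -n 9
HGETALL WORD_COUNT

Because the sink issues HSET WORD_COUNT <word> <count>, HGETALL returns every word with its latest count; replays after a restart just overwrite the same fields.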
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * Reads data from the specified socket, counts the words, and writes the result to Redis
 */
public class RedisSinkDemo {
    public static void main(String[] args) throws Exception {
        //create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //create the DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //Transformations start here
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        //group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });
        //aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
        //Transformations end here
        //Sink
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("linux03")
                .setPassword("123456")
                .setDatabase(8)
                .build();
        summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));
        //start the execution
        env.execute("StreamingWordCount");
    }

    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //HSET into the hash named WORD_COUNT: field = word, value = count
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
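To try RedisSinkDemo locally, a socket server has to be listening on port 8888 before the job starts, e.g. with netcat: nc -lk 8888. Each line typed into that terminal then flows into the job and is counted.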

Note: by default the offsets are kept in both places, in Kafka and in the checkpoint data on HDFS; the line below turns the Kafka side off:
kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //on checkpoints, do not commit the offsets back to Kafka's internal offsets topic
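A minimal sketch of the two modes, assuming the kafkaConsumer built above. In both cases a restore reads the offsets from Flink's checkpointed state; the copy committed to Kafka is only informational:

//default: on every completed checkpoint, also commit the current offsets to
//Kafka's __consumer_offsets topic, so external lag-monitoring tools can see them
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

//alternative (used above): keep the offsets only in Flink's checkpointed state
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);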
2. Restarting a Flink job
//keep the job's checkpoint data when the job is manually cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
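Automatic restarts after a failure are controlled by the restart strategy. A minimal sketch (the attempt count and delay here are illustrative, not from the original):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30000);
        //restart at most 3 times, waiting 10 seconds between attempts;
        //every restart restores the state from the latest completed checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
        //... build the job as in KafkaToRedisWordCount above, then env.execute()
    }
}

After a manual cancel, the job can be resumed from the retained (externalized) checkpoint via the CLI, e.g. bin/flink run -s hdfs://node-1.51doit.cn:9000/flinkck2021/<job-id>/chk-<n> ..., where the chk-<n> directory sits under the configured fsBackend path.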
3. Savepoint
The lifecycle of a checkpoint is managed by Flink: Flink creates, owns and releases checkpoints, without user interaction. As a method of recovery that is triggered periodically, the two main design goals of checkpoints are (i) being lightweight to create and (ii) being fast to restore from. Optimizations towards those goals can exploit certain properties, e.g. that the job code does not change between execution attempts.
In contrast, savepoints are created, owned and deleted by the user. Their use case is planned, manual backup and resume. Examples are an update of the Flink version, a change of the job graph, a change of parallelism, forking a second job (as in red/blue deployments), and so on. Naturally, savepoints must survive job termination. Conceptually, savepoints can be somewhat more expensive to produce and restore from, and focus more on portability and on supporting the job changes just mentioned.
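The usual CLI workflow around savepoints (the HDFS paths, jar name and entry class below are illustrative, and exact syntax can vary slightly across Flink versions):

bin/flink savepoint <jobId> hdfs://node-1.51doit.cn:9000/flink-savepoints    # trigger a savepoint for a running job
bin/flink stop -p hdfs://node-1.51doit.cn:9000/flink-savepoints <jobId>      # take a savepoint, then stop the job
bin/flink run -s <savepointPath> -c cn.doit.KafkaToRedisWordCount wc.jar     # resume a (possibly modified) job from the savepoint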
What savepoints are for: (1) Application code upgrades: suppose you find a bug in an application that is already running, and you want all subsequent events to be processed by a fixed new version. Trigger a savepoint and start the new version from it; downstream applications notice no difference (apart from the fixed behavior, of course). For this to work across code changes, stateful operators need stable IDs; see the sketch after this list.
(2) Flink version upgrades: upgrading Flink itself also becomes simple, because you can trigger a savepoint for the running job and restart it from that savepoint on a new Flink version.
(3) Maintenance and migration: with savepoints an application can easily be "paused and resumed". This is especially useful for cluster maintenance and for migrating jobs to a new cluster. It also helps development, testing and debugging, because the whole event stream does not have to be replayed.
(4) What-if simulations: running alternative application logic from a controlled point, to simulate hypothetical scenarios, is often very useful.
(5) A/B testing: running two versions of an application in parallel from the same savepoint supports A/B tests.
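A minimal sketch of assigning those stable operator IDs, assuming the kafkaConsumer and conf built in KafkaToRedisWordCount above (the uid strings are illustrative). Stable uids let Flink map the state in a savepoint back to the right operators even after the job graph or the parallelism changes:

SingleOutputStreamOperator<Tuple2<String, Integer>> summed = env
        .addSource(kafkaConsumer).uid("kafka-source")   //offset state (OperatorState)
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : line.split(" ")) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        }).uid("tokenizer")
        .keyBy(t -> t.f0)
        .sum(1).uid("word-count-sum");                  //count state (ValueState)
summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper())).uid("redis-sink");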