前言
使用kafkaStream進(jìn)行流式計(jì)算時,如果需要對數(shù)據(jù)進(jìn)行狀態(tài)處理,那么常用的會遇到kafkaStream的store,而store也有Local Store以及Global Store,當(dāng)然也可以使用其他方案的來進(jìn)行狀態(tài)保存,文本主要理清楚kafkaStream中的Local Store以及Global Store之間的區(qū)別和用法,以及什么時候選擇何種store和當(dāng)store無法滿足我們需求時,應(yīng)該如何使用其他方案來進(jìn)行數(shù)據(jù)的狀態(tài)保存
本文所有方法和代碼皆只針對kafka-streams的3.7.0版本,pom如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.7.0</version>
</dependency>
由于不同版本的KafkaStream在使用上有較大區(qū)別,也因?yàn)镵afkaStream不同版本API改動較大,所以如果版本不一致,使用方法甚至是一些核心概念都會跟本文講述有所出入,并且KafkaStream由于相對小眾,文檔也很少,官網(wǎng)的文檔也只是一些簡單介紹,所以需要注意避坑
Local Store和Global Store的共同點(diǎn)和區(qū)別點(diǎn)
共同點(diǎn):
1、都是用于流式計(jì)算中進(jìn)行狀態(tài)存儲的
2、具體結(jié)構(gòu)類似,使用的都是如:KeyValueStore,SessionStore等類
3、實(shí)際機(jī)制類似,會通過內(nèi)存、本地目錄和kafka Topic的變更記錄等方式來進(jìn)行緩存數(shù)據(jù)更新和恢復(fù)
不同點(diǎn)
1、適用場景不同
Local Store 適合用于單個實(shí)例的狀態(tài)管理,適合處理單個分區(qū)的數(shù)據(jù),并且緩存數(shù)據(jù)不會多個實(shí)例共享
Global Store 適用于跨實(shí)例共享數(shù)據(jù)狀態(tài),多個實(shí)例通過Topic中的更新記錄來跟新進(jìn)程中的數(shù)據(jù)
2、使用方法不同
Local Store 可以直接在代碼中調(diào)用對應(yīng)類型存儲(如:KeyValueStore)的put方法進(jìn)行更新數(shù)據(jù),不需要考慮數(shù)據(jù)一致性(因?yàn)榭梢娦灾挥袉蝹€實(shí)例)
Global Store 不能直接調(diào)用對應(yīng)的put和delete方法,所有更新和刪除緩存都需要通過發(fā)送數(shù)據(jù)到Global 配置的topic中,然后自行實(shí)現(xiàn)Topic數(shù)據(jù)消費(fèi)者(實(shí)現(xiàn):org.apache.kafka.streams.processor.api.Processor類),在消費(fèi)者類中進(jìn)行數(shù)據(jù)更新等操作,同時因?yàn)樾枰约簩?shí)現(xiàn)更新實(shí)例中的數(shù)據(jù)邏輯,數(shù)據(jù)一致性也需要開發(fā)者自行處理,雖然正常來說利用Kafka本身的特性很少出現(xiàn)數(shù)據(jù)一致性問題,但是如果多實(shí)例之間性能差異和網(wǎng)絡(luò)環(huán)境等差異,容易將數(shù)據(jù)不一致的時長延長,如果要求Store一致性強(qiáng)且容忍數(shù)據(jù)不一致時限短,則需要注意考慮Store更新數(shù)據(jù)消費(fèi)者的處理能力
3、擴(kuò)展性
Local Store:可以通過增加輸入主題的分區(qū)數(shù)來擴(kuò)展處理能力,但每個實(shí)例仍然獨(dú)立運(yùn)行。
Global Store:需要在多個實(shí)例之間共享狀態(tài),因此在設(shè)計(jì)時需要考慮如何高效地管理和同步狀態(tài)。
常見的Store 類型
org.apache.kafka.streams.state.KeyValueStore
org.apache.kafka.streams.state.SessionStore
org.apache.kafka.streams.state.TimestampedKeyValueStore
org.apache.kafka.streams.state.VersionedKeyValueStore
org.apache.kafka.streams.state.WindowStore
需要根據(jù)實(shí)際使用場景選擇合適的狀態(tài)存儲類
用法
Local Store
第一步,先生成對應(yīng)類型的StoreBuilder對象,如我需要用KeyValueStore,然后狀態(tài)存儲的名字是:testLocalStore(這個名字不能重復(fù),因?yàn)闀鶕?jù)消費(fèi)者id加儲存名稱創(chuàng)建對應(yīng)的Topic,當(dāng)然如果是不同的KafkaStream程序,消費(fèi)者id不一致,那么重復(fù)就沒有關(guān)系了),因?yàn)槭荎eyValue類型的儲存,所以需要設(shè)定對應(yīng)的Key和Value數(shù)據(jù)的序列化對象,具體代碼如下:
StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());
其中Stores.persistentKeyValueStore代表的我得存儲是持久化的,正常都是會用持久化,當(dāng)然也有存儲一些不重要或者程序重啟丟失也無所謂的狀態(tài)數(shù)據(jù),可以使用Stores.inMemoryKeyValueStore以及基于LRU淘汰機(jī)制的儲存Stores.lruMap,第二個參數(shù)Serdes.String()代表存儲數(shù)據(jù)的key是字符串,第三個參數(shù)同理,如果是要存儲一些對象,也可以使用自定義的序列化類,實(shí)現(xiàn)
org.apache.kafka.common.serialization.Serializer
序列化類,以及反序列化類
org.apache.kafka.common.serialization.Deserializer
然后定義好即可,如:
new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
new KryoDeserializer<>(TestStoreBean.class)
其中KryoSerializer和KryoDeserializer是我自定義的使用Kryo序列化Java對象的類,TestStoreBean是我保存的狀態(tài)的數(shù)據(jù)封裝bean
KryoSerializer代碼如下:
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
/**
* kryo序列化類
* @author Raye
* @since 2024-6-4
*/
public class KryoSerializer<T> implements Serializer<T> {
private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
/**
* 不要輕易改變這里的配置!更改之后,序列化的格式就會發(fā)生變化,
* 上線的同時就必須清除 Redis 里的所有緩存,
* 否則那些緩存再回來反序列化的時候,就會報錯
*/
//支持對象循環(huán)引用(否則會棧溢出)
kryo.setReferences(true); //默認(rèn)值就是 true,添加此行的目的是為了提醒維護(hù)者,不要改變這個配置
//不強(qiáng)制要求注冊類(注冊行為無法保證多個 JVM 內(nèi)同一個類的注冊編號相同;而且業(yè)務(wù)系統(tǒng)中大量的 Class 也難以一一注冊)
kryo.setRegistrationRequired(false); //默認(rèn)值就是 false,添加此行的目的是為了提醒維護(hù)者,不要改變這個配置
return kryo;
}
};
/**
* 獲得當(dāng)前線程的 Kryo 實(shí)例
*
* @return 當(dāng)前線程的 Kryo 實(shí)例
*/
public static Kryo getInstance() {
return KRYO_LOCAL.get();
}
private Class<T> clz;
public KryoSerializer(Class<T> clz) {
this.clz = clz;
}
@Override
public byte[] serialize(String s, T t) {
if(t == null){
return null;
}
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
Kryo kryo = getInstance();
kryo.writeObjectOrNull(output, t,clz);
output.flush();
return byteArrayOutputStream.toByteArray();
}
}
KryoDeserializer代碼如下:
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
/**
* kryo反序列化類
* @author Raye
* @since 2024-6-4
*/
public class KryoDeserializer<T> implements Deserializer<T> {
private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
/**
* 不要輕易改變這里的配置!更改之后,序列化的格式就會發(fā)生變化,
* 上線的同時就必須清除 Redis 里的所有緩存,
* 否則那些緩存再回來反序列化的時候,就會報錯
*/
//支持對象循環(huán)引用(否則會棧溢出)
kryo.setReferences(true); //默認(rèn)值就是 true,添加此行的目的是為了提醒維護(hù)者,不要改變這個配置
//不強(qiáng)制要求注冊類(注冊行為無法保證多個 JVM 內(nèi)同一個類的注冊編號相同;而且業(yè)務(wù)系統(tǒng)中大量的 Class 也難以一一注冊)
kryo.setRegistrationRequired(false); //默認(rèn)值就是 false,添加此行的目的是為了提醒維護(hù)者,不要改變這個配置
return kryo;
}
};
/**
* 獲得當(dāng)前線程的 Kryo 實(shí)例
*
* @return 當(dāng)前線程的 Kryo 實(shí)例
*/
public static Kryo getInstance() {
return KRYO_LOCAL.get();
}
private Class<T> clz;
public KryoDeserializer(Class<T> clz) {
this.clz = clz;
}
@Override
public T deserialize(String s, byte[] bytes) {
if(bytes == null){
return null;
}
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
Kryo kryo = getInstance();
try {
return kryo.readObjectOrNull(input, clz);
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
同理,使用LocalStore時,可以將代碼替換成以下內(nèi)容:
StoreBuilder<KeyValueStore<String, TestStoreBean>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
new KryoDeserializer<>(TestStoreBean.class));
有了StoreBuilder對象之后,直接在StreamsBuilder對象中添加即可
streamsBuilder.addStateStore(kvBuilder);
需要使用時,先在處理數(shù)據(jù)的Processor類中的init方法獲取對應(yīng)的狀態(tài)存儲對象
this.testLocalStore = context.getStateStore("testLocalStore");
然后就可以在process方法中調(diào)用testLocalStore的get、put、delete等方法操作狀態(tài)存儲數(shù)據(jù)了,具體代碼如下
@Slf4j
public static class StreamProcessor implements Processor<String,String,String,String> {
private KeyValueStore<String,String> testLocalStore;
private ProcessorContext context;
private String toTopic;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.testLocalStore = context.getStateStore("testLocalStore");
}
public StreamProcessor(String toTopic) {
this.toTopic = toTopic;
}
@Override
public void process(Record<String, String> record) {
testLocalStore.put("key1","testValue1");
log.info("testLocalStore key1 : {}",testLocalStore.get("key1"));
testLocalStore.delete("key1");
context.forward(record,toTopic);
}
}
其中實(shí)現(xiàn)的Processor類全稱是:org.apache.kafka.streams.processor.api.Processor,上面代碼只是在數(shù)據(jù)處理流程中簡單保存了數(shù)據(jù),然后獲取出來以及刪除,沒有對流數(shù)據(jù)做任何處理,就直接發(fā)送到輸出的topic了
完整代碼如下:
@Bean
public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){
log.info("init kStreamTestStore");
StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());
streamsBuilder.addStateStore(kvBuilder);
KStream<String, String> stream = streamsBuilder.stream(fromTopic);
stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic),"testLocalStore");
streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);
return stream;
}
注意:由于使用Store需要通過ProcessorContext對象來獲取Store對象,所以在KafkaStream常用的一些map,mapValue,flatMapValues這些流式計(jì)算方法中是沒辦法使用的,只能在一些更底層的Api中去使用,如process
Global Store
同Local Store一樣,需要先生成對應(yīng)類型的StoreBuilder對象,代碼跟Local Store一樣
StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());
然后定義處理狀態(tài)更新日志的Processor類,在這個類中,可以對緩存數(shù)據(jù)進(jìn)行更新和刪除操作(其他地方都是不能直接修改Global Store的)
public class GlobalStoreHandleProcessor<K, V> implements Processor<K,V,Void,Void> {
private KeyValueStore<K, V> store;
private String storeName;
public GlobalStoreHandleProcessor(String storeName) {
this.storeName = storeName;
}
@Override
public void process(Record<K,V> record) {
if(record == null || record.value() == null) {
return;
}
store.put(record.key(), record.value());
}
@Override
public void init(ProcessorContext context) {
this.store = context.getStateStore(storeName);
}
}
跟KafkaStream的process是一樣的,只需要在process方法中對緩存進(jìn)行更新或者刪除操作即可,我這里只是簡單put操作,具體邏輯可以根據(jù)自己情況進(jìn)行處理
在StreamsBuilder對象中添加StoreBuilder對象
streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
()->new GlobalStoreHandleProcessor<>("testGlobalStore"));
其中第二個參數(shù)testGlobalStore是Global Store綁定的數(shù)據(jù)變更記錄的Topic,如果要更新,則需要通過向這個topic發(fā)送數(shù)據(jù)來進(jìn)行更新Global Store中的數(shù)據(jù)
處理數(shù)據(jù)的Processor類實(shí)例代碼
public static class StreamProcessor implements Processor<String,String,String,String> {
private KeyValueStore<String,String> testGlobalStore;
private ProcessorContext context;
private String toTopic;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.testGlobalStore = context.getStateStore("testGlobalStore");
}
public StreamProcessor(String toTopic) {
this.toTopic = toTopic;
}
@Override
public void process(Record<String, String> record) {
testLocalStore.put(jsonObject.getString("key"),jsonObject.getString("value"));
log.info("testLocalStore key1 : {}",testGlobalStore.get("key1"));
//發(fā)送更新Global Store的數(shù)據(jù)
context.forward(new Record("testGlobalKey","global value",record.timestamp()),"testGlobalStore");
context.forward(record,toTopic);
}
}
與Local Store不同的是,不能在處理數(shù)據(jù)流的時候,對緩存進(jìn)行put操作,只能通過將數(shù)據(jù)發(fā)送到Global Store關(guān)聯(lián)的topic中,在GlobalStoreHandleProcessor中去做更新
完整代碼如下:
@Bean
public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){
log.info("init kStreamTestStore");
StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());
streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
()->new GlobalStoreHandleProcessor<>("testGlobalStore"));
KStream<String, String> stream = streamsBuilder.stream(fromTopic);
stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic));
streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);
streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
return stream;
}
與Local Store不同點(diǎn)在于,不需要在process方法中添加store的名字,但是因?yàn)橐獜膒rocess方法中直接將更新Store的數(shù)據(jù)發(fā)送到topic,所以需要添加一個Global Store綁定的Topic的輸出擴(kuò)展,也就是下面這行代碼
streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
不適合的場景
由于KafkaStream Store 沒有自動過期數(shù)據(jù)和過期數(shù)據(jù)自動刪除的概率(可能是有,但是我沒有找到對應(yīng)文檔),所以如果我們存儲的key集合特別大,并且需要自動過期和自動刪除,那么就不適合使用Store來處理了,因?yàn)樾枰覀冏孕刑幚韯h除邏輯,尤其是有些場景中,并不會對過期的key進(jìn)行訪問,所以采用惰性刪除基本上不現(xiàn)實(shí),但是定時刪除,因?yàn)镾tore會存儲到磁盤,如果存儲的key很多,刪除對應(yīng)數(shù)據(jù)的時候耗時很長,尤其是單次刪除大量key的時候,可能會直接超時,并且還必須要自己處理定時刪除的邏輯,想要更好的去刪除,就需要大量時間去開發(fā)和優(yōu)化。
雖然使用內(nèi)存的Store能稍微好點(diǎn),但是畢竟單個進(jìn)程內(nèi)存有限,并且正常流處理中,如果需要保存狀態(tài),那么肯定是希望進(jìn)程重啟之后,能恢復(fù)數(shù)據(jù),避免計(jì)算出錯的,所以如果是有大量不重復(fù)key,并且數(shù)據(jù)需要到期自動刪除的話,可以直接使用Redis做狀態(tài)存儲,并且進(jìn)過我得實(shí)際測試,使用Redis并不比Store慢,并且在key量越來越大的情況下,Redis的性能是完全優(yōu)于Store的(只針對持久化的Store),當(dāng)然使用Redis,還是會更使用Global Store一樣,需要考慮數(shù)據(jù)一致性的問題,不過這個問題可以通過將相同key的數(shù)據(jù)從Kafka Topic就分配到同一個Topic分區(qū)中來避免